Commit | Line | Data |
---|---|---|
920dae64 AT |
1 | """PyUnit testing that threads honor our signal semantics""" |
2 | ||
3 | import unittest | |
4 | import thread | |
5 | import signal | |
6 | import os | |
7 | import sys | |
8 | from test.test_support import run_unittest, TestSkipped | |
9 | ||
# The SIGUSR1/SIGUSR2/SIGALRM based test below cannot run on these
# platforms, so skip the whole module at import time.
if sys.platform == 'riscos' or sys.platform[:3] in ('win', 'os2'):
    raise TestSkipped("Can't test signal on %s" % sys.platform)

# Pid of this process; send_signals() uses it as the os.kill() target.
process_pid = os.getpid()
# Handshake lock: the test acquires it, and send_signals() releases it
# once both signals have been sent.
signalled_all = thread.allocate_lock()
15 | ||
16 | ||
def registerSignals(handlers):
    """Install handlers for SIGUSR1, SIGUSR2 and SIGALRM.

    handlers is a single 3-item sequence ``(for_usr1, for_usr2, for_alrm)``,
    one handler per signal in that order.  It is unpacked in the body rather
    than in the signature, so the function keeps the same call convention
    while remaining valid on interpreters without tuple parameters.

    Returns the previously installed handlers as ``(usr1, usr2, alrm)``;
    passing that tuple back to this function restores them.
    """
    for_usr1, for_usr2, for_alrm = handlers
    usr1 = signal.signal(signal.SIGUSR1, for_usr1)
    usr2 = signal.signal(signal.SIGUSR2, for_usr2)
    alrm = signal.signal(signal.SIGALRM, for_alrm)
    return usr1, usr2, alrm
22 | ||
23 | ||
# The signal handler.  Just note that the signal occurred and
# which thread the handler ran in.
def handle_signals(sig, frame):
    """Record one delivery of *sig* on the shared signal blackboard."""
    entry = signal_blackboard[sig]
    entry['tripped'] += 1
    entry['tripped_by'] = thread.get_ident()
29 | ||
# Runs in a separate thread (see ThreadSignals.spawnSignallingThread).
def send_signals():
    """Send SIGUSR1 then SIGUSR2 to this process and release the lock."""
    for signum in (signal.SIGUSR1, signal.SIGUSR2):
        os.kill(process_pid, signum)
    # Tell the waiting test that both signals have been sent.
    signalled_all.release()
35 | ||
class ThreadSignals(unittest.TestCase):
    """Test signal handling semantics of threads.

    We spawn a thread, have the thread send two signals, and
    wait for it to finish.  Check that we got both signals
    and that the handlers ran in the thread running the test.
    """

    def test_signals(self):
        """Signals sent from a helper thread are handled in this thread."""
        # Take the lock, start the sender, then block on the lock again;
        # send_signals() releases it after issuing both os.kill() calls.
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        try:
            # The signals that we asked the kernel to send
            # will come back, but we don't know when.
            # (It might even be after the thread exits
            # and might be out of order.)  If we haven't seen
            # both signals yet, send yet another signal and
            # wait for it to return.
            if (signal_blackboard[signal.SIGUSR1]['tripped'] == 0
                    or signal_blackboard[signal.SIGUSR2]['tripped'] == 0):
                try:
                    signal.alarm(1)
                    signal.pause()
                finally:
                    # Always cancel the pending alarm, even if pause()
                    # was interrupted by an exception.
                    signal.alarm(0)

            me = thread.get_ident()
            self.assertEqual(signal_blackboard[signal.SIGUSR1]['tripped'], 1)
            self.assertEqual(signal_blackboard[signal.SIGUSR1]['tripped_by'],
                             me)
            self.assertEqual(signal_blackboard[signal.SIGUSR2]['tripped'], 1)
            self.assertEqual(signal_blackboard[signal.SIGUSR2]['tripped_by'],
                             me)
        finally:
            # Release even when an assertion fails so the module-level
            # lock is not left held.
            signalled_all.release()

    def spawnSignallingThread(self):
        """Start send_signals() in a new thread."""
        thread.start_new_thread(send_signals, ())
68 | ||
69 | ||
def test_main():
    """Run ThreadSignals with handle_signals installed for all three signals.

    Resets the module-level signal_blackboard, installs handle_signals for
    SIGUSR1, SIGUSR2 and SIGALRM, runs the test case, and restores the
    previous handlers afterwards, even if the run raises.
    """
    global signal_blackboard

    # Build a fresh per-signal record before binding the global name.
    blackboard = {}
    for signum in (signal.SIGUSR1, signal.SIGUSR2, signal.SIGALRM):
        blackboard[signum] = {'tripped': 0, 'tripped_by': 0}
    signal_blackboard = blackboard

    previous = registerSignals((handle_signals,) * 3)
    try:
        run_unittest(ThreadSignals)
    finally:
        registerSignals(previous)
82 | ||
# Allow running this test module directly as a script.
if __name__ == '__main__':
    test_main()