Commit | Line | Data |
---|---|---|
920dae64 AT |
1 | """Generic thread tests. |
2 | ||
3 | Meant to be used by dummy_thread and thread. To allow for different modules | |
4 | to be used, test_main() can be called with the module to use as the thread | |
5 | implementation as its sole argument. | |
6 | ||
7 | """ | |
8 | import dummy_thread as _thread | |
9 | import time | |
10 | import Queue | |
11 | import random | |
12 | import unittest | |
13 | from test import test_support | |
14 | ||
# Seconds the timing-sensitive tests wait on other threads.  Set > 0 when
# testing a module other than dummy_thread, such as the 'thread' module.
DELAY = 0
17 | ||
class LockTests(unittest.TestCase):
    """Test the lock objects returned by ``_thread.allocate_lock()``.

    ``_thread`` is the module-level name bound to the thread implementation
    under test, so the same checks run against whichever module is in use.
    """

    def setUp(self):
        # Every test starts from a freshly allocated lock.
        self.lock = _thread.allocate_lock()

    def test_initlock(self):
        # A newly allocated lock must start out unlocked.
        self.assertFalse(self.lock.locked(),
                         "Lock object is not initialized unlocked.")

    def test_release(self):
        # Releasing an acquired lock must leave it unlocked.
        self.lock.acquire()
        self.lock.release()
        self.assertFalse(self.lock.locked(),
                         "Lock object did not release properly.")

    def test_improper_release(self):
        # Releasing a lock that is not held must raise _thread.error.
        self.assertRaises(_thread.error, self.lock.release)

    def test_cond_acquire_success(self):
        # A non-blocking acquire of a free lock must succeed.
        self.assertTrue(self.lock.acquire(0),
                        "Conditional acquiring of the lock failed.")

    def test_cond_acquire_fail(self):
        # A non-blocking acquire of an already held lock must report failure
        # instead of blocking.
        self.lock.acquire(0)
        self.assertFalse(self.lock.acquire(0),
                         "Conditional acquiring of a locked lock incorrectly "
                         "succeeded.")

    def test_uncond_acquire_success(self):
        # A blocking acquire of a free lock must leave the lock held.
        self.lock.acquire()
        self.assertTrue(self.lock.locked(),
                        "Uncondional locking failed.")

    def test_uncond_acquire_return_val(self):
        # A blocking acquire must return exactly True, not merely a truthy
        # value.
        self.assertTrue(self.lock.acquire(1) is True,
                        "Unconditional locking did not return True.")

    def test_uncond_acquire_blocking(self):
        # A blocking acquire of a held lock must wait until another thread
        # releases the lock.
        def delay_unlock(to_unlock, delay):
            """Hold on to lock for a set amount of time before unlocking."""
            time.sleep(delay)
            to_unlock.release()

        self.lock.acquire()
        # Timestamps are truncated to whole seconds, so the elapsed-time
        # check below is only meaningful for whole-second DELAY values.
        start_time = int(time.time())
        _thread.start_new_thread(delay_unlock, (self.lock, DELAY))
        if test_support.verbose:
            print("*** Waiting for thread to release the lock "
                  "(approx. %s sec.) ***" % DELAY)
        self.lock.acquire()
        end_time = int(time.time())
        if test_support.verbose:
            print("done")
        self.assertTrue((end_time - start_time) >= DELAY,
                        "Blocking by unconditional acquiring failed.")
84 | ||
class MiscTests(unittest.TestCase):
    """Miscellaneous tests of the module-level ``_thread`` functions."""

    def test_exit(self):
        # _thread.exit() must raise SystemExit.
        self.assertRaises(SystemExit, _thread.exit)

    def test_ident(self):
        # _thread.get_ident() must return a non-zero integer.
        ident = _thread.get_ident()
        self.assertTrue(isinstance(ident, int),
                        "_thread.get_ident() returned a non-integer")
        self.assertTrue(ident != 0,
                        "_thread.get_ident() returned 0")

    def test_LockType(self):
        # Whatever _thread.allocate_lock() returns must be an instance of
        # _thread.LockType.
        self.assertTrue(isinstance(_thread.allocate_lock(), _thread.LockType),
                        "_thread.allocate_lock() did not return an instance "
                        "of _thread.LockType")

    def test_interrupt_main(self):
        # Calling start_new_thread with a function that executes
        # interrupt_main should raise KeyboardInterrupt upon completion.
        def call_interrupt():
            _thread.interrupt_main()

        self.assertRaises(KeyboardInterrupt, _thread.start_new_thread,
                          call_interrupt, ())

    def test_interrupt_in_main(self):
        # If interrupt_main is called from the main thread,
        # KeyboardInterrupt must be raised immediately.
        self.assertRaises(KeyboardInterrupt, _thread.interrupt_main)
117 | ||
class ThreadTests(unittest.TestCase):
    """Test thread creation through ``_thread.start_new_thread()``."""

    def test_arg_passing(self):
        # Arguments must reach the thread function whether they are given
        # positionally, by keyword, or as a mix of both.
        def arg_tester(queue, arg1=False, arg2=False):
            """Use to test _thread.start_new_thread() passes args properly."""
            queue.put((arg1, arg2))

        testing_queue = Queue.Queue(1)

        _thread.start_new_thread(arg_tester, (testing_queue, True, True))
        result = testing_queue.get()
        self.assertTrue(result[0] and result[1],
                        "Argument passing for thread creation using tuple "
                        "failed")

        _thread.start_new_thread(arg_tester, (), {'queue': testing_queue,
                                                  'arg1': True,
                                                  'arg2': True})
        result = testing_queue.get()
        self.assertTrue(result[0] and result[1],
                        "Argument passing for thread creation using kwargs "
                        "failed")

        _thread.start_new_thread(arg_tester, (testing_queue, True),
                                 {'arg2': True})
        result = testing_queue.get()
        self.assertTrue(result[0] and result[1],
                        "Argument passing for thread creation using both tuple"
                        " and kwargs failed")

    def test_multi_creation(self):
        # Several threads must be startable, and each one must run to
        # completion.
        def queue_mark(queue, delay):
            """Wait for ``delay`` seconds and then put something into ``queue``"""
            time.sleep(delay)
            queue.put(_thread.get_ident())

        thread_count = 5
        testing_queue = Queue.Queue(thread_count)
        if test_support.verbose:
            print("*** Testing multiple thread creation "
                  "(will take approx. %s to %s sec.) ***"
                  % (DELAY, thread_count))
        for _ in range(thread_count):
            # Stagger the workers only when a delay is configured; with
            # DELAY == 0 every worker runs without sleeping.
            if DELAY:
                local_delay = round(random.random(), 1)
            else:
                local_delay = 0
            _thread.start_new_thread(queue_mark,
                                     (testing_queue, local_delay))
        # Give the workers DELAY seconds to finish before counting results.
        time.sleep(DELAY)
        if test_support.verbose:
            print('done')
        self.assertEqual(testing_queue.qsize(), thread_count,
                         "Not all %s threads executed properly after %s sec." %
                         (thread_count, DELAY))
169 | ||
def test_main(imported_module=None):
    """Run every test case in this file.

    When ``imported_module`` is given it replaces the module-level
    ``_thread`` implementation under test, and ``DELAY`` is raised to 2.
    """
    global _thread, DELAY
    if imported_module:
        _thread, DELAY = imported_module, 2
    if test_support.verbose:
        print("*** Using %s as _thread module ***" % _thread)
    suites = (LockTests, MiscTests, ThreadTests)
    test_support.run_unittest(*suites)


if __name__ == '__main__':
    test_main()