Commit | Line | Data |
---|---|---|
920dae64 AT |
1 | # Some simple Queue module tests, plus some failure conditions |
2 | # to ensure the Queue locks remain stable. | |
3 | import Queue | |
4 | import sys | |
5 | import threading | |
6 | import time | |
7 | ||
8 | from test.test_support import verify, TestFailed, verbose | |
9 | ||
# Capacity given to the bounded queues built in test(); the test functions
# put exactly this many items to make a queue report full().
QUEUE_SIZE = 5
11 | ||
12 | # A thread to run a function that unclogs a blocked Queue. | |
13 | class _TriggerThread(threading.Thread): | |
14 | def __init__(self, fn, args): | |
15 | self.fn = fn | |
16 | self.args = args | |
17 | self.startedEvent = threading.Event() | |
18 | threading.Thread.__init__(self) | |
19 | ||
20 | def run(self): | |
21 | # The sleep isn't necessary, but is intended to give the blocking | |
22 | # function in the main thread a chance at actually blocking before | |
23 | # we unclog it. But if the sleep is longer than the timeout-based | |
24 | # tests wait in their blocking functions, those tests will fail. | |
25 | # So we give them much longer timeout values compared to the | |
26 | # sleep here (I aimed at 10 seconds for blocking functions -- | |
27 | # they should never actually wait that long - they should make | |
28 | # progress as soon as we call self.fn()). | |
29 | time.sleep(0.1) | |
30 | self.startedEvent.set() | |
31 | self.fn(*self.args) | |
32 | ||
33 | # Execute a function that blocks, and in a separate thread, a function that | |
34 | # triggers the release. Returns the result of the blocking function. | |
35 | # Caution: block_func must guarantee to block until trigger_func is | |
36 | # called, and trigger_func must guarantee to change queue state so that | |
37 | # block_func can make enough progress to return. In particular, a | |
38 | # block_func that just raises an exception regardless of whether trigger_func | |
39 | # is called will lead to timing-dependent sporadic failures, and one of | |
40 | # those went rarely seen but undiagnosed for years. Now block_func | |
41 | # must be unexceptional. If block_func is supposed to raise an exception, | |
42 | # call _doExceptionalBlockingTest() instead. | |
def _doBlockingTest(block_func, block_args, trigger_func, trigger_args):
    """Run a blocking call that a helper thread is expected to release.

    ``trigger_func(*trigger_args)`` runs in a _TriggerThread while
    ``block_func(*block_args)`` runs in the calling thread.  ``block_func``
    must not raise; use _doExceptionalBlockingTest() for calls that are
    expected to fail.

    Returns whatever ``block_func`` returned.  Raises TestFailed if
    ``block_func`` came back before the trigger fired, or if the trigger
    thread is still alive 10 seconds after the blocking call returned.
    """
    trigger = _TriggerThread(trigger_func, trigger_args)
    trigger.start()
    outcome = block_func(*block_args)
    # Returning before the helper thread fired means nothing ever blocked.
    trigger_fired = trigger.startedEvent.isSet()
    if not trigger_fired:
        raise TestFailed("blocking function '%r' appeared not to block" %
                         block_func)
    # Give the helper thread a bounded amount of time to finish.
    trigger.join(10)
    if trigger.isAlive():
        raise TestFailed("trigger function '%r' appeared to not return" %
                         trigger_func)
    return outcome
56 | ||
57 | # Call this instead if block_func is supposed to raise an exception. | |
def _doExceptionalBlockingTest(block_func, block_args, trigger_func,
                               trigger_args, expected_exception_class):
    """Run a blocking call that is expected to raise once it is released.

    ``trigger_func(*trigger_args)`` runs in a _TriggerThread while
    ``block_func(*block_args)`` runs in the calling thread.

    The expected exception is re-raised to the caller.  TestFailed is raised
    instead if ``block_func`` returns normally, if the trigger thread is
    still alive after a 10 second join, or if it ended without ever setting
    its started event.
    """
    trigger = _TriggerThread(trigger_func, trigger_args)
    trigger.start()
    try:
        block_func(*block_args)
    except expected_exception_class:
        # This is the outcome the caller asked for; let it propagate.
        raise
    else:
        raise TestFailed("expected exception of kind %r" %
                         expected_exception_class)
    finally:
        # Runs on every path, so the helper thread is always waited for.
        trigger.join(10)
        if trigger.isAlive():
            raise TestFailed("trigger function '%r' appeared to not return" %
                             trigger_func)
        if not trigger.startedEvent.isSet():
            raise TestFailed("trigger thread ended but event never set")
77 | ||
78 | # A Queue subclass that can provoke failure at a moment's notice :) | |
class FailingQueueException(Exception):
    """Raised by FailingQueue when told to fail its next put or get."""
81 | ||
class FailingQueue(Queue.Queue):
    """Queue whose next put or get can be made to fail on demand.

    Setting ``fail_next_put`` (or ``fail_next_get``) to True makes the next
    ``_put`` (or ``_get``) clear the flag again and raise
    FailingQueueException instead of touching the underlying storage.
    """

    def __init__(self, *args):
        self.fail_next_put = False
        self.fail_next_get = False
        Queue.Queue.__init__(self, *args)

    def _put(self, item):
        """Store ``item``, unless a one-shot put failure was requested."""
        if not self.fail_next_put:
            return Queue.Queue._put(self, item)
        # One-shot: reset the flag before failing.
        self.fail_next_put = False
        raise FailingQueueException("You Lose")

    def _get(self):
        """Fetch an item, unless a one-shot get failure was requested."""
        if not self.fail_next_get:
            return Queue.Queue._get(self)
        # One-shot: reset the flag before failing.
        self.fail_next_get = False
        raise FailingQueueException("You Lose")
97 | ||
def FailingQueueTest(q):
    """Check that a FailingQueue stays consistent after injected failures.

    ``q`` must be empty, must hold exactly QUEUE_SIZE items when full, and
    must honour the ``fail_next_put`` / ``fail_next_get`` flags.  The queue
    is left empty again on success, so the function can be called
    repeatedly on the same queue.

    Raises RuntimeError if ``q`` is not empty on entry, and TestFailed (from
    this function, verify() or the blocking-test helpers) when the queue
    misbehaves.
    """
    if not q.empty():
        raise RuntimeError("Call this function with an empty queue")
    for i in range(QUEUE_SIZE-1):
        q.put(i)
    # Test a failing non-blocking put.
    q.fail_next_put = True
    try:
        q.put("oops", block=0)
        raise TestFailed("The queue didn't fail when it should have")
    except FailingQueueException:
        pass
    q.fail_next_put = True
    try:
        q.put("oops", timeout=0.1)
        raise TestFailed("The queue didn't fail when it should have")
    except FailingQueueException:
        pass
    q.put("last")
    verify(q.full(), "Queue should be full")
    # Test a failing blocking put.  The put is expected to raise, so it has
    # to go through the exceptional helper: _doBlockingTest() requires an
    # unexceptional block_func and would neither join nor check the trigger
    # thread once the exception propagates.
    q.fail_next_put = True
    try:
        _doExceptionalBlockingTest(q.put, ("full",), q.get, (),
                                   FailingQueueException)
        raise TestFailed("The queue didn't fail when it should have")
    except FailingQueueException:
        pass
    # Check the Queue isn't damaged.
    # put failed, but get succeeded - re-add
    q.put("last")
    # Test a failing timeout put
    q.fail_next_put = True
    try:
        _doExceptionalBlockingTest(q.put, ("full", True, 10), q.get, (),
                                   FailingQueueException)
        raise TestFailed("The queue didn't fail when it should have")
    except FailingQueueException:
        pass
    # Check the Queue isn't damaged.
    # put failed, but get succeeded - re-add
    q.put("last")
    verify(q.full(), "Queue should be full")
    q.get()
    verify(not q.full(), "Queue should not be full")
    q.put("last")
    verify(q.full(), "Queue should be full")
    # Test a blocking put
    _doBlockingTest(q.put, ("full",), q.get, ())
    # Empty it
    for i in range(QUEUE_SIZE):
        q.get()
    verify(q.empty(), "Queue should be empty")
    q.put("first")
    # Test a failing get: the item must still be there afterwards.
    q.fail_next_get = True
    try:
        q.get()
        raise TestFailed("The queue didn't fail when it should have")
    except FailingQueueException:
        pass
    verify(not q.empty(), "Queue should not be empty")
    q.fail_next_get = True
    try:
        q.get(timeout=0.1)
        raise TestFailed("The queue didn't fail when it should have")
    except FailingQueueException:
        pass
    verify(not q.empty(), "Queue should not be empty")
    q.get()
    verify(q.empty(), "Queue should be empty")
    # Test a failing blocking get
    q.fail_next_get = True
    try:
        _doExceptionalBlockingTest(q.get, (), q.put, ('empty',),
                                   FailingQueueException)
        raise TestFailed("The queue didn't fail when it should have")
    except FailingQueueException:
        pass
    # put succeeded, but get failed.
    verify(not q.empty(), "Queue should not be empty")
    q.get()
    verify(q.empty(), "Queue should be empty")
178 | ||
def SimpleQueueTest(q):
    """Exercise the basic put/get, full/empty and blocking behaviour of ``q``.

    ``q`` must be empty on entry and must hold exactly QUEUE_SIZE items when
    full.  It ends up holding a single 'empty' item that the second blocking
    get did not consume... no: both blocking gets consume their item, so the
    queue is empty again on return and the function can be re-run on it.

    Raises RuntimeError if ``q`` is not empty on entry, and TestFailed when
    the queue misbehaves.
    """
    if not q.empty():
        raise RuntimeError("Call this function with an empty queue")

    # Items must come back out in the order they went in.
    q.put(111)
    q.put(222)
    first = q.get()
    in_order = first == 111 and q.get() == 222
    verify(in_order, "Didn't seem to queue the correct data!")

    # Fill the queue up to one slot short of its capacity, then top it up.
    for value in range(QUEUE_SIZE-1):
        q.put(value)
        verify(not q.empty(), "Queue should not be empty")
    verify(not q.full(), "Queue should not be full")
    q.put("last")
    verify(q.full(), "Queue should be full")

    # A full queue must refuse a non-blocking put ...
    try:
        q.put("full", block=0)
    except Queue.Full:
        pass
    else:
        raise TestFailed("Didn't appear to block with a full queue")
    # ... and must time out on a put with a short timeout.
    try:
        q.put("full", timeout=0.01)
    except Queue.Full:
        pass
    else:
        raise TestFailed("Didn't appear to time-out with a full queue")

    # Blocking puts: released by a get performed in the trigger thread.
    _doBlockingTest(q.put, ("full",), q.get, ())
    _doBlockingTest(q.put, ("full", True, 10), q.get, ())

    # Drain the queue completely.
    for _ in range(QUEUE_SIZE):
        q.get()
    verify(q.empty(), "Queue should be empty")

    # An empty queue must refuse a non-blocking get ...
    try:
        q.get(block=0)
    except Queue.Empty:
        pass
    else:
        raise TestFailed("Didn't appear to block with an empty queue")
    # ... and must time out on a get with a short timeout.
    try:
        q.get(timeout=0.01)
    except Queue.Empty:
        pass
    else:
        raise TestFailed("Didn't appear to time-out with an empty queue")

    # Blocking gets: released by a put performed in the trigger thread.
    _doBlockingTest(q.get, (), q.put, ('empty',))
    _doBlockingTest(q.get, (True, 10), q.put, ('empty',))
223 | ||
def test():
    """Run each suite twice on one queue of capacity QUEUE_SIZE.

    Running a suite twice on the same instance checks that the first pass
    left the queue in a reusable state.  A progress line is printed after
    each suite when ``verbose`` is true.
    """
    suites = (
        (Queue.Queue, SimpleQueueTest, "Simple Queue tests seemed to work"),
        (FailingQueue, FailingQueueTest, "Failing Queue tests seemed to work"),
    )
    for queue_class, run_suite, done_message in suites:
        q = queue_class(QUEUE_SIZE)
        # Do it a couple of times on the same queue
        run_suite(q)
        run_suite(q)
        if verbose:
            print(done_message)
236 | ||
# Run the whole suite as soon as this module is executed or imported.
test()