Commit | Line | Data |
---|---|---|
920dae64 AT |
1 | """Thread module emulating a subset of Java's threading model.""" |
2 | ||
3 | import sys as _sys | |
4 | ||
5 | try: | |
6 | import thread | |
7 | except ImportError: | |
8 | del _sys.modules[__name__] | |
9 | raise | |
10 | ||
11 | from time import time as _time, sleep as _sleep | |
12 | from traceback import format_exc as _format_exc | |
13 | from collections import deque | |
14 | ||
# Public names picked up by "from threading import *".  Every other
# module-level name below is underscore-prefixed so the star-import is safe.
__all__ = [
    'activeCount',
    'Condition',
    'currentThread',
    'enumerate',
    'Event',
    'Lock',
    'RLock',
    'Semaphore',
    'BoundedSemaphore',
    'Thread',
    'Timer',
    'setprofile',
    'settrace',
    'local',
]

# Bind the low-level primitives under private names, then drop the reference
# to the ``thread`` module itself so it is not left in this namespace.
_start_new_thread = thread.start_new_thread
_allocate_lock = thread.allocate_lock
_get_ident = thread.get_ident
ThreadError = thread.error
del thread
25 | ||
26 | ||
27 | # Debug support (adapted from ihooks.py). | |
28 | # All the major classes here derive from _Verbose. We force that to | |
29 | # be a new-style class so that all the major classes here are new-style. | |
30 | # This helps debugging (type(instance) is more revealing for instances | |
31 | # of new-style classes). | |
32 | ||
# Module-wide default used when a _Verbose instance is created without an
# explicit ``verbose`` argument.
_VERBOSE = False

if __debug__:

    class _Verbose(object):
        """Base class that gives its subclasses an optional debug trace.

        New-style on purpose, so that every class deriving from it is
        new-style as well (type(instance) is more revealing when debugging).
        """

        def __init__(self, verbose=None):
            """Remember the verbosity flag; None selects the _VERBOSE default."""
            if verbose is None:
                verbose = _VERBOSE
            self.__verbose = verbose

        def _note(self, format, *args):
            """Write ``format % args`` to sys.stderr, prefixed by the name of
            the current thread, but only when this instance is verbose."""
            if not self.__verbose:
                return
            message = format % args
            line = "%s: %s\n" % (currentThread().getName(), message)
            _sys.stderr.write(line)

else:
    # Disable this when using "python -O"
    class _Verbose(object):
        """No-op stand-in used when assertions/debug code are compiled out."""

        def __init__(self, verbose=None):
            pass

        def _note(self, *args):
            pass
58 | ||
59 | # Support for profile and trace hooks | |
60 | ||
61 | _profile_hook = None | |
62 | _trace_hook = None | |
63 | ||
def setprofile(func):
    """Record *func* as the profile hook for threads started by this module.

    The hook is stored in the module-level ``_profile_hook``; each new thread
    hands it to sys.setprofile() from its bootstrap code before run() is
    called.
    """
    global _profile_hook
    _profile_hook = func
67 | ||
def settrace(func):
    """Record *func* as the trace hook for threads started by this module.

    The hook is stored in the module-level ``_trace_hook``; each new thread
    hands it to sys.settrace() from its bootstrap code before run() is
    called.
    """
    global _trace_hook
    _trace_hook = func
71 | ||
72 | # Synchronization classes | |
73 | ||
74 | Lock = _allocate_lock | |
75 | ||
def RLock(*args, **kwargs):
    """Factory function: build and return a new reentrant lock (_RLock).

    All positional and keyword arguments are forwarded unchanged.
    """
    return _RLock(*args, **kwargs)
78 | ||
class _RLock(_Verbose):
    """Reentrant lock layered on top of a primitive (non-reentrant) lock.

    The thread that holds the lock may acquire it again without blocking;
    each acquire() must be matched by a release(), and the underlying lock
    is only released when the recursion count drops back to zero.
    """

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__block = _allocate_lock()  # the underlying primitive lock
        self.__owner = None              # Thread object currently holding it
        self.__count = 0                 # recursion depth of the owner

    def __repr__(self):
        return "<%s(%s, %d)>" % (
                self.__class__.__name__,
                self.__owner and self.__owner.getName(),
                self.__count)

    def acquire(self, blocking=1):
        """Acquire the lock, recursively if the caller already owns it.

        Returns 1 on a recursive acquire; otherwise returns whatever the
        underlying lock's acquire(blocking) returned (false when a
        non-blocking attempt failed).
        """
        me = currentThread()
        if self.__owner is me:
            # Already ours: just go one level deeper, never blocks.
            self.__count = self.__count + 1
            if __debug__:
                self._note("%s.acquire(%s): recursive success", self, blocking)
            return 1
        rc = self.__block.acquire(blocking)
        if rc:
            self.__owner = me
            self.__count = 1
            if __debug__:
                self._note("%s.acquire(%s): initial success", self, blocking)
        else:
            if __debug__:
                self._note("%s.acquire(%s): failure", self, blocking)
        return rc

    def release(self):
        """Undo one acquire(); free the underlying lock at depth zero.

        Asserts that the calling thread is the current owner.
        """
        me = currentThread()
        assert self.__owner is me, "release() of un-acquire()d lock"
        self.__count = count = self.__count - 1
        if not count:
            # Clear the owner before releasing so that no other thread can
            # ever observe a stale owner on a free lock.
            self.__owner = None
            self.__block.release()
            if __debug__:
                self._note("%s.release(): final release", self)
        else:
            if __debug__:
                self._note("%s.release(): non-final release", self)

    # Internal methods used by condition variables

    def _acquire_restore(self, count_owner):
        """Re-acquire the lock and reinstate a state from _release_save().

        *count_owner* is the (count, owner) pair returned by
        _release_save().  It is unpacked explicitly rather than in the
        signature, because tuple parameters are not portable syntax; the
        method is still called with a single positional argument.
        """
        # Unpack first so a malformed argument fails before we block.
        count, owner = count_owner
        self.__block.acquire()
        self.__count = count
        self.__owner = owner
        if __debug__:
            self._note("%s._acquire_restore()", self)

    def _release_save(self):
        """Fully release the lock, whatever the recursion depth.

        Returns the (count, owner) pair needed by _acquire_restore().
        """
        if __debug__:
            self._note("%s._release_save()", self)
        count = self.__count
        self.__count = 0
        owner = self.__owner
        self.__owner = None
        self.__block.release()
        return (count, owner)

    def _is_owned(self):
        """Return true if the calling thread currently owns the lock."""
        return self.__owner is currentThread()
145 | ||
146 | ||
def Condition(*args, **kwargs):
    """Factory function: build and return a new condition variable
    (_Condition).

    All positional and keyword arguments are forwarded unchanged.
    """
    return _Condition(*args, **kwargs)
149 | ||
class _Condition(_Verbose):
    """Condition variable associated with a lock.

    Each waiting thread parks on its own freshly allocated primitive lock;
    notify() wakes waiters by releasing those locks.
    """

    def __init__(self, lock=None, verbose=None):
        _Verbose.__init__(self, verbose)
        if lock is None:
            lock = RLock()
        self.__lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # When the lock supplies its own _release_save(), _acquire_restore()
        # or _is_owned(), bind it on the instance so that it shadows the
        # default implementation defined further down in this class.
        for hook in ("_release_save", "_acquire_restore", "_is_owned"):
            try:
                setattr(self, hook, getattr(lock, hook))
            except AttributeError:
                pass
        self.__waiters = []

    def __repr__(self):
        return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))

    def _release_save(self):
        # Default: plain release, there is no state worth saving.
        self.__lock.release()

    def _acquire_restore(self, x):
        # Default: plain acquire, the saved state is ignored.
        self.__lock.acquire()

    def _is_owned(self):
        # Return True if lock is owned by currentThread.
        # This method is called only if __lock doesn't have _is_owned().
        # A successful non-blocking acquire proves the lock was free, i.e.
        # not owned; undo the acquire before reporting that.
        if not self.__lock.acquire(0):
            return True
        self.__lock.release()
        return False

    def wait(self, timeout=None):
        """Release the lock, block until notified or until *timeout*
        seconds have passed, then re-acquire the lock.

        Asserts that the lock is held on entry.
        """
        assert self._is_owned(), "wait() of un-acquire()d lock"
        waiter = _allocate_lock()
        waiter.acquire()
        self.__waiters.append(waiter)
        saved_state = self._release_save()
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                # Second acquire blocks until notify() releases the waiter.
                waiter.acquire()
                if __debug__:
                    self._note("%s.wait(): got it", self)
            else:
                # Balancing act: a pure busy loop is too expensive, while
                # sleeping for the whole timeout would make us unresponsive.
                # So poll with a sleep that doubles each round, capped at
                # 0.05 s (20 polls per second) and at the time remaining.
                deadline = _time() + timeout
                pause = 0.0005  # 500 us -> initial delay of 1 ms
                gotit = waiter.acquire(0)
                while not gotit:
                    remaining = deadline - _time()
                    if remaining <= 0:
                        break
                    pause = min(pause * 2, remaining, .05)
                    _sleep(pause)
                    gotit = waiter.acquire(0)
                if gotit:
                    if __debug__:
                        self._note("%s.wait(%s): got it", self, timeout)
                else:
                    if __debug__:
                        self._note("%s.wait(%s): timed out", self, timeout)
                    # Nobody notified us: withdraw from the waiter list
                    # (a racing notify() may already have removed us).
                    try:
                        self.__waiters.remove(waiter)
                    except ValueError:
                        pass
        finally:
            self._acquire_restore(saved_state)

    def notify(self, n=1):
        """Wake up at most *n* waiting threads.

        Asserts that the lock is held on entry.
        """
        assert self._is_owned(), "notify() of un-acquire()d lock"
        pending = self.__waiters
        chosen = pending[:n]
        if not chosen:
            if __debug__:
                self._note("%s.notify(): no waiters", self)
            return
        self._note("%s.notify(): notifying %d waiter%s", self, n,
                   n!=1 and "s" or "")
        for waiter in chosen:
            waiter.release()
            try:
                pending.remove(waiter)
            except ValueError:
                pass

    def notifyAll(self):
        """Wake up every thread currently waiting on this condition."""
        self.notify(len(self.__waiters))
255 | ||
256 | ||
def Semaphore(*args, **kwargs):
    """Factory function: build and return a new counting semaphore
    (_Semaphore).

    All positional and keyword arguments are forwarded unchanged.
    """
    return _Semaphore(*args, **kwargs)
259 | ||
class _Semaphore(_Verbose):
    """Counting semaphore with no upper bound on its value.

    acquire() decrements the counter, blocking while it is zero; release()
    increments it and wakes one waiter.
    """

    # After Tim Peters' semaphore class, but not quite the same (no maximum)

    def __init__(self, value=1, verbose=None):
        assert value >= 0, "Semaphore initial value must be >= 0"
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__value = value

    def acquire(self, blocking=1):
        """Decrement the counter, waiting for it to become non-zero first.

        With a false *blocking* the call never waits.  Returns True when
        the counter was decremented, False when a non-blocking attempt
        found it at zero.
        """
        rc = False
        self.__cond.acquire()
        # try/finally: Condition.wait() re-acquires the lock before letting
        # an exception (e.g. KeyboardInterrupt) propagate, so without the
        # release below the semaphore would stay locked forever.
        try:
            while self.__value == 0:
                if not blocking:
                    break
                if __debug__:
                    self._note("%s.acquire(%s): blocked waiting, value=%s",
                               self, blocking, self.__value)
                self.__cond.wait()
            else:
                # Loop ended without "break": the counter is positive.
                self.__value = self.__value - 1
                if __debug__:
                    self._note("%s.acquire: success, value=%s",
                               self, self.__value)
                rc = True
        finally:
            self.__cond.release()
        return rc

    def release(self):
        """Increment the counter and wake up one waiting thread, if any."""
        self.__cond.acquire()
        try:
            self.__value = self.__value + 1
            if __debug__:
                self._note("%s.release: success, value=%s",
                           self, self.__value)
            self.__cond.notify()
        finally:
            self.__cond.release()
297 | ||
298 | ||
def BoundedSemaphore(*args, **kwargs):
    """Factory function: build and return a new bounded semaphore
    (_BoundedSemaphore).

    All positional and keyword arguments are forwarded unchanged.
    """
    return _BoundedSemaphore(*args, **kwargs)
301 | ||
class _BoundedSemaphore(_Semaphore):
    """Semaphore that checks that # releases is <= # acquires"""

    def __init__(self, value=1, verbose=None):
        _Semaphore.__init__(self, value, verbose)
        self._initial_value = value

    def release(self):
        """Increment the counter, refusing to exceed the initial value.

        Raises ValueError when the counter is already at (or above) the
        value the semaphore was created with.
        """
        # The bound check and the increment must happen under the same lock
        # acquisition; otherwise two threads releasing concurrently could
        # both pass the check and push the counter above the bound.  The
        # condition's lock is a plain Lock, so _Semaphore.release() cannot
        # be called while holding it -- its steps are repeated here instead.
        cond = self._Semaphore__cond
        cond.acquire()
        try:
            if self._Semaphore__value >= self._initial_value:
                raise ValueError("Semaphore released too many times")
            self._Semaphore__value = self._Semaphore__value + 1
            if __debug__:
                self._note("%s.release: success, value=%s",
                           self, self._Semaphore__value)
            cond.notify()
        finally:
            cond.release()
312 | ||
313 | ||
def Event(*args, **kwargs):
    """Factory function: build and return a new event object (_Event).

    All positional and keyword arguments are forwarded unchanged.
    """
    return _Event(*args, **kwargs)
316 | ||
class _Event(_Verbose):
    """A boolean flag that threads can wait on.

    set() raises the flag and wakes all waiters, clear() lowers it, and
    wait() blocks while the flag is down.
    """

    # After Tim Peters' event class (without is_posted())

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__flag = False

    def isSet(self):
        """Return the current state of the flag."""
        return self.__flag

    def set(self):
        """Raise the flag and wake up every thread blocked in wait()."""
        cond = self.__cond
        cond.acquire()
        try:
            self.__flag = True
            cond.notifyAll()
        finally:
            cond.release()

    def clear(self):
        """Lower the flag; later wait() calls block until set() is called."""
        cond = self.__cond
        cond.acquire()
        try:
            self.__flag = False
        finally:
            cond.release()

    def wait(self, timeout=None):
        """Block until the flag is raised or *timeout* seconds have passed.

        Returns immediately (without waiting) if the flag is already up.
        """
        cond = self.__cond
        cond.acquire()
        try:
            if not self.__flag:
                cond.wait(timeout)
        finally:
            cond.release()
351 | ||
# Helper to generate new thread names
_counter = 0    # number of names handed out so far

def _newname(template="Thread-%d"):
    """Return *template* formatted with the next value of a module-wide
    counter (1 on the first call)."""
    global _counter
    _counter += 1
    return template % _counter
358 | ||
359 | # Active thread administration | |
360 | _active_limbo_lock = _allocate_lock() | |
361 | _active = {} | |
362 | _limbo = {} | |
363 | ||
364 | ||
365 | # Main class for threads | |
366 | ||
class Thread(_Verbose):
    """An activity that runs in its own thread of control.

    Either pass a callable as *target* or override run() in a subclass.
    start() launches the thread, join() waits for it to finish.
    """

    __initialized = False
    # Need to store a reference to sys.exc_info for printing
    # out exceptions when a thread tries to use a global var. during interp.
    # shutdown and thus raises an exception about trying to perform some
    # operation on/with a NoneType
    __exc_info = _sys.exc_info

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        """Set up the thread object without starting it.

        group   -- must be None
        target  -- callable invoked by run(), or None
        name    -- thread name; a "Thread-N" name is generated when omitted
        args    -- positional arguments for *target*
        kwargs  -- keyword arguments for *target* (None means no keywords)
        verbose -- debug-trace flag handed to _Verbose
        """
        assert group is None, "group argument must be None for now"
        _Verbose.__init__(self, verbose)
        # A fresh dict per instance: a literal {} default would be one
        # object shared by every Thread created without explicit kwargs.
        if kwargs is None:
            kwargs = {}
        self.__target = target
        self.__name = str(name or _newname())
        self.__args = args
        self.__kwargs = kwargs
        self.__daemonic = self._set_daemon()
        self.__started = False
        self.__stopped = False
        self.__block = Condition(Lock())
        self.__initialized = True
        # sys.stderr is not stored in the class like
        # sys.exc_info since it can be changed between instances
        self.__stderr = _sys.stderr

    def _set_daemon(self):
        # Overridden in _MainThread and _DummyThread
        return currentThread().isDaemon()

    def __repr__(self):
        assert self.__initialized, "Thread.__init__() was not called"
        status = "initial"
        if self.__started:
            status = "started"
        if self.__stopped:
            status = "stopped"
        if self.__daemonic:
            status = status + " daemon"
        return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)

    def start(self):
        """Start the thread; run() is then invoked in the new thread.

        Re-raises whatever the low-level thread creation raises.
        """
        assert self.__initialized, "Thread.__init__() not called"
        assert not self.__started, "thread already started"
        if __debug__:
            self._note("%s.start(): starting thread", self)
        _active_limbo_lock.acquire()
        _limbo[self] = self
        _active_limbo_lock.release()
        try:
            _start_new_thread(self.__bootstrap, ())
        except:
            # The thread never came to life, so nothing will ever remove it
            # from _limbo; do that here, otherwise activeCount() and
            # enumerate() would report it forever.  Always re-raised.
            _active_limbo_lock.acquire()
            try:
                del _limbo[self]
            finally:
                _active_limbo_lock.release()
            raise
        self.__started = True
        _sleep(0.000001)    # 1 usec, to let the thread run (Solaris hack)

    def run(self):
        """Call *target* with *args* and *kwargs*, if a target was given."""
        if self.__target:
            self.__target(*self.__args, **self.__kwargs)

    def __bootstrap(self):
        # Entry point of the new thread: move self from _limbo to _active,
        # install the trace/profile hooks, call run(), report an unhandled
        # exception on stderr, and finally mark the thread as stopped and
        # remove it from _active.
        try:
            self.__started = True
            _active_limbo_lock.acquire()
            _active[_get_ident()] = self
            del _limbo[self]
            _active_limbo_lock.release()
            if __debug__:
                self._note("%s.__bootstrap(): thread started", self)

            if _trace_hook:
                if __debug__:
                    self._note("%s.__bootstrap(): registering trace hook",
                               self)
                _sys.settrace(_trace_hook)
            if _profile_hook:
                if __debug__:
                    self._note("%s.__bootstrap(): registering profile hook",
                               self)
                _sys.setprofile(_profile_hook)

            try:
                self.run()
            except SystemExit:
                if __debug__:
                    self._note("%s.__bootstrap(): raised SystemExit", self)
            except:
                if __debug__:
                    self._note("%s.__bootstrap(): unhandled exception", self)
                # If sys.stderr is no more (most likely from interpreter
                # shutdown) use self.__stderr.  Otherwise still use sys (as in
                # _sys) in case sys.stderr was redefined since the creation of
                # self.
                if _sys:
                    _sys.stderr.write("Exception in thread %s:\n%s\n" %
                                      (self.getName(), _format_exc()))
                else:
                    # Do the best job possible w/o a huge amt. of code to
                    # approximate a traceback (code ideas from
                    # Lib/traceback.py)
                    exc_type, exc_value, exc_tb = self.__exc_info()
                    try:
                        print>>self.__stderr, (
                            "Exception in thread " + self.getName() +
                            " (most likely raised during interpreter shutdown):")
                        print>>self.__stderr, (
                            "Traceback (most recent call last):")
                        while exc_tb:
                            print>>self.__stderr, (
                                '  File "%s", line %s, in %s' %
                                (exc_tb.tb_frame.f_code.co_filename,
                                    exc_tb.tb_lineno,
                                    exc_tb.tb_frame.f_code.co_name))
                            exc_tb = exc_tb.tb_next
                        print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
                    # Make sure that exc_tb gets deleted since it is a memory
                    # hog; deleting everything else is just for thoroughness
                    finally:
                        del exc_type, exc_value, exc_tb
            else:
                if __debug__:
                    self._note("%s.__bootstrap(): normal return", self)
        finally:
            self.__stop()
            try:
                self.__delete()
            except:
                # Best effort: failures while deregistering are ignored.
                pass

    def __stop(self):
        # Mark the thread as stopped and wake up everybody blocked in join().
        self.__block.acquire()
        try:
            self.__stopped = True
            self.__block.notifyAll()
        finally:
            self.__block.release()

    def __delete(self):
        "Remove current thread from the dict of currently running threads."

        # Notes about running with dummy_thread:
        #
        # Must take care to not raise an exception if dummy_thread is being
        # used (and thus this module is being used as an instance of
        # dummy_threading).  dummy_thread.get_ident() always returns -1 since
        # there is only one thread if dummy_thread is being used.  Thus
        # len(_active) is always <= 1 here, and any Thread instance created
        # overwrites the (if any) thread currently registered in _active.
        #
        # An instance of _MainThread is always created by 'threading'.  This
        # gets overwritten the instant an instance of Thread is created; both
        # threads return -1 from dummy_thread.get_ident() and thus have the
        # same key in the dict.  So when the _MainThread instance created by
        # 'threading' tries to clean itself up when atexit calls this method
        # it gets a KeyError if another Thread instance was created.
        #
        # This all means that KeyError from trying to delete something from
        # _active if dummy_threading is being used is a red herring.  But
        # since it isn't if dummy_threading is *not* being used then don't
        # hide the exception.

        _active_limbo_lock.acquire()
        try:
            try:
                del _active[_get_ident()]
            except KeyError:
                if 'dummy_threading' not in _sys.modules:
                    raise
        finally:
            _active_limbo_lock.release()

    def join(self, timeout=None):
        """Wait until the thread stops, or until *timeout* seconds elapse.

        With timeout None the call blocks until the thread has stopped.
        Asserts that the thread was started and is not the calling thread.
        """
        assert self.__initialized, "Thread.__init__() not called"
        assert self.__started, "cannot join thread before it is started"
        assert self is not currentThread(), "cannot join current thread"
        if __debug__:
            if not self.__stopped:
                self._note("%s.join(): waiting until thread stops", self)
        self.__block.acquire()
        # try/finally: wait() re-acquires the lock before propagating an
        # exception (e.g. KeyboardInterrupt); without the release below the
        # thread's own __stop() would then block forever on __block.
        try:
            if timeout is None:
                while not self.__stopped:
                    self.__block.wait()
                if __debug__:
                    self._note("%s.join(): thread stopped", self)
            else:
                deadline = _time() + timeout
                while not self.__stopped:
                    delay = deadline - _time()
                    if delay <= 0:
                        if __debug__:
                            self._note("%s.join(): timed out", self)
                        break
                    self.__block.wait(delay)
                else:
                    if __debug__:
                        self._note("%s.join(): thread stopped", self)
        finally:
            self.__block.release()

    def getName(self):
        """Return the thread's name."""
        assert self.__initialized, "Thread.__init__() not called"
        return self.__name

    def setName(self, name):
        """Set the thread's name to str(name)."""
        assert self.__initialized, "Thread.__init__() not called"
        self.__name = str(name)

    def isAlive(self):
        """Return true if the thread has been started and has not stopped."""
        assert self.__initialized, "Thread.__init__() not called"
        return self.__started and not self.__stopped

    def isDaemon(self):
        """Return the thread's daemon flag."""
        assert self.__initialized, "Thread.__init__() not called"
        return self.__daemonic

    def setDaemon(self, daemonic):
        """Set the thread's daemon flag; only allowed before start()."""
        assert self.__initialized, "Thread.__init__() not called"
        assert not self.__started, "cannot set daemon status of active thread"
        self.__daemonic = daemonic
576 | ||
577 | # The timer class was contributed by Itamar Shtull-Trauring | |
578 | ||
def Timer(*args, **kwargs):
    """Factory function: build and return a new timer thread (_Timer).

    All positional and keyword arguments are forwarded unchanged.
    """
    return _Timer(*args, **kwargs)
581 | ||
class _Timer(Thread):
    """Call a function after a specified number of seconds:

    t = Timer(30.0, f, args=[], kwargs={})
    t.start()
    t.cancel() # stop the timer's action if it's still waiting
    """

    def __init__(self, interval, function, args=None, kwargs=None):
        """Remember what to call and when.

        interval -- delay in seconds before *function* is called
        function -- the callable to invoke
        args     -- positional arguments for it (None means none)
        kwargs   -- keyword arguments for it (None means none)
        """
        Thread.__init__(self)
        # The containers are exposed as self.args / self.kwargs, so literal
        # [] / {} defaults would be shared -- and mutable -- across every
        # timer created without explicit arguments.  Build fresh ones.
        if args is None:
            args = []
        if kwargs is None:
            kwargs = {}
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished.set()

    def run(self):
        """Wait for the interval, then call the function unless cancelled."""
        self.finished.wait(self.interval)
        if not self.finished.isSet():
            self.function(*self.args, **self.kwargs)
        self.finished.set()
607 | ||
608 | # Special thread class to represent the main thread | |
609 | # This is garbage collected through an exit handler | |
610 | ||
class _MainThread(Thread):
    """Thread object standing in for the thread that imports this module.

    It is never start()ed: the constructor marks it as started, registers
    it in _active and installs an atexit handler that takes care of the
    clean-up.
    """

    def __init__(self):
        Thread.__init__(self, name="MainThread")
        self._Thread__started = True
        # Register under the identifier of the calling thread.
        _active_limbo_lock.acquire()
        _active[_get_ident()] = self
        _active_limbo_lock.release()
        import atexit
        atexit.register(self.__exitfunc)

    def _set_daemon(self):
        # The main thread is never a daemon.
        return False

    def __exitfunc(self):
        # Exit handler: mark this thread as stopped, wait for every live
        # non-daemon thread, then drop this thread from _active.
        self._Thread__stop()
        pending = _pickSomeNonDaemonThread()
        if pending:
            if __debug__:
                self._note("%s: waiting for other threads", self)
        while pending:
            pending.join()
            pending = _pickSomeNonDaemonThread()
        if __debug__:
            self._note("%s: exiting", self)
        self._Thread__delete()
637 | ||
def _pickSomeNonDaemonThread():
    """Return some thread that is alive and not a daemon, or None when
    enumerate() lists no such thread."""
    for candidate in enumerate():
        if candidate.isDaemon():
            continue
        if candidate.isAlive():
            return candidate
    return None
643 | ||
644 | ||
645 | # Dummy thread class to represent threads not started here. | |
646 | # These aren't garbage collected when they die, | |
647 | # nor can they be waited for. | |
648 | # Their purpose is to return *something* from currentThread(). | |
649 | # They are marked as daemon threads so we won't wait for them | |
650 | # when we exit (conform previous semantics). | |
651 | ||
class _DummyThread(Thread):
    """Placeholder Thread object for a thread that was not started through
    this module.

    The constructor marks the object as started and registers it in _active
    under the calling thread's identifier.  Dummy threads are daemonic and
    cannot be joined.
    """

    def __init__(self):
        Thread.__init__(self, name=_newname("Dummy-%d"))
        self._Thread__started = True
        # Register under the identifier of the calling thread.
        _active_limbo_lock.acquire()
        _active[_get_ident()] = self
        _active_limbo_lock.release()

    def _set_daemon(self):
        # Always a daemon, so that exit never waits for a dummy thread.
        return True

    def join(self, timeout=None):
        # Joining is not supported for dummy threads.
        assert False, "cannot join a dummy thread"
666 | ||
667 | ||
668 | # Global API functions | |
669 | ||
def currentThread():
    """Return the Thread object registered for the calling thread.

    When the caller's identifier is not in _active, a new _DummyThread is
    created (its constructor registers it) and returned instead.
    """
    ident = _get_ident()
    try:
        return _active[ident]
    except KeyError:
        return _DummyThread()
676 | ||
def activeCount():
    """Return the number of registered threads: those in _active plus those
    still in _limbo, counted under _active_limbo_lock."""
    _active_limbo_lock.acquire()
    try:
        return len(_active) + len(_limbo)
    finally:
        _active_limbo_lock.release()
682 | ||
def enumerate():
    """Return a list of all registered Thread objects: the entries of
    _active followed by those of _limbo, read under _active_limbo_lock."""
    _active_limbo_lock.acquire()
    try:
        return list(_active.values()) + list(_limbo.values())
    finally:
        _active_limbo_lock.release()
688 | ||
689 | # Create the main thread object | |
690 | ||
691 | _MainThread() | |
692 | ||
693 | # get thread-local implementation, either from the thread | |
694 | # module, or from the python fallback | |
695 | ||
696 | try: | |
697 | from thread import _local as local | |
698 | except ImportError: | |
699 | from _threading_local import local | |
700 | ||
701 | ||
702 | # Self-test code | |
703 | ||
def _test():
    """Self-test: three producer threads push five items each through a
    bounded queue of length four while one consumer thread prints them."""

    class BoundedQueue(_Verbose):
        """FIFO with a capacity limit, guarded by one RLock shared by two
        conditions: ``rc`` is notified after a put, ``wc`` after a get."""

        def __init__(self, limit):
            _Verbose.__init__(self)
            self.mon = RLock()
            self.rc = Condition(self.mon)
            self.wc = Condition(self.mon)
            self.limit = limit
            self.queue = deque()

        def put(self, item):
            """Append *item*, waiting on ``wc`` while the queue is full."""
            self.mon.acquire()
            while len(self.queue) >= self.limit:
                self._note("put(%s): queue full", item)
                self.wc.wait()
            self.queue.append(item)
            self._note("put(%s): appended, length now %d",
                       item, len(self.queue))
            self.rc.notify()
            self.mon.release()

        def get(self):
            """Pop the oldest item, waiting on ``rc`` while the queue is
            empty."""
            self.mon.acquire()
            while not self.queue:
                self._note("get(): queue empty")
                self.rc.wait()
            oldest = self.queue.popleft()
            self._note("get(): got %s, %d left", oldest, len(self.queue))
            self.wc.notify()
            self.mon.release()
            return oldest

    class ProducerThread(Thread):
        """Puts ``quota`` items named "<thread name>.<n>" into the queue."""

        def __init__(self, queue, quota):
            Thread.__init__(self, name="Producer")
            self.queue = queue
            self.quota = quota

        def run(self):
            from random import random
            produced = 0
            while produced < self.quota:
                produced += 1
                self.queue.put("%s.%d" % (self.getName(), produced))
                _sleep(random() * 0.00001)


    class ConsumerThread(Thread):
        """Takes ``count`` items from the queue and prints each of them."""

        def __init__(self, queue, count):
            Thread.__init__(self, name="Consumer")
            self.queue = queue
            self.count = count

        def run(self):
            while self.count > 0:
                print(self.queue.get())
                self.count -= 1

    producer_count = 3
    queue_limit = 4
    items_each = 5

    shared = BoundedQueue(queue_limit)
    producers = []
    for index in range(producer_count):
        worker = ProducerThread(shared, items_each)
        worker.setName("Producer-%d" % (index+1))
        producers.append(worker)
    consumer = ConsumerThread(shared, items_each*producer_count)
    for worker in producers:
        worker.start()
        _sleep(0.000001)
    consumer.start()
    for worker in producers:
        worker.join()
    consumer.join()
785 | ||
786 | if __name__ == '__main__': | |
787 | _test() |