"""Thread module emulating a subset of Java's threading model."""
del _sys
.modules
[__name__
]
from time
import time
as _time
, sleep
as _sleep
from traceback
import format_exc
as _format_exc
from collections
import deque
# Rename some stuff so "from threading import *" is safe
__all__
= ['activeCount', 'Condition', 'currentThread', 'enumerate', 'Event',
'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
'Timer', 'setprofile', 'settrace', 'local']
_start_new_thread
= thread
.start_new_thread
_allocate_lock
= thread
.allocate_lock
_get_ident
= thread
.get_ident
ThreadError
= thread
.error
# Debug support (adapted from ihooks.py).
# All the major classes here derive from _Verbose. We force that to
# be a new-style class so that all the major classes here are new-style.
# This helps debugging (type(instance) is more revealing for instances
# of new-style classes).
def __init__(self
, verbose
=None):
def _note(self
, format
, *args
):
currentThread().getName(), format
)
_sys
.stderr
.write(format
)
# Disable this when using "python -O"
def __init__(self
, verbose
=None):
# Support for profile and trace hooks
# Synchronization classes
def RLock(*args, **kwargs):
    """Factory: build and return a new _RLock.

    All positional and keyword arguments are forwarded unchanged to the
    _RLock constructor.
    """
    lock = _RLock(*args, **kwargs)
    return lock
def __init__(self
, verbose
=None):
_Verbose
.__init
__(self
, verbose
)
self
.__block
= _allocate_lock()
return "<%s(%s, %d)>" % (
self
.__owner
and self
.__owner
.getName(),
def acquire(self
, blocking
=1):
self
.__count
= self
.__count
+ 1
self
._note
("%s.acquire(%s): recursive success", self
, blocking
)
rc
= self
.__block
.acquire(blocking
)
self
._note
("%s.acquire(%s): initial success", self
, blocking
)
self
._note
("%s.acquire(%s): failure", self
, blocking
)
assert self
.__owner
is me
, "release() of un-acquire()d lock"
self
.__count
= count
= self
.__count
- 1
self
._note
("%s.release(): final release", self
)
self
._note
("%s.release(): non-final release", self
)
# Internal methods used by condition variables
def _acquire_restore(self
, (count
, owner
)):
self
._note
("%s._acquire_restore()", self
)
self
._note
("%s._release_save()", self
)
return self
.__owner
is currentThread()
def Condition(*args, **kwargs):
    """Factory: build and return a new _Condition.

    All positional and keyword arguments are forwarded unchanged to the
    _Condition constructor.
    """
    cond = _Condition(*args, **kwargs)
    return cond
class _Condition(_Verbose
):
def __init__(self
, lock
=None, verbose
=None):
_Verbose
.__init
__(self
, verbose
)
# Export the lock's acquire() and release() methods
self
.acquire
= lock
.acquire
self
.release
= lock
.release
# If the lock defines _release_save() and/or _acquire_restore(),
# these override the default implementations (which just call
# release() and acquire() on the lock). Ditto for _is_owned().
self
._release
_save
= lock
._release
_save
self
._acquire
_restore
= lock
._acquire
_restore
self
._is
_owned
= lock
._is
_owned
return "<Condition(%s, %d)>" % (self
.__lock
, len(self
.__waiters
))
self
.__lock
.release() # No state to save
def _acquire_restore(self
, x
):
self
.__lock
.acquire() # Ignore saved state
# Return True if lock is owned by currentThread.
# This method is called only if __lock doesn't have _is_owned().
if self
.__lock
.acquire(0):
def wait(self
, timeout
=None):
assert self
._is
_owned
(), "wait() of un-acquire()d lock"
waiter
= _allocate_lock()
self
.__waiters
.append(waiter
)
saved_state
= self
._release
_save
()
try: # restore state no matter what (e.g., KeyboardInterrupt)
self
._note
("%s.wait(): got it", self
)
# Balancing act: We can't afford a pure busy loop, so we
# have to sleep; but if we sleep the whole timeout time,
# we'll be unresponsive. The scheme here sleeps very
# little at first, longer as time goes on, but never longer
# than 0.05 seconds per sleep, i.e. at least 20 polls per second
# (or the timeout time remaining, if that is shorter).
endtime
= _time() + timeout
delay
= 0.0005 # 500 us -> initial delay of 1 ms
gotit
= waiter
.acquire(0)
remaining
= endtime
- _time()
delay
= min(delay
* 2, remaining
, .05)
self
._note
("%s.wait(%s): timed out", self
, timeout
)
self
.__waiters
.remove(waiter
)
self
._note
("%s.wait(%s): got it", self
, timeout
)
self
._acquire
_restore
(saved_state
)
assert self
._is
_owned
(), "notify() of un-acquire()d lock"
__waiters
= self
.__waiters
self
._note
("%s.notify(): no waiters", self
)
self
._note
("%s.notify(): notifying %d waiter%s", self
, n
,
self
.notify(len(self
.__waiters
))
def Semaphore(*args, **kwargs):
    """Factory: build and return a new _Semaphore.

    All positional and keyword arguments are forwarded unchanged to the
    _Semaphore constructor.
    """
    sem = _Semaphore(*args, **kwargs)
    return sem
class _Semaphore(_Verbose
):
# After Tim Peters' semaphore class, but not quite the same (no maximum)
def __init__(self
, value
=1, verbose
=None):
assert value
>= 0, "Semaphore initial value must be >= 0"
_Verbose
.__init
__(self
, verbose
)
self
.__cond
= Condition(Lock())
def acquire(self
, blocking
=1):
self
._note
("%s.acquire(%s): blocked waiting, value=%s",
self
, blocking
, self
.__value
)
self
.__value
= self
.__value
- 1
self
._note
("%s.acquire: success, value=%s",
self
.__value
= self
.__value
+ 1
self
._note
("%s.release: success, value=%s",
def BoundedSemaphore(*args, **kwargs):
    """Factory: build and return a new _BoundedSemaphore.

    All positional and keyword arguments are forwarded unchanged to the
    _BoundedSemaphore constructor.
    """
    sem = _BoundedSemaphore(*args, **kwargs)
    return sem
class _BoundedSemaphore(_Semaphore
):
"""Semaphore that checks that # releases is <= # acquires"""
def __init__(self
, value
=1, verbose
=None):
_Semaphore
.__init
__(self
, value
, verbose
)
self
._initial
_value
= value
if self
._Semaphore
__value
>= self
._initial
_value
:
raise ValueError, "Semaphore released too many times"
return _Semaphore
.release(self
)
def Event(*args, **kwargs):
    """Factory: build and return a new _Event.

    All positional and keyword arguments are forwarded unchanged to the
    _Event constructor.
    """
    event = _Event(*args, **kwargs)
    return event
# After Tim Peters' event class (without is_posted())
def __init__(self
, verbose
=None):
_Verbose
.__init
__(self
, verbose
)
self
.__cond
= Condition(Lock())
def wait(self
, timeout
=None):
self
.__cond
.wait(timeout
)
# Helper to generate new thread names
def _newname(template
="Thread-%d"):
return template
% _counter
# Active thread administration
_active_limbo_lock
= _allocate_lock()
# Need to store a reference to sys.exc_info for printing
# out exceptions when a thread tries to use a global var. during interp.
# shutdown and thus raises an exception about trying to perform some
# operation on/with a NoneType
__exc_info
= _sys
.exc_info
def __init__(self
, group
=None, target
=None, name
=None,
args
=(), kwargs
={}, verbose
=None):
assert group
is None, "group argument must be None for now"
_Verbose
.__init
__(self
, verbose
)
self
.__name
= str(name
or _newname())
self
.__daemonic
= self
._set
_daemon
()
self
.__block
= Condition(Lock())
self
.__initialized
= True
# sys.stderr is not stored in the class like
# sys.exc_info since it can be changed between instances
self
.__stderr
= _sys
.stderr
# Overridden in _MainThread and _DummyThread
return currentThread().isDaemon()
assert self
.__initialized
, "Thread.__init__() was not called"
status
= status
+ " daemon"
return "<%s(%s, %s)>" % (self
.__class
__.__name
__, self
.__name
, status
)
assert self
.__initialized
, "Thread.__init__() not called"
assert not self
.__started
, "thread already started"
self
._note
("%s.start(): starting thread", self
)
_active_limbo_lock
.acquire()
_active_limbo_lock
.release()
_start_new_thread(self
.__bootstrap
, ())
_sleep(0.000001) # 1 usec, to let the thread run (Solaris hack)
self
.__target
(*self
.__args
, **self
.__kwargs
)
_active_limbo_lock
.acquire()
_active
[_get_ident()] = self
_active_limbo_lock
.release()
self
._note
("%s.__bootstrap(): thread started", self
)
self
._note
("%s.__bootstrap(): registering trace hook", self
)
_sys
.settrace(_trace_hook
)
self
._note
("%s.__bootstrap(): registering profile hook", self
)
_sys
.setprofile(_profile_hook
)
self
._note
("%s.__bootstrap(): raised SystemExit", self
)
self
._note
("%s.__bootstrap(): unhandled exception", self
)
# If sys.stderr is no more (most likely from interpreter
# shutdown) use self.__stderr. Otherwise still use sys (as in
# _sys) in case sys.stderr was redefined since the creation of
# self.
_sys
.stderr
.write("Exception in thread %s:\n%s\n" %
(self
.getName(), _format_exc()))
# Do the best job possible w/o a huge amt. of code to
# approximate a traceback (code ideas from Lib/traceback.py)
exc_type
, exc_value
, exc_tb
= self
.__exc
_info
()
"Exception in thread " + self
.getName() +
" (most likely raised during interpreter shutdown):")
"Traceback (most recent call last):")
' File "%s", line %s, in %s' %
(exc_tb
.tb_frame
.f_code
.co_filename
,
exc_tb
.tb_frame
.f_code
.co_name
))
print>>self
.__stderr
, ("%s: %s" % (exc_type
, exc_value
))
# Make sure that exc_tb gets deleted since it is a memory
# hog; deleting everything else is just for thoroughness
del exc_type
, exc_value
, exc_tb
self
._note
("%s.__bootstrap(): normal return", self
)
"Remove current thread from the dict of currently running threads."
# Notes about running with dummy_thread:
# Must take care to not raise an exception if dummy_thread is being
# used (and thus this module is being used as an instance of
# dummy_threading). dummy_thread.get_ident() always returns -1 since
# there is only one thread if dummy_thread is being used. Thus
# len(_active) is always <= 1 here, and any Thread instance created
# overwrites the (if any) thread currently registered in _active.
# An instance of _MainThread is always created by 'threading'. This
# gets overwritten the instant an instance of Thread is created; both
# threads return -1 from dummy_thread.get_ident() and thus have the
# same key in the dict. So when the _MainThread instance created by
# 'threading' tries to clean itself up when atexit calls this method
# it gets a KeyError if another Thread instance was created.
# This all means that KeyError from trying to delete something from
# _active if dummy_threading is being used is a red herring. But
# since it isn't if dummy_threading is *not* being used then don't
# hide the exception.
_active_limbo_lock
.acquire()
del _active
[_get_ident()]
if 'dummy_threading' not in _sys
.modules
:
_active_limbo_lock
.release()
def join(self
, timeout
=None):
assert self
.__initialized
, "Thread.__init__() not called"
assert self
.__started
, "cannot join thread before it is started"
assert self
is not currentThread(), "cannot join current thread"
self
._note
("%s.join(): waiting until thread stops", self
)
while not self
.__stopped
:
self
._note
("%s.join(): thread stopped", self
)
deadline
= _time() + timeout
while not self
.__stopped
:
delay
= deadline
- _time()
self
._note
("%s.join(): timed out", self
)
self
._note
("%s.join(): thread stopped", self
)
assert self
.__initialized
, "Thread.__init__() not called"
assert self
.__initialized
, "Thread.__init__() not called"
assert self
.__initialized
, "Thread.__init__() not called"
return self
.__started
and not self
.__stopped
assert self
.__initialized
, "Thread.__init__() not called"
def setDaemon(self
, daemonic
):
assert self
.__initialized
, "Thread.__init__() not called"
assert not self
.__started
, "cannot set daemon status of active thread"
self
.__daemonic
= daemonic
# The timer class was contributed by Itamar Shtull-Trauring
def Timer(*args, **kwargs):
    """Factory: build and return a new _Timer.

    All positional and keyword arguments are forwarded unchanged to the
    _Timer constructor.
    """
    timer = _Timer(*args, **kwargs)
    return timer
"""Call a function after a specified number of seconds:
t = Timer(30.0, f, args=[], kwargs={})
t.cancel() # stop the timer's action if it's still waiting
def __init__(self
, interval
, function
, args
=[], kwargs
={}):
"""Stop the timer if it hasn't finished yet"""
self
.finished
.wait(self
.interval
)
if not self
.finished
.isSet():
self
.function(*self
.args
, **self
.kwargs
)
# Special thread class to represent the main thread
# This is garbage collected through an exit handler
class _MainThread(Thread
):
Thread
.__init
__(self
, name
="MainThread")
self
._Thread
__started
= True
_active_limbo_lock
.acquire()
_active
[_get_ident()] = self
_active_limbo_lock
.release()
atexit
.register(self
.__exitfunc
)
t
= _pickSomeNonDaemonThread()
self
._note
("%s: waiting for other threads", self
)
t
= _pickSomeNonDaemonThread()
self
._note
("%s: exiting", self
)
def _pickSomeNonDaemonThread():
if not t
.isDaemon() and t
.isAlive():
# Dummy thread class to represent threads not started here.
# These aren't garbage collected when they die,
# nor can they be waited for.
# Their purpose is to return *something* from currentThread().
# They are marked as daemon threads so we won't wait for them
# when we exit (conform previous semantics).
class _DummyThread(Thread
):
Thread
.__init
__(self
, name
=_newname("Dummy-%d"))
self
._Thread
__started
= True
_active_limbo_lock
.acquire()
_active
[_get_ident()] = self
_active_limbo_lock
.release()
def join(self, timeout=None):
    """Refuse to join: a dummy thread cannot be waited for.

    `timeout` is accepted for signature compatibility and is ignored.

    Raises:
        AssertionError: always, with the message
            "cannot join a dummy thread".
    """
    # Raise explicitly instead of `assert False, ...`: assert statements
    # are removed under "python -O", which would make this method
    # silently return instead of reporting the misuse.
    raise AssertionError("cannot join a dummy thread")
return _active
[_get_ident()]
##print "currentThread(): no current thread for", _get_ident()
_active_limbo_lock
.acquire()
count
= len(_active
) + len(_limbo
)
_active_limbo_lock
.release()
_active_limbo_lock
.acquire()
active
= _active
.values() + _limbo
.values()
_active_limbo_lock
.release()
# Create the main thread object
# get thread-local implementation, either from the thread
# module, or from the python fallback
from thread
import _local
as local
from _threading_local
import local
class BoundedQueue(_Verbose
):
def __init__(self
, limit
):
self
.rc
= Condition(self
.mon
)
self
.wc
= Condition(self
.mon
)
while len(self
.queue
) >= self
.limit
:
self
._note
("put(%s): queue full", item
)
self
._note
("put(%s): appended, length now %d",
self
._note
("get(): queue empty")
item
= self
.queue
.popleft()
self
._note
("get(): got %s, %d left", item
, len(self
.queue
))
class ProducerThread(Thread
):
def __init__(self
, queue
, quota
):
Thread
.__init
__(self
, name
="Producer")
from random
import random
while counter
< self
.quota
:
self
.queue
.put("%s.%d" % (self
.getName(), counter
))
_sleep(random() * 0.00001)
class ConsumerThread(Thread
):
def __init__(self
, queue
, count
):
Thread
.__init
__(self
, name
="Consumer")
self
.count
= self
.count
- 1
t
= ProducerThread(Q
, NI
)
t
.setName("Producer-%d" % (i
+1))
C
= ConsumerThread(Q
, NI
*NP
)
if __name__
== '__main__':