Meant to be used by dummy_thread and thread. To allow for different modules
to be used, test_main() can be called with the module to use as the thread
implementation as its sole argument.
import dummy_thread
as _thread
from test
import test_support
DELAY
= 0 # Set > 0 when testing a module other than dummy_thread, such as
class LockTests(unittest
.TestCase
):
self
.lock
= _thread
.allocate_lock()
#Make sure locks start locked
self
.failUnless(not self
.lock
.locked(),
"Lock object is not initialized unlocked.")
# Test self.lock.release()
self
.failUnless(not self
.lock
.locked(),
"Lock object did not release properly.")
def test_improper_release(self
):
#Make sure release of an unlocked thread raises _thread.error
self
.failUnlessRaises(_thread
.error
, self
.lock
.release
)
def test_cond_acquire_success(self
):
#Make sure the conditional acquiring of the lock works.
self
.failUnless(self
.lock
.acquire(0),
"Conditional acquiring of the lock failed.")
def test_cond_acquire_fail(self
):
#Test acquiring locked lock returns False
self
.failUnless(not self
.lock
.acquire(0),
"Conditional acquiring of a locked lock incorrectly "
def test_uncond_acquire_success(self
):
#Make sure unconditional acquiring of a lock works.
self
.failUnless(self
.lock
.locked(),
"Uncondional locking failed.")
def test_uncond_acquire_return_val(self
):
#Make sure that an unconditional locking returns True.
self
.failUnless(self
.lock
.acquire(1) is True,
"Unconditional locking did not return True.")
def test_uncond_acquire_blocking(self
):
#Make sure that unconditional acquiring of a locked lock blocks.
def delay_unlock(to_unlock
, delay
):
"""Hold on to lock for a set amount of time before unlocking."""
start_time
= int(time
.time())
_thread
.start_new_thread(delay_unlock
,(self
.lock
, DELAY
))
print "*** Waiting for thread to release the lock "\
"(approx. %s sec.) ***" % DELAY
end_time
= int(time
.time())
self
.failUnless((end_time
- start_time
) >= DELAY
,
"Blocking by unconditional acquiring failed.")
class MiscTests(unittest
.TestCase
):
"""Miscellaneous tests."""
#Make sure _thread.exit() raises SystemExit
self
.failUnlessRaises(SystemExit, _thread
.exit
)
#Test sanity of _thread.get_ident()
self
.failUnless(isinstance(_thread
.get_ident(), int),
"_thread.get_ident() returned a non-integer")
self
.failUnless(_thread
.get_ident() != 0,
"_thread.get_ident() returned 0")
#Make sure _thread.LockType is the same type as _thread.allocate_locke()
self
.failUnless(isinstance(_thread
.allocate_lock(), _thread
.LockType
),
"_thread.LockType is not an instance of what is "
"returned by _thread.allocate_lock()")
def test_interrupt_main(self
):
#Calling start_new_thread with a function that executes interrupt_main
# should raise KeyboardInterrupt upon completion.
self
.failUnlessRaises(KeyboardInterrupt, _thread
.start_new_thread
,
def test_interrupt_in_main(self
):
# Make sure that if interrupt_main is called in main threat that
# KeyboardInterrupt is raised instantly.
self
.failUnlessRaises(KeyboardInterrupt, _thread
.interrupt_main
)
class ThreadTests(unittest
.TestCase
):
"""Test thread creation."""
def test_arg_passing(self
):
#Make sure that parameter passing works.
def arg_tester(queue
, arg1
=False, arg2
=False):
"""Use to test _thread.start_new_thread() passes args properly."""
testing_queue
= Queue
.Queue(1)
_thread
.start_new_thread(arg_tester
, (testing_queue
, True, True))
result
= testing_queue
.get()
self
.failUnless(result
[0] and result
[1],
"Argument passing for thread creation using tuple failed")
_thread
.start_new_thread(arg_tester
, tuple(), {'queue':testing_queue
,
'arg1':True, 'arg2':True})
result
= testing_queue
.get()
self
.failUnless(result
[0] and result
[1],
"Argument passing for thread creation using kwargs failed")
_thread
.start_new_thread(arg_tester
, (testing_queue
, True), {'arg2':True})
result
= testing_queue
.get()
self
.failUnless(result
[0] and result
[1],
"Argument passing for thread creation using both tuple"
def test_multi_creation(self
):
#Make sure multiple threads can be created.
def queue_mark(queue
, delay
):
"""Wait for ``delay`` seconds and then put something into ``queue``"""
queue
.put(_thread
.get_ident())
testing_queue
= Queue
.Queue(thread_count
)
print "*** Testing multiple thread creation "\
"(will take approx. %s to %s sec.) ***" % (DELAY
, thread_count
)
for count
in xrange(thread_count
):
local_delay
= round(random
.random(), 1)
_thread
.start_new_thread(queue_mark
,
(testing_queue
, local_delay
))
self
.failUnless(testing_queue
.qsize() == thread_count
,
"Not all %s threads executed properly after %s sec." %
def test_main(imported_module
=None):
_thread
= imported_module
print "*** Using %s as _thread module ***" % _thread
test_support
.run_unittest(LockTests
, MiscTests
, ThreadTests
)
if __name__
== '__main__':