# Very rudimentary test of thread module
# Create a bunch of threads, let each do some work, wait until all are done
from test
.test_support
import verbose
mutex
= thread
.allocate_lock()
rmutex
= thread
.allocate_lock() # for calls to random
done
= thread
.allocate_lock()
delay
= random
.random() * numtasks
print 'task', ident
, 'will run for', round(delay
, 1), 'sec'
print 'task', ident
, 'done'
global next_ident
, running
next_ident
= next_ident
+ 1
print 'creating task', next_ident
thread
.start_new_thread(task
, (next_ident
,))
for i
in range(numtasks
):
print 'waiting for all tasks to complete'
self
.checkin
= thread
.allocate_lock()
self
.checkout
= thread
.allocate_lock()
checkin
, checkout
= self
.checkin
, self
.checkout
self
.waiting
= self
.waiting
+ 1
if self
.waiting
== self
.n
:
self
.waiting
= self
.n
- 1
self
.waiting
= self
.waiting
- 1
for i
in range(numtrips
):
# give it a good chance to enter the next
# barrier before the others are all out
delay
= random
.random() * numtasks
print 'task', ident
, 'will run for', round(delay
, 1), 'sec'
print 'task', ident
, 'entering barrier', i
print 'task', ident
, 'leaving barrier', i
# Must release mutex before releasing done, else the main thread can
# exit and set mutex to None as part of global teardown; then
# mutex.release() raises AttributeError.
print '\n*** Barrier Test ***'
raise ValueError, "'done' should have remained acquired"
for i
in range(numtasks
):
thread
.start_new_thread(task2
, (i
,))