b55dd01f22a0ac8000f23c78fbdececa7f3dcca8
# Some simple Queue module tests, plus some failure conditions
# to ensure the Queue locks remain stable.
from test
.test_support
import verify
, TestFailed
, verbose
# A thread to run a function that unclogs a blocked Queue.
class _TriggerThread(threading
.Thread
):
def __init__(self
, fn
, args
):
self
.startedEvent
= threading
.Event()
threading
.Thread
.__init
__(self
)
# The sleep isn't necessary, but is intended to give the blocking
# function in the main thread a chance at actually blocking before
# we unclog it. But if the sleep is longer than the timeout-based
# tests wait in their blocking functions, those tests will fail.
# So we give them much longer timeout values compared to the
# sleep here (I aimed at 10 seconds for blocking functions --
# they should never actually wait that long - they should make
# progress as soon as we call self.fn()).
# Execute a function that blocks, and in a separate thread, a function that
# triggers the release. Returns the result of the blocking function.
# Caution: block_func must guarantee to block until trigger_func is
# called, and trigger_func must guarantee to change queue state so that
# block_func can make enough progress to return. In particular, a
# block_func that just raises an exception regardless of whether trigger_func
# is called will lead to timing-dependent sporadic failures, and one of
# those went rarely seen but undiagnosed for years. Now block_func
# must be unexceptional. If block_func is supposed to raise an exception,
# call _doExceptionalBlockingTest() instead.
def _doBlockingTest(block_func
, block_args
, trigger_func
, trigger_args
):
t
= _TriggerThread(trigger_func
, trigger_args
)
result
= block_func(*block_args
)
# If block_func returned before our thread made the call, we failed!
if not t
.startedEvent
.isSet():
raise TestFailed("blocking function '%r' appeared not to block" %
t
.join(10) # make sure the thread terminates
raise TestFailed("trigger function '%r' appeared to not return" %
# Call this instead if block_func is supposed to raise an exception.
def _doExceptionalBlockingTest(block_func
, block_args
, trigger_func
,
trigger_args
, expected_exception_class
):
t
= _TriggerThread(trigger_func
, trigger_args
)
except expected_exception_class
:
raise TestFailed("expected exception of kind %r" %
expected_exception_class
)
t
.join(10) # make sure the thread terminates
raise TestFailed("trigger function '%r' appeared to not return" %
if not t
.startedEvent
.isSet():
raise TestFailed("trigger thread ended but event never set")
# A Queue subclass that can provoke failure at a moment's notice :)
class FailingQueueException(Exception):
class FailingQueue(Queue
.Queue
):
def __init__(self
, *args
):
self
.fail_next_put
= False
self
.fail_next_get
= False
Queue
.Queue
.__init
__(self
, *args
)
self
.fail_next_put
= False
raise FailingQueueException
, "You Lose"
return Queue
.Queue
._put
(self
, item
)
self
.fail_next_get
= False
raise FailingQueueException
, "You Lose"
return Queue
.Queue
._get
(self
)
raise RuntimeError, "Call this function with an empty queue"
for i
in range(QUEUE_SIZE
-1):
# Test a failing non-blocking put.
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException
:
q
.put("oops", timeout
=0.1)
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException
:
verify(q
.full(), "Queue should be full")
# Test a failing blocking put
_doBlockingTest(q
.put
, ("full",), q
.get
, ())
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException
:
# Check the Queue isn't damaged.
# put failed, but get succeeded - re-add
# Test a failing timeout put
_doExceptionalBlockingTest(q
.put
, ("full", True, 10), q
.get
, (),
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException
:
# Check the Queue isn't damaged.
# put failed, but get succeeded - re-add
verify(q
.full(), "Queue should be full")
verify(not q
.full(), "Queue should not be full")
verify(q
.full(), "Queue should be full")
_doBlockingTest( q
.put
, ("full",), q
.get
, ())
for i
in range(QUEUE_SIZE
):
verify(q
.empty(), "Queue should be empty")
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException
:
verify(not q
.empty(), "Queue should not be empty")
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException
:
verify(not q
.empty(), "Queue should not be empty")
verify(q
.empty(), "Queue should be empty")
_doExceptionalBlockingTest(q
.get
, (), q
.put
, ('empty',),
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException
:
# put succeeded, but get failed.
verify(not q
.empty(), "Queue should not be empty")
verify(q
.empty(), "Queue should be empty")
raise RuntimeError, "Call this function with an empty queue"
# I guess we better check things actually queue correctly a little :)
verify(q
.get() == 111 and q
.get() == 222,
"Didn't seem to queue the correct data!")
for i
in range(QUEUE_SIZE
-1):
verify(not q
.empty(), "Queue should not be empty")
verify(not q
.full(), "Queue should not be full")
verify(q
.full(), "Queue should be full")
raise TestFailed("Didn't appear to block with a full queue")
q
.put("full", timeout
=0.01)
raise TestFailed("Didn't appear to time-out with a full queue")
_doBlockingTest(q
.put
, ("full",), q
.get
, ())
_doBlockingTest(q
.put
, ("full", True, 10), q
.get
, ())
for i
in range(QUEUE_SIZE
):
verify(q
.empty(), "Queue should be empty")
raise TestFailed("Didn't appear to block with an empty queue")
raise TestFailed("Didn't appear to time-out with an empty queue")
_doBlockingTest(q
.get
, (), q
.put
, ('empty',))
_doBlockingTest(q
.get
, (True, 10), q
.put
, ('empty',))
q
= Queue
.Queue(QUEUE_SIZE
)
# Do it a couple of times on the same queue
print "Simple Queue tests seemed to work"
q
= FailingQueue(QUEUE_SIZE
)
print "Failing Queue tests seemed to work"