# Test case for the os.poll() function
import sys
, os
, select
, random
from test
.test_support
import verify
, verbose
, TestSkipped
, TESTFN
raise TestSkipped
, "select.poll not defined -- skipping test_poll"
def find_ready_matching(ready
, flag
):
"""Basic functional test of poll object
Create a bunch of pipe and test that poll works with them.
print 'Running poll test 1'
for i
in range(NUM_PIPES
):
p
.register(rd
, select
.POLLIN
)
p
.register(wr
, select
.POLLOUT
)
ready_writers
= find_ready_matching(ready
, select
.POLLOUT
)
raise RuntimeError, "no pipes ready for writing"
wr
= random
.choice(ready_writers
)
ready_readers
= find_ready_matching(ready
, select
.POLLIN
)
raise RuntimeError, "no pipes ready for reading"
rd
= random
.choice(ready_readers
)
buf
= os
.read(rd
, MSG_LEN
)
verify(len(buf
) == MSG_LEN
)
os
.close(r2w
[rd
]) ; os
.close( rd
)
print 'Poll test 1 complete'
# returns NVAL for invalid file descriptor
verify(r
[0] == (FD
, select
.POLLNVAL
))
verify(r
[0] == (fd
, select
.POLLNVAL
))
# type error for invalid arguments
print "Bogus register call did not raise TypeError"
print "Bogus unregister call did not raise TypeError"
# can't unregister non-existent object
print "Bogus unregister call did not raise KeyError"
pollster
.register( Nope(), 0 )
else: print 'expected TypeError exception, not raised'
pollster
.register( Almost(), 0 )
else: print 'expected TypeError exception, not raised'
# Another test case for poll(). This is copied from the test case for
# select(), modified to use poll() instead.
print 'Running poll test 2'
cmd
= 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
pollster
.register( p
, select
.POLLIN
)
for tout
in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
fdlist
= pollster
.poll(tout
)
if flags
& select
.POLLHUP
:
print 'error: pipe seems to be closed, but still returns data'
elif flags
& select
.POLLIN
:
print 'Unexpected return value from select.poll:', fdlist
print 'Poll test 2 complete'