Commit | Line | Data |
---|---|---|
86530b38 AT |
1 | # Test case for the select.poll() function |
2 | ||
3 | import sys, os, select, random | |
4 | from test.test_support import verify, verbose, TestSkipped, TESTFN | |
5 | ||
# Skip the whole test on platforms whose select module does not provide poll().
if not hasattr(select, 'poll'):
    raise TestSkipped("select.poll not defined -- skipping test_poll")
10 | ||
11 | ||
def find_ready_matching(ready, flag):
    """Return the descriptors in `ready` whose event mask overlaps `flag`.

    `ready` is a sequence of (fd, mode) pairs; an fd is selected when
    `mode & flag` is non-zero.  The order of `ready` is preserved.
    """
    return [fd for fd, mode in ready if mode & flag]
18 | ||
19 | def test_poll1(): | |
20 | """Basic functional test of poll object | |
21 | ||
22 | Create a bunch of pipe and test that poll works with them. | |
23 | """ | |
24 | print 'Running poll test 1' | |
25 | p = select.poll() | |
26 | ||
27 | NUM_PIPES = 12 | |
28 | MSG = " This is a test." | |
29 | MSG_LEN = len(MSG) | |
30 | readers = [] | |
31 | writers = [] | |
32 | r2w = {} | |
33 | w2r = {} | |
34 | ||
35 | for i in range(NUM_PIPES): | |
36 | rd, wr = os.pipe() | |
37 | p.register(rd, select.POLLIN) | |
38 | p.register(wr, select.POLLOUT) | |
39 | readers.append(rd) | |
40 | writers.append(wr) | |
41 | r2w[rd] = wr | |
42 | w2r[wr] = rd | |
43 | ||
44 | while writers: | |
45 | ready = p.poll() | |
46 | ready_writers = find_ready_matching(ready, select.POLLOUT) | |
47 | if not ready_writers: | |
48 | raise RuntimeError, "no pipes ready for writing" | |
49 | wr = random.choice(ready_writers) | |
50 | os.write(wr, MSG) | |
51 | ||
52 | ready = p.poll() | |
53 | ready_readers = find_ready_matching(ready, select.POLLIN) | |
54 | if not ready_readers: | |
55 | raise RuntimeError, "no pipes ready for reading" | |
56 | rd = random.choice(ready_readers) | |
57 | buf = os.read(rd, MSG_LEN) | |
58 | verify(len(buf) == MSG_LEN) | |
59 | print buf | |
60 | os.close(r2w[rd]) ; os.close( rd ) | |
61 | p.unregister( r2w[rd] ) | |
62 | p.unregister( rd ) | |
63 | writers.remove(r2w[rd]) | |
64 | ||
65 | poll_unit_tests() | |
66 | print 'Poll test 1 complete' | |
67 | ||
68 | def poll_unit_tests(): | |
69 | # returns NVAL for invalid file descriptor | |
70 | FD = 42 | |
71 | try: | |
72 | os.close(FD) | |
73 | except OSError: | |
74 | pass | |
75 | p = select.poll() | |
76 | p.register(FD) | |
77 | r = p.poll() | |
78 | verify(r[0] == (FD, select.POLLNVAL)) | |
79 | ||
80 | f = open(TESTFN, 'w') | |
81 | fd = f.fileno() | |
82 | p = select.poll() | |
83 | p.register(f) | |
84 | r = p.poll() | |
85 | verify(r[0][0] == fd) | |
86 | f.close() | |
87 | r = p.poll() | |
88 | verify(r[0] == (fd, select.POLLNVAL)) | |
89 | os.unlink(TESTFN) | |
90 | ||
91 | # type error for invalid arguments | |
92 | p = select.poll() | |
93 | try: | |
94 | p.register(p) | |
95 | except TypeError: | |
96 | pass | |
97 | else: | |
98 | print "Bogus register call did not raise TypeError" | |
99 | try: | |
100 | p.unregister(p) | |
101 | except TypeError: | |
102 | pass | |
103 | else: | |
104 | print "Bogus unregister call did not raise TypeError" | |
105 | ||
106 | # can't unregister non-existent object | |
107 | p = select.poll() | |
108 | try: | |
109 | p.unregister(3) | |
110 | except KeyError: | |
111 | pass | |
112 | else: | |
113 | print "Bogus unregister call did not raise KeyError" | |
114 | ||
115 | # Test error cases | |
116 | pollster = select.poll() | |
117 | class Nope: | |
118 | pass | |
119 | ||
120 | class Almost: | |
121 | def fileno(self): | |
122 | return 'fileno' | |
123 | ||
124 | try: | |
125 | pollster.register( Nope(), 0 ) | |
126 | except TypeError: pass | |
127 | else: print 'expected TypeError exception, not raised' | |
128 | ||
129 | try: | |
130 | pollster.register( Almost(), 0 ) | |
131 | except TypeError: pass | |
132 | else: print 'expected TypeError exception, not raised' | |
133 | ||
134 | ||
135 | # Another test case for poll(). This is copied from the test case for | |
136 | # select(), modified to use poll() instead. | |
137 | ||
138 | def test_poll2(): | |
139 | print 'Running poll test 2' | |
140 | cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done' | |
141 | p = os.popen(cmd, 'r') | |
142 | pollster = select.poll() | |
143 | pollster.register( p, select.POLLIN ) | |
144 | for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10: | |
145 | if verbose: | |
146 | print 'timeout =', tout | |
147 | fdlist = pollster.poll(tout) | |
148 | if (fdlist == []): | |
149 | continue | |
150 | fd, flags = fdlist[0] | |
151 | if flags & select.POLLHUP: | |
152 | line = p.readline() | |
153 | if line != "": | |
154 | print 'error: pipe seems to be closed, but still returns data' | |
155 | continue | |
156 | ||
157 | elif flags & select.POLLIN: | |
158 | line = p.readline() | |
159 | if verbose: | |
160 | print repr(line) | |
161 | if not line: | |
162 | if verbose: | |
163 | print 'EOF' | |
164 | break | |
165 | continue | |
166 | else: | |
167 | print 'Unexpected return value from select.poll:', fdlist | |
168 | p.close() | |
169 | print 'Poll test 2 complete' | |
170 | ||
# Run both tests.  These are module-level calls, so they execute whenever
# this module is run or imported.
test_poll1()
test_poll2()