Initial commit of OpenSPARC T2 design and verification files.
[OpenSPARC-T2-DV] / tools / src / nas,5.n2.os.2 / lib / python / lib / python2.4 / test / test_socketserver.py
CommitLineData
86530b38
AT
1# Test suite for SocketServer.py
2
3from test import test_support
4from test.test_support import verbose, verify, TESTFN, TestSkipped
5test_support.requires('network')
6
7from SocketServer import *
8import socket
9import select
10import time
11import threading
12import os
13
14NREQ = 3
15DELAY = 0.5
16
17class MyMixinHandler:
18 def handle(self):
19 time.sleep(DELAY)
20 line = self.rfile.readline()
21 time.sleep(DELAY)
22 self.wfile.write(line)
23
24class MyStreamHandler(MyMixinHandler, StreamRequestHandler):
25 pass
26
27class MyDatagramHandler(MyMixinHandler, DatagramRequestHandler):
28 pass
29
30class MyMixinServer:
31 def serve_a_few(self):
32 for i in range(NREQ):
33 self.handle_request()
34 def handle_error(self, request, client_address):
35 self.close_request(request)
36 self.server_close()
37 raise
38
39teststring = "hello world\n"
40
41def receive(sock, n, timeout=20):
42 r, w, x = select.select([sock], [], [], timeout)
43 if sock in r:
44 return sock.recv(n)
45 else:
46 raise RuntimeError, "timed out on %r" % (sock,)
47
48def testdgram(proto, addr):
49 s = socket.socket(proto, socket.SOCK_DGRAM)
50 s.sendto(teststring, addr)
51 buf = data = receive(s, 100)
52 while data and '\n' not in buf:
53 data = receive(s, 100)
54 buf += data
55 verify(buf == teststring)
56 s.close()
57
58def teststream(proto, addr):
59 s = socket.socket(proto, socket.SOCK_STREAM)
60 s.connect(addr)
61 s.sendall(teststring)
62 buf = data = receive(s, 100)
63 while data and '\n' not in buf:
64 data = receive(s, 100)
65 buf += data
66 verify(buf == teststring)
67 s.close()
68
69class ServerThread(threading.Thread):
70 def __init__(self, addr, svrcls, hdlrcls):
71 threading.Thread.__init__(self)
72 self.__addr = addr
73 self.__svrcls = svrcls
74 self.__hdlrcls = hdlrcls
75 def run(self):
76 class svrcls(MyMixinServer, self.__svrcls):
77 pass
78 if verbose: print "thread: creating server"
79 svr = svrcls(self.__addr, self.__hdlrcls)
80 if verbose: print "thread: serving three times"
81 svr.serve_a_few()
82 if verbose: print "thread: done"
83
84seed = 0
85def pickport():
86 global seed
87 seed += 1
88 return 10000 + (os.getpid() % 1000)*10 + seed
89
90host = "localhost"
91testfiles = []
92def pickaddr(proto):
93 if proto == socket.AF_INET:
94 return (host, pickport())
95 else:
96 fn = TESTFN + str(pickport())
97 if os.name == 'os2':
98 # AF_UNIX socket names on OS/2 require a specific prefix
99 # which can't include a drive letter and must also use
100 # backslashes as directory separators
101 if fn[1] == ':':
102 fn = fn[2:]
103 if fn[0] in (os.sep, os.altsep):
104 fn = fn[1:]
105 fn = os.path.join('\socket', fn)
106 if os.sep == '/':
107 fn = fn.replace(os.sep, os.altsep)
108 else:
109 fn = fn.replace(os.altsep, os.sep)
110 testfiles.append(fn)
111 return fn
112
113def cleanup():
114 for fn in testfiles:
115 try:
116 os.remove(fn)
117 except os.error:
118 pass
119 testfiles[:] = []
120
121def testloop(proto, servers, hdlrcls, testfunc):
122 for svrcls in servers:
123 addr = pickaddr(proto)
124 if verbose:
125 print "ADDR =", addr
126 print "CLASS =", svrcls
127 t = ServerThread(addr, svrcls, hdlrcls)
128 if verbose: print "server created"
129 t.start()
130 if verbose: print "server running"
131 for i in range(NREQ):
132 time.sleep(DELAY)
133 if verbose: print "test client", i
134 testfunc(proto, addr)
135 if verbose: print "waiting for server"
136 t.join()
137 if verbose: print "done"
138
139tcpservers = [TCPServer, ThreadingTCPServer]
140if hasattr(os, 'fork') and os.name not in ('os2',):
141 tcpservers.append(ForkingTCPServer)
142udpservers = [UDPServer, ThreadingUDPServer]
143if hasattr(os, 'fork') and os.name not in ('os2',):
144 udpservers.append(ForkingUDPServer)
145
146if not hasattr(socket, 'AF_UNIX'):
147 streamservers = []
148 dgramservers = []
149else:
150 class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer): pass
151 streamservers = [UnixStreamServer, ThreadingUnixStreamServer]
152 if hasattr(os, 'fork') and os.name not in ('os2',):
153 streamservers.append(ForkingUnixStreamServer)
154 class ForkingUnixDatagramServer(ForkingMixIn, UnixDatagramServer): pass
155 dgramservers = [UnixDatagramServer, ThreadingUnixDatagramServer]
156 if hasattr(os, 'fork') and os.name not in ('os2',):
157 dgramservers.append(ForkingUnixDatagramServer)
158
159def testall():
160 testloop(socket.AF_INET, tcpservers, MyStreamHandler, teststream)
161 testloop(socket.AF_INET, udpservers, MyDatagramHandler, testdgram)
162 if hasattr(socket, 'AF_UNIX'):
163 testloop(socket.AF_UNIX, streamservers, MyStreamHandler, teststream)
164 # Alas, on Linux (at least) recvfrom() doesn't return a meaningful
165 # client address so this cannot work:
166 ##testloop(socket.AF_UNIX, dgramservers, MyDatagramHandler, testdgram)
167
168def test_main():
169 import imp
170 if imp.lock_held():
171 # If the import lock is held, the threads will hang.
172 raise TestSkipped("can't run when import lock is held")
173
174 try:
175 testall()
176 finally:
177 cleanup()
178
179if __name__ == "__main__":
180 test_main()