Initial commit of OpenSPARC T2 architecture model.
[OpenSPARC-T2-SAM] / sam-t2 / devtools / amd64 / lib / python2.4 / test / test_socket.py
CommitLineData
920dae64
AT
1#!/usr/bin/env python
2
3import unittest
4from test import test_support
5
6import socket
7import select
8import time
9import thread, threading
10import Queue
11import sys
12from weakref import proxy
13
14PORT = 50007
15HOST = 'localhost'
16MSG = 'Michael Gilfix was here\n'
17
18class SocketTCPTest(unittest.TestCase):
19
20 def setUp(self):
21 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
22 self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
23 self.serv.bind((HOST, PORT))
24 self.serv.listen(1)
25
26 def tearDown(self):
27 self.serv.close()
28 self.serv = None
29
30class SocketUDPTest(unittest.TestCase):
31
32 def setUp(self):
33 self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
34 self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
35 self.serv.bind((HOST, PORT))
36
37 def tearDown(self):
38 self.serv.close()
39 self.serv = None
40
41class ThreadableTest:
42 """Threadable Test class
43
44 The ThreadableTest class makes it easy to create a threaded
45 client/server pair from an existing unit test. To create a
46 new threaded class from an existing unit test, use multiple
47 inheritance:
48
49 class NewClass (OldClass, ThreadableTest):
50 pass
51
52 This class defines two new fixture functions with obvious
53 purposes for overriding:
54
55 clientSetUp ()
56 clientTearDown ()
57
58 Any new test functions within the class must then define
59 tests in pairs, where the test name is preceeded with a
60 '_' to indicate the client portion of the test. Ex:
61
62 def testFoo(self):
63 # Server portion
64
65 def _testFoo(self):
66 # Client portion
67
68 Any exceptions raised by the clients during their tests
69 are caught and transferred to the main thread to alert
70 the testing framework.
71
72 Note, the server setup function cannot call any blocking
73 functions that rely on the client thread during setup,
74 unless serverExplicityReady() is called just before
75 the blocking call (such as in setting up a client/server
76 connection and performing the accept() in setUp().
77 """
78
79 def __init__(self):
80 # Swap the true setup function
81 self.__setUp = self.setUp
82 self.__tearDown = self.tearDown
83 self.setUp = self._setUp
84 self.tearDown = self._tearDown
85
86 def serverExplicitReady(self):
87 """This method allows the server to explicitly indicate that
88 it wants the client thread to proceed. This is useful if the
89 server is about to execute a blocking routine that is
90 dependent upon the client thread during its setup routine."""
91 self.server_ready.set()
92
93 def _setUp(self):
94 self.server_ready = threading.Event()
95 self.client_ready = threading.Event()
96 self.done = threading.Event()
97 self.queue = Queue.Queue(1)
98
99 # Do some munging to start the client test.
100 methodname = self.id()
101 i = methodname.rfind('.')
102 methodname = methodname[i+1:]
103 test_method = getattr(self, '_' + methodname)
104 self.client_thread = thread.start_new_thread(
105 self.clientRun, (test_method,))
106
107 self.__setUp()
108 if not self.server_ready.isSet():
109 self.server_ready.set()
110 self.client_ready.wait()
111
112 def _tearDown(self):
113 self.__tearDown()
114 self.done.wait()
115
116 if not self.queue.empty():
117 msg = self.queue.get()
118 self.fail(msg)
119
120 def clientRun(self, test_func):
121 self.server_ready.wait()
122 self.client_ready.set()
123 self.clientSetUp()
124 if not callable(test_func):
125 raise TypeError, "test_func must be a callable function"
126 try:
127 test_func()
128 except Exception, strerror:
129 self.queue.put(strerror)
130 self.clientTearDown()
131
132 def clientSetUp(self):
133 raise NotImplementedError, "clientSetUp must be implemented."
134
135 def clientTearDown(self):
136 self.done.set()
137 thread.exit()
138
139class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
140
141 def __init__(self, methodName='runTest'):
142 SocketTCPTest.__init__(self, methodName=methodName)
143 ThreadableTest.__init__(self)
144
145 def clientSetUp(self):
146 self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
147
148 def clientTearDown(self):
149 self.cli.close()
150 self.cli = None
151 ThreadableTest.clientTearDown(self)
152
153class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
154
155 def __init__(self, methodName='runTest'):
156 SocketUDPTest.__init__(self, methodName=methodName)
157 ThreadableTest.__init__(self)
158
159 def clientSetUp(self):
160 self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
161
162class SocketConnectedTest(ThreadedTCPSocketTest):
163
164 def __init__(self, methodName='runTest'):
165 ThreadedTCPSocketTest.__init__(self, methodName=methodName)
166
167 def setUp(self):
168 ThreadedTCPSocketTest.setUp(self)
169 # Indicate explicitly we're ready for the client thread to
170 # proceed and then perform the blocking call to accept
171 self.serverExplicitReady()
172 conn, addr = self.serv.accept()
173 self.cli_conn = conn
174
175 def tearDown(self):
176 self.cli_conn.close()
177 self.cli_conn = None
178 ThreadedTCPSocketTest.tearDown(self)
179
180 def clientSetUp(self):
181 ThreadedTCPSocketTest.clientSetUp(self)
182 self.cli.connect((HOST, PORT))
183 self.serv_conn = self.cli
184
185 def clientTearDown(self):
186 self.serv_conn.close()
187 self.serv_conn = None
188 ThreadedTCPSocketTest.clientTearDown(self)
189
190class SocketPairTest(unittest.TestCase, ThreadableTest):
191
192 def __init__(self, methodName='runTest'):
193 unittest.TestCase.__init__(self, methodName=methodName)
194 ThreadableTest.__init__(self)
195
196 def setUp(self):
197 self.serv, self.cli = socket.socketpair()
198
199 def tearDown(self):
200 self.serv.close()
201 self.serv = None
202
203 def clientSetUp(self):
204 pass
205
206 def clientTearDown(self):
207 self.cli.close()
208 self.cli = None
209 ThreadableTest.clientTearDown(self)
210
211
212#######################################################################
213## Begin Tests
214
215class GeneralModuleTests(unittest.TestCase):
216
217 def test_weakref(self):
218 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
219 p = proxy(s)
220 self.assertEqual(p.fileno(), s.fileno())
221 s.close()
222 s = None
223 try:
224 p.fileno()
225 except ReferenceError:
226 pass
227 else:
228 self.fail('Socket proxy still exists')
229
230 def testSocketError(self):
231 # Testing socket module exceptions
232 def raise_error(*args, **kwargs):
233 raise socket.error
234 def raise_herror(*args, **kwargs):
235 raise socket.herror
236 def raise_gaierror(*args, **kwargs):
237 raise socket.gaierror
238 self.failUnlessRaises(socket.error, raise_error,
239 "Error raising socket exception.")
240 self.failUnlessRaises(socket.error, raise_herror,
241 "Error raising socket exception.")
242 self.failUnlessRaises(socket.error, raise_gaierror,
243 "Error raising socket exception.")
244
245 def testCrucialConstants(self):
246 # Testing for mission critical constants
247 socket.AF_INET
248 socket.SOCK_STREAM
249 socket.SOCK_DGRAM
250 socket.SOCK_RAW
251 socket.SOCK_RDM
252 socket.SOCK_SEQPACKET
253 socket.SOL_SOCKET
254 socket.SO_REUSEADDR
255
256 def testHostnameRes(self):
257 # Testing hostname resolution mechanisms
258 hostname = socket.gethostname()
259 try:
260 ip = socket.gethostbyname(hostname)
261 except socket.error:
262 # Probably name lookup wasn't set up right; skip this test
263 return
264 self.assert_(ip.find('.') >= 0, "Error resolving host to ip.")
265 try:
266 hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
267 except socket.error:
268 # Probably a similar problem as above; skip this test
269 return
270 all_host_names = [hostname, hname] + aliases
271 fqhn = socket.getfqdn()
272 if not fqhn in all_host_names:
273 self.fail("Error testing host resolution mechanisms.")
274
275 def testRefCountGetNameInfo(self):
276 # Testing reference count for getnameinfo
277 import sys
278 if hasattr(sys, "getrefcount"):
279 try:
280 # On some versions, this loses a reference
281 orig = sys.getrefcount(__name__)
282 socket.getnameinfo(__name__,0)
283 except SystemError:
284 if sys.getrefcount(__name__) <> orig:
285 self.fail("socket.getnameinfo loses a reference")
286
287 def testInterpreterCrash(self):
288 # Making sure getnameinfo doesn't crash the interpreter
289 try:
290 # On some versions, this crashes the interpreter.
291 socket.getnameinfo(('x', 0, 0, 0), 0)
292 except socket.error:
293 pass
294
295 def testNtoH(self):
296 # This just checks that htons etc. are their own inverse,
297 # when looking at the lower 16 or 32 bits.
298 sizes = {socket.htonl: 32, socket.ntohl: 32,
299 socket.htons: 16, socket.ntohs: 16}
300 for func, size in sizes.items():
301 mask = (1L<<size) - 1
302 for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
303 self.assertEqual(i & mask, func(func(i&mask)) & mask)
304
305 swapped = func(mask)
306 self.assertEqual(swapped & mask, mask)
307 self.assertRaises(OverflowError, func, 1L<<34)
308
309 def testGetServBy(self):
310 eq = self.assertEqual
311 # Find one service that exists, then check all the related interfaces.
312 # I've ordered this by protocols that have both a tcp and udp
313 # protocol, at least for modern Linuxes.
314 if sys.platform in ('linux2', 'freebsd4', 'freebsd5', 'freebsd6',
315 'darwin'):
316 # avoid the 'echo' service on this platform, as there is an
317 # assumption breaking non-standard port/protocol entry
318 services = ('daytime', 'qotd', 'domain')
319 else:
320 services = ('echo', 'daytime', 'domain')
321 for service in services:
322 try:
323 port = socket.getservbyname(service, 'tcp')
324 break
325 except socket.error:
326 pass
327 else:
328 raise socket.error
329 # Try same call with optional protocol omitted
330 port2 = socket.getservbyname(service)
331 eq(port, port2)
332 # Try udp, but don't barf it it doesn't exist
333 try:
334 udpport = socket.getservbyname(service, 'udp')
335 except socket.error:
336 udpport = None
337 else:
338 eq(udpport, port)
339 # Now make sure the lookup by port returns the same service name
340 eq(socket.getservbyport(port2), service)
341 eq(socket.getservbyport(port, 'tcp'), service)
342 if udpport is not None:
343 eq(socket.getservbyport(udpport, 'udp'), service)
344
345 def testDefaultTimeout(self):
346 # Testing default timeout
347 # The default timeout should initially be None
348 self.assertEqual(socket.getdefaulttimeout(), None)
349 s = socket.socket()
350 self.assertEqual(s.gettimeout(), None)
351 s.close()
352
353 # Set the default timeout to 10, and see if it propagates
354 socket.setdefaulttimeout(10)
355 self.assertEqual(socket.getdefaulttimeout(), 10)
356 s = socket.socket()
357 self.assertEqual(s.gettimeout(), 10)
358 s.close()
359
360 # Reset the default timeout to None, and see if it propagates
361 socket.setdefaulttimeout(None)
362 self.assertEqual(socket.getdefaulttimeout(), None)
363 s = socket.socket()
364 self.assertEqual(s.gettimeout(), None)
365 s.close()
366
367 # Check that setting it to an invalid value raises ValueError
368 self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
369
370 # Check that setting it to an invalid type raises TypeError
371 self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
372
373 def testIPv4toString(self):
374 if not hasattr(socket, 'inet_pton'):
375 return # No inet_pton() on this platform
376 from socket import inet_aton as f, inet_pton, AF_INET
377 g = lambda a: inet_pton(AF_INET, a)
378
379 self.assertEquals('\x00\x00\x00\x00', f('0.0.0.0'))
380 self.assertEquals('\xff\x00\xff\x00', f('255.0.255.0'))
381 self.assertEquals('\xaa\xaa\xaa\xaa', f('170.170.170.170'))
382 self.assertEquals('\x01\x02\x03\x04', f('1.2.3.4'))
383 self.assertEquals('\xff\xff\xff\xff', f('255.255.255.255'))
384
385 self.assertEquals('\x00\x00\x00\x00', g('0.0.0.0'))
386 self.assertEquals('\xff\x00\xff\x00', g('255.0.255.0'))
387 self.assertEquals('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
388 self.assertEquals('\xff\xff\xff\xff', g('255.255.255.255'))
389
390 def testIPv6toString(self):
391 if not hasattr(socket, 'inet_pton'):
392 return # No inet_pton() on this platform
393 try:
394 from socket import inet_pton, AF_INET6, has_ipv6
395 if not has_ipv6:
396 return
397 except ImportError:
398 return
399 f = lambda a: inet_pton(AF_INET6, a)
400
401 self.assertEquals('\x00' * 16, f('::'))
402 self.assertEquals('\x00' * 16, f('0::0'))
403 self.assertEquals('\x00\x01' + '\x00' * 14, f('1::'))
404 self.assertEquals(
405 '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
406 f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
407 )
408
409 def testStringToIPv4(self):
410 if not hasattr(socket, 'inet_ntop'):
411 return # No inet_ntop() on this platform
412 from socket import inet_ntoa as f, inet_ntop, AF_INET
413 g = lambda a: inet_ntop(AF_INET, a)
414
415 self.assertEquals('1.0.1.0', f('\x01\x00\x01\x00'))
416 self.assertEquals('170.85.170.85', f('\xaa\x55\xaa\x55'))
417 self.assertEquals('255.255.255.255', f('\xff\xff\xff\xff'))
418 self.assertEquals('1.2.3.4', f('\x01\x02\x03\x04'))
419
420 self.assertEquals('1.0.1.0', g('\x01\x00\x01\x00'))
421 self.assertEquals('170.85.170.85', g('\xaa\x55\xaa\x55'))
422 self.assertEquals('255.255.255.255', g('\xff\xff\xff\xff'))
423
424 def testStringToIPv6(self):
425 if not hasattr(socket, 'inet_ntop'):
426 return # No inet_ntop() on this platform
427 try:
428 from socket import inet_ntop, AF_INET6, has_ipv6
429 if not has_ipv6:
430 return
431 except ImportError:
432 return
433 f = lambda a: inet_ntop(AF_INET6, a)
434
435 self.assertEquals('::', f('\x00' * 16))
436 self.assertEquals('::1', f('\x00' * 15 + '\x01'))
437 self.assertEquals(
438 'aef:b01:506:1001:ffff:9997:55:170',
439 f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
440 )
441
442 # XXX The following don't test module-level functionality...
443
444 def testSockName(self):
445 # Testing getsockname()
446 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
447 sock.bind(("0.0.0.0", PORT+1))
448 name = sock.getsockname()
449 self.assertEqual(name, ("0.0.0.0", PORT+1))
450
451 def testGetSockOpt(self):
452 # Testing getsockopt()
453 # We know a socket should start without reuse==0
454 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
455 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
456 self.failIf(reuse != 0, "initial mode is reuse")
457
458 def testSetSockOpt(self):
459 # Testing setsockopt()
460 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
461 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
462 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
463 self.failIf(reuse == 0, "failed to set reuse mode")
464
465 def testSendAfterClose(self):
466 # testing send() after close() with timeout
467 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
468 sock.settimeout(1)
469 sock.close()
470 self.assertRaises(socket.error, sock.send, "spam")
471
472class BasicTCPTest(SocketConnectedTest):
473
474 def __init__(self, methodName='runTest'):
475 SocketConnectedTest.__init__(self, methodName=methodName)
476
477 def testRecv(self):
478 # Testing large receive over TCP
479 msg = self.cli_conn.recv(1024)
480 self.assertEqual(msg, MSG)
481
482 def _testRecv(self):
483 self.serv_conn.send(MSG)
484
485 def testOverFlowRecv(self):
486 # Testing receive in chunks over TCP
487 seg1 = self.cli_conn.recv(len(MSG) - 3)
488 seg2 = self.cli_conn.recv(1024)
489 msg = seg1 + seg2
490 self.assertEqual(msg, MSG)
491
492 def _testOverFlowRecv(self):
493 self.serv_conn.send(MSG)
494
495 def testRecvFrom(self):
496 # Testing large recvfrom() over TCP
497 msg, addr = self.cli_conn.recvfrom(1024)
498 self.assertEqual(msg, MSG)
499
500 def _testRecvFrom(self):
501 self.serv_conn.send(MSG)
502
503 def testOverFlowRecvFrom(self):
504 # Testing recvfrom() in chunks over TCP
505 seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
506 seg2, addr = self.cli_conn.recvfrom(1024)
507 msg = seg1 + seg2
508 self.assertEqual(msg, MSG)
509
510 def _testOverFlowRecvFrom(self):
511 self.serv_conn.send(MSG)
512
513 def testSendAll(self):
514 # Testing sendall() with a 2048 byte string over TCP
515 msg = ''
516 while 1:
517 read = self.cli_conn.recv(1024)
518 if not read:
519 break
520 msg += read
521 self.assertEqual(msg, 'f' * 2048)
522
523 def _testSendAll(self):
524 big_chunk = 'f' * 2048
525 self.serv_conn.sendall(big_chunk)
526
527 def testFromFd(self):
528 # Testing fromfd()
529 if not hasattr(socket, "fromfd"):
530 return # On Windows, this doesn't exist
531 fd = self.cli_conn.fileno()
532 sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
533 msg = sock.recv(1024)
534 self.assertEqual(msg, MSG)
535
536 def _testFromFd(self):
537 self.serv_conn.send(MSG)
538
539 def testShutdown(self):
540 # Testing shutdown()
541 msg = self.cli_conn.recv(1024)
542 self.assertEqual(msg, MSG)
543
544 def _testShutdown(self):
545 self.serv_conn.send(MSG)
546 self.serv_conn.shutdown(2)
547
548class BasicUDPTest(ThreadedUDPSocketTest):
549
550 def __init__(self, methodName='runTest'):
551 ThreadedUDPSocketTest.__init__(self, methodName=methodName)
552
553 def testSendtoAndRecv(self):
554 # Testing sendto() and Recv() over UDP
555 msg = self.serv.recv(len(MSG))
556 self.assertEqual(msg, MSG)
557
558 def _testSendtoAndRecv(self):
559 self.cli.sendto(MSG, 0, (HOST, PORT))
560
561 def testRecvFrom(self):
562 # Testing recvfrom() over UDP
563 msg, addr = self.serv.recvfrom(len(MSG))
564 self.assertEqual(msg, MSG)
565
566 def _testRecvFrom(self):
567 self.cli.sendto(MSG, 0, (HOST, PORT))
568
569class BasicSocketPairTest(SocketPairTest):
570
571 def __init__(self, methodName='runTest'):
572 SocketPairTest.__init__(self, methodName=methodName)
573
574 def testRecv(self):
575 msg = self.serv.recv(1024)
576 self.assertEqual(msg, MSG)
577
578 def _testRecv(self):
579 self.cli.send(MSG)
580
581 def testSend(self):
582 self.serv.send(MSG)
583
584 def _testSend(self):
585 msg = self.cli.recv(1024)
586 self.assertEqual(msg, MSG)
587
588class NonBlockingTCPTests(ThreadedTCPSocketTest):
589
590 def __init__(self, methodName='runTest'):
591 ThreadedTCPSocketTest.__init__(self, methodName=methodName)
592
593 def testSetBlocking(self):
594 # Testing whether set blocking works
595 self.serv.setblocking(0)
596 start = time.time()
597 try:
598 self.serv.accept()
599 except socket.error:
600 pass
601 end = time.time()
602 self.assert_((end - start) < 1.0, "Error setting non-blocking mode.")
603
604 def _testSetBlocking(self):
605 pass
606
607 def testAccept(self):
608 # Testing non-blocking accept
609 self.serv.setblocking(0)
610 try:
611 conn, addr = self.serv.accept()
612 except socket.error:
613 pass
614 else:
615 self.fail("Error trying to do non-blocking accept.")
616 read, write, err = select.select([self.serv], [], [])
617 if self.serv in read:
618 conn, addr = self.serv.accept()
619 else:
620 self.fail("Error trying to do accept after select.")
621
622 def _testAccept(self):
623 time.sleep(0.1)
624 self.cli.connect((HOST, PORT))
625
626 def testConnect(self):
627 # Testing non-blocking connect
628 conn, addr = self.serv.accept()
629
630 def _testConnect(self):
631 self.cli.settimeout(10)
632 self.cli.connect((HOST, PORT))
633
634 def testRecv(self):
635 # Testing non-blocking recv
636 conn, addr = self.serv.accept()
637 conn.setblocking(0)
638 try:
639 msg = conn.recv(len(MSG))
640 except socket.error:
641 pass
642 else:
643 self.fail("Error trying to do non-blocking recv.")
644 read, write, err = select.select([conn], [], [])
645 if conn in read:
646 msg = conn.recv(len(MSG))
647 self.assertEqual(msg, MSG)
648 else:
649 self.fail("Error during select call to non-blocking socket.")
650
651 def _testRecv(self):
652 self.cli.connect((HOST, PORT))
653 time.sleep(0.1)
654 self.cli.send(MSG)
655
656class FileObjectClassTestCase(SocketConnectedTest):
657
658 bufsize = -1 # Use default buffer size
659
660 def __init__(self, methodName='runTest'):
661 SocketConnectedTest.__init__(self, methodName=methodName)
662
663 def setUp(self):
664 SocketConnectedTest.setUp(self)
665 self.serv_file = self.cli_conn.makefile('rb', self.bufsize)
666
667 def tearDown(self):
668 self.serv_file.close()
669 self.assert_(self.serv_file.closed)
670 self.serv_file = None
671 SocketConnectedTest.tearDown(self)
672
673 def clientSetUp(self):
674 SocketConnectedTest.clientSetUp(self)
675 self.cli_file = self.serv_conn.makefile('wb')
676
677 def clientTearDown(self):
678 self.cli_file.close()
679 self.assert_(self.cli_file.closed)
680 self.cli_file = None
681 SocketConnectedTest.clientTearDown(self)
682
683 def testSmallRead(self):
684 # Performing small file read test
685 first_seg = self.serv_file.read(len(MSG)-3)
686 second_seg = self.serv_file.read(3)
687 msg = first_seg + second_seg
688 self.assertEqual(msg, MSG)
689
690 def _testSmallRead(self):
691 self.cli_file.write(MSG)
692 self.cli_file.flush()
693
694 def testFullRead(self):
695 # read until EOF
696 msg = self.serv_file.read()
697 self.assertEqual(msg, MSG)
698
699 def _testFullRead(self):
700 self.cli_file.write(MSG)
701 self.cli_file.close()
702
703 def testUnbufferedRead(self):
704 # Performing unbuffered file read test
705 buf = ''
706 while 1:
707 char = self.serv_file.read(1)
708 if not char:
709 break
710 buf += char
711 self.assertEqual(buf, MSG)
712
713 def _testUnbufferedRead(self):
714 self.cli_file.write(MSG)
715 self.cli_file.flush()
716
717 def testReadline(self):
718 # Performing file readline test
719 line = self.serv_file.readline()
720 self.assertEqual(line, MSG)
721
722 def _testReadline(self):
723 self.cli_file.write(MSG)
724 self.cli_file.flush()
725
726 def testClosedAttr(self):
727 self.assert_(not self.serv_file.closed)
728
729 def _testClosedAttr(self):
730 self.assert_(not self.cli_file.closed)
731
732class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
733
734 """Repeat the tests from FileObjectClassTestCase with bufsize==0.
735
736 In this case (and in this case only), it should be possible to
737 create a file object, read a line from it, create another file
738 object, read another line from it, without loss of data in the
739 first file object's buffer. Note that httplib relies on this
740 when reading multiple requests from the same socket."""
741
742 bufsize = 0 # Use unbuffered mode
743
744 def testUnbufferedReadline(self):
745 # Read a line, create a new file object, read another line with it
746 line = self.serv_file.readline() # first line
747 self.assertEqual(line, "A. " + MSG) # first line
748 self.serv_file = self.cli_conn.makefile('rb', 0)
749 line = self.serv_file.readline() # second line
750 self.assertEqual(line, "B. " + MSG) # second line
751
752 def _testUnbufferedReadline(self):
753 self.cli_file.write("A. " + MSG)
754 self.cli_file.write("B. " + MSG)
755 self.cli_file.flush()
756
757class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
758
759 bufsize = 1 # Default-buffered for reading; line-buffered for writing
760
761
762class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
763
764 bufsize = 2 # Exercise the buffering code
765
766class TCPTimeoutTest(SocketTCPTest):
767
768 def testTCPTimeout(self):
769 def raise_timeout(*args, **kwargs):
770 self.serv.settimeout(1.0)
771 self.serv.accept()
772 self.failUnlessRaises(socket.timeout, raise_timeout,
773 "Error generating a timeout exception (TCP)")
774
775 def testTimeoutZero(self):
776 ok = False
777 try:
778 self.serv.settimeout(0.0)
779 foo = self.serv.accept()
780 except socket.timeout:
781 self.fail("caught timeout instead of error (TCP)")
782 except socket.error:
783 ok = True
784 except:
785 self.fail("caught unexpected exception (TCP)")
786 if not ok:
787 self.fail("accept() returned success when we did not expect it")
788
789class UDPTimeoutTest(SocketTCPTest):
790
791 def testUDPTimeout(self):
792 def raise_timeout(*args, **kwargs):
793 self.serv.settimeout(1.0)
794 self.serv.recv(1024)
795 self.failUnlessRaises(socket.timeout, raise_timeout,
796 "Error generating a timeout exception (UDP)")
797
798 def testTimeoutZero(self):
799 ok = False
800 try:
801 self.serv.settimeout(0.0)
802 foo = self.serv.recv(1024)
803 except socket.timeout:
804 self.fail("caught timeout instead of error (UDP)")
805 except socket.error:
806 ok = True
807 except:
808 self.fail("caught unexpected exception (UDP)")
809 if not ok:
810 self.fail("recv() returned success when we did not expect it")
811
812class TestExceptions(unittest.TestCase):
813
814 def testExceptionTree(self):
815 self.assert_(issubclass(socket.error, Exception))
816 self.assert_(issubclass(socket.herror, socket.error))
817 self.assert_(issubclass(socket.gaierror, socket.error))
818 self.assert_(issubclass(socket.timeout, socket.error))
819
820
821def test_main():
822 tests = [GeneralModuleTests, BasicTCPTest, TCPTimeoutTest, TestExceptions]
823 if sys.platform != 'mac':
824 tests.extend([ BasicUDPTest, UDPTimeoutTest ])
825
826 tests.extend([
827 NonBlockingTCPTests,
828 FileObjectClassTestCase,
829 UnbufferedFileObjectClassTestCase,
830 LineBufferedFileObjectClassTestCase,
831 SmallBufferedFileObjectClassTestCase
832 ])
833 if hasattr(socket, "socketpair"):
834 tests.append(BasicSocketPairTest)
835 test_support.run_unittest(*tests)
836
837if __name__ == "__main__":
838 test_main()