Commit | Line | Data |
---|---|---|
920dae64 AT |
1 | import unittest |
2 | from test import test_support | |
3 | ||
4 | import os, socket | |
5 | import StringIO | |
6 | ||
7 | import urllib2 | |
8 | from urllib2 import Request, OpenerDirector | |
9 | ||
10 | # XXX | |
11 | # Request | |
12 | # CacheFTPHandler (hard to write) | |
13 | # parse_keqv_list, parse_http_list (I'm leaving this for Anthony Baxter | |
14 | # and Greg Stein, since they're doing Digest Authentication) | |
15 | # Authentication stuff (ditto) | |
16 | # ProxyHandler, CustomProxy, CustomProxyHandler (I don't use a proxy) | |
17 | # GopherHandler (haven't used gopher for a decade or so...) | |
18 | ||
19 | class TrivialTests(unittest.TestCase): | |
20 | def test_trivial(self): | |
21 | # A couple trivial tests | |
22 | ||
23 | self.assertRaises(ValueError, urllib2.urlopen, 'bogus url') | |
24 | ||
25 | # XXX Name hacking to get this to work on Windows. | |
26 | fname = os.path.abspath(urllib2.__file__).replace('\\', '/') | |
27 | if fname[1:2] == ":": | |
28 | fname = fname[2:] | |
29 | # And more hacking to get it to work on MacOS. This assumes | |
30 | # urllib.pathname2url works, unfortunately... | |
31 | if os.name == 'mac': | |
32 | fname = '/' + fname.replace(':', '/') | |
33 | elif os.name == 'riscos': | |
34 | import string | |
35 | fname = os.expand(fname) | |
36 | fname = fname.translate(string.maketrans("/.", "./")) | |
37 | ||
38 | file_url = "file://%s" % fname | |
39 | f = urllib2.urlopen(file_url) | |
40 | ||
41 | buf = f.read() | |
42 | f.close() | |
43 | ||
44 | def test_parse_http_list(self): | |
45 | tests = [('a,b,c', ['a', 'b', 'c']), | |
46 | ('path"o,l"og"i"cal, example', ['path"o,l"og"i"cal', 'example']), | |
47 | ('a, b, "c", "d", "e,f", g, h', ['a', 'b', '"c"', '"d"', '"e,f"', 'g', 'h']), | |
48 | ('a="b\\"c", d="e\\,f", g="h\\\\i"', ['a="b"c"', 'd="e,f"', 'g="h\\i"'])] | |
49 | for string, list in tests: | |
50 | self.assertEquals(urllib2.parse_http_list(string), list) | |
51 | ||
52 | ||
53 | class MockOpener: | |
54 | addheaders = [] | |
55 | def open(self, req, data=None): | |
56 | self.req, self.data = req, data | |
57 | def error(self, proto, *args): | |
58 | self.proto, self.args = proto, args | |
59 | ||
60 | class MockFile: | |
61 | def read(self, count=None): pass | |
62 | def readline(self, count=None): pass | |
63 | def close(self): pass | |
64 | ||
65 | class MockHeaders(dict): | |
66 | def getheaders(self, name): | |
67 | return self.values() | |
68 | ||
69 | class MockResponse(StringIO.StringIO): | |
70 | def __init__(self, code, msg, headers, data, url=None): | |
71 | StringIO.StringIO.__init__(self, data) | |
72 | self.code, self.msg, self.headers, self.url = code, msg, headers, url | |
73 | def info(self): | |
74 | return self.headers | |
75 | def geturl(self): | |
76 | return self.url | |
77 | ||
78 | class MockCookieJar: | |
79 | def add_cookie_header(self, request): | |
80 | self.ach_req = request | |
81 | def extract_cookies(self, response, request): | |
82 | self.ec_req, self.ec_r = request, response | |
83 | ||
84 | class FakeMethod: | |
85 | def __init__(self, meth_name, action, handle): | |
86 | self.meth_name = meth_name | |
87 | self.handle = handle | |
88 | self.action = action | |
89 | def __call__(self, *args): | |
90 | return self.handle(self.meth_name, self.action, *args) | |
91 | ||
92 | class MockHandler: | |
93 | def __init__(self, methods): | |
94 | self._define_methods(methods) | |
95 | def _define_methods(self, methods): | |
96 | for spec in methods: | |
97 | if len(spec) == 2: name, action = spec | |
98 | else: name, action = spec, None | |
99 | meth = FakeMethod(name, action, self.handle) | |
100 | setattr(self.__class__, name, meth) | |
101 | def handle(self, fn_name, action, *args, **kwds): | |
102 | self.parent.calls.append((self, fn_name, args, kwds)) | |
103 | if action is None: | |
104 | return None | |
105 | elif action == "return self": | |
106 | return self | |
107 | elif action == "return response": | |
108 | res = MockResponse(200, "OK", {}, "") | |
109 | return res | |
110 | elif action == "return request": | |
111 | return Request("http://blah/") | |
112 | elif action.startswith("error"): | |
113 | code = action[action.rfind(" ")+1:] | |
114 | try: | |
115 | code = int(code) | |
116 | except ValueError: | |
117 | pass | |
118 | res = MockResponse(200, "OK", {}, "") | |
119 | return self.parent.error("http", args[0], res, code, "", {}) | |
120 | elif action == "raise": | |
121 | raise urllib2.URLError("blah") | |
122 | assert False | |
123 | def close(self): pass | |
124 | def add_parent(self, parent): | |
125 | self.parent = parent | |
126 | self.parent.calls = [] | |
127 | def __lt__(self, other): | |
128 | if not hasattr(other, "handler_order"): | |
129 | # No handler_order, leave in original order. Yuck. | |
130 | return True | |
131 | return self.handler_order < other.handler_order | |
132 | ||
133 | def add_ordered_mock_handlers(opener, meth_spec): | |
134 | """Create MockHandlers and add them to an OpenerDirector. | |
135 | ||
136 | meth_spec: list of lists of tuples and strings defining methods to define | |
137 | on handlers. eg: | |
138 | ||
139 | [["http_error", "ftp_open"], ["http_open"]] | |
140 | ||
141 | defines methods .http_error() and .ftp_open() on one handler, and | |
142 | .http_open() on another. These methods just record their arguments and | |
143 | return None. Using a tuple instead of a string causes the method to | |
144 | perform some action (see MockHandler.handle()), eg: | |
145 | ||
146 | [["http_error"], [("http_open", "return request")]] | |
147 | ||
148 | defines .http_error() on one handler (which simply returns None), and | |
149 | .http_open() on another handler, which returns a Request object. | |
150 | ||
151 | """ | |
152 | handlers = [] | |
153 | count = 0 | |
154 | for meths in meth_spec: | |
155 | class MockHandlerSubclass(MockHandler): pass | |
156 | h = MockHandlerSubclass(meths) | |
157 | h.handler_order = count | |
158 | h.add_parent(opener) | |
159 | count = count + 1 | |
160 | handlers.append(h) | |
161 | opener.add_handler(h) | |
162 | return handlers | |
163 | ||
164 | class OpenerDirectorTests(unittest.TestCase): | |
165 | ||
166 | def test_handled(self): | |
167 | # handler returning non-None means no more handlers will be called | |
168 | o = OpenerDirector() | |
169 | meth_spec = [ | |
170 | ["http_open", "ftp_open", "http_error_302"], | |
171 | ["ftp_open"], | |
172 | [("http_open", "return self")], | |
173 | [("http_open", "return self")], | |
174 | ] | |
175 | handlers = add_ordered_mock_handlers(o, meth_spec) | |
176 | ||
177 | req = Request("http://example.com/") | |
178 | r = o.open(req) | |
179 | # Second .http_open() gets called, third doesn't, since second returned | |
180 | # non-None. Handlers without .http_open() never get any methods called | |
181 | # on them. | |
182 | # In fact, second mock handler defining .http_open() returns self | |
183 | # (instead of response), which becomes the OpenerDirector's return | |
184 | # value. | |
185 | self.assertEqual(r, handlers[2]) | |
186 | calls = [(handlers[0], "http_open"), (handlers[2], "http_open")] | |
187 | for expected, got in zip(calls, o.calls): | |
188 | handler, name, args, kwds = got | |
189 | self.assertEqual((handler, name), expected) | |
190 | self.assertEqual(args, (req,)) | |
191 | ||
192 | def test_handler_order(self): | |
193 | o = OpenerDirector() | |
194 | handlers = [] | |
195 | for meths, handler_order in [ | |
196 | ([("http_open", "return self")], 500), | |
197 | (["http_open"], 0), | |
198 | ]: | |
199 | class MockHandlerSubclass(MockHandler): pass | |
200 | h = MockHandlerSubclass(meths) | |
201 | h.handler_order = handler_order | |
202 | handlers.append(h) | |
203 | o.add_handler(h) | |
204 | ||
205 | r = o.open("http://example.com/") | |
206 | # handlers called in reverse order, thanks to their sort order | |
207 | self.assertEqual(o.calls[0][0], handlers[1]) | |
208 | self.assertEqual(o.calls[1][0], handlers[0]) | |
209 | ||
210 | def test_raise(self): | |
211 | # raising URLError stops processing of request | |
212 | o = OpenerDirector() | |
213 | meth_spec = [ | |
214 | [("http_open", "raise")], | |
215 | [("http_open", "return self")], | |
216 | ] | |
217 | handlers = add_ordered_mock_handlers(o, meth_spec) | |
218 | ||
219 | req = Request("http://example.com/") | |
220 | self.assertRaises(urllib2.URLError, o.open, req) | |
221 | self.assertEqual(o.calls, [(handlers[0], "http_open", (req,), {})]) | |
222 | ||
223 | ## def test_error(self): | |
224 | ## # XXX this doesn't actually seem to be used in standard library, | |
225 | ## # but should really be tested anyway... | |
226 | ||
227 | def test_http_error(self): | |
228 | # XXX http_error_default | |
229 | # http errors are a special case | |
230 | o = OpenerDirector() | |
231 | meth_spec = [ | |
232 | [("http_open", "error 302")], | |
233 | [("http_error_400", "raise"), "http_open"], | |
234 | [("http_error_302", "return response"), "http_error_303", | |
235 | "http_error"], | |
236 | [("http_error_302")], | |
237 | ] | |
238 | handlers = add_ordered_mock_handlers(o, meth_spec) | |
239 | ||
240 | class Unknown: | |
241 | def __eq__(self, other): return True | |
242 | ||
243 | req = Request("http://example.com/") | |
244 | r = o.open(req) | |
245 | assert len(o.calls) == 2 | |
246 | calls = [(handlers[0], "http_open", (req,)), | |
247 | (handlers[2], "http_error_302", | |
248 | (req, Unknown(), 302, "", {}))] | |
249 | for expected, got in zip(calls, o.calls): | |
250 | handler, method_name, args = expected | |
251 | self.assertEqual((handler, method_name), got[:2]) | |
252 | self.assertEqual(args, got[2]) | |
253 | ||
254 | def test_processors(self): | |
255 | # *_request / *_response methods get called appropriately | |
256 | o = OpenerDirector() | |
257 | meth_spec = [ | |
258 | [("http_request", "return request"), | |
259 | ("http_response", "return response")], | |
260 | [("http_request", "return request"), | |
261 | ("http_response", "return response")], | |
262 | ] | |
263 | handlers = add_ordered_mock_handlers(o, meth_spec) | |
264 | ||
265 | req = Request("http://example.com/") | |
266 | r = o.open(req) | |
267 | # processor methods are called on *all* handlers that define them, | |
268 | # not just the first handler that handles the request | |
269 | calls = [ | |
270 | (handlers[0], "http_request"), (handlers[1], "http_request"), | |
271 | (handlers[0], "http_response"), (handlers[1], "http_response")] | |
272 | ||
273 | for i, (handler, name, args, kwds) in enumerate(o.calls): | |
274 | if i < 2: | |
275 | # *_request | |
276 | self.assertEqual((handler, name), calls[i]) | |
277 | self.assertEqual(len(args), 1) | |
278 | self.assert_(isinstance(args[0], Request)) | |
279 | else: | |
280 | # *_response | |
281 | self.assertEqual((handler, name), calls[i]) | |
282 | self.assertEqual(len(args), 2) | |
283 | self.assert_(isinstance(args[0], Request)) | |
284 | # response from opener.open is None, because there's no | |
285 | # handler that defines http_open to handle it | |
286 | self.assert_(args[1] is None or | |
287 | isinstance(args[1], MockResponse)) | |
288 | ||
289 | ||
290 | def sanepathname2url(path): | |
291 | import urllib | |
292 | urlpath = urllib.pathname2url(path) | |
293 | if os.name == "nt" and urlpath.startswith("///"): | |
294 | urlpath = urlpath[2:] | |
295 | # XXX don't ask me about the mac... | |
296 | return urlpath | |
297 | ||
298 | class HandlerTests(unittest.TestCase): | |
299 | ||
300 | def test_ftp(self): | |
301 | class MockFTPWrapper: | |
302 | def __init__(self, data): self.data = data | |
303 | def retrfile(self, filename, filetype): | |
304 | self.filename, self.filetype = filename, filetype | |
305 | return StringIO.StringIO(self.data), len(self.data) | |
306 | ||
307 | class NullFTPHandler(urllib2.FTPHandler): | |
308 | def __init__(self, data): self.data = data | |
309 | def connect_ftp(self, user, passwd, host, port, dirs): | |
310 | self.user, self.passwd = user, passwd | |
311 | self.host, self.port = host, port | |
312 | self.dirs = dirs | |
313 | self.ftpwrapper = MockFTPWrapper(self.data) | |
314 | return self.ftpwrapper | |
315 | ||
316 | import ftplib, socket | |
317 | data = "rheum rhaponicum" | |
318 | h = NullFTPHandler(data) | |
319 | o = h.parent = MockOpener() | |
320 | ||
321 | for url, host, port, type_, dirs, filename, mimetype in [ | |
322 | ("ftp://localhost/foo/bar/baz.html", | |
323 | "localhost", ftplib.FTP_PORT, "I", | |
324 | ["foo", "bar"], "baz.html", "text/html"), | |
325 | ("ftp://localhost:80/foo/bar/", | |
326 | "localhost", 80, "D", | |
327 | ["foo", "bar"], "", None), | |
328 | ("ftp://localhost/baz.gif;type=a", | |
329 | "localhost", ftplib.FTP_PORT, "A", | |
330 | [], "baz.gif", None), # XXX really this should guess image/gif | |
331 | ]: | |
332 | r = h.ftp_open(Request(url)) | |
333 | # ftp authentication not yet implemented by FTPHandler | |
334 | self.assert_(h.user == h.passwd == "") | |
335 | self.assertEqual(h.host, socket.gethostbyname(host)) | |
336 | self.assertEqual(h.port, port) | |
337 | self.assertEqual(h.dirs, dirs) | |
338 | self.assertEqual(h.ftpwrapper.filename, filename) | |
339 | self.assertEqual(h.ftpwrapper.filetype, type_) | |
340 | headers = r.info() | |
341 | self.assertEqual(headers.get("Content-type"), mimetype) | |
342 | self.assertEqual(int(headers["Content-length"]), len(data)) | |
343 | ||
344 | def test_file(self): | |
345 | import time, rfc822, socket | |
346 | h = urllib2.FileHandler() | |
347 | o = h.parent = MockOpener() | |
348 | ||
349 | TESTFN = test_support.TESTFN | |
350 | urlpath = sanepathname2url(os.path.abspath(TESTFN)) | |
351 | towrite = "hello, world\n" | |
352 | for url in [ | |
353 | "file://localhost%s" % urlpath, | |
354 | "file://%s" % urlpath, | |
355 | "file://%s%s" % (socket.gethostbyname('localhost'), urlpath), | |
356 | "file://%s%s" % (socket.gethostbyname(socket.gethostname()), | |
357 | urlpath), | |
358 | ]: | |
359 | f = open(TESTFN, "wb") | |
360 | try: | |
361 | try: | |
362 | f.write(towrite) | |
363 | finally: | |
364 | f.close() | |
365 | ||
366 | r = h.file_open(Request(url)) | |
367 | try: | |
368 | data = r.read() | |
369 | headers = r.info() | |
370 | newurl = r.geturl() | |
371 | finally: | |
372 | r.close() | |
373 | stats = os.stat(TESTFN) | |
374 | modified = rfc822.formatdate(stats.st_mtime) | |
375 | finally: | |
376 | os.remove(TESTFN) | |
377 | self.assertEqual(data, towrite) | |
378 | self.assertEqual(headers["Content-type"], "text/plain") | |
379 | self.assertEqual(headers["Content-length"], "13") | |
380 | self.assertEqual(headers["Last-modified"], modified) | |
381 | ||
382 | for url in [ | |
383 | "file://localhost:80%s" % urlpath, | |
384 | # XXXX bug: these fail with socket.gaierror, should be URLError | |
385 | ## "file://%s:80%s/%s" % (socket.gethostbyname('localhost'), | |
386 | ## os.getcwd(), TESTFN), | |
387 | ## "file://somerandomhost.ontheinternet.com%s/%s" % | |
388 | ## (os.getcwd(), TESTFN), | |
389 | ]: | |
390 | try: | |
391 | f = open(TESTFN, "wb") | |
392 | try: | |
393 | f.write(towrite) | |
394 | finally: | |
395 | f.close() | |
396 | ||
397 | self.assertRaises(urllib2.URLError, | |
398 | h.file_open, Request(url)) | |
399 | finally: | |
400 | os.remove(TESTFN) | |
401 | ||
402 | h = urllib2.FileHandler() | |
403 | o = h.parent = MockOpener() | |
404 | # XXXX why does // mean ftp (and /// mean not ftp!), and where | |
405 | # is file: scheme specified? I think this is really a bug, and | |
406 | # what was intended was to distinguish between URLs like: | |
407 | # file:/blah.txt (a file) | |
408 | # file://localhost/blah.txt (a file) | |
409 | # file:///blah.txt (a file) | |
410 | # file://ftp.example.com/blah.txt (an ftp URL) | |
411 | for url, ftp in [ | |
412 | ("file://ftp.example.com//foo.txt", True), | |
413 | ("file://ftp.example.com///foo.txt", False), | |
414 | # XXXX bug: fails with OSError, should be URLError | |
415 | ("file://ftp.example.com/foo.txt", False), | |
416 | ]: | |
417 | req = Request(url) | |
418 | try: | |
419 | h.file_open(req) | |
420 | # XXXX remove OSError when bug fixed | |
421 | except (urllib2.URLError, OSError): | |
422 | self.assert_(not ftp) | |
423 | else: | |
424 | self.assert_(o.req is req) | |
425 | self.assertEqual(req.type, "ftp") | |
426 | ||
427 | def test_http(self): | |
428 | class MockHTTPResponse: | |
429 | def __init__(self, fp, msg, status, reason): | |
430 | self.fp = fp | |
431 | self.msg = msg | |
432 | self.status = status | |
433 | self.reason = reason | |
434 | def read(self): | |
435 | return '' | |
436 | class MockHTTPClass: | |
437 | def __init__(self): | |
438 | self.req_headers = [] | |
439 | self.data = None | |
440 | self.raise_on_endheaders = False | |
441 | def __call__(self, host): | |
442 | self.host = host | |
443 | return self | |
444 | def set_debuglevel(self, level): | |
445 | self.level = level | |
446 | def request(self, method, url, body=None, headers={}): | |
447 | self.method = method | |
448 | self.selector = url | |
449 | self.req_headers += headers.items() | |
450 | if body: | |
451 | self.data = body | |
452 | if self.raise_on_endheaders: | |
453 | import socket | |
454 | raise socket.error() | |
455 | def getresponse(self): | |
456 | return MockHTTPResponse(MockFile(), {}, 200, "OK") | |
457 | ||
458 | h = urllib2.AbstractHTTPHandler() | |
459 | o = h.parent = MockOpener() | |
460 | ||
461 | url = "http://example.com/" | |
462 | for method, data in [("GET", None), ("POST", "blah")]: | |
463 | req = Request(url, data, {"Foo": "bar"}) | |
464 | req.add_unredirected_header("Spam", "eggs") | |
465 | http = MockHTTPClass() | |
466 | r = h.do_open(http, req) | |
467 | ||
468 | # result attributes | |
469 | r.read; r.readline # wrapped MockFile methods | |
470 | r.info; r.geturl # addinfourl methods | |
471 | r.code, r.msg == 200, "OK" # added from MockHTTPClass.getreply() | |
472 | hdrs = r.info() | |
473 | hdrs.get; hdrs.has_key # r.info() gives dict from .getreply() | |
474 | self.assertEqual(r.geturl(), url) | |
475 | ||
476 | self.assertEqual(http.host, "example.com") | |
477 | self.assertEqual(http.level, 0) | |
478 | self.assertEqual(http.method, method) | |
479 | self.assertEqual(http.selector, "/") | |
480 | self.assertEqual(http.req_headers, | |
481 | [("Connection", "close"), | |
482 | ("Foo", "bar"), ("Spam", "eggs")]) | |
483 | self.assertEqual(http.data, data) | |
484 | ||
485 | # check socket.error converted to URLError | |
486 | http.raise_on_endheaders = True | |
487 | self.assertRaises(urllib2.URLError, h.do_open, http, req) | |
488 | ||
489 | # check adding of standard headers | |
490 | o.addheaders = [("Spam", "eggs")] | |
491 | for data in "", None: # POST, GET | |
492 | req = Request("http://example.com/", data) | |
493 | r = MockResponse(200, "OK", {}, "") | |
494 | newreq = h.do_request_(req) | |
495 | if data is None: # GET | |
496 | self.assert_("Content-length" not in req.unredirected_hdrs) | |
497 | self.assert_("Content-type" not in req.unredirected_hdrs) | |
498 | else: # POST | |
499 | self.assertEqual(req.unredirected_hdrs["Content-length"], "0") | |
500 | self.assertEqual(req.unredirected_hdrs["Content-type"], | |
501 | "application/x-www-form-urlencoded") | |
502 | # XXX the details of Host could be better tested | |
503 | self.assertEqual(req.unredirected_hdrs["Host"], "example.com") | |
504 | self.assertEqual(req.unredirected_hdrs["Spam"], "eggs") | |
505 | ||
506 | # don't clobber existing headers | |
507 | req.add_unredirected_header("Content-length", "foo") | |
508 | req.add_unredirected_header("Content-type", "bar") | |
509 | req.add_unredirected_header("Host", "baz") | |
510 | req.add_unredirected_header("Spam", "foo") | |
511 | newreq = h.do_request_(req) | |
512 | self.assertEqual(req.unredirected_hdrs["Content-length"], "foo") | |
513 | self.assertEqual(req.unredirected_hdrs["Content-type"], "bar") | |
514 | self.assertEqual(req.unredirected_hdrs["Host"], "baz") | |
515 | self.assertEqual(req.unredirected_hdrs["Spam"], "foo") | |
516 | ||
517 | def test_errors(self): | |
518 | h = urllib2.HTTPErrorProcessor() | |
519 | o = h.parent = MockOpener() | |
520 | ||
521 | url = "http://example.com/" | |
522 | req = Request(url) | |
523 | # 200 OK is passed through | |
524 | r = MockResponse(200, "OK", {}, "", url) | |
525 | newr = h.http_response(req, r) | |
526 | self.assert_(r is newr) | |
527 | self.assert_(not hasattr(o, "proto")) # o.error not called | |
528 | # anything else calls o.error (and MockOpener returns None, here) | |
529 | r = MockResponse(201, "Created", {}, "", url) | |
530 | self.assert_(h.http_response(req, r) is None) | |
531 | self.assertEqual(o.proto, "http") # o.error called | |
532 | self.assertEqual(o.args, (req, r, 201, "Created", {})) | |
533 | ||
534 | def test_cookies(self): | |
535 | cj = MockCookieJar() | |
536 | h = urllib2.HTTPCookieProcessor(cj) | |
537 | o = h.parent = MockOpener() | |
538 | ||
539 | req = Request("http://example.com/") | |
540 | r = MockResponse(200, "OK", {}, "") | |
541 | newreq = h.http_request(req) | |
542 | self.assert_(cj.ach_req is req is newreq) | |
543 | self.assertEquals(req.get_origin_req_host(), "example.com") | |
544 | self.assert_(not req.is_unverifiable()) | |
545 | newr = h.http_response(req, r) | |
546 | self.assert_(cj.ec_req is req) | |
547 | self.assert_(cj.ec_r is r is newr) | |
548 | ||
549 | def test_redirect(self): | |
550 | from_url = "http://example.com/a.html" | |
551 | to_url = "http://example.com/b.html" | |
552 | h = urllib2.HTTPRedirectHandler() | |
553 | o = h.parent = MockOpener() | |
554 | ||
555 | # ordinary redirect behaviour | |
556 | for code in 301, 302, 303, 307: | |
557 | for data in None, "blah\nblah\n": | |
558 | method = getattr(h, "http_error_%s" % code) | |
559 | req = Request(from_url, data) | |
560 | req.add_header("Nonsense", "viking=withhold") | |
561 | req.add_unredirected_header("Spam", "spam") | |
562 | try: | |
563 | method(req, MockFile(), code, "Blah", | |
564 | MockHeaders({"location": to_url})) | |
565 | except urllib2.HTTPError: | |
566 | # 307 in response to POST requires user OK | |
567 | self.assert_(code == 307 and data is not None) | |
568 | self.assertEqual(o.req.get_full_url(), to_url) | |
569 | try: | |
570 | self.assertEqual(o.req.get_method(), "GET") | |
571 | except AttributeError: | |
572 | self.assert_(not o.req.has_data()) | |
573 | self.assertEqual(o.req.headers["Nonsense"], | |
574 | "viking=withhold") | |
575 | self.assert_("Spam" not in o.req.headers) | |
576 | self.assert_("Spam" not in o.req.unredirected_hdrs) | |
577 | ||
578 | # loop detection | |
579 | req = Request(from_url) | |
580 | def redirect(h, req, url=to_url): | |
581 | h.http_error_302(req, MockFile(), 302, "Blah", | |
582 | MockHeaders({"location": url})) | |
583 | # Note that the *original* request shares the same record of | |
584 | # redirections with the sub-requests caused by the redirections. | |
585 | ||
586 | # detect infinite loop redirect of a URL to itself | |
587 | req = Request(from_url, origin_req_host="example.com") | |
588 | count = 0 | |
589 | try: | |
590 | while 1: | |
591 | redirect(h, req, "http://example.com/") | |
592 | count = count + 1 | |
593 | except urllib2.HTTPError: | |
594 | # don't stop until max_repeats, because cookies may introduce state | |
595 | self.assertEqual(count, urllib2.HTTPRedirectHandler.max_repeats) | |
596 | ||
597 | # detect endless non-repeating chain of redirects | |
598 | req = Request(from_url, origin_req_host="example.com") | |
599 | count = 0 | |
600 | try: | |
601 | while 1: | |
602 | redirect(h, req, "http://example.com/%d" % count) | |
603 | count = count + 1 | |
604 | except urllib2.HTTPError: | |
605 | self.assertEqual(count, | |
606 | urllib2.HTTPRedirectHandler.max_redirections) | |
607 | ||
608 | def test_cookie_redirect(self): | |
609 | class MockHTTPHandler(urllib2.HTTPHandler): | |
610 | def __init__(self): self._count = 0 | |
611 | def http_open(self, req): | |
612 | import mimetools | |
613 | from StringIO import StringIO | |
614 | if self._count == 0: | |
615 | self._count = self._count + 1 | |
616 | msg = mimetools.Message( | |
617 | StringIO("Location: http://www.cracker.com/\r\n\r\n")) | |
618 | return self.parent.error( | |
619 | "http", req, MockFile(), 302, "Found", msg) | |
620 | else: | |
621 | self.req = req | |
622 | msg = mimetools.Message(StringIO("\r\n\r\n")) | |
623 | return MockResponse(200, "OK", msg, "", req.get_full_url()) | |
624 | # cookies shouldn't leak into redirected requests | |
625 | from cookielib import CookieJar | |
626 | from urllib2 import build_opener, HTTPHandler, HTTPError, \ | |
627 | HTTPCookieProcessor | |
628 | ||
629 | from test_cookielib import interact_netscape | |
630 | ||
631 | cj = CookieJar() | |
632 | interact_netscape(cj, "http://www.example.com/", "spam=eggs") | |
633 | hh = MockHTTPHandler() | |
634 | cp = HTTPCookieProcessor(cj) | |
635 | o = build_opener(hh, cp) | |
636 | o.open("http://www.example.com/") | |
637 | self.assert_(not hh.req.has_header("Cookie")) | |
638 | ||
639 | ||
640 | class MiscTests(unittest.TestCase): | |
641 | ||
642 | def test_build_opener(self): | |
643 | class MyHTTPHandler(urllib2.HTTPHandler): pass | |
644 | class FooHandler(urllib2.BaseHandler): | |
645 | def foo_open(self): pass | |
646 | class BarHandler(urllib2.BaseHandler): | |
647 | def bar_open(self): pass | |
648 | ||
649 | build_opener = urllib2.build_opener | |
650 | ||
651 | o = build_opener(FooHandler, BarHandler) | |
652 | self.opener_has_handler(o, FooHandler) | |
653 | self.opener_has_handler(o, BarHandler) | |
654 | ||
655 | # can take a mix of classes and instances | |
656 | o = build_opener(FooHandler, BarHandler()) | |
657 | self.opener_has_handler(o, FooHandler) | |
658 | self.opener_has_handler(o, BarHandler) | |
659 | ||
660 | # subclasses of default handlers override default handlers | |
661 | o = build_opener(MyHTTPHandler) | |
662 | self.opener_has_handler(o, MyHTTPHandler) | |
663 | ||
664 | # a particular case of overriding: default handlers can be passed | |
665 | # in explicitly | |
666 | o = build_opener() | |
667 | self.opener_has_handler(o, urllib2.HTTPHandler) | |
668 | o = build_opener(urllib2.HTTPHandler) | |
669 | self.opener_has_handler(o, urllib2.HTTPHandler) | |
670 | o = build_opener(urllib2.HTTPHandler()) | |
671 | self.opener_has_handler(o, urllib2.HTTPHandler) | |
672 | ||
673 | def opener_has_handler(self, opener, handler_class): | |
674 | for h in opener.handlers: | |
675 | if h.__class__ == handler_class: | |
676 | break | |
677 | else: | |
678 | self.assert_(False) | |
679 | ||
680 | class NetworkTests(unittest.TestCase): | |
681 | def setUp(self): | |
682 | if 0: # for debugging | |
683 | import logging | |
684 | logger = logging.getLogger("test_urllib2") | |
685 | logger.addHandler(logging.StreamHandler()) | |
686 | ||
687 | def test_range (self): | |
688 | req = urllib2.Request("http://www.python.org", | |
689 | headers={'Range': 'bytes=20-39'}) | |
690 | result = urllib2.urlopen(req) | |
691 | data = result.read() | |
692 | self.assertEqual(len(data), 20) | |
693 | ||
694 | # XXX The rest of these tests aren't very good -- they don't check much. | |
695 | # They do sometimes catch some major disasters, though. | |
696 | ||
697 | def test_ftp(self): | |
698 | urls = [ | |
699 | 'ftp://www.python.org/pub/python/misc/sousa.au', | |
700 | 'ftp://www.python.org/pub/tmp/blat', | |
701 | 'ftp://gatekeeper.research.compaq.com/pub/DEC/SRC' | |
702 | '/research-reports/00README-Legal-Rules-Regs', | |
703 | ] | |
704 | self._test_urls(urls, self._extra_handlers()) | |
705 | ||
706 | def test_gopher(self): | |
707 | urls = [ | |
708 | # Thanks to Fred for finding these! | |
709 | 'gopher://gopher.lib.ncsu.edu/11/library/stacks/Alex', | |
710 | 'gopher://gopher.vt.edu:10010/10/33', | |
711 | ] | |
712 | self._test_urls(urls, self._extra_handlers()) | |
713 | ||
714 | def test_file(self): | |
715 | TESTFN = test_support.TESTFN | |
716 | f = open(TESTFN, 'w') | |
717 | try: | |
718 | f.write('hi there\n') | |
719 | f.close() | |
720 | urls = [ | |
721 | 'file:'+sanepathname2url(os.path.abspath(TESTFN)), | |
722 | ||
723 | # XXX bug, should raise URLError | |
724 | #('file://nonsensename/etc/passwd', None, urllib2.URLError) | |
725 | ('file://nonsensename/etc/passwd', None, (OSError, socket.error)) | |
726 | ] | |
727 | self._test_urls(urls, self._extra_handlers()) | |
728 | finally: | |
729 | os.remove(TESTFN) | |
730 | ||
731 | def test_http(self): | |
732 | urls = [ | |
733 | 'http://www.espn.com/', # redirect | |
734 | 'http://www.python.org/Spanish/Inquistion/', | |
735 | ('http://www.python.org/cgi-bin/faqw.py', | |
736 | 'query=pythonistas&querytype=simple&casefold=yes&req=search', None), | |
737 | 'http://www.python.org/', | |
738 | ] | |
739 | self._test_urls(urls, self._extra_handlers()) | |
740 | ||
741 | # XXX Following test depends on machine configurations that are internal | |
742 | # to CNRI. Need to set up a public server with the right authentication | |
743 | # configuration for test purposes. | |
744 | ||
745 | ## def test_cnri(self): | |
746 | ## if socket.gethostname() == 'bitdiddle': | |
747 | ## localhost = 'bitdiddle.cnri.reston.va.us' | |
748 | ## elif socket.gethostname() == 'bitdiddle.concentric.net': | |
749 | ## localhost = 'localhost' | |
750 | ## else: | |
751 | ## localhost = None | |
752 | ## if localhost is not None: | |
753 | ## urls = [ | |
754 | ## 'file://%s/etc/passwd' % localhost, | |
755 | ## 'http://%s/simple/' % localhost, | |
756 | ## 'http://%s/digest/' % localhost, | |
757 | ## 'http://%s/not/found.h' % localhost, | |
758 | ## ] | |
759 | ||
760 | ## bauth = HTTPBasicAuthHandler() | |
761 | ## bauth.add_password('basic_test_realm', localhost, 'jhylton', | |
762 | ## 'password') | |
763 | ## dauth = HTTPDigestAuthHandler() | |
764 | ## dauth.add_password('digest_test_realm', localhost, 'jhylton', | |
765 | ## 'password') | |
766 | ||
767 | ## self._test_urls(urls, self._extra_handlers()+[bauth, dauth]) | |
768 | ||
769 | def _test_urls(self, urls, handlers): | |
770 | import socket | |
771 | import time | |
772 | import logging | |
773 | debug = logging.getLogger("test_urllib2").debug | |
774 | ||
775 | urllib2.install_opener(urllib2.build_opener(*handlers)) | |
776 | ||
777 | for url in urls: | |
778 | if isinstance(url, tuple): | |
779 | url, req, expected_err = url | |
780 | else: | |
781 | req = expected_err = None | |
782 | debug(url) | |
783 | try: | |
784 | f = urllib2.urlopen(url, req) | |
785 | except (IOError, socket.error, OSError), err: | |
786 | debug(err) | |
787 | if expected_err: | |
788 | self.assert_(isinstance(err, expected_err)) | |
789 | else: | |
790 | buf = f.read() | |
791 | f.close() | |
792 | debug("read %d bytes" % len(buf)) | |
793 | debug("******** next url coming up...") | |
794 | time.sleep(0.1) | |
795 | ||
796 | def _extra_handlers(self): | |
797 | handlers = [] | |
798 | ||
799 | handlers.append(urllib2.GopherHandler) | |
800 | ||
801 | cfh = urllib2.CacheFTPHandler() | |
802 | cfh.setTimeout(1) | |
803 | handlers.append(cfh) | |
804 | ||
805 | ## # XXX try out some custom proxy objects too! | |
806 | ## def at_cnri(req): | |
807 | ## host = req.get_host() | |
808 | ## debug(host) | |
809 | ## if host[-18:] == '.cnri.reston.va.us': | |
810 | ## return True | |
811 | ## p = CustomProxy('http', at_cnri, 'proxy.cnri.reston.va.us') | |
812 | ## ph = CustomProxyHandler(p) | |
813 | ## handlers.append(ph) | |
814 | ||
815 | return handlers | |
816 | ||
817 | ||
818 | def test_main(verbose=None): | |
819 | tests = (TrivialTests, | |
820 | OpenerDirectorTests, | |
821 | HandlerTests, | |
822 | MiscTests) | |
823 | if test_support.is_resource_enabled('network'): | |
824 | tests += (NetworkTests,) | |
825 | test_support.run_unittest(*tests) | |
826 | ||
827 | if __name__ == "__main__": | |
828 | test_main(verbose=True) |