Back to index

python3.2  3.2.2
test_urllib2.py
Go to the documentation of this file.
00001 import unittest
00002 from test import support
00003 
00004 import os
00005 import io
00006 import socket
00007 import array
00008 
00009 import urllib.request
00010 # The proxy bypass method imported below has logic specific to the OSX
00011 # proxy config data structure but is testable on all platforms.
00012 from urllib.request import Request, OpenerDirector, _proxy_bypass_macosx_sysconf
00013 import urllib.error
00014 
00015 # XXX
00016 # Request
00017 # CacheFTPHandler (hard to write)
00018 # parse_keqv_list, parse_http_list, HTTPDigestAuthHandler
00019 
00020 class TrivialTests(unittest.TestCase):
00021     def test_trivial(self):
00022         # A couple trivial tests
00023 
00024         self.assertRaises(ValueError, urllib.request.urlopen, 'bogus url')
00025 
00026         # XXX Name hacking to get this to work on Windows.
00027         fname = os.path.abspath(urllib.request.__file__).replace('\\', '/')
00028 
00029         if os.name == 'nt':
00030             file_url = "file:///%s" % fname
00031         else:
00032             file_url = "file://%s" % fname
00033 
00034         f = urllib.request.urlopen(file_url)
00035 
00036         buf = f.read()
00037         f.close()
00038 
00039     def test_parse_http_list(self):
00040         tests = [
00041             ('a,b,c', ['a', 'b', 'c']),
00042             ('path"o,l"og"i"cal, example', ['path"o,l"og"i"cal', 'example']),
00043             ('a, b, "c", "d", "e,f", g, h',
00044              ['a', 'b', '"c"', '"d"', '"e,f"', 'g', 'h']),
00045             ('a="b\\"c", d="e\\,f", g="h\\\\i"',
00046              ['a="b"c"', 'd="e,f"', 'g="h\\i"'])]
00047         for string, list in tests:
00048             self.assertEqual(urllib.request.parse_http_list(string), list)
00049 
00050 
00051 def test_request_headers_dict():
00052     """
00053     The Request.headers dictionary is not a documented interface.  It should
00054     stay that way, because the complete set of headers are only accessible
00055     through the .get_header(), .has_header(), .header_items() interface.
00056     However, .headers pre-dates those methods, and so real code will be using
00057     the dictionary.
00058 
00059     The introduction in 2.4 of those methods was a mistake for the same reason:
00060     code that previously saw all (urllib2 user)-provided headers in .headers
00061     now sees only a subset (and the function interface is ugly and incomplete).
00062     A better change would have been to replace .headers dict with a dict
00063     subclass (or UserDict.DictMixin instance?)  that preserved the .headers
00064     interface and also provided access to the "unredirected" headers.  It's
00065     probably too late to fix that, though.
00066 
00067 
00068     Check .capitalize() case normalization:
00069 
00070     >>> url = "http://example.com"
00071     >>> Request(url, headers={"Spam-eggs": "blah"}).headers["Spam-eggs"]
00072     'blah'
00073     >>> Request(url, headers={"spam-EggS": "blah"}).headers["Spam-eggs"]
00074     'blah'
00075 
00076     Currently, Request(url, "Spam-eggs").headers["Spam-Eggs"] raises KeyError,
00077     but that could be changed in future.
00078 
00079     """
00080 
00081 def test_request_headers_methods():
00082     """
00083     Note the case normalization of header names here, to .capitalize()-case.
00084     This should be preserved for backwards-compatibility.  (In the HTTP case,
00085     normalization to .title()-case is done by urllib2 before sending headers to
00086     http.client).
00087 
00088     >>> url = "http://example.com"
00089     >>> r = Request(url, headers={"Spam-eggs": "blah"})
00090     >>> r.has_header("Spam-eggs")
00091     True
00092     >>> r.header_items()
00093     [('Spam-eggs', 'blah')]
00094     >>> r.add_header("Foo-Bar", "baz")
00095     >>> items = sorted(r.header_items())
00096     >>> items
00097     [('Foo-bar', 'baz'), ('Spam-eggs', 'blah')]
00098 
00099     Note that e.g. r.has_header("spam-EggS") is currently False, and
00100     r.get_header("spam-EggS") returns None, but that could be changed in
00101     future.
00102 
00103     >>> r.has_header("Not-there")
00104     False
00105     >>> print(r.get_header("Not-there"))
00106     None
00107     >>> r.get_header("Not-there", "default")
00108     'default'
00109 
00110     """
00111 
00112 
00113 def test_password_manager(self):
00114     """
00115     >>> mgr = urllib.request.HTTPPasswordMgr()
00116     >>> add = mgr.add_password
00117     >>> add("Some Realm", "http://example.com/", "joe", "password")
00118     >>> add("Some Realm", "http://example.com/ni", "ni", "ni")
00119     >>> add("c", "http://example.com/foo", "foo", "ni")
00120     >>> add("c", "http://example.com/bar", "bar", "nini")
00121     >>> add("b", "http://example.com/", "first", "blah")
00122     >>> add("b", "http://example.com/", "second", "spam")
00123     >>> add("a", "http://example.com", "1", "a")
00124     >>> add("Some Realm", "http://c.example.com:3128", "3", "c")
00125     >>> add("Some Realm", "d.example.com", "4", "d")
00126     >>> add("Some Realm", "e.example.com:3128", "5", "e")
00127 
00128     >>> mgr.find_user_password("Some Realm", "example.com")
00129     ('joe', 'password')
00130     >>> mgr.find_user_password("Some Realm", "http://example.com")
00131     ('joe', 'password')
00132     >>> mgr.find_user_password("Some Realm", "http://example.com/")
00133     ('joe', 'password')
00134     >>> mgr.find_user_password("Some Realm", "http://example.com/spam")
00135     ('joe', 'password')
00136     >>> mgr.find_user_password("Some Realm", "http://example.com/spam/spam")
00137     ('joe', 'password')
00138     >>> mgr.find_user_password("c", "http://example.com/foo")
00139     ('foo', 'ni')
00140     >>> mgr.find_user_password("c", "http://example.com/bar")
00141     ('bar', 'nini')
00142 
00143     Actually, this is really undefined ATM
00144 ##     Currently, we use the highest-level path where more than one match:
00145 
00146 ##     >>> mgr.find_user_password("Some Realm", "http://example.com/ni")
00147 ##     ('joe', 'password')
00148 
00149     Use latest add_password() in case of conflict:
00150 
00151     >>> mgr.find_user_password("b", "http://example.com/")
00152     ('second', 'spam')
00153 
00154     No special relationship between a.example.com and example.com:
00155 
00156     >>> mgr.find_user_password("a", "http://example.com/")
00157     ('1', 'a')
00158     >>> mgr.find_user_password("a", "http://a.example.com/")
00159     (None, None)
00160 
00161     Ports:
00162 
00163     >>> mgr.find_user_password("Some Realm", "c.example.com")
00164     (None, None)
00165     >>> mgr.find_user_password("Some Realm", "c.example.com:3128")
00166     ('3', 'c')
00167     >>> mgr.find_user_password("Some Realm", "http://c.example.com:3128")
00168     ('3', 'c')
00169     >>> mgr.find_user_password("Some Realm", "d.example.com")
00170     ('4', 'd')
00171     >>> mgr.find_user_password("Some Realm", "e.example.com:3128")
00172     ('5', 'e')
00173 
00174     """
00175     pass
00176 
00177 
00178 def test_password_manager_default_port(self):
00179     """
00180     >>> mgr = urllib.request.HTTPPasswordMgr()
00181     >>> add = mgr.add_password
00182 
00183     The point to note here is that we can't guess the default port if there's
00184     no scheme.  This applies to both add_password and find_user_password.
00185 
00186     >>> add("f", "http://g.example.com:80", "10", "j")
00187     >>> add("g", "http://h.example.com", "11", "k")
00188     >>> add("h", "i.example.com:80", "12", "l")
00189     >>> add("i", "j.example.com", "13", "m")
00190     >>> mgr.find_user_password("f", "g.example.com:100")
00191     (None, None)
00192     >>> mgr.find_user_password("f", "g.example.com:80")
00193     ('10', 'j')
00194     >>> mgr.find_user_password("f", "g.example.com")
00195     (None, None)
00196     >>> mgr.find_user_password("f", "http://g.example.com:100")
00197     (None, None)
00198     >>> mgr.find_user_password("f", "http://g.example.com:80")
00199     ('10', 'j')
00200     >>> mgr.find_user_password("f", "http://g.example.com")
00201     ('10', 'j')
00202     >>> mgr.find_user_password("g", "h.example.com")
00203     ('11', 'k')
00204     >>> mgr.find_user_password("g", "h.example.com:80")
00205     ('11', 'k')
00206     >>> mgr.find_user_password("g", "http://h.example.com:80")
00207     ('11', 'k')
00208     >>> mgr.find_user_password("h", "i.example.com")
00209     (None, None)
00210     >>> mgr.find_user_password("h", "i.example.com:80")
00211     ('12', 'l')
00212     >>> mgr.find_user_password("h", "http://i.example.com:80")
00213     ('12', 'l')
00214     >>> mgr.find_user_password("i", "j.example.com")
00215     ('13', 'm')
00216     >>> mgr.find_user_password("i", "j.example.com:80")
00217     (None, None)
00218     >>> mgr.find_user_password("i", "http://j.example.com")
00219     ('13', 'm')
00220     >>> mgr.find_user_password("i", "http://j.example.com:80")
00221     (None, None)
00222 
00223     """
00224 
00225 class MockOpener:
00226     addheaders = []
00227     def open(self, req, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
00228         self.req, self.data, self.timeout = req, data, timeout
00229     def error(self, proto, *args):
00230         self.proto, self.args = proto, args
00231 
00232 class MockFile:
00233     def read(self, count=None): pass
00234     def readline(self, count=None): pass
00235     def close(self): pass
00236 
00237 class MockHeaders(dict):
00238     def getheaders(self, name):
00239         return list(self.values())
00240 
00241 class MockResponse(io.StringIO):
00242     def __init__(self, code, msg, headers, data, url=None):
00243         io.StringIO.__init__(self, data)
00244         self.code, self.msg, self.headers, self.url = code, msg, headers, url
00245     def info(self):
00246         return self.headers
00247     def geturl(self):
00248         return self.url
00249 
00250 class MockCookieJar:
00251     def add_cookie_header(self, request):
00252         self.ach_req = request
00253     def extract_cookies(self, response, request):
00254         self.ec_req, self.ec_r = request, response
00255 
00256 class FakeMethod:
00257     def __init__(self, meth_name, action, handle):
00258         self.meth_name = meth_name
00259         self.handle = handle
00260         self.action = action
00261     def __call__(self, *args):
00262         return self.handle(self.meth_name, self.action, *args)
00263 
00264 class MockHTTPResponse(io.IOBase):
00265     def __init__(self, fp, msg, status, reason):
00266         self.fp = fp
00267         self.msg = msg
00268         self.status = status
00269         self.reason = reason
00270         self.code = 200
00271 
00272     def read(self):
00273         return ''
00274 
00275     def info(self):
00276         return {}
00277 
00278     def geturl(self):
00279         return self.url
00280 
00281 
00282 class MockHTTPClass:
00283     def __init__(self):
00284         self.level = 0
00285         self.req_headers = []
00286         self.data = None
00287         self.raise_on_endheaders = False
00288         self._tunnel_headers = {}
00289 
00290     def __call__(self, host, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
00291         self.host = host
00292         self.timeout = timeout
00293         return self
00294 
00295     def set_debuglevel(self, level):
00296         self.level = level
00297 
00298     def set_tunnel(self, host, port=None, headers=None):
00299         self._tunnel_host = host
00300         self._tunnel_port = port
00301         if headers:
00302             self._tunnel_headers = headers
00303         else:
00304             self._tunnel_headers.clear()
00305 
00306     def request(self, method, url, body=None, headers=None):
00307         self.method = method
00308         self.selector = url
00309         if headers is not None:
00310             self.req_headers += headers.items()
00311         self.req_headers.sort()
00312         if body:
00313             self.data = body
00314         if self.raise_on_endheaders:
00315             import socket
00316             raise socket.error()
00317     def getresponse(self):
00318         return MockHTTPResponse(MockFile(), {}, 200, "OK")
00319 
00320     def close(self):
00321         pass
00322 
00323 class MockHandler:
00324     # useful for testing handler machinery
00325     # see add_ordered_mock_handlers() docstring
00326     handler_order = 500
00327     def __init__(self, methods):
00328         self._define_methods(methods)
00329     def _define_methods(self, methods):
00330         for spec in methods:
00331             if len(spec) == 2: name, action = spec
00332             else: name, action = spec, None
00333             meth = FakeMethod(name, action, self.handle)
00334             setattr(self.__class__, name, meth)
00335     def handle(self, fn_name, action, *args, **kwds):
00336         self.parent.calls.append((self, fn_name, args, kwds))
00337         if action is None:
00338             return None
00339         elif action == "return self":
00340             return self
00341         elif action == "return response":
00342             res = MockResponse(200, "OK", {}, "")
00343             return res
00344         elif action == "return request":
00345             return Request("http://blah/")
00346         elif action.startswith("error"):
00347             code = action[action.rfind(" ")+1:]
00348             try:
00349                 code = int(code)
00350             except ValueError:
00351                 pass
00352             res = MockResponse(200, "OK", {}, "")
00353             return self.parent.error("http", args[0], res, code, "", {})
00354         elif action == "raise":
00355             raise urllib.error.URLError("blah")
00356         assert False
00357     def close(self): pass
00358     def add_parent(self, parent):
00359         self.parent = parent
00360         self.parent.calls = []
00361     def __lt__(self, other):
00362         if not hasattr(other, "handler_order"):
00363             # No handler_order, leave in original order.  Yuck.
00364             return True
00365         return self.handler_order < other.handler_order
00366 
00367 def add_ordered_mock_handlers(opener, meth_spec):
00368     """Create MockHandlers and add them to an OpenerDirector.
00369 
00370     meth_spec: list of lists of tuples and strings defining methods to define
00371     on handlers.  eg:
00372 
00373     [["http_error", "ftp_open"], ["http_open"]]
00374 
00375     defines methods .http_error() and .ftp_open() on one handler, and
00376     .http_open() on another.  These methods just record their arguments and
00377     return None.  Using a tuple instead of a string causes the method to
00378     perform some action (see MockHandler.handle()), eg:
00379 
00380     [["http_error"], [("http_open", "return request")]]
00381 
00382     defines .http_error() on one handler (which simply returns None), and
00383     .http_open() on another handler, which returns a Request object.
00384 
00385     """
00386     handlers = []
00387     count = 0
00388     for meths in meth_spec:
00389         class MockHandlerSubclass(MockHandler): pass
00390         h = MockHandlerSubclass(meths)
00391         h.handler_order += count
00392         h.add_parent(opener)
00393         count = count + 1
00394         handlers.append(h)
00395         opener.add_handler(h)
00396     return handlers
00397 
00398 def build_test_opener(*handler_instances):
00399     opener = OpenerDirector()
00400     for h in handler_instances:
00401         opener.add_handler(h)
00402     return opener
00403 
00404 class MockHTTPHandler(urllib.request.BaseHandler):
00405     # useful for testing redirections and auth
00406     # sends supplied headers and code as first response
00407     # sends 200 OK as second response
00408     def __init__(self, code, headers):
00409         self.code = code
00410         self.headers = headers
00411         self.reset()
00412     def reset(self):
00413         self._count = 0
00414         self.requests = []
00415     def http_open(self, req):
00416         import email, http.client, copy
00417         from io import StringIO
00418         self.requests.append(copy.deepcopy(req))
00419         if self._count == 0:
00420             self._count = self._count + 1
00421             name = http.client.responses[self.code]
00422             msg = email.message_from_string(self.headers)
00423             return self.parent.error(
00424                 "http", req, MockFile(), self.code, name, msg)
00425         else:
00426             self.req = req
00427             msg = email.message_from_string("\r\n\r\n")
00428             return MockResponse(200, "OK", msg, "", req.get_full_url())
00429 
00430 class MockHTTPSHandler(urllib.request.AbstractHTTPHandler):
00431     # Useful for testing the Proxy-Authorization request by verifying the
00432     # properties of httpcon
00433 
00434     def __init__(self):
00435         urllib.request.AbstractHTTPHandler.__init__(self)
00436         self.httpconn = MockHTTPClass()
00437 
00438     def https_open(self, req):
00439         return self.do_open(self.httpconn, req)
00440 
00441 class MockPasswordManager:
00442     def add_password(self, realm, uri, user, password):
00443         self.realm = realm
00444         self.url = uri
00445         self.user = user
00446         self.password = password
00447     def find_user_password(self, realm, authuri):
00448         self.target_realm = realm
00449         self.target_url = authuri
00450         return self.user, self.password
00451 
00452 
00453 class OpenerDirectorTests(unittest.TestCase):
00454 
00455     def test_add_non_handler(self):
00456         class NonHandler(object):
00457             pass
00458         self.assertRaises(TypeError,
00459                           OpenerDirector().add_handler, NonHandler())
00460 
00461     def test_badly_named_methods(self):
00462         # test work-around for three methods that accidentally follow the
00463         # naming conventions for handler methods
00464         # (*_open() / *_request() / *_response())
00465 
00466         # These used to call the accidentally-named methods, causing a
00467         # TypeError in real code; here, returning self from these mock
00468         # methods would either cause no exception, or AttributeError.
00469 
00470         from urllib.error import URLError
00471 
00472         o = OpenerDirector()
00473         meth_spec = [
00474             [("do_open", "return self"), ("proxy_open", "return self")],
00475             [("redirect_request", "return self")],
00476             ]
00477         handlers = add_ordered_mock_handlers(o, meth_spec)
00478         o.add_handler(urllib.request.UnknownHandler())
00479         for scheme in "do", "proxy", "redirect":
00480             self.assertRaises(URLError, o.open, scheme+"://example.com/")
00481 
00482     def test_handled(self):
00483         # handler returning non-None means no more handlers will be called
00484         o = OpenerDirector()
00485         meth_spec = [
00486             ["http_open", "ftp_open", "http_error_302"],
00487             ["ftp_open"],
00488             [("http_open", "return self")],
00489             [("http_open", "return self")],
00490             ]
00491         handlers = add_ordered_mock_handlers(o, meth_spec)
00492 
00493         req = Request("http://example.com/")
00494         r = o.open(req)
00495         # Second .http_open() gets called, third doesn't, since second returned
00496         # non-None.  Handlers without .http_open() never get any methods called
00497         # on them.
00498         # In fact, second mock handler defining .http_open() returns self
00499         # (instead of response), which becomes the OpenerDirector's return
00500         # value.
00501         self.assertEqual(r, handlers[2])
00502         calls = [(handlers[0], "http_open"), (handlers[2], "http_open")]
00503         for expected, got in zip(calls, o.calls):
00504             handler, name, args, kwds = got
00505             self.assertEqual((handler, name), expected)
00506             self.assertEqual(args, (req,))
00507 
00508     def test_handler_order(self):
00509         o = OpenerDirector()
00510         handlers = []
00511         for meths, handler_order in [
00512             ([("http_open", "return self")], 500),
00513             (["http_open"], 0),
00514             ]:
00515             class MockHandlerSubclass(MockHandler): pass
00516             h = MockHandlerSubclass(meths)
00517             h.handler_order = handler_order
00518             handlers.append(h)
00519             o.add_handler(h)
00520 
00521         r = o.open("http://example.com/")
00522         # handlers called in reverse order, thanks to their sort order
00523         self.assertEqual(o.calls[0][0], handlers[1])
00524         self.assertEqual(o.calls[1][0], handlers[0])
00525 
00526     def test_raise(self):
00527         # raising URLError stops processing of request
00528         o = OpenerDirector()
00529         meth_spec = [
00530             [("http_open", "raise")],
00531             [("http_open", "return self")],
00532             ]
00533         handlers = add_ordered_mock_handlers(o, meth_spec)
00534 
00535         req = Request("http://example.com/")
00536         self.assertRaises(urllib.error.URLError, o.open, req)
00537         self.assertEqual(o.calls, [(handlers[0], "http_open", (req,), {})])
00538 
00539 ##     def test_error(self):
00540 ##         # XXX this doesn't actually seem to be used in standard library,
00541 ##         #  but should really be tested anyway...
00542 
00543     def test_http_error(self):
00544         # XXX http_error_default
00545         # http errors are a special case
00546         o = OpenerDirector()
00547         meth_spec = [
00548             [("http_open", "error 302")],
00549             [("http_error_400", "raise"), "http_open"],
00550             [("http_error_302", "return response"), "http_error_303",
00551              "http_error"],
00552             [("http_error_302")],
00553             ]
00554         handlers = add_ordered_mock_handlers(o, meth_spec)
00555 
00556         class Unknown:
00557             def __eq__(self, other): return True
00558 
00559         req = Request("http://example.com/")
00560         r = o.open(req)
00561         assert len(o.calls) == 2
00562         calls = [(handlers[0], "http_open", (req,)),
00563                  (handlers[2], "http_error_302",
00564                   (req, Unknown(), 302, "", {}))]
00565         for expected, got in zip(calls, o.calls):
00566             handler, method_name, args = expected
00567             self.assertEqual((handler, method_name), got[:2])
00568             self.assertEqual(args, got[2])
00569 
00570     def test_processors(self):
00571         # *_request / *_response methods get called appropriately
00572         o = OpenerDirector()
00573         meth_spec = [
00574             [("http_request", "return request"),
00575              ("http_response", "return response")],
00576             [("http_request", "return request"),
00577              ("http_response", "return response")],
00578             ]
00579         handlers = add_ordered_mock_handlers(o, meth_spec)
00580 
00581         req = Request("http://example.com/")
00582         r = o.open(req)
00583         # processor methods are called on *all* handlers that define them,
00584         # not just the first handler that handles the request
00585         calls = [
00586             (handlers[0], "http_request"), (handlers[1], "http_request"),
00587             (handlers[0], "http_response"), (handlers[1], "http_response")]
00588 
00589         for i, (handler, name, args, kwds) in enumerate(o.calls):
00590             if i < 2:
00591                 # *_request
00592                 self.assertEqual((handler, name), calls[i])
00593                 self.assertEqual(len(args), 1)
00594                 self.assertIsInstance(args[0], Request)
00595             else:
00596                 # *_response
00597                 self.assertEqual((handler, name), calls[i])
00598                 self.assertEqual(len(args), 2)
00599                 self.assertIsInstance(args[0], Request)
00600                 # response from opener.open is None, because there's no
00601                 # handler that defines http_open to handle it
00602                 self.assertTrue(args[1] is None or
00603                              isinstance(args[1], MockResponse))
00604 
00605 
00606 def sanepathname2url(path):
00607     try:
00608         path.encode("utf8")
00609     except UnicodeEncodeError:
00610         raise unittest.SkipTest("path is not encodable to utf8")
00611     urlpath = urllib.request.pathname2url(path)
00612     if os.name == "nt" and urlpath.startswith("///"):
00613         urlpath = urlpath[2:]
00614     # XXX don't ask me about the mac...
00615     return urlpath
00616 
00617 class HandlerTests(unittest.TestCase):
00618 
00619     def test_ftp(self):
00620         class MockFTPWrapper:
00621             def __init__(self, data): self.data = data
00622             def retrfile(self, filename, filetype):
00623                 self.filename, self.filetype = filename, filetype
00624                 return io.StringIO(self.data), len(self.data)
00625             def close(self): pass
00626 
00627         class NullFTPHandler(urllib.request.FTPHandler):
00628             def __init__(self, data): self.data = data
00629             def connect_ftp(self, user, passwd, host, port, dirs,
00630                             timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
00631                 self.user, self.passwd = user, passwd
00632                 self.host, self.port = host, port
00633                 self.dirs = dirs
00634                 self.ftpwrapper = MockFTPWrapper(self.data)
00635                 return self.ftpwrapper
00636 
00637         import ftplib
00638         data = "rheum rhaponicum"
00639         h = NullFTPHandler(data)
00640         o = h.parent = MockOpener()
00641 
00642         for url, host, port, user, passwd, type_, dirs, filename, mimetype in [
00643             ("ftp://localhost/foo/bar/baz.html",
00644              "localhost", ftplib.FTP_PORT, "", "", "I",
00645              ["foo", "bar"], "baz.html", "text/html"),
00646             ("ftp://parrot@localhost/foo/bar/baz.html",
00647              "localhost", ftplib.FTP_PORT, "parrot", "", "I",
00648              ["foo", "bar"], "baz.html", "text/html"),
00649             ("ftp://%25parrot@localhost/foo/bar/baz.html",
00650              "localhost", ftplib.FTP_PORT, "%parrot", "", "I",
00651              ["foo", "bar"], "baz.html", "text/html"),
00652             ("ftp://%2542parrot@localhost/foo/bar/baz.html",
00653              "localhost", ftplib.FTP_PORT, "%42parrot", "", "I",
00654              ["foo", "bar"], "baz.html", "text/html"),
00655             ("ftp://localhost:80/foo/bar/",
00656              "localhost", 80, "", "", "D",
00657              ["foo", "bar"], "", None),
00658             ("ftp://localhost/baz.gif;type=a",
00659              "localhost", ftplib.FTP_PORT, "", "", "A",
00660              [], "baz.gif", None),  # XXX really this should guess image/gif
00661             ]:
00662             req = Request(url)
00663             req.timeout = None
00664             r = h.ftp_open(req)
00665             # ftp authentication not yet implemented by FTPHandler
00666             self.assertEqual(h.user, user)
00667             self.assertEqual(h.passwd, passwd)
00668             self.assertEqual(h.host, socket.gethostbyname(host))
00669             self.assertEqual(h.port, port)
00670             self.assertEqual(h.dirs, dirs)
00671             self.assertEqual(h.ftpwrapper.filename, filename)
00672             self.assertEqual(h.ftpwrapper.filetype, type_)
00673             headers = r.info()
00674             self.assertEqual(headers.get("Content-type"), mimetype)
00675             self.assertEqual(int(headers["Content-length"]), len(data))
00676 
00677     def test_file(self):
00678         import email.utils, socket
00679         h = urllib.request.FileHandler()
00680         o = h.parent = MockOpener()
00681 
00682         TESTFN = support.TESTFN
00683         urlpath = sanepathname2url(os.path.abspath(TESTFN))
00684         towrite = b"hello, world\n"
00685         urls = [
00686             "file://localhost%s" % urlpath,
00687             "file://%s" % urlpath,
00688             "file://%s%s" % (socket.gethostbyname('localhost'), urlpath),
00689             ]
00690         try:
00691             localaddr = socket.gethostbyname(socket.gethostname())
00692         except socket.gaierror:
00693             localaddr = ''
00694         if localaddr:
00695             urls.append("file://%s%s" % (localaddr, urlpath))
00696 
00697         for url in urls:
00698             f = open(TESTFN, "wb")
00699             try:
00700                 try:
00701                     f.write(towrite)
00702                 finally:
00703                     f.close()
00704 
00705                 r = h.file_open(Request(url))
00706                 try:
00707                     data = r.read()
00708                     headers = r.info()
00709                     respurl = r.geturl()
00710                 finally:
00711                     r.close()
00712                 stats = os.stat(TESTFN)
00713                 modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
00714             finally:
00715                 os.remove(TESTFN)
00716             self.assertEqual(data, towrite)
00717             self.assertEqual(headers["Content-type"], "text/plain")
00718             self.assertEqual(headers["Content-length"], "13")
00719             self.assertEqual(headers["Last-modified"], modified)
00720             self.assertEqual(respurl, url)
00721 
00722         for url in [
00723             "file://localhost:80%s" % urlpath,
00724             "file:///file_does_not_exist.txt",
00725             "file://%s:80%s/%s" % (socket.gethostbyname('localhost'),
00726                                    os.getcwd(), TESTFN),
00727             "file://somerandomhost.ontheinternet.com%s/%s" %
00728             (os.getcwd(), TESTFN),
00729             ]:
00730             try:
00731                 f = open(TESTFN, "wb")
00732                 try:
00733                     f.write(towrite)
00734                 finally:
00735                     f.close()
00736 
00737                 self.assertRaises(urllib.error.URLError,
00738                                   h.file_open, Request(url))
00739             finally:
00740                 os.remove(TESTFN)
00741 
00742         h = urllib.request.FileHandler()
00743         o = h.parent = MockOpener()
00744         # XXXX why does // mean ftp (and /// mean not ftp!), and where
00745         #  is file: scheme specified?  I think this is really a bug, and
00746         #  what was intended was to distinguish between URLs like:
00747         # file:/blah.txt (a file)
00748         # file://localhost/blah.txt (a file)
00749         # file:///blah.txt (a file)
00750         # file://ftp.example.com/blah.txt (an ftp URL)
00751         for url, ftp in [
00752             ("file://ftp.example.com//foo.txt", False),
00753             ("file://ftp.example.com///foo.txt", False),
00754 # XXXX bug: fails with OSError, should be URLError
00755             ("file://ftp.example.com/foo.txt", False),
00756             ("file://somehost//foo/something.txt", False),
00757             ("file://localhost//foo/something.txt", False),
00758             ]:
00759             req = Request(url)
00760             try:
00761                 h.file_open(req)
00762             # XXXX remove OSError when bug fixed
00763             except (urllib.error.URLError, OSError):
00764                 self.assertFalse(ftp)
00765             else:
00766                 self.assertIs(o.req, req)
00767                 self.assertEqual(req.type, "ftp")
00768             self.assertEqual(req.type == "ftp", ftp)
00769 
00770     def test_http(self):
00771 
00772         h = urllib.request.AbstractHTTPHandler()
00773         o = h.parent = MockOpener()
00774 
00775         url = "http://example.com/"
00776         for method, data in [("GET", None), ("POST", b"blah")]:
00777             req = Request(url, data, {"Foo": "bar"})
00778             req.timeout = None
00779             req.add_unredirected_header("Spam", "eggs")
00780             http = MockHTTPClass()
00781             r = h.do_open(http, req)
00782 
00783             # result attributes
00784             r.read; r.readline  # wrapped MockFile methods
00785             r.info; r.geturl  # addinfourl methods
00786             r.code, r.msg == 200, "OK"  # added from MockHTTPClass.getreply()
00787             hdrs = r.info()
00788             hdrs.get; hdrs.__contains__  # r.info() gives dict from .getreply()
00789             self.assertEqual(r.geturl(), url)
00790 
00791             self.assertEqual(http.host, "example.com")
00792             self.assertEqual(http.level, 0)
00793             self.assertEqual(http.method, method)
00794             self.assertEqual(http.selector, "/")
00795             self.assertEqual(http.req_headers,
00796                              [("Connection", "close"),
00797                               ("Foo", "bar"), ("Spam", "eggs")])
00798             self.assertEqual(http.data, data)
00799 
00800         # check socket.error converted to URLError
00801         http.raise_on_endheaders = True
00802         self.assertRaises(urllib.error.URLError, h.do_open, http, req)
00803 
00804         # Check for TypeError on POST data which is str.
00805         req = Request("http://example.com/","badpost")
00806         self.assertRaises(TypeError, h.do_request_, req)
00807 
00808         # check adding of standard headers
00809         o.addheaders = [("Spam", "eggs")]
00810         for data in b"", None:  # POST, GET
00811             req = Request("http://example.com/", data)
00812             r = MockResponse(200, "OK", {}, "")
00813             newreq = h.do_request_(req)
00814             if data is None:  # GET
00815                 self.assertNotIn("Content-length", req.unredirected_hdrs)
00816                 self.assertNotIn("Content-type", req.unredirected_hdrs)
00817             else:  # POST
00818                 self.assertEqual(req.unredirected_hdrs["Content-length"], "0")
00819                 self.assertEqual(req.unredirected_hdrs["Content-type"],
00820                              "application/x-www-form-urlencoded")
00821             # XXX the details of Host could be better tested
00822             self.assertEqual(req.unredirected_hdrs["Host"], "example.com")
00823             self.assertEqual(req.unredirected_hdrs["Spam"], "eggs")
00824 
00825             # don't clobber existing headers
00826             req.add_unredirected_header("Content-length", "foo")
00827             req.add_unredirected_header("Content-type", "bar")
00828             req.add_unredirected_header("Host", "baz")
00829             req.add_unredirected_header("Spam", "foo")
00830             newreq = h.do_request_(req)
00831             self.assertEqual(req.unredirected_hdrs["Content-length"], "foo")
00832             self.assertEqual(req.unredirected_hdrs["Content-type"], "bar")
00833             self.assertEqual(req.unredirected_hdrs["Host"], "baz")
00834             self.assertEqual(req.unredirected_hdrs["Spam"], "foo")
00835 
00836         # Check iterable body support
00837         def iterable_body():
00838             yield b"one"
00839             yield b"two"
00840             yield b"three"
00841 
00842         for headers in {}, {"Content-Length": 11}:
00843             req = Request("http://example.com/", iterable_body(), headers)
00844             if not headers:
00845                 # Having an iterable body without a Content-Length should
00846                 # raise an exception
00847                 self.assertRaises(ValueError, h.do_request_, req)
00848             else:
00849                 newreq = h.do_request_(req)
00850 
00851         # A file object.
00852         # Test only Content-Length attribute of request.
00853 
00854         file_obj = io.BytesIO()
00855         file_obj.write(b"Something\nSomething\nSomething\n")
00856 
00857         for headers in {}, {"Content-Length": 30}:
00858             req = Request("http://example.com/", file_obj, headers)
00859             if not headers:
00860                 # Having an iterable body without a Content-Length should
00861                 # raise an exception
00862                 self.assertRaises(ValueError, h.do_request_, req)
00863             else:
00864                 newreq = h.do_request_(req)
00865                 self.assertEqual(int(newreq.get_header('Content-length')),30)
00866 
00867         file_obj.close()
00868 
00869         # array.array Iterable - Content Length is calculated
00870 
00871         iterable_array = array.array("I",[1,2,3,4])
00872 
00873         for headers in {}, {"Content-Length": 16}:
00874             req = Request("http://example.com/", iterable_array, headers)
00875             newreq = h.do_request_(req)
00876             self.assertEqual(int(newreq.get_header('Content-length')),16)
00877 
00878     def test_http_doubleslash(self):
00879         # Checks the presence of any unnecessary double slash in url does not
00880         # break anything. Previously, a double slash directly after the host
00881         # could could cause incorrect parsing.
00882         h = urllib.request.AbstractHTTPHandler()
00883         o = h.parent = MockOpener()
00884 
00885         data = b""
00886         ds_urls = [
00887             "http://example.com/foo/bar/baz.html",
00888             "http://example.com//foo/bar/baz.html",
00889             "http://example.com/foo//bar/baz.html",
00890             "http://example.com/foo/bar//baz.html"
00891             ]
00892 
00893         for ds_url in ds_urls:
00894             ds_req = Request(ds_url, data)
00895 
00896             # Check whether host is determined correctly if there is no proxy
00897             np_ds_req = h.do_request_(ds_req)
00898             self.assertEqual(np_ds_req.unredirected_hdrs["Host"],"example.com")
00899 
00900             # Check whether host is determined correctly if there is a proxy
00901             ds_req.set_proxy("someproxy:3128",None)
00902             p_ds_req = h.do_request_(ds_req)
00903             self.assertEqual(p_ds_req.unredirected_hdrs["Host"],"example.com")
00904 
00905     def test_fixpath_in_weirdurls(self):
00906         # Issue4493: urllib2 to supply '/' when to urls where path does not
00907         # start with'/'
00908 
00909         h = urllib.request.AbstractHTTPHandler()
00910         o = h.parent = MockOpener()
00911 
00912         weird_url = 'http://www.python.org?getspam'
00913         req = Request(weird_url)
00914         newreq = h.do_request_(req)
00915         self.assertEqual(newreq.host,'www.python.org')
00916         self.assertEqual(newreq.selector,'/?getspam')
00917 
00918         url_without_path = 'http://www.python.org'
00919         req = Request(url_without_path)
00920         newreq = h.do_request_(req)
00921         self.assertEqual(newreq.host,'www.python.org')
00922         self.assertEqual(newreq.selector,'')
00923 
00924 
00925     def test_errors(self):
00926         h = urllib.request.HTTPErrorProcessor()
00927         o = h.parent = MockOpener()
00928 
00929         url = "http://example.com/"
00930         req = Request(url)
00931         # all 2xx are passed through
00932         r = MockResponse(200, "OK", {}, "", url)
00933         newr = h.http_response(req, r)
00934         self.assertIs(r, newr)
00935         self.assertFalse(hasattr(o, "proto"))  # o.error not called
00936         r = MockResponse(202, "Accepted", {}, "", url)
00937         newr = h.http_response(req, r)
00938         self.assertIs(r, newr)
00939         self.assertFalse(hasattr(o, "proto"))  # o.error not called
00940         r = MockResponse(206, "Partial content", {}, "", url)
00941         newr = h.http_response(req, r)
00942         self.assertIs(r, newr)
00943         self.assertFalse(hasattr(o, "proto"))  # o.error not called
00944         # anything else calls o.error (and MockOpener returns None, here)
00945         r = MockResponse(502, "Bad gateway", {}, "", url)
00946         self.assertIsNone(h.http_response(req, r))
00947         self.assertEqual(o.proto, "http")  # o.error called
00948         self.assertEqual(o.args, (req, r, 502, "Bad gateway", {}))
00949 
00950     def test_cookies(self):
00951         cj = MockCookieJar()
00952         h = urllib.request.HTTPCookieProcessor(cj)
00953         o = h.parent = MockOpener()
00954 
00955         req = Request("http://example.com/")
00956         r = MockResponse(200, "OK", {}, "")
00957         newreq = h.http_request(req)
00958         self.assertIs(cj.ach_req, req)
00959         self.assertIs(cj.ach_req, newreq)
00960         self.assertEqual(req.get_origin_req_host(), "example.com")
00961         self.assertFalse(req.is_unverifiable())
00962         newr = h.http_response(req, r)
00963         self.assertIs(cj.ec_req, req)
00964         self.assertIs(cj.ec_r, r)
00965         self.assertIs(r, newr)
00966 
00967     def test_redirect(self):
00968         from_url = "http://example.com/a.html"
00969         to_url = "http://example.com/b.html"
00970         h = urllib.request.HTTPRedirectHandler()
00971         o = h.parent = MockOpener()
00972 
00973         # ordinary redirect behaviour
00974         for code in 301, 302, 303, 307:
00975             for data in None, "blah\nblah\n":
00976                 method = getattr(h, "http_error_%s" % code)
00977                 req = Request(from_url, data)
00978                 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
00979                 req.add_header("Nonsense", "viking=withhold")
00980                 if data is not None:
00981                     req.add_header("Content-Length", str(len(data)))
00982                 req.add_unredirected_header("Spam", "spam")
00983                 try:
00984                     method(req, MockFile(), code, "Blah",
00985                            MockHeaders({"location": to_url}))
00986                 except urllib.error.HTTPError:
00987                     # 307 in response to POST requires user OK
00988                     self.assertTrue(code == 307 and data is not None)
00989                 self.assertEqual(o.req.get_full_url(), to_url)
00990                 try:
00991                     self.assertEqual(o.req.get_method(), "GET")
00992                 except AttributeError:
00993                     self.assertFalse(o.req.has_data())
00994 
00995                 # now it's a GET, there should not be headers regarding content
00996                 # (possibly dragged from before being a POST)
00997                 headers = [x.lower() for x in o.req.headers]
00998                 self.assertNotIn("content-length", headers)
00999                 self.assertNotIn("content-type", headers)
01000 
01001                 self.assertEqual(o.req.headers["Nonsense"],
01002                                  "viking=withhold")
01003                 self.assertNotIn("Spam", o.req.headers)
01004                 self.assertNotIn("Spam", o.req.unredirected_hdrs)
01005 
01006         # loop detection
01007         req = Request(from_url)
01008         req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
01009         def redirect(h, req, url=to_url):
01010             h.http_error_302(req, MockFile(), 302, "Blah",
01011                              MockHeaders({"location": url}))
01012         # Note that the *original* request shares the same record of
01013         # redirections with the sub-requests caused by the redirections.
01014 
01015         # detect infinite loop redirect of a URL to itself
01016         req = Request(from_url, origin_req_host="example.com")
01017         count = 0
01018         req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
01019         try:
01020             while 1:
01021                 redirect(h, req, "http://example.com/")
01022                 count = count + 1
01023         except urllib.error.HTTPError:
01024             # don't stop until max_repeats, because cookies may introduce state
01025             self.assertEqual(count, urllib.request.HTTPRedirectHandler.max_repeats)
01026 
01027         # detect endless non-repeating chain of redirects
01028         req = Request(from_url, origin_req_host="example.com")
01029         count = 0
01030         req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
01031         try:
01032             while 1:
01033                 redirect(h, req, "http://example.com/%d" % count)
01034                 count = count + 1
01035         except urllib.error.HTTPError:
01036             self.assertEqual(count,
01037                              urllib.request.HTTPRedirectHandler.max_redirections)
01038 
01039 
01040     def test_invalid_redirect(self):
01041         from_url = "http://example.com/a.html"
01042         valid_schemes = ['http','https','ftp']
01043         invalid_schemes = ['file','imap','ldap']
01044         schemeless_url = "example.com/b.html"
01045         h = urllib.request.HTTPRedirectHandler()
01046         o = h.parent = MockOpener()
01047         req = Request(from_url)
01048         req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
01049 
01050         for scheme in invalid_schemes:
01051             invalid_url = scheme + '://' + schemeless_url
01052             self.assertRaises(urllib.error.HTTPError, h.http_error_302,
01053                     req, MockFile(), 302, "Security Loophole",
01054                     MockHeaders({"location": invalid_url}))
01055 
01056         for scheme in valid_schemes:
01057             valid_url = scheme + '://' + schemeless_url
01058             h.http_error_302(req, MockFile(), 302, "That's fine",
01059                 MockHeaders({"location": valid_url}))
01060             self.assertEqual(o.req.get_full_url(), valid_url)
01061 
01062     def test_cookie_redirect(self):
01063         # cookies shouldn't leak into redirected requests
01064         from http.cookiejar import CookieJar
01065         from test.test_http_cookiejar import interact_netscape
01066 
01067         cj = CookieJar()
01068         interact_netscape(cj, "http://www.example.com/", "spam=eggs")
01069         hh = MockHTTPHandler(302, "Location: http://www.cracker.com/\r\n\r\n")
01070         hdeh = urllib.request.HTTPDefaultErrorHandler()
01071         hrh = urllib.request.HTTPRedirectHandler()
01072         cp = urllib.request.HTTPCookieProcessor(cj)
01073         o = build_test_opener(hh, hdeh, hrh, cp)
01074         o.open("http://www.example.com/")
01075         self.assertFalse(hh.req.has_header("Cookie"))
01076 
01077     def test_redirect_fragment(self):
01078         redirected_url = 'http://www.example.com/index.html#OK\r\n\r\n'
01079         hh = MockHTTPHandler(302, 'Location: ' + redirected_url)
01080         hdeh = urllib.request.HTTPDefaultErrorHandler()
01081         hrh = urllib.request.HTTPRedirectHandler()
01082         o = build_test_opener(hh, hdeh, hrh)
01083         fp = o.open('http://www.example.com')
01084         self.assertEqual(fp.geturl(), redirected_url.strip())
01085 
01086     def test_proxy(self):
01087         o = OpenerDirector()
01088         ph = urllib.request.ProxyHandler(dict(http="proxy.example.com:3128"))
01089         o.add_handler(ph)
01090         meth_spec = [
01091             [("http_open", "return response")]
01092             ]
01093         handlers = add_ordered_mock_handlers(o, meth_spec)
01094 
01095         req = Request("http://acme.example.com/")
01096         self.assertEqual(req.get_host(), "acme.example.com")
01097         r = o.open(req)
01098         self.assertEqual(req.get_host(), "proxy.example.com:3128")
01099 
01100         self.assertEqual([(handlers[0], "http_open")],
01101                          [tup[0:2] for tup in o.calls])
01102 
01103     def test_proxy_no_proxy(self):
01104         os.environ['no_proxy'] = 'python.org'
01105         o = OpenerDirector()
01106         ph = urllib.request.ProxyHandler(dict(http="proxy.example.com"))
01107         o.add_handler(ph)
01108         req = Request("http://www.perl.org/")
01109         self.assertEqual(req.get_host(), "www.perl.org")
01110         r = o.open(req)
01111         self.assertEqual(req.get_host(), "proxy.example.com")
01112         req = Request("http://www.python.org")
01113         self.assertEqual(req.get_host(), "www.python.org")
01114         r = o.open(req)
01115         self.assertEqual(req.get_host(), "www.python.org")
01116         del os.environ['no_proxy']
01117 
01118     def test_proxy_no_proxy_all(self):
01119         os.environ['no_proxy'] = '*'
01120         o = OpenerDirector()
01121         ph = urllib.request.ProxyHandler(dict(http="proxy.example.com"))
01122         o.add_handler(ph)
01123         req = Request("http://www.python.org")
01124         self.assertEqual(req.get_host(), "www.python.org")
01125         r = o.open(req)
01126         self.assertEqual(req.get_host(), "www.python.org")
01127         del os.environ['no_proxy']
01128 
01129 
01130     def test_proxy_https(self):
01131         o = OpenerDirector()
01132         ph = urllib.request.ProxyHandler(dict(https="proxy.example.com:3128"))
01133         o.add_handler(ph)
01134         meth_spec = [
01135             [("https_open", "return response")]
01136         ]
01137         handlers = add_ordered_mock_handlers(o, meth_spec)
01138 
01139         req = Request("https://www.example.com/")
01140         self.assertEqual(req.get_host(), "www.example.com")
01141         r = o.open(req)
01142         self.assertEqual(req.get_host(), "proxy.example.com:3128")
01143         self.assertEqual([(handlers[0], "https_open")],
01144                          [tup[0:2] for tup in o.calls])
01145 
01146     def test_proxy_https_proxy_authorization(self):
01147         o = OpenerDirector()
01148         ph = urllib.request.ProxyHandler(dict(https='proxy.example.com:3128'))
01149         o.add_handler(ph)
01150         https_handler = MockHTTPSHandler()
01151         o.add_handler(https_handler)
01152         req = Request("https://www.example.com/")
01153         req.add_header("Proxy-Authorization","FooBar")
01154         req.add_header("User-Agent","Grail")
01155         self.assertEqual(req.get_host(), "www.example.com")
01156         self.assertIsNone(req._tunnel_host)
01157         r = o.open(req)
01158         # Verify Proxy-Authorization gets tunneled to request.
01159         # httpsconn req_headers do not have the Proxy-Authorization header but
01160         # the req will have.
01161         self.assertNotIn(("Proxy-Authorization","FooBar"),
01162                          https_handler.httpconn.req_headers)
01163         self.assertIn(("User-Agent","Grail"),
01164                       https_handler.httpconn.req_headers)
01165         self.assertIsNotNone(req._tunnel_host)
01166         self.assertEqual(req.get_host(), "proxy.example.com:3128")
01167         self.assertEqual(req.get_header("Proxy-authorization"),"FooBar")
01168 
01169     def test_osx_proxy_bypass(self):
01170         bypass = {
01171             'exclude_simple': False,
01172             'exceptions': ['foo.bar', '*.bar.com', '127.0.0.1', '10.10',
01173                            '10.0/16']
01174         }
01175         # Check hosts that should trigger the proxy bypass
01176         for host in ('foo.bar', 'www.bar.com', '127.0.0.1', '10.10.0.1',
01177                      '10.0.0.1'):
01178             self.assertTrue(_proxy_bypass_macosx_sysconf(host, bypass),
01179                             'expected bypass of %s to be True' % host)
01180         # Check hosts that should not trigger the proxy bypass
01181         for host in ('abc.foo.bar', 'bar.com', '127.0.0.2', '10.11.0.1', 'test'):
01182             self.assertFalse(_proxy_bypass_macosx_sysconf(host, bypass),
01183                              'expected bypass of %s to be False' % host)
01184 
01185         # Check the exclude_simple flag
01186         bypass = {'exclude_simple': True, 'exceptions': []}
01187         self.assertTrue(_proxy_bypass_macosx_sysconf('test', bypass))
01188 
01189     def test_basic_auth(self, quote_char='"'):
01190         opener = OpenerDirector()
01191         password_manager = MockPasswordManager()
01192         auth_handler = urllib.request.HTTPBasicAuthHandler(password_manager)
01193         realm = "ACME Widget Store"
01194         http_handler = MockHTTPHandler(
01195             401, 'WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' %
01196             (quote_char, realm, quote_char) )
01197         opener.add_handler(auth_handler)
01198         opener.add_handler(http_handler)
01199         self._test_basic_auth(opener, auth_handler, "Authorization",
01200                               realm, http_handler, password_manager,
01201                               "http://acme.example.com/protected",
01202                               "http://acme.example.com/protected",
01203                               )
01204 
01205     def test_basic_auth_with_single_quoted_realm(self):
01206         self.test_basic_auth(quote_char="'")
01207 
01208     def test_proxy_basic_auth(self):
01209         opener = OpenerDirector()
01210         ph = urllib.request.ProxyHandler(dict(http="proxy.example.com:3128"))
01211         opener.add_handler(ph)
01212         password_manager = MockPasswordManager()
01213         auth_handler = urllib.request.ProxyBasicAuthHandler(password_manager)
01214         realm = "ACME Networks"
01215         http_handler = MockHTTPHandler(
01216             407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
01217         opener.add_handler(auth_handler)
01218         opener.add_handler(http_handler)
01219         self._test_basic_auth(opener, auth_handler, "Proxy-authorization",
01220                               realm, http_handler, password_manager,
01221                               "http://acme.example.com:3128/protected",
01222                               "proxy.example.com:3128",
01223                               )
01224 
01225     def test_basic_and_digest_auth_handlers(self):
01226         # HTTPDigestAuthHandler threw an exception if it couldn't handle a 40*
01227         # response (http://python.org/sf/1479302), where it should instead
01228         # return None to allow another handler (especially
01229         # HTTPBasicAuthHandler) to handle the response.
01230 
01231         # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must
01232         # try digest first (since it's the strongest auth scheme), so we record
01233         # order of calls here to check digest comes first:
01234         class RecordingOpenerDirector(OpenerDirector):
01235             def __init__(self):
01236                 OpenerDirector.__init__(self)
01237                 self.recorded = []
01238             def record(self, info):
01239                 self.recorded.append(info)
01240         class TestDigestAuthHandler(urllib.request.HTTPDigestAuthHandler):
01241             def http_error_401(self, *args, **kwds):
01242                 self.parent.record("digest")
01243                 urllib.request.HTTPDigestAuthHandler.http_error_401(self,
01244                                                              *args, **kwds)
01245         class TestBasicAuthHandler(urllib.request.HTTPBasicAuthHandler):
01246             def http_error_401(self, *args, **kwds):
01247                 self.parent.record("basic")
01248                 urllib.request.HTTPBasicAuthHandler.http_error_401(self,
01249                                                             *args, **kwds)
01250 
01251         opener = RecordingOpenerDirector()
01252         password_manager = MockPasswordManager()
01253         digest_handler = TestDigestAuthHandler(password_manager)
01254         basic_handler = TestBasicAuthHandler(password_manager)
01255         realm = "ACME Networks"
01256         http_handler = MockHTTPHandler(
01257             401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
01258         opener.add_handler(basic_handler)
01259         opener.add_handler(digest_handler)
01260         opener.add_handler(http_handler)
01261 
01262         # check basic auth isn't blocked by digest handler failing
01263         self._test_basic_auth(opener, basic_handler, "Authorization",
01264                               realm, http_handler, password_manager,
01265                               "http://acme.example.com/protected",
01266                               "http://acme.example.com/protected",
01267                               )
01268         # check digest was tried before basic (twice, because
01269         # _test_basic_auth called .open() twice)
01270         self.assertEqual(opener.recorded, ["digest", "basic"]*2)
01271 
01272     def _test_basic_auth(self, opener, auth_handler, auth_header,
01273                          realm, http_handler, password_manager,
01274                          request_url, protected_url):
01275         import base64
01276         user, password = "wile", "coyote"
01277 
01278         # .add_password() fed through to password manager
01279         auth_handler.add_password(realm, request_url, user, password)
01280         self.assertEqual(realm, password_manager.realm)
01281         self.assertEqual(request_url, password_manager.url)
01282         self.assertEqual(user, password_manager.user)
01283         self.assertEqual(password, password_manager.password)
01284 
01285         r = opener.open(request_url)
01286 
01287         # should have asked the password manager for the username/password
01288         self.assertEqual(password_manager.target_realm, realm)
01289         self.assertEqual(password_manager.target_url, protected_url)
01290 
01291         # expect one request without authorization, then one with
01292         self.assertEqual(len(http_handler.requests), 2)
01293         self.assertFalse(http_handler.requests[0].has_header(auth_header))
01294         userpass = bytes('%s:%s' % (user, password), "ascii")
01295         auth_hdr_value = ('Basic ' +
01296             base64.encodebytes(userpass).strip().decode())
01297         self.assertEqual(http_handler.requests[1].get_header(auth_header),
01298                          auth_hdr_value)
01299         self.assertEqual(http_handler.requests[1].unredirected_hdrs[auth_header],
01300                          auth_hdr_value)
01301         # if the password manager can't find a password, the handler won't
01302         # handle the HTTP auth error
01303         password_manager.user = password_manager.password = None
01304         http_handler.reset()
01305         r = opener.open(request_url)
01306         self.assertEqual(len(http_handler.requests), 1)
01307         self.assertFalse(http_handler.requests[0].has_header(auth_header))
01308 
01309 class MiscTests(unittest.TestCase):
01310 
01311     def test_build_opener(self):
01312         class MyHTTPHandler(urllib.request.HTTPHandler): pass
01313         class FooHandler(urllib.request.BaseHandler):
01314             def foo_open(self): pass
01315         class BarHandler(urllib.request.BaseHandler):
01316             def bar_open(self): pass
01317 
01318         build_opener = urllib.request.build_opener
01319 
01320         o = build_opener(FooHandler, BarHandler)
01321         self.opener_has_handler(o, FooHandler)
01322         self.opener_has_handler(o, BarHandler)
01323 
01324         # can take a mix of classes and instances
01325         o = build_opener(FooHandler, BarHandler())
01326         self.opener_has_handler(o, FooHandler)
01327         self.opener_has_handler(o, BarHandler)
01328 
01329         # subclasses of default handlers override default handlers
01330         o = build_opener(MyHTTPHandler)
01331         self.opener_has_handler(o, MyHTTPHandler)
01332 
01333         # a particular case of overriding: default handlers can be passed
01334         # in explicitly
01335         o = build_opener()
01336         self.opener_has_handler(o, urllib.request.HTTPHandler)
01337         o = build_opener(urllib.request.HTTPHandler)
01338         self.opener_has_handler(o, urllib.request.HTTPHandler)
01339         o = build_opener(urllib.request.HTTPHandler())
01340         self.opener_has_handler(o, urllib.request.HTTPHandler)
01341 
01342         # Issue2670: multiple handlers sharing the same base class
01343         class MyOtherHTTPHandler(urllib.request.HTTPHandler): pass
01344         o = build_opener(MyHTTPHandler, MyOtherHTTPHandler)
01345         self.opener_has_handler(o, MyHTTPHandler)
01346         self.opener_has_handler(o, MyOtherHTTPHandler)
01347 
01348     def opener_has_handler(self, opener, handler_class):
01349         self.assertTrue(any(h.__class__ == handler_class
01350                             for h in opener.handlers))
01351 
01352 class RequestTests(unittest.TestCase):
01353 
01354     def setUp(self):
01355         self.get = Request("http://www.python.org/~jeremy/")
01356         self.post = Request("http://www.python.org/~jeremy/",
01357                             "data",
01358                             headers={"X-Test": "test"})
01359 
01360     def test_method(self):
01361         self.assertEqual("POST", self.post.get_method())
01362         self.assertEqual("GET", self.get.get_method())
01363 
01364     def test_add_data(self):
01365         self.assertFalse(self.get.has_data())
01366         self.assertEqual("GET", self.get.get_method())
01367         self.get.add_data("spam")
01368         self.assertTrue(self.get.has_data())
01369         self.assertEqual("POST", self.get.get_method())
01370 
01371     def test_get_full_url(self):
01372         self.assertEqual("http://www.python.org/~jeremy/",
01373                          self.get.get_full_url())
01374 
01375     def test_selector(self):
01376         self.assertEqual("/~jeremy/", self.get.get_selector())
01377         req = Request("http://www.python.org/")
01378         self.assertEqual("/", req.get_selector())
01379 
01380     def test_get_type(self):
01381         self.assertEqual("http", self.get.get_type())
01382 
01383     def test_get_host(self):
01384         self.assertEqual("www.python.org", self.get.get_host())
01385 
01386     def test_get_host_unquote(self):
01387         req = Request("http://www.%70ython.org/")
01388         self.assertEqual("www.python.org", req.get_host())
01389 
01390     def test_proxy(self):
01391         self.assertFalse(self.get.has_proxy())
01392         self.get.set_proxy("www.perl.org", "http")
01393         self.assertTrue(self.get.has_proxy())
01394         self.assertEqual("www.python.org", self.get.get_origin_req_host())
01395         self.assertEqual("www.perl.org", self.get.get_host())
01396 
01397     def test_wrapped_url(self):
01398         req = Request("<URL:http://www.python.org>")
01399         self.assertEqual("www.python.org", req.get_host())
01400 
01401     def test_url_fragment(self):
01402         req = Request("http://www.python.org/?qs=query#fragment=true")
01403         self.assertEqual("/?qs=query", req.get_selector())
01404         req = Request("http://www.python.org/#fun=true")
01405         self.assertEqual("/", req.get_selector())
01406 
01407         # Issue 11703: geturl() omits fragment in the original URL.
01408         url = 'http://docs.python.org/library/urllib2.html#OK'
01409         req = Request(url)
01410         self.assertEqual(req.get_full_url(), url)
01411 
01412 def test_main(verbose=None):
01413     from test import test_urllib2
01414     support.run_doctest(test_urllib2, verbose)
01415     support.run_doctest(urllib.request, verbose)
01416     tests = (TrivialTests,
01417              OpenerDirectorTests,
01418              HandlerTests,
01419              MiscTests,
01420              RequestTests)
01421     support.run_unittest(*tests)
01422 
01423 if __name__ == "__main__":
01424     test_main(verbose=True)