Back to index

python3.2  3.2.2
test_xmlrpc.py
Go to the documentation of this file.
00001 import base64
00002 import datetime
00003 import sys
00004 import time
00005 import unittest
00006 import xmlrpc.client as xmlrpclib
00007 import xmlrpc.server
00008 import http.client
00009 import socket
00010 import os
00011 import re
00012 import io
00013 import contextlib
00014 from test import support
00015 
00016 try:
00017     import threading
00018 except ImportError:
00019     threading = None
00020 
00021 alist = [{'astring': 'foo@bar.baz.spam',
00022           'afloat': 7283.43,
00023           'anint': 2**20,
00024           'ashortlong': 2,
00025           'anotherlist': ['.zyx.41'],
00026           'abase64': xmlrpclib.Binary(b"my dog has fleas"),
00027           'boolean': False,
00028           'unicode': '\u4000\u6000\u8000',
00029           'ukey\u4000': 'regular value',
00030           'datetime1': xmlrpclib.DateTime('20050210T11:41:23'),
00031           'datetime2': xmlrpclib.DateTime(
00032                         (2005, 2, 10, 11, 41, 23, 0, 1, -1)),
00033           'datetime3': xmlrpclib.DateTime(
00034                         datetime.datetime(2005, 2, 10, 11, 41, 23)),
00035           }]
00036 
00037 class XMLRPCTestCase(unittest.TestCase):
00038 
00039     def test_dump_load(self):
00040         dump = xmlrpclib.dumps((alist,))
00041         load = xmlrpclib.loads(dump)
00042         self.assertEqual(alist, load[0][0])
00043 
00044     def test_dump_bare_datetime(self):
00045         # This checks that an unwrapped datetime.date object can be handled
00046         # by the marshalling code.  This can't be done via test_dump_load()
00047         # since with use_datetime set to 1 the unmarshaller would create
00048         # datetime objects for the 'datetime[123]' keys as well
00049         dt = datetime.datetime(2005, 2, 10, 11, 41, 23)
00050         s = xmlrpclib.dumps((dt,))
00051         (newdt,), m = xmlrpclib.loads(s, use_datetime=1)
00052         self.assertEqual(newdt, dt)
00053         self.assertEqual(m, None)
00054 
00055         (newdt,), m = xmlrpclib.loads(s, use_datetime=0)
00056         self.assertEqual(newdt, xmlrpclib.DateTime('20050210T11:41:23'))
00057 
00058     def test_datetime_before_1900(self):
00059         # same as before but with a date before 1900
00060         dt = datetime.datetime(1,  2, 10, 11, 41, 23)
00061         s = xmlrpclib.dumps((dt,))
00062         (newdt,), m = xmlrpclib.loads(s, use_datetime=1)
00063         self.assertEqual(newdt, dt)
00064         self.assertEqual(m, None)
00065 
00066         (newdt,), m = xmlrpclib.loads(s, use_datetime=0)
00067         self.assertEqual(newdt, xmlrpclib.DateTime('00010210T11:41:23'))
00068 
00069     def test_cmp_datetime_DateTime(self):
00070         now = datetime.datetime.now()
00071         dt = xmlrpclib.DateTime(now.timetuple())
00072         self.assertTrue(dt == now)
00073         self.assertTrue(now == dt)
00074         then = now + datetime.timedelta(seconds=4)
00075         self.assertTrue(then >= dt)
00076         self.assertTrue(dt < then)
00077 
00078     def test_bug_1164912 (self):
00079         d = xmlrpclib.DateTime()
00080         ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,),
00081                                             methodresponse=True))
00082         self.assertIsInstance(new_d.value, str)
00083 
00084         # Check that the output of dumps() is still an 8-bit string
00085         s = xmlrpclib.dumps((new_d,), methodresponse=True)
00086         self.assertIsInstance(s, str)
00087 
00088     def test_newstyle_class(self):
00089         class T(object):
00090             pass
00091         t = T()
00092         t.x = 100
00093         t.y = "Hello"
00094         ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,)))
00095         self.assertEqual(t2, t.__dict__)
00096 
00097     def test_dump_big_long(self):
00098         self.assertRaises(OverflowError, xmlrpclib.dumps, (2**99,))
00099 
00100     def test_dump_bad_dict(self):
00101         self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},))
00102 
00103     def test_dump_recursive_seq(self):
00104         l = [1,2,3]
00105         t = [3,4,5,l]
00106         l.append(t)
00107         self.assertRaises(TypeError, xmlrpclib.dumps, (l,))
00108 
00109     def test_dump_recursive_dict(self):
00110         d = {'1':1, '2':1}
00111         t = {'3':3, 'd':d}
00112         d['t'] = t
00113         self.assertRaises(TypeError, xmlrpclib.dumps, (d,))
00114 
00115     def test_dump_big_int(self):
00116         if sys.maxsize > 2**31-1:
00117             self.assertRaises(OverflowError, xmlrpclib.dumps,
00118                               (int(2**34),))
00119 
00120         xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))
00121         self.assertRaises(OverflowError, xmlrpclib.dumps,
00122                           (xmlrpclib.MAXINT+1,))
00123         self.assertRaises(OverflowError, xmlrpclib.dumps,
00124                           (xmlrpclib.MININT-1,))
00125 
00126         def dummy_write(s):
00127             pass
00128 
00129         m = xmlrpclib.Marshaller()
00130         m.dump_int(xmlrpclib.MAXINT, dummy_write)
00131         m.dump_int(xmlrpclib.MININT, dummy_write)
00132         self.assertRaises(OverflowError, m.dump_int,
00133                           xmlrpclib.MAXINT+1, dummy_write)
00134         self.assertRaises(OverflowError, m.dump_int,
00135                           xmlrpclib.MININT-1, dummy_write)
00136 
00137     def test_dump_none(self):
00138         value = alist + [None]
00139         arg1 = (alist + [None],)
00140         strg = xmlrpclib.dumps(arg1, allow_none=True)
00141         self.assertEqual(value,
00142                           xmlrpclib.loads(strg)[0][0])
00143         self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,))
00144 
00145     def test_get_host_info(self):
00146         # see bug #3613, this raised a TypeError
00147         transp = xmlrpc.client.Transport()
00148         self.assertEqual(transp.get_host_info("user@host.tld"),
00149                           ('host.tld',
00150                            [('Authorization', 'Basic dXNlcg==')], {}))
00151 
00152     def test_ssl_presence(self):
00153         try:
00154             import ssl
00155         except ImportError:
00156             has_ssl = False
00157         else:
00158             has_ssl = True
00159         try:
00160             xmlrpc.client.ServerProxy('https://localhost:9999').bad_function()
00161         except NotImplementedError:
00162             self.assertFalse(has_ssl, "xmlrpc client's error with SSL support")
00163         except socket.error:
00164             self.assertTrue(has_ssl)
00165 
00166 class HelperTestCase(unittest.TestCase):
00167     def test_escape(self):
00168         self.assertEqual(xmlrpclib.escape("a&b"), "a&amp;b")
00169         self.assertEqual(xmlrpclib.escape("a<b"), "a&lt;b")
00170         self.assertEqual(xmlrpclib.escape("a>b"), "a&gt;b")
00171 
00172 class FaultTestCase(unittest.TestCase):
00173     def test_repr(self):
00174         f = xmlrpclib.Fault(42, 'Test Fault')
00175         self.assertEqual(repr(f), "<Fault 42: 'Test Fault'>")
00176         self.assertEqual(repr(f), str(f))
00177 
00178     def test_dump_fault(self):
00179         f = xmlrpclib.Fault(42, 'Test Fault')
00180         s = xmlrpclib.dumps((f,))
00181         (newf,), m = xmlrpclib.loads(s)
00182         self.assertEqual(newf, {'faultCode': 42, 'faultString': 'Test Fault'})
00183         self.assertEqual(m, None)
00184 
00185         s = xmlrpclib.Marshaller().dumps(f)
00186         self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s)
00187 
00188     def test_dotted_attribute(self):
00189         # this will raise AttirebuteError because code don't want us to use
00190         # private methods
00191         self.assertRaises(AttributeError,
00192                           xmlrpc.server.resolve_dotted_attribute, str, '__add')
00193         self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title'))
00194 
00195 class DateTimeTestCase(unittest.TestCase):
00196     def test_default(self):
00197         t = xmlrpclib.DateTime()
00198 
00199     def test_time(self):
00200         d = 1181399930.036952
00201         t = xmlrpclib.DateTime(d)
00202         self.assertEqual(str(t),
00203                          time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d)))
00204 
00205     def test_time_tuple(self):
00206         d = (2007,6,9,10,38,50,5,160,0)
00207         t = xmlrpclib.DateTime(d)
00208         self.assertEqual(str(t), '20070609T10:38:50')
00209 
00210     def test_time_struct(self):
00211         d = time.localtime(1181399930.036952)
00212         t = xmlrpclib.DateTime(d)
00213         self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", d))
00214 
00215     def test_datetime_datetime(self):
00216         d = datetime.datetime(2007,1,2,3,4,5)
00217         t = xmlrpclib.DateTime(d)
00218         self.assertEqual(str(t), '20070102T03:04:05')
00219 
00220     def test_repr(self):
00221         d = datetime.datetime(2007,1,2,3,4,5)
00222         t = xmlrpclib.DateTime(d)
00223         val ="<DateTime '20070102T03:04:05' at %x>" % id(t)
00224         self.assertEqual(repr(t), val)
00225 
00226     def test_decode(self):
00227         d = ' 20070908T07:11:13  '
00228         t1 = xmlrpclib.DateTime()
00229         t1.decode(d)
00230         tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13))
00231         self.assertEqual(t1, tref)
00232 
00233         t2 = xmlrpclib._datetime(d)
00234         self.assertEqual(t1, tref)
00235 
00236 class BinaryTestCase(unittest.TestCase):
00237 
00238     # XXX What should str(Binary(b"\xff")) return?  I'm chosing "\xff"
00239     # for now (i.e. interpreting the binary data as Latin-1-encoded
00240     # text).  But this feels very unsatisfactory.  Perhaps we should
00241     # only define repr(), and return r"Binary(b'\xff')" instead?
00242 
00243     def test_default(self):
00244         t = xmlrpclib.Binary()
00245         self.assertEqual(str(t), '')
00246 
00247     def test_string(self):
00248         d = b'\x01\x02\x03abc123\xff\xfe'
00249         t = xmlrpclib.Binary(d)
00250         self.assertEqual(str(t), str(d, "latin-1"))
00251 
00252     def test_decode(self):
00253         d = b'\x01\x02\x03abc123\xff\xfe'
00254         de = base64.encodebytes(d)
00255         t1 = xmlrpclib.Binary()
00256         t1.decode(de)
00257         self.assertEqual(str(t1), str(d, "latin-1"))
00258 
00259         t2 = xmlrpclib._binary(de)
00260         self.assertEqual(str(t2), str(d, "latin-1"))
00261 
00262 
00263 ADDR = PORT = URL = None
00264 
00265 # The evt is set twice.  First when the server is ready to serve.
00266 # Second when the server has been shutdown.  The user must clear
00267 # the event after it has been set the first time to catch the second set.
00268 def http_server(evt, numrequests, requestHandler=None):
00269     class TestInstanceClass:
00270         def div(self, x, y):
00271             return x // y
00272 
00273         def _methodHelp(self, name):
00274             if name == 'div':
00275                 return 'This is the div function'
00276 
00277     def my_function():
00278         '''This is my function'''
00279         return True
00280 
00281     class MyXMLRPCServer(xmlrpc.server.SimpleXMLRPCServer):
00282         def get_request(self):
00283             # Ensure the socket is always non-blocking.  On Linux, socket
00284             # attributes are not inherited like they are on *BSD and Windows.
00285             s, port = self.socket.accept()
00286             s.setblocking(True)
00287             return s, port
00288 
00289     if not requestHandler:
00290         requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler
00291     serv = MyXMLRPCServer(("localhost", 0), requestHandler,
00292                           logRequests=False, bind_and_activate=False)
00293     try:
00294         serv.server_bind()
00295         global ADDR, PORT, URL
00296         ADDR, PORT = serv.socket.getsockname()
00297         #connect to IP address directly.  This avoids socket.create_connection()
00298         #trying to connect to to "localhost" using all address families, which
00299         #causes slowdown e.g. on vista which supports AF_INET6.  The server listens
00300         #on AF_INET only.
00301         URL = "http://%s:%d"%(ADDR, PORT)
00302         serv.server_activate()
00303         serv.register_introspection_functions()
00304         serv.register_multicall_functions()
00305         serv.register_function(pow)
00306         serv.register_function(lambda x,y: x+y, 'add')
00307         serv.register_function(my_function)
00308         serv.register_instance(TestInstanceClass())
00309         evt.set()
00310 
00311         # handle up to 'numrequests' requests
00312         while numrequests > 0:
00313             serv.handle_request()
00314             numrequests -= 1
00315 
00316     except socket.timeout:
00317         pass
00318     finally:
00319         serv.socket.close()
00320         PORT = None
00321         evt.set()
00322 
00323 def http_multi_server(evt, numrequests, requestHandler=None):
00324     class TestInstanceClass:
00325         def div(self, x, y):
00326             return x // y
00327 
00328         def _methodHelp(self, name):
00329             if name == 'div':
00330                 return 'This is the div function'
00331 
00332     def my_function():
00333         '''This is my function'''
00334         return True
00335 
00336     class MyXMLRPCServer(xmlrpc.server.MultiPathXMLRPCServer):
00337         def get_request(self):
00338             # Ensure the socket is always non-blocking.  On Linux, socket
00339             # attributes are not inherited like they are on *BSD and Windows.
00340             s, port = self.socket.accept()
00341             s.setblocking(True)
00342             return s, port
00343 
00344     if not requestHandler:
00345         requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler
00346     class MyRequestHandler(requestHandler):
00347         rpc_paths = []
00348 
00349     serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler,
00350                           logRequests=False, bind_and_activate=False)
00351     serv.socket.settimeout(3)
00352     serv.server_bind()
00353     try:
00354         global ADDR, PORT, URL
00355         ADDR, PORT = serv.socket.getsockname()
00356         #connect to IP address directly.  This avoids socket.create_connection()
00357         #trying to connect to to "localhost" using all address families, which
00358         #causes slowdown e.g. on vista which supports AF_INET6.  The server listens
00359         #on AF_INET only.
00360         URL = "http://%s:%d"%(ADDR, PORT)
00361         serv.server_activate()
00362         paths = ["/foo", "/foo/bar"]
00363         for path in paths:
00364             d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher())
00365             d.register_introspection_functions()
00366             d.register_multicall_functions()
00367         serv.get_dispatcher(paths[0]).register_function(pow)
00368         serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add')
00369         evt.set()
00370 
00371         # handle up to 'numrequests' requests
00372         while numrequests > 0:
00373             serv.handle_request()
00374             numrequests -= 1
00375 
00376     except socket.timeout:
00377         pass
00378     finally:
00379         serv.socket.close()
00380         PORT = None
00381         evt.set()
00382 
00383 # This function prevents errors like:
00384 #    <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error>
00385 def is_unavailable_exception(e):
00386     '''Returns True if the given ProtocolError is the product of a server-side
00387        exception caused by the 'temporarily unavailable' response sometimes
00388        given by operations on non-blocking sockets.'''
00389 
00390     # sometimes we get a -1 error code and/or empty headers
00391     try:
00392         if e.errcode == -1 or e.headers is None:
00393             return True
00394         exc_mess = e.headers.get('X-exception')
00395     except AttributeError:
00396         # Ignore socket.errors here.
00397         exc_mess = str(e)
00398 
00399     if exc_mess and 'temporarily unavailable' in exc_mess.lower():
00400         return True
00401 
00402 def make_request_and_skipIf(condition, reason):
00403     # If we skip the test, we have to make a request because the
00404     # the server created in setUp blocks expecting one to come in.
00405     if not condition:
00406         return lambda func: func
00407     def decorator(func):
00408         def make_request_and_skip(self):
00409             try:
00410                 xmlrpclib.ServerProxy(URL).my_function()
00411             except (xmlrpclib.ProtocolError, socket.error) as e:
00412                 if not is_unavailable_exception(e):
00413                     raise
00414             raise unittest.SkipTest(reason)
00415         return make_request_and_skip
00416     return decorator
00417 
00418 @unittest.skipUnless(threading, 'Threading required for this test.')
00419 class BaseServerTestCase(unittest.TestCase):
00420     requestHandler = None
00421     request_count = 1
00422     threadFunc = staticmethod(http_server)
00423 
00424     def setUp(self):
00425         # enable traceback reporting
00426         xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
00427 
00428         self.evt = threading.Event()
00429         # start server thread to handle requests
00430         serv_args = (self.evt, self.request_count, self.requestHandler)
00431         threading.Thread(target=self.threadFunc, args=serv_args).start()
00432 
00433         # wait for the server to be ready
00434         self.evt.wait()
00435         self.evt.clear()
00436 
00437     def tearDown(self):
00438         # wait on the server thread to terminate
00439         self.evt.wait(4.0)
00440         # XXX this code does not work, and in fact stop_serving doesn't exist.
00441         if not self.evt.is_set():
00442             self.evt.set()
00443             stop_serving()
00444             raise RuntimeError("timeout reached, test has failed")
00445 
00446         # disable traceback reporting
00447         xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
00448 
00449 class SimpleServerTestCase(BaseServerTestCase):
00450     def test_simple1(self):
00451         try:
00452             p = xmlrpclib.ServerProxy(URL)
00453             self.assertEqual(p.pow(6,8), 6**8)
00454         except (xmlrpclib.ProtocolError, socket.error) as e:
00455             # ignore failures due to non-blocking socket 'unavailable' errors
00456             if not is_unavailable_exception(e):
00457                 # protocol error; provide additional information in test output
00458                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
00459 
00460     def test_nonascii(self):
00461         start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t'
00462         end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n'
00463         try:
00464             p = xmlrpclib.ServerProxy(URL)
00465             self.assertEqual(p.add(start_string, end_string),
00466                              start_string + end_string)
00467         except (xmlrpclib.ProtocolError, socket.error) as e:
00468             # ignore failures due to non-blocking socket 'unavailable' errors
00469             if not is_unavailable_exception(e):
00470                 # protocol error; provide additional information in test output
00471                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
00472 
00473     # [ch] The test 404 is causing lots of false alarms.
00474     def XXXtest_404(self):
00475         # send POST with http.client, it should return 404 header and
00476         # 'Not Found' message.
00477         conn = httplib.client.HTTPConnection(ADDR, PORT)
00478         conn.request('POST', '/this-is-not-valid')
00479         response = conn.getresponse()
00480         conn.close()
00481 
00482         self.assertEqual(response.status, 404)
00483         self.assertEqual(response.reason, 'Not Found')
00484 
00485     def test_introspection1(self):
00486         expected_methods = set(['pow', 'div', 'my_function', 'add',
00487                                 'system.listMethods', 'system.methodHelp',
00488                                 'system.methodSignature', 'system.multicall'])
00489         try:
00490             p = xmlrpclib.ServerProxy(URL)
00491             meth = p.system.listMethods()
00492             self.assertEqual(set(meth), expected_methods)
00493         except (xmlrpclib.ProtocolError, socket.error) as e:
00494             # ignore failures due to non-blocking socket 'unavailable' errors
00495             if not is_unavailable_exception(e):
00496                 # protocol error; provide additional information in test output
00497                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
00498 
00499 
00500     def test_introspection2(self):
00501         try:
00502             # test _methodHelp()
00503             p = xmlrpclib.ServerProxy(URL)
00504             divhelp = p.system.methodHelp('div')
00505             self.assertEqual(divhelp, 'This is the div function')
00506         except (xmlrpclib.ProtocolError, socket.error) as e:
00507             # ignore failures due to non-blocking socket 'unavailable' errors
00508             if not is_unavailable_exception(e):
00509                 # protocol error; provide additional information in test output
00510                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
00511 
00512     @make_request_and_skipIf(sys.flags.optimize >= 2,
00513                      "Docstrings are omitted with -O2 and above")
00514     def test_introspection3(self):
00515         try:
00516             # test native doc
00517             p = xmlrpclib.ServerProxy(URL)
00518             myfunction = p.system.methodHelp('my_function')
00519             self.assertEqual(myfunction, 'This is my function')
00520         except (xmlrpclib.ProtocolError, socket.error) as e:
00521             # ignore failures due to non-blocking socket 'unavailable' errors
00522             if not is_unavailable_exception(e):
00523                 # protocol error; provide additional information in test output
00524                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
00525 
00526     def test_introspection4(self):
00527         # the SimpleXMLRPCServer doesn't support signatures, but
00528         # at least check that we can try making the call
00529         try:
00530             p = xmlrpclib.ServerProxy(URL)
00531             divsig = p.system.methodSignature('div')
00532             self.assertEqual(divsig, 'signatures not supported')
00533         except (xmlrpclib.ProtocolError, socket.error) as e:
00534             # ignore failures due to non-blocking socket 'unavailable' errors
00535             if not is_unavailable_exception(e):
00536                 # protocol error; provide additional information in test output
00537                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
00538 
00539     def test_multicall(self):
00540         try:
00541             p = xmlrpclib.ServerProxy(URL)
00542             multicall = xmlrpclib.MultiCall(p)
00543             multicall.add(2,3)
00544             multicall.pow(6,8)
00545             multicall.div(127,42)
00546             add_result, pow_result, div_result = multicall()
00547             self.assertEqual(add_result, 2+3)
00548             self.assertEqual(pow_result, 6**8)
00549             self.assertEqual(div_result, 127//42)
00550         except (xmlrpclib.ProtocolError, socket.error) as e:
00551             # ignore failures due to non-blocking socket 'unavailable' errors
00552             if not is_unavailable_exception(e):
00553                 # protocol error; provide additional information in test output
00554                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
00555 
00556     def test_non_existing_multicall(self):
00557         try:
00558             p = xmlrpclib.ServerProxy(URL)
00559             multicall = xmlrpclib.MultiCall(p)
00560             multicall.this_is_not_exists()
00561             result = multicall()
00562 
00563             # result.results contains;
00564             # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:'
00565             #   'method "this_is_not_exists" is not supported'>}]
00566 
00567             self.assertEqual(result.results[0]['faultCode'], 1)
00568             self.assertEqual(result.results[0]['faultString'],
00569                 '<class \'Exception\'>:method "this_is_not_exists" '
00570                 'is not supported')
00571         except (xmlrpclib.ProtocolError, socket.error) as e:
00572             # ignore failures due to non-blocking socket 'unavailable' errors
00573             if not is_unavailable_exception(e):
00574                 # protocol error; provide additional information in test output
00575                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
00576 
00577     def test_dotted_attribute(self):
00578         # Raises an AttributeError because private methods are not allowed.
00579         self.assertRaises(AttributeError,
00580                           xmlrpc.server.resolve_dotted_attribute, str, '__add')
00581 
00582         self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title'))
00583         # Get the test to run faster by sending a request with test_simple1.
00584         # This avoids waiting for the socket timeout.
00585         self.test_simple1()
00586 
00587 class MultiPathServerTestCase(BaseServerTestCase):
00588     threadFunc = staticmethod(http_multi_server)
00589     request_count = 2
00590     def test_path1(self):
00591         p = xmlrpclib.ServerProxy(URL+"/foo")
00592         self.assertEqual(p.pow(6,8), 6**8)
00593         self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)
00594     def test_path2(self):
00595         p = xmlrpclib.ServerProxy(URL+"/foo/bar")
00596         self.assertEqual(p.add(6,8), 6+8)
00597         self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8)
00598 
00599 #A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
00600 #does indeed serve subsequent requests on the same connection
00601 class BaseKeepaliveServerTestCase(BaseServerTestCase):
00602     #a request handler that supports keep-alive and logs requests into a
00603     #class variable
00604     class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
00605         parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
00606         protocol_version = 'HTTP/1.1'
00607         myRequests = []
00608         def handle(self):
00609             self.myRequests.append([])
00610             self.reqidx = len(self.myRequests)-1
00611             return self.parentClass.handle(self)
00612         def handle_one_request(self):
00613             result = self.parentClass.handle_one_request(self)
00614             self.myRequests[self.reqidx].append(self.raw_requestline)
00615             return result
00616 
00617     requestHandler = RequestHandler
00618     def setUp(self):
00619         #clear request log
00620         self.RequestHandler.myRequests = []
00621         return BaseServerTestCase.setUp(self)
00622 
00623 #A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
00624 #does indeed serve subsequent requests on the same connection
00625 class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase):
00626     def test_two(self):
00627         p = xmlrpclib.ServerProxy(URL)
00628         #do three requests.
00629         self.assertEqual(p.pow(6,8), 6**8)
00630         self.assertEqual(p.pow(6,8), 6**8)
00631         self.assertEqual(p.pow(6,8), 6**8)
00632         p("close")()
00633 
00634         #they should have all been handled by a single request handler
00635         self.assertEqual(len(self.RequestHandler.myRequests), 1)
00636 
00637         #check that we did at least two (the third may be pending append
00638         #due to thread scheduling)
00639         self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
00640 
00641 
00642 #test special attribute access on the serverproxy, through the __call__
00643 #function.
00644 class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase):
00645     #ask for two keepalive requests to be handled.
00646     request_count=2
00647 
00648     def test_close(self):
00649         p = xmlrpclib.ServerProxy(URL)
00650         #do some requests with close.
00651         self.assertEqual(p.pow(6,8), 6**8)
00652         self.assertEqual(p.pow(6,8), 6**8)
00653         self.assertEqual(p.pow(6,8), 6**8)
00654         p("close")() #this should trigger a new keep-alive request
00655         self.assertEqual(p.pow(6,8), 6**8)
00656         self.assertEqual(p.pow(6,8), 6**8)
00657         self.assertEqual(p.pow(6,8), 6**8)
00658         p("close")()
00659 
00660         #they should have all been two request handlers, each having logged at least
00661         #two complete requests
00662         self.assertEqual(len(self.RequestHandler.myRequests), 2)
00663         self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
00664         self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2)
00665 
00666 
00667     def test_transport(self):
00668         p = xmlrpclib.ServerProxy(URL)
00669         #do some requests with close.
00670         self.assertEqual(p.pow(6,8), 6**8)
00671         p("transport").close() #same as above, really.
00672         self.assertEqual(p.pow(6,8), 6**8)
00673         p("close")()
00674         self.assertEqual(len(self.RequestHandler.myRequests), 2)
00675 
00676 #A test case that verifies that gzip encoding works in both directions
00677 #(for a request and the response)
00678 class GzipServerTestCase(BaseServerTestCase):
00679     #a request handler that supports keep-alive and logs requests into a
00680     #class variable
00681     class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
00682         parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
00683         protocol_version = 'HTTP/1.1'
00684 
00685         def do_POST(self):
00686             #store content of last request in class
00687             self.__class__.content_length = int(self.headers["content-length"])
00688             return self.parentClass.do_POST(self)
00689     requestHandler = RequestHandler
00690 
00691     class Transport(xmlrpclib.Transport):
00692         #custom transport, stores the response length for our perusal
00693         fake_gzip = False
00694         def parse_response(self, response):
00695             self.response_length=int(response.getheader("content-length", 0))
00696             return xmlrpclib.Transport.parse_response(self, response)
00697 
00698         def send_content(self, connection, body):
00699             if self.fake_gzip:
00700                 #add a lone gzip header to induce decode error remotely
00701                 connection.putheader("Content-Encoding", "gzip")
00702             return xmlrpclib.Transport.send_content(self, connection, body)
00703 
00704     def setUp(self):
00705         BaseServerTestCase.setUp(self)
00706 
00707     def test_gzip_request(self):
00708         t = self.Transport()
00709         t.encode_threshold = None
00710         p = xmlrpclib.ServerProxy(URL, transport=t)
00711         self.assertEqual(p.pow(6,8), 6**8)
00712         a = self.RequestHandler.content_length
00713         t.encode_threshold = 0 #turn on request encoding
00714         self.assertEqual(p.pow(6,8), 6**8)
00715         b = self.RequestHandler.content_length
00716         self.assertTrue(a>b)
00717         p("close")()
00718 
00719     def test_bad_gzip_request(self):
00720         t = self.Transport()
00721         t.encode_threshold = None
00722         t.fake_gzip = True
00723         p = xmlrpclib.ServerProxy(URL, transport=t)
00724         cm = self.assertRaisesRegex(xmlrpclib.ProtocolError,
00725                                     re.compile(r"\b400\b"))
00726         with cm:
00727             p.pow(6, 8)
00728         p("close")()
00729 
00730     def test_gsip_response(self):
00731         t = self.Transport()
00732         p = xmlrpclib.ServerProxy(URL, transport=t)
00733         old = self.requestHandler.encode_threshold
00734         self.requestHandler.encode_threshold = None #no encoding
00735         self.assertEqual(p.pow(6,8), 6**8)
00736         a = t.response_length
00737         self.requestHandler.encode_threshold = 0 #always encode
00738         self.assertEqual(p.pow(6,8), 6**8)
00739         p("close")()
00740         b = t.response_length
00741         self.requestHandler.encode_threshold = old
00742         self.assertTrue(a>b)
00743 
00744 #Test special attributes of the ServerProxy object
00745 class ServerProxyTestCase(unittest.TestCase):
00746     def setUp(self):
00747         unittest.TestCase.setUp(self)
00748         if threading:
00749             self.url = URL
00750         else:
00751             # Without threading, http_server() and http_multi_server() will not
00752             # be executed and URL is still equal to None. 'http://' is a just
00753             # enough to choose the scheme (HTTP)
00754             self.url = 'http://'
00755 
00756     def test_close(self):
00757         p = xmlrpclib.ServerProxy(self.url)
00758         self.assertEqual(p('close')(), None)
00759 
00760     def test_transport(self):
00761         t = xmlrpclib.Transport()
00762         p = xmlrpclib.ServerProxy(self.url, transport=t)
00763         self.assertEqual(p('transport'), t)
00764 
00765 # This is a contrived way to make a failure occur on the server side
00766 # in order to test the _send_traceback_header flag on the server
00767 class FailingMessageClass(http.client.HTTPMessage):
00768     def get(self, key, failobj=None):
00769         key = key.lower()
00770         if key == 'content-length':
00771             return 'I am broken'
00772         return super().get(key, failobj)
00773 
00774 
00775 @unittest.skipUnless(threading, 'Threading required for this test.')
00776 class FailingServerTestCase(unittest.TestCase):
00777     def setUp(self):
00778         self.evt = threading.Event()
00779         # start server thread to handle requests
00780         serv_args = (self.evt, 1)
00781         threading.Thread(target=http_server, args=serv_args).start()
00782 
00783         # wait for the server to be ready
00784         self.evt.wait()
00785         self.evt.clear()
00786 
00787     def tearDown(self):
00788         # wait on the server thread to terminate
00789         self.evt.wait()
00790         # reset flag
00791         xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
00792         # reset message class
00793         default_class = http.client.HTTPMessage
00794         xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = default_class
00795 
00796     def test_basic(self):
00797         # check that flag is false by default
00798         flagval = xmlrpc.server.SimpleXMLRPCServer._send_traceback_header
00799         self.assertEqual(flagval, False)
00800 
00801         # enable traceback reporting
00802         xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
00803 
00804         # test a call that shouldn't fail just as a smoke test
00805         try:
00806             p = xmlrpclib.ServerProxy(URL)
00807             self.assertEqual(p.pow(6,8), 6**8)
00808         except (xmlrpclib.ProtocolError, socket.error) as e:
00809             # ignore failures due to non-blocking socket 'unavailable' errors
00810             if not is_unavailable_exception(e):
00811                 # protocol error; provide additional information in test output
00812                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
00813 
00814     def test_fail_no_info(self):
00815         # use the broken message class
00816         xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
00817 
00818         try:
00819             p = xmlrpclib.ServerProxy(URL)
00820             p.pow(6,8)
00821         except (xmlrpclib.ProtocolError, socket.error) as e:
00822             # ignore failures due to non-blocking socket 'unavailable' errors
00823             if not is_unavailable_exception(e) and hasattr(e, "headers"):
00824                 # The two server-side error headers shouldn't be sent back in this case
00825                 self.assertTrue(e.headers.get("X-exception") is None)
00826                 self.assertTrue(e.headers.get("X-traceback") is None)
00827         else:
00828             self.fail('ProtocolError not raised')
00829 
00830     def test_fail_with_info(self):
00831         # use the broken message class
00832         xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
00833 
00834         # Check that errors in the server send back exception/traceback
00835         # info when flag is set
00836         xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
00837 
00838         try:
00839             p = xmlrpclib.ServerProxy(URL)
00840             p.pow(6,8)
00841         except (xmlrpclib.ProtocolError, socket.error) as e:
00842             # ignore failures due to non-blocking socket 'unavailable' errors
00843             if not is_unavailable_exception(e) and hasattr(e, "headers"):
00844                 # We should get error info in the response
00845                 expected_err = "invalid literal for int() with base 10: 'I am broken'"
00846                 self.assertEqual(e.headers.get("X-exception"), expected_err)
00847                 self.assertTrue(e.headers.get("X-traceback") is not None)
00848         else:
00849             self.fail('ProtocolError not raised')
00850 
00851 
00852 @contextlib.contextmanager
00853 def captured_stdout(encoding='utf-8'):
00854     """A variation on support.captured_stdout() which gives a text stream
00855     having a `buffer` attribute.
00856     """
00857     import io
00858     orig_stdout = sys.stdout
00859     sys.stdout = io.TextIOWrapper(io.BytesIO(), encoding=encoding)
00860     try:
00861         yield sys.stdout
00862     finally:
00863         sys.stdout = orig_stdout
00864 
00865 
00866 class CGIHandlerTestCase(unittest.TestCase):
00867     def setUp(self):
00868         self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler()
00869 
00870     def tearDown(self):
00871         self.cgi = None
00872 
00873     def test_cgi_get(self):
00874         with support.EnvironmentVarGuard() as env:
00875             env['REQUEST_METHOD'] = 'GET'
00876             # if the method is GET and no request_text is given, it runs handle_get
00877             # get sysout output
00878             with captured_stdout(encoding=self.cgi.encoding) as data_out:
00879                 self.cgi.handle_request()
00880 
00881             # parse Status header
00882             data_out.seek(0)
00883             handle = data_out.read()
00884             status = handle.split()[1]
00885             message = ' '.join(handle.split()[2:4])
00886 
00887             self.assertEqual(status, '400')
00888             self.assertEqual(message, 'Bad Request')
00889 
00890 
00891     def test_cgi_xmlrpc_response(self):
00892         data = """<?xml version='1.0'?>
00893         <methodCall>
00894             <methodName>test_method</methodName>
00895             <params>
00896                 <param>
00897                     <value><string>foo</string></value>
00898                 </param>
00899                 <param>
00900                     <value><string>bar</string></value>
00901                 </param>
00902             </params>
00903         </methodCall>
00904         """
00905 
00906         with support.EnvironmentVarGuard() as env, \
00907              captured_stdout(encoding=self.cgi.encoding) as data_out, \
00908              support.captured_stdin() as data_in:
00909             data_in.write(data)
00910             data_in.seek(0)
00911             env['CONTENT_LENGTH'] = str(len(data))
00912             self.cgi.handle_request()
00913         data_out.seek(0)
00914 
00915         # will respond exception, if so, our goal is achieved ;)
00916         handle = data_out.read()
00917 
00918         # start with 44th char so as not to get http header, we just
00919         # need only xml
00920         self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, handle[44:])
00921 
00922         # Also test the content-length returned  by handle_request
00923         # Using the same test method inorder to avoid all the datapassing
00924         # boilerplate code.
00925         # Test for bug: http://bugs.python.org/issue5040
00926 
00927         content = handle[handle.find("<?xml"):]
00928 
00929         self.assertEqual(
00930             int(re.search('Content-Length: (\d+)', handle).group(1)),
00931             len(content))
00932 
00933 
00934 @support.reap_threads
00935 def test_main():
00936     xmlrpc_tests = [XMLRPCTestCase, HelperTestCase, DateTimeTestCase,
00937          BinaryTestCase, FaultTestCase]
00938     xmlrpc_tests.append(SimpleServerTestCase)
00939     xmlrpc_tests.append(KeepaliveServerTestCase1)
00940     xmlrpc_tests.append(KeepaliveServerTestCase2)
00941     try:
00942         import gzip
00943         xmlrpc_tests.append(GzipServerTestCase)
00944     except ImportError:
00945         pass #gzip not supported in this build
00946     xmlrpc_tests.append(MultiPathServerTestCase)
00947     xmlrpc_tests.append(ServerProxyTestCase)
00948     xmlrpc_tests.append(FailingServerTestCase)
00949     xmlrpc_tests.append(CGIHandlerTestCase)
00950 
00951     support.run_unittest(*xmlrpc_tests)
00952 
00953 if __name__ == "__main__":
00954     test_main()