Back to index

python3.2  3.2.2
test_socketserver.py
Go to the documentation of this file.
00001 """
00002 Test suite for socketserver.
00003 """
00004 
00005 import contextlib
00006 import imp
00007 import os
00008 import select
00009 import signal
00010 import socket
00011 import tempfile
00012 import unittest
00013 import socketserver
00014 
00015 import test.support
00016 from test.support import reap_children, reap_threads, verbose
00017 try:
00018     import threading
00019 except ImportError:
00020     threading = None
00021 
00022 test.support.requires("network")
00023 
00024 TEST_STR = b"hello world\n"
00025 HOST = test.support.HOST
00026 
00027 HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX")
00028 HAVE_FORKING = hasattr(os, "fork") and os.name != "os2"
00029 
00030 def signal_alarm(n):
00031     """Call signal.alarm when it exists (i.e. not on Windows)."""
00032     if hasattr(signal, 'alarm'):
00033         signal.alarm(n)
00034 
00035 def receive(sock, n, timeout=20):
00036     r, w, x = select.select([sock], [], [], timeout)
00037     if sock in r:
00038         return sock.recv(n)
00039     else:
00040         raise RuntimeError("timed out on %r" % (sock,))
00041 
00042 if HAVE_UNIX_SOCKETS:
00043     class ForkingUnixStreamServer(socketserver.ForkingMixIn,
00044                                   socketserver.UnixStreamServer):
00045         pass
00046 
00047     class ForkingUnixDatagramServer(socketserver.ForkingMixIn,
00048                                     socketserver.UnixDatagramServer):
00049         pass
00050 
00051 
00052 @contextlib.contextmanager
00053 def simple_subprocess(testcase):
00054     pid = os.fork()
00055     if pid == 0:
00056         # Don't throw an exception; it would be caught by the test harness.
00057         os._exit(72)
00058     yield None
00059     pid2, status = os.waitpid(pid, 0)
00060     testcase.assertEqual(pid2, pid)
00061     testcase.assertEqual(72 << 8, status)
00062 
00063 
00064 @unittest.skipUnless(threading, 'Threading required for this test.')
00065 class SocketServerTest(unittest.TestCase):
00066     """Test all socket servers."""
00067 
00068     def setUp(self):
00069         signal_alarm(60)  # Kill deadlocks after 60 seconds.
00070         self.port_seed = 0
00071         self.test_files = []
00072 
00073     def tearDown(self):
00074         signal_alarm(0)  # Didn't deadlock.
00075         reap_children()
00076 
00077         for fn in self.test_files:
00078             try:
00079                 os.remove(fn)
00080             except os.error:
00081                 pass
00082         self.test_files[:] = []
00083 
00084     def pickaddr(self, proto):
00085         if proto == socket.AF_INET:
00086             return (HOST, 0)
00087         else:
00088             # XXX: We need a way to tell AF_UNIX to pick its own name
00089             # like AF_INET provides port==0.
00090             dir = None
00091             if os.name == 'os2':
00092                 dir = '\socket'
00093             fn = tempfile.mktemp(prefix='unix_socket.', dir=dir)
00094             if os.name == 'os2':
00095                 # AF_UNIX socket names on OS/2 require a specific prefix
00096                 # which can't include a drive letter and must also use
00097                 # backslashes as directory separators
00098                 if fn[1] == ':':
00099                     fn = fn[2:]
00100                 if fn[0] in (os.sep, os.altsep):
00101                     fn = fn[1:]
00102                 if os.sep == '/':
00103                     fn = fn.replace(os.sep, os.altsep)
00104                 else:
00105                     fn = fn.replace(os.altsep, os.sep)
00106             self.test_files.append(fn)
00107             return fn
00108 
00109     def make_server(self, addr, svrcls, hdlrbase):
00110         class MyServer(svrcls):
00111             def handle_error(self, request, client_address):
00112                 self.close_request(request)
00113                 self.server_close()
00114                 raise
00115 
00116         class MyHandler(hdlrbase):
00117             def handle(self):
00118                 line = self.rfile.readline()
00119                 self.wfile.write(line)
00120 
00121         if verbose: print("creating server")
00122         server = MyServer(addr, MyHandler)
00123         self.assertEqual(server.server_address, server.socket.getsockname())
00124         return server
00125 
00126     @unittest.skipUnless(threading, 'Threading required for this test.')
00127     @reap_threads
00128     def run_server(self, svrcls, hdlrbase, testfunc):
00129         server = self.make_server(self.pickaddr(svrcls.address_family),
00130                                   svrcls, hdlrbase)
00131         # We had the OS pick a port, so pull the real address out of
00132         # the server.
00133         addr = server.server_address
00134         if verbose:
00135             print("ADDR =", addr)
00136             print("CLASS =", svrcls)
00137 
00138         t = threading.Thread(
00139             name='%s serving' % svrcls,
00140             target=server.serve_forever,
00141             # Short poll interval to make the test finish quickly.
00142             # Time between requests is short enough that we won't wake
00143             # up spuriously too many times.
00144             kwargs={'poll_interval':0.01})
00145         t.daemon = True  # In case this function raises.
00146         t.start()
00147         if verbose: print("server running")
00148         for i in range(3):
00149             if verbose: print("test client", i)
00150             testfunc(svrcls.address_family, addr)
00151         if verbose: print("waiting for server")
00152         server.shutdown()
00153         t.join()
00154         server.server_close()
00155         if verbose: print("done")
00156 
00157     def stream_examine(self, proto, addr):
00158         s = socket.socket(proto, socket.SOCK_STREAM)
00159         s.connect(addr)
00160         s.sendall(TEST_STR)
00161         buf = data = receive(s, 100)
00162         while data and b'\n' not in buf:
00163             data = receive(s, 100)
00164             buf += data
00165         self.assertEqual(buf, TEST_STR)
00166         s.close()
00167 
00168     def dgram_examine(self, proto, addr):
00169         s = socket.socket(proto, socket.SOCK_DGRAM)
00170         s.sendto(TEST_STR, addr)
00171         buf = data = receive(s, 100)
00172         while data and b'\n' not in buf:
00173             data = receive(s, 100)
00174             buf += data
00175         self.assertEqual(buf, TEST_STR)
00176         s.close()
00177 
00178     def test_TCPServer(self):
00179         self.run_server(socketserver.TCPServer,
00180                         socketserver.StreamRequestHandler,
00181                         self.stream_examine)
00182 
00183     def test_ThreadingTCPServer(self):
00184         self.run_server(socketserver.ThreadingTCPServer,
00185                         socketserver.StreamRequestHandler,
00186                         self.stream_examine)
00187 
00188     if HAVE_FORKING:
00189         def test_ForkingTCPServer(self):
00190             with simple_subprocess(self):
00191                 self.run_server(socketserver.ForkingTCPServer,
00192                                 socketserver.StreamRequestHandler,
00193                                 self.stream_examine)
00194 
00195     if HAVE_UNIX_SOCKETS:
00196         def test_UnixStreamServer(self):
00197             self.run_server(socketserver.UnixStreamServer,
00198                             socketserver.StreamRequestHandler,
00199                             self.stream_examine)
00200 
00201         def test_ThreadingUnixStreamServer(self):
00202             self.run_server(socketserver.ThreadingUnixStreamServer,
00203                             socketserver.StreamRequestHandler,
00204                             self.stream_examine)
00205 
00206         if HAVE_FORKING:
00207             def test_ForkingUnixStreamServer(self):
00208                 with simple_subprocess(self):
00209                     self.run_server(ForkingUnixStreamServer,
00210                                     socketserver.StreamRequestHandler,
00211                                     self.stream_examine)
00212 
00213     def test_UDPServer(self):
00214         self.run_server(socketserver.UDPServer,
00215                         socketserver.DatagramRequestHandler,
00216                         self.dgram_examine)
00217 
00218     def test_ThreadingUDPServer(self):
00219         self.run_server(socketserver.ThreadingUDPServer,
00220                         socketserver.DatagramRequestHandler,
00221                         self.dgram_examine)
00222 
00223     if HAVE_FORKING:
00224         def test_ForkingUDPServer(self):
00225             with simple_subprocess(self):
00226                 self.run_server(socketserver.ForkingUDPServer,
00227                                 socketserver.DatagramRequestHandler,
00228                                 self.dgram_examine)
00229 
00230     # Alas, on Linux (at least) recvfrom() doesn't return a meaningful
00231     # client address so this cannot work:
00232 
00233     # if HAVE_UNIX_SOCKETS:
00234     #     def test_UnixDatagramServer(self):
00235     #         self.run_server(socketserver.UnixDatagramServer,
00236     #                         socketserver.DatagramRequestHandler,
00237     #                         self.dgram_examine)
00238     #
00239     #     def test_ThreadingUnixDatagramServer(self):
00240     #         self.run_server(socketserver.ThreadingUnixDatagramServer,
00241     #                         socketserver.DatagramRequestHandler,
00242     #                         self.dgram_examine)
00243     #
00244     #     if HAVE_FORKING:
00245     #         def test_ForkingUnixDatagramServer(self):
00246     #             self.run_server(socketserver.ForkingUnixDatagramServer,
00247     #                             socketserver.DatagramRequestHandler,
00248     #                             self.dgram_examine)
00249 
00250     @reap_threads
00251     def test_shutdown(self):
00252         # Issue #2302: shutdown() should always succeed in making an
00253         # other thread leave serve_forever().
00254         class MyServer(socketserver.TCPServer):
00255             pass
00256 
00257         class MyHandler(socketserver.StreamRequestHandler):
00258             pass
00259 
00260         threads = []
00261         for i in range(20):
00262             s = MyServer((HOST, 0), MyHandler)
00263             t = threading.Thread(
00264                 name='MyServer serving',
00265                 target=s.serve_forever,
00266                 kwargs={'poll_interval':0.01})
00267             t.daemon = True  # In case this function raises.
00268             threads.append((t, s))
00269         for t, s in threads:
00270             t.start()
00271             s.shutdown()
00272         for t, s in threads:
00273             t.join()
00274             s.server_close()
00275 
00276 
00277 def test_main():
00278     if imp.lock_held():
00279         # If the import lock is held, the threads will hang
00280         raise unittest.SkipTest("can't run when import lock is held")
00281 
00282     test.support.run_unittest(SocketServerTest)
00283 
00284 if __name__ == "__main__":
00285     test_main()