Back to index

moin  1.9.0~rc2
threadedserver.py
Go to the documentation of this file.
00001 # Copyright (c) 2005 Allan Saddi <allan@saddi.com>
00002 # All rights reserved.
00003 #
00004 # Redistribution and use in source and binary forms, with or without
00005 # modification, are permitted provided that the following conditions
00006 # are met:
00007 # 1. Redistributions of source code must retain the above copyright
00008 #    notice, this list of conditions and the following disclaimer.
00009 # 2. Redistributions in binary form must reproduce the above copyright
00010 #    notice, this list of conditions and the following disclaimer in the
00011 #    documentation and/or other materials provided with the distribution.
00012 #
00013 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
00014 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00015 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00016 # ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
00017 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
00018 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
00019 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
00020 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00021 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
00022 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
00023 # SUCH DAMAGE.
00024 #
00025 # $Id$
00026 
00027 __author__ = 'Allan Saddi <allan@saddi.com>'
00028 __version__ = '$Revision$'
00029 
00030 import sys
00031 import socket
00032 import select
00033 import signal
00034 import errno
00035 
00036 try:
00037     import fcntl
00038 except ImportError:
00039     def setCloseOnExec(sock):
00040         pass
00041 else:
00042     def setCloseOnExec(sock):
00043         fcntl.fcntl(sock.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC)
00044 
00045 from flup.server.threadpool import ThreadPool
00046 
00047 __all__ = ['ThreadedServer']
00048 
00049 class ThreadedServer(object):
00050     def __init__(self, jobClass=None, jobArgs=(), **kw):
00051         self._jobClass = jobClass
00052         self._jobArgs = jobArgs
00053 
00054         self._threadPool = ThreadPool(**kw)
00055 
00056     def run(self, sock, timeout=1.0):
00057         """
00058         The main loop. Pass a socket that is ready to accept() client
00059         connections. Return value will be True or False indiciating whether
00060         or not the loop was exited due to SIGHUP.
00061         """
00062         # Set up signal handlers.
00063         self._keepGoing = True
00064         self._hupReceived = False
00065 
00066         # Might need to revisit this?
00067         if not sys.platform.startswith('win'):
00068             self._installSignalHandlers()
00069 
00070         # Set close-on-exec
00071         setCloseOnExec(sock)
00072         
00073         # Main loop.
00074         while self._keepGoing:
00075             try:
00076                 r, w, e = select.select([sock], [], [], timeout)
00077             except select.error, e:
00078                 if e[0] == errno.EINTR:
00079                     continue
00080                 raise
00081 
00082             if r:
00083                 try:
00084                     clientSock, addr = sock.accept()
00085                 except socket.error, e:
00086                     if e[0] in (errno.EINTR, errno.EAGAIN):
00087                         continue
00088                     raise
00089 
00090                 setCloseOnExec(clientSock)
00091                 
00092                 if not self._isClientAllowed(addr):
00093                     clientSock.close()
00094                     continue
00095 
00096                 # Hand off to Connection.
00097                 conn = self._jobClass(clientSock, addr, *self._jobArgs)
00098                 if not self._threadPool.addJob(conn, allowQueuing=False):
00099                     # No thread left, immediately close the socket to hopefully
00100                     # indicate to the web server that we're at our limit...
00101                     # and to prevent having too many opened (and useless)
00102                     # files.
00103                     clientSock.close()
00104 
00105             self._mainloopPeriodic()
00106 
00107         # Restore signal handlers.
00108         self._restoreSignalHandlers()
00109 
00110         # Return bool based on whether or not SIGHUP was received.
00111         return self._hupReceived
00112         
00113     def shutdown(self):
00114         """Wait for running threads to finish."""
00115         self._threadPool.shutdown()
00116 
00117     def _mainloopPeriodic(self):
00118         """
00119         Called with just about each iteration of the main loop. Meant to
00120         be overridden.
00121         """
00122         pass
00123 
00124     def _exit(self, reload=False):
00125         """
00126         Protected convenience method for subclasses to force an exit. Not
00127         really thread-safe, which is why it isn't public.
00128         """
00129         if self._keepGoing:
00130             self._keepGoing = False
00131             self._hupReceived = reload
00132 
00133     def _isClientAllowed(self, addr):
00134         """Override to provide access control."""
00135         return True
00136 
00137     # Signal handlers
00138 
00139     def _hupHandler(self, signum, frame):
00140         self._hupReceived = True
00141         self._keepGoing = False
00142 
00143     def _intHandler(self, signum, frame):
00144         self._keepGoing = False
00145 
00146     def _installSignalHandlers(self):
00147         supportedSignals = [signal.SIGINT, signal.SIGTERM]
00148         if hasattr(signal, 'SIGHUP'):
00149             supportedSignals.append(signal.SIGHUP)
00150 
00151         self._oldSIGs = [(x,signal.getsignal(x)) for x in supportedSignals]
00152 
00153         for sig in supportedSignals:
00154             if hasattr(signal, 'SIGHUP') and sig == signal.SIGHUP:
00155                 signal.signal(sig, self._hupHandler)
00156             else:
00157                 signal.signal(sig, self._intHandler)
00158 
00159     def _restoreSignalHandlers(self):
00160         for signum,handler in self._oldSIGs:
00161             signal.signal(signum, handler)
00162 
00163 if __name__ == '__main__':
00164     class TestJob(object):
00165         def __init__(self, sock, addr):
00166             self._sock = sock
00167             self._addr = addr
00168         def run(self):
00169             print "Client connection opened from %s:%d" % self._addr
00170             self._sock.send('Hello World!\n')
00171             self._sock.setblocking(1)
00172             self._sock.recv(1)
00173             self._sock.close()
00174             print "Client connection closed from %s:%d" % self._addr
00175     sock = socket.socket()
00176     sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
00177     sock.bind(('', 8080))
00178     sock.listen(socket.SOMAXCONN)
00179     ThreadedServer(maxThreads=10, jobClass=TestJob).run(sock)