Back to index

moin  1.9.0~rc2
preforkserver.py
Go to the documentation of this file.
00001 # Copyright (c) 2005 Allan Saddi <allan@saddi.com>
00002 # All rights reserved.
00003 #
00004 # Redistribution and use in source and binary forms, with or without
00005 # modification, are permitted provided that the following conditions
00006 # are met:
00007 # 1. Redistributions of source code must retain the above copyright
00008 #    notice, this list of conditions and the following disclaimer.
00009 # 2. Redistributions in binary form must reproduce the above copyright
00010 #    notice, this list of conditions and the following disclaimer in the
00011 #    documentation and/or other materials provided with the distribution.
00012 #
00013 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
00014 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00015 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00016 # ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
00017 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
00018 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
00019 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
00020 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00021 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
00022 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
00023 # SUCH DAMAGE.
00024 #
00025 # $Id$
00026 
# Module metadata; '$Revision$' is an unexpanded version-control keyword.
__version__ = "$Revision$"
__author__ = "Allan Saddi <allan@saddi.com>"
00029 
00030 import sys
00031 import os
00032 import socket
00033 import select
00034 import errno
00035 import signal
00036 import random
00037 import time
00038 
try:
    import fcntl
except ImportError:
    def setCloseOnExec(sock):
        """No-op: fcntl is not available on this platform."""
        pass
else:
    def setCloseOnExec(sock):
        """
        Mark sock's file descriptor close-on-exec so it is not inherited
        by programs started via exec().

        Uses read-modify-write so any other descriptor flags already set
        on the fd are preserved rather than overwritten.
        """
        fd = sock.fileno()
        flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
00047 
# If running Python < 2.4, require eunuchs module for socket.socketpair().
# See <http://www.inoi.fi/open/trac/eunuchs>.
if not hasattr(socket, 'socketpair'):
    try:
        import eunuchs.socketpair
    except ImportError:
        # TODO: Other alternatives? Perhaps using os.pipe()?
        raise ImportError('Requires eunuchs module for Python < 2.4')

    def socketpair():
        """
        Return a pair of connected AF_UNIX stream socket objects, built on
        the raw descriptors returned by eunuchs.
        """
        s1, s2 = eunuchs.socketpair.socketpair()
        # socket.fromfd() duplicates the descriptor, so the raw originals
        # are always closed -- also when wrapping one of them fails.
        try:
            p, c = (socket.fromfd(s1, socket.AF_UNIX, socket.SOCK_STREAM),
                    socket.fromfd(s2, socket.AF_UNIX, socket.SOCK_STREAM))
        finally:
            os.close(s1)
            os.close(s2)
        return p, c

    socket.socketpair = socketpair
00066 
class PreforkServer(object):
    """
    A preforked server model conceptually similar to Apache httpd(2). At
    any given time, ensures there are at least minSpare children ready to
    process new requests (up to a maximum of maxChildren children total).
    If the number of idle children is ever above maxSpare, the extra
    children are killed.

    If maxRequests is positive, each child will only handle that many
    requests in its lifetime before exiting.

    jobClass should be a class whose constructor takes at least two
    arguments: the client socket and client address. jobArgs, which
    must be a list or tuple, is any additional (static) arguments you
    wish to pass to the constructor.

    jobClass should have a run() method (taking no arguments) that does
    the actual work. When run() returns, the request is considered
    complete and the child process moves to idle state.

    While run() is active, SIGHUP, SIGINT and SIGTERM stop the main loop,
    and SIGUSR1 asks every current child to retire, at most one every
    10 seconds.

    Exceptions are inspected through sys.exc_info() instead of
    ``except E, e`` (Python 2 only) or ``except E as e`` (Python 2.6+),
    so the class parses on old Python 2 releases as well as Python 3.
    """
    def __init__(self, minSpare=1, maxSpare=5, maxChildren=50,
                 maxRequests=0, jobClass=None, jobArgs=()):
        if minSpare < 1:
            raise ValueError("minSpare must be at least 1!")
        if maxSpare < minSpare:
            raise ValueError("maxSpare must be greater than, or equal to, minSpare!")

        self._minSpare = minSpare
        self._maxSpare = maxSpare
        # run() always tries to keep maxSpare children alive, so the cap
        # on children is raised to at least maxSpare.
        self._maxChildren = max(maxSpare, maxChildren)
        self._maxRequests = maxRequests
        self._jobClass = jobClass
        self._jobArgs = jobArgs

        # Internal state of children. Maps pids to dictionaries with two
        # members: 'file' and 'avail'. 'file' is the parent's socket to that
        # individual child (None once closed) and 'avail' is whether or not
        # the child is free to process requests.
        self._children = {}

        # Sockets of children scheduled for retirement (filled by SIGUSR1)
        # and the time.time() at which the last one was retired.
        self._children_to_purge = []
        self._last_purge = 0

    def _currentErrno():
        """Return args[0] (the errno) of the exception being handled."""
        return sys.exc_info()[1].args[0]
    _currentErrno = staticmethod(_currentErrno)

    def run(self, sock):
        """
        The main loop. Pass a socket that is ready to accept() client
        connections. Return value will be True or False indicating whether
        or not the loop was exited due to SIGHUP.
        """
        # Set up signal handlers.
        self._keepGoing = True
        self._hupReceived = False
        self._installSignalHandlers()

        # Don't want operations on main socket to block.
        sock.setblocking(0)

        # Set close-on-exec
        setCloseOnExec(sock)

        # Main loop.
        while self._keepGoing:
            # Maintain minimum number of children. Note that we are checking
            # the absolute number of children, not the number of "available"
            # children. We explicitly test against _maxSpare to maintain
            # an *optimistic* absolute minimum. The number of children will
            # always be in the range [_maxSpare, _maxChildren].
            while len(self._children) < self._maxSpare:
                if not self._spawnChild(sock):
                    break

            # Wait on any socket activity from live children.
            r = [x['file'] for x in self._children.values()
                 if x['file'] is not None]

            # Purge targets whose socket was closed since SIGUSR1 (child
            # died or was trimmed) must not reach select(): operations on
            # a closed socket raise.
            self._prunePurgeList()

            if len(r) == len(self._children) and not self._children_to_purge:
                timeout = None
            else:
                # There are dead children that need to be reaped, ensure
                # that they are by timing out, if necessary. Or there are some
                # children that need to die.
                timeout = 2

            # Only offer a purge target to select() once 10 seconds have
            # passed since the previous retirement.
            if self._children_to_purge and \
               time.time() > self._last_purge + 10:
                w = list(self._children_to_purge)
            else:
                w = []

            try:
                r, w, e = select.select(r, w, [], timeout)
            except select.error:
                if self._currentErrno() != errno.EINTR:
                    raise
                # Interrupted by a signal: nothing is known to be ready, so
                # don't treat the input lists as results.
                r, w = [], []

            # Scan child sockets and tend to those that need attention.
            for child in r:
                # Receive status byte.
                try:
                    state = child.recv(1)
                except socket.error:
                    if self._currentErrno() in (errno.EAGAIN, errno.EINTR):
                        # Guess it really didn't need attention?
                        continue
                    raise
                pid = self._pidForFile(child)
                if pid is None:
                    continue
                if state:
                    # A zero byte means busy; any other byte means idle.
                    self._children[pid]['avail'] = state != '\x00'
                else:
                    # Didn't receive anything. Child is most likely dead.
                    self._closeChild(self._children[pid])

            if w:
                # Retire only the first ready child this round; the 10
                # second check above spaces out the rest.
                child = w[0]
                if child in self._children_to_purge:
                    self._children_to_purge.remove(child)
                pid = self._pidForFile(child)
                # The socket may have been closed by the scan above.
                if pid is not None:
                    try:
                        child.send('bye, bye')
                    except socket.error:
                        # The child may already be gone; closing its
                        # socket below is all that is left to do.
                        if self._currentErrno() not in (errno.EPIPE,
                                                        errno.ECONNRESET):
                            raise
                    self._closeChild(self._children[pid])
                    self._last_purge = time.time()

            # Reap children.
            self._reapChildren()

            # See who and how many children are available.
            availPids = [pid for pid, d in self._children.items()
                         if d['avail']]
            avail = len(availPids)

            if avail < self._minSpare:
                # Need to spawn more children.
                while avail < self._minSpare and \
                      len(self._children) < self._maxChildren:
                    if not self._spawnChild(sock):
                        break
                    avail += 1
            elif avail > self._maxSpare:
                # Too many spares, kill off the extras (the _maxSpare idle
                # children with the lowest pids are kept).
                availPids.sort()
                for pid in availPids[self._maxSpare:]:
                    self._closeChild(self._children[pid])

        # Clean up all child processes.
        self._cleanupChildren()

        # Restore signal handlers.
        self._restoreSignalHandlers()

        # Return bool based on whether or not SIGHUP was received.
        return self._hupReceived

    def _pidForFile(self, f):
        """Return the pid of the child whose socket is f, or None."""
        for pid, d in self._children.items():
            if d['file'] is f:
                return pid
        return None

    def _closeChild(self, d):
        """
        Close the parent's socket to a child and mark it unavailable.

        d is the child's entry in self._children. Closing the socket makes
        it readable on the child's side, which ends the child's loop in
        _child().
        """
        if d['file'] is not None:
            d['file'].close()
            d['file'] = None
        d['avail'] = False

    def _prunePurgeList(self):
        """Drop purge targets that are no longer an open child socket."""
        live = [d['file'] for d in self._children.values()
                if d['file'] is not None]
        # Filter in place: if SIGUSR1 rebinds the attribute meanwhile, the
        # handler's fresh list is left untouched.
        purge = self._children_to_purge
        purge[:] = [f for f in purge if f in live]

    def _cleanupChildren(self):
        """
        Closes all child sockets (letting those that are available know
        that it's time to exit). Sends SIGINT to those that are currently
        processing (and hopes that they finish ASAP).

        Any children remaining after 10 seconds are SIGKILLed.
        """
        # Let all children know it's time to go.
        for pid, d in self._children.items():
            if d['file'] is not None:
                d['file'].close()
                d['file'] = None
            if not d['avail']:
                # Child is unavailable. SIGINT it.
                try:
                    os.kill(pid, signal.SIGINT)
                except OSError:
                    if self._currentErrno() != errno.ESRCH:
                        raise

        def alrmHandler(signum, frame):
            # Only needed so SIGALRM interrupts os.wait() below.
            pass

        # Set up alarm to wake us up after 10 seconds.
        oldSIGALRM = signal.getsignal(signal.SIGALRM)
        signal.signal(signal.SIGALRM, alrmHandler)
        signal.alarm(10)

        # Wait for all children to die.
        while self._children:
            try:
                pid, status = os.wait()
            except OSError:
                if self._currentErrno() in (errno.ECHILD, errno.EINTR):
                    break
                raise
            if pid in self._children:
                del self._children[pid]

        # Cancel the alarm if it has not fired yet; otherwise it would be
        # delivered later to the restored handler (which, if it is the
        # default, terminates the process).
        signal.alarm(0)
        signal.signal(signal.SIGALRM, oldSIGALRM)

        # Forcefully kill any remaining children.
        for pid in self._children.keys():
            try:
                os.kill(pid, signal.SIGKILL)
            except OSError:
                if self._currentErrno() != errno.ESRCH:
                    raise

    def _reapChildren(self):
        """Cleans up self._children whenever children die."""
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except OSError:
                if self._currentErrno() == errno.ECHILD:
                    break
                raise
            if pid <= 0:
                break
            if pid in self._children: # Sanity check.
                if self._children[pid]['file'] is not None:
                    self._children[pid]['file'].close()
                del self._children[pid]

    def _spawnChild(self, sock):
        """
        Spawn a single child. Returns True if successful, False otherwise.
        """
        # This socket pair is used for very simple communication between
        # the parent and its children. 'parent' is the end used inside the
        # child process; 'child' is the end the parent keeps.
        parent, child = socket.socketpair()
        parent.setblocking(0)
        setCloseOnExec(parent)
        child.setblocking(0)
        setCloseOnExec(child)
        try:
            pid = os.fork()
        except OSError:
            err = self._currentErrno()
            # Don't leak the socket pair when no child was created.
            parent.close()
            child.close()
            if err in (errno.EAGAIN, errno.ENOMEM):
                return False # Can't fork anymore.
            raise
        if not pid:
            # Child
            child.close()
            # Put child into its own process group.
            pid = os.getpid()
            os.setpgid(pid, pid)
            # Restore signal handlers.
            self._restoreSignalHandlers()
            # Close copies of child sockets.
            for f in [x['file'] for x in self._children.values()
                      if x['file'] is not None]:
                f.close()
            self._children = {}
            try:
                # Enter main loop.
                self._child(sock, parent)
            except KeyboardInterrupt:
                pass
            sys.exit(0)
        else:
            # Parent
            parent.close()
            d = self._children[pid] = {}
            d['file'] = child
            d['avail'] = True
            return True

    def _isClientAllowed(self, addr):
        """Override to provide access control."""
        return True

    def _notifyParent(self, parent, msg):
        """
        Send message to parent, ignoring EPIPE and retrying on EAGAIN.

        Returns True once msg was sent, False if the parent is gone.
        """
        while True:
            try:
                parent.send(msg)
                return True
            except socket.error:
                err = self._currentErrno()
                if err == errno.EPIPE:
                    return False # Parent is gone
                if err != errno.EAGAIN:
                    raise
            # Wait for the socket to become writable before sending again.
            try:
                select.select([], [parent], [])
            except select.error:
                if self._currentErrno() != errno.EINTR:
                    raise

    def _child(self, sock, parent):
        """
        Main loop for children.

        Accepts connections on sock and runs one job per allowed client,
        reporting byte 0x00 (busy) and 0xff (free) over the parent socket.
        Returns when the parent socket becomes readable, when the parent
        can no longer be notified, or after maxRequests jobs.
        """
        requestCount = 0

        # Re-seed random module
        preseed = ''
        # urandom only exists in Python >= 2.4
        if hasattr(os, 'urandom'):
            try:
                preseed = os.urandom(16)
            except NotImplementedError:
                pass
        # Have doubts about this. random.seed will just hash the string
        random.seed('%s%s%s' % (preseed, os.getpid(), time.time()))
        del preseed

        while True:
            # Wait for any activity on the main socket or parent socket.
            try:
                r, w, e = select.select([sock, parent], [], [])
            except select.error:
                if self._currentErrno() != errno.EINTR:
                    raise
                continue

            for f in r:
                # If there's any activity on the parent socket, it
                # means the parent wants us to die or has died itself.
                # Either way, exit.
                if f is parent:
                    return

            # Otherwise, there's activity on the main socket...
            try:
                clientSock, addr = sock.accept()
            except socket.error:
                if self._currentErrno() == errno.EAGAIN:
                    # Or maybe not (another child got the connection).
                    continue
                raise

            setCloseOnExec(clientSock)

            # Check if this client is allowed.
            if not self._isClientAllowed(addr):
                clientSock.close()
                continue

            # Notify parent we're no longer available.
            self._notifyParent(parent, '\x00')

            # Do the job.
            self._jobClass(clientSock, addr, *self._jobArgs).run()

            # If we've serviced the maximum number of requests, exit.
            if self._maxRequests > 0:
                requestCount += 1
                if requestCount >= self._maxRequests:
                    break

            # Tell parent we're free again.
            if not self._notifyParent(parent, '\xff'):
                return # Parent is gone.

    # Signal handlers

    def _hupHandler(self, signum, frame):
        """SIGHUP: stop the main loop and make run() return True."""
        self._keepGoing = False
        self._hupReceived = True

    def _intHandler(self, signum, frame):
        """SIGINT/SIGTERM: stop the main loop."""
        self._keepGoing = False

    def _chldHandler(self, signum, frame):
        """
        Do nothing (breaks us out of select and allows us to reap children).

        Note: _installSignalHandlers() does not currently install this.
        """
        pass

    def _usr1Handler(self, signum, frame):
        """SIGUSR1: schedule every child with an open socket to retire."""
        self._children_to_purge = [x['file'] for x in self._children.values()
                                   if x['file'] is not None]

    def _installSignalHandlers(self):
        """Install the handlers above, remembering the previous ones."""
        supportedSignals = [signal.SIGINT, signal.SIGTERM]
        if hasattr(signal, 'SIGHUP'):
            supportedSignals.append(signal.SIGHUP)
        if hasattr(signal, 'SIGUSR1'):
            supportedSignals.append(signal.SIGUSR1)

        self._oldSIGs = [(x, signal.getsignal(x)) for x in supportedSignals]

        for sig in supportedSignals:
            if hasattr(signal, 'SIGHUP') and sig == signal.SIGHUP:
                signal.signal(sig, self._hupHandler)
            elif hasattr(signal, 'SIGUSR1') and sig == signal.SIGUSR1:
                signal.signal(sig, self._usr1Handler)
            else:
                signal.signal(sig, self._intHandler)

    def _restoreSignalHandlers(self):
        """Restores previous signal handlers."""
        for signum, handler in self._oldSIGs:
            signal.signal(signum, handler)
00452 
if __name__ == '__main__':
    class TestJob(object):
        """Demo job: greets the client, waits for one byte, then closes."""
        def __init__(self, sock, addr):
            self._sock = sock
            self._addr = addr
        def run(self):
            print("Client connection opened from %s:%d" % self._addr)
            try:
                self._sock.send('Hello World!\n')
                self._sock.setblocking(1)
                self._sock.recv(1)
            finally:
                # Close the client socket even if the client vanished
                # mid-conversation and send/recv raised.
                self._sock.close()
            print("Client connection closed from %s:%d" % self._addr)

    # Demo server: listen on all interfaces, port 8080.
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('', 8080))
    sock.listen(socket.SOMAXCONN)
    PreforkServer(maxChildren=10, jobClass=TestJob).run(sock)