Back to index

moin  1.9.0~rc2
fcgi_base.py
Go to the documentation of this file.
00001 # Copyright (c) 2002, 2003, 2005, 2006 Allan Saddi <allan@saddi.com>
00002 # All rights reserved.
00003 #
00004 # Redistribution and use in source and binary forms, with or without
00005 # modification, are permitted provided that the following conditions
00006 # are met:
00007 # 1. Redistributions of source code must retain the above copyright
00008 #    notice, this list of conditions and the following disclaimer.
00009 # 2. Redistributions in binary form must reproduce the above copyright
00010 #    notice, this list of conditions and the following disclaimer in the
00011 #    documentation and/or other materials provided with the distribution.
00012 #
00013 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
00014 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00015 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00016 # ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
00017 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
00018 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
00019 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
00020 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00021 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
00022 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
00023 # SUCH DAMAGE.
00024 #
00025 # $Id$
00026 
00027 __author__ = 'Allan Saddi <allan@saddi.com>'
00028 __version__ = '$Revision$'
00029 
00030 import sys
00031 import os
00032 import signal
00033 import struct
00034 import cStringIO as StringIO
00035 import select
00036 import socket
00037 import errno
00038 import traceback
00039 
00040 try:
00041     import thread
00042     import threading
00043     thread_available = True
00044 except ImportError:
00045     import dummy_thread as thread
00046     import dummy_threading as threading
00047     thread_available = False
00048 
00049 # Apparently 2.3 doesn't define SHUT_WR? Assume it is 1 in this case.
00050 if not hasattr(socket, 'SHUT_WR'):
00051     socket.SHUT_WR = 1
00052 
00053 __all__ = ['BaseFCGIServer']
00054 
00055 # Constants from the FastCGI spec, used by the Record/Connection code below.
00056 FCGI_LISTENSOCK_FILENO = 0
00057 
00058 FCGI_HEADER_LEN = 8  # Bytes read for each record header (see Record.read).
00059 
00060 FCGI_VERSION_1 = 1  # Value stored in Record.version.
00061 # Record types (the 'type' field of a Record).
00062 FCGI_BEGIN_REQUEST = 1
00063 FCGI_ABORT_REQUEST = 2
00064 FCGI_END_REQUEST = 3
00065 FCGI_PARAMS = 4
00066 FCGI_STDIN = 5
00067 FCGI_STDOUT = 6
00068 FCGI_STDERR = 7
00069 FCGI_DATA = 8
00070 FCGI_GET_VALUES = 9
00071 FCGI_GET_VALUES_RESULT = 10
00072 FCGI_UNKNOWN_TYPE = 11
00073 FCGI_MAXTYPE = FCGI_UNKNOWN_TYPE
00074 
00075 FCGI_NULL_REQUEST_ID = 0  # Default requestId of a Record.
00076 # Bit tested against Request.flags in Connection.end_request().
00077 FCGI_KEEP_CONN = 1
00078 # Request roles (the 'role' value unpacked from an FCGI_BEGIN_REQUEST body).
00079 FCGI_RESPONDER = 1
00080 FCGI_AUTHORIZER = 2
00081 FCGI_FILTER = 3
00082 # protocolStatus values packed into FCGI_END_REQUEST records.
00083 FCGI_REQUEST_COMPLETE = 0
00084 FCGI_CANT_MPX_CONN = 1
00085 FCGI_OVERLOADED = 2
00086 FCGI_UNKNOWN_ROLE = 3
00087 # Variable names; presumably the keys looked up for FCGI_GET_VALUES -- confirm against server.capability.
00088 FCGI_MAX_CONNS = 'FCGI_MAX_CONNS'
00089 FCGI_MAX_REQS = 'FCGI_MAX_REQS'
00090 FCGI_MPXS_CONNS = 'FCGI_MPXS_CONNS'
00091 # struct format strings ('!' = network byte order) for the record header and the fixed-size record bodies.
00092 FCGI_Header = '!BBHHBx'
00093 FCGI_BeginRequestBody = '!HB5x'
00094 FCGI_EndRequestBody = '!LB3x'
00095 FCGI_UnknownTypeBody = '!B7x'
00096 # Packed sizes of the bodies this module sends.
00097 FCGI_EndRequestBody_LEN = struct.calcsize(FCGI_EndRequestBody)
00098 FCGI_UnknownTypeBody_LEN = struct.calcsize(FCGI_UnknownTypeBody)
00099 
if __debug__:
    import time

    # Set non-zero to write debug output to a file.
    DEBUG = 0
    DEBUGLOG = '/tmp/fcgi.log'

    def _debug(level, msg):
        """
        Append msg to the DEBUGLOG file when DEBUG is at least level.

        level -- verbosity of this message; it is dropped when DEBUG < level.
        msg   -- text to log; written as a single timestamped line.

        Logging is best-effort: a failure to open or write the log file is
        ignored so that debug output can never break request handling.
        """
        if DEBUG < level:
            return

        try:
            f = open(DEBUGLOG, 'a')
            try:
                f.write('%sfcgi: %s\n' % (time.ctime()[4:-4], msg))
            finally:
                # Release the file handle even when the write fails.
                f.close()
        except Exception:
            pass
00117 
class InputStream(object):
    """
    File-like object representing FastCGI input streams (FCGI_STDIN and
    FCGI_DATA). Supports the minimum methods required by WSGI spec.

    Data is queued with add_data(); the read methods consume it and call
    _waitForData() whenever more input is needed and end-of-stream has not
    been signalled yet.
    """
    def __init__(self, conn):
        self._conn = conn

        # See Server.
        self._shrinkThreshold = conn.server.inputStreamShrinkThreshold

        self._buf = ''      # Merged data (may start with already-read bytes).
        self._bufList = []  # Chunks queued by add_data() since the last merge.
        self._pos = 0       # Current read position.
        self._avail = 0     # Number of bytes currently available.

        self._eof = False   # True when server has sent EOF notification.

    def _mergeBufList(self):
        """Appends all queued chunks to the main buffer."""
        if self._bufList:
            self._buf += ''.join(self._bufList)
            self._bufList = []

    def _shrinkBuffer(self):
        """Gets rid of already read data (since we can't rewind)."""
        if self._pos >= self._shrinkThreshold:
            self._buf = self._buf[self._pos:]
            self._avail -= self._pos
            self._pos = 0

            assert self._avail >= 0

    def _waitForData(self):
        """Waits for more data to become available."""
        self._conn.process_input()

    def read(self, n=-1):
        """
        Reads up to n bytes; a negative n (or None) reads until end of stream.

        Blocks (via _waitForData) until n bytes are available or the end of
        the stream has been signalled. Returns '' once the stream is
        exhausted.
        """
        if n is None:
            n = -1
        if self._pos == self._avail and self._eof:
            return ''
        while True:
            if n < 0 or (self._avail - self._pos) < n:
                # Not enough data available.
                if self._eof:
                    # And there's no more coming.
                    newPos = self._avail
                    break
                # Wait for more data.
                self._waitForData()
            else:
                newPos = self._pos + n
                break
        self._mergeBufList()
        r = self._buf[self._pos:newPos]
        self._pos = newPos
        self._shrinkBuffer()
        return r

    def readline(self, length=None):
        """
        Reads one line, including its trailing newline if there is one.

        length, when given and non-negative, is an upper bound on the number
        of bytes returned; the rest of the line stays in the stream. None or
        a negative value means no limit.
        """
        if self._pos == self._avail and self._eof:
            return ''
        if length is not None and length < 0:
            # Same meaning as "no limit"; without this a negative length
            # would move the read position backwards.
            length = None
        while True:
            # Unfortunately, we need to merge the buffer list early.
            self._mergeBufList()
            # Find newline.
            i = self._buf.find('\n', self._pos)
            if i >= 0:
                newPos = i + 1
                break
            if self._eof:
                # No more data coming.
                newPos = self._avail
                break
            if length is not None and len(self._buf) - self._pos >= length:
                newPos = self._pos + length
                break
            # Wait for more to come.
            self._waitForData()
        if length is not None:
            # Enforce the limit even when a newline (or EOF) was found
            # beyond it.
            newPos = min(newPos, self._pos + length)
        r = self._buf[self._pos:newPos]
        self._pos = newPos
        self._shrinkBuffer()
        return r

    def readlines(self, sizehint=0):
        """
        Reads lines until end of stream and returns them as a list.

        With a positive sizehint, reading stops once the lines collected so
        far total at least sizehint bytes.
        """
        total = 0
        lines = []
        line = self.readline()
        while line:
            lines.append(line)
            total += len(line)
            if 0 < sizehint <= total:
                break
            line = self.readline()
        return lines

    def __iter__(self):
        return self

    def next(self):
        """Returns the next line; raises StopIteration at end of stream."""
        r = self.readline()
        if not r:
            raise StopIteration
        return r

    def add_data(self, data):
        """Queues a chunk of input; an empty chunk marks end of stream."""
        if not data:
            self._eof = True
        else:
            self._bufList.append(data)
            self._avail += len(data)
00233 
class MultiplexedInputStream(InputStream):
    """
    InputStream variant for use with MultiplexedConnections, where the
    MultiplexedConnection (producer, calling add_data) and the Request
    (consumer, calling read/readline) are expected to run in different
    threads.
    """
    def __init__(self, conn):
        super(MultiplexedInputStream, self).__init__(conn)

        # A Condition built on a re-entrant lock: it both arbitrates access
        # to this stream (shared by a Request and its owning Connection)
        # and lets add_data() wake up a reader waiting for input.
        self._lock = threading.Condition(threading.RLock())

    def _locked(self, func, *args):
        """Calls func(*args) while holding the stream lock."""
        self._lock.acquire()
        try:
            return func(*args)
        finally:
            self._lock.release()

    def _waitForData(self):
        # Only reached from read()/readline(), i.e. with the lock held.
        # Sleeps until add_data() posts a notification.
        self._lock.wait()

    def read(self, n=-1):
        return self._locked(super(MultiplexedInputStream, self).read, n)

    def readline(self, length=None):
        return self._locked(super(MultiplexedInputStream, self).readline,
                            length)

    def _addAndNotify(self, data):
        """Queues data and wakes up a waiting reader (lock must be held)."""
        super(MultiplexedInputStream, self).add_data(data)
        self._lock.notify()

    def add_data(self, data):
        self._locked(self._addAndNotify, data)
00275 
class OutputStream(object):
    """
    FastCGI output stream (FCGI_STDOUT/FCGI_STDERR). By default, calls to
    write() or writelines() immediately result in Records being sent back
    to the server. Buffering should be done in a higher level!

    conn     -- object whose writeRecord() sends a Record.
    req      -- owning request; supplies requestId and server.maxwrite.
    type     -- record type used for every Record of this stream.
    buffered -- if True, written data is held until flush() or close().
    """
    def __init__(self, conn, req, type, buffered=False):
        self._conn = conn
        self._req = req
        self._type = type
        self._buffered = buffered
        self._bufList = [] # Used if buffered is True
        self.dataWritten = False
        self.closed = False

    def _write(self, data):
        """
        Sends data as one or more Records.

        Each Record carries at most server.maxwrite - FCGI_HEADER_LEN bytes,
        and never more than fits the 16-bit contentLength header field.
        Raises ValueError if maxwrite leaves no room for any payload.
        """
        total = len(data)
        if not total:
            return

        # contentLength is packed as an unsigned 16-bit value.
        chunk = min(self._req.server.maxwrite - FCGI_HEADER_LEN, 0xFFFF)
        if chunk <= 0:
            # A zero-length record means end-of-stream, so looping here
            # could never make progress.
            raise ValueError('maxwrite must be greater than FCGI_HEADER_LEN')

        # Walk the payload by offset instead of re-slicing the remainder
        # after every record, which copied the tail over and over.
        offset = 0
        while offset < total:
            payload = data[offset:offset + chunk]

            rec = Record(self._type, self._req.requestId)
            rec.contentLength = len(payload)
            rec.contentData = payload
            self._conn.writeRecord(rec)

            offset += len(payload)

    def write(self, data):
        """Writes data (sent immediately unless the stream is buffered)."""
        assert not self.closed

        if not data:
            return

        self.dataWritten = True

        if self._buffered:
            self._bufList.append(data)
        else:
            self._write(data)

    def writelines(self, lines):
        """Writes every item of lines in order."""
        assert not self.closed

        for line in lines:
            self.write(line)

    def flush(self):
        """Sends any buffered data; a no-op for unbuffered streams."""
        # Only need to flush if this OutputStream is actually buffered.
        if self._buffered:
            data = ''.join(self._bufList)
            self._bufList = []
            self._write(data)

    # Though available, the following should NOT be called by WSGI apps.
    def close(self):
        """Sends end-of-stream notification, if necessary."""
        # Nothing is sent (and the stream stays open) if no data was ever
        # written.
        if not self.closed and self.dataWritten:
            self.flush()
            # An empty Record marks the end of the stream.
            rec = Record(self._type, self._req.requestId)
            self._conn.writeRecord(rec)
            self.closed = True
00338 
class TeeOutputStream(object):
    """
    Fans writes out to several file-like objects: whatever is written to
    (or flushed on) this wrapper is forwarded to every stream in streamList,
    in list order.
    """
    def __init__(self, streamList):
        self._streamList = streamList

    def write(self, data):
        """Forwards data to each wrapped stream."""
        for stream in self._streamList:
            stream.write(data)

    def writelines(self, lines):
        """Writes the items of lines one at a time through write()."""
        for chunk in lines:
            self.write(chunk)

    def flush(self):
        """Flushes each wrapped stream."""
        for stream in self._streamList:
            stream.flush()
00358 
class StdoutWrapper(object):
    """
    Thin proxy around a stdout-like file that remembers, in dataWritten,
    whether any non-empty data has been written through it.
    """
    def __init__(self, stdout):
        self._file = stdout
        self.dataWritten = False

    def write(self, data):
        """Records that output happened (if data is non-empty), then writes."""
        wrapped = self._file
        if data:
            self.dataWritten = True
        wrapped.write(data)

    def writelines(self, lines):
        """Writes each item through write() so dataWritten stays accurate."""
        for chunk in lines:
            self.write(chunk)

    def __getattr__(self, name):
        # Anything not defined here is looked up on the wrapped file.
        wrapped = self._file
        return getattr(wrapped, name)
00378 
def _decode_length(s, pos):
    """
    Decodes one length field starting at s[pos].

    Returns (length, position just past the field). A first byte with the
    high bit set selects the 4-byte big-endian form (top bit masked off);
    otherwise the single byte is the length.
    """
    length = ord(s[pos])
    if length & 128:
        return struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff, pos + 4
    return length, pos + 1

def decode_pair(s, pos=0):
    """
    Decodes the name/value pair that starts at offset pos of s.

    Returns (newPos, (name, value)), where newPos is the offset just past
    the decoded pair.
    """
    nameLength, pos = _decode_length(s, pos)
    valueLength, pos = _decode_length(s, pos)

    nameEnd = pos + nameLength
    valueEnd = nameEnd + valueLength

    return (valueEnd, (s[pos:nameEnd], s[nameEnd:valueEnd]))
00406 
def encode_pair(name, value):
    """
    Encodes a name/value pair and returns the encoded string.

    The result is the name length, the value length, the name and the
    value, in that order. A length below 128 takes one byte; anything
    longer takes four bytes, big-endian, with the top bit set.
    """
    lengths = []
    for field in (name, value):
        size = len(field)
        if size < 128:
            lengths.append(chr(size))
        else:
            lengths.append(struct.pack('!L', size | 0x80000000))

    return ''.join(lengths) + name + value
00426     
00427 class Record(object):
00428     """
00429     A FastCGI Record.
00430 
00431     Used for encoding/decoding records.
00432     """
00433     def __init__(self, type=FCGI_UNKNOWN_TYPE, requestId=FCGI_NULL_REQUEST_ID):
00434         self.version = FCGI_VERSION_1
00435         self.type = type
00436         self.requestId = requestId
00437         self.contentLength = 0
00438         self.paddingLength = 0
00439         self.contentData = ''
00440 
00441     def _recvall(sock, length):
00442         """
00443         Attempts to receive length bytes from a socket, blocking if necessary.
00444         (Socket may be blocking or non-blocking.)
00445         """
00446         dataList = []
00447         recvLen = 0
00448         while length:
00449             try:
00450                 data = sock.recv(length)
00451             except socket.error, e:
00452                 if e[0] == errno.EAGAIN:
00453                     select.select([sock], [], [])
00454                     continue
00455                 else:
00456                     raise
00457             if not data: # EOF
00458                 break
00459             dataList.append(data)
00460             dataLen = len(data)
00461             recvLen += dataLen
00462             length -= dataLen
00463         return ''.join(dataList), recvLen
00464     _recvall = staticmethod(_recvall)
00465 
00466     def read(self, sock):
00467         """Read and decode a Record from a socket."""
00468         try:
00469             header, length = self._recvall(sock, FCGI_HEADER_LEN)
00470         except:
00471             raise EOFError
00472 
00473         if length < FCGI_HEADER_LEN:
00474             raise EOFError
00475         
00476         self.version, self.type, self.requestId, self.contentLength, \
00477                       self.paddingLength = struct.unpack(FCGI_Header, header)
00478 
00479         if __debug__: _debug(9, 'read: fd = %d, type = %d, requestId = %d, '
00480                              'contentLength = %d' %
00481                              (sock.fileno(), self.type, self.requestId,
00482                               self.contentLength))
00483         
00484         if self.contentLength:
00485             try:
00486                 self.contentData, length = self._recvall(sock,
00487                                                          self.contentLength)
00488             except:
00489                 raise EOFError
00490 
00491             if length < self.contentLength:
00492                 raise EOFError
00493 
00494         if self.paddingLength:
00495             try:
00496                 self._recvall(sock, self.paddingLength)
00497             except:
00498                 raise EOFError
00499 
00500     def _sendall(sock, data):
00501         """
00502         Writes data to a socket and does not return until all the data is sent.
00503         """
00504         length = len(data)
00505         while length:
00506             try:
00507                 sent = sock.send(data)
00508             except socket.error, e:
00509                 if e[0] == errno.EAGAIN:
00510                     select.select([], [sock], [])
00511                     continue
00512                 else:
00513                     raise
00514             data = data[sent:]
00515             length -= sent
00516     _sendall = staticmethod(_sendall)
00517 
00518     def write(self, sock):
00519         """Encode and write a Record to a socket."""
00520         self.paddingLength = -self.contentLength & 7
00521 
00522         if __debug__: _debug(9, 'write: fd = %d, type = %d, requestId = %d, '
00523                              'contentLength = %d' %
00524                              (sock.fileno(), self.type, self.requestId,
00525                               self.contentLength))
00526 
00527         header = struct.pack(FCGI_Header, self.version, self.type,
00528                              self.requestId, self.contentLength,
00529                              self.paddingLength)
00530         self._sendall(sock, header)
00531         if self.contentLength:
00532             self._sendall(sock, self.contentData)
00533         if self.paddingLength:
00534             self._sendall(sock, '\x00'*self.paddingLength)
00535             
00536 class Request(object):
00537     """
00538     Represents a single FastCGI request.
00539 
00540     These objects are passed to your handler and is the main interface
00541     between your handler and the fcgi module. The methods should not
00542     be called by your handler. However, server, params, stdin, stdout,
00543     stderr, and data are free for your handler's use.
00544     """
00545     def __init__(self, conn, inputStreamClass):
00546         self._conn = conn
00547 
00548         self.server = conn.server
00549         self.params = {}
00550         self.stdin = inputStreamClass(conn)
00551         self.stdout = OutputStream(conn, self, FCGI_STDOUT)
00552         self.stderr = OutputStream(conn, self, FCGI_STDERR, buffered=True)
00553         self.data = inputStreamClass(conn)
00554 
00555     def run(self):
00556         """Runs the handler, flushes the streams, and ends the request."""
00557         try:
00558             protocolStatus, appStatus = self.server.handler(self)
00559         except:
00560             traceback.print_exc(file=self.stderr)
00561             self.stderr.flush()
00562             if not self.stdout.dataWritten:
00563                 self.server.error(self)
00564 
00565             protocolStatus, appStatus = FCGI_REQUEST_COMPLETE, 0
00566 
00567         if __debug__: _debug(1, 'protocolStatus = %d, appStatus = %d' %
00568                              (protocolStatus, appStatus))
00569 
00570         try:
00571             self._flush()
00572             self._end(appStatus, protocolStatus)
00573         except socket.error, e:
00574             if e[0] != errno.EPIPE:
00575                 raise
00576 
00577     def _end(self, appStatus=0L, protocolStatus=FCGI_REQUEST_COMPLETE):
00578         self._conn.end_request(self, appStatus, protocolStatus)
00579         
00580     def _flush(self):
00581         self.stdout.close()
00582         self.stderr.close()
00583 
00584 class CGIRequest(Request):
00585     """A normal CGI request disguised as a FastCGI request."""
00586     def __init__(self, server):
00587         # These are normally filled in by Connection.
00588         self.requestId = 1
00589         self.role = FCGI_RESPONDER
00590         self.flags = 0
00591         self.aborted = False
00592         
00593         self.server = server
00594         self.params = dict(os.environ)
00595         self.stdin = sys.stdin
00596         self.stdout = StdoutWrapper(sys.stdout) # Oh, the humanity!
00597         self.stderr = sys.stderr
00598         self.data = StringIO.StringIO()
00599         
00600     def _end(self, appStatus=0L, protocolStatus=FCGI_REQUEST_COMPLETE):
00601         sys.exit(appStatus)
00602 
00603     def _flush(self):
00604         # Not buffered, do nothing.
00605         pass
00606 
00607 class Connection(object):
00608     """
00609     A Connection with the web server.
00610 
00611     Each Connection is associated with a single socket (which is
00612     connected to the web server) and is responsible for handling all
00613     the FastCGI message processing for that socket.
00614     """
00615     _multiplexed = False
00616     _inputStreamClass = InputStream
00617 
00618     def __init__(self, sock, addr, server):
00619         self._sock = sock
00620         self._addr = addr
00621         self.server = server
00622 
00623         # Active Requests for this Connection, mapped by request ID.
00624         self._requests = {}
00625 
00626     def _cleanupSocket(self):
00627         """Close the Connection's socket."""
00628         try:
00629             self._sock.shutdown(socket.SHUT_WR)
00630         except:
00631             return
00632         try:
00633             while True:
00634                 r, w, e = select.select([self._sock], [], [])
00635                 if not r or not self._sock.recv(1024):
00636                     break
00637         except:
00638             pass
00639         self._sock.close()
00640         
00641     def run(self):
00642         """Begin processing data from the socket."""
00643         self._keepGoing = True
00644         while self._keepGoing:
00645             try:
00646                 self.process_input()
00647             except (EOFError, KeyboardInterrupt):
00648                 break
00649             except (select.error, socket.error), e:
00650                 if e[0] == errno.EBADF: # Socket was closed by Request.
00651                     break
00652                 raise
00653 
00654         self._cleanupSocket()
00655 
00656     def process_input(self):
00657         """Attempt to read a single Record from the socket and process it."""
00658         # Currently, any children Request threads notify this Connection
00659         # that it is no longer needed by closing the Connection's socket.
00660         # We need to put a timeout on select, otherwise we might get
00661         # stuck in it indefinitely... (I don't like this solution.)
00662         while self._keepGoing:
00663             try:
00664                 r, w, e = select.select([self._sock], [], [], 1.0)
00665             except ValueError:
00666                 # Sigh. ValueError gets thrown sometimes when passing select
00667                 # a closed socket.
00668                 raise EOFError
00669             if r: break
00670         if not self._keepGoing:
00671             return
00672         rec = Record()
00673         rec.read(self._sock)
00674 
00675         if rec.type == FCGI_GET_VALUES:
00676             self._do_get_values(rec)
00677         elif rec.type == FCGI_BEGIN_REQUEST:
00678             self._do_begin_request(rec)
00679         elif rec.type == FCGI_ABORT_REQUEST:
00680             self._do_abort_request(rec)
00681         elif rec.type == FCGI_PARAMS:
00682             self._do_params(rec)
00683         elif rec.type == FCGI_STDIN:
00684             self._do_stdin(rec)
00685         elif rec.type == FCGI_DATA:
00686             self._do_data(rec)
00687         elif rec.requestId == FCGI_NULL_REQUEST_ID:
00688             self._do_unknown_type(rec)
00689         else:
00690             # Need to complain about this.
00691             pass
00692 
00693     def writeRecord(self, rec):
00694         """
00695         Write a Record to the socket.
00696         """
00697         rec.write(self._sock)
00698 
00699     def end_request(self, req, appStatus=0L,
00700                     protocolStatus=FCGI_REQUEST_COMPLETE, remove=True):
00701         """
00702         End a Request.
00703 
00704         Called by Request objects. An FCGI_END_REQUEST Record is
00705         sent to the web server. If the web server no longer requires
00706         the connection, the socket is closed, thereby ending this
00707         Connection (run() returns).
00708         """
00709         rec = Record(FCGI_END_REQUEST, req.requestId)
00710         rec.contentData = struct.pack(FCGI_EndRequestBody, appStatus,
00711                                       protocolStatus)
00712         rec.contentLength = FCGI_EndRequestBody_LEN
00713         self.writeRecord(rec)
00714 
00715         if remove:
00716             del self._requests[req.requestId]
00717 
00718         if __debug__: _debug(2, 'end_request: flags = %d' % req.flags)
00719 
00720         if not (req.flags & FCGI_KEEP_CONN) and not self._requests:
00721             self._cleanupSocket()
00722             self._keepGoing = False
00723 
00724     def _do_get_values(self, inrec):
00725         """Handle an FCGI_GET_VALUES request from the web server."""
00726         outrec = Record(FCGI_GET_VALUES_RESULT)
00727 
00728         pos = 0
00729         while pos < inrec.contentLength:
00730             pos, (name, value) = decode_pair(inrec.contentData, pos)
00731             cap = self.server.capability.get(name)
00732             if cap is not None:
00733                 outrec.contentData += encode_pair(name, str(cap))
00734 
00735         outrec.contentLength = len(outrec.contentData)
00736         self.writeRecord(outrec)
00737 
00738     def _do_begin_request(self, inrec):
00739         """Handle an FCGI_BEGIN_REQUEST from the web server."""
00740         role, flags = struct.unpack(FCGI_BeginRequestBody, inrec.contentData)
00741 
00742         req = self.server.request_class(self, self._inputStreamClass)
00743         req.requestId, req.role, req.flags = inrec.requestId, role, flags
00744         req.aborted = False
00745 
00746         if not self._multiplexed and self._requests:
00747             # Can't multiplex requests.
00748             self.end_request(req, 0L, FCGI_CANT_MPX_CONN, remove=False)
00749         else:
00750             self._requests[inrec.requestId] = req
00751 
00752     def _do_abort_request(self, inrec):
00753         """
00754         Handle an FCGI_ABORT_REQUEST from the web server.
00755 
00756         We just mark a flag in the associated Request.
00757         """
00758         req = self._requests.get(inrec.requestId)
00759         if req is not None:
00760             req.aborted = True
00761 
00762     def _start_request(self, req):
00763         """Run the request."""
00764         # Not multiplexed, so run it inline.
00765         req.run()
00766 
00767     def _do_params(self, inrec):
00768         """
00769         Handle an FCGI_PARAMS Record.
00770 
00771         If the last FCGI_PARAMS Record is received, start the request.
00772         """
00773         req = self._requests.get(inrec.requestId)
00774         if req is not None:
00775             if inrec.contentLength:
00776                 pos = 0
00777                 while pos < inrec.contentLength:
00778                     pos, (name, value) = decode_pair(inrec.contentData, pos)
00779                     req.params[name] = value
00780             else:
00781                 self._start_request(req)
00782 
00783     def _do_stdin(self, inrec):
00784         """Handle the FCGI_STDIN stream."""
00785         req = self._requests.get(inrec.requestId)
00786         if req is not None:
00787             req.stdin.add_data(inrec.contentData)
00788 
00789     def _do_data(self, inrec):
00790         """Handle the FCGI_DATA stream."""
00791         req = self._requests.get(inrec.requestId)
00792         if req is not None:
00793             req.data.add_data(inrec.contentData)
00794 
00795     def _do_unknown_type(self, inrec):
00796         """Handle an unknown request type. Respond accordingly."""
00797         outrec = Record(FCGI_UNKNOWN_TYPE)
00798         outrec.contentData = struct.pack(FCGI_UnknownTypeBody, inrec.type)
00799         outrec.contentLength = FCGI_UnknownTypeBody_LEN
00800         self.writeRecord(rec)
00801         
00802 class MultiplexedConnection(Connection):
00803     """
00804     A version of Connection capable of handling multiple requests
00805     simultaneously.
00806     """
00807     _multiplexed = True
00808     _inputStreamClass = MultiplexedInputStream
00809 
00810     def __init__(self, sock, addr, server):
00811         super(MultiplexedConnection, self).__init__(sock, addr, server)
00812 
00813         # Used to arbitrate access to self._requests.
00814         lock = threading.RLock()
00815 
00816         # Notification is posted everytime a request completes, allowing us
00817         # to quit cleanly.
00818         self._lock = threading.Condition(lock)
00819 
00820     def _cleanupSocket(self):
00821         # Wait for any outstanding requests before closing the socket.
00822         self._lock.acquire()
00823         while self._requests:
00824             self._lock.wait()
00825         self._lock.release()
00826 
00827         super(MultiplexedConnection, self)._cleanupSocket()
00828         
00829     def writeRecord(self, rec):
00830         # Must use locking to prevent intermingling of Records from different
00831         # threads.
00832         self._lock.acquire()
00833         try:
00834             # Probably faster than calling super. ;)
00835             rec.write(self._sock)
00836         finally:
00837             self._lock.release()
00838 
00839     def end_request(self, req, appStatus=0L,
00840                     protocolStatus=FCGI_REQUEST_COMPLETE, remove=True):
00841         self._lock.acquire()
00842         try:
00843             super(MultiplexedConnection, self).end_request(req, appStatus,
00844                                                            protocolStatus,
00845                                                            remove)
00846             self._lock.notify()
00847         finally:
00848             self._lock.release()
00849 
00850     def _do_begin_request(self, inrec):
00851         self._lock.acquire()
00852         try:
00853             super(MultiplexedConnection, self)._do_begin_request(inrec)
00854         finally:
00855             self._lock.release()
00856 
00857     def _do_abort_request(self, inrec):
00858         self._lock.acquire()
00859         try:
00860             super(MultiplexedConnection, self)._do_abort_request(inrec)
00861         finally:
00862             self._lock.release()
00863 
00864     def _start_request(self, req):
00865         thread.start_new_thread(req.run, ())
00866 
00867     def _do_params(self, inrec):
00868         self._lock.acquire()
00869         try:
00870             super(MultiplexedConnection, self)._do_params(inrec)
00871         finally:
00872             self._lock.release()
00873 
00874     def _do_stdin(self, inrec):
00875         self._lock.acquire()
00876         try:
00877             super(MultiplexedConnection, self)._do_stdin(inrec)
00878         finally:
00879             self._lock.release()
00880 
00881     def _do_data(self, inrec):
00882         self._lock.acquire()
00883         try:
00884             super(MultiplexedConnection, self)._do_data(inrec)
00885         finally:
00886             self._lock.release()
00887         
class BaseFCGIServer(object):
    """
    Base FastCGI server.

    Holds the server configuration, sets up the listening socket
    (_setupSocket) and provides the WSGI request handler (handler) plus
    a default error page (error).
    """

    # Request classes. cgirequest_class is instantiated by _setupSocket when
    # the process turns out to be running as plain CGI; request_class is
    # presumably used for regular FastCGI requests -- confirm against the
    # connection classes.
    request_class = Request
    cgirequest_class = CGIRequest

    # The maximum number of bytes (per Record) to write to the server.
    # I've noticed mod_fastcgi has a relatively small receive buffer (8K or
    # so).
    maxwrite = 8192

    # Limits the size of the InputStream's string buffer to this size + the
    # server's maximum Record size. Since the InputStream is not seekable,
    # we throw away already-read data once this certain amount has been read.
    inputStreamShrinkThreshold = 102400 - 8192
00901 
00902     def __init__(self, application, environ=None,
00903                  multithreaded=True, multiprocess=False,
00904                  bindAddress=None, umask=None, multiplexed=False,
00905                  debug=False, roles=(FCGI_RESPONDER,),
00906                  forceCGI=False):
00907         """
00908         bindAddress, if present, must either be a string or a 2-tuple. If
00909         present, run() will open its own listening socket. You would use
00910         this if you wanted to run your application as an 'external' FastCGI
00911         app. (i.e. the webserver would no longer be responsible for starting
00912         your app) If a string, it will be interpreted as a filename and a UNIX
00913         socket will be opened. If a tuple, the first element, a string,
00914         is the interface name/IP to bind to, and the second element (an int)
00915         is the port number.
00916 
00917         If binding to a UNIX socket, umask may be set to specify what
00918         the umask is to be changed to before the socket is created in the
00919         filesystem. After the socket is created, the previous umask is
00920         restored.
00921         
00922         Set multiplexed to True if you want to handle multiple requests
00923         per connection. Some FastCGI backends (namely mod_fastcgi) don't
00924         multiplex requests at all, so by default this is off (which saves
00925         on thread creation/locking overhead). If threads aren't available,
00926         this keyword is ignored; it's not possible to multiplex requests
00927         at all.
00928         """
00929         if environ is None:
00930             environ = {}
00931 
00932         self.application = application
00933         self.environ = environ
00934         self.multithreaded = multithreaded
00935         self.multiprocess = multiprocess
00936         self.debug = debug
00937         self.roles = roles
00938         self.forceCGI = forceCGI
00939 
00940         self._bindAddress = bindAddress
00941         self._umask = umask
00942         
00943         # Used to force single-threadedness
00944         self._appLock = thread.allocate_lock()
00945 
00946         if thread_available:
00947             try:
00948                 import resource
00949                 # Attempt to glean the maximum number of connections
00950                 # from the OS.
00951                 maxConns = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
00952             except ImportError:
00953                 maxConns = 100 # Just some made up number.
00954             maxReqs = maxConns
00955             if multiplexed:
00956                 self._connectionClass = MultiplexedConnection
00957                 maxReqs *= 5 # Another made up number.
00958             else:
00959                 self._connectionClass = Connection
00960             self.capability = {
00961                 FCGI_MAX_CONNS: maxConns,
00962                 FCGI_MAX_REQS: maxReqs,
00963                 FCGI_MPXS_CONNS: multiplexed and 1 or 0
00964                 }
00965         else:
00966             self._connectionClass = Connection
00967             self.capability = {
00968                 # If threads aren't available, these are pretty much correct.
00969                 FCGI_MAX_CONNS: 1,
00970                 FCGI_MAX_REQS: 1,
00971                 FCGI_MPXS_CONNS: 0
00972                 }
00973 
00974     def _setupSocket(self):
00975         if self._bindAddress is None: # Run as a normal FastCGI?
00976             isFCGI = True
00977 
00978             sock = socket.fromfd(FCGI_LISTENSOCK_FILENO, socket.AF_INET,
00979                                  socket.SOCK_STREAM)
00980             try:
00981                 sock.getpeername()
00982             except socket.error, e:
00983                 if e[0] == errno.ENOTSOCK:
00984                     # Not a socket, assume CGI context.
00985                     isFCGI = False
00986                 elif e[0] != errno.ENOTCONN:
00987                     raise
00988 
00989             # FastCGI/CGI discrimination is broken on Mac OS X.
00990             # Set the environment variable FCGI_FORCE_CGI to "Y" or "y"
00991             # if you want to run your app as a simple CGI. (You can do
00992             # this with Apache's mod_env [not loaded by default in OS X
00993             # client, ha ha] and the SetEnv directive.)
00994             if not isFCGI or self.forceCGI or \
00995                os.environ.get('FCGI_FORCE_CGI', 'N').upper().startswith('Y'):
00996                 req = self.cgirequest_class(self)
00997                 req.run()
00998                 sys.exit(0)
00999         else:
01000             # Run as a server
01001             oldUmask = None
01002             if type(self._bindAddress) is str:
01003                 # Unix socket
01004                 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
01005                 try:
01006                     os.unlink(self._bindAddress)
01007                 except OSError:
01008                     pass
01009                 if self._umask is not None:
01010                     oldUmask = os.umask(self._umask)
01011             else:
01012                 # INET socket
01013                 assert type(self._bindAddress) is tuple
01014                 assert len(self._bindAddress) == 2
01015                 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
01016                 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
01017 
01018             sock.bind(self._bindAddress)
01019             sock.listen(socket.SOMAXCONN)
01020 
01021             if oldUmask is not None:
01022                 os.umask(oldUmask)
01023                 
01024         return sock
01025 
    def _cleanupSocket(self, sock):
        """
        Closes the main socket.

        sock is the socket object to close (presumably the one returned by
        _setupSocket -- confirm against the caller). Nothing is returned.
        """
        sock.close()
01029 
    def handler(self, req):
        """
        Special handler for WSGI.

        Builds the WSGI environment from req.params, calls
        self.application and streams its output to req.stdout as a CGI
        style response (a "Status:" line, the headers, a blank line, then
        the body).

        Returns a 2-tuple: (FCGI_UNKNOWN_ROLE, 0) if req.role is not one
        of self.roles, otherwise (FCGI_REQUEST_COMPLETE, 0). The second
        element is presumably the application status -- confirm against
        the caller.

        A socket.error with errno EPIPE raised while running the
        application is swallowed; other socket errors and all other
        exceptions propagate.
        """
        if req.role not in self.roles:
            return FCGI_UNKNOWN_ROLE, 0

        # Mostly taken from example CGI gateway.
        # Note: environ *is* req.params (modified in place), and entries
        # from self.environ override request parameters of the same name.
        environ = req.params
        environ.update(self.environ)

        environ['wsgi.version'] = (1,0)
        environ['wsgi.input'] = req.stdin
        # When we opened our own socket (bindAddress given), errors are
        # written to both the process's stderr and the request's stderr.
        if self._bindAddress is None:
            stderr = req.stderr
        else:
            stderr = TeeOutputStream((sys.stderr, req.stderr))
        environ['wsgi.errors'] = stderr
        environ['wsgi.multithread'] = not isinstance(req, CGIRequest) and \
                                      thread_available and self.multithreaded
        environ['wsgi.multiprocess'] = isinstance(req, CGIRequest) or \
                                       self.multiprocess
        environ['wsgi.run_once'] = isinstance(req, CGIRequest)

        if environ.get('HTTPS', 'off') in ('on', '1'):
            environ['wsgi.url_scheme'] = 'https'
        else:
            environ['wsgi.url_scheme'] = 'http'

        self._sanitizeEnv(environ)

        # State shared with the closures below. Lists are used so that the
        # closures can update them in place:
        #   headers_set  -- [status, response_headers] from start_response
        #   headers_sent -- same pair, filled in once the headers are written
        headers_set = []
        headers_sent = []
        result = None

        def write(data):
            # WSGI write() callable: sends the headers on first use, then
            # writes and flushes data.
            assert type(data) is str, 'write() argument must be string'
            assert headers_set, 'write() before start_response()'

            if not headers_sent:
                # Copy headers_set into headers_sent (marking the headers
                # as sent) and unpack the pair in one statement.
                status, responseHeaders = headers_sent[:] = headers_set
                found = False
                for header,value in responseHeaders:
                    if header.lower() == 'content-length':
                        found = True
                        break
                # If the application gave no Content-Length and its result
                # consists of exactly one item, the length of this first
                # chunk is the length of the whole body.
                if not found and result is not None:
                    try:
                        if len(result) == 1:
                            responseHeaders.append(('Content-Length',
                                                    str(len(data))))
                    except:
                        # Best effort: result may not support len().
                        # NOTE(review): the bare except also hides any
                        # other exception raised here.
                        pass
                s = 'Status: %s\r\n' % status
                for header in responseHeaders:
                    s += '%s: %s\r\n' % header
                s += '\r\n'
                req.stdout.write(s)

            req.stdout.write(data)
            req.stdout.flush()

        def start_response(status, response_headers, exc_info=None):
            # WSGI start_response() callable: validates and records the
            # status and headers, and returns write().
            if exc_info:
                try:
                    if headers_sent:
                        # Re-raise if too late
                        raise exc_info[0], exc_info[1], exc_info[2]
                finally:
                    exc_info = None # avoid dangling circular ref
            else:
                assert not headers_set, 'Headers already set!'

            assert type(status) is str, 'Status must be a string'
            assert len(status) >= 4, 'Status must be at least 4 characters'
            assert int(status[:3]), 'Status must begin with 3-digit code'
            assert status[3] == ' ', 'Status must have a space after code'
            assert type(response_headers) is list, 'Headers must be a list'
            if __debug__:
                for name,val in response_headers:
                    assert type(name) is str, 'Header name "%s" must be a string' % name
                    assert type(val) is str, 'Value of header "%s" must be a string' % name

            headers_set[:] = [status, response_headers]
            return write

        # When not multithreaded, only one application call may run at a
        # time; the lock is released in the outer finally below.
        if not self.multithreaded:
            self._appLock.acquire()
        try:
            try:
                result = self.application(environ, start_response)
                try:
                    for data in result:
                        # Empty chunks are skipped, so the headers are not
                        # sent until there is real data (or the loop ends).
                        if data:
                            write(data)
                    if not headers_sent:
                        write('') # in case body was empty
                finally:
                    if hasattr(result, 'close'):
                        result.close()
            except socket.error, e:
                if e[0] != errno.EPIPE:
                    raise # Don't let EPIPE propagate beyond server
        finally:
            if not self.multithreaded:
                self._appLock.release()

        return FCGI_REQUEST_COMPLETE, 0
01136 
01137     def _sanitizeEnv(self, environ):
01138         """Ensure certain values are present, if required by WSGI."""
01139         if not environ.has_key('SCRIPT_NAME'):
01140             environ['SCRIPT_NAME'] = ''
01141 
01142         reqUri = None
01143         if environ.has_key('REQUEST_URI'):
01144             reqUri = environ['REQUEST_URI'].split('?', 1)
01145 
01146         if not environ.has_key('PATH_INFO') or not environ['PATH_INFO']:
01147             if reqUri is not None:
01148                 environ['PATH_INFO'] = reqUri[0]
01149             else:
01150                 environ['PATH_INFO'] = ''
01151         if not environ.has_key('QUERY_STRING') or not environ['QUERY_STRING']:
01152             if reqUri is not None and len(reqUri) > 1:
01153                 environ['QUERY_STRING'] = reqUri[1]
01154             else:
01155                 environ['QUERY_STRING'] = ''
01156 
01157         # If any of these are missing, it probably signifies a broken
01158         # server...
01159         for name,default in [('REQUEST_METHOD', 'GET'),
01160                              ('SERVER_NAME', 'localhost'),
01161                              ('SERVER_PORT', '80'),
01162                              ('SERVER_PROTOCOL', 'HTTP/1.0')]:
01163             if not environ.has_key(name):
01164                 environ['wsgi.errors'].write('%s: missing FastCGI param %s '
01165                                              'required by WSGI!\n' %
01166                                              (self.__class__.__name__, name))
01167                 environ[name] = default
01168             
01169     def error(self, req):
01170         """
01171         Called by Request if an exception occurs within the handler. May and
01172         should be overridden.
01173         """
01174         if self.debug:
01175             import cgitb
01176             req.stdout.write('Content-Type: text/html\r\n\r\n' +
01177                              cgitb.html(sys.exc_info()))
01178         else:
01179             errorpage = """<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
01180 <html><head>
01181 <title>Unhandled Exception</title>
01182 </head><body>
01183 <h1>Unhandled Exception</h1>
01184 <p>An unhandled exception was thrown by the application.</p>
01185 </body></html>
01186 """
01187             req.stdout.write('Content-Type: text/html\r\n\r\n' +
01188                              errorpage)