Back to index

moin  1.9.0~rc2
threadpool.py
Go to the documentation of this file.
00001 # Copyright (c) 2005 Allan Saddi <allan@saddi.com>
00002 # All rights reserved.
00003 #
00004 # Redistribution and use in source and binary forms, with or without
00005 # modification, are permitted provided that the following conditions
00006 # are met:
00007 # 1. Redistributions of source code must retain the above copyright
00008 #    notice, this list of conditions and the following disclaimer.
00009 # 2. Redistributions in binary form must reproduce the above copyright
00010 #    notice, this list of conditions and the following disclaimer in the
00011 #    documentation and/or other materials provided with the distribution.
00012 #
00013 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
00014 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00015 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00016 # ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
00017 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
00018 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
00019 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
00020 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00021 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
00022 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
00023 # SUCH DAMAGE.
00024 #
00025 # $Id$
00026 
00027 __author__ = 'Allan Saddi <allan@saddi.com>'
00028 __version__ = '$Revision$'
00029 
00030 import sys
00031 import threading
00032 
class ThreadPool(object):
    """
    Thread pool that maintains the number of idle threads between
    minSpare and maxSpare inclusive. By default, there is no limit on
    the number of threads that can be started, but this can be controlled
    by maxThreads.

    Jobs are arbitrary objects exposing a run() method. They are taken
    from the work queue in the order they were added and executed by
    daemon worker threads.
    """
    def __init__(self, minSpare=1, maxSpare=5, maxThreads=sys.maxsize):
        """
        Creates the pool and immediately starts its initial workers.

        minSpare is the number of idle threads addJob() tries to keep
        available, maxSpare is the number of idle threads above which a
        worker retires after finishing a job, and maxThreads is the upper
        bound on the total number of workers (raised to minSpare if it is
        smaller).
        """
        self._minSpare = minSpare
        self._maxSpare = maxSpare
        self._maxThreads = max(minSpare, maxThreads)

        # The condition's lock guards every attribute below.
        self._lock = threading.Condition()
        self._workQueue = []
        self._threads = []
        self._stop = False

        # Start with a full set of spare workers, but never more than the
        # configured thread limit allows.
        initial = max(0, min(maxSpare, self._maxThreads))
        self._idleCount = self._workerCount = initial
        for _ in range(initial):
            self._start_new_thread()

    def _start_new_thread(self):
        """
        Starts one daemon worker thread and returns it.

        The thread is only registered in self._threads once start() has
        succeeded, so shutdown() never tries to join a thread that was
        never started. Updating _workerCount/_idleCount is the caller's
        responsibility.
        """
        t = threading.Thread(target=self._worker)
        t.daemon = True
        t.start()
        self._threads.append(t)
        return t

    def shutdown(self):
        """
        Shuts down all workers and waits for them to exit.

        Jobs still sitting in the work queue are not executed; jobs that
        are already running are allowed to finish. After shutdown,
        addJob() refuses new jobs.
        """
        with self._lock:
            self._stop = True
            self._lock.notify_all()
            # Snapshot under the lock: retiring workers remove themselves
            # from self._threads concurrently.
            threads = list(self._threads)

        # Wait for all threads to finish. A thread cannot join itself, so
        # skip the caller if shutdown() was invoked from within a job.
        current = threading.current_thread()
        for t in threads:
            if t is not current:
                t.join()

    def addJob(self, job, allowQueuing=True):
        """
        Adds a job to the work queue. The job object should have a run()
        method. If allowQueuing is True (the default), the job will be
        added to the work queue regardless if there are any idle threads
        ready. (The only way for there to be no idle threads is if maxThreads
        is some reasonable, finite limit.)

        Otherwise, if allowQueuing is False, and there are no more idle
        threads, the job will not be queued.

        Returns True if the job was queued, False otherwise. Once
        shutdown() has been called no job is accepted and False is
        returned.
        """
        with self._lock:
            # Workers exit as soon as they see the stop flag, so a job
            # queued now would never run.
            if self._stop:
                return False

            # Maintain minimum number of spares. The counters are bumped
            # only after the thread really started; the new worker cannot
            # observe them earlier because we hold the lock.
            while (self._idleCount < self._minSpare and
                   self._workerCount < self._maxThreads):
                self._start_new_thread()
                self._workerCount += 1
                self._idleCount += 1

            # Hand off the job.
            if self._idleCount or allowQueuing:
                self._workQueue.append(job)
                self._lock.notify()
                return True
            return False

    def _worker(self):
        """
        Worker thread routine. Waits for a job, executes it, repeat.

        The worker exits when the pool is shut down, or when it finishes a
        job and finds that maxSpare threads are already idle.
        """
        while True:
            with self._lock:
                while not self._workQueue and not self._stop:
                    self._lock.wait()

                if self._stop:
                    return

                # We have a job to do...
                job = self._workQueue.pop(0)

                assert self._idleCount > 0
                self._idleCount -= 1

            # Run the job without holding the lock so other workers and
            # addJob() callers can make progress.
            try:
                job.run()
            except:
                # Deliberately catch everything: a failing job must not
                # kill the worker or leave the idle/worker counters
                # inconsistent.
                # FIXME: This should really be reported somewhere.
                # But we can't simply report it to stderr because of fcgi
                pass

            with self._lock:
                if self._idleCount == self._maxSpare:
                    # Enough spares already: die off and deregister so
                    # the thread list does not grow without bound.
                    assert self._workerCount > self._maxSpare
                    self._workerCount -= 1
                    self._threads.remove(threading.current_thread())
                    return
                self._idleCount += 1
                assert self._idleCount <= self._maxSpare