Back to index

nordugrid-arc-nox  1.1.0~rc6
threadpool.py
Go to the documentation of this file.
00001 import threading, traceback
00002 
00003 from time import sleep
00004 
00005 # Slightly modified from http://code.activestate.com/recipes/203871/
00006 
class ThreadPool:

    """Flexible thread pool class.  Creates a pool of threads, then
    accepts tasks that will be dispatched to the next available
    thread.

    Tasks are stored as (task, args, taskCallback) tuples in a plain
    list that worker threads poll through getNextTask()."""

    def __init__(self, numThreads):

        """Initialize the thread pool with numThreads workers."""

        self.__threads = []
        # Guards the worker list while the pool is being resized
        self.__resizeLock = threading.Condition(threading.Lock())
        # Guards the task queue
        self.__taskLock = threading.Condition(threading.Lock())
        self.__tasks = []
        # True while joinAll() is running; blocks queueing and resizing
        self.__isJoining = False
        self.setThreadCount(numThreads)

    def setThreadCount(self, newNumThreads):

        """External method to set the current pool size.  Acquires
        the resizing lock, then calls the internal version to do real
        work.

        Returns False (and does nothing) while the pool is joining,
        True once the pool has been resized."""

        # Can't change the thread count if we're shutting down the pool!
        if self.__isJoining:
            return False

        self.__resizeLock.acquire()
        try:
            self.__setThreadCountNolock(newNumThreads)
        finally:
            self.__resizeLock.release()
        return True

    def __setThreadCountNolock(self, newNumThreads):

        """Set the current pool size, spawning or terminating threads
        if necessary.  Internal use only; assumes the resizing lock is
        held.  A negative size is treated as zero."""

        # Without the clamp a negative request empties the pool and then
        # raises IndexError when indexing the now-empty thread list.
        newNumThreads = max(0, newNumThreads)

        # If we need to grow the pool, do so
        while newNumThreads > len(self.__threads):
            newThread = ThreadPoolThread(self)
            self.__threads.append(newThread)
            newThread.start()
        # If we need to shrink the pool, do so (oldest workers first).
        # The worker is only flagged; it exits after its current task.
        while newNumThreads < len(self.__threads):
            self.__threads.pop(0).goAway()

    def getThreadCount(self):

        """Return the number of threads in the pool."""

        self.__resizeLock.acquire()
        try:
            return len(self.__threads)
        finally:
            self.__resizeLock.release()

    def queueTask(self, task, args=None, taskCallback=None):

        """Insert a task into the queue.  task must be callable;
        args and taskCallback can be None.

        Returns True if the task was queued, False if the pool is
        joining or task is not callable."""

        if self.__isJoining:
            return False
        if not callable(task):
            return False

        self.__taskLock.acquire()
        try:
            self.__tasks.append((task, args, taskCallback))
            return True
        finally:
            self.__taskLock.release()

    def getNextTask(self):

        """Retrieve the next task from the task queue.  For use
        only by ThreadPoolThread objects contained in the pool.

        Returns the oldest queued (task, args, taskCallback) tuple, or
        (None, None, None) when the queue is empty."""

        self.__taskLock.acquire()
        try:
            if not self.__tasks:
                return (None, None, None)
            return self.__tasks.pop(0)
        finally:
            self.__taskLock.release()

    def joinAll(self, waitForTasks = True, waitForThreads = True):

        """Terminate all pooled threads, optionally allowing the
        queued tasks and the threads to finish first.

        waitForTasks:   poll until the task queue has been drained.
                        Tasks left in the queue are not discarded.
        waitForThreads: join every worker before returning.

        The pool is usable again after this method returns, including
        when it is left through an exception."""

        # Mark the pool as joining to prevent any more task queueing
        self.__isJoining = True
        try:
            # Wait for the queue to drain; workers pop tasks concurrently
            if waitForTasks:
                while self.__tasks:
                    sleep(0.1)

            self.__resizeLock.acquire()
            try:
                if waitForThreads:
                    # Flag every worker first so they wind down in
                    # parallel, then wait for each of them to exit.
                    for t in self.__threads:
                        t.goAway()
                    for t in self.__threads:
                        t.join()
                # Drop (and flag, if not done above) all workers
                self.__setThreadCountNolock(0)
            finally:
                self.__resizeLock.release()
        finally:
            # Reset the pool for potential reuse, even after an
            # interrupted join, so it does not stay locked forever.
            self.__isJoining = False
00129         
class ThreadPoolThread(threading.Thread):

    """Pooled thread class.

    Repeatedly pulls (task, args, callback) tuples from the owning
    pool via getNextTask() and executes them until goAway() is
    called."""

    # Seconds to sleep between polls when the task queue is empty
    threadSleepTime = 0.1

    def __init__(self, pool):

        """Initialize the thread and remember the pool."""

        threading.Thread.__init__(self)
        self.__pool = pool
        self.__isDying = False

    def run(self):

        """Until told to quit, retrieve the next task and execute
        it, calling the callback if any.

        args is None   -> task()
        args is a list -> task(*args)
        anything else  -> task(args)   (tuples are passed as ONE argument)

        The callback, when given, receives the task's return value.
        An exception raised by the task or the callback is printed
        with its traceback and the worker carries on; letting it
        escape would silently kill this thread while the pool still
        counts it as alive."""

        while not self.__isDying:
            cmd, args, callback = self.__pool.getNextTask()
            # If there's nothing to do, just sleep a bit
            if cmd is None:
                sleep(ThreadPoolThread.threadSleepTime)
                continue
            try:
                if args is None:
                    result = cmd()
                elif isinstance(args, list):
                    result = cmd(*args)
                else:
                    result = cmd(args)
                if callback is not None:
                    callback(result)
            except Exception:
                # Report the failure but keep the worker alive
                traceback.print_exc()

    def goAway(self):

        """Exit the run loop next time through."""

        self.__isDying = True
00174 
00175             
00176 # from http://my.safaribooksonline.com/0596001673/pythoncook-CHP-6-SECT-4
class ReadWriteLock:
    """ Lock that admits any number of concurrent readers but only a
    single writer.

    One condition variable does all the work: readers hold its lock
    only long enough to adjust a counter, while a writer keeps the
    lock for the whole time it owns the write lock. """

    def __init__(self):
        # Number of read locks currently held
        self._readers = 0
        self._read_ready = threading.Condition(threading.Lock())

    def _shift_readers(self, delta, wake_if_idle):
        """ Add delta to the reader count under the lock; when
        wake_if_idle is set and the count is zero afterwards, wake
        every thread waiting on the condition. """
        cond = self._read_ready
        cond.acquire()
        try:
            self._readers = self._readers + delta
            if wake_if_idle and self._readers == 0:
                cond.notifyAll()
        finally:
            cond.release()

    def acquire_read(self):
        """ Take a read lock.  Only blocks while a writer holds the
        underlying lock. """
        self._shift_readers(1, False)

    def release_read(self):
        """ Give back a read lock, waking waiting writers once no
        readers remain. """
        self._shift_readers(-1, True)

    def acquire_write(self):
        """ Take the write lock.  Blocks until no read or write locks
        are held; returns with the underlying lock still held. """
        cond = self._read_ready
        cond.acquire()
        # wait() drops the lock while sleeping and re-takes it on wake-up
        while self._readers > 0:
            cond.wait()

    def release_write(self):
        """ Give back the write lock taken by acquire_write(). """
        self._read_ready.release()
00214 
00215 COUNTER = 0
00216 
00217 # Usage example
00218 if __name__ == "__main__":
00219 
00220     from random import randrange
00221 
00222     # Sample task 1: given a start and end value, shuffle integers,
00223     # then sort them
00224     
00225     def sortTask(data):
00226         print "SortTask starting for ", data
00227         numbers = range(data[0], data[1])
00228         for a in numbers:
00229             rnd = randrange(0, len(numbers) - 1)
00230             a, numbers[rnd] = numbers[rnd], a
00231         print "SortTask sorting for ", data
00232         numbers.sort()
00233         print "SortTask done for ", data
00234         return "Sorter ", data
00235 
00236     # Sample task 2: just sleep for a number of seconds.
00237 
00238     def waitTask(data):
00239         print "WaitTask starting for ", data
00240         print "WaitTask sleeping for %d seconds" % data
00241         sleep(data)
00242         return "Waiter", data
00243 
00244     locker = ReadWriteLock()
00245     def addTask(data):
00246         global COUNTER
00247         nadds = 10000
00248         for i in range(nadds):
00249             # re-acquiring lock for every add
00250             # don't do this at home...
00251             locker.acquire_write()
00252             COUNTER += data
00253             locker.release_write()
00254         print "Added %d to counter, counter is now %d"%(data*nadds,COUNTER)
00255         return "addTask", data
00256     
00257     # Both tasks use the same callback
00258 
00259     def taskCallback(data):
00260         print "Callback called for", data
00261     # Create a pool with three worker threads
00262 
00263     pool = ThreadPool(3)
00264 
00265     # Insert tasks into the queue and let them run
00266     pool.queueTask(sortTask, (1000, 100000), taskCallback)
00267     pool.queueTask(waitTask, 5, taskCallback)
00268     pool.queueTask(sortTask, (200, 200000), taskCallback)
00269     pool.queueTask(waitTask, 2, taskCallback)
00270     pool.queueTask(sortTask, (3, 30000), taskCallback)
00271     pool.queueTask(waitTask, 7, taskCallback)
00272 
00273     pool.joinAll()
00274 
00275     pool = ThreadPool(5)
00276 
00277     pool.queueTask(addTask, 10, taskCallback)
00278     pool.queueTask(addTask, 10, taskCallback)
00279     pool.queueTask(addTask, 10, taskCallback)
00280     pool.queueTask(addTask, 10, taskCallback)
00281     
00282     print "before join", COUNTER
00283     # When all tasks are finished, allow the threads to terminate
00284     pool.joinAll()
00285     print "after join", COUNTER
00286