Back to index

nordugrid-arc-nox  1.1.0~rc6
Public Member Functions | Private Member Functions | Private Attributes
arcom.threadpool.ThreadPool Class Reference

List of all members.

Public Member Functions

def __init__
def setThreadCount
def getThreadCount
def queueTask
def getNextTask
def joinAll

Private Member Functions

def __setThreadCountNolock

Private Attributes

 __threads
 __resizeLock
 __taskLock
 __tasks
 __isJoining

Detailed Description

Flexible thread pool class.  Creates a pool of threads, then
accepts tasks that will be dispatched to the next available
thread.

Definition at line 7 of file threadpool.py.


Constructor & Destructor Documentation

def arcom.threadpool.ThreadPool.__init__ (   self,
  numThreads 
)
Initialize the thread pool with numThreads workers.

Definition at line 13 of file threadpool.py.

00013 
00014     def __init__(self, numThreads):
00015 
00016         """Initialize the thread pool with numThreads workers."""
00017         
00018         self.__threads = []
00019         self.__resizeLock = threading.Condition(threading.Lock())
00020         self.__taskLock = threading.Condition(threading.Lock())
00021         self.__tasks = []
00022         self.__isJoining = False
00023         self.setThreadCount(numThreads)


Member Function Documentation

def arcom.threadpool.ThreadPool.__setThreadCountNolock (   self,
  newNumThreads 
) [private]
Set the current pool size, spawning or terminating threads
if necessary.  Internal use only; assumes the resizing lock is
held.

Definition at line 41 of file threadpool.py.

00041 
00042     def __setThreadCountNolock(self, newNumThreads):
00043         
00044         """Set the current pool size, spawning or terminating threads
00045         if necessary.  Internal use only; assumes the resizing lock is
00046         held."""
00047         
00048         # If we need to grow the pool, do so
00049         while newNumThreads > len(self.__threads):
00050             newThread = ThreadPoolThread(self)
00051             self.__threads.append(newThread)
00052             newThread.start()
00053         # If we need to shrink the pool, do so
00054         while newNumThreads < len(self.__threads):
00055             self.__threads[0].goAway()
00056             del self.__threads[0]

Here is the caller graph for this function:

def arcom.threadpool.ThreadPool.getNextTask (   self )
Retrieve the next task from the task queue.  For use
only by ThreadPoolThread objects contained in the pool.

Definition at line 84 of file threadpool.py.

00084 
00085     def getNextTask(self):
00086 
00087         """ Retrieve the next task from the task queue.  For use
00088         only by ThreadPoolThread objects contained in the pool."""
00089         
00090         self.__taskLock.acquire()
00091         try:
00092             if self.__tasks == []:
00093                 return (None, None, None)
00094             else:
00095                 return self.__tasks.pop(0)
00096         finally:
00097             self.__taskLock.release()
    
def arcom.threadpool.ThreadPool.getThreadCount (   self )
Return the number of threads in the pool.

Definition at line 57 of file threadpool.py.

00057 
00058     def getThreadCount(self):
00059 
00060         """Return the number of threads in the pool."""
00061         
00062         self.__resizeLock.acquire()
00063         try:
00064             return len(self.__threads)
00065         finally:
00066             self.__resizeLock.release()

def arcom.threadpool.ThreadPool.joinAll (   self,
  waitForTasks = True,
  waitForThreads = True 
)
Clear the task queue and terminate all pooled threads,
optionally allowing the tasks and threads to finish.

Definition at line 98 of file threadpool.py.

00098 
00099     def joinAll(self, waitForTasks = True, waitForThreads = True):
00100 
00101         """ Clear the task queue and terminate all pooled threads,
00102         optionally allowing the tasks and threads to finish."""
00103 
00104         # Mark the pool as joining to prevent any more task queueing
00105         self.__isJoining = True
00106 
00107         # Wait for tasks to finish
00108         if waitForTasks:
00109             while self.__tasks != []:
00110                 sleep(0.1)
00111 
00112         # Tell all the threads to quit
00113         self.__resizeLock.acquire()
00114         try:
00115             # Wait until all threads have exited
00116             if waitForThreads:
00117                 for t in self.__threads:
00118                     t.goAway()
00119                 for t in self.__threads:
00120                     t.join()
00121                     # print t,"joined"
00122                     del t
00123             self.__setThreadCountNolock(0)
00124             self.__isJoining = True
00125 
00126             # Reset the pool for potential reuse
00127             self.__isJoining = False
00128         finally:
00129             self.__resizeLock.release()
        

Here is the call graph for this function:

def arcom.threadpool.ThreadPool.queueTask (   self,
  task,
  args = None,
  taskCallback = None 
)
Insert a task into the queue.  task must be callable;
args and taskCallback can be None.

Definition at line 67 of file threadpool.py.

00067 
00068     def queueTask(self, task, args=None, taskCallback=None):
00069 
00070         """Insert a task into the queue.  task must be callable;
00071         args and taskCallback can be None."""
00072         
00073         if self.__isJoining == True:
00074             return False
00075         if not callable(task):
00076             return False
00077         
00078         self.__taskLock.acquire()
00079         try:
00080             self.__tasks.append((task, args, taskCallback))
00081             return True
00082         finally:
00083             self.__taskLock.release()

def arcom.threadpool.ThreadPool.setThreadCount (   self,
  newNumThreads 
)
External method to set the current pool size.  Acquires
the resizing lock, then calls the internal version to do real
work.

Definition at line 24 of file threadpool.py.

00024 
00025     def setThreadCount(self, newNumThreads):
00026 
00027         """ External method to set the current pool size.  Acquires
00028         the resizing lock, then calls the internal version to do real
00029         work."""
00030         
00031         # Can't change the thread count if we're shutting down the pool!
00032         if self.__isJoining:
00033             return False
00034         
00035         self.__resizeLock.acquire()
00036         try:
00037             self.__setThreadCountNolock(newNumThreads)
00038         finally:
00039             self.__resizeLock.release()
00040         return True

Here is the call graph for this function:


Member Data Documentation

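arcom.threadpool.ThreadPool.__isJoining [private]

Definition at line 21 of file threadpool.py.

arcom.threadpool.ThreadPool.__resizeLock [private]

Definition at line 18 of file threadpool.py.

arcom.threadpool.ThreadPool.__taskLock [private]

Definition at line 19 of file threadpool.py.

arcom.threadpool.ThreadPool.__tasks [private]

Definition at line 20 of file threadpool.py.

arcom.threadpool.ThreadPool.__threads [private]

Definition at line 17 of file threadpool.py.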


The documentation for this class was generated from the following file: