
python3.2  3.2.2
multiprocessing.pool.Pool Class Reference
Inheritance diagram for multiprocessing.pool.Pool:
Collaboration diagram for multiprocessing.pool.Pool:


Public Member Functions

def __init__
def apply
def map
def imap
def imap_unordered
def apply_async
def map_async
def __reduce__
def close
def terminate
def join

Public Attributes

_PyObject_HEAD_EXTRA Py_ssize_t ob_refcnt
struct _typeobject * ob_type

Static Public Attributes

 Process = Process

Private Member Functions

def _join_exited_workers
def _repopulate_pool
def _maintain_pool
def _setup_queues
def _terminate_pool

Static Private Member Functions

def _handle_workers
def _handle_tasks
def _handle_results
def _get_tasks
def _help_stuff_finish

Private Attributes

 _taskqueue
 _cache
 _state
 _maxtasksperchild
 _initializer
 _initargs
 _processes
 _pool
 _worker_handler
 _task_handler
 _result_handler
 _terminate
 _inqueue
 _outqueue
 _quick_put
 _quick_get

Detailed Description

Class which supports an async version of applying functions to arguments.

Definition at line 130 of file pool.py.

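As a quick orientation before the detailed member documentation, a minimal usage sketch of the public interface (the worker function square and the chosen pool size are hypothetical examples, not part of pool.py):

    from multiprocessing import Pool

    def square(x):                     # hypothetical worker function
        return x * x

    if __name__ == '__main__':
        pool = Pool(processes=4)       # an instance of multiprocessing.pool.Pool
        print(pool.map(square, range(10)))        # [0, 1, 4, ..., 81]
        result = pool.apply_async(square, (7,))
        print(result.get())                       # 49
        pool.close()                   # no more tasks will be submitted
        pool.join()                    # wait for workers and handler threads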

Constructor & Destructor Documentation

def multiprocessing.pool.Pool.__init__ (   self,
  processes = None,
  initializer = None,
  initargs = (),
  maxtasksperchild = None 
)

Definition at line 137 of file pool.py.

00137     def __init__(self, processes=None, initializer=None, initargs=(),
00138                  maxtasksperchild=None):
00139         self._setup_queues()
00140         self._taskqueue = queue.Queue()
00141         self._cache = {}
00142         self._state = RUN
00143         self._maxtasksperchild = maxtasksperchild
00144         self._initializer = initializer
00145         self._initargs = initargs
00146 
00147         if processes is None:
00148             try:
00149                 processes = cpu_count()
00150             except NotImplementedError:
00151                 processes = 1
00152         if processes < 1:
00153             raise ValueError("Number of processes must be at least 1")
00154 
00155         if initializer is not None and not hasattr(initializer, '__call__'):
00156             raise TypeError('initializer must be a callable')
00157 
00158         self._processes = processes
00159         self._pool = []
00160         self._repopulate_pool()
00161 
00162         self._worker_handler = threading.Thread(
00163             target=Pool._handle_workers,
00164             args=(self, )
00165             )
00166         self._worker_handler.daemon = True
00167         self._worker_handler._state = RUN
00168         self._worker_handler.start()
00169 
00170 
00171         self._task_handler = threading.Thread(
00172             target=Pool._handle_tasks,
00173             args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
00174             )
00175         self._task_handler.daemon = True
00176         self._task_handler._state = RUN
00177         self._task_handler.start()
00178 
00179         self._result_handler = threading.Thread(
00180             target=Pool._handle_results,
00181             args=(self._outqueue, self._quick_get, self._cache)
00182             )
00183         self._result_handler.daemon = True
00184         self._result_handler._state = RUN
00185         self._result_handler.start()
00186 
00187         self._terminate = Finalize(
00188             self, self._terminate_pool,
00189             args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
00190                   self._worker_handler, self._task_handler,
00191                   self._result_handler, self._cache),
00192             exitpriority=15
00193             )

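As a sketch of how the constructor arguments fit together; init_worker, work and the 'DEBUG' value are hypothetical illustrations:

    from multiprocessing import Pool

    _level = None                      # hypothetical per-worker global

    def init_worker(level):            # runs once in each worker process
        global _level
        _level = level

    def work(item):                    # hypothetical task
        return (_level, item)

    if __name__ == '__main__':
        pool = Pool(processes=4,
                    initializer=init_worker,
                    initargs=('DEBUG',),
                    maxtasksperchild=100)   # workers are recycled after 100 tasks
        print(pool.map(work, range(5)))
        pool.close()
        pool.join()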


Member Function Documentation

def multiprocessing.pool.Pool.__reduce__ (   self)

Definition at line 437 of file pool.py.

00437 
00438     def __reduce__(self):
00439         raise NotImplementedError(
00440               'pool objects cannot be passed between processes or pickled'
00441               )

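In practice this means a Pool instance cannot be pickled or sent to another process; a small sketch of the failure mode:

    import pickle
    from multiprocessing import Pool

    if __name__ == '__main__':
        pool = Pool(processes=2)
        try:
            pickle.dumps(pool)         # ends up calling Pool.__reduce__()
        except NotImplementedError as exc:
            print(exc)                 # pool objects cannot be passed between processes or pickled
        pool.terminate()
        pool.join()
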
def multiprocessing.pool.Pool._get_tasks (   func,
  it,
  size 
) [static, private]

Definition at line 429 of file pool.py.

00429 
00430     def _get_tasks(func, it, size):
00431         it = iter(it)
00432         while 1:
00433             x = tuple(itertools.islice(it, size))
00434             if not x:
00435                 return
00436             yield (func, x)

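Because _get_tasks() is a plain generator, its batching behaviour can be observed in isolation; a sketch that calls the private helper directly, purely for illustration:

    from multiprocessing.pool import Pool

    def f(x):                          # hypothetical function; never actually called here
        return x

    # Yields (func, chunk) pairs, each chunk holding at most `size` items.
    print(list(Pool._get_tasks(f, range(7), 3)))
    # [(<function f ...>, (0, 1, 2)), (<function f ...>, (3, 4, 5)), (<function f ...>, (6,))]
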
def multiprocessing.pool.Pool._handle_results (   outqueue,
  get,
  cache 
) [static, private]

Definition at line 371 of file pool.py.

00371 
00372     def _handle_results(outqueue, get, cache):
00373         thread = threading.current_thread()
00374 
00375         while 1:
00376             try:
00377                 task = get()
00378             except (IOError, EOFError):
00379                 debug('result handler got EOFError/IOError -- exiting')
00380                 return
00381 
00382             if thread._state:
00383                 assert thread._state == TERMINATE
00384                 debug('result handler found thread._state=TERMINATE')
00385                 break
00386 
00387             if task is None:
00388                 debug('result handler got sentinel')
00389                 break
00390 
00391             job, i, obj = task
00392             try:
00393                 cache[job]._set(i, obj)
00394             except KeyError:
00395                 pass
00396 
00397         while cache and thread._state != TERMINATE:
00398             try:
00399                 task = get()
00400             except (IOError, EOFError):
00401                 debug('result handler got EOFError/IOError -- exiting')
00402                 return
00403 
00404             if task is None:
00405                 debug('result handler ignoring extra sentinel')
00406                 continue
00407             job, i, obj = task
00408             try:
00409                 cache[job]._set(i, obj)
00410             except KeyError:
00411                 pass
00412 
00413         if hasattr(outqueue, '_reader'):
00414             debug('ensuring that outqueue is not full')
00415             # If we don't make room available in outqueue then
00416             # attempts to add the sentinel (None) to outqueue may
00417             # block.  There is guaranteed to be no more than 2 sentinels.
00418             try:
00419                 for i in range(10):
00420                     if not outqueue._reader.poll():
00421                         break
00422                     get()
00423             except (IOError, EOFError):
00424                 pass
00425 
00426         debug('result handler exiting: len(cache)=%s, thread._state=%s',
00427               len(cache), thread._state)

def multiprocessing.pool.Pool._handle_tasks (   taskqueue,
  put,
  outqueue,
  pool 
) [static, private]

Definition at line 332 of file pool.py.

00332 
00333     def _handle_tasks(taskqueue, put, outqueue, pool):
00334         thread = threading.current_thread()
00335 
00336         for taskseq, set_length in iter(taskqueue.get, None):
00337             i = -1
00338             for i, task in enumerate(taskseq):
00339                 if thread._state:
00340                     debug('task handler found thread._state != RUN')
00341                     break
00342                 try:
00343                     put(task)
00344                 except IOError:
00345                     debug('could not put task on queue')
00346                     break
00347             else:
00348                 if set_length:
00349                     debug('doing set_length()')
00350                     set_length(i+1)
00351                 continue
00352             break
00353         else:
00354             debug('task handler got sentinel')
00355 
00356 
00357         try:
00358             # tell result handler to finish when cache is empty
00359             debug('task handler sending sentinel to result handler')
00360             outqueue.put(None)
00361 
00362             # tell workers there is no more work
00363             debug('task handler sending sentinel to workers')
00364             for p in pool:
00365                 put(None)
00366         except IOError:
00367             debug('task handler got IOError when sending sentinels')
00368 
00369         debug('task handler exiting')

def multiprocessing.pool.Pool._handle_workers (   pool) [static, private]

Definition at line 323 of file pool.py.

00323 
00324     def _handle_workers(pool):
00325         while pool._worker_handler._state == RUN and pool._state == RUN:
00326             pool._maintain_pool()
00327             time.sleep(0.1)
00328         # send sentinel to stop workers
00329         pool._taskqueue.put(None)
00330         debug('worker handler exiting')

def multiprocessing.pool.Pool._help_stuff_finish (   inqueue,
  task_handler,
  size 
) [static, private]

Reimplemented in multiprocessing.pool.ThreadPool.

Definition at line 464 of file pool.py.

00464 
00465     def _help_stuff_finish(inqueue, task_handler, size):
00466         # task_handler may be blocked trying to put items on inqueue
00467         debug('removing tasks from inqueue until task handler finished')
00468         inqueue._rlock.acquire()
00469         while task_handler.is_alive() and inqueue._reader.poll():
00470             inqueue._reader.recv()
00471             time.sleep(0)

def multiprocessing.pool.Pool._join_exited_workers (   self) [private]

Cleanup after any worker processes which have exited due to reaching
their specified lifetime.  Returns True if any workers were cleaned up.

Definition at line 194 of file pool.py.

00194 
00195     def _join_exited_workers(self):
00196         """Cleanup after any worker processes which have exited due to reaching
00197         their specified lifetime.  Returns True if any workers were cleaned up.
00198         """
00199         cleaned = False
00200         for i in reversed(range(len(self._pool))):
00201             worker = self._pool[i]
00202             if worker.exitcode is not None:
00203                 # worker exited
00204                 debug('cleaning up worker %d' % i)
00205                 worker.join()
00206                 cleaned = True
00207                 del self._pool[i]
00208         return cleaned

def multiprocessing.pool.Pool._maintain_pool (   self) [private]
Clean up any exited workers and start replacements for them.

Definition at line 225 of file pool.py.

00225 
00226     def _maintain_pool(self):
00227         """Clean up any exited workers and start replacements for them.
00228         """
00229         if self._join_exited_workers():
00230             self._repopulate_pool()

def multiprocessing.pool.Pool._repopulate_pool (   self) [private]

Bring the number of pool processes up to the specified number,
for use after reaping workers which have exited.

Definition at line 209 of file pool.py.

00209 
00210     def _repopulate_pool(self):
00211         """Bring the number of pool processes up to the specified number,
00212         for use after reaping workers which have exited.
00213         """
00214         for i in range(self._processes - len(self._pool)):
00215             w = self.Process(target=worker,
00216                              args=(self._inqueue, self._outqueue,
00217                                    self._initializer,
00218                                    self._initargs, self._maxtasksperchild)
00219                             )
00220             self._pool.append(w)
00221             w.name = w.name.replace('Process', 'PoolWorker')
00222             w.daemon = True
00223             w.start()
00224             debug('added worker')

def multiprocessing.pool.Pool._setup_queues (   self) [private]

Reimplemented in multiprocessing.pool.ThreadPool.

Definition at line 231 of file pool.py.

00231 
00232     def _setup_queues(self):
00233         from .queues import SimpleQueue
00234         self._inqueue = SimpleQueue()
00235         self._outqueue = SimpleQueue()
00236         self._quick_put = self._inqueue._writer.send
00237         self._quick_get = self._outqueue._reader.recv

def multiprocessing.pool.Pool._terminate_pool (   cls,
  taskqueue,
  inqueue,
  outqueue,
  pool,
  worker_handler,
  task_handler,
  result_handler,
  cache 
) [private]

Definition at line 474 of file pool.py.

00474     def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
00475                         worker_handler, task_handler, result_handler, cache):
00476         # this is guaranteed to only be called once
00477         debug('finalizing pool')
00478 
00479         worker_handler._state = TERMINATE
00480         task_handler._state = TERMINATE
00481 
00482         debug('helping task handler/workers to finish')
00483         cls._help_stuff_finish(inqueue, task_handler, len(pool))
00484 
00485         assert result_handler.is_alive() or len(cache) == 0
00486 
00487         result_handler._state = TERMINATE
00488         outqueue.put(None)                  # sentinel
00489 
00490         # We must wait for the worker handler to exit before terminating
00491         # workers because we don't want workers to be restarted behind our back.
00492         debug('joining worker handler')
00493         worker_handler.join()
00494 
00495         # Terminate workers which haven't already finished.
00496         if pool and hasattr(pool[0], 'terminate'):
00497             debug('terminating workers')
00498             for p in pool:
00499                 if p.exitcode is None:
00500                     p.terminate()
00501 
00502         debug('joining task handler')
00503         task_handler.join()
00504 
00505         debug('joining result handler')
00506         result_handler.join()
00507 
00508         if pool and hasattr(pool[0], 'terminate'):
00509             debug('joining pool workers')
00510             for p in pool:
00511                 if p.is_alive():
00512                     # worker has not yet exited
00513                     debug('cleaning up worker %d' % p.pid)
00514                     p.join()
00515 
00516 #
00517 # Class whose instances are returned by `Pool.apply_async()`
00518 #

def multiprocessing.pool.Pool.apply (   self,
  func,
  args = (),
  kwds = {} 
)
Equivalent of `func(*args, **kwds)`.

Definition at line 238 of file pool.py.

00238 
00239     def apply(self, func, args=(), kwds={}):
00240         '''
00241         Equivalent of `func(*args, **kwds)`.
00242         '''
00243         assert self._state == RUN
00244         return self.apply_async(func, args, kwds).get()

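A minimal sketch of the blocking call, using the built-in pow as a convenient function to dispatch:

    from multiprocessing import Pool

    if __name__ == '__main__':
        pool = Pool(processes=2)
        # Blocks until the worker has computed pow(2, 10).
        print(pool.apply(pow, (2, 10)))    # 1024
        pool.close()
        pool.join()
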
def multiprocessing.pool.Pool.apply_async (   self,
  func,
  args = (),
  kwds = {},
  callback = None,
  error_callback = None 
)
Asynchronous version of `apply()` method.

Definition at line 290 of file pool.py.

00290     def apply_async(self, func, args=(), kwds={}, callback=None,
00291             error_callback=None):
00292         '''
00293         Asynchronous version of `apply()` method.
00294         '''
00295         assert self._state == RUN
00296         result = ApplyResult(self._cache, callback, error_callback)
00297         self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
00298         return result

def multiprocessing.pool.Pool.close (   self)

Definition at line 442 of file pool.py.

00442 
00443     def close(self):
00444         debug('closing pool')
00445         if self._state == RUN:
00446             self._state = CLOSE
00447             self._worker_handler._state = CLOSE

def multiprocessing.pool.Pool.imap (   self,
  func,
  iterable,
  chunksize = 1 
)
Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.

Definition at line 253 of file pool.py.

00253 
00254     def imap(self, func, iterable, chunksize=1):
00255         '''
00256         Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
00257         '''
00258         assert self._state == RUN
00259         if chunksize == 1:
00260             result = IMapIterator(self._cache)
00261             self._taskqueue.put((((result._job, i, func, (x,), {})
00262                          for i, x in enumerate(iterable)), result._set_length))
00263             return result
00264         else:
00265             assert chunksize > 1
00266             task_batches = Pool._get_tasks(func, iterable, chunksize)
00267             result = IMapIterator(self._cache)
00268             self._taskqueue.put((((result._job, i, mapstar, (x,), {})
00269                      for i, x in enumerate(task_batches)), result._set_length))
00270             return (item for chunk in result for item in chunk)

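A sketch of the lazy behaviour: results are yielded one at a time, in order, so consumption can start before the whole iterable has been processed; slow_square is a hypothetical task:

    import time
    from multiprocessing import Pool

    def slow_square(x):                # hypothetical task
        time.sleep(0.1)
        return x * x

    if __name__ == '__main__':
        pool = Pool(processes=2)
        for value in pool.imap(slow_square, range(5)):
            print(value)               # 0, 1, 4, 9, 16 -- printed as they arrive, in order
        pool.close()
        pool.join()
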
def multiprocessing.pool.Pool.imap_unordered (   self,
  func,
  iterable,
  chunksize = 1 
)
Like `imap()` method but ordering of results is arbitrary.

Definition at line 271 of file pool.py.

00271 
00272     def imap_unordered(self, func, iterable, chunksize=1):
00273         '''
00274         Like `imap()` method but ordering of results is arbitrary.
00275         '''
00276         assert self._state == RUN
00277         if chunksize == 1:
00278             result = IMapUnorderedIterator(self._cache)
00279             self._taskqueue.put((((result._job, i, func, (x,), {})
00280                          for i, x in enumerate(iterable)), result._set_length))
00281             return result
00282         else:
00283             assert chunksize > 1
00284             task_batches = Pool._get_tasks(func, iterable, chunksize)
00285             result = IMapUnorderedIterator(self._cache)
00286             self._taskqueue.put((((result._job, i, mapstar, (x,), {})
00287                      for i, x in enumerate(task_batches)), result._set_length))
00288             return (item for chunk in result for item in chunk)

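The same sketch with arbitrary ordering: whichever worker finishes first is yielded first; jittered is a hypothetical task with variable runtime:

    import random
    import time
    from multiprocessing import Pool

    def jittered(x):                   # hypothetical task with variable runtime
        time.sleep(random.random() / 10)
        return x

    if __name__ == '__main__':
        pool = Pool(processes=4)
        print(list(pool.imap_unordered(jittered, range(8))))   # some permutation of 0..7
        pool.close()
        pool.join()
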
def multiprocessing.pool.Pool.join (   self)

Definition at line 454 of file pool.py.

00454 
00455     def join(self):
00456         debug('joining pool')
00457         assert self._state in (CLOSE, TERMINATE)
00458         self._worker_handler.join()
00459         self._task_handler.join()
00460         self._result_handler.join()
00461         for p in self._pool:
00462             p.join()

def multiprocessing.pool.Pool.map (   self,
  func,
  iterable,
  chunksize = None 
)
Apply `func` to each element in `iterable`, collecting the results
in a list that is returned.

Definition at line 245 of file pool.py.

00245 
00246     def map(self, func, iterable, chunksize=None):
00247         '''
00248         Apply `func` to each element in `iterable`, collecting the results
00249         in a list that is returned.
00250         '''
00251         assert self._state == RUN
00252         return self.map_async(func, iterable, chunksize).get()

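A sketch of the blocking, order-preserving call, leaving chunksize to the default computed in map_async() below; cube is a hypothetical task:

    from multiprocessing import Pool

    def cube(x):                       # hypothetical task
        return x ** 3

    if __name__ == '__main__':
        pool = Pool(processes=3)
        print(pool.map(cube, range(6)))    # [0, 1, 8, 27, 64, 125]
        pool.close()
        pool.join()
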
def multiprocessing.pool.Pool.map_async (   self,
  func,
  iterable,
  chunksize = None,
  callback = None,
  error_callback = None 
)
Asynchronous version of `map()` method.

Definition at line 300 of file pool.py.

00300     def map_async(self, func, iterable, chunksize=None, callback=None,
00301             error_callback=None):
00302         '''
00303         Asynchronous version of `map()` method.
00304         '''
00305         assert self._state == RUN
00306         if not hasattr(iterable, '__len__'):
00307             iterable = list(iterable)
00308 
00309         if chunksize is None:
00310             chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
00311             if extra:
00312                 chunksize += 1
00313         if len(iterable) == 0:
00314             chunksize = 0
00315 
00316         task_batches = Pool._get_tasks(func, iterable, chunksize)
00317         result = MapResult(self._cache, chunksize, len(iterable), callback,
00318                            error_callback=error_callback)
00319         self._taskqueue.put((((result._job, i, mapstar, (x,), {})
00320                               for i, x in enumerate(task_batches)), None))
00321         return result

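A sketch of the asynchronous variant with an explicit chunksize (overriding the divmod-based default above) and a completion callback; negate and announce are hypothetical:

    from multiprocessing import Pool

    def negate(x):                     # hypothetical task
        return -x

    def announce(values):              # called once with the complete result list
        print('done:', values)

    if __name__ == '__main__':
        pool = Pool(processes=2)
        result = pool.map_async(negate, range(10), chunksize=3, callback=announce)
        print(result.get(timeout=10))  # [0, -1, -2, ..., -9]
        pool.close()
        pool.join()
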
def multiprocessing.pool.Pool.terminate (   self)

Definition at line 448 of file pool.py.

00448 
00449     def terminate(self):
00450         debug('terminating pool')
00451         self._state = TERMINATE
00452         self._worker_handler._state = TERMINATE
00453         self._terminate()


Member Data Documentation

multiprocessing.pool.Pool._cache [private]

Definition at line 140 of file pool.py.

multiprocessing.pool.Pool._initargs [private]

Definition at line 144 of file pool.py.

multiprocessing.pool.Pool._initializer [private]

Definition at line 143 of file pool.py.

multiprocessing.pool.Pool._inqueue [private]

Reimplemented in multiprocessing.pool.ThreadPool.

Definition at line 233 of file pool.py.

multiprocessing.pool.Pool._maxtasksperchild [private]

Definition at line 142 of file pool.py.

multiprocessing.pool.Pool._outqueue [private]

Reimplemented in multiprocessing.pool.ThreadPool.

Definition at line 234 of file pool.py.

multiprocessing.pool.Pool._pool [private]

Definition at line 158 of file pool.py.

multiprocessing.pool.Pool._processes [private]

Definition at line 157 of file pool.py.

multiprocessing.pool.Pool._quick_get [private]

Reimplemented in multiprocessing.pool.ThreadPool.

Definition at line 236 of file pool.py.

multiprocessing.pool.Pool._quick_put [private]

Reimplemented in multiprocessing.pool.ThreadPool.

Definition at line 235 of file pool.py.

multiprocessing.pool.Pool._result_handler [private]

Definition at line 178 of file pool.py.

multiprocessing.pool.Pool._state [private]

Definition at line 141 of file pool.py.

multiprocessing.pool.Pool._task_handler [private]

Definition at line 170 of file pool.py.

multiprocessing.pool.Pool._taskqueue [private]

Definition at line 139 of file pool.py.

multiprocessing.pool.Pool._terminate [private]

Definition at line 186 of file pool.py.

multiprocessing.pool.Pool._worker_handler [private]

Definition at line 161 of file pool.py.

_PyObject_HEAD_EXTRA Py_ssize_t _object::ob_refcnt [inherited]

Definition at line 107 of file object.h.

struct _typeobject* _object::ob_type [inherited]

Definition at line 108 of file object.h.

multiprocessing.pool.Pool.Process = Process [static]

Definition at line 134 of file pool.py.


The documentation for this class was generated from the following file:

pool.py