
python3.2 3.2.2
concurrent.futures.process.ProcessPoolExecutor Class Reference
Inheritance diagram for concurrent.futures.process.ProcessPoolExecutor: (graph not reproduced here)
Collaboration diagram for concurrent.futures.process.ProcessPoolExecutor: (graph not reproduced here)


Public Member Functions

def __init__
def submit
def shutdown
def map
def __enter__
def __exit__

Public Attributes

_PyObject_HEAD_EXTRA Py_ssize_t ob_refcnt
struct _typeobject * ob_type

Private Member Functions

def _start_queue_management_thread
def _adjust_process_count

Private Attributes

 _max_workers
 _call_queue
 _result_queue
 _work_ids
 _queue_management_thread
 _processes
 _shutdown_thread
 _shutdown_lock
 _queue_count
 _pending_work_items

Detailed Description

Definition at line 267 of file process.py.


Constructor & Destructor Documentation

def concurrent.futures.process.ProcessPoolExecutor.__init__ (   self,
  max_workers = None 
)
Initializes a new ProcessPoolExecutor instance.

Args:
    max_workers: The maximum number of processes that can be used to
        execute the given calls. If None or not given, then as many
        worker processes will be created as the machine has processors.

Definition at line 268 of file process.py.

00268 
00269     def __init__(self, max_workers=None):
00270         """Initializes a new ProcessPoolExecutor instance.
00271 
00272         Args:
00273             max_workers: The maximum number of processes that can be used to
00274                 execute the given calls. If None or not given then as many
00275                 worker processes will be created as the machine has processors.
00276         """
00277         _check_system_limits()
00278 
00279         if max_workers is None:
00280             self._max_workers = multiprocessing.cpu_count()
00281         else:
00282             self._max_workers = max_workers
00283 
00284         # Make the call queue slightly larger than the number of processes to
00285         # prevent the worker processes from idling. But don't make it too big
00286         # because futures in the call queue cannot be cancelled.
00287         self._call_queue = multiprocessing.Queue(self._max_workers +
00288                                                  EXTRA_QUEUED_CALLS)
00289         self._result_queue = multiprocessing.Queue()
00290         self._work_ids = queue.Queue()
00291         self._queue_management_thread = None
00292         self._processes = set()
00293 
00294         # Shutdown is a two-step process.
00295         self._shutdown_thread = False
00296         self._shutdown_lock = threading.Lock()
00297         self._queue_count = 0
00298         self._pending_work_items = {}
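
A brief usage sketch (not part of the generated source): how the max_workers argument above affects pool sizing. The variable names are illustrative only.

    # Usage sketch: pool sizing mirrors the __init__ logic shown above.
    from concurrent.futures import ProcessPoolExecutor

    # max_workers omitted (None): the pool sizes itself to the CPU count,
    # via multiprocessing.cpu_count() as in the constructor above.
    cpu_sized_pool = ProcessPoolExecutor()

    # An explicit value caps the number of worker processes.
    two_worker_pool = ProcessPoolExecutor(max_workers=2)

    cpu_sized_pool.shutdown()
    two_worker_pool.shutdown()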



Member Function Documentation

def concurrent.futures._base.Executor.__enter__ (   self) [inherited]

Definition at line 562 of file _base.py.

00562 
00563     def __enter__(self):
00564         return self

def concurrent.futures._base.Executor.__exit__ (   self,
  exc_type,
  exc_val,
  exc_tb 
) [inherited]

Definition at line 565 of file _base.py.

00565 
00566     def __exit__(self, exc_type, exc_val, exc_tb):
00567         self.shutdown(wait=True)
00568         return False
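
Because __exit__ calls shutdown(wait=True), the executor can be driven with a with statement. A minimal sketch, assuming a picklable module-level helper (square here is illustrative, not part of the module):

    # Context-manager sketch: __exit__ above performs shutdown(wait=True).
    from concurrent.futures import ProcessPoolExecutor

    def square(x):
        # Illustrative helper; must live at module level so workers can unpickle it.
        return x * x

    if __name__ == '__main__':
        with ProcessPoolExecutor(max_workers=2) as executor:
            results = list(executor.map(square, [1, 2, 3, 4]))
        # Leaving the block waited for the futures and reclaimed the processes.
        print(results)   # [1, 4, 9, 16]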


def concurrent.futures.process.ProcessPoolExecutor._adjust_process_count (   self) [private]

Definition at line 317 of file process.py.

00317 
00318     def _adjust_process_count(self):
00319         for _ in range(len(self._processes), self._max_workers):
00320             p = multiprocessing.Process(
00321                     target=_process_worker,
00322                     args=(self._call_queue,
00323                           self._result_queue))
00324             p.start()
00325             self._processes.add(p)


def concurrent.futures.process.ProcessPoolExecutor._start_queue_management_thread (   self) [private]

Definition at line 299 of file process.py.

00299 
00300     def _start_queue_management_thread(self):
00301         # When the executor gets lost, the weakref callback will wake up
00302         # the queue management thread.
00303         def weakref_cb(_, q=self._result_queue):
00304             q.put(None)
00305         if self._queue_management_thread is None:
00306             self._queue_management_thread = threading.Thread(
00307                     target=_queue_management_worker,
00308                     args=(weakref.ref(self, weakref_cb),
00309                           self._processes,
00310                           self._pending_work_items,
00311                           self._work_ids,
00312                           self._call_queue,
00313                           self._result_queue))
00314             self._queue_management_thread.daemon = True
00315             self._queue_management_thread.start()
00316             _threads_queues[self._queue_management_thread] = self._result_queue
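
The weakref.ref(self, weakref_cb) argument above is what lets a dropped executor wake the queue management thread: once the executor is garbage collected, the callback puts a None sentinel on the result queue, unblocking the thread's blocking get. A standalone sketch of that pattern, with illustrative names (Owner, consumer) that are not part of the module:

    # Standalone sketch of the weakref wake-up pattern used above.
    import queue
    import threading
    import weakref

    class Owner:
        """Stands in for the executor object; illustrative only."""

    def consumer(owner_ref, q):
        while True:
            item = q.get()                  # blocks until work or the sentinel arrives
            if item is None and owner_ref() is None:
                return                      # owner collected: the sentinel told us to exit
            # ... process item ...

    q = queue.Queue()
    owner = Owner()
    # The callback fires when `owner` is collected and wakes the blocked consumer.
    ref = weakref.ref(owner, lambda _, q=q: q.put(None))
    t = threading.Thread(target=consumer, args=(ref, q))
    t.daemon = True
    t.start()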


def concurrent.futures._base.Executor.map (   self,
  fn,
  *iterables,
  timeout = None 
) [inherited]
Returns an iterator equivalent to map(fn, *iterables).

Args:
    fn: A callable that will take as many arguments as there are
        passed iterables.
    timeout: The maximum number of seconds to wait. If None, then there
        is no limit on the wait time.

Returns:
    An iterator equivalent to map(fn, *iterables), but the calls may
    be evaluated out-of-order.

Raises:
    TimeoutError: If the entire result iterator could not be generated
        before the given timeout.
    Exception: If fn(*args) raises for any values.

Definition at line 516 of file _base.py.

00516 
00517     def map(self, fn, *iterables, timeout=None):
00518         """Returns a iterator equivalent to map(fn, iter).
00519 
00520         Args:
00521             fn: A callable that will take take as many arguments as there are
00522                 passed iterables.
00523             timeout: The maximum number of seconds to wait. If None, then there
00524                 is no limit on the wait time.
00525 
00526         Returns:
00527             An iterator equivalent to: map(func, *iterables) but the calls may
00528             be evaluated out-of-order.
00529 
00530         Raises:
00531             TimeoutError: If the entire result iterator could not be generated
00532                 before the given timeout.
00533             Exception: If fn(*args) raises for any values.
00534         """
00535         if timeout is not None:
00536             end_time = timeout + time.time()
00537 
00538         fs = [self.submit(fn, *args) for args in zip(*iterables)]
00539 
00540         try:
00541             for future in fs:
00542                 if timeout is None:
00543                     yield future.result()
00544                 else:
00545                     yield future.result(end_time - time.time())
00546         finally:
00547             for future in fs:
00548                 future.cancel()
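
A usage sketch of map with a timeout; cube is an illustrative module-level helper, not part of the package:

    # Usage sketch: Executor.map with a timeout.
    from concurrent.futures import ProcessPoolExecutor, TimeoutError

    def cube(x):
        # Illustrative helper; must be picklable for the process pool.
        return x ** 3

    if __name__ == '__main__':
        with ProcessPoolExecutor() as executor:
            try:
                # Results arrive in submission order even though the worker
                # processes may evaluate the calls out of order.
                for value in executor.map(cube, range(5), timeout=30):
                    print(value)
            except TimeoutError:
                print('not every result was ready within 30 seconds')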


def concurrent.futures.process.ProcessPoolExecutor.shutdown (   self,
  wait = True 
)
Clean-up the resources associated with the Executor.

It is safe to call this method several times; apart from that, no other
methods can be called after this one.

Args:
    wait: If True then shutdown will not return until all running
        futures have finished executing and the resources used by the
        executor have been reclaimed.

Reimplemented from concurrent.futures._base.Executor.

Definition at line 345 of file process.py.

00345 
00346     def shutdown(self, wait=True):
00347         with self._shutdown_lock:
00348             self._shutdown_thread = True
00349         if self._queue_management_thread:
00350             # Wake up queue management thread
00351             self._result_queue.put(None)
00352             if wait:
00353                 self._queue_management_thread.join()
00354         # To reduce the risk of openning too many files, remove references to
00355         # objects that use file descriptors.
00356         self._queue_management_thread = None
00357         self._call_queue = None
00358         self._result_queue = None
00359         self._processes = None
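
A short sketch of explicit shutdown, using the builtin pow so the submitted callable stays picklable; everything else here is illustrative:

    # Usage sketch: explicit shutdown after submitting work.
    from concurrent.futures import ProcessPoolExecutor

    if __name__ == '__main__':
        executor = ProcessPoolExecutor(max_workers=2)
        futures = [executor.submit(pow, 2, n) for n in range(8)]
        # wait=True blocks until the submitted calls finish, then drops the
        # queues and worker processes so their file descriptors are released.
        executor.shutdown(wait=True)
        print([f.result() for f in futures])
        # A further submit would now raise
        # RuntimeError('cannot schedule new futures after shutdown').
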
def concurrent.futures.process.ProcessPoolExecutor.submit (   self,
  fn,
  *args,
  **kwargs 
)
Submits a callable to be executed with the given arguments.

Schedules the callable to be executed as fn(*args, **kwargs) and returns
a Future instance representing the execution of the callable.

Returns:
    A Future representing the given call.

Reimplemented from concurrent.futures._base.Executor.

Definition at line 326 of file process.py.

00326 
00327     def submit(self, fn, *args, **kwargs):
00328         with self._shutdown_lock:
00329             if self._shutdown_thread:
00330                 raise RuntimeError('cannot schedule new futures after shutdown')
00331 
00332             f = _base.Future()
00333             w = _WorkItem(f, fn, args, kwargs)
00334 
00335             self._pending_work_items[self._queue_count] = w
00336             self._work_ids.put(self._queue_count)
00337             self._queue_count += 1
00338             # Wake up queue management thread
00339             self._result_queue.put(None)
00340 
00341             self._start_queue_management_thread()
00342             self._adjust_process_count()
00343             return f
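
A brief sketch of submit: the call runs in a worker process and the returned Future carries the result or the exception back (divmod is a builtin, used only so the example stays picklable):

    # Usage sketch: submit returns a Future immediately.
    from concurrent.futures import ProcessPoolExecutor

    if __name__ == '__main__':
        with ProcessPoolExecutor(max_workers=1) as executor:
            future = executor.submit(divmod, 7, 3)
            print(future.result())          # (2, 1); blocks until the worker finishes
            # An exception raised in the worker is re-raised by result():
            failing = executor.submit(divmod, 1, 0)
            try:
                failing.result()
            except ZeroDivisionError:
                print('ZeroDivisionError from the worker re-raised here')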



Member Data Documentation

concurrent.futures.process.ProcessPoolExecutor._call_queue [private]

Definition at line 286 of file process.py.

concurrent.futures.process.ProcessPoolExecutor._max_workers [private]

Definition at line 279 of file process.py.

concurrent.futures.process.ProcessPoolExecutor._pending_work_items [private]

Definition at line 297 of file process.py.

concurrent.futures.process.ProcessPoolExecutor._processes [private]

Definition at line 291 of file process.py.

concurrent.futures.process.ProcessPoolExecutor._queue_count [private]

Definition at line 296 of file process.py.

concurrent.futures.process.ProcessPoolExecutor._queue_management_thread [private]

Definition at line 290 of file process.py.

concurrent.futures.process.ProcessPoolExecutor._result_queue [private]

Definition at line 288 of file process.py.

concurrent.futures.process.ProcessPoolExecutor._shutdown_lock [private]

Definition at line 295 of file process.py.

concurrent.futures.process.ProcessPoolExecutor._shutdown_thread [private]

Definition at line 294 of file process.py.

concurrent.futures.process.ProcessPoolExecutor._work_ids [private]

Definition at line 289 of file process.py.

_PyObject_HEAD_EXTRA Py_ssize_t _object::ob_refcnt [inherited]

Definition at line 107 of file object.h.

struct _typeobject* _object::ob_type [inherited]

Definition at line 108 of file object.h.


The documentation for this class was generated from the following file: process.py