
python3.2  3.2.2
concurrent.futures.process Namespace Reference

Classes

class  _WorkItem
class  _ResultItem
class  _CallItem
class  ProcessPoolExecutor

Functions

def _python_exit
def _process_worker
def _add_call_item_to_queue
def _queue_management_worker
def _check_system_limits

Variables

string __author__ = 'Brian Quinlan (brian@sweetapp.com)'
tuple _threads_queues = weakref.WeakKeyDictionary()
 _shutdown = False
int EXTRA_QUEUED_CALLS = 1
 _system_limits_checked = False
 _system_limited = None

Function Documentation

def concurrent.futures.process._add_call_item_to_queue ( pending_work_items, work_ids, call_queue ) [private]
Fills call_queue with _WorkItems from pending_work_items.

This function never blocks.

Args:
    pending_work_items: A dict mapping work ids to _WorkItems e.g.
        {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
    work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
        are consumed and the corresponding _WorkItems from
        pending_work_items are transformed into _CallItems and put in
        call_queue.
    call_queue: A multiprocessing.Queue that will be filled with _CallItems
        derived from _WorkItems.

Definition at line 137 of file process.py.

00136 def _add_call_item_to_queue(pending_work_items,
00137                             work_ids,
00138                             call_queue):
00139     """Fills call_queue with _WorkItems from pending_work_items.
00140 
00141     This function never blocks.
00142 
00143     Args:
00144         pending_work_items: A dict mapping work ids to _WorkItems e.g.
00145             {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
00146         work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
00147             are consumed and the corresponding _WorkItems from
00148             pending_work_items are transformed into _CallItems and put in
00149             call_queue.
00150         call_queue: A multiprocessing.Queue that will be filled with _CallItems
00151             derived from _WorkItems.
00152     """
00153     while True:
00154         if call_queue.full():
00155             return
00156         try:
00157             work_id = work_ids.get(block=False)
00158         except queue.Empty:
00159             return
00160         else:
00161             work_item = pending_work_items[work_id]
00162 
00163             if work_item.future.set_running_or_notify_cancel():
00164                 call_queue.put(_CallItem(work_id,
00165                                          work_item.fn,
00166                                          work_item.args,
00167                                          work_item.kwargs),
00168                                block=True)
00169             else:
00170                 del pending_work_items[work_id]
00171                 continue

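The loop below is a stand-alone sketch of the same non-blocking drain pattern, using plain queue.Queue objects instead of the module's private _WorkItem/_CallItem types; the names drain_pending, pending, work_ids and calls are illustrative only, not part of the module.

    import queue

    def drain_pending(pending, work_ids, calls):
        # Same shape as the loop above: pull ids off work_ids and push the
        # matching entries onto the bounded 'calls' queue, returning as soon
        # as that queue is full or no ids are left, and never blocking.
        while True:
            if calls.full():
                return
            try:
                work_id = work_ids.get(block=False)
            except queue.Empty:
                return
            calls.put((work_id, pending[work_id]), block=True)

    pending = {5: "job-5", 6: "job-6"}
    work_ids = queue.Queue()
    for i in (5, 6):
        work_ids.put(i)
    calls = queue.Queue(maxsize=1)   # bounded, like the multiprocessing call queue
    drain_pending(pending, work_ids, calls)
    print(calls.qsize(), list(work_ids.queue))   # 1 [6]: the second id is still waiting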

def concurrent.futures.process._check_system_limits ( ) [private]

Definition at line 244 of file process.py.

00244 
00245 def _check_system_limits():
00246     global _system_limits_checked, _system_limited
00247     if _system_limits_checked:
00248         if _system_limited:
00249             raise NotImplementedError(_system_limited)
00250     _system_limits_checked = True
00251     try:
00252         import os
00253         nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
00254     except (AttributeError, ValueError):
00255         # sysconf not available or setting not available
00256         return
00257     if nsems_max == -1:
00258         # indeterminate limit, assume that limit is determined
00259         # by available memory only
00260         return
00261     if nsems_max >= 256:
00262         # minimum number of semaphores available
00263         # according to POSIX
00264         return
00265     _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
00266     raise NotImplementedError(_system_limited)

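The check reduces to a single os.sysconf query; this stand-alone probe repeats it so the result can be inspected directly. Output depends on the platform, and os.sysconf may be missing entirely, e.g. on Windows.

    import os

    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        print("sysconf or SC_SEM_NSEMS_MAX not available; nothing to check")
    else:
        if nsems_max == -1:
            print("no fixed semaphore limit; assumed limited by memory only")
        elif nsems_max >= 256:
            print("semaphore limit is sufficient:", nsems_max)
        else:
            print("system provides too few semaphores (%d available, 256 necessary)" % nsems_max)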

def concurrent.futures.process._process_worker ( call_queue, result_queue ) [private]
Evaluates calls from call_queue and places the results in result_queue.

This worker is run in a separate process.

Args:
    call_queue: A multiprocessing.Queue of _CallItems that will be read and
        evaluated by the worker.
    result_queue: A multiprocessing.Queue of _ResultItems that will be
        written to by the worker.
    shutdown: A multiprocessing.Event that will be set as a signal to the
        worker that it should exit when call_queue is empty.

Definition at line 107 of file process.py.

00107 
00108 def _process_worker(call_queue, result_queue):
00109     """Evaluates calls from call_queue and places the results in result_queue.
00110 
00111     This worker is run in a separate process.
00112 
00113     Args:
00114         call_queue: A multiprocessing.Queue of _CallItems that will be read and
00115             evaluated by the worker.
00116         result_queue: A multiprocessing.Queue of _ResultItems that will be
00117             written to by the worker.
00118         shutdown: A multiprocessing.Event that will be set as a signal to the
00119             worker that it should exit when call_queue is empty.
00120     """
00121     while True:
00122         call_item = call_queue.get(block=True)
00123         if call_item is None:
00124             # Wake up queue management thread
00125             result_queue.put(None)
00126             return
00127         try:
00128             r = call_item.fn(*call_item.args, **call_item.kwargs)
00129         except BaseException as e:
00130             result_queue.put(_ResultItem(call_item.work_id,
00131                                          exception=e))
00132         else:
00133             result_queue.put(_ResultItem(call_item.work_id,
00134                                          result=r))

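Seen from the public API, this loop is what makes a submitted call's return value or exception come back through its Future. A short demonstration follows; boom is a throwaway function defined only for this example, placed at module level so it can be pickled for the worker process, and the __main__ guard keeps the example safe on platforms that spawn fresh worker processes.

    from concurrent.futures import ProcessPoolExecutor

    def boom(msg):
        raise ValueError(msg)

    if __name__ == "__main__":
        with ProcessPoolExecutor(max_workers=2) as executor:
            ok = executor.submit(pow, 2, 10)     # comes back as a _ResultItem carrying a result
            bad = executor.submit(boom, "oops")  # comes back as a _ResultItem carrying an exception
            print(ok.result())                      # 1024
            print(type(bad.exception()).__name__)   # ValueError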

def concurrent.futures.process._python_exit ( ) [private]

Definition at line 72 of file process.py.

00072 
00073 def _python_exit():
00074     global _shutdown
00075     _shutdown = True
00076     items = list(_threads_queues.items())
00077     for t, q in items:
00078         q.put(None)
00079     for t, q in items:
00080         t.join()
00081 
00082 # Controls how many more calls than processes will be queued in the call queue.
00083 # A smaller number will mean that processes spend more time idle waiting for
00084 # work while a larger number will make Future.cancel() succeed less frequently
00085 # (Futures in the call queue cannot be cancelled).
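
A sketch of the sizing trade-off the comment describes: with EXTRA_QUEUED_CALLS = 1 the call queue holds at most one call beyond the number of worker processes, so most futures that have not yet reached it remain cancellable. The max_workers value below is an illustrative number, not one taken from the module.

    import multiprocessing

    EXTRA_QUEUED_CALLS = 1   # the constant documented here
    max_workers = 4          # illustrative worker count

    # Bounded like the executor's call queue: at most max_workers + 1 calls
    # can sit in it at once, keeping worker idle time low while still
    # leaving not-yet-queued futures cancellable.
    call_queue = multiprocessing.Queue(max_workers + EXTRA_QUEUED_CALLS)
    print(call_queue.empty())   # True: nothing has been queued yet
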
def concurrent.futures.process._queue_management_worker ( executor_reference, processes, pending_work_items, work_ids_queue, call_queue, result_queue ) [private]
Manages the communication between this process and the worker processes.

This function is run in a local thread.

Args:
    executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
        this thread. Used to determine if the ProcessPoolExecutor has been
        garbage collected and that this function can exit.
    processes: A list of the multiprocessing.Process instances used as
        workers.
    pending_work_items: A dict mapping work ids to _WorkItems e.g.
        {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
    work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
    call_queue: A multiprocessing.Queue that will be filled with _CallItems
        derived from _WorkItems for processing by the process workers.
    result_queue: A multiprocessing.Queue of _ResultItems generated by the
        process workers.

Definition at line 177 of file process.py.

00173 def _queue_management_worker(executor_reference,
00174                              processes,
00175                              pending_work_items,
00176                              work_ids_queue,
00177                              call_queue,
00178                              result_queue):
00179     """Manages the communication between this process and the worker processes.
00180 
00181     This function is run in a local thread.
00182 
00183     Args:
00184         executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
00185             this thread. Used to determine if the ProcessPoolExecutor has been
00186             garbage collected and that this function can exit.
00187         processes: A list of the multiprocessing.Process instances used as
00188             workers.
00189         pending_work_items: A dict mapping work ids to _WorkItems e.g.
00190             {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
00191         work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
00192         call_queue: A multiprocessing.Queue that will be filled with _CallItems
00193             derived from _WorkItems for processing by the process workers.
00194         result_queue: A multiprocessing.Queue of _ResultItems generated by the
00195             process workers.
00196     """
00197     nb_shutdown_processes = 0
00198     def shutdown_one_process():
00199         """Tell a worker to terminate, which will in turn wake us again"""
00200         nonlocal nb_shutdown_processes
00201         call_queue.put(None)
00202         nb_shutdown_processes += 1
00203     while True:
00204         _add_call_item_to_queue(pending_work_items,
00205                                 work_ids_queue,
00206                                 call_queue)
00207 
00208         result_item = result_queue.get(block=True)
00209         if result_item is not None:
00210             work_item = pending_work_items[result_item.work_id]
00211             del pending_work_items[result_item.work_id]
00212 
00213             if result_item.exception:
00214                 work_item.future.set_exception(result_item.exception)
00215             else:
00216                 work_item.future.set_result(result_item.result)
00217             continue
00218         # If we come here, we either got a timeout or were explicitly woken up.
00219         # In either case, check whether we should start shutting down.
00220         executor = executor_reference()
00221         # No more work items can be added if:
00222         #   - The interpreter is shutting down OR
00223         #   - The executor that owns this worker has been collected OR
00224         #   - The executor that owns this worker has been shutdown.
00225         if _shutdown or executor is None or executor._shutdown_thread:
00226             # Since no new work items can be added, it is safe to shutdown
00227             # this thread if there are no pending work items.
00228             if not pending_work_items:
00229                 while nb_shutdown_processes < len(processes):
00230                     shutdown_one_process()
00231                 # If .join() is not called on the created processes then
00232                 # some multiprocessing.Queue methods may deadlock on Mac OS
00233                 # X.
00234                 for p in processes:
00235                     p.join()
00236                 call_queue.close()
00237                 return
00238             else:
00239                 # Start shutting down by telling a process it can exit.
00240                 shutdown_one_process()
00241         del executor

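At the user level, the consequence of this bookkeeping is visible through Future.cancel(): only calls still held in pending_work_items can be cancelled, while calls already moved to the call queue or already running cannot. An illustration follows; slow is defined only for this example and the exact count printed is timing dependent.

    import time
    from concurrent.futures import ProcessPoolExecutor

    def slow(x):
        time.sleep(1)
        return x

    if __name__ == "__main__":
        executor = ProcessPoolExecutor(max_workers=1)
        futures = [executor.submit(slow, i) for i in range(10)]
        cancelled = [f.cancel() for f in futures]
        # Typically most of the later submissions are still pending and cancel
        # cleanly; the first one or two are already queued or running.
        print(cancelled.count(True), "of", len(futures), "calls cancelled")
        executor.shutdown()   # management thread sends the None sentinels and joins the workers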

Variable Documentation

string concurrent.futures.process.__author__ = 'Brian Quinlan (brian@sweetapp.com)'

Definition at line 46 of file process.py.

concurrent.futures.process._shutdown = False

Definition at line 70 of file process.py.

concurrent.futures.process._system_limited = None

Definition at line 243 of file process.py.

concurrent.futures.process._system_limits_checked = False

Definition at line 242 of file process.py.

tuple concurrent.futures.process._threads_queues = weakref.WeakKeyDictionary()

Definition at line 69 of file process.py.

int concurrent.futures.process.EXTRA_QUEUED_CALLS = 1

Definition at line 85 of file process.py.
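
As the _python_exit listing above shows, _threads_queues tracks each management thread together with a queue used to wake it: at interpreter exit every registered thread is woken with a None sentinel and then joined. A small stand-alone sketch of that wake-then-join pattern, using an ordinary thread and queue rather than the executor's own:

    import queue
    import threading
    import weakref

    threads_queues = weakref.WeakKeyDictionary()

    def management_loop(q):
        while True:
            if q.get(block=True) is None:   # None is the shutdown sentinel
                return

    q = queue.Queue()
    t = threading.Thread(target=management_loop, args=(q,))
    t.daemon = True
    t.start()
    threads_queues[t] = q

    # What _python_exit does for every registered thread:
    items = list(threads_queues.items())
    for thread, wake_queue in items:
        wake_queue.put(None)
    for thread, wake_queue in items:
        thread.join()
    print("management thread joined cleanly")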