
python3.2  3.2.2
multiprocessing.queues.Queue Class Reference

Public Member Functions

def __init__
def __getstate__
def __setstate__
def put
def get
def qsize
def empty
def full
def get_nowait
def put_nowait
def close
def join_thread
def cancel_join_thread

Public Attributes

_PyObject_HEAD_EXTRA Py_ssize_t ob_refcnt
struct _typeobject * ob_type

Private Member Functions

def _after_fork
def _start_thread

Static Private Member Functions

def _finalize_join
def _finalize_close
def _feed

Private Attributes

 _maxsize
 _writer
 _rlock
 _opid
 _wlock
 _sem
 _notempty
 _buffer
 _thread
 _jointhread
 _joincancelled
 _closed
 _close
 _send
 _recv
 _poll

Detailed Description

Definition at line 56 of file queues.py.


Constructor & Destructor Documentation

def multiprocessing.queues.Queue.__init__ (   self,
  maxsize = 0 
)

Reimplemented in multiprocessing.queues.JoinableQueue.

Definition at line 58 of file queues.py.

    def __init__(self, maxsize=0):
        if maxsize <= 0:
            maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
        self._maxsize = maxsize
        self._reader, self._writer = Pipe(duplex=False)
        self._rlock = Lock()
        self._opid = os.getpid()
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = Lock()
        self._sem = BoundedSemaphore(maxsize)

        self._after_fork()

        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)
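The constructor maps a non-positive maxsize to the largest semaphore value, so such a queue is bounded only by that limit. The sketch below illustrates the effect through the public API; the helper name try_fill is hypothetical and not part of queues.py.

```python
import multiprocessing
import queue  # queue.Full is the exception raised by put_nowait()


def try_fill(maxsize, attempts):
    """Return how many of `attempts` non-blocking puts a new queue accepts."""
    q = multiprocessing.Queue(maxsize)
    accepted = 0
    try:
        for i in range(attempts):
            q.put_nowait(i)  # acquires one slot of the bounding semaphore
            accepted += 1
    except queue.Full:
        pass  # the semaphore had no free slot left
    finally:
        # Drain what was accepted so the feeder thread has nothing left to flush.
        for _ in range(accepted):
            q.get(timeout=5)
        q.close()
        q.join_thread()
    return accepted
```

Under these assumptions, a queue created with maxsize=2 accepts two items before raising Full, while maxsize=0 accepts every item in a short run.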



Member Function Documentation

def multiprocessing.queues.Queue.__getstate__ (   self)

Reimplemented in multiprocessing.queues.JoinableQueue.

Definition at line 76 of file queues.py.

    def __getstate__(self):
        assert_spawning(self)
        return (self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)
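Because __getstate__ calls assert_spawning, a queue can only be pickled while a child process is being spawned. A minimal sketch of what that looks like from user code, assuming the check raises RuntimeError as it does in current CPython releases; the function name is hypothetical.

```python
import multiprocessing
import pickle


def pickling_outside_spawn_fails():
    """Return True if pickling a Queue in ordinary code is rejected."""
    q = multiprocessing.Queue()
    try:
        pickle.dumps(q)  # triggers Queue.__getstate__ -> assert_spawning
    except RuntimeError:
        return True
    return False
```

Queues are meant to reach child processes by inheritance or as Process arguments, not through an explicit pickle call.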


def multiprocessing.queues.Queue.__setstate__ (   self,
  state 
)

Reimplemented in multiprocessing.queues.JoinableQueue.

Definition at line 81 of file queues.py.

    def __setstate__(self, state):
        (self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._after_fork()


def multiprocessing.queues.Queue._after_fork (   self) [private]

Definition at line 86 of file queues.py.

    def _after_fork(self):
        debug('Queue._after_fork()')
        self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        self._send = self._writer.send
        self._recv = self._reader.recv
        self._poll = self._reader.poll


def multiprocessing.queues.Queue._feed (   buffer,
  notempty,
  send,
  writelock,
  close 
) [static, private]

Definition at line 232 of file queues.py.

    def _feed(buffer, notempty, send, writelock, close):
        debug('starting thread to feed data to pipe')
        from .util import is_exiting

        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None

        try:
            while 1:
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    while 1:
                        obj = bpopleft()
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            close()
                            return

                        if wacquire is None:
                            send(obj)
                        else:
                            wacquire()
                            try:
                                send(obj)
                            finally:
                                wrelease()
                except IndexError:
                    pass
        except Exception as e:
            # Since this runs in a daemon thread the resources it uses
            # may become unusable while the process is cleaning up.
            # We ignore errors which happen after the process has
            # started to cleanup.
            try:
                if is_exiting():
                    info('error in queue thread: %s', e)
                else:
                    import traceback
                    traceback.print_exc()
            except Exception:
                pass
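The feeder loop above can be reduced to a small, thread-only sketch: a producer appends to a deque under a condition variable, and a worker drains it until it meets a sentinel object. This is a simplified illustration rather than the library's code: it omits the write lock and the error handling, waits in a `while` loop instead of `if`, and the names feed and run_demo are hypothetical.

```python
import collections
import threading

_sentinel = object()  # unique marker, recognised by identity


def feed(buffer, notempty, send, close):
    """Drain `buffer` into send() until the sentinel is popped."""
    while True:
        with notempty:
            while not buffer:
                notempty.wait()  # sleep until a producer notifies
        try:
            while True:
                obj = buffer.popleft()
                if obj is _sentinel:
                    close()
                    return
                send(obj)
        except IndexError:
            pass  # buffer drained; go back to waiting


def run_demo():
    buffer = collections.deque()
    notempty = threading.Condition(threading.Lock())
    sent, closed = [], []
    worker = threading.Thread(
        target=feed,
        args=(buffer, notempty, sent.append, lambda: closed.append(True)),
    )
    worker.start()
    for item in (1, 2, 3, _sentinel):
        with notempty:  # append and notify under the same lock
            buffer.append(item)
            notempty.notify()
    worker.join()
    return sent, closed
```

Appending and notifying under the condition's lock, and checking the buffer under that same lock before waiting, is what prevents a lost wake-up in this sketch.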


def multiprocessing.queues.Queue._finalize_close (   buffer,
  notempty 
) [static, private]

Definition at line 222 of file queues.py.

    def _finalize_close(buffer, notempty):
        debug('telling queue thread to quit')
        notempty.acquire()
        try:
            buffer.append(_sentinel)
            notempty.notify()
        finally:
            notempty.release()

def multiprocessing.queues.Queue._finalize_join (   twr) [static, private]

Definition at line 212 of file queues.py.

    def _finalize_join(twr):
        debug('joining queue thread')
        thread = twr()
        if thread is not None:
            thread.join()
            debug('... queue thread joined')
        else:
            debug('... queue thread already dead')

def multiprocessing.queues.Queue._start_thread (   self) [private]

Definition at line 173 of file queues.py.

    def _start_thread(self):
        debug('Queue._start_thread()')

        # Start thread which transfers data from buffer to pipe
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send,
                  self._wlock, self._writer.close),
            name='QueueFeederThread'
            )
        self._thread.daemon = True

        debug('doing self._thread.start()')
        self._thread.start()
        debug('... done self._thread.start()')

        # On process exit we will wait for data to be flushed to pipe.
        #
        # However, if this process created the queue then all
        # processes which use the queue will be descendants of this
        # process.  Therefore waiting for the queue to be flushed
        # is pointless once all the child processes have been joined.
        created_by_this_process = (self._opid == os.getpid())
        if not self._joincancelled and not created_by_this_process:
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
                )

        # Send sentinel to the thread queue object when garbage collected
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
            )


def multiprocessing.queues.Queue.cancel_join_thread (   self)

Definition at line 165 of file queues.py.

    def cancel_join_thread(self):
        debug('Queue.cancel_join_thread()')
        self._joincancelled = True
        try:
            self._jointhread.cancel()
        except AttributeError:
            pass

def multiprocessing.queues.Queue.close (   self)

Definition at line 153 of file queues.py.

    def close(self):
        self._closed = True
        self._reader.close()
        if self._close:
            self._close()


def multiprocessing.queues.Queue.empty (   self)

Definition at line 141 of file queues.py.

    def empty(self):
        return not self._poll()

def multiprocessing.queues.Queue.full (   self)

Definition at line 144 of file queues.py.

    def full(self):
        return self._sem._semlock._is_zero()
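Since full() only inspects the bounding semaphore, its result changes as soon as put() or get() returns, independently of the feeder thread. A small sketch under that reading; the function name fullness_states is hypothetical.

```python
import multiprocessing


def fullness_states(maxsize=1):
    """Report full() before a put, after filling the queue, and after draining it."""
    q = multiprocessing.Queue(maxsize)
    before = q.full()
    for i in range(maxsize):
        q.put(i)  # each put takes one semaphore slot
    after = q.full()
    for _ in range(maxsize):
        q.get(timeout=5)  # each get releases one slot
    drained = q.full()
    q.close()
    q.join_thread()
    return before, after, drained
```

By contrast, empty() polls the pipe, so it can still report True for a short time after put() returns, until the feeder thread has written the item.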

def multiprocessing.queues.Queue.get (   self,
  block = True,
  timeout = None 
)

Definition at line 113 of file queues.py.

    def get(self, block=True, timeout=None):
        if block and timeout is None:
            self._rlock.acquire()
            try:
                res = self._recv()
                self._sem.release()
                return res
            finally:
                self._rlock.release()

        else:
            if block:
                deadline = time.time() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if not self._poll(block and (deadline-time.time()) or 0.0):
                    raise Empty
                res = self._recv()
                self._sem.release()
                return res
            finally:
                self._rlock.release()
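The timed branch of get() raises Empty when nothing arrives before the deadline. A minimal usage sketch, assuming the exception is the standard queue.Empty; the helper names get_or_default and demo are hypothetical.

```python
import multiprocessing
import queue  # queue.Empty is raised when get() times out


def get_or_default(q, timeout, default=None):
    """Wait up to `timeout` seconds for an item, else return `default`."""
    try:
        return q.get(block=True, timeout=timeout)
    except queue.Empty:
        return default


def demo():
    q = multiprocessing.Queue()
    missing = get_or_default(q, timeout=0.05, default="nothing yet")
    q.put("payload")
    present = get_or_default(q, timeout=5)
    q.close()
    q.join_thread()
    return missing, present
```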


def multiprocessing.queues.Queue.get_nowait (   self)

Definition at line 147 of file queues.py.

    def get_nowait(self):
        return self.get(False)


def multiprocessing.queues.Queue.join_thread (   self)

Definition at line 159 of file queues.py.

    def join_thread(self):
        debug('Queue.join_thread()')
        assert self._closed
        if self._jointhread:
            self._jointhread()
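join_thread() asserts that the queue is already closed, so close() has to come first. A short sketch of that shutdown order in a single process; the function name drain_and_shutdown is hypothetical.

```python
import multiprocessing


def drain_and_shutdown(items):
    """Send items through a queue, read them back, then shut the queue down."""
    q = multiprocessing.Queue()
    for item in items:
        q.put(item)
    received = [q.get(timeout=5) for _ in items]
    q.close()        # marks the queue closed and tells the feeder thread to quit
    q.join_thread()  # only valid after close()
    return received
```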

def multiprocessing.queues.Queue.put (   self,
  obj,
  block = True,
  timeout = None 
)

Reimplemented in multiprocessing.queues.JoinableQueue.

Definition at line 99 of file queues.py.

    def put(self, obj, block=True, timeout=None):
        assert not self._closed
        if not self._sem.acquire(block, timeout):
            raise Full

        self._notempty.acquire()
        try:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()
        finally:
            self._notempty.release()


def multiprocessing.queues.Queue.put_nowait (   self,
  obj 
)

Definition at line 150 of file queues.py.

    def put_nowait(self, obj):
        return self.put(obj, False)


def multiprocessing.queues.Queue.qsize (   self)

Definition at line 137 of file queues.py.

    def qsize(self):
        # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
        return self._maxsize - self._sem._semlock._get_value()
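As the comment notes, qsize() is unavailable on platforms where sem_getvalue() is broken. Portable callers may want to guard the call; the sketch below does that with a hypothetical helper named safe_qsize.

```python
import multiprocessing


def safe_qsize(q):
    """Return the approximate queue size, or None where qsize() is unsupported."""
    try:
        return q.qsize()
    except NotImplementedError:
        return None


def demo_qsize():
    q = multiprocessing.Queue()
    q.put("item")  # the semaphore is updated before put() returns
    size = safe_qsize(q)
    q.get(timeout=5)
    q.close()
    q.join_thread()
    return size
```

Even where it is supported, the value is a snapshot and may be stale by the time the caller acts on it.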


Member Data Documentation

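multiprocessing.queues.Queue._buffer [private]

Definition at line 89 of file queues.py.

multiprocessing.queues.Queue._close [private]

Definition at line 94 of file queues.py.

multiprocessing.queues.Queue._closed [private]

Definition at line 93 of file queues.py.

multiprocessing.queues.Queue._joincancelled [private]

Definition at line 92 of file queues.py.

multiprocessing.queues.Queue._jointhread [private]

Definition at line 91 of file queues.py.

multiprocessing.queues.Queue._maxsize [private]

Definition at line 61 of file queues.py.

multiprocessing.queues.Queue._notempty [private]

Definition at line 88 of file queues.py.

multiprocessing.queues.Queue._opid [private]

Definition at line 64 of file queues.py.

multiprocessing.queues.Queue._poll [private]

Definition at line 97 of file queues.py.

multiprocessing.queues.Queue._recv [private]

Definition at line 96 of file queues.py.

multiprocessing.queues.Queue._rlock [private]

Definition at line 63 of file queues.py.

multiprocessing.queues.Queue._sem [private]

Definition at line 69 of file queues.py.

multiprocessing.queues.Queue._send [private]

Definition at line 95 of file queues.py.

multiprocessing.queues.Queue._thread [private]

Definition at line 90 of file queues.py.

multiprocessing.queues.Queue._wlock [private]

Definition at line 66 of file queues.py.

multiprocessing.queues.Queue._writer [private]

Definition at line 62 of file queues.py.

_PyObject_HEAD_EXTRA Py_ssize_t _object::ob_refcnt [inherited]

Definition at line 107 of file object.h.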

struct _typeobject* _object::ob_type [inherited]

Definition at line 108 of file object.h.


The documentation for this class was generated from the following file: