Barrier. Useful for synchronizing a fixed number of threads at known synchronization points. Threads block on 'wait()' and are simultaneously awoken once they have all made that call.
Create a barrier, initialised to 'parties' threads. 'action' is a callable which, when supplied, will be called by one of the threads after they have all entered the barrier and just prior to releasing them all. If a 'timeout' is provided, it is used as the default for all subsequent 'wait()' calls.
    def __init__(self, parties, action=None, timeout=None, verbose=None):
        """
        Create a barrier, initialised to 'parties' threads.
        'action' is a callable which, when supplied, will be called
        by one of the threads after they have all entered the
        barrier and just prior to releasing them all.
        If a 'timeout' is provided, it is used as the default for
        all subsequent 'wait()' calls.
        """
        _Verbose.__init__(self, verbose)
        self._cond = Condition(Lock())
        self._action = action
        self._timeout = timeout
        self._parties = parties
        self._state = 0  # 0 filling, 1 draining, -1 resetting, -2 broken
        self._count = 0
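As a rough illustration of the constructor arguments, the sketch below uses the standard-library `threading.Barrier` (Python 3.2 and later), on the assumption that it behaves like the class listed here; its constructor takes `parties`, `action` and `timeout` but no `verbose` argument. The names `on_all_arrived`, `worker`, `action_calls` and `arrivals` are made up for the example.

```python
import threading

action_calls = []   # appended to by the action callback
arrivals = []       # index returned by wait() in each thread


def on_all_arrived():
    # Called by exactly one thread, after all parties have arrived
    # and before any of them is released.
    action_calls.append("action ran")


# Three parties; 5.0 seconds becomes the default timeout for wait().
barrier = threading.Barrier(3, action=on_all_arrived, timeout=5.0)


def worker():
    arrivals.append(barrier.wait())


threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(action_calls)        # ['action ran']
print(sorted(arrivals))    # [0, 1, 2]
```

The action runs once per barrier cycle, not once per thread, which makes it a convenient place for work that must happen exactly once between phases.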
Place the barrier into a 'broken' state. Useful in case of error. Any currently waiting threads and threads attempting to 'wait()' will have BrokenBarrierError raised.
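A minimal sketch of aborting a barrier, assuming the standard-library `threading.Barrier` exposes the same `abort()` behaviour as described above. The `waiter` function and `outcome` list are hypothetical names used only for this example.

```python
import threading

barrier = threading.Barrier(2)
outcome = []


def waiter():
    try:
        barrier.wait()
        outcome.append("released")
    except threading.BrokenBarrierError:
        # Raised whether the thread was already waiting when abort()
        # was called or only tried to wait afterwards.
        outcome.append("broken")


t = threading.Thread(target=waiter)
t.start()

# Only one of the two parties ever arrives, so break the barrier
# instead of leaving the waiter blocked forever.
barrier.abort()
t.join()

print(outcome)          # ['broken']
print(barrier.broken)   # True
```

The barrier stays broken until it is reset, so later calls to `wait()` also fail immediately.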
Return the number of threads that are currently waiting at the barrier.
    def n_waiting(self):
        """
        Return the number of threads that are currently waiting at the barrier.
        """
        # We don't need synchronization here since this is an ephemeral result
        # anyway. It returns the correct value in the steady state.
        if self._state == 0:
            return self._count
        return 0
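The following sketch shows how the waiting count changes over one barrier cycle. It assumes the standard-library `threading.Barrier`, where `n_waiting` is exposed as a read-only property rather than as the method shown in the listing above. The polling loop and its 5-second deadline are illustrative choices, not part of the API.

```python
import threading
import time

barrier = threading.Barrier(2)

# The first party blocks inside wait() until a second one arrives.
t = threading.Thread(target=barrier.wait)
t.start()

# Poll until the background thread has reached the barrier. The value
# is only a snapshot, so a short polling loop with a deadline is used.
deadline = time.monotonic() + 5.0
while barrier.n_waiting < 1 and time.monotonic() < deadline:
    time.sleep(0.01)

seen_while_blocked = barrier.n_waiting   # one thread is waiting

barrier.wait()   # the main thread is the second party; both are released
t.join()

seen_after = barrier.n_waiting           # nobody is waiting any more

print(seen_while_blocked, seen_after)    # 1 0
```

Because the result is ephemeral, it is best suited to monitoring and diagnostics rather than to making synchronization decisions.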
Reset the barrier to the initial state. Any threads currently waiting will have BrokenBarrierError raised.
    def reset(self):
        """
        Reset the barrier to the initial state.
        Any threads currently waiting will get the BrokenBarrier exception
        raised.
        """
        with self._cond:
            if self._count > 0:
                if self._state == 0:
                    # reset the barrier, waking up threads
                    self._state = -1
                elif self._state == -2:
                    # was broken, set it to reset state
                    # which clears when the last thread exits
                    self._state = -1
            else:
                self._state = 0
            self._cond.notify_all()
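A small sketch of resetting a barrier while a thread is waiting, then reusing it. It assumes the standard-library `threading.Barrier`, where `n_waiting` and `broken` are properties. The `waiter` helper, the `outcome` list and the polling deadline are hypothetical.

```python
import threading
import time

barrier = threading.Barrier(2)
outcome = []


def waiter():
    try:
        barrier.wait()
        outcome.append("released")
    except threading.BrokenBarrierError:
        outcome.append("broken")


# First cycle: one thread waits, then the barrier is reset under it.
t1 = threading.Thread(target=waiter)
t1.start()
deadline = time.monotonic() + 5.0
while barrier.n_waiting < 1 and time.monotonic() < deadline:
    time.sleep(0.01)   # make sure the waiter is really blocked first
barrier.reset()        # the blocked waiter gets BrokenBarrierError
t1.join()

# Second cycle: after the reset the barrier is usable again.
t2 = threading.Thread(target=waiter)
t2.start()
barrier.wait(timeout=5.0)   # main thread is the second party
t2.join()

print(outcome)          # ['broken', 'released']
print(barrier.broken)   # False
```

Unlike an abort, a reset leaves the barrier in its initial, usable state once the interrupted waiters have left.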
Wait for the barrier. When the specified number of threads have started waiting, they are all simultaneously awoken. If an 'action' was provided for the barrier, one of the threads will have executed that callback prior to returning. Returns an individual index number from 0 to 'parties-1'.
    def wait(self, timeout=None):
        """
        Wait for the barrier. When the specified number of threads have
        started waiting, they are all simultaneously awoken. If an 'action'
        was provided for the barrier, one of the threads will have executed
        that callback prior to returning.
        Returns an individual index number from 0 to 'parties-1'.
        """
        if timeout is None:
            timeout = self._timeout
        with self._cond:
            self._enter()  # Block while the barrier drains.
            index = self._count
            self._count += 1
            try:
                if index + 1 == self._parties:
                    # We release the barrier
                    self._release()
                else:
                    # We wait until someone releases us
                    self._wait(timeout)
                return index
            finally:
                self._count -= 1
                # Wake up any threads waiting for barrier to drain.
                self._exit()
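The sketch below illustrates two aspects of `wait()`: the per-thread index it returns, and what happens when a wait times out. It assumes the standard-library `threading.Barrier`, in which a timed-out wait breaks the barrier and raises `BrokenBarrierError`. The names `worker`, `indices`, `leaders` and `lonely` are invented for the example.

```python
import threading

PARTIES = 4
barrier = threading.Barrier(PARTIES)
indices = []
leaders = []


def worker(name):
    index = barrier.wait(timeout=5.0)
    indices.append(index)
    # Each thread receives a distinct index, so testing for one value
    # selects exactly one thread to do follow-up work.
    if index == 0:
        leaders.append(name)


threads = [
    threading.Thread(target=worker, args=(f"worker-{i}",))
    for i in range(PARTIES)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(indices))   # [0, 1, 2, 3]
print(len(leaders))      # 1

# A wait that times out: the second party never arrives.
lonely = threading.Barrier(2)
try:
    lonely.wait(timeout=0.1)
    timed_out = False
except threading.BrokenBarrierError:
    timed_out = True

print(timed_out)         # True
print(lonely.broken)     # True
```

Which thread receives which index depends on arrival order, so the identity of the selected thread can differ from run to run.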