Back to index

python3.2  3.2.2
lock_tests.py
Go to the documentation of this file.
00001 """
00002 Various tests for synchronization primitives.
00003 """
00004 
00005 import sys
00006 import time
00007 from _thread import start_new_thread, get_ident, TIMEOUT_MAX
00008 import threading
00009 import unittest
00010 
00011 from test import support
00012 
00013 
00014 def _wait():
00015     # A crude wait/yield function not relying on synchronization primitives.
00016     time.sleep(0.01)
00017 
class Bunch(object):
    """
    A group of raw threads that all run the same callable.
    """
    def __init__(self, f, n, wait_before_exit=False):
        """
        Start `n` threads, each of which calls `f` with no arguments.
        When `wait_before_exit` is true, each thread keeps polling after
        `f` has returned and only terminates once do_finish() is called.
        """
        self.f = f
        self.n = n
        # Each thread records its id on entry, and again once `f` has
        # returned or raised.
        self.started = []
        self.finished = []
        self._can_exit = not wait_before_exit

        def runner():
            ident = get_ident()
            self.started.append(ident)
            try:
                f()
            finally:
                self.finished.append(ident)
                # Stay alive until the owner allows the thread to exit.
                while not self._can_exit:
                    _wait()

        for _ in range(n):
            start_new_thread(runner, ())

    def wait_for_started(self):
        """Poll until all `n` threads have registered in `started`."""
        while self.n > len(self.started):
            _wait()

    def wait_for_finished(self):
        """Poll until all `n` threads have registered in `finished`."""
        while self.n > len(self.finished):
            _wait()

    def do_finish(self):
        """Allow threads created with `wait_before_exit` to terminate."""
        self._can_exit = True
00055 
00056 
class BaseTestCase(unittest.TestCase):
    """
    Shared fixture for the synchronization tests, plus a tolerant
    timeout assertion.
    """

    def setUp(self):
        # Keep whatever threading_setup() returns so that tearDown can hand
        # it back to threading_cleanup().
        snapshot = support.threading_setup()
        self._threads = snapshot

    def tearDown(self):
        saved = self._threads
        support.threading_cleanup(*saved)
        support.reap_children()

    def assertTimeout(self, actual, expected):
        """
        Check that the measured duration `actual` is plausible for a
        requested timeout of `expected`: at least 60% of it, and strictly
        less than ten times it.
        """
        # Sleeping and time.time() are both imprecise (notably on Windows),
        # so an exact comparison against `expected` would fail now and then.
        lower_bound = expected * 0.6
        self.assertGreaterEqual(actual, lower_bound)
        # Upper bound: only guards against something going badly wrong.
        upper_bound = expected * 10.0
        self.assertLess(actual, upper_bound)
00072 
00073 
class BaseLockTests(BaseTestCase):
    """
    Tests shared by recursive and non-recursive locks.

    The lock factory is read from `self.locktype`, which is not defined in
    this class.
    """

    def test_constructor(self):
        # Creating a lock and dropping it straight away must work.
        lk = self.locktype()
        del lk

    def test_acquire_destroy(self):
        # A lock may be dropped while it is still held.
        lk = self.locktype()
        lk.acquire()
        del lk

    def test_acquire_release(self):
        # Plain acquire/release round trip.
        lk = self.locktype()
        lk.acquire()
        lk.release()
        del lk

    def test_try_acquire(self):
        # A non-blocking acquire of a free lock succeeds.
        lk = self.locktype()
        acquired = lk.acquire(False)
        self.assertTrue(acquired)
        lk.release()

    def test_try_acquire_contended(self):
        # A non-blocking acquire from another thread fails while we hold it.
        lk = self.locktype()
        lk.acquire()
        outcome = []
        def attempt():
            outcome.append(lk.acquire(False))
        Bunch(attempt, 1).wait_for_finished()
        self.assertFalse(outcome[0])
        lk.release()

    def test_acquire_contended(self):
        lk = self.locktype()
        lk.acquire()
        nthreads = 5
        def worker():
            lk.acquire()
            lk.release()

        bunch = Bunch(worker, nthreads)
        bunch.wait_for_started()
        _wait()
        # Nobody gets through while the main thread holds the lock.
        self.assertEqual(len(bunch.finished), 0)
        lk.release()
        bunch.wait_for_finished()
        self.assertEqual(len(bunch.finished), nthreads)

    def test_with(self):
        lk = self.locktype()
        def probe():
            lk.acquire()
            lk.release()
        def enter_and_maybe_raise(err=None):
            with lk:
                if err is not None:
                    raise err
        def check_unacquired():
            # A helper thread can only finish if the lock is free.
            Bunch(probe, 1).wait_for_finished()
        enter_and_maybe_raise()
        check_unacquired()
        self.assertRaises(TypeError, enter_and_maybe_raise, TypeError)
        check_unacquired()

    def test_thread_leak(self):
        # Using the lock from a foreign (non-threading) thread must not
        # leave a Thread instance behind.
        lk = self.locktype()
        def worker():
            lk.acquire()
            lk.release()
        before = len(threading.enumerate())
        # Many threads are run in the hope that existing thread ids won't
        # be recycled.
        Bunch(worker, 15).wait_for_finished()
        if len(threading.enumerate()) == before:
            return
        # There is a short window in which a Thread's target has returned
        # but the Thread is still alive and registered.  Wait a little
        # longer to avoid spurious failures (seen on a buildbot).
        time.sleep(0.4)
        self.assertEqual(before, len(threading.enumerate()))

    def test_timeout(self):
        lk = self.locktype()
        # A timeout cannot be combined with a non-blocking acquire.
        self.assertRaises(ValueError, lk.acquire, 0, 1)
        # Out-of-range timeout values are rejected.
        self.assertRaises(ValueError, lk.acquire, timeout=-100)
        for too_big in (1e100, TIMEOUT_MAX + 1):
            self.assertRaises(OverflowError, lk.acquire, timeout=too_big)
        # TIMEOUT_MAX itself is accepted.
        lk.acquire(timeout=TIMEOUT_MAX)
        lk.release()
        began = time.time()
        self.assertTrue(lk.acquire(timeout=5))
        elapsed = time.time() - began
        # Sanity check only: the free lock must not have waited out the
        # timeout.
        self.assertLess(elapsed, 5)
        outcome = []
        def timed_attempt():
            began = time.time()
            outcome.append(lk.acquire(timeout=0.5))
            ended = time.time()
            outcome.append(ended - began)
        # The lock is still held here, so the helper thread must time out.
        Bunch(timed_attempt, 1).wait_for_finished()
        self.assertFalse(outcome[0])
        self.assertTimeout(outcome[1], 0.5)
00185 
00186 
class LockTests(BaseLockTests):
    """
    Tests for non-recursive, weak locks, i.e. locks that one thread may
    acquire and a different thread may release.
    """
    def test_reacquire(self):
        # A second acquire blocks until the lock has been released.
        lk = self.locktype()
        progress = []
        def double_acquire():
            lk.acquire()
            progress.append(None)
            lk.acquire()
            progress.append(None)
        start_new_thread(double_acquire, ())
        while not progress:
            _wait()
        _wait()
        # The worker is stuck on its second acquire.
        self.assertEqual(len(progress), 1)
        lk.release()
        while len(progress) == 1:
            _wait()
        self.assertEqual(len(progress), 2)

    def test_different_thread(self):
        # The lock can be released by a thread other than the acquirer.
        lk = self.locktype()
        lk.acquire()
        def releaser():
            lk.release()
        Bunch(releaser, 1).wait_for_finished()
        lk.acquire()
        lk.release()

    def test_state_after_timeout(self):
        # Issue #11618: the lock must be in a sane state after a
        # (non-zero) timeout has expired.
        lk = self.locktype()
        lk.acquire()
        timed_out_result = lk.acquire(timeout=0.01)
        self.assertFalse(timed_out_result)
        lk.release()
        self.assertFalse(lk.locked())
        self.assertTrue(lk.acquire(blocking=False))
00231 
00232 
class RLockTests(BaseLockTests):
    """
    Tests for recursive locks.
    """
    def test_reacquire(self):
        # The owning thread may nest acquire/release pairs.
        lock = self.locktype()
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()

    def test_release_unacquired(self):
        # Cannot release an unacquired lock
        lock = self.locktype()
        self.assertRaises(RuntimeError, lock.release)
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()
        # Every acquire has been matched: one more release is an error.
        self.assertRaises(RuntimeError, lock.release)

    def test_different_thread(self):
        # Cannot release from a different thread
        lock = self.locktype()
        def f():
            lock.acquire()
        b = Bunch(f, 1, True)
        try:
            # Wait until the worker really owns the lock.  Bunch fills in
            # `finished` as soon as f() has returned, while the thread stays
            # alive until do_finish().  Without this wait, release() could
            # run before the worker acquired anything and raise RuntimeError
            # merely because the lock was unowned.
            b.wait_for_finished()
            self.assertRaises(RuntimeError, lock.release)
        finally:
            b.do_finish()

    def test__is_owned(self):
        lock = self.locktype()
        self.assertFalse(lock._is_owned())
        lock.acquire()
        self.assertTrue(lock._is_owned())
        lock.acquire()
        self.assertTrue(lock._is_owned())
        # Ownership is per thread: another thread does not own the lock.
        result = []
        def f():
            result.append(lock._is_owned())
        Bunch(f, 1).wait_for_finished()
        self.assertFalse(result[0])
        # Still owned after undoing only one of the two acquires.
        lock.release()
        self.assertTrue(lock._is_owned())
        lock.release()
        self.assertFalse(lock._is_owned())
00285 
00286 
class EventTests(BaseTestCase):
    """
    Tests for Event objects.

    The event factory is read from `self.eventtype`, which is not defined
    in this class.
    """

    def test_is_set(self):
        ev = self.eventtype()
        self.assertFalse(ev.is_set())
        # Calling set() twice, then clear() twice: the repeated call must
        # leave the flag unchanged.
        for _ in range(2):
            ev.set()
            self.assertTrue(ev.is_set())
        for _ in range(2):
            ev.clear()
            self.assertFalse(ev.is_set())

    def _check_notify(self, evt):
        # Every waiting thread must be woken by a single set().
        nthreads = 5
        first_waits = []
        second_waits = []
        def waiter():
            first_waits.append(evt.wait())
            second_waits.append(evt.wait())
        bunch = Bunch(waiter, nthreads)
        bunch.wait_for_started()
        _wait()
        # Nobody gets past wait() before the event is set.
        self.assertEqual(len(first_waits), 0)
        evt.set()
        bunch.wait_for_finished()
        everyone_true = [True] * nthreads
        self.assertEqual(first_waits, everyone_true)
        self.assertEqual(second_waits, everyone_true)

    def test_notify(self):
        ev = self.eventtype()
        self._check_notify(ev)
        # Once more, after an explicit clear().
        ev.set()
        ev.clear()
        self._check_notify(ev)

    def test_timeout(self):
        ev = self.eventtype()
        immediate = []
        timed = []
        nthreads = 5
        def waiter():
            immediate.append(ev.wait(0.0))
            began = time.time()
            flag = ev.wait(0.5)
            ended = time.time()
            timed.append((flag, ended - began))
        Bunch(waiter, nthreads).wait_for_finished()
        self.assertEqual(immediate, [False] * nthreads)
        for flag, elapsed in timed:
            self.assertFalse(flag)
            self.assertTimeout(elapsed, 0.5)
        # Now with the event set: both waits report True.
        immediate = []
        timed = []
        ev.set()
        Bunch(waiter, nthreads).wait_for_finished()
        self.assertEqual(immediate, [True] * nthreads)
        for flag, _ in timed:
            self.assertTrue(flag)
00353 
00354 
class ConditionTests(BaseTestCase):
    """
    Tests for condition variables.

    The class under test is read from `self.condtype`, which is not defined
    in this class.
    """

    def test_acquire(self):
        cond = self.condtype()
        # By default we have an RLock: the condition can be acquired multiple
        # times.
        cond.acquire()
        cond.acquire()
        cond.release()
        cond.release()
        # With an explicit plain Lock, acquiring the condition and acquiring
        # the lock exclude each other.
        lock = threading.Lock()
        cond = self.condtype(lock)
        cond.acquire()
        self.assertFalse(lock.acquire(False))
        cond.release()
        self.assertTrue(lock.acquire(False))
        self.assertFalse(cond.acquire(False))
        lock.release()
        with cond:
            self.assertFalse(lock.acquire(False))

    def test_unacquired_wait(self):
        # wait() without holding the condition is an error.
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.wait)

    def test_unacquired_notify(self):
        # notify() without holding the condition is an error.
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.notify)

    def _check_notify(self, cond):
        # Five threads each wait twice on `cond`.  Every completed wait is
        # recorded together with the value of `phase_num` seen at that time,
        # which tells which notification round woke the thread.
        N = 5
        results1 = []
        results2 = []
        phase_num = 0
        def f():
            cond.acquire()
            result = cond.wait()
            cond.release()
            results1.append((result, phase_num))
            cond.acquire()
            result = cond.wait()
            cond.release()
            results2.append((result, phase_num))
        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        self.assertEqual(results1, [])
        # Notify 3 threads at first
        cond.acquire()
        cond.notify(3)
        _wait()
        # The phase is bumped while the condition is still held, i.e. before
        # any notified thread can return from wait().
        phase_num = 1
        cond.release()
        while len(results1) < 3:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3)
        self.assertEqual(results2, [])
        # Notify 5 threads: they might be in their first or second wait
        cond.acquire()
        cond.notify(5)
        _wait()
        phase_num = 2
        cond.release()
        while len(results1) + len(results2) < 8:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
        self.assertEqual(results2, [(True, 2)] * 3)
        # Notify all threads: they are all in their second wait
        cond.acquire()
        cond.notify_all()
        _wait()
        phase_num = 3
        cond.release()
        while len(results2) < 5:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
        self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
        b.wait_for_finished()

    def test_notify(self):
        cond = self.condtype()
        self._check_notify(cond)
        # A second time, to check internal state is still ok.
        self._check_notify(cond)

    def test_timeout(self):
        cond = self.condtype()
        results = []
        N = 5
        def f():
            cond.acquire()
            t1 = time.time()
            result = cond.wait(0.5)
            t2 = time.time()
            cond.release()
            results.append((t2 - t1, result))
        Bunch(f, N).wait_for_finished()
        self.assertEqual(len(results), N)
        for dt, result in results:
            self.assertTimeout(dt, 0.5)
            # Note that conceptually (that's the condition variable protocol)
            # a wait() may succeed even if no one notifies us and before any
            # timeout occurs.  Spurious wakeups can occur.
            # This makes it hard to verify the result value.
            # In practice, this implementation has no spurious wakeups.
            self.assertFalse(result)

    def test_waitfor(self):
        cond = self.condtype()
        state = 0
        # The worker is a raw thread, so a failing assertion inside it does
        # not reach the test runner.  `success` lets the main thread verify
        # that the worker got past all of its checks.
        success = []
        def f():
            with cond:
                result = cond.wait_for(lambda : state==4)
                self.assertTrue(result)
                self.assertEqual(state, 4)
                success.append(None)
        b = Bunch(f, 1)
        b.wait_for_started()
        for i in range(4):
            time.sleep(0.01)
            with cond:
                state += 1
                cond.notify()
        b.wait_for_finished()
        self.assertEqual(len(success), 1)

    def test_waitfor_timeout(self):
        cond = self.condtype()
        state = 0
        # Same pattern as test_waitfor: records that the worker's assertions
        # all passed.
        success = []
        def f():
            with cond:
                dt = time.time()
                result = cond.wait_for(lambda : state==4, timeout=0.1)
                dt = time.time() - dt
                self.assertFalse(result)
                self.assertTimeout(dt, 0.1)
                success.append(None)
        b = Bunch(f, 1)
        b.wait_for_started()
        # Only increment 3 times, so state == 4 is never reached.
        for i in range(3):
            time.sleep(0.01)
            with cond:
                state += 1
                cond.notify()
        b.wait_for_finished()
        self.assertEqual(len(success), 1)
00504 
00505 
class BaseSemaphoreTests(BaseTestCase):
    """
    Common tests for {bounded, unbounded} semaphore objects.

    The semaphore factory is read from `self.semtype`, which is not defined
    in this class.
    """

    def test_constructor(self):
        # Negative initial values are rejected.
        self.assertRaises(ValueError, self.semtype, value=-1)
        self.assertRaises(ValueError, self.semtype, value=-sys.maxsize)

    def test_acquire(self):
        sem = self.semtype(1)
        sem.acquire()
        sem.release()
        sem = self.semtype(2)
        sem.acquire()
        sem.acquire()
        sem.release()
        sem.release()

    def test_acquire_destroy(self):
        # A semaphore may be dropped while it is still acquired.
        sem = self.semtype()
        sem.acquire()
        del sem

    def test_acquire_contended(self):
        # Initial value 7 minus one acquire here leaves 6 units for the
        # N threads, each of which wants two (2 * N = 20 acquires in total).
        sem = self.semtype(7)
        sem.acquire()
        N = 10
        results1 = []
        results2 = []
        phase_num = 0
        def f():
            sem.acquire()
            results1.append(phase_num)
            sem.acquire()
            results2.append(phase_num)
        b = Bunch(f, N)
        b.wait_for_started()
        # Phase 0: exactly the 6 available units get taken.
        while len(results1) + len(results2) < 6:
            _wait()
        self.assertEqual(results1 + results2, [0] * 6)
        # Phase 1: 7 more units are released (13 acquires so far).
        phase_num = 1
        for i in range(7):
            sem.release()
        while len(results1) + len(results2) < 13:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
        # Phase 2: 6 more units are released (19 acquires so far).
        phase_num = 2
        for i in range(6):
            sem.release()
        while len(results1) + len(results2) < 19:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
        # The semaphore is still locked
        self.assertFalse(sem.acquire(False))
        # Final release, to let the last thread finish
        sem.release()
        b.wait_for_finished()

    def test_try_acquire(self):
        sem = self.semtype(2)
        self.assertTrue(sem.acquire(False))
        self.assertTrue(sem.acquire(False))
        self.assertFalse(sem.acquire(False))
        sem.release()
        self.assertTrue(sem.acquire(False))

    def test_try_acquire_contended(self):
        # 4 - 1 = 3 units are left for 10 non-blocking attempts.
        sem = self.semtype(4)
        sem.acquire()
        results = []
        def f():
            results.append(sem.acquire(False))
            results.append(sem.acquire(False))
        Bunch(f, 5).wait_for_finished()
        # There can be a thread switch between acquiring the semaphore and
        # appending the result, therefore results will not necessarily be
        # ordered.
        self.assertEqual(sorted(results), [False] * 7 + [True] * 3)

    def test_acquire_timeout(self):
        sem = self.semtype(2)
        # A timeout cannot be combined with a non-blocking acquire.
        self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
        self.assertTrue(sem.acquire(timeout=0.005))
        self.assertTrue(sem.acquire(timeout=0.005))
        self.assertFalse(sem.acquire(timeout=0.005))
        sem.release()
        self.assertTrue(sem.acquire(timeout=0.005))
        # The semaphore is exhausted again: a timed acquire must wait for
        # roughly the requested time before giving up.
        t = time.time()
        self.assertFalse(sem.acquire(timeout=0.5))
        dt = time.time() - t
        self.assertTimeout(dt, 0.5)

    def test_default_value(self):
        # The default initial value is 1.
        sem = self.semtype()
        sem.acquire()
        def f():
            sem.acquire()
            sem.release()
        b = Bunch(f, 1)
        b.wait_for_started()
        _wait()
        # The worker is blocked because the only unit is taken.
        self.assertFalse(b.finished)
        sem.release()
        b.wait_for_finished()

    def test_with(self):
        sem = self.semtype(2)
        def _with(err=None):
            with sem:
                # One unit is still available inside the first `with`...
                self.assertTrue(sem.acquire(False))
                sem.release()
                with sem:
                    # ...and none inside the second.
                    self.assertFalse(sem.acquire(False))
                    if err:
                        raise err
        _with()
        # Both units are given back on normal exit and on exception.
        self.assertTrue(sem.acquire(False))
        sem.release()
        self.assertRaises(TypeError, _with, TypeError)
        self.assertTrue(sem.acquire(False))
        sem.release()
00629 
class SemaphoreTests(BaseSemaphoreTests):
    """
    Tests specific to unbounded semaphores.
    """

    def test_release_unacquired(self):
        # Releasing more often than acquiring is allowed and simply raises
        # the semaphore's value.
        semaphore = self.semtype(1)
        semaphore.release()
        # Value is now 2, so two acquires in a row succeed.
        for _ in range(2):
            semaphore.acquire()
        semaphore.release()
00642 
00643 
class BoundedSemaphoreTests(BaseSemaphoreTests):
    """
    Tests specific to bounded semaphores.
    """

    def test_release_unacquired(self):
        # The value may never exceed the initial one.
        bounded = self.semtype()
        with self.assertRaises(ValueError):
            bounded.release()
        bounded.acquire()
        bounded.release()
        with self.assertRaises(ValueError):
            bounded.release()
00656 
00657 
class BarrierTests(BaseTestCase):
    """
    Tests for Barrier objects.

    The barrier factory is read from `self.barriertype`, which is not
    defined in this class.
    """
    N = 5
    defaultTimeout = 2.0

    def setUp(self):
        # Chain to the base fixture so its thread bookkeeping also runs for
        # barrier tests.
        super().setUp()
        self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        # Abort first so that any thread still waiting on the barrier is
        # released before the base class cleans up.
        self.barrier.abort()
        super().tearDown()

    def run_threads(self, f):
        """Run `f` in N-1 helper threads plus the calling thread."""
        b = Bunch(f, self.N-1)
        f()
        b.wait_for_finished()

    def multipass(self, results, n):
        """
        Pass the shared barrier 2*n times, checking via the two lists in
        `results` that all parties move in lockstep.
        """
        m = self.barrier.parties
        self.assertEqual(m, self.N)
        for i in range(n):
            results[0].append(True)
            self.assertEqual(len(results[1]), i * m)
            self.barrier.wait()
            results[1].append(True)
            self.assertEqual(len(results[0]), (i + 1) * m)
            self.barrier.wait()
        self.assertEqual(self.barrier.n_waiting, 0)
        self.assertFalse(self.barrier.broken)

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [[],[]]
        def f():
            self.multipass(results, passes)
        self.run_threads(f)

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        self.test_barrier(10)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        results = []
        def f():
            r = self.barrier.wait()
            results.append(r)

        self.run_threads(f)
        # The return values must add up to 0 + 1 + ... + N-1.
        self.assertEqual(sum(results), sum(range(self.N)))

    def test_action(self):
        """
        Test the 'action' callback
        """
        results = []
        def action():
            results.append(True)
        barrier = self.barriertype(self.N, action)
        def f():
            barrier.wait()
            # The action has run exactly once by the time wait() returns.
            self.assertEqual(len(results), 1)

        self.run_threads(f)

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        results1 = []
        results2 = []
        def f():
            try:
                i = self.barrier.wait()
                if i == self.N//2:
                    # One designated thread bails out instead of waiting.
                    raise RuntimeError
                self.barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
            except RuntimeError:
                self.barrier.abort()

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(self.barrier.broken)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        results1 = []
        results2 = []
        results3 = []
        def f():
            i = self.barrier.wait()
            if i == self.N//2:
                # Wait until the other threads are all in the barrier.
                while self.barrier.n_waiting < self.N-1:
                    time.sleep(0.001)
                self.barrier.reset()
            else:
                try:
                    self.barrier.wait()
                    results1.append(True)
                except threading.BrokenBarrierError:
                    results2.append(True)
            # Now, pass the barrier again
            self.barrier.wait()
            results3.append(True)

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        results1 = []
        results2 = []
        results3 = []
        barrier2 = self.barriertype(self.N)
        def f():
            try:
                i = self.barrier.wait()
                if i == self.N//2:
                    raise RuntimeError
                self.barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
            except RuntimeError:
                self.barrier.abort()
            # Synchronize and reset the barrier.  Must synchronize first so
            # that everyone has left it when we reset, and after so that no
            # one enters it before the reset.
            if barrier2.wait() == self.N//2:
                self.barrier.reset()
            barrier2.wait()
            self.barrier.wait()
            results3.append(True)

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        def f():
            i = self.barrier.wait()
            if i == self.N // 2:
                # One thread is late!
                time.sleep(1.0)
            # Default timeout is 2.0, so this is shorter.
            self.assertRaises(threading.BrokenBarrierError,
                              self.barrier.wait, 0.5)
        self.run_threads(f)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        # create a barrier with a low default timeout
        barrier = self.barriertype(self.N, timeout=0.3)
        def f():
            i = barrier.wait()
            if i == self.N // 2:
                # One thread is later than the default timeout of 0.3s.
                time.sleep(1.0)
            self.assertRaises(threading.BrokenBarrierError, barrier.wait)
        self.run_threads(f)

    def test_single_thread(self):
        # A one-party barrier never blocks.
        b = self.barriertype(1)
        b.wait()
        b.wait()