Back to index

python3.2  3.2.2
Public Member Functions | Public Attributes | Static Public Attributes
test.test_multiprocessing._TestCondition Class Reference
Inheritance diagram for test.test_multiprocessing._TestCondition:
Inheritance graph
[legend]
Collaboration diagram for test.test_multiprocessing._TestCondition:
Collaboration graph
[legend]

List of all members.

Public Member Functions

def f
def check_invariant
def test_notify
def test_notify_all
def test_timeout
def assertTimingAlmostEqual
def assertReturnsIfImplemented
def __reduce__

Public Attributes

 TYPE
_PyObject_HEAD_EXTRA Py_ssize_t ob_refcnt
struct _typeobjectob_type

Static Public Attributes

tuple ALLOWED_TYPES = ('processes', 'manager', 'threads')

Detailed Description

Definition at line 643 of file test_multiprocessing.py.


Member Function Documentation

def test.test_multiprocessing.BaseTestCase.__reduce__ (   self,
  args 
) [inherited]

Definition at line 124 of file test_multiprocessing.py.

00124 
00125     def __reduce__(self, *args):
00126         raise NotImplementedError("shouldn't try to pickle a test case")

def test.test_multiprocessing.BaseTestCase.assertReturnsIfImplemented (   self,
  value,
  func,
  args 
) [inherited]

Definition at line 114 of file test_multiprocessing.py.

00114 
00115     def assertReturnsIfImplemented(self, value, func, *args):
00116         try:
00117             res = func(*args)
00118         except NotImplementedError:
00119             pass
00120         else:
00121             return self.assertEqual(value, res)

Here is the call graph for this function:

Here is the caller graph for this function:

def test.test_multiprocessing.BaseTestCase.assertTimingAlmostEqual (   self,
  a,
  b 
) [inherited]

Definition at line 110 of file test_multiprocessing.py.

00110 
00111     def assertTimingAlmostEqual(self, a, b):
00112         if CHECK_TIMINGS:
00113             self.assertAlmostEqual(a, b, 1)

Here is the call graph for this function:

Here is the caller graph for this function:

Definition at line 653 of file test_multiprocessing.py.

00653 
00654     def check_invariant(self, cond):
00655         # this is only supposed to succeed when there are no sleepers
00656         if self.TYPE == 'processes':
00657             try:
00658                 sleepers = (cond._sleeping_count.get_value() -
00659                             cond._woken_count.get_value())
00660                 self.assertEqual(sleepers, 0)
00661                 self.assertEqual(cond._wait_semaphore.get_value(), 0)
00662             except NotImplementedError:
00663                 pass

Here is the caller graph for this function:

def test.test_multiprocessing._TestCondition.f (   cls,
  cond,
  sleeping,
  woken,
  timeout = None 
)

Definition at line 646 of file test_multiprocessing.py.

00646 
00647     def f(cls, cond, sleeping, woken, timeout=None):
00648         cond.acquire()
00649         sleeping.release()
00650         cond.wait(timeout)
00651         woken.release()
00652         cond.release()

Here is the caller graph for this function:

Definition at line 664 of file test_multiprocessing.py.

00664 
00665     def test_notify(self):
00666         cond = self.Condition()
00667         sleeping = self.Semaphore(0)
00668         woken = self.Semaphore(0)
00669 
00670         p = self.Process(target=self.f, args=(cond, sleeping, woken))
00671         p.daemon = True
00672         p.start()
00673 
00674         p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
00675         p.daemon = True
00676         p.start()
00677 
00678         # wait for both children to start sleeping
00679         sleeping.acquire()
00680         sleeping.acquire()
00681 
00682         # check no process/thread has woken up
00683         time.sleep(DELTA)
00684         self.assertReturnsIfImplemented(0, get_value, woken)
00685 
00686         # wake up one process/thread
00687         cond.acquire()
00688         cond.notify()
00689         cond.release()
00690 
00691         # check one process/thread has woken up
00692         time.sleep(DELTA)
00693         self.assertReturnsIfImplemented(1, get_value, woken)
00694 
00695         # wake up another
00696         cond.acquire()
00697         cond.notify()
00698         cond.release()
00699 
00700         # check other has woken up
00701         time.sleep(DELTA)
00702         self.assertReturnsIfImplemented(2, get_value, woken)
00703 
00704         # check state is not mucked up
00705         self.check_invariant(cond)
00706         p.join()

Here is the call graph for this function:

Definition at line 707 of file test_multiprocessing.py.

00707 
00708     def test_notify_all(self):
00709         cond = self.Condition()
00710         sleeping = self.Semaphore(0)
00711         woken = self.Semaphore(0)
00712 
00713         # start some threads/processes which will timeout
00714         for i in range(3):
00715             p = self.Process(target=self.f,
00716                              args=(cond, sleeping, woken, TIMEOUT1))
00717             p.daemon = True
00718             p.start()
00719 
00720             t = threading.Thread(target=self.f,
00721                                  args=(cond, sleeping, woken, TIMEOUT1))
00722             t.daemon = True
00723             t.start()
00724 
00725         # wait for them all to sleep
00726         for i in range(6):
00727             sleeping.acquire()
00728 
00729         # check they have all timed out
00730         for i in range(6):
00731             woken.acquire()
00732         self.assertReturnsIfImplemented(0, get_value, woken)
00733 
00734         # check state is not mucked up
00735         self.check_invariant(cond)
00736 
00737         # start some more threads/processes
00738         for i in range(3):
00739             p = self.Process(target=self.f, args=(cond, sleeping, woken))
00740             p.daemon = True
00741             p.start()
00742 
00743             t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
00744             t.daemon = True
00745             t.start()
00746 
00747         # wait for them to all sleep
00748         for i in range(6):
00749             sleeping.acquire()
00750 
00751         # check no process/thread has woken up
00752         time.sleep(DELTA)
00753         self.assertReturnsIfImplemented(0, get_value, woken)
00754 
00755         # wake them all up
00756         cond.acquire()
00757         cond.notify_all()
00758         cond.release()
00759 
00760         # check they have all woken
00761         for i in range(10):
00762             try:
00763                 if get_value(woken) == 6:
00764                     break
00765             except NotImplementedError:
00766                 break
00767             time.sleep(DELTA)
00768         self.assertReturnsIfImplemented(6, get_value, woken)
00769 
00770         # check state is not mucked up
00771         self.check_invariant(cond)

Here is the call graph for this function:

Definition at line 772 of file test_multiprocessing.py.

00772 
00773     def test_timeout(self):
00774         cond = self.Condition()
00775         wait = TimingWrapper(cond.wait)
00776         cond.acquire()
00777         res = wait(TIMEOUT1)
00778         cond.release()
00779         self.assertEqual(res, False)
00780         self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
00781 

Here is the call graph for this function:


Member Data Documentation

tuple test.test_multiprocessing.BaseTestCase.ALLOWED_TYPES = ('processes', 'manager', 'threads') [static, inherited]

Definition at line 107 of file object.h.

struct _typeobject* _object::ob_type [inherited]

Definition at line 108 of file object.h.

Definition at line 655 of file test_multiprocessing.py.


The documentation for this class was generated from the following file: