Back to index

python3.2  3.2.2
Public Member Functions | Public Attributes | Static Public Attributes
test.test_multiprocessing._TestPool Class Reference
Inheritance diagram for test.test_multiprocessing._TestPool:
Inheritance graph
[legend]
Collaboration diagram for test.test_multiprocessing._TestPool:
Collaboration graph
[legend]

List of all members.

Public Member Functions

def test_apply
def test_map
def test_map_chunksize
def test_async
def test_async_timeout
def test_imap
def test_imap_unordered
def test_make_pool
def test_terminate
def assertTimingAlmostEqual
def assertReturnsIfImplemented
def __reduce__

Public Attributes

 TYPE
_PyObject_HEAD_EXTRA Py_ssize_t ob_refcnt
struct _typeobject* ob_type

Static Public Attributes

tuple ALLOWED_TYPES = ('processes', 'manager', 'threads')

Detailed Description

Definition at line 1039 of file test_multiprocessing.py.


Member Function Documentation

def test.test_multiprocessing.BaseTestCase.__reduce__ (   self,
  args 
) [inherited]

Definition at line 124 of file test_multiprocessing.py.

00124 
00125     def __reduce__(self, *args):
00126         raise NotImplementedError("shouldn't try to pickle a test case")

def test.test_multiprocessing.BaseTestCase.assertReturnsIfImplemented (   self,
  value,
  func,
  args 
) [inherited]

Definition at line 114 of file test_multiprocessing.py.

00114 
00115     def assertReturnsIfImplemented(self, value, func, *args):
00116         try:
00117             res = func(*args)
00118         except NotImplementedError:
00119             pass
00120         else:
00121             return self.assertEqual(value, res)

Here is the call graph for this function:

Here is the caller graph for this function:

def test.test_multiprocessing.BaseTestCase.assertTimingAlmostEqual (   self,
  a,
  b 
) [inherited]

Definition at line 110 of file test_multiprocessing.py.

00110 
00111     def assertTimingAlmostEqual(self, a, b):
00112         if CHECK_TIMINGS:
00113             self.assertAlmostEqual(a, b, 1)

Here is the call graph for this function:

Here is the caller graph for this function:

def test.test_multiprocessing._TestPool.test_apply (   self )

Definition at line 1041 of file test_multiprocessing.py.

01041 
01042     def test_apply(self):
01043         papply = self.pool.apply
01044         self.assertEqual(papply(sqr, (5,)), sqr(5))
01045         self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

Here is the call graph for this function:

def test.test_multiprocessing._TestPool.test_async (   self )

Definition at line 1058 of file test_multiprocessing.py.

01058 
01059     def test_async(self):
01060         res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
01061         get = TimingWrapper(res.get)
01062         self.assertEqual(get(), 49)
01063         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

Here is the call graph for this function:

def test.test_multiprocessing._TestPool.test_async_timeout (   self )

Definition at line 1064 of file test_multiprocessing.py.

01064 
01065     def test_async_timeout(self):
01066         res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
01067         get = TimingWrapper(res.get)
01068         self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
01069         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

Here is the call graph for this function:

def test.test_multiprocessing._TestPool.test_imap (   self )

Definition at line 1070 of file test_multiprocessing.py.

01070 
01071     def test_imap(self):
01072         it = self.pool.imap(sqr, list(range(10)))
01073         self.assertEqual(list(it), list(map(sqr, list(range(10)))))
01074 
01075         it = self.pool.imap(sqr, list(range(10)))
01076         for i in range(10):
01077             self.assertEqual(next(it), i*i)
01078         self.assertRaises(StopIteration, it.__next__)
01079 
01080         it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
01081         for i in range(1000):
01082             self.assertEqual(next(it), i*i)
01083         self.assertRaises(StopIteration, it.__next__)

Here is the call graph for this function:

def test.test_multiprocessing._TestPool.test_imap_unordered (   self )

Definition at line 1084 of file test_multiprocessing.py.

01084 
01085     def test_imap_unordered(self):
01086         it = self.pool.imap_unordered(sqr, list(range(1000)))
01087         self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
01088 
01089         it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
01090         self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

Here is the call graph for this function:

def test.test_multiprocessing._TestPool.test_make_pool (   self )

Definition at line 1091 of file test_multiprocessing.py.

01091 
01092     def test_make_pool(self):
01093         self.assertRaises(ValueError, multiprocessing.Pool, -1)
01094         self.assertRaises(ValueError, multiprocessing.Pool, 0)
01095 
01096         p = multiprocessing.Pool(3)
01097         self.assertEqual(3, len(p._pool))
01098         p.close()
01099         p.join()

Here is the call graph for this function:

def test.test_multiprocessing._TestPool.test_map (   self )

Definition at line 1046 of file test_multiprocessing.py.

01046 
01047     def test_map(self):
01048         pmap = self.pool.map
01049         self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
01050         self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
01051                          list(map(sqr, list(range(100)))))

Here is the call graph for this function:

def test.test_multiprocessing._TestPool.test_map_chunksize (   self )

Definition at line 1052 of file test_multiprocessing.py.

01052 
01053     def test_map_chunksize(self):
01054         try:
01055             self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
01056         except multiprocessing.TimeoutError:
01057             self.fail("pool.map_async with chunksize stalled on null list")

Here is the call graph for this function:

def test.test_multiprocessing._TestPool.test_terminate (   self )

Definition at line 1100 of file test_multiprocessing.py.

01100 
01101     def test_terminate(self):
01102         if self.TYPE == 'manager':
01103             # On Unix a forked process increfs each shared object to
01104             # which its parent process held a reference.  If the
01105             # forked process gets terminated then there is likely to
01106             # be a reference leak.  So to prevent
01107             # _TestZZZNumberOfObjects from failing we skip this test
01108             # when using a manager.
01109             return
01110 
01111         result = self.pool.map_async(
01112             time.sleep, [0.1 for i in range(10000)], chunksize=1
01113             )
01114         self.pool.terminate()
01115         join = TimingWrapper(self.pool.join)
01116         join()
01117         self.assertLess(join.elapsed, 0.5)


Member Data Documentation

tuple test.test_multiprocessing.BaseTestCase.ALLOWED_TYPES = ('processes', 'manager', 'threads') [static, inherited]

_PyObject_HEAD_EXTRA Py_ssize_t _object::ob_refcnt [inherited]

Definition at line 107 of file object.h.

struct _typeobject* _object::ob_type [inherited]

Definition at line 108 of file object.h.

test.test_multiprocessing._TestPool.TYPE

Definition at line 1101 of file test_multiprocessing.py.


The documentation for this class was generated from the following file: