Back to index

python3.2  3.2.2
Functions
mp_pool Namespace Reference

Functions

def calculate
def calculatestar
def mul
def plus
def f
def pow3
def noop
def test

Function Documentation

def mp_pool.calculate(func, args)

Definition at line 17 of file mp_pool.py.

00017 
def calculate(func, args):
    """Call ``func(*args)`` and describe the outcome in one sentence.

    The returned string names the current process, the function, the
    argument tuple and the computed result.
    """
    outcome = func(*args)
    # Looked up after the call, in the process that actually ran it.
    process_name = multiprocessing.current_process().name
    fields = (process_name, func.__name__, args, outcome)
    return '%s says that %s%s = %s' % fields

Here is the caller graph for this function:

def mp_pool.calculatestar (   args)

Definition at line 24 of file mp_pool.py.

00024 
def calculatestar(args):
    """Unpack ``args`` and forward its items positionally to ``calculate``.

    Lets a single-argument mapping call (``pool.imap``, ``pool.map``) drive
    ``calculate`` with one packed task per item.
    """
    unpacked = tuple(args)
    return calculate(*unpacked)

Here is the call graph for this function:

def mp_pool.f (   x)

Definition at line 35 of file mp_pool.py.

00035 
def f(x):
    """Return the reciprocal of ``x - 5.0``.

    Raises ZeroDivisionError when ``x`` equals 5.
    """
    offset = x - 5.0
    return 1.0 / offset

def mp_pool.mul(a, b)

Definition at line 27 of file mp_pool.py.

00027 
def mul(a, b):
    """Return ``a * b`` after sleeping for a random time below 0.5 seconds."""
    delay = 0.5 * random.random()
    time.sleep(delay)
    product = a * b
    return product

Here is the caller graph for this function:

def mp_pool.noop (   x)

Definition at line 41 of file mp_pool.py.

00041 
def noop(x):
    """Ignore ``x`` and return None."""
    return None
00044 
00045 #
00046 # Test code
00047 #

def mp_pool.plus(a, b)

Definition at line 31 of file mp_pool.py.

00031 
def plus(a, b):
    """Return ``a + b`` after sleeping for a random time below 0.5 seconds."""
    delay = 0.5 * random.random()
    time.sleep(delay)
    total = a + b
    return total

Here is the caller graph for this function:

def mp_pool.pow3 (   x)

Definition at line 38 of file mp_pool.py.

00038 
def pow3(x):
    """Return ``x`` raised to the third power."""
    return pow(x, 3)

def mp_pool.test ( )

Definition at line 48 of file mp_pool.py.

00048 
def test():
    """Exercise ``multiprocessing.Pool`` step by step, printing progress.

    Sections run in this order: result retrieval (``apply_async``, ``imap``,
    ``imap_unordered``, ``map``), two simple benchmarks, error propagation,
    timeouts, callbacks, and finally the ``close()``, ``terminate()`` and
    garbage-collection shutdown paths.

    Raises:
        AssertionError: if any checked pool behaviour differs from what the
            section expects.
    """
    print('cpu_count() = %d\n' % multiprocessing.cpu_count())

    #
    # Create pool
    #

    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)
    pool = multiprocessing.Pool(PROCESSES)
    print('pool = %s' % pool)
    print()

    #
    # Tests
    #

    TASKS = [(mul, (i, 7)) for i in range(10)] + \
            [(plus, (i, 8)) for i in range(10)]

    # Submit all three batches before consuming any of them.
    results = [pool.apply_async(calculate, t) for t in TASKS]
    imap_it = pool.imap(calculatestar, TASKS)
    imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

    print('Ordered results using pool.apply_async():')
    for r in results:
        print('\t', r.get())
    print()

    print('Ordered results using pool.imap():')
    for x in imap_it:
        print('\t', x)
    print()

    print('Unordered results using pool.imap_unordered():')
    for x in imap_unordered_it:
        print('\t', x)
    print()

    print('Ordered results using pool.map() --- will block till complete:')
    for x in pool.map(calculatestar, TASKS):
        print('\t', x)
    print()

    #
    # Simple benchmarks
    #

    N = 100000
    print('def pow3(x): return x**3')

    t = time.time()
    A = list(map(pow3, range(N)))
    print('\tmap(pow3, range(%d)):\n\t\t%s seconds' %
          (N, time.time() - t))

    t = time.time()
    B = pool.map(pow3, range(N))
    print('\tpool.map(pow3, range(%d)):\n\t\t%s seconds' %
          (N, time.time() - t))

    t = time.time()
    C = list(pool.imap(pow3, range(N), chunksize=N//8))
    print('\tlist(pool.imap(pow3, range(%d), chunksize=%d)):\n\t\t%s'
          ' seconds' % (N, N//8, time.time() - t))

    # All three strategies must produce the same list.
    assert A == B == C, (len(A), len(B), len(C))
    print()

    L = [None] * 1000000
    print('def noop(x): pass')
    print('L = [None] * 1000000')

    t = time.time()
    A = list(map(noop, L))
    print('\tmap(noop, L):\n\t\t%s seconds' %
          (time.time() - t))

    t = time.time()
    B = pool.map(noop, L)
    print('\tpool.map(noop, L):\n\t\t%s seconds' %
          (time.time() - t))

    t = time.time()
    C = list(pool.imap(noop, L, chunksize=len(L)//8))
    print('\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' %
          (len(L)//8, time.time() - t))

    assert A == B == C, (len(A), len(B), len(C))
    print()

    # Release the million-element lists before the remaining sections.
    del A, B, C, L

    #
    # Test error handling
    #

    print('Testing error handling:')

    # f(5) divides by zero; each call style below must surface that error.
    try:
        print(pool.apply(f, (5,)))
    except ZeroDivisionError:
        print('\tGot ZeroDivisionError as expected from pool.apply()')
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print(pool.map(f, list(range(10))))
    except ZeroDivisionError:
        print('\tGot ZeroDivisionError as expected from pool.map()')
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print(list(pool.imap(f, list(range(10)))))
    except ZeroDivisionError:
        print('\tGot ZeroDivisionError as expected from list(pool.imap())')
    else:
        raise AssertionError('expected ZeroDivisionError')

    it = pool.imap(f, list(range(10)))
    for i in range(10):
        try:
            next(it)
        except ZeroDivisionError:
            # Only input 5 may fail; an error at any other index is a bug
            # and must not be swallowed.
            if i != 5:
                raise AssertionError(
                    'unexpected ZeroDivisionError at index %d' % i)
        except StopIteration:
            break
        else:
            if i == 5:
                raise AssertionError('expected ZeroDivisionError')

    # The iterator must have yielded an outcome for every one of the inputs.
    assert i == 9
    print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
    print()

    #
    # Testing timeouts
    #

    print('Testing ApplyResult.get() with timeout:', end=' ')
    res = pool.apply_async(calculate, TASKS[0])
    while True:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % res.get(0.02))
            break
        except multiprocessing.TimeoutError:
            # One dot per timed-out poll.
            sys.stdout.write('.')
    print()
    print()

    print('Testing IMapIterator.next() with timeout:', end=' ')
    it = pool.imap(calculatestar, TASKS)
    while True:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % it.next(0.02))
        except StopIteration:
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print()
    print()

    #
    # Testing callback
    #

    print('Testing callback:')

    A = []
    # Expected contents: 7 * 8 from mul, then the cubes of 0..9 from pow3.
    B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

    r = pool.apply_async(mul, (7, 8), callback=A.append)
    r.wait()

    r = pool.map_async(pow3, list(range(10)), callback=A.extend)
    r.wait()

    if A == B:
        print('\tcallbacks succeeded\n')
    else:
        print('\t*** callbacks failed\n\t\t%s != %s\n' % (A, B))

    #
    # Check there are no outstanding tasks
    #

    assert not pool._cache, 'cache = %r' % pool._cache

    #
    # Check close() methods
    #

    print('Testing close():')

    for worker in pool._pool:
        assert worker.is_alive()

    # A task submitted before close() must still complete.
    result = pool.apply_async(time.sleep, [0.5])
    pool.close()
    pool.join()

    assert result.get() is None

    for worker in pool._pool:
        assert not worker.is_alive()

    print('\tclose() succeeded\n')

    #
    # Check terminate() method
    #

    print('Testing terminate():')

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    pool.apply(pow3, [2])
    # Queue more sleeping tasks than the two workers can finish, then stop.
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
    pool.terminate()
    pool.join()

    for worker in pool._pool:
        assert not worker.is_alive()

    print('\tterminate() succeeded\n')

    #
    # Check garbage collection
    #

    print('Testing garbage collection:')

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    # Keep the worker list so liveness can be checked after the pool is gone.
    processes = pool._pool
    pool.apply(pow3, [2])
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]

    # Drop the references held here to the pool and its pending results.
    results = pool = None

    time.sleep(DELTA * 2)

    for worker in processes:
        assert not worker.is_alive()

    print('\tgarbage collection succeeded\n')
00299 

Here is the call graph for this function: