test_multiprocessing.py (python3.2 3.2.2)

00001 #!/usr/bin/env python3
00002 
00003 #
00004 # Unit tests for the multiprocessing package
00005 #
00006 
00007 import unittest
00008 import queue as pyqueue
00009 import time
00010 import io
00011 import sys
00012 import os
00013 import gc
00014 import errno
00015 import signal
00016 import array
00017 import socket
00018 import random
00019 import logging
00020 import test.support
00021 
00022 
00023 # Skip tests if _multiprocessing wasn't built.
00024 _multiprocessing = test.support.import_module('_multiprocessing')
00025 # Skip tests if sem_open implementation is broken.
00026 test.support.import_module('multiprocessing.synchronize')
00028 # import threading after _multiprocessing to raise a more relevant error
00028 # message: "No module named _multiprocessing". _multiprocessing is not compiled
00029 # without thread support.
00030 import threading
00031 
00032 import multiprocessing.dummy
00033 import multiprocessing.connection
00034 import multiprocessing.managers
00035 import multiprocessing.heap
00036 import multiprocessing.pool
00037 
00038 from multiprocessing import util
00039 
00040 try:
00041     from multiprocessing.sharedctypes import Value, copy
00042     HAS_SHAREDCTYPES = True
00043 except ImportError:
00044     HAS_SHAREDCTYPES = False
00045 
00046 #
00047 #
00048 #
00049 
00050 def latin(s):
00051     return s.encode('latin')
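# ('latin' is an alias for latin-1, which maps code points 0-255 directly to the
# corresponding byte values, so for example latin('hello') == b'hello'.)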
00052 
00053 #
00054 # Constants
00055 #
00056 
00057 LOG_LEVEL = util.SUBWARNING
00058 #LOG_LEVEL = logging.DEBUG
00059 
00060 DELTA = 0.1
00061 CHECK_TIMINGS = False     # setting this to True makes the tests take a lot
00062                           # longer and can sometimes cause non-serious
00063                           # failures because some calls block a bit longer
00064                           # than expected
00065 if CHECK_TIMINGS:
00066     TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
00067 else:
00068     TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
00069 
00070 HAVE_GETVALUE = not getattr(_multiprocessing,
00071                             'HAVE_BROKEN_SEM_GETVALUE', False)
00072 
00073 WIN32 = (sys.platform == "win32")
00074 
00075 #
00076 # Some tests require ctypes
00077 #
00078 
00079 try:
00080     from ctypes import Structure, c_int, c_double
00081 except ImportError:
00082     Structure = object
00083     c_int = c_double = None
00084 
00085 #
00086 # Wrapper for a function which records how long each call takes to finish
00087 #
00088 
00089 class TimingWrapper(object):
00090 
00091     def __init__(self, func):
00092         self.func = func
00093         self.elapsed = None
00094 
00095     def __call__(self, *args, **kwds):
00096         t = time.time()
00097         try:
00098             return self.func(*args, **kwds)
00099         finally:
00100             self.elapsed = time.time() - t
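# A typical use, as in the queue and semaphore tests below: wrap a blocking call,
# invoke it, then assert on how long it blocked.  For example:
#
#     put = TimingWrapper(queue.put)
#     self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
#     self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)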
00101 
00102 #
00103 # Base class for test cases
00104 #
00105 
00106 class BaseTestCase(object):
00107 
00108     ALLOWED_TYPES = ('processes', 'manager', 'threads')
00109 
00110     def assertTimingAlmostEqual(self, a, b):
00111         if CHECK_TIMINGS:
00112             self.assertAlmostEqual(a, b, 1)
00113 
00114     def assertReturnsIfImplemented(self, value, func, *args):
00115         try:
00116             res = func(*args)
00117         except NotImplementedError:
00118             pass
00119         else:
00120             return self.assertEqual(value, res)
00121 
00122     # For the sanity of Windows users: raise a clear error if anything tries
00123     # to pickle a test case, rather than crashing or freezing in multiple ways.
00124     def __reduce__(self, *args):
00125         raise NotImplementedError("shouldn't try to pickle a test case")
00126 
00127     __reduce_ex__ = __reduce__
00128 
00129 #
00130 # Return the value of a semaphore
00131 #
00132 
00133 def get_value(self):
00134     try:
00135         return self.get_value()
00136     except AttributeError:
00137         try:
00138             return self._Semaphore__value
00139         except AttributeError:
00140             try:
00141                 return self._value
00142             except AttributeError:
00143                 raise NotImplementedError
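# The three fallbacks roughly cover: multiprocessing semaphores (get_value()),
# older threading.Semaphore implementations (the name-mangled _Semaphore__value)
# and the current threading.Semaphore (_value).  If none of them is available,
# callers such as assertReturnsIfImplemented() treat the value as unreadable.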
00144 
00145 #
00146 # Testcases
00147 #
00148 
00149 class _TestProcess(BaseTestCase):
00150 
00151     ALLOWED_TYPES = ('processes', 'threads')
00152 
00153     def test_current(self):
00154         if self.TYPE == 'threads':
00155             return
00156 
00157         current = self.current_process()
00158         authkey = current.authkey
00159 
00160         self.assertTrue(current.is_alive())
00161         self.assertTrue(not current.daemon)
00162         self.assertIsInstance(authkey, bytes)
00163         self.assertTrue(len(authkey) > 0)
00164         self.assertEqual(current.ident, os.getpid())
00165         self.assertEqual(current.exitcode, None)
00166 
00167     @classmethod
00168     def _test(cls, q, *args, **kwds):
00169         current = cls.current_process()
00170         q.put(args)
00171         q.put(kwds)
00172         q.put(current.name)
00173         if cls.TYPE != 'threads':
00174             q.put(bytes(current.authkey))
00175             q.put(current.pid)
00176 
00177     def test_process(self):
00178         q = self.Queue(1)
00179         e = self.Event()
00180         args = (q, 1, 2)
00181         kwargs = {'hello':23, 'bye':2.54}
00182         name = 'SomeProcess'
00183         p = self.Process(
00184             target=self._test, args=args, kwargs=kwargs, name=name
00185             )
00186         p.daemon = True
00187         current = self.current_process()
00188 
00189         if self.TYPE != 'threads':
00190             self.assertEqual(p.authkey, current.authkey)
00191         self.assertEqual(p.is_alive(), False)
00192         self.assertEqual(p.daemon, True)
00193         self.assertNotIn(p, self.active_children())
00194         self.assertTrue(type(self.active_children()) is list)
00195         self.assertEqual(p.exitcode, None)
00196 
00197         p.start()
00198 
00199         self.assertEqual(p.exitcode, None)
00200         self.assertEqual(p.is_alive(), True)
00201         self.assertIn(p, self.active_children())
00202 
00203         self.assertEqual(q.get(), args[1:])
00204         self.assertEqual(q.get(), kwargs)
00205         self.assertEqual(q.get(), p.name)
00206         if self.TYPE != 'threads':
00207             self.assertEqual(q.get(), current.authkey)
00208             self.assertEqual(q.get(), p.pid)
00209 
00210         p.join()
00211 
00212         self.assertEqual(p.exitcode, 0)
00213         self.assertEqual(p.is_alive(), False)
00214         self.assertNotIn(p, self.active_children())
00215 
00216     @classmethod
00217     def _test_terminate(cls):
00218         time.sleep(1000)
00219 
00220     def test_terminate(self):
00221         if self.TYPE == 'threads':
00222             return
00223 
00224         p = self.Process(target=self._test_terminate)
00225         p.daemon = True
00226         p.start()
00227 
00228         self.assertEqual(p.is_alive(), True)
00229         self.assertIn(p, self.active_children())
00230         self.assertEqual(p.exitcode, None)
00231 
00232         p.terminate()
00233 
00234         join = TimingWrapper(p.join)
00235         self.assertEqual(join(), None)
00236         self.assertTimingAlmostEqual(join.elapsed, 0.0)
00237 
00238         self.assertEqual(p.is_alive(), False)
00239         self.assertNotIn(p, self.active_children())
00240 
00241         p.join()
00242 
00243         # XXX sometimes get p.exitcode == 0 on Windows ...
00244         #self.assertEqual(p.exitcode, -signal.SIGTERM)
00245 
00246     def test_cpu_count(self):
00247         try:
00248             cpus = multiprocessing.cpu_count()
00249         except NotImplementedError:
00250             cpus = 1
00251         self.assertTrue(type(cpus) is int)
00252         self.assertTrue(cpus >= 1)
00253 
00254     def test_active_children(self):
00255         self.assertEqual(type(self.active_children()), list)
00256 
00257         p = self.Process(target=time.sleep, args=(DELTA,))
00258         self.assertNotIn(p, self.active_children())
00259 
00260         p.start()
00261         self.assertIn(p, self.active_children())
00262 
00263         p.join()
00264         self.assertNotIn(p, self.active_children())
00265 
00266     @classmethod
00267     def _test_recursion(cls, wconn, id):
00268         from multiprocessing import forking
00269         wconn.send(id)
00270         if len(id) < 2:
00271             for i in range(2):
00272                 p = cls.Process(
00273                     target=cls._test_recursion, args=(wconn, id+[i])
00274                     )
00275                 p.start()
00276                 p.join()
00277 
00278     def test_recursion(self):
00279         rconn, wconn = self.Pipe(duplex=False)
00280         self._test_recursion(wconn, [])
00281 
00282         time.sleep(DELTA)
00283         result = []
00284         while rconn.poll():
00285             result.append(rconn.recv())
00286 
00287         expected = [
00288             [],
00289               [0],
00290                 [0, 0],
00291                 [0, 1],
00292               [1],
00293                 [1, 0],
00294                 [1, 1]
00295             ]
00296         self.assertEqual(result, expected)
00297 
00298 #
00299 #
00300 #
00301 
00302 class _UpperCaser(multiprocessing.Process):
00303 
00304     def __init__(self):
00305         multiprocessing.Process.__init__(self)
00306         self.child_conn, self.parent_conn = multiprocessing.Pipe()
00307 
00308     def run(self):
00309         self.parent_conn.close()
00310         for s in iter(self.child_conn.recv, None):
00311             self.child_conn.send(s.upper())
00312         self.child_conn.close()
00313 
00314     def submit(self, s):
00315         assert type(s) is str
00316         self.parent_conn.send(s)
00317         return self.parent_conn.recv()
00318 
00319     def stop(self):
00320         self.parent_conn.send(None)
00321         self.parent_conn.close()
00322         self.child_conn.close()
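# The expected interaction, mirroring test_subclassing below:
#
#     uppercaser = _UpperCaser()
#     uppercaser.start()
#     uppercaser.submit('hello')   # -> 'HELLO'
#     uppercaser.stop()
#     uppercaser.join()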
00323 
00324 class _TestSubclassingProcess(BaseTestCase):
00325 
00326     ALLOWED_TYPES = ('processes',)
00327 
00328     def test_subclassing(self):
00329         uppercaser = _UpperCaser()
00330         uppercaser.start()
00331         self.assertEqual(uppercaser.submit('hello'), 'HELLO')
00332         self.assertEqual(uppercaser.submit('world'), 'WORLD')
00333         uppercaser.stop()
00334         uppercaser.join()
00335 
00336 #
00337 #
00338 #
00339 
00340 def queue_empty(q):
00341     if hasattr(q, 'empty'):
00342         return q.empty()
00343     else:
00344         return q.qsize() == 0
00345 
00346 def queue_full(q, maxsize):
00347     if hasattr(q, 'full'):
00348         return q.full()
00349     else:
00350         return q.qsize() == maxsize
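# Some of the queue objects exercised here may not provide empty()/full(); when
# they do not, qsize() is compared against 0 or maxsize as a fallback.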
00351 
00352 
00353 class _TestQueue(BaseTestCase):
00354 
00355 
00356     @classmethod
00357     def _test_put(cls, queue, child_can_start, parent_can_continue):
00358         child_can_start.wait()
00359         for i in range(6):
00360             queue.get()
00361         parent_can_continue.set()
00362 
00363     def test_put(self):
00364         MAXSIZE = 6
00365         queue = self.Queue(maxsize=MAXSIZE)
00366         child_can_start = self.Event()
00367         parent_can_continue = self.Event()
00368 
00369         proc = self.Process(
00370             target=self._test_put,
00371             args=(queue, child_can_start, parent_can_continue)
00372             )
00373         proc.daemon = True
00374         proc.start()
00375 
00376         self.assertEqual(queue_empty(queue), True)
00377         self.assertEqual(queue_full(queue, MAXSIZE), False)
00378 
00379         queue.put(1)
00380         queue.put(2, True)
00381         queue.put(3, True, None)
00382         queue.put(4, False)
00383         queue.put(5, False, None)
00384         queue.put_nowait(6)
00385 
00386         # the values may be in the buffer but not yet in the pipe, so sleep a bit
00387         time.sleep(DELTA)
00388 
00389         self.assertEqual(queue_empty(queue), False)
00390         self.assertEqual(queue_full(queue, MAXSIZE), True)
00391 
00392         put = TimingWrapper(queue.put)
00393         put_nowait = TimingWrapper(queue.put_nowait)
00394 
00395         self.assertRaises(pyqueue.Full, put, 7, False)
00396         self.assertTimingAlmostEqual(put.elapsed, 0)
00397 
00398         self.assertRaises(pyqueue.Full, put, 7, False, None)
00399         self.assertTimingAlmostEqual(put.elapsed, 0)
00400 
00401         self.assertRaises(pyqueue.Full, put_nowait, 7)
00402         self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
00403 
00404         self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
00405         self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
00406 
00407         self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
00408         self.assertTimingAlmostEqual(put.elapsed, 0)
00409 
00410         self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
00411         self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
00412 
00413         child_can_start.set()
00414         parent_can_continue.wait()
00415 
00416         self.assertEqual(queue_empty(queue), True)
00417         self.assertEqual(queue_full(queue, MAXSIZE), False)
00418 
00419         proc.join()
00420 
00421     @classmethod
00422     def _test_get(cls, queue, child_can_start, parent_can_continue):
00423         child_can_start.wait()
00424         #queue.put(1)
00425         queue.put(2)
00426         queue.put(3)
00427         queue.put(4)
00428         queue.put(5)
00429         parent_can_continue.set()
00430 
00431     def test_get(self):
00432         queue = self.Queue()
00433         child_can_start = self.Event()
00434         parent_can_continue = self.Event()
00435 
00436         proc = self.Process(
00437             target=self._test_get,
00438             args=(queue, child_can_start, parent_can_continue)
00439             )
00440         proc.daemon = True
00441         proc.start()
00442 
00443         self.assertEqual(queue_empty(queue), True)
00444 
00445         child_can_start.set()
00446         parent_can_continue.wait()
00447 
00448         time.sleep(DELTA)
00449         self.assertEqual(queue_empty(queue), False)
00450 
00451         # Hangs unexpectedly, remove for now
00452         #self.assertEqual(queue.get(), 1)
00453         self.assertEqual(queue.get(True, None), 2)
00454         self.assertEqual(queue.get(True), 3)
00455         self.assertEqual(queue.get(timeout=1), 4)
00456         self.assertEqual(queue.get_nowait(), 5)
00457 
00458         self.assertEqual(queue_empty(queue), True)
00459 
00460         get = TimingWrapper(queue.get)
00461         get_nowait = TimingWrapper(queue.get_nowait)
00462 
00463         self.assertRaises(pyqueue.Empty, get, False)
00464         self.assertTimingAlmostEqual(get.elapsed, 0)
00465 
00466         self.assertRaises(pyqueue.Empty, get, False, None)
00467         self.assertTimingAlmostEqual(get.elapsed, 0)
00468 
00469         self.assertRaises(pyqueue.Empty, get_nowait)
00470         self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
00471 
00472         self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
00473         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
00474 
00475         self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
00476         self.assertTimingAlmostEqual(get.elapsed, 0)
00477 
00478         self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
00479         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
00480 
00481         proc.join()
00482 
00483     @classmethod
00484     def _test_fork(cls, queue):
00485         for i in range(10, 20):
00486             queue.put(i)
00487         # note that at this point the items may only be buffered, so the
00488         # process cannot shut down until the feeder thread has finished
00489         # pushing items onto the pipe.
00490 
00491     def test_fork(self):
00492         # Old versions of Queue would fail to create a new feeder
00493         # thread for a forked process if the original process had its
00494         # own feeder thread.  This test checks that this no longer
00495         # happens.
00496 
00497         queue = self.Queue()
00498 
00499         # put items on queue so that main process starts a feeder thread
00500         for i in range(10):
00501             queue.put(i)
00502 
00503         # wait to make sure thread starts before we fork a new process
00504         time.sleep(DELTA)
00505 
00506         # fork process
00507         p = self.Process(target=self._test_fork, args=(queue,))
00508         p.start()
00509 
00510         # check that all expected items are in the queue
00511         for i in range(20):
00512             self.assertEqual(queue.get(), i)
00513         self.assertRaises(pyqueue.Empty, queue.get, False)
00514 
00515         p.join()
00516 
00517     def test_qsize(self):
00518         q = self.Queue()
00519         try:
00520             self.assertEqual(q.qsize(), 0)
00521         except NotImplementedError:
00522             return
00523         q.put(1)
00524         self.assertEqual(q.qsize(), 1)
00525         q.put(5)
00526         self.assertEqual(q.qsize(), 2)
00527         q.get()
00528         self.assertEqual(q.qsize(), 1)
00529         q.get()
00530         self.assertEqual(q.qsize(), 0)
00531 
00532     @classmethod
00533     def _test_task_done(cls, q):
00534         for obj in iter(q.get, None):
00535             time.sleep(DELTA)
00536             q.task_done()
00537 
00538     def test_task_done(self):
00539         queue = self.JoinableQueue()
00540 
00541         if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
00542             self.skipTest("requires 'queue.task_done()' method")
00543 
00544         workers = [self.Process(target=self._test_task_done, args=(queue,))
00545                    for i in range(4)]
00546 
00547         for p in workers:
00548             p.start()
00549 
00550         for i in range(10):
00551             queue.put(i)
00552 
00553         queue.join()
00554 
00555         for p in workers:
00556             queue.put(None)
00557 
00558         for p in workers:
00559             p.join()
00560 
00561 #
00562 #
00563 #
00564 
00565 class _TestLock(BaseTestCase):
00566 
00567     def test_lock(self):
00568         lock = self.Lock()
00569         self.assertEqual(lock.acquire(), True)
00570         self.assertEqual(lock.acquire(False), False)
00571         self.assertEqual(lock.release(), None)
00572         self.assertRaises((ValueError, threading.ThreadError), lock.release)
00573 
00574     def test_rlock(self):
00575         lock = self.RLock()
00576         self.assertEqual(lock.acquire(), True)
00577         self.assertEqual(lock.acquire(), True)
00578         self.assertEqual(lock.acquire(), True)
00579         self.assertEqual(lock.release(), None)
00580         self.assertEqual(lock.release(), None)
00581         self.assertEqual(lock.release(), None)
00582         self.assertRaises((AssertionError, RuntimeError), lock.release)
00583 
00584     def test_lock_context(self):
00585         with self.Lock():
00586             pass
00587 
00588 
00589 class _TestSemaphore(BaseTestCase):
00590 
00591     def _test_semaphore(self, sem):
00592         self.assertReturnsIfImplemented(2, get_value, sem)
00593         self.assertEqual(sem.acquire(), True)
00594         self.assertReturnsIfImplemented(1, get_value, sem)
00595         self.assertEqual(sem.acquire(), True)
00596         self.assertReturnsIfImplemented(0, get_value, sem)
00597         self.assertEqual(sem.acquire(False), False)
00598         self.assertReturnsIfImplemented(0, get_value, sem)
00599         self.assertEqual(sem.release(), None)
00600         self.assertReturnsIfImplemented(1, get_value, sem)
00601         self.assertEqual(sem.release(), None)
00602         self.assertReturnsIfImplemented(2, get_value, sem)
00603 
00604     def test_semaphore(self):
00605         sem = self.Semaphore(2)
00606         self._test_semaphore(sem)
00607         self.assertEqual(sem.release(), None)
00608         self.assertReturnsIfImplemented(3, get_value, sem)
00609         self.assertEqual(sem.release(), None)
00610         self.assertReturnsIfImplemented(4, get_value, sem)
00611 
00612     def test_bounded_semaphore(self):
00613         sem = self.BoundedSemaphore(2)
00614         self._test_semaphore(sem)
00615         # Currently fails on OS X
00616         #if HAVE_GETVALUE:
00617         #    self.assertRaises(ValueError, sem.release)
00618         #    self.assertReturnsIfImplemented(2, get_value, sem)
00619 
00620     def test_timeout(self):
00621         if self.TYPE != 'processes':
00622             return
00623 
00624         sem = self.Semaphore(0)
00625         acquire = TimingWrapper(sem.acquire)
00626 
00627         self.assertEqual(acquire(False), False)
00628         self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
00629 
00630         self.assertEqual(acquire(False, None), False)
00631         self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
00632 
00633         self.assertEqual(acquire(False, TIMEOUT1), False)
00634         self.assertTimingAlmostEqual(acquire.elapsed, 0)
00635 
00636         self.assertEqual(acquire(True, TIMEOUT2), False)
00637         self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
00638 
00639         self.assertEqual(acquire(timeout=TIMEOUT3), False)
00640         self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
00641 
00642 
00643 class _TestCondition(BaseTestCase):
00644 
00645     @classmethod
00646     def f(cls, cond, sleeping, woken, timeout=None):
00647         cond.acquire()
00648         sleeping.release()
00649         cond.wait(timeout)
00650         woken.release()
00651         cond.release()
00652 
00653     def check_invariant(self, cond):
00654         # this is only supposed to succeed when there are no sleepers
00655         if self.TYPE == 'processes':
00656             try:
00657                 sleepers = (cond._sleeping_count.get_value() -
00658                             cond._woken_count.get_value())
00659                 self.assertEqual(sleepers, 0)
00660                 self.assertEqual(cond._wait_semaphore.get_value(), 0)
00661             except NotImplementedError:
00662                 pass
00663 
00664     def test_notify(self):
00665         cond = self.Condition()
00666         sleeping = self.Semaphore(0)
00667         woken = self.Semaphore(0)
00668 
00669         p = self.Process(target=self.f, args=(cond, sleeping, woken))
00670         p.daemon = True
00671         p.start()
00672 
00673         p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
00674         p.daemon = True
00675         p.start()
00676 
00677         # wait for both children to start sleeping
00678         sleeping.acquire()
00679         sleeping.acquire()
00680 
00681         # check no process/thread has woken up
00682         time.sleep(DELTA)
00683         self.assertReturnsIfImplemented(0, get_value, woken)
00684 
00685         # wake up one process/thread
00686         cond.acquire()
00687         cond.notify()
00688         cond.release()
00689 
00690         # check one process/thread has woken up
00691         time.sleep(DELTA)
00692         self.assertReturnsIfImplemented(1, get_value, woken)
00693 
00694         # wake up another
00695         cond.acquire()
00696         cond.notify()
00697         cond.release()
00698 
00699         # check other has woken up
00700         time.sleep(DELTA)
00701         self.assertReturnsIfImplemented(2, get_value, woken)
00702 
00703         # check state is not mucked up
00704         self.check_invariant(cond)
00705         p.join()
00706 
00707     def test_notify_all(self):
00708         cond = self.Condition()
00709         sleeping = self.Semaphore(0)
00710         woken = self.Semaphore(0)
00711 
00712         # start some threads/processes which will time out
00713         for i in range(3):
00714             p = self.Process(target=self.f,
00715                              args=(cond, sleeping, woken, TIMEOUT1))
00716             p.daemon = True
00717             p.start()
00718 
00719             t = threading.Thread(target=self.f,
00720                                  args=(cond, sleeping, woken, TIMEOUT1))
00721             t.daemon = True
00722             t.start()
00723 
00724         # wait for them all to sleep
00725         for i in range(6):
00726             sleeping.acquire()
00727 
00728         # check they have all timed out
00729         for i in range(6):
00730             woken.acquire()
00731         self.assertReturnsIfImplemented(0, get_value, woken)
00732 
00733         # check state is not mucked up
00734         self.check_invariant(cond)
00735 
00736         # start some more threads/processes
00737         for i in range(3):
00738             p = self.Process(target=self.f, args=(cond, sleeping, woken))
00739             p.daemon = True
00740             p.start()
00741 
00742             t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
00743             t.daemon = True
00744             t.start()
00745 
00746         # wait for them to all sleep
00747         for i in range(6):
00748             sleeping.acquire()
00749 
00750         # check no process/thread has woken up
00751         time.sleep(DELTA)
00752         self.assertReturnsIfImplemented(0, get_value, woken)
00753 
00754         # wake them all up
00755         cond.acquire()
00756         cond.notify_all()
00757         cond.release()
00758 
00759         # check they have all woken
00760         for i in range(10):
00761             try:
00762                 if get_value(woken) == 6:
00763                     break
00764             except NotImplementedError:
00765                 break
00766             time.sleep(DELTA)
00767         self.assertReturnsIfImplemented(6, get_value, woken)
00768 
00769         # check state is not mucked up
00770         self.check_invariant(cond)
00771 
00772     def test_timeout(self):
00773         cond = self.Condition()
00774         wait = TimingWrapper(cond.wait)
00775         cond.acquire()
00776         res = wait(TIMEOUT1)
00777         cond.release()
00778         self.assertEqual(res, False)
00779         self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
00780 
00781 
00782 class _TestEvent(BaseTestCase):
00783 
00784     @classmethod
00785     def _test_event(cls, event):
00786         time.sleep(TIMEOUT2)
00787         event.set()
00788 
00789     def test_event(self):
00790         event = self.Event()
00791         wait = TimingWrapper(event.wait)
00792 
00793         # Removed temporarily: due to API shear, this does not
00794         # work with threading._Event objects (is_set == isSet).
00795         self.assertEqual(event.is_set(), False)
00796 
00797         # Removed: threading.Event.wait() will return the value of the __flag
00798         # instead of None.  API shear with the semaphore-backed mp.Event.
00799         self.assertEqual(wait(0.0), False)
00800         self.assertTimingAlmostEqual(wait.elapsed, 0.0)
00801         self.assertEqual(wait(TIMEOUT1), False)
00802         self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
00803 
00804         event.set()
00805 
00806         # See note above on the API differences
00807         self.assertEqual(event.is_set(), True)
00808         self.assertEqual(wait(), True)
00809         self.assertTimingAlmostEqual(wait.elapsed, 0.0)
00810         self.assertEqual(wait(TIMEOUT1), True)
00811         self.assertTimingAlmostEqual(wait.elapsed, 0.0)
00812         # self.assertEqual(event.is_set(), True)
00813 
00814         event.clear()
00815 
00816         #self.assertEqual(event.is_set(), False)
00817 
00818         self.Process(target=self._test_event, args=(event,)).start()
00819         self.assertEqual(wait(), True)
00820 
00821 #
00822 #
00823 #
00824 
00825 class _TestValue(BaseTestCase):
00826 
00827     ALLOWED_TYPES = ('processes',)
00828 
00829     codes_values = [
00830         ('i', 4343, 24234),
00831         ('d', 3.625, -4.25),
00832         ('h', -232, 234),
00833         ('c', latin('x'), latin('y'))
00834         ]
00835 
00836     def setUp(self):
00837         if not HAS_SHAREDCTYPES:
00838             self.skipTest("requires multiprocessing.sharedctypes")
00839 
00840     @classmethod
00841     def _test(cls, values):
00842         for sv, cv in zip(values, cls.codes_values):
00843             sv.value = cv[2]
00844 
00845 
00846     def test_value(self, raw=False):
00847         if raw:
00848             values = [self.RawValue(code, value)
00849                       for code, value, _ in self.codes_values]
00850         else:
00851             values = [self.Value(code, value)
00852                       for code, value, _ in self.codes_values]
00853 
00854         for sv, cv in zip(values, self.codes_values):
00855             self.assertEqual(sv.value, cv[1])
00856 
00857         proc = self.Process(target=self._test, args=(values,))
00858         proc.start()
00859         proc.join()
00860 
00861         for sv, cv in zip(values, self.codes_values):
00862             self.assertEqual(sv.value, cv[2])
00863 
00864     def test_rawvalue(self):
00865         self.test_value(raw=True)
00866 
00867     def test_getobj_getlock(self):
00868         val1 = self.Value('i', 5)
00869         lock1 = val1.get_lock()
00870         obj1 = val1.get_obj()
00871 
00872         val2 = self.Value('i', 5, lock=None)
00873         lock2 = val2.get_lock()
00874         obj2 = val2.get_obj()
00875 
00876         lock = self.Lock()
00877         val3 = self.Value('i', 5, lock=lock)
00878         lock3 = val3.get_lock()
00879         obj3 = val3.get_obj()
00880         self.assertEqual(lock, lock3)
00881 
00882         arr4 = self.Value('i', 5, lock=False)
00883         self.assertFalse(hasattr(arr4, 'get_lock'))
00884         self.assertFalse(hasattr(arr4, 'get_obj'))
00885 
00886         self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
00887 
00888         arr5 = self.RawValue('i', 5)
00889         self.assertFalse(hasattr(arr5, 'get_lock'))
00890         self.assertFalse(hasattr(arr5, 'get_obj'))
00891 
00892 
00893 class _TestArray(BaseTestCase):
00894 
00895     ALLOWED_TYPES = ('processes',)
00896 
00897     @classmethod
00898     def f(cls, seq):
00899         for i in range(1, len(seq)):
00900             seq[i] += seq[i-1]
00901 
00902     @unittest.skipIf(c_int is None, "requires _ctypes")
00903     def test_array(self, raw=False):
00904         seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
00905         if raw:
00906             arr = self.RawArray('i', seq)
00907         else:
00908             arr = self.Array('i', seq)
00909 
00910         self.assertEqual(len(arr), len(seq))
00911         self.assertEqual(arr[3], seq[3])
00912         self.assertEqual(list(arr[2:7]), list(seq[2:7]))
00913 
00914         arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
00915 
00916         self.assertEqual(list(arr[:]), seq)
00917 
00918         self.f(seq)
00919 
00920         p = self.Process(target=self.f, args=(arr,))
00921         p.start()
00922         p.join()
00923 
00924         self.assertEqual(list(arr[:]), seq)
00925 
00926     @unittest.skipIf(c_int is None, "requires _ctypes")
00927     def test_array_from_size(self):
00928         size = 10
00929         # Test for zeroing (see issue #11675).
00930         # The repetition below strengthens the test by increasing the chances
00931         # of previously allocated non-zero memory being used for the new array
00932         # on the 2nd and 3rd loops.
00933         for _ in range(3):
00934             arr = self.Array('i', size)
00935             self.assertEqual(len(arr), size)
00936             self.assertEqual(list(arr), [0] * size)
00937             arr[:] = range(10)
00938             self.assertEqual(list(arr), list(range(10)))
00939             del arr
00940 
00941     @unittest.skipIf(c_int is None, "requires _ctypes")
00942     def test_rawarray(self):
00943         self.test_array(raw=True)
00944 
00945     @unittest.skipIf(c_int is None, "requires _ctypes")
00946     def test_getobj_getlock_obj(self):
00947         arr1 = self.Array('i', list(range(10)))
00948         lock1 = arr1.get_lock()
00949         obj1 = arr1.get_obj()
00950 
00951         arr2 = self.Array('i', list(range(10)), lock=None)
00952         lock2 = arr2.get_lock()
00953         obj2 = arr2.get_obj()
00954 
00955         lock = self.Lock()
00956         arr3 = self.Array('i', list(range(10)), lock=lock)
00957         lock3 = arr3.get_lock()
00958         obj3 = arr3.get_obj()
00959         self.assertEqual(lock, lock3)
00960 
00961         arr4 = self.Array('i', range(10), lock=False)
00962         self.assertFalse(hasattr(arr4, 'get_lock'))
00963         self.assertFalse(hasattr(arr4, 'get_obj'))
00964         self.assertRaises(AttributeError,
00965                           self.Array, 'i', range(10), lock='notalock')
00966 
00967         arr5 = self.RawArray('i', range(10))
00968         self.assertFalse(hasattr(arr5, 'get_lock'))
00969         self.assertFalse(hasattr(arr5, 'get_obj'))
00970 
00971 #
00972 #
00973 #
00974 
00975 class _TestContainers(BaseTestCase):
00976 
00977     ALLOWED_TYPES = ('manager',)
00978 
00979     def test_list(self):
00980         a = self.list(list(range(10)))
00981         self.assertEqual(a[:], list(range(10)))
00982 
00983         b = self.list()
00984         self.assertEqual(b[:], [])
00985 
00986         b.extend(list(range(5)))
00987         self.assertEqual(b[:], list(range(5)))
00988 
00989         self.assertEqual(b[2], 2)
00990         self.assertEqual(b[2:10], [2,3,4])
00991 
00992         b *= 2
00993         self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
00994 
00995         self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
00996 
00997         self.assertEqual(a[:], list(range(10)))
00998 
00999         d = [a, b]
01000         e = self.list(d)
01001         self.assertEqual(
01002             e[:],
01003             [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
01004             )
01005 
01006         f = self.list([a])
01007         a.append('hello')
01008         self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
01009 
01010     def test_dict(self):
01011         d = self.dict()
01012         indices = list(range(65, 70))
01013         for i in indices:
01014             d[i] = chr(i)
01015         self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
01016         self.assertEqual(sorted(d.keys()), indices)
01017         self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
01018         self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
01019 
01020     def test_namespace(self):
01021         n = self.Namespace()
01022         n.name = 'Bob'
01023         n.job = 'Builder'
01024         n._hidden = 'hidden'
01025         self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
01026         del n.job
01027         self.assertEqual(str(n), "Namespace(name='Bob')")
01028         self.assertTrue(hasattr(n, 'name'))
01029         self.assertTrue(not hasattr(n, 'job'))
01030 
01031 #
01032 #
01033 #
01034 
01035 def sqr(x, wait=0.0):
01036     time.sleep(wait)
01037     return x*x
01038 
01039 class _TestPool(BaseTestCase):
01040 
01041     def test_apply(self):
01042         papply = self.pool.apply
01043         self.assertEqual(papply(sqr, (5,)), sqr(5))
01044         self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
01045 
01046     def test_map(self):
01047         pmap = self.pool.map
01048         self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
01049         self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
01050                          list(map(sqr, list(range(100)))))
01051 
01052     def test_map_chunksize(self):
01053         try:
01054             self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
01055         except multiprocessing.TimeoutError:
01056             self.fail("pool.map_async with chunksize stalled on null list")
01057 
01058     def test_async(self):
01059         res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
01060         get = TimingWrapper(res.get)
01061         self.assertEqual(get(), 49)
01062         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
01063 
01064     def test_async_timeout(self):
01065         res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
01066         get = TimingWrapper(res.get)
01067         self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
01068         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
01069 
01070     def test_imap(self):
01071         it = self.pool.imap(sqr, list(range(10)))
01072         self.assertEqual(list(it), list(map(sqr, list(range(10)))))
01073 
01074         it = self.pool.imap(sqr, list(range(10)))
01075         for i in range(10):
01076             self.assertEqual(next(it), i*i)
01077         self.assertRaises(StopIteration, it.__next__)
01078 
01079         it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
01080         for i in range(1000):
01081             self.assertEqual(next(it), i*i)
01082         self.assertRaises(StopIteration, it.__next__)
01083 
01084     def test_imap_unordered(self):
01085         it = self.pool.imap_unordered(sqr, list(range(1000)))
01086         self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
01087 
01088         it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
01089         self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
01090 
01091     def test_make_pool(self):
01092         self.assertRaises(ValueError, multiprocessing.Pool, -1)
01093         self.assertRaises(ValueError, multiprocessing.Pool, 0)
01094 
01095         p = multiprocessing.Pool(3)
01096         self.assertEqual(3, len(p._pool))
01097         p.close()
01098         p.join()
01099 
01100     def test_terminate(self):
01101         if self.TYPE == 'manager':
01102             # On Unix a forked process increfs each shared object to
01103             # which its parent process held a reference.  If the
01104             # forked process gets terminated then there is likely to
01105             # be a reference leak.  So to prevent
01106             # _TestZZZNumberOfObjects from failing we skip this test
01107             # when using a manager.
01108             return
01109 
01110         result = self.pool.map_async(
01111             time.sleep, [0.1 for i in range(10000)], chunksize=1
01112             )
01113         self.pool.terminate()
01114         join = TimingWrapper(self.pool.join)
01115         join()
01116         self.assertLess(join.elapsed, 0.5)
01117 
01118 def raising():
01119     raise KeyError("key")
01120 
01121 def unpickleable_result():
01122     return lambda: 42
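# raising() lets the pool tests check that a worker exception reaches the error
# callback, while unpickleable_result() returns a lambda, which cannot be pickled,
# so sending it back from a worker exercises the result-encoding error path
# (MaybeEncodingError) tested below.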
01123 
01124 class _TestPoolWorkerErrors(BaseTestCase):
01125     ALLOWED_TYPES = ('processes', )
01126 
01127     def test_async_error_callback(self):
01128         p = multiprocessing.Pool(2)
01129 
01130         scratchpad = [None]
01131         def errback(exc):
01132             scratchpad[0] = exc
01133 
01134         res = p.apply_async(raising, error_callback=errback)
01135         self.assertRaises(KeyError, res.get)
01136         self.assertTrue(scratchpad[0])
01137         self.assertIsInstance(scratchpad[0], KeyError)
01138 
01139         p.close()
01140         p.join()
01141 
01142     def test_unpickleable_result(self):
01143         from multiprocessing.pool import MaybeEncodingError
01144         p = multiprocessing.Pool(2)
01145 
01146         # Make sure we don't lose pool processes because of encoding errors.
01147         for iteration in range(20):
01148 
01149             scratchpad = [None]
01150             def errback(exc):
01151                 scratchpad[0] = exc
01152 
01153             res = p.apply_async(unpickleable_result, error_callback=errback)
01154             self.assertRaises(MaybeEncodingError, res.get)
01155             wrapped = scratchpad[0]
01156             self.assertTrue(wrapped)
01157             self.assertIsInstance(scratchpad[0], MaybeEncodingError)
01158             self.assertIsNotNone(wrapped.exc)
01159             self.assertIsNotNone(wrapped.value)
01160 
01161         p.close()
01162         p.join()
01163 
01164 class _TestPoolWorkerLifetime(BaseTestCase):
01165     ALLOWED_TYPES = ('processes', )
01166 
01167     def test_pool_worker_lifetime(self):
01168         p = multiprocessing.Pool(3, maxtasksperchild=10)
01169         self.assertEqual(3, len(p._pool))
01170         origworkerpids = [w.pid for w in p._pool]
01171         # Run many tasks so each worker gets replaced (hopefully)
01172         results = []
01173         for i in range(100):
01174             results.append(p.apply_async(sqr, (i, )))
01175         # Fetch the results and verify we got the right answers,
01176         # also ensuring all the tasks have completed.
01177         for (j, res) in enumerate(results):
01178             self.assertEqual(res.get(), sqr(j))
01179         # Refill the pool
01180         p._repopulate_pool()
01181         # Wait until all workers are alive
01182         # (countdown * DELTA = 5 seconds max startup process time)
01183         countdown = 50
01184         while countdown and not all(w.is_alive() for w in p._pool):
01185             countdown -= 1
01186             time.sleep(DELTA)
01187         finalworkerpids = [w.pid for w in p._pool]
01188         # All pids should be assigned.  See issue #7805.
01189         self.assertNotIn(None, origworkerpids)
01190         self.assertNotIn(None, finalworkerpids)
01191         # Finally, check that the worker pids have changed
01192         self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
01193         p.close()
01194         p.join()
01195 
01196 #
01197 # Test that manager has expected number of shared objects left
01198 #
01199 
01200 class _TestZZZNumberOfObjects(BaseTestCase):
01201     # Because test cases are sorted alphabetically, this one will get
01202     # run after all the other tests for the manager.  It tests that
01203     # there have been no "reference leaks" for the manager's shared
01204     # objects.  Note the comment in _TestPool.test_terminate().
01205     ALLOWED_TYPES = ('manager',)
01206 
01207     def test_number_of_objects(self):
01208         EXPECTED_NUMBER = 1                # the pool object is still alive
01209         multiprocessing.active_children()  # discard dead process objs
01210         gc.collect()                       # do garbage collection
01211         refs = self.manager._number_of_objects()
01212         debug_info = self.manager._debug_info()
01213         if refs != EXPECTED_NUMBER:
01214             print(self.manager._debug_info())
01215             print(debug_info)
01216 
01217         self.assertEqual(refs, EXPECTED_NUMBER)
01218 
01219 #
01220 # Test of creating a customized manager class
01221 #
01222 
01223 from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
01224 
01225 class FooBar(object):
01226     def f(self):
01227         return 'f()'
01228     def g(self):
01229         raise ValueError
01230     def _h(self):
01231         return '_h()'
01232 
01233 def baz():
01234     for i in range(10):
01235         yield i*i
01236 
01237 class IteratorProxy(BaseProxy):
01238     _exposed_ = ('__next__',)
01239     def __iter__(self):
01240         return self
01241     def __next__(self):
01242         return self._callmethod('__next__')
01243 
01244 class MyManager(BaseManager):
01245     pass
01246 
01247 MyManager.register('Foo', callable=FooBar)
01248 MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
01249 MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
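# With these registrations a started MyManager exposes Foo(), Bar() and baz()
# factory methods.  'exposed' limits which methods the Bar proxy forwards (here
# 'f' and '_h' but not 'g'), and 'proxytype' makes baz() return an IteratorProxy
# so the generator can be iterated from the client side (see _TestMyManager below).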
01250 
01251 
01252 class _TestMyManager(BaseTestCase):
01253 
01254     ALLOWED_TYPES = ('manager',)
01255 
01256     def test_mymanager(self):
01257         manager = MyManager()
01258         manager.start()
01259 
01260         foo = manager.Foo()
01261         bar = manager.Bar()
01262         baz = manager.baz()
01263 
01264         foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
01265         bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
01266 
01267         self.assertEqual(foo_methods, ['f', 'g'])
01268         self.assertEqual(bar_methods, ['f', '_h'])
01269 
01270         self.assertEqual(foo.f(), 'f()')
01271         self.assertRaises(ValueError, foo.g)
01272         self.assertEqual(foo._callmethod('f'), 'f()')
01273         self.assertRaises(RemoteError, foo._callmethod, '_h')
01274 
01275         self.assertEqual(bar.f(), 'f()')
01276         self.assertEqual(bar._h(), '_h()')
01277         self.assertEqual(bar._callmethod('f'), 'f()')
01278         self.assertEqual(bar._callmethod('_h'), '_h()')
01279 
01280         self.assertEqual(list(baz), [i*i for i in range(10)])
01281 
01282         manager.shutdown()
01283 
01284 #
01285 # Test of connecting to a remote server and using xmlrpclib for serialization
01286 #
01287 
01288 _queue = pyqueue.Queue()
01289 def get_queue():
01290     return _queue
01291 
01292 class QueueManager(BaseManager):
01293     '''manager class used by server process'''
01294 QueueManager.register('get_queue', callable=get_queue)
01295 
01296 class QueueManager2(BaseManager):
01297     '''manager class which specifies the same interface as QueueManager'''
01298 QueueManager2.register('get_queue')
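# QueueManager2 registers 'get_queue' without a callable, so it is only meant for
# connecting to an existing server (one created from QueueManager); test_remote
# below relies on that.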
01299 
01300 
01301 SERIALIZER = 'xmlrpclib'
01302 
01303 class _TestRemoteManager(BaseTestCase):
01304 
01305     ALLOWED_TYPES = ('manager',)
01306 
01307     @classmethod
01308     def _putter(cls, address, authkey):
01309         manager = QueueManager2(
01310             address=address, authkey=authkey, serializer=SERIALIZER
01311             )
01312         manager.connect()
01313         queue = manager.get_queue()
01314         queue.put(('hello world', None, True, 2.25))
01315 
01316     def test_remote(self):
01317         authkey = os.urandom(32)
01318 
01319         manager = QueueManager(
01320             address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
01321             )
01322         manager.start()
01323 
01324         p = self.Process(target=self._putter, args=(manager.address, authkey))
01325         p.start()
01326 
01327         manager2 = QueueManager2(
01328             address=manager.address, authkey=authkey, serializer=SERIALIZER
01329             )
01330         manager2.connect()
01331         queue = manager2.get_queue()
01332 
01333         # Note that xmlrpclib will deserialize the object as a list, not a tuple
01334         self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
01335 
01336         # Because we are using xmlrpclib for serialization instead of
01337         # pickle this will cause a serialization error.
01338         self.assertRaises(Exception, queue.put, time.sleep)
01339 
01340         # Make queue finalizer run before the server is stopped
01341         del queue
01342         manager.shutdown()
01343 
01344 class _TestManagerRestart(BaseTestCase):
01345 
01346     @classmethod
01347     def _putter(cls, address, authkey):
01348         manager = QueueManager(
01349             address=address, authkey=authkey, serializer=SERIALIZER)
01350         manager.connect()
01351         queue = manager.get_queue()
01352         queue.put('hello world')
01353 
01354     def test_rapid_restart(self):
01355         authkey = os.urandom(32)
01356         manager = QueueManager(
01357             address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
01358         srvr = manager.get_server()
01359         addr = srvr.address
01360         # Close the connection.Listener socket which gets opened as part
01361         # of manager.get_server(). It's not needed for the test.
01362         srvr.listener.close()
01363         manager.start()
01364 
01365         p = self.Process(target=self._putter, args=(manager.address, authkey))
01366         p.start()
01367         queue = manager.get_queue()
01368         self.assertEqual(queue.get(), 'hello world')
01369         del queue
01370         manager.shutdown()
01371         manager = QueueManager(
01372             address=addr, authkey=authkey, serializer=SERIALIZER)
01373         try:
01374             manager.start()
01375         except IOError as e:
01376             if e.errno != errno.EADDRINUSE:
01377                 raise
01378             # Retry after some time, in case the old socket was lingering
01379             # (sporadic failure on buildbots)
01380             time.sleep(1.0)
01381             manager = QueueManager(
01382                 address=addr, authkey=authkey, serializer=SERIALIZER)
01383         manager.shutdown()
01384 
01385 #
01386 #
01387 #
01388 
01389 SENTINEL = latin('')
01390 
01391 class _TestConnection(BaseTestCase):
01392 
01393     ALLOWED_TYPES = ('processes', 'threads')
01394 
01395     @classmethod
01396     def _echo(cls, conn):
01397         for msg in iter(conn.recv_bytes, SENTINEL):
01398             conn.send_bytes(msg)
01399         conn.close()
01400 
01401     def test_connection(self):
01402         conn, child_conn = self.Pipe()
01403 
01404         p = self.Process(target=self._echo, args=(child_conn,))
01405         p.daemon = True
01406         p.start()
01407 
01408         seq = [1, 2.25, None]
01409         msg = latin('hello world')
01410         longmsg = msg * 10
01411         arr = array.array('i', list(range(4)))
01412 
01413         if self.TYPE == 'processes':
01414             self.assertEqual(type(conn.fileno()), int)
01415 
01416         self.assertEqual(conn.send(seq), None)
01417         self.assertEqual(conn.recv(), seq)
01418 
01419         self.assertEqual(conn.send_bytes(msg), None)
01420         self.assertEqual(conn.recv_bytes(), msg)
01421 
01422         if self.TYPE == 'processes':
01423             buffer = array.array('i', [0]*10)
01424             expected = list(arr) + [0] * (10 - len(arr))
01425             self.assertEqual(conn.send_bytes(arr), None)
01426             self.assertEqual(conn.recv_bytes_into(buffer),
01427                              len(arr) * buffer.itemsize)
01428             self.assertEqual(list(buffer), expected)
01429 
01430             buffer = array.array('i', [0]*10)
01431             expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
01432             self.assertEqual(conn.send_bytes(arr), None)
01433             self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
01434                              len(arr) * buffer.itemsize)
01435             self.assertEqual(list(buffer), expected)
01436 
01437             buffer = bytearray(latin(' ' * 40))
01438             self.assertEqual(conn.send_bytes(longmsg), None)
01439             try:
01440                 res = conn.recv_bytes_into(buffer)
01441             except multiprocessing.BufferTooShort as e:
01442                 self.assertEqual(e.args, (longmsg,))
01443             else:
01444                 self.fail('expected BufferTooShort, got %s' % res)
01445 
01446         poll = TimingWrapper(conn.poll)
01447 
01448         self.assertEqual(poll(), False)
01449         self.assertTimingAlmostEqual(poll.elapsed, 0)
01450 
01451         self.assertEqual(poll(TIMEOUT1), False)
01452         self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
01453 
01454         conn.send(None)
01455 
01456         self.assertEqual(poll(TIMEOUT1), True)
01457         self.assertTimingAlmostEqual(poll.elapsed, 0)
01458 
01459         self.assertEqual(conn.recv(), None)
01460 
01461         really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MB
01462         conn.send_bytes(really_big_msg)
01463         self.assertEqual(conn.recv_bytes(), really_big_msg)
01464 
01465         conn.send_bytes(SENTINEL)                          # tell child to quit
01466         child_conn.close()
01467 
01468         if self.TYPE == 'processes':
01469             self.assertEqual(conn.readable, True)
01470             self.assertEqual(conn.writable, True)
01471             self.assertRaises(EOFError, conn.recv)
01472             self.assertRaises(EOFError, conn.recv_bytes)
01473 
01474         p.join()
01475 
01476     def test_duplex_false(self):
01477         reader, writer = self.Pipe(duplex=False)
01478         self.assertEqual(writer.send(1), None)
01479         self.assertEqual(reader.recv(), 1)
01480         if self.TYPE == 'processes':
01481             self.assertEqual(reader.readable, True)
01482             self.assertEqual(reader.writable, False)
01483             self.assertEqual(writer.readable, False)
01484             self.assertEqual(writer.writable, True)
01485             self.assertRaises(IOError, reader.send, 2)
01486             self.assertRaises(IOError, writer.recv)
01487             self.assertRaises(IOError, writer.poll)
01488 
01489     def test_spawn_close(self):
01490         # We test that a pipe connection can be closed by the parent
01491         # process immediately after the child is spawned.  On Windows this
01492         # sometimes failed on old versions because
01493         # child_conn would be closed before the child got a chance to
01494         # duplicate it.
01495         conn, child_conn = self.Pipe()
01496 
01497         p = self.Process(target=self._echo, args=(child_conn,))
01498         p.start()
01499         child_conn.close()    # this might complete before child initializes
01500 
01501         msg = latin('hello')
01502         conn.send_bytes(msg)
01503         self.assertEqual(conn.recv_bytes(), msg)
01504 
01505         conn.send_bytes(SENTINEL)
01506         conn.close()
01507         p.join()
01508 
01509     def test_sendbytes(self):
01510         if self.TYPE != 'processes':
01511             return
01512 
01513         msg = latin('abcdefghijklmnopqrstuvwxyz')
01514         a, b = self.Pipe()
01515 
01516         a.send_bytes(msg)
01517         self.assertEqual(b.recv_bytes(), msg)
01518 
01519         a.send_bytes(msg, 5)
01520         self.assertEqual(b.recv_bytes(), msg[5:])
01521 
01522         a.send_bytes(msg, 7, 8)
01523         self.assertEqual(b.recv_bytes(), msg[7:7+8])
01524 
01525         a.send_bytes(msg, 26)
01526         self.assertEqual(b.recv_bytes(), latin(''))
01527 
01528         a.send_bytes(msg, 26, 0)
01529         self.assertEqual(b.recv_bytes(), latin(''))
01530 
01531         self.assertRaises(ValueError, a.send_bytes, msg, 27)
01532 
01533         self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
01534 
01535         self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
01536 
01537         self.assertRaises(ValueError, a.send_bytes, msg, -1)
01538 
01539         self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
01540 
01541 class _TestListenerClient(BaseTestCase):
01542 
01543     ALLOWED_TYPES = ('processes', 'threads')
01544 
01545     @classmethod
01546     def _test(cls, address):
01547         conn = cls.connection.Client(address)
01548         conn.send('hello')
01549         conn.close()
01550 
01551     def test_listener_client(self):
01552         for family in self.connection.families:
01553             l = self.connection.Listener(family=family)
01554             p = self.Process(target=self._test, args=(l.address,))
01555             p.daemon = True
01556             p.start()
01557             conn = l.accept()
01558             self.assertEqual(conn.recv(), 'hello')
01559             p.join()
01560             l.close()
01561 #
01562 # Test of sending connection and socket objects between processes
01563 #
01564 """
01565 class _TestPicklingConnections(BaseTestCase):
01566 
01567     ALLOWED_TYPES = ('processes',)
01568 
01569     def _listener(self, conn, families):
01570         for fam in families:
01571             l = self.connection.Listener(family=fam)
01572             conn.send(l.address)
01573             new_conn = l.accept()
01574             conn.send(new_conn)
01575 
01576         if self.TYPE == 'processes':
01577             l = socket.socket()
01578             l.bind(('localhost', 0))
01579             conn.send(l.getsockname())
01580             l.listen(1)
01581             new_conn, addr = l.accept()
01582             conn.send(new_conn)
01583 
01584         conn.recv()
01585 
01586     def _remote(self, conn):
01587         for (address, msg) in iter(conn.recv, None):
01588             client = self.connection.Client(address)
01589             client.send(msg.upper())
01590             client.close()
01591 
01592         if self.TYPE == 'processes':
01593             address, msg = conn.recv()
01594             client = socket.socket()
01595             client.connect(address)
01596             client.sendall(msg.upper())
01597             client.close()
01598 
01599         conn.close()
01600 
01601     def test_pickling(self):
01602         try:
01603             multiprocessing.allow_connection_pickling()
01604         except ImportError:
01605             return
01606 
01607         families = self.connection.families
01608 
01609         lconn, lconn0 = self.Pipe()
01610         lp = self.Process(target=self._listener, args=(lconn0, families))
01611         lp.start()
01612         lconn0.close()
01613 
01614         rconn, rconn0 = self.Pipe()
01615         rp = self.Process(target=self._remote, args=(rconn0,))
01616         rp.start()
01617         rconn0.close()
01618 
01619         for fam in families:
01620             msg = ('This connection uses family %s' % fam).encode('ascii')
01621             address = lconn.recv()
01622             rconn.send((address, msg))
01623             new_conn = lconn.recv()
01624             self.assertEqual(new_conn.recv(), msg.upper())
01625 
01626         rconn.send(None)
01627 
01628         if self.TYPE == 'processes':
01629             msg = latin('This connection uses a normal socket')
01630             address = lconn.recv()
01631             rconn.send((address, msg))
01632             if hasattr(socket, 'fromfd'):
01633                 new_conn = lconn.recv()
01634                 self.assertEqual(new_conn.recv(100), msg.upper())
01635             else:
01636                 # XXX On Windows with Py2.6 need to backport fromfd()
01637                 discard = lconn.recv_bytes()
01638 
01639         lconn.send(None)
01640 
01641         rconn.close()
01642         lconn.close()
01643 
01644         lp.join()
01645         rp.join()
01646 """
01647 #
01648 #
01649 #
01650 
01651 class _TestHeap(BaseTestCase):
01652 
01653     ALLOWED_TYPES = ('processes',)
01654 
01655     def test_heap(self):
01656         iterations = 5000
01657         maxblocks = 50
01658         blocks = []
01659 
01660         # create and destroy lots of blocks of different sizes
01661         for i in range(iterations):
01662             size = int(random.lognormvariate(0, 1) * 1000)
01663             b = multiprocessing.heap.BufferWrapper(size)
01664             blocks.append(b)
01665             if len(blocks) > maxblocks:
01666                 i = random.randrange(maxblocks)
01667                 del blocks[i]
01668 
01669         # get the heap object
01670         heap = multiprocessing.heap.BufferWrapper._heap
01671 
01672         # verify the state of the heap
01673         all = []
01674         occupied = 0
01675         heap._lock.acquire()
01676         self.addCleanup(heap._lock.release)
01677         for L in list(heap._len_to_seq.values()):
01678             for arena, start, stop in L:
01679                 all.append((heap._arenas.index(arena), start, stop,
01680                             stop-start, 'free'))
01681         for arena, start, stop in heap._allocated_blocks:
01682             all.append((heap._arenas.index(arena), start, stop,
01683                         stop-start, 'occupied'))
01684             occupied += (stop-start)
01685 
01686         all.sort()
01687 
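        # After sorting, each entry must either start a new arena at offset 0
        # or begin exactly where the previous block ends, i.e. the recorded
        # free and occupied blocks leave no gaps or overlaps within an arena.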
01688         for i in range(len(all)-1):
01689             (arena, start, stop) = all[i][:3]
01690             (narena, nstart, nstop) = all[i+1][:3]
01691             self.assertTrue((arena != narena and nstart == 0) or
01692                             (stop == nstart))
01693 
01694     def test_free_from_gc(self):
01695         # Check that freeing of blocks by the garbage collector doesn't deadlock
01696         # (issue #12352).
01697         # Make sure the GC is enabled, and set lower collection thresholds to
01698         # make collections more frequent (and increase the probability of
01699         # deadlock).
01700         if not gc.isenabled():
01701             gc.enable()
01702             self.addCleanup(gc.disable)
01703         thresholds = gc.get_threshold()
01704         self.addCleanup(gc.set_threshold, *thresholds)
01705         gc.set_threshold(10)
01706 
01707         # perform numerous block allocations, with cyclic references to make
01708         # sure objects are collected asynchronously by the gc
01709         for i in range(5000):
01710             a = multiprocessing.heap.BufferWrapper(1)
01711             b = multiprocessing.heap.BufferWrapper(1)
01712             # circular references
01713             a.buddy = b
01714             b.buddy = a
01715 
01716 #
01717 #
01718 #
01719 
01720 class _Foo(Structure):
01721     _fields_ = [
01722         ('x', c_int),
01723         ('y', c_double)
01724         ]
01725 
01726 class _TestSharedCTypes(BaseTestCase):
01727 
01728     ALLOWED_TYPES = ('processes',)
01729 
01730     def setUp(self):
01731         if not HAS_SHAREDCTYPES:
01732             self.skipTest("requires multiprocessing.sharedctypes")
01733 
01734     @classmethod
01735     def _double(cls, x, y, foo, arr, string):
01736         x.value *= 2
01737         y.value *= 2
01738         foo.x *= 2
01739         foo.y *= 2
01740         string.value *= 2
01741         for i in range(len(arr)):
01742             arr[i] *= 2
01743 
01744     def test_sharedctypes(self, lock=False):
01745         x = Value('i', 7, lock=lock)
01746         y = Value(c_double, 1.0/3.0, lock=lock)
01747         foo = Value(_Foo, 3, 2, lock=lock)
01748         arr = self.Array('d', list(range(10)), lock=lock)
01749         string = self.Array('c', 20, lock=lock)
01750         string.value = latin('hello')
01751 
01752         p = self.Process(target=self._double, args=(x, y, foo, arr, string))
01753         p.start()
01754         p.join()
01755 
01756         self.assertEqual(x.value, 14)
01757         self.assertAlmostEqual(y.value, 2.0/3.0)
01758         self.assertEqual(foo.x, 6)
01759         self.assertAlmostEqual(foo.y, 4.0)
01760         for i in range(10):
01761             self.assertAlmostEqual(arr[i], i*2)
01762         self.assertEqual(string.value, latin('hellohello'))
01763 
01764     def test_synchronize(self):
01765         self.test_sharedctypes(lock=True)
01766 
01767     def test_copy(self):
01768         foo = _Foo(2, 5.0)
01769         bar = copy(foo)
01770         foo.x = 0
01771         foo.y = 0
01772         self.assertEqual(bar.x, 2)
01773         self.assertAlmostEqual(bar.y, 5.0)
01774 
01775 #
01776 #
01777 #
01778 
01779 class _TestFinalize(BaseTestCase):
01780 
01781     ALLOWED_TYPES = ('processes',)
01782 
01783     @classmethod
01784     def _test_finalize(cls, conn):
01785         class Foo(object):
01786             pass
01787 
01788         a = Foo()
01789         util.Finalize(a, conn.send, args=('a',))
01790         del a           # triggers callback for a
01791 
01792         b = Foo()
01793         close_b = util.Finalize(b, conn.send, args=('b',))
01794         close_b()       # triggers callback for b
01795         close_b()       # does nothing because callback has already been called
01796         del b           # does nothing because callback has already been called
01797 
01798         c = Foo()
01799         util.Finalize(c, conn.send, args=('c',))
01800 
01801         d10 = Foo()
01802         util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
01803 
01804         d01 = Foo()
01805         util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
01806         d02 = Foo()
01807         util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
01808         d03 = Foo()
01809         util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
01810 
01811         util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
01812 
01813         util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
01814 
01815         # call multiprocessing's cleanup function then exit process without
01816         # garbage collecting locals
01817         util._exit_function()
01818         conn.close()
01819         os._exit(0)
01820 
01821     def test_finalize(self):
01822         conn, child_conn = self.Pipe()
01823 
01824         p = self.Process(target=self._test_finalize, args=(child_conn,))
01825         p.start()
01826         p.join()
01827 
01828         result = [obj for obj in iter(conn.recv, 'STOP')]
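        # 'a' and 'b' are sent while the child is still running; at exit,
        # finalizers run in order of decreasing exitpriority, finalizers with
        # equal priority run in reverse registration order, and finalizers
        # registered without an exitpriority (here 'c') are not run by
        # _exit_function at all.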
01829         self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
01830 
01831 #
01832 # Test that from ... import * works for each module
01833 #
01834 
01835 class _TestImportStar(BaseTestCase):
01836 
01837     ALLOWED_TYPES = ('processes',)
01838 
01839     def test_import(self):
01840         modules = [
01841             'multiprocessing', 'multiprocessing.connection',
01842             'multiprocessing.heap', 'multiprocessing.managers',
01843             'multiprocessing.pool', 'multiprocessing.process',
01844             'multiprocessing.reduction',
01845             'multiprocessing.synchronize', 'multiprocessing.util'
01846             ]
01847 
01848         if c_int is not None:
01849             # This module requires _ctypes
01850             modules.append('multiprocessing.sharedctypes')
01851 
01852         for name in modules:
01853             __import__(name)
01854             mod = sys.modules[name]
01855 
01856             for attr in getattr(mod, '__all__', ()):
01857                 self.assertTrue(
01858                     hasattr(mod, attr),
01859                     '%r does not have attribute %r' % (mod, attr)
01860                     )
01861 
01862 #
01863 # Quick test that logging works -- does not test logging output
01864 #
01865 
01866 class _TestLogging(BaseTestCase):
01867 
01868     ALLOWED_TYPES = ('processes',)
01869 
01870     def test_enable_logging(self):
01871         logger = multiprocessing.get_logger()
01872         self.assertIsNotNone(logger)
01873         logger.setLevel(util.SUBWARNING)
01874         logger.debug('this will not be printed')
01875         logger.info('nor will this')
01876         logger.setLevel(LOG_LEVEL)
01877 
01878     @classmethod
01879     def _test_level(cls, conn):
01880         logger = multiprocessing.get_logger()
01881         conn.send(logger.getEffectiveLevel())
01882 
01883     def test_level(self):
01884         LEVEL1 = 32
01885         LEVEL2 = 37
01886 
01887         logger = multiprocessing.get_logger()
01888         root_logger = logging.getLogger()
01889         root_level = root_logger.level
01890 
01891         reader, writer = multiprocessing.Pipe(duplex=False)
01892 
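        # The child reports logger.getEffectiveLevel(): first the level set
        # directly on the multiprocessing logger, then, once that is reset to
        # NOTSET, the level inherited from the root logger.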
01893         logger.setLevel(LEVEL1)
01894         self.Process(target=self._test_level, args=(writer,)).start()
01895         self.assertEqual(LEVEL1, reader.recv())
01896 
01897         logger.setLevel(logging.NOTSET)
01898         root_logger.setLevel(LEVEL2)
01899         self.Process(target=self._test_level, args=(writer,)).start()
01900         self.assertEqual(LEVEL2, reader.recv())
01901 
01902         root_logger.setLevel(root_level)
01903         logger.setLevel(LOG_LEVEL)
01904 
01905 
01906 # class _TestLoggingProcessName(BaseTestCase):
01907 #
01908 #     def handle(self, record):
01909 #         assert record.processName == multiprocessing.current_process().name
01910 #         self.__handled = True
01911 #
01912 #     def test_logging(self):
01913 #         handler = logging.Handler()
01914 #         handler.handle = self.handle
01915 #         self.__handled = False
01916 #         # Bypass getLogger() and side-effects
01917 #         logger = logging.getLoggerClass()(
01918 #                 'multiprocessing.test.TestLoggingProcessName')
01919 #         logger.addHandler(handler)
01920 #         logger.propagate = False
01921 #
01922 #         logger.warn('foo')
01923 #         assert self.__handled
01924 
01925 #
01926 # Test that invalid connection handles are detected, see issue 3321
01927 #
01928 
01929 class TestInvalidHandle(unittest.TestCase):
01930 
01931     @unittest.skipIf(WIN32, "skipped on Windows")
01932     def test_invalid_handles(self):
01933         conn = _multiprocessing.Connection(44977608)
01934         self.assertRaises(IOError, conn.poll)
01935         self.assertRaises(IOError, _multiprocessing.Connection, -1)
01936 
01937 #
01938 # Functions used to create test cases from the base ones in this module
01939 #
01940 
01941 def get_attributes(Source, names):
01942     d = {}
01943     for name in names:
01944         obj = getattr(Source, name)
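        # Plain functions must be wrapped in staticmethod() so that they do
        # not become bound methods when the mixin classes below install them
        # as class attributes via locals().update().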
01945         if type(obj) == type(get_attributes):
01946             obj = staticmethod(obj)
01947         d[name] = obj
01948     return d
01949 
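# For every _Test* class in this module that lists the given type in its
# ALLOWED_TYPES, create_test_cases() builds a concrete test case combining the
# base with unittest.TestCase and the mixin (e.g. _TestHeap becomes
# WithProcessesTestHeap) and returns the new classes keyed by name.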
01950 def create_test_cases(Mixin, type):
01951     result = {}
01952     glob = globals()
01953     Type = type.capitalize()
01954 
01955     for name in list(glob.keys()):
01956         if name.startswith('_Test'):
01957             base = glob[name]
01958             if type in base.ALLOWED_TYPES:
01959                 newname = 'With' + Type + name[1:]
01960                 class Temp(base, unittest.TestCase, Mixin):
01961                     pass
01962                 result[newname] = Temp
01963                 Temp.__name__ = newname
01964                 Temp.__module__ = Mixin.__module__
01965     return result
01966 
01967 #
01968 # Create test cases
01969 #
01970 
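# Each mixin supplies the Process class and the Queue/Lock/Value/... factories
# for one backend; create_test_cases() combines it with the _Test* bases above
# and the resulting classes are injected into globals() as module-level names.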
01971 class ProcessesMixin(object):
01972     TYPE = 'processes'
01973     Process = multiprocessing.Process
01974     locals().update(get_attributes(multiprocessing, (
01975         'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
01976         'Condition', 'Event', 'Value', 'Array', 'RawValue',
01977         'RawArray', 'current_process', 'active_children', 'Pipe',
01978         'connection', 'JoinableQueue'
01979         )))
01980 
01981 testcases_processes = create_test_cases(ProcessesMixin, type='processes')
01982 globals().update(testcases_processes)
01983 
01984 
01985 class ManagerMixin(object):
01986     TYPE = 'manager'
01987     Process = multiprocessing.Process
01988     manager = object.__new__(multiprocessing.managers.SyncManager)
01989     locals().update(get_attributes(manager, (
01990         'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
01991         'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
01992         'Namespace', 'JoinableQueue'
01993         )))
01994 
01995 testcases_manager = create_test_cases(ManagerMixin, type='manager')
01996 globals().update(testcases_manager)
01997 
01998 
01999 class ThreadsMixin(object):
02000     TYPE = 'threads'
02001     Process = multiprocessing.dummy.Process
02002     locals().update(get_attributes(multiprocessing.dummy, (
02003         'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
02004         'Condition', 'Event', 'Value', 'Array', 'current_process',
02005         'active_children', 'Pipe', 'connection', 'dict', 'list',
02006         'Namespace', 'JoinableQueue'
02007         )))
02008 
02009 testcases_threads = create_test_cases(ThreadsMixin, type='threads')
02010 globals().update(testcases_threads)
02011 
02012 class OtherTest(unittest.TestCase):
02013     # TODO: add more tests for deliver/answer challenge.
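    # deliver_challenge() sends a random challenge and expects its HMAC digest
    # back, while answer_challenge() sends a digest and expects a WELCOME
    # reply; the fake connections below answer with bogus bytes, so both
    # helpers must raise AuthenticationError.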
02014     def test_deliver_challenge_auth_failure(self):
02015         class _FakeConnection(object):
02016             def recv_bytes(self, size):
02017                 return b'something bogus'
02018             def send_bytes(self, data):
02019                 pass
02020         self.assertRaises(multiprocessing.AuthenticationError,
02021                           multiprocessing.connection.deliver_challenge,
02022                           _FakeConnection(), b'abc')
02023 
02024     def test_answer_challenge_auth_failure(self):
02025         class _FakeConnection(object):
02026             def __init__(self):
02027                 self.count = 0
02028             def recv_bytes(self, size):
02029                 self.count += 1
02030                 if self.count == 1:
02031                     return multiprocessing.connection.CHALLENGE
02032                 elif self.count == 2:
02033                     return b'something bogus'
02034                 return b''
02035             def send_bytes(self, data):
02036                 pass
02037         self.assertRaises(multiprocessing.AuthenticationError,
02038                           multiprocessing.connection.answer_challenge,
02039                           _FakeConnection(), b'abc')
02040 
02041 #
02042 # Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
02043 #
02044 
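# The initializer runs once in the manager's server process (or once per pool
# worker); bumping the shared Namespace counter lets the test verify that it
# actually ran.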
02045 def initializer(ns):
02046     ns.test += 1
02047 
02048 class TestInitializers(unittest.TestCase):
02049     def setUp(self):
02050         self.mgr = multiprocessing.Manager()
02051         self.ns = self.mgr.Namespace()
02052         self.ns.test = 0
02053 
02054     def tearDown(self):
02055         self.mgr.shutdown()
02056 
02057     def test_manager_initializer(self):
02058         m = multiprocessing.managers.SyncManager()
02059         self.assertRaises(TypeError, m.start, 1)
02060         m.start(initializer, (self.ns,))
02061         self.assertEqual(self.ns.test, 1)
02062         m.shutdown()
02063 
02064     def test_pool_initializer(self):
02065         self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
02066         p = multiprocessing.Pool(1, initializer, (self.ns,))
02067         p.close()
02068         p.join()
02069         self.assertEqual(self.ns.test, 1)
02070 
02071 #
02072 # Issues 5155, 5313, 5331: test creating processes from within processes
02073 # Verifies os.close(sys.stdin.fileno()) vs. sys.stdin.close() behavior
02074 #
02075 
02076 def _ThisSubProcess(q):
02077     try:
02078         item = q.get(block=False)
02079     except pyqueue.Empty:
02080         pass
02081 
02082 def _TestProcess(q):
02083     queue = multiprocessing.Queue()
02084     subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
02085     subProc.start()
02086     subProc.join()
02087 
02088 def _afunc(x):
02089     return x*x
02090 
02091 def pool_in_process():
02092     pool = multiprocessing.Pool(processes=4)
02093     x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
02094 
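# _file_like buffers writes per process: the cache property starts a fresh
# buffer whenever os.getpid() changes (i.e. in a forked child), so a child
# never flushes data that was buffered by its parent.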
02095 class _file_like(object):
02096     def __init__(self, delegate):
02097         self._delegate = delegate
02098         self._pid = None
02099 
02100     @property
02101     def cache(self):
02102         pid = os.getpid()
02103         # No race conditions here: after fork() only the calling thread survives in the child
02104         if pid != self._pid:
02105             self._pid = pid
02106             self._cache = []
02107         return self._cache
02108 
02109     def write(self, data):
02110         self.cache.append(data)
02111 
02112     def flush(self):
02113         self._delegate.write(''.join(self.cache))
02114         self._cache = []
02115 
02116 class TestStdinBadfiledescriptor(unittest.TestCase):
02117 
02118     def test_queue_in_process(self):
02119         queue = multiprocessing.Queue()
02120         proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
02121         proc.start()
02122         proc.join()
02123 
02124     def test_pool_in_process(self):
02125         p = multiprocessing.Process(target=pool_in_process)
02126         p.start()
02127         p.join()
02128 
02129     def test_flushing(self):
02130         sio = io.StringIO()
02131         flike = _file_like(sio)
02132         flike.write('foo')
02133         proc = multiprocessing.Process(target=lambda: flike.flush())
02134         flike.flush()
02135         self.assertEqual(sio.getvalue(), 'foo')
02136 
02137 testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
02138                    TestStdinBadfiledescriptor]
02139 
02140 #
02141 #
02142 #
02143 
02144 def test_main(run=None):
02145     if sys.platform.startswith("linux"):
02146         try:
02147             lock = multiprocessing.RLock()
02148         except OSError:
02149             raise unittest.SkipTest("OSError is raised on RLock creation, see issue 3111!")
02150 
02151     if run is None:
02152         from test.support import run_unittest as run
02153 
02154     util.get_temp_dir()     # creates temp directory for use by all processes
02155 
02156     multiprocessing.get_logger().setLevel(LOG_LEVEL)
02157 
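    # Give each mixin a shared pool; the shared SyncManager was created above
    # with object.__new__() and still needs to be initialized and started
    # before the generated test cases can use it.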
02158     ProcessesMixin.pool = multiprocessing.Pool(4)
02159     ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
02160     ManagerMixin.manager.__init__()
02161     ManagerMixin.manager.start()
02162     ManagerMixin.pool = ManagerMixin.manager.Pool(4)
02163 
02164     testcases = (
02165         sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
02166         sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
02167         sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
02168         testcases_other
02169         )
02170 
02171     loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
02172     suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
02173     run(suite)
02174 
02175     ThreadsMixin.pool.terminate()
02176     ProcessesMixin.pool.terminate()
02177     ManagerMixin.pool.terminate()
02178     ManagerMixin.manager.shutdown()
02179 
02180     del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
02181 
02182 def main():
02183     test_main(unittest.TextTestRunner(verbosity=2).run)
02184 
02185 if __name__ == '__main__':
02186     main()