Back to index

python3.2  3.2.2
Public Member Functions | Public Attributes | Static Public Attributes | Private Member Functions
test.test_multiprocessing._TestConnection Class Reference
Inheritance diagram for test.test_multiprocessing._TestConnection:
Inheritance graph
[legend]
Collaboration diagram for test.test_multiprocessing._TestConnection:
Collaboration graph
[legend]

List of all members.

Public Member Functions

def test_connection
def test_duplex_false
def test_spawn_close
def test_sendbytes
def assertTimingAlmostEqual
def assertReturnsIfImplemented
def __reduce__

Public Attributes

 TYPE
_PyObject_HEAD_EXTRA Py_ssize_t ob_refcnt
struct _typeobjectob_type

Static Public Attributes

tuple ALLOWED_TYPES = ('processes', 'threads')

Private Member Functions

def _echo

Detailed Description

Definition at line 1391 of file test_multiprocessing.py.


Member Function Documentation

def test.test_multiprocessing.BaseTestCase.__reduce__ (   self,
  args 
) [inherited]

Definition at line 124 of file test_multiprocessing.py.

00124 
00125     def __reduce__(self, *args):
00126         raise NotImplementedError("shouldn't try to pickle a test case")

def test.test_multiprocessing._TestConnection._echo (   cls,
  conn 
) [private]

Definition at line 1396 of file test_multiprocessing.py.

01396 
01397     def _echo(cls, conn):
01398         for msg in iter(conn.recv_bytes, SENTINEL):
01399             conn.send_bytes(msg)
01400         conn.close()

Here is the caller graph for this function:

def test.test_multiprocessing.BaseTestCase.assertReturnsIfImplemented (   self,
  value,
  func,
  args 
) [inherited]

Definition at line 114 of file test_multiprocessing.py.

00114 
00115     def assertReturnsIfImplemented(self, value, func, *args):
00116         try:
00117             res = func(*args)
00118         except NotImplementedError:
00119             pass
00120         else:
00121             return self.assertEqual(value, res)

Here is the call graph for this function:

Here is the caller graph for this function:

def test.test_multiprocessing.BaseTestCase.assertTimingAlmostEqual (   self,
  a,
  b 
) [inherited]

Definition at line 110 of file test_multiprocessing.py.

00110 
00111     def assertTimingAlmostEqual(self, a, b):
00112         if CHECK_TIMINGS:
00113             self.assertAlmostEqual(a, b, 1)

Here is the call graph for this function:

Here is the caller graph for this function:

Definition at line 1401 of file test_multiprocessing.py.

01401 
01402     def test_connection(self):
01403         conn, child_conn = self.Pipe()
01404 
01405         p = self.Process(target=self._echo, args=(child_conn,))
01406         p.daemon = True
01407         p.start()
01408 
01409         seq = [1, 2.25, None]
01410         msg = latin('hello world')
01411         longmsg = msg * 10
01412         arr = array.array('i', list(range(4)))
01413 
01414         if self.TYPE == 'processes':
01415             self.assertEqual(type(conn.fileno()), int)
01416 
01417         self.assertEqual(conn.send(seq), None)
01418         self.assertEqual(conn.recv(), seq)
01419 
01420         self.assertEqual(conn.send_bytes(msg), None)
01421         self.assertEqual(conn.recv_bytes(), msg)
01422 
01423         if self.TYPE == 'processes':
01424             buffer = array.array('i', [0]*10)
01425             expected = list(arr) + [0] * (10 - len(arr))
01426             self.assertEqual(conn.send_bytes(arr), None)
01427             self.assertEqual(conn.recv_bytes_into(buffer),
01428                              len(arr) * buffer.itemsize)
01429             self.assertEqual(list(buffer), expected)
01430 
01431             buffer = array.array('i', [0]*10)
01432             expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
01433             self.assertEqual(conn.send_bytes(arr), None)
01434             self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
01435                              len(arr) * buffer.itemsize)
01436             self.assertEqual(list(buffer), expected)
01437 
01438             buffer = bytearray(latin(' ' * 40))
01439             self.assertEqual(conn.send_bytes(longmsg), None)
01440             try:
01441                 res = conn.recv_bytes_into(buffer)
01442             except multiprocessing.BufferTooShort as e:
01443                 self.assertEqual(e.args, (longmsg,))
01444             else:
01445                 self.fail('expected BufferTooShort, got %s' % res)
01446 
01447         poll = TimingWrapper(conn.poll)
01448 
01449         self.assertEqual(poll(), False)
01450         self.assertTimingAlmostEqual(poll.elapsed, 0)
01451 
01452         self.assertEqual(poll(TIMEOUT1), False)
01453         self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
01454 
01455         conn.send(None)
01456 
01457         self.assertEqual(poll(TIMEOUT1), True)
01458         self.assertTimingAlmostEqual(poll.elapsed, 0)
01459 
01460         self.assertEqual(conn.recv(), None)
01461 
01462         really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
01463         conn.send_bytes(really_big_msg)
01464         self.assertEqual(conn.recv_bytes(), really_big_msg)
01465 
01466         conn.send_bytes(SENTINEL)                          # tell child to quit
01467         child_conn.close()
01468 
01469         if self.TYPE == 'processes':
01470             self.assertEqual(conn.readable, True)
01471             self.assertEqual(conn.writable, True)
01472             self.assertRaises(EOFError, conn.recv)
01473             self.assertRaises(EOFError, conn.recv_bytes)
01474 
01475         p.join()

Here is the call graph for this function:

Definition at line 1476 of file test_multiprocessing.py.

01476 
01477     def test_duplex_false(self):
01478         reader, writer = self.Pipe(duplex=False)
01479         self.assertEqual(writer.send(1), None)
01480         self.assertEqual(reader.recv(), 1)
01481         if self.TYPE == 'processes':
01482             self.assertEqual(reader.readable, True)
01483             self.assertEqual(reader.writable, False)
01484             self.assertEqual(writer.readable, False)
01485             self.assertEqual(writer.writable, True)
01486             self.assertRaises(IOError, reader.send, 2)
01487             self.assertRaises(IOError, writer.recv)
01488             self.assertRaises(IOError, writer.poll)

Here is the call graph for this function:

Definition at line 1509 of file test_multiprocessing.py.

01509 
01510     def test_sendbytes(self):
01511         if self.TYPE != 'processes':
01512             return
01513 
01514         msg = latin('abcdefghijklmnopqrstuvwxyz')
01515         a, b = self.Pipe()
01516 
01517         a.send_bytes(msg)
01518         self.assertEqual(b.recv_bytes(), msg)
01519 
01520         a.send_bytes(msg, 5)
01521         self.assertEqual(b.recv_bytes(), msg[5:])
01522 
01523         a.send_bytes(msg, 7, 8)
01524         self.assertEqual(b.recv_bytes(), msg[7:7+8])
01525 
01526         a.send_bytes(msg, 26)
01527         self.assertEqual(b.recv_bytes(), latin(''))
01528 
01529         a.send_bytes(msg, 26, 0)
01530         self.assertEqual(b.recv_bytes(), latin(''))
01531 
01532         self.assertRaises(ValueError, a.send_bytes, msg, 27)
01533 
01534         self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
01535 
01536         self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
01537 
01538         self.assertRaises(ValueError, a.send_bytes, msg, -1)
01539 
01540         self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

Here is the call graph for this function:

Definition at line 1489 of file test_multiprocessing.py.

01489 
01490     def test_spawn_close(self):
01491         # We test that a pipe connection can be closed by parent
01492         # process immediately after child is spawned.  On Windows this
01493         # would have sometimes failed on old versions because
01494         # child_conn would be closed before the child got a chance to
01495         # duplicate it.
01496         conn, child_conn = self.Pipe()
01497 
01498         p = self.Process(target=self._echo, args=(child_conn,))
01499         p.start()
01500         child_conn.close()    # this might complete before child initializes
01501 
01502         msg = latin('hello')
01503         conn.send_bytes(msg)
01504         self.assertEqual(conn.recv_bytes(), msg)
01505 
01506         conn.send_bytes(SENTINEL)
01507         conn.close()
01508         p.join()

Here is the call graph for this function:


Member Data Documentation

tuple test.test_multiprocessing._TestConnection.ALLOWED_TYPES = ('processes', 'threads') [static]

Reimplemented from test.test_multiprocessing.BaseTestCase.

Definition at line 1393 of file test_multiprocessing.py.

Definition at line 107 of file object.h.

struct _typeobject* _object::ob_type [inherited]

Definition at line 108 of file object.h.

Definition at line 1413 of file test_multiprocessing.py.


The documentation for this class was generated from the following file: