Back to index

python3.2  3.2.2
Public Member Functions
test.test_poll.PollTests Class Reference

List of all members.

Public Member Functions

def test_poll1
def poll_unit_tests
def test_poll2
def test_poll3

Detailed Description

Definition at line 19 of file test_poll.py.


Member Function Documentation

Definition at line 70 of file test_poll.py.

00070 
00071     def poll_unit_tests(self):
00072         # returns NVAL for invalid file descriptor
00073         FD = 42
00074         try:
00075             os.close(FD)
00076         except OSError:
00077             pass
00078         p = select.poll()
00079         p.register(FD)
00080         r = p.poll()
00081         self.assertEqual(r[0], (FD, select.POLLNVAL))
00082 
00083         f = open(TESTFN, 'w')
00084         fd = f.fileno()
00085         p = select.poll()
00086         p.register(f)
00087         r = p.poll()
00088         self.assertEqual(r[0][0], fd)
00089         f.close()
00090         r = p.poll()
00091         self.assertEqual(r[0], (fd, select.POLLNVAL))
00092         os.unlink(TESTFN)
00093 
00094         # type error for invalid arguments
00095         p = select.poll()
00096         self.assertRaises(TypeError, p.register, p)
00097         self.assertRaises(TypeError, p.unregister, p)
00098 
00099         # can't unregister non-existent object
00100         p = select.poll()
00101         self.assertRaises(KeyError, p.unregister, 3)
00102 
00103         # Test error cases
00104         pollster = select.poll()
00105         class Nope:
00106             pass
00107 
00108         class Almost:
00109             def fileno(self):
00110                 return 'fileno'
00111 
00112         self.assertRaises(TypeError, pollster.register, Nope(), 0)
00113         self.assertRaises(TypeError, pollster.register, Almost(), 0)

Here is the call graph for this function:

Definition at line 21 of file test_poll.py.

00021 
00022     def test_poll1(self):
00023         # Basic functional test of poll object
00024         # Create a bunch of pipe and test that poll works with them.
00025 
00026         p = select.poll()
00027 
00028         NUM_PIPES = 12
00029         MSG = b" This is a test."
00030         MSG_LEN = len(MSG)
00031         readers = []
00032         writers = []
00033         r2w = {}
00034         w2r = {}
00035 
00036         for i in range(NUM_PIPES):
00037             rd, wr = os.pipe()
00038             p.register(rd)
00039             p.modify(rd, select.POLLIN)
00040             p.register(wr, select.POLLOUT)
00041             readers.append(rd)
00042             writers.append(wr)
00043             r2w[rd] = wr
00044             w2r[wr] = rd
00045 
00046         bufs = []
00047 
00048         while writers:
00049             ready = p.poll()
00050             ready_writers = find_ready_matching(ready, select.POLLOUT)
00051             if not ready_writers:
00052                 raise RuntimeError("no pipes ready for writing")
00053             wr = random.choice(ready_writers)
00054             os.write(wr, MSG)
00055 
00056             ready = p.poll()
00057             ready_readers = find_ready_matching(ready, select.POLLIN)
00058             if not ready_readers:
00059                 raise RuntimeError("no pipes ready for reading")
00060             rd = random.choice(ready_readers)
00061             buf = os.read(rd, MSG_LEN)
00062             self.assertEqual(len(buf), MSG_LEN)
00063             bufs.append(buf)
00064             os.close(r2w[rd]) ; os.close( rd )
00065             p.unregister( r2w[rd] )
00066             p.unregister( rd )
00067             writers.remove(r2w[rd])
00068 
00069         self.assertEqual(bufs, [MSG] * NUM_PIPES)

Here is the call graph for this function:

Definition at line 117 of file test_poll.py.

00117 
00118     def test_poll2(self):
00119         cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
00120         p = os.popen(cmd, 'r')
00121         pollster = select.poll()
00122         pollster.register( p, select.POLLIN )
00123         for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
00124             fdlist = pollster.poll(tout)
00125             if (fdlist == []):
00126                 continue
00127             fd, flags = fdlist[0]
00128             if flags & select.POLLHUP:
00129                 line = p.readline()
00130                 if line != "":
00131                     self.fail('error: pipe seems to be closed, but still returns data')
00132                 continue
00133 
00134             elif flags & select.POLLIN:
00135                 line = p.readline()
00136                 if not line:
00137                     break
00138                 continue
00139             else:
00140                 self.fail('Unexpected return value from select.poll: %s' % fdlist)
00141         p.close()

Here is the call graph for this function:

Definition at line 142 of file test_poll.py.

00142 
00143     def test_poll3(self):
00144         # test int overflow
00145         pollster = select.poll()
00146         pollster.register(1)
00147 
00148         self.assertRaises(OverflowError, pollster.poll, 1 << 64)
00149 
00150         x = 2 + 3
00151         if x != 5:
00152             self.fail('Overflow must have occurred')

Here is the call graph for this function:


The documentation for this class was generated from the following file: