Back to index

python3.2  3.2.2
Public Member Functions
test.test_kqueue.TestKQueue Class Reference

List of all members.

Public Member Functions

def test_create_queue
def test_create_event
def test_queue_event
def testPair

Detailed Description

Definition at line 16 of file test_kqueue.py.


Member Function Documentation

Definition at line 25 of file test_kqueue.py.

00025 
00026     def test_create_event(self):
00027         from operator import lt, le, gt, ge
00028 
00029         fd = os.open(os.devnull, os.O_WRONLY)
00030         self.addCleanup(os.close, fd)
00031 
00032         ev = select.kevent(fd)
00033         other = select.kevent(1000)
00034         self.assertEqual(ev.ident, fd)
00035         self.assertEqual(ev.filter, select.KQ_FILTER_READ)
00036         self.assertEqual(ev.flags, select.KQ_EV_ADD)
00037         self.assertEqual(ev.fflags, 0)
00038         self.assertEqual(ev.data, 0)
00039         self.assertEqual(ev.udata, 0)
00040         self.assertEqual(ev, ev)
00041         self.assertNotEqual(ev, other)
00042         self.assertTrue(ev < other)
00043         self.assertTrue(other >= ev)
00044         for op in lt, le, gt, ge:
00045             self.assertRaises(TypeError, op, ev, None)
00046             self.assertRaises(TypeError, op, ev, 1)
00047             self.assertRaises(TypeError, op, ev, "ev")
00048 
00049         ev = select.kevent(fd, select.KQ_FILTER_WRITE)
00050         self.assertEqual(ev.ident, fd)
00051         self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
00052         self.assertEqual(ev.flags, select.KQ_EV_ADD)
00053         self.assertEqual(ev.fflags, 0)
00054         self.assertEqual(ev.data, 0)
00055         self.assertEqual(ev.udata, 0)
00056         self.assertEqual(ev, ev)
00057         self.assertNotEqual(ev, other)
00058 
00059         ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)
00060         self.assertEqual(ev.ident, fd)
00061         self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
00062         self.assertEqual(ev.flags, select.KQ_EV_ONESHOT)
00063         self.assertEqual(ev.fflags, 0)
00064         self.assertEqual(ev.data, 0)
00065         self.assertEqual(ev.udata, 0)
00066         self.assertEqual(ev, ev)
00067         self.assertNotEqual(ev, other)
00068 
00069         ev = select.kevent(1, 2, 3, 4, 5, 6)
00070         self.assertEqual(ev.ident, 1)
00071         self.assertEqual(ev.filter, 2)
00072         self.assertEqual(ev.flags, 3)
00073         self.assertEqual(ev.fflags, 4)
00074         self.assertEqual(ev.data, 5)
00075         self.assertEqual(ev.udata, 6)
00076         self.assertEqual(ev, ev)
00077         self.assertNotEqual(ev, other)
00078 
00079         bignum = sys.maxsize * 2 + 1
00080         ev = select.kevent(bignum, 1, 2, 3, sys.maxsize, bignum)
00081         self.assertEqual(ev.ident, bignum)
00082         self.assertEqual(ev.filter, 1)
00083         self.assertEqual(ev.flags, 2)
00084         self.assertEqual(ev.fflags, 3)
00085         self.assertEqual(ev.data, sys.maxsize)
00086         self.assertEqual(ev.udata, bignum)
00087         self.assertEqual(ev, ev)
00088         self.assertNotEqual(ev, other)

Here is the call graph for this function:

Definition at line 17 of file test_kqueue.py.

00017 
00018     def test_create_queue(self):
00019         kq = select.kqueue()
00020         self.assertTrue(kq.fileno() > 0, kq.fileno())
00021         self.assertTrue(not kq.closed)
00022         kq.close()
00023         self.assertTrue(kq.closed)
00024         self.assertRaises(ValueError, kq.fileno)

Here is the call graph for this function:

Definition at line 89 of file test_kqueue.py.

00089 
00090     def test_queue_event(self):
00091         serverSocket = socket.socket()
00092         serverSocket.bind(('127.0.0.1', 0))
00093         serverSocket.listen(1)
00094         client = socket.socket()
00095         client.setblocking(False)
00096         try:
00097             client.connect(('127.0.0.1', serverSocket.getsockname()[1]))
00098         except socket.error as e:
00099             self.assertEqual(e.args[0], errno.EINPROGRESS)
00100         else:
00101             #raise AssertionError("Connect should have raised EINPROGRESS")
00102             pass # FreeBSD doesn't raise an exception here
00103         server, addr = serverSocket.accept()
00104 
00105         if sys.platform.startswith("darwin"):
00106             flags = select.KQ_EV_ADD | select.KQ_EV_ENABLE
00107         else:
00108             flags = 0
00109 
00110         kq = select.kqueue()
00111         kq2 = select.kqueue.fromfd(kq.fileno())
00112 
00113         ev = select.kevent(server.fileno(),
00114                            select.KQ_FILTER_WRITE,
00115                            select.KQ_EV_ADD | select.KQ_EV_ENABLE)
00116         kq.control([ev], 0)
00117         ev = select.kevent(server.fileno(),
00118                            select.KQ_FILTER_READ,
00119                            select.KQ_EV_ADD | select.KQ_EV_ENABLE)
00120         kq.control([ev], 0)
00121         ev = select.kevent(client.fileno(),
00122                            select.KQ_FILTER_WRITE,
00123                            select.KQ_EV_ADD | select.KQ_EV_ENABLE)
00124         kq2.control([ev], 0)
00125         ev = select.kevent(client.fileno(),
00126                            select.KQ_FILTER_READ,
00127                            select.KQ_EV_ADD | select.KQ_EV_ENABLE)
00128         kq2.control([ev], 0)
00129 
00130         events = kq.control(None, 4, 1)
00131         events = [(e.ident, e.filter, e.flags) for e in events]
00132         events.sort()
00133         self.assertEqual(events, [
00134             (client.fileno(), select.KQ_FILTER_WRITE, flags),
00135             (server.fileno(), select.KQ_FILTER_WRITE, flags)])
00136 
00137         client.send(b"Hello!")
00138         server.send(b"world!!!")
00139 
00140         # We may need to call it several times
00141         for i in range(10):
00142             events = kq.control(None, 4, 1)
00143             if len(events) == 4:
00144                 break
00145             time.sleep(1.0)
00146         else:
00147             self.fail('timeout waiting for event notifications')
00148 
00149         events = [(e.ident, e.filter, e.flags) for e in events]
00150         events.sort()
00151 
00152         self.assertEqual(events, [
00153             (client.fileno(), select.KQ_FILTER_WRITE, flags),
00154             (client.fileno(), select.KQ_FILTER_READ, flags),
00155             (server.fileno(), select.KQ_FILTER_WRITE, flags),
00156             (server.fileno(), select.KQ_FILTER_READ, flags)])
00157 
00158         # Remove completely client, and server read part
00159         ev = select.kevent(client.fileno(),
00160                            select.KQ_FILTER_WRITE,
00161                            select.KQ_EV_DELETE)
00162         kq.control([ev], 0)
00163         ev = select.kevent(client.fileno(),
00164                            select.KQ_FILTER_READ,
00165                            select.KQ_EV_DELETE)
00166         kq.control([ev], 0)
00167         ev = select.kevent(server.fileno(),
00168                            select.KQ_FILTER_READ,
00169                            select.KQ_EV_DELETE)
00170         kq.control([ev], 0, 0)
00171 
00172         events = kq.control([], 4, 0.99)
00173         events = [(e.ident, e.filter, e.flags) for e in events]
00174         events.sort()
00175         self.assertEqual(events, [
00176             (server.fileno(), select.KQ_FILTER_WRITE, flags)])
00177 
00178         client.close()
00179         server.close()
00180         serverSocket.close()

Here is the call graph for this function:

Definition at line 181 of file test_kqueue.py.

00181 
00182     def testPair(self):
00183         kq = select.kqueue()
00184         a, b = socket.socketpair()
00185 
00186         a.send(b'foo')
00187         event1 = select.kevent(a, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE)
00188         event2 = select.kevent(b, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE)
00189         r = kq.control([event1, event2], 1, 1)
00190         self.assertTrue(r)
00191         self.assertFalse(r[0].flags & select.KQ_EV_ERROR)
00192         self.assertEqual(b.recv(r[0].data), b'foo')
00193 
00194         a.close()
00195         b.close()
00196         kq.close()

Here is the call graph for this function:


The documentation for this class was generated from the following file: