python3.2  3.2.2
test.test_asynchat.TestAsynchat_WithPoll Class Reference

Public Member Functions

def setUp
def tearDown
def line_terminator_check
def test_line_terminator1
def test_line_terminator2
def test_line_terminator3
def numeric_terminator_check
def test_numeric_terminator1
def test_numeric_terminator2
def test_none_terminator
def test_simple_producer
def test_string_producer
def test_empty_line
def test_close_when_done

Static Public Attributes

 usepoll = True

Detailed Description

Definition at line 238 of file test_asynchat.py.


Member Function Documentation

def test.test_asynchat.TestAsynchat.line_terminator_check (   self,
  term,
  server_chunk 
) [inherited]

Definition at line 112 of file test_asynchat.py.

    def line_terminator_check(self, term, server_chunk):
        event = threading.Event()
        s = echo_server(event)
        s.chunk_size = server_chunk
        s.start()
        event.wait()
        event.clear()
        time.sleep(0.01) # Give server time to start accepting.
        c = echo_client(term, s.port)
        c.push(b"hello ")
        c.push(b"world" + term)
        c.push(b"I'm not dead yet!" + term)
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
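
The check above sends terminator-delimited data through an echo server that replies `server_chunk` bytes at a time, and expects the client to reassemble whole messages regardless of where the chunk boundaries fall. The standalone sketch below illustrates that expectation only; `chunks` and `split_on_terminator` are hypothetical helpers, not part of `asynchat` or of the test module, and the real client does its buffering inside `async_chat`.

```python
def chunks(data, size):
    """Yield successive pieces of data, each at most size bytes long."""
    for i in range(0, len(data), size):
        yield data[i:i + size]


def split_on_terminator(pieces, term):
    """Reassemble messages from an iterable of byte chunks.

    Hypothetical helper: returns (messages, leftover), where leftover is
    whatever arrived after the last complete terminator.
    """
    buffer = b""
    messages = []
    for piece in pieces:
        buffer += piece
        # A terminator may span several chunks, so search the whole buffer.
        while True:
            index = buffer.find(term)
            if index < 0:
                break
            messages.append(buffer[:index])
            buffer = buffer[index + len(term):]
    return messages, buffer


# Same terminators and chunk sizes as the three test_line_terminator* cases.
for term in (b"\n", b"\r\n", b"qqq"):
    stream = b"hello world" + term + b"I'm not dead yet!" + term
    for size in (1, 2, 3):
        messages, leftover = split_on_terminator(chunks(stream, size), term)
        assert messages == [b"hello world", b"I'm not dead yet!"]
        assert leftover == b""
```

Searching the accumulated buffer rather than each chunk is what makes a multi-byte terminator such as `b"\r\n"` work when it is split across two chunks.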

def test.test_asynchat.TestAsynchat.numeric_terminator_check (   self,
  termlen 
) [inherited]

Definition at line 149 of file test_asynchat.py.

    def numeric_terminator_check(self, termlen):
        # Try reading a fixed number of bytes
        s, event = start_echo_server()
        c = echo_client(termlen, s.port)
        data = b"hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents, [data[:termlen]])
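
With an integer terminator the client is expected to report exactly the first `termlen` bytes as one message. The sketch below shows that behaviour in isolation. It assumes, as the assertion `[data[:termlen]]` suggests, that the byte count is consumed once and not re-armed afterwards; `collect_fixed` is a hypothetical helper, not an `asynchat` function.

```python
def collect_fixed(pieces, count):
    """Return (message, rest): the first count bytes and everything after.

    Hypothetical helper; pieces is an iterable of byte chunks.
    """
    message = b""
    rest = b""
    remaining = count
    for piece in pieces:
        if remaining > 0:
            take = piece[:remaining]
            message += take
            remaining -= len(take)
            rest += piece[len(take):]
        else:
            rest += piece
    return message, rest


data = b"hello world, I'm not dead yet!\n"
pieces = [data[i:i + 4] for i in range(0, len(data), 4)]
for termlen in (1, 3, 10):  # illustrative lengths, chosen arbitrarily
    message, rest = collect_fixed(pieces, termlen)
    assert message == data[:termlen]
    assert message + rest == data
```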

def test.test_asynchat.TestAsynchat.setUp (   self) [inherited]

Definition at line 106 of file test_asynchat.py.

    def setUp (self):
        self._threads = support.threading_setup()

def test.test_asynchat.TestAsynchat.tearDown (   self) [inherited]

Definition at line 109 of file test_asynchat.py.

    def tearDown (self):
        support.threading_cleanup(*self._threads)

def test.test_asynchat.TestAsynchat.test_close_when_done (   self) [inherited]

Definition at line 215 of file test_asynchat.py.

    def test_close_when_done(self):
        s, event = start_echo_server()
        s.start_resend_event = threading.Event()
        c = echo_client(b'\n', s.port)
        c.push(b"hello world\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        c.close_when_done()
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)

        # Only allow the server to start echoing data back to the client after
        # the client has closed its connection.  This prevents a race condition
        # where the server echoes all of its data before we can check that it
        # got any down below.
        s.start_resend_event.set()
        s.join()

        self.assertEqual(c.contents, [])
        # the server might have been able to send a byte or two back, but this
        # at least checks that it received something and didn't just fail
        # (which could still result in the client not having received anything)
        self.assertGreater(len(s.buffer), 0)
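
The comments in this test describe holding the server back with a `threading.Event` until the client has finished. A minimal sketch of that gating pattern, without sockets, might look as follows. `GatedEcho` and its `echoed` list are hypothetical stand-ins for the echo server thread; only the `start_resend_event` and `buffer` attribute names are taken from the listing.

```python
import threading


class GatedEcho(threading.Thread):
    """Hypothetical stand-in for the echo server thread."""

    def __init__(self, received):
        super().__init__()
        self.buffer = received      # data "received" from the client
        self.echoed = []            # what has been sent back so far
        self.start_resend_event = threading.Event()

    def run(self):
        # Block until the main thread signals that echoing may begin.
        self.start_resend_event.wait()
        self.echoed.append(self.buffer)


server = GatedEcho(b"hello world\nI'm not dead yet!\n")
server.start()
# The worker is parked on the event, so nothing can have been echoed yet.
echoed_before_release = list(server.echoed)
server.start_resend_event.set()
server.join()

assert echoed_before_release == []
assert len(server.buffer) > 0
```

Because the worker cannot pass `wait()` until `set()` is called, the state inspected before the release is deterministic, which is the property the test relies on.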

def test.test_asynchat.TestAsynchat.test_empty_line (   self) [inherited]

Definition at line 203 of file test_asynchat.py.

    def test_empty_line(self):
        # checks that empty lines are handled correctly
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        c.push(b"hello world\n\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents,
                         [b"hello world", b"", b"I'm not dead yet!"])

def test.test_asynchat.TestAsynchat.test_line_terminator1 (   self) [inherited]

Definition at line 134 of file test_asynchat.py.

    def test_line_terminator1(self):
        # test one-character terminator
        for l in (1,2,3):
            self.line_terminator_check(b'\n', l)

def test.test_asynchat.TestAsynchat.test_line_terminator2 (   self) [inherited]

Definition at line 139 of file test_asynchat.py.

    def test_line_terminator2(self):
        # test two-character terminator
        for l in (1,2,3):
            self.line_terminator_check(b'\r\n', l)

def test.test_asynchat.TestAsynchat.test_line_terminator3 (   self) [inherited]

Definition at line 144 of file test_asynchat.py.

    def test_line_terminator3(self):
        # test three-character terminator
        for l in (1,2,3):
            self.line_terminator_check(b'qqq', l)

def test.test_asynchat.TestAsynchat.test_none_terminator (   self) [inherited]

Definition at line 169 of file test_asynchat.py.

    def test_none_terminator(self):
        # With a None terminator no message boundary is ever found:
        # contents stays empty and all data remains in the buffer
        s, event = start_echo_server()
        c = echo_client(None, s.port)
        data = b"hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents, [])
        self.assertEqual(c.buffer, data)

def test.test_asynchat.TestAsynchat.test_numeric_terminator1 (   self) [inherited]

Definition at line 161 of file test_asynchat.py.

    def test_numeric_terminator1(self):
        # check that ints & longs both work (since type is
        # explicitly checked in async_chat.handle_read)
        self.numeric_terminator_check(1)

def test.test_asynchat.TestAsynchat.test_numeric_terminator2 (   self) [inherited]

Definition at line 166 of file test_asynchat.py.

def test.test_asynchat.TestAsynchat.test_simple_producer (   self) [inherited]

Definition at line 182 of file test_asynchat.py.

    def test_simple_producer(self):
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b"hello world\nI'm not dead yet!\n"
        p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
        c.push_with_producer(p)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
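
This test hands the data to the channel through a producer with `buffer_size=8`, so the payload leaves in pieces of at most eight bytes. The sketch below illustrates that chunking using the producer convention of a `more()` method that returns an empty value once exhausted. `ChunkProducer` and `drain` are hypothetical names written for this illustration; this is not the `asynchat.simple_producer` source.

```python
class ChunkProducer:
    """Hypothetical producer that hands out data in fixed-size pieces."""

    def __init__(self, data, buffer_size=512):
        self.data = data
        self.buffer_size = buffer_size

    def more(self):
        """Return the next piece, or b"" once the data is exhausted."""
        piece = self.data[:self.buffer_size]
        self.data = self.data[self.buffer_size:]
        return piece


def drain(producer):
    """Call more() until it returns an empty value; collect the pieces."""
    pieces = []
    while True:
        piece = producer.more()
        if not piece:
            return pieces
        pieces.append(piece)


data = b"hello world\nI'm not dead yet!\n"
pieces = drain(ChunkProducer(data, buffer_size=8))
assert all(len(piece) <= 8 for piece in pieces)
assert b"".join(pieces) == data
```

Since chunk boundaries need not align with the `b'\n'` terminator, the receiving side still has to reassemble lines, which is what the final assertion in the test checks.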

def test.test_asynchat.TestAsynchat.test_string_producer (   self) [inherited]

Definition at line 193 of file test_asynchat.py.

    def test_string_producer(self):
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b"hello world\nI'm not dead yet!\n"
        c.push_with_producer(data+SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])


Member Data Documentation

test.test_asynchat.TestAsynchat_WithPoll.usepoll = True [static]

Reimplemented from test.test_asynchat.TestAsynchat.

Definition at line 239 of file test_asynchat.py.
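
Every test passes this attribute to `asyncore.loop(use_poll=...)`, which asks the loop to prefer `poll()` over `select()` where the platform provides it. As a rough illustration of that choice using only the `select` module, the sketch below waits for a socket to become readable either way. `wait_readable` is a hypothetical helper, not an `asyncore` function.

```python
import select
import socket


def wait_readable(sock, timeout=1.0, use_poll=True):
    """Return True if sock becomes readable within timeout seconds."""
    if use_poll and hasattr(select, "poll"):
        poller = select.poll()
        poller.register(sock, select.POLLIN)
        # poll() takes its timeout in milliseconds.
        return bool(poller.poll(int(timeout * 1000)))
    # use_poll is False, or poll() is unavailable: select() takes seconds.
    readable, _, _ = select.select([sock], [], [], timeout)
    return bool(readable)


left, right = socket.socketpair()
try:
    left.sendall(b"ping")
    # The data is never read, so both mechanisms should report readiness.
    results = [wait_readable(right, use_poll=flag) for flag in (True, False)]
finally:
    left.close()
    right.close()

assert results == [True, True]
```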


The documentation for this class was generated from the following file: