Back to index

python3.2  3.2.2
Public Member Functions | Public Attributes
test.test_queue.FailingQueueTest Class Reference
Inheritance diagram for test.test_queue.FailingQueueTest:
Inheritance graph
[legend]
Collaboration diagram for test.test_queue.FailingQueueTest:
Collaboration graph
[legend]

List of all members.

Public Member Functions

def failing_queue_test
def test_failing_queue
def do_blocking_test
def do_exceptional_blocking_test

Public Attributes

 t
 result

Detailed Description

Definition at line 263 of file test_queue.py.


Member Function Documentation

def test.test_queue.BlockingTestMixin.do_blocking_test (   self,
  block_func,
  block_args,
  trigger_func,
  trigger_args 
) [inherited]

Definition at line 49 of file test_queue.py.

00049 
00050     def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
00051         self.t = _TriggerThread(trigger_func, trigger_args)
00052         self.t.start()
00053         self.result = block_func(*block_args)
00054         # If block_func returned before our thread made the call, we failed!
00055         if not self.t.startedEvent.is_set():
00056             self.fail("blocking function '%r' appeared not to block" %
00057                       block_func)
00058         self.t.join(10) # make sure the thread terminates
00059         if self.t.is_alive():
00060             self.fail("trigger function '%r' appeared to not return" %
00061                       trigger_func)
00062         return self.result

Here is the caller graph for this function:

def test.test_queue.BlockingTestMixin.do_exceptional_blocking_test (   self,
  block_func,
  block_args,
  trigger_func,
  trigger_args,
  expected_exception_class 
) [inherited]

Definition at line 65 of file test_queue.py.

00065 
00066                                    trigger_args, expected_exception_class):
00067         self.t = _TriggerThread(trigger_func, trigger_args)
00068         self.t.start()
00069         try:
00070             try:
00071                 block_func(*block_args)
00072             except expected_exception_class:
00073                 raise
00074             else:
00075                 self.fail("expected exception of kind %r" %
00076                                  expected_exception_class)
00077         finally:
00078             self.t.join(10) # make sure the thread terminates
00079             if self.t.is_alive():
00080                 self.fail("trigger function '%r' appeared to not return" %
00081                                  trigger_func)
00082             if not self.t.startedEvent.is_set():
00083                 self.fail("trigger thread ended but event never set")
00084 

Here is the call graph for this function:

Here is the caller graph for this function:

Definition at line 265 of file test_queue.py.

00265 
00266     def failing_queue_test(self, q):
00267         if q.qsize():
00268             raise RuntimeError("Call this function with an empty queue")
00269         for i in range(QUEUE_SIZE-1):
00270             q.put(i)
00271         # Test a failing non-blocking put.
00272         q.fail_next_put = True
00273         try:
00274             q.put("oops", block=0)
00275             self.fail("The queue didn't fail when it should have")
00276         except FailingQueueException:
00277             pass
00278         q.fail_next_put = True
00279         try:
00280             q.put("oops", timeout=0.1)
00281             self.fail("The queue didn't fail when it should have")
00282         except FailingQueueException:
00283             pass
00284         q.put("last")
00285         self.assertTrue(qfull(q), "Queue should be full")
00286         # Test a failing blocking put
00287         q.fail_next_put = True
00288         try:
00289             self.do_blocking_test(q.put, ("full",), q.get, ())
00290             self.fail("The queue didn't fail when it should have")
00291         except FailingQueueException:
00292             pass
00293         # Check the Queue isn't damaged.
00294         # put failed, but get succeeded - re-add
00295         q.put("last")
00296         # Test a failing timeout put
00297         q.fail_next_put = True
00298         try:
00299             self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
00300                                               FailingQueueException)
00301             self.fail("The queue didn't fail when it should have")
00302         except FailingQueueException:
00303             pass
00304         # Check the Queue isn't damaged.
00305         # put failed, but get succeeded - re-add
00306         q.put("last")
00307         self.assertTrue(qfull(q), "Queue should be full")
00308         q.get()
00309         self.assertTrue(not qfull(q), "Queue should not be full")
00310         q.put("last")
00311         self.assertTrue(qfull(q), "Queue should be full")
00312         # Test a blocking put
00313         self.do_blocking_test(q.put, ("full",), q.get, ())
00314         # Empty it
00315         for i in range(QUEUE_SIZE):
00316             q.get()
00317         self.assertTrue(not q.qsize(), "Queue should be empty")
00318         q.put("first")
00319         q.fail_next_get = True
00320         try:
00321             q.get()
00322             self.fail("The queue didn't fail when it should have")
00323         except FailingQueueException:
00324             pass
00325         self.assertTrue(q.qsize(), "Queue should not be empty")
00326         q.fail_next_get = True
00327         try:
00328             q.get(timeout=0.1)
00329             self.fail("The queue didn't fail when it should have")
00330         except FailingQueueException:
00331             pass
00332         self.assertTrue(q.qsize(), "Queue should not be empty")
00333         q.get()
00334         self.assertTrue(not q.qsize(), "Queue should be empty")
00335         q.fail_next_get = True
00336         try:
00337             self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
00338                                               FailingQueueException)
00339             self.fail("The queue didn't fail when it should have")
00340         except FailingQueueException:
00341             pass
00342         # put succeeded, but get failed.
00343         self.assertTrue(q.qsize(), "Queue should not be empty")
00344         q.get()
00345         self.assertTrue(not q.qsize(), "Queue should be empty")

Here is the call graph for this function:

Here is the caller graph for this function:

Definition at line 346 of file test_queue.py.

00346 
00347     def test_failing_queue(self):
00348         # Test to make sure a queue is functioning correctly.
00349         # Done twice to the same instance.
00350         q = FailingQueue(QUEUE_SIZE)
00351         self.failing_queue_test(q)
00352         self.failing_queue_test(q)
00353 

Here is the call graph for this function:


Member Data Documentation

Definition at line 52 of file test_queue.py.

Definition at line 50 of file test_queue.py.


The documentation for this class was generated from the following file: