Back to index

rabbitmq-server  2.8.4
ack.py
Go to the documentation of this file.
00001 import unittest
00002 import stomp
00003 import base
00004 import time
00005 
00006 class TestAck(base.BaseTest):
00007 
00008     def test_ack_client(self):
00009         d = "/queue/ack-test"
00010 
00011         # subscribe and send message
00012         self.listener.reset(2) ## expecting 2 messages
00013         self.conn.subscribe(destination=d, ack='client',
00014                             headers={'prefetch-count': '10'})
00015         self.conn.send("test1", destination=d)
00016         self.conn.send("test2", destination=d)
00017         self.assertTrue(self.listener.await(4), "initial message not received")
00018         self.assertEquals(2, len(self.listener.messages))
00019 
00020         # disconnect with no ack
00021         self.conn.disconnect()
00022 
00023         # now reconnect
00024         conn2 = self.create_connection()
00025         try:
00026             listener2 = base.WaitableListener()
00027             listener2.reset(2)
00028             conn2.set_listener('', listener2)
00029             conn2.subscribe(destination=d, ack='client',
00030                             headers={'prefetch-count': '10'})
00031             self.assertTrue(listener2.await(), "message not received again")
00032             self.assertEquals(2, len(listener2.messages))
00033 
00034             # now ack only the last message - expecting cumulative behaviour
00035             mid = listener2.messages[1]['headers']['message-id']
00036             conn2.ack({'message-id':mid})
00037         finally:
00038             conn2.stop()
00039 
00040         # now reconnect again, shouldn't see the message
00041         conn3 = self.create_connection()
00042         try:
00043             listener3 = base.WaitableListener()
00044             conn3.set_listener('', listener3)
00045             conn3.subscribe(destination=d)
00046             self.assertFalse(listener3.await(3),
00047                              "unexpected message. ACK not working?")
00048         finally:
00049             conn3.stop()
00050 
00051     def test_ack_client_individual(self):
00052         d = "/queue/ack-test-individual"
00053 
00054         # subscribe and send message
00055         self.listener.reset(2) ## expecting 2 messages
00056         self.conn.subscribe(destination=d, ack='client-individual',
00057                             headers={'prefetch-count': '10'})
00058         self.conn.send("test1", destination=d)
00059         self.conn.send("test2", destination=d)
00060         self.assertTrue(self.listener.await(4), "Both initial messages not received")
00061         self.assertEquals(2, len(self.listener.messages))
00062 
00063         # disconnect without acks
00064         self.conn.disconnect()
00065 
00066         # now reconnect
00067         conn2 = self.create_connection()
00068         try:
00069             listener2 = base.WaitableListener()
00070             listener2.reset(2) ## expect 2 messages
00071             conn2.set_listener('', listener2)
00072             conn2.subscribe(destination=d, ack='client-individual',
00073                             headers={'prefetch-count': '10'})
00074             self.assertTrue(listener2.await(2.5), "Did not receive 2 messages")
00075             self.assertEquals(2, len(listener2.messages), "Not exactly 2 messages received")
00076 
00077             # now ack only the 'test2' message - expecting individual behaviour
00078             nummsgs = len(listener2.messages)
00079             mid = None
00080             for ind in range(nummsgs):
00081                 if listener2.messages[ind]['message']=="test2":
00082                     mid = listener2.messages[ind]['headers']['message-id']
00083                     self.assertEquals(1, ind, 'Expecting test2 to be second message')
00084                     break
00085             self.assertTrue(mid, "Did not find test2 message id.")
00086             conn2.ack({'message-id':mid})
00087         finally:
00088             conn2.stop()
00089 
00090         # now reconnect again, shouldn't see the message
00091         conn3 = self.create_connection()
00092         try:
00093             listener3 = base.WaitableListener()
00094             listener3.reset(2) ## expecting a single message, but wait for two
00095             conn3.set_listener('', listener3)
00096             conn3.subscribe(destination=d)
00097             self.assertFalse(listener3.await(2.5),
00098                              "Expected to see only one message. ACK not working?")
00099             self.assertEquals(1, len(listener3.messages), "Expecting exactly one message")
00100             self.assertEquals("test1", listener3.messages[0]['message'], "Unexpected message remains")
00101         finally:
00102             conn3.stop()
00103 
00104     def test_ack_client_tx(self):
00105         d = "/queue/ack-test-tx"
00106 
00107         # subscribe and send message
00108         self.listener.reset()
00109         self.conn.subscribe(destination=d, ack='client')
00110         self.conn.send("test", destination=d)
00111         self.assertTrue(self.listener.await(3), "initial message not received")
00112         self.assertEquals(1, len(self.listener.messages))
00113 
00114         # disconnect with no ack
00115         self.conn.disconnect()
00116 
00117         # now reconnect
00118         conn2 = self.create_connection()
00119         try:
00120             tx = "abc"
00121             listener2 = base.WaitableListener()
00122             conn2.set_listener('', listener2)
00123             conn2.begin(transaction=tx)
00124             conn2.subscribe(destination=d, ack='client')
00125             self.assertTrue(listener2.await(), "message not received again")
00126             self.assertEquals(1, len(listener2.messages))
00127 
00128             # now ack
00129             mid = listener2.messages[0]['headers']['message-id']
00130             conn2.ack({'message-id':mid, 'transaction':tx})
00131 
00132             #now commit
00133             conn2.commit(transaction=tx)
00134         finally:
00135             conn2.stop()
00136 
00137         # now reconnect again, shouldn't see the message
00138         conn3 = self.create_connection()
00139         try:
00140             listener3 = base.WaitableListener()
00141             conn3.set_listener('', listener3)
00142             conn3.subscribe(destination=d)
00143             self.assertFalse(listener3.await(3),
00144                              "unexpected message. TX ACK not working?")
00145         finally:
00146             conn3.stop()
00147 
00148     def test_topic_prefetch(self):
00149         d = "/topic/prefetch-test"
00150 
00151         # subscribe and send message
00152         self.listener.reset(6) ## expect 6 messages
00153         self.conn.subscribe(destination=d, ack='client',
00154                             headers={'prefetch-count': '5'})
00155 
00156         for x in range(10):
00157             self.conn.send("test" + str(x), destination=d)
00158 
00159         self.assertFalse(self.listener.await(3),
00160                          "Should not have been able to see 6 messages")
00161         self.assertEquals(5, len(self.listener.messages))
00162 
00163     def test_nack(self):
00164         d = "/queue/nack-test"
00165 
00166         #subscribe and send
00167         self.conn.subscribe(destination=d, ack='client-individual')
00168         self.conn.send("nack-test", destination=d)
00169 
00170         self.assertTrue(self.listener.await(), "Not received message")
00171         message_id = self.listener.messages[0]['headers']['message-id']
00172         self.listener.reset()
00173 
00174         self.conn.send_frame("NACK", {"message-id" : message_id})
00175         self.assertTrue(self.listener.await(), "Not received message after NACK")
00176         message_id = self.listener.messages[0]['headers']['message-id']
00177         self.conn.ack({'message-id' : message_id})
00178 
00179     def test_nack_multi(self):
00180         d = "/queue/nack-multi"
00181 
00182         self.listener.reset(2)
00183 
00184         #subscribe and send
00185         self.conn.subscribe(destination=d, ack='client',
00186                             headers = {'prefetch-count' : '10'})
00187         self.conn.send("nack-test1", destination=d)
00188         self.conn.send("nack-test2", destination=d)
00189 
00190         self.assertTrue(self.listener.await(), "Not received messages")
00191         message_id = self.listener.messages[1]['headers']['message-id']
00192         self.listener.reset(2)
00193 
00194         self.conn.send_frame("NACK", {"message-id" : message_id})
00195         self.assertTrue(self.listener.await(), "Not received message again")
00196         message_id = self.listener.messages[1]['headers']['message-id']
00197         self.conn.ack({'message-id' : message_id})