Back to index

rabbitmq-server  2.8.4
destinations.py
Go to the documentation of this file.
00001 import unittest
00002 import stomp
00003 import base
00004 import time
00005 
00006 class TestExchange(base.BaseTest):
00007 
00008 
00009     def test_amq_direct(self):
00010         ''' Test basic send/receive for /exchange/amq.direct '''
00011         self.__test_exchange_send_rec("amq.direct", "route")
00012 
00013     def test_amq_topic(self):
00014         ''' Test basic send/receive for /exchange/amq.topic '''
00015         self.__test_exchange_send_rec("amq.topic", "route")
00016 
00017     def test_amq_fanout(self):
00018         ''' Test basic send/receive for /exchange/amq.fanout '''
00019         self.__test_exchange_send_rec("amq.fanout", "route")
00020 
00021     def test_amq_fanout_no_route(self):
00022         ''' Test basic send/receive, /exchange/amq.direct, no routing key'''
00023         self.__test_exchange_send_rec("amq.fanout")
00024 
00025     def test_invalid_exchange(self):
00026         ''' Test invalid exchange error '''
00027         self.listener.reset()
00028         self.conn.subscribe(destination="/exchange/does.not.exist")
00029         self.listener.await()
00030         self.assertEquals(1, len(self.listener.errors))
00031         err = self.listener.errors[0]
00032         self.assertEquals("not_found", err['headers']['message'])
00033         self.assertEquals(
00034             "NOT_FOUND - no exchange 'does.not.exist' in vhost '/'\n",
00035             err['message'])
00036         time.sleep(1)
00037         self.assertFalse(self.conn.is_connected())
00038 
00039     def __test_exchange_send_rec(self, exchange, route = None):
00040         if exchange != "amq.topic":
00041             dest = "/exchange/" + exchange
00042         else:
00043             dest = "/topic"
00044         if route != None:
00045             dest += "/" + route
00046 
00047         self.simple_test_send_rec(dest)
00048 
00049 class TestQueue(base.BaseTest):
00050 
00051     def test_send_receive(self):
00052         ''' Test basic send/receive for /queue '''
00053         d = '/queue/test'
00054         self.simple_test_send_rec(d)
00055 
00056     def test_send_receive_in_other_conn(self):
00057         ''' Test send in one connection, receive in another '''
00058         d = '/queue/test2'
00059 
00060         # send
00061         self.conn.send("hello", destination=d)
00062 
00063         # now receive
00064         conn2 = self.create_connection()
00065         try:
00066             listener2 = base.WaitableListener()
00067             conn2.set_listener('', listener2)
00068 
00069             conn2.subscribe(destination=d)
00070             self.assertTrue(listener2.await(10), "no receive")
00071         finally:
00072             conn2.stop()
00073 
00074     def test_send_receive_in_other_conn_with_disconnect(self):
00075         ''' Test send, disconnect, receive '''
00076         d = '/queue/test3'
00077 
00078         # send
00079         self.conn.send("hello thar", destination=d, receipt="foo")
00080         self.listener.await(3)
00081         self.conn.stop()
00082 
00083         # now receive
00084         conn2 = self.create_connection()
00085         try:
00086             listener2 = base.WaitableListener()
00087             conn2.set_listener('', listener2)
00088 
00089             conn2.subscribe(destination=d)
00090             self.assertTrue(listener2.await(10), "no receive")
00091         finally:
00092             conn2.stop()
00093 
00094 
00095     def test_multi_subscribers(self):
00096         ''' Test multiple subscribers against a single /queue destination '''
00097         d = '/queue/test-multi'
00098 
00099         ## set up two subscribers
00100         conn1, listener1 = self.create_subscriber_connection(d)
00101         conn2, listener2 = self.create_subscriber_connection(d)
00102 
00103         try:
00104             ## now send
00105             self.conn.send("test1", destination=d)
00106             self.conn.send("test2", destination=d)
00107 
00108             ## expect both consumers to get a message?
00109             self.assertTrue(listener1.await(2))
00110             self.assertEquals(1, len(listener1.messages),
00111                               "unexpected message count")
00112             self.assertTrue(listener2.await(2))
00113             self.assertEquals(1, len(listener2.messages),
00114                               "unexpected message count")
00115         finally:
00116             conn1.stop()
00117             conn2.stop()
00118 
00119     def test_send_with_receipt(self):
00120         d = '/queue/test-receipt'
00121         def noop(): pass
00122         self.__test_send_receipt(d, noop, noop)
00123 
00124     def test_send_with_receipt_tx(self):
00125         d = '/queue/test-receipt-tx'
00126         tx = 'receipt.tx'
00127 
00128         def before():
00129             self.conn.begin(transaction=tx)
00130 
00131         def after():
00132             self.assertFalse(self.listener.await(1))
00133             self.conn.commit(transaction=tx)
00134 
00135         self.__test_send_receipt(d, before, after, {'transaction': tx})
00136 
00137     def test_interleaved_receipt_no_receipt(self):
00138         ''' Test i-leaved receipt/no receipt, no-r bracketed by rs '''
00139 
00140         d = '/queue/ir'
00141 
00142         self.listener.reset(5)
00143 
00144         self.conn.subscribe(destination=d)
00145         self.conn.send('first', destination=d, receipt='a')
00146         self.conn.send('second', destination=d)
00147         self.conn.send('third', destination=d, receipt='b')
00148 
00149         self.assertTrue(self.listener.await(3))
00150 
00151         self.assertEquals(set(['a','b']), self.__gather_receipts())
00152         self.assertEquals(3, len(self.listener.messages))
00153 
00154     def test_interleaved_receipt_no_receipt_tx(self):
00155         ''' Test i-leaved receipt/no receipt, no-r bracketed by r+xactions '''
00156 
00157         d = '/queue/ir'
00158         tx = 'tx.ir'
00159 
00160         # three messages and two receipts
00161         self.listener.reset(5)
00162 
00163         self.conn.subscribe(destination=d)
00164         self.conn.begin(transaction=tx)
00165 
00166         self.conn.send('first', destination=d, receipt='a', transaction=tx)
00167         self.conn.send('second', destination=d, transaction=tx)
00168         self.conn.send('third', destination=d, receipt='b', transaction=tx)
00169         self.conn.commit(transaction=tx)
00170 
00171         self.assertTrue(self.listener.await(40), "Missing messages/confirms")
00172 
00173         expected = set(['a', 'b'])
00174         missing = expected.difference(self.__gather_receipts())
00175 
00176         self.assertEquals(set(), missing, "Missing receipts: " + str(missing))
00177         self.assertEquals(3, len(self.listener.messages))
00178 
00179     def test_interleaved_receipt_no_receipt_inverse(self):
00180         ''' Test i-leaved receipt/no receipt, r bracketed by no-rs '''
00181 
00182         d = '/queue/ir'
00183 
00184         self.listener.reset(4)
00185 
00186         self.conn.subscribe(destination=d)
00187         self.conn.send('first', destination=d)
00188         self.conn.send('second', destination=d, receipt='a')
00189         self.conn.send('third', destination=d)
00190 
00191         self.assertTrue(self.listener.await(3))
00192 
00193         self.assertEquals(set(['a']), self.__gather_receipts())
00194         self.assertEquals(3, len(self.listener.messages))
00195 
00196     def __test_send_receipt(self, destination, before, after, headers = {}):
00197         count = 50
00198         self.listener.reset(count)
00199 
00200         before()
00201         expected_receipts = set()
00202 
00203         for x in range(0, count):
00204             receipt = "test" + str(x)
00205             expected_receipts.add(receipt)
00206             self.conn.send("test receipt", destination=destination,
00207                            receipt=receipt, headers=headers)
00208         after()
00209 
00210         self.assertTrue(self.listener.await(5))
00211 
00212         missing_receipts = expected_receipts.difference(
00213                     self.__gather_receipts())
00214 
00215         self.assertEquals(set(), missing_receipts,
00216                           "missing receipts: " + str(missing_receipts))
00217 
00218     def __gather_receipts(self):
00219         result = set()
00220         for r in self.listener.receipts:
00221             result.add(r['headers']['receipt-id'])
00222         return result
00223 
00224 class TestTopic(base.BaseTest):
00225 
00226       def test_send_receive(self):
00227         ''' Test basic send/receive for /topic '''
00228         d = '/topic/test'
00229         self.simple_test_send_rec(d)
00230 
00231       def test_send_multiple(self):
00232           ''' Test /topic with multiple consumers '''
00233           d = '/topic/multiple'
00234 
00235           ## set up two subscribers
00236           conn1, listener1 = self.create_subscriber_connection(d)
00237           conn2, listener2 = self.create_subscriber_connection(d)
00238 
00239           try:
00240               ## listeners are expecting 2 messages
00241               listener1.reset(2)
00242               listener2.reset(2)
00243 
00244               ## now send
00245               self.conn.send("test1", destination=d)
00246               self.conn.send("test2", destination=d)
00247 
00248               ## expect both consumers to get both messages
00249               self.assertTrue(listener1.await(5))
00250               self.assertEquals(2, len(listener1.messages),
00251                                 "unexpected message count")
00252               self.assertTrue(listener2.await(5))
00253               self.assertEquals(2, len(listener2.messages),
00254                                 "unexpected message count")
00255           finally:
00256               conn1.stop()
00257               conn2.stop()
00258 
00259 class TestReplyQueue(base.BaseTest):
00260 
00261     def test_reply_queue(self):
00262         ''' Test with two separate clients. Client 1 sends
00263         message to a known destination with a defined reply
00264         queue. Client 2 receives on known destination and replies
00265         on the reply destination. Client 1 gets the reply message'''
00266 
00267         known = '/queue/known'
00268         reply = '/temp-queue/0'
00269 
00270         ## Client 1 uses pre-supplied connection and listener
00271         ## Set up client 2
00272         conn2, listener2 = self.create_subscriber_connection(known)
00273 
00274         try:
00275             self.conn.send("test", destination=known,
00276                            headers = {"reply-to": reply})
00277 
00278             self.assertTrue(listener2.await(5))
00279             self.assertEquals(1, len(listener2.messages))
00280 
00281             reply_to = listener2.messages[0]['headers']['reply-to']
00282             self.assertTrue(reply_to.startswith('/reply-queue/'))
00283 
00284             conn2.send("reply", destination=reply_to)
00285             self.assertTrue(self.listener.await(5))
00286             self.assertEquals("reply", self.listener.messages[0]['message'])
00287         finally:
00288             conn2.stop()
00289 
00290     def test_reuse_reply_queue(self):
00291         ''' Test re-use of reply-to queue '''
00292 
00293         known2 = '/queue/known2'
00294         known3 = '/queue/known3'
00295         reply = '/temp-queue/foo'
00296 
00297         def respond(cntn, listna):
00298             self.assertTrue(listna.await(5))
00299             self.assertEquals(1, len(listna.messages))
00300             reply_to = listna.messages[0]['headers']['reply-to']
00301             self.assertTrue(reply_to.startswith('/reply-queue/'))
00302             cntn.send("reply", destination=reply_to)
00303 
00304         ## Client 1 uses pre-supplied connection and listener
00305         ## Set up clients 2 and 3
00306         conn2, listener2 = self.create_subscriber_connection(known2)
00307         conn3, listener3 = self.create_subscriber_connection(known3)
00308         try:
00309             self.listener.reset(2)
00310             self.conn.send("test2", destination=known2,
00311                            headers = {"reply-to": reply})
00312             self.conn.send("test3", destination=known3,
00313                            headers = {"reply-to": reply})
00314             respond(conn2, listener2)
00315             respond(conn3, listener3)
00316 
00317             self.assertTrue(self.listener.await(5))
00318             self.assertEquals(2, len(self.listener.messages))
00319             self.assertEquals("reply", self.listener.messages[0]['message'])
00320             self.assertEquals("reply", self.listener.messages[1]['message'])
00321         finally:
00322             conn2.stop()
00323             conn3.stop()
00324 
00325 class TestDurableSubscription(base.BaseTest):
00326 
00327     ID = 'test.subscription'
00328 
00329     def __subscribe(self, dest, conn=None, id=None):
00330         if not conn:
00331             conn = self.conn
00332         if not id:
00333             id = TestDurableSubscription.ID
00334 
00335         conn.subscribe(destination=dest,
00336                        headers    ={'persistent': 'true',
00337                                     'receipt': 1,
00338                                     'id': id})
00339 
00340     def __assert_receipt(self, listener=None):
00341         if not listener:
00342             listener = self.listener
00343 
00344         self.assertTrue(listener.await(5))
00345         self.assertEquals(1, len(self.listener.receipts))
00346 
00347     def __assert_message(self, msg, listener=None):
00348         if not listener:
00349             listener = self.listener
00350 
00351         self.assertTrue(listener.await(5))
00352         self.assertEquals(1, len(listener.messages))
00353         self.assertEquals(msg, listener.messages[0]['message'])
00354 
00355     def test_durability(self):
00356         d = '/topic/durable'
00357 
00358         self.__subscribe(d)
00359         self.__assert_receipt()
00360 
00361         # send first message without unsubscribing
00362         self.listener.reset(1)
00363         self.conn.send("first", destination=d)
00364         self.__assert_message("first")
00365 
00366         # now unsubscribe (disconnect only)
00367         self.conn.unsubscribe(id=TestDurableSubscription.ID)
00368 
00369         # send again
00370         self.listener.reset(1)
00371         self.conn.send("second", destination=d)
00372 
00373         # resubscribe and expect message
00374         self.__subscribe(d)
00375         self.__assert_message("second")
00376 
00377         # now unsubscribe (cancel)
00378         self.conn.unsubscribe(id=TestDurableSubscription.ID,
00379                               headers={'persistent': 'true'})
00380 
00381         # send again
00382         self.listener.reset(1)
00383         self.conn.send("third", destination=d)
00384 
00385         # resubscribe and expect no message
00386         self.__subscribe(d)
00387         self.assertTrue(self.listener.await(3))
00388         self.assertEquals(0, len(self.listener.messages))
00389         self.assertEquals(1, len(self.listener.receipts))
00390 
00391     def test_share_subscription(self):
00392         d = '/topic/durable-shared'
00393 
00394         conn2 = self.create_connection()
00395         conn2.set_listener('', self.listener)
00396 
00397         try:
00398             self.__subscribe(d)
00399             self.__assert_receipt()
00400             self.listener.reset(1)
00401             self.__subscribe(d, conn2)
00402             self.__assert_receipt()
00403 
00404             self.listener.reset(100)
00405 
00406             # send 100 messages
00407             for x in xrange(0, 100):
00408                 self.conn.send("msg" + str(x), destination=d)
00409 
00410             self.assertTrue(self.listener.await(5))
00411             self.assertEquals(100, len(self.listener.messages))
00412         finally:
00413             conn2.stop()
00414 
00415     def test_separate_ids(self):
00416         d = '/topic/durable-separate'
00417 
00418         conn2 = self.create_connection()
00419         listener2 = base.WaitableListener()
00420         conn2.set_listener('', listener2)
00421 
00422         try:
00423             # ensure durable subscription exists for each ID
00424             self.__subscribe(d)
00425             self.__assert_receipt()
00426             self.__subscribe(d, conn2, "other.id")
00427             self.__assert_receipt(listener2)
00428             self.conn.unsubscribe(id=TestDurableSubscription.ID)
00429             conn2.unsubscribe(id="other.id")
00430 
00431             self.listener.reset(101)
00432             listener2.reset(101) ## 100 messages and 1 receipt
00433 
00434             # send 100 messages
00435             for x in xrange(0, 100):
00436                 self.conn.send("msg" + str(x), destination=d)
00437 
00438             self.__subscribe(d)
00439             self.__subscribe(d, conn2, "other.id")
00440 
00441             for l in [self.listener, listener2]:
00442                 self.assertTrue(l.await(10))
00443                 self.assertEquals(100, len(l.messages))
00444 
00445         finally:
00446             conn2.stop()
00447 
00448     def test_durable_subscribe_no_id(self):
00449         d = '/topic/durable-invalid'
00450 
00451         self.conn.subscribe(destination=d, headers={'persistent':'true'}),
00452         self.listener.await(3)
00453         self.assertEquals(1, len(self.listener.errors))
00454         self.assertEquals("Missing Header", self.listener.errors[0]['headers']['message'])
00455 
00456