Back to index

rabbitmq-server  2.8.4
parsing.py
Go to the documentation of this file.
'''
A few tests for a rabbitmq-stomp adaptor. They are intended to increase code
coverage of the Erlang STOMP code.
'''
00005 import unittest
00006 import re
00007 import socket
00008 import functools
00009 import time
00010 import sys
00011 
def connect(cnames):
    ''' Decorator that creates stomp connections and issues CONNECT.

    For every name in 'cnames' a TCP connection to (self.host, self.port)
    is opened, a CONNECT frame is sent, the reply is checked with
    self.match() and the socket is stored on the test instance under that
    name. The wrapped method is then called and its result returned.

    Every socket opened here is closed when the wrapped method returns or
    raises, and also when setting up one of the connections fails.
    '''
    cmd=('CONNECT\n'
        'login:guest\n'
        'passcode:guest\n'
        '\n'
        '\n\0')
    resp = ('CONNECTED\n'
            'session:(.*)\n'
            'heart-beat:0,0\n'
            'server:RabbitMQ/(.*)\n'
            'version:1.0\n'
            '\n\x00')
    def w(m):
        @functools.wraps(m)
        def wrapper(self, *args, **kwargs):
            # Sockets are tracked from the moment they exist, so a failure
            # half-way through the handshake loop cannot leak the ones that
            # were already opened.
            opened = []
            try:
                for cname in cnames:
                    sd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    opened.append(sd)
                    # NOTE(review): settimeout() takes seconds, so this is
                    # more than eight hours - confirm 30 was not intended.
                    sd.settimeout(30000)
                    sd.connect((self.host, self.port))
                    sd.sendall(cmd)
                    self.match(resp, sd.recv(4096))
                    setattr(self, cname, sd)
                return m(self, *args, **kwargs)
            finally:
                for sd in opened:
                    try:
                        sd.close()
                    except IOError:
                        # best effort: a failed close must not hide the
                        # outcome of the test itself
                        pass
        return wrapper
    return w
00046 
00047 
00048 class TestParsing(unittest.TestCase):
00049     host='127.0.0.1'
00050     port=61613
00051 
00052 
00053     def match(self, pattern, data):
00054         ''' helper: try to match 'pattern' regexp with 'data' string.
00055             Fail test if they don't match.
00056         '''
00057         matched = re.match(pattern, data)
00058         if matched:
00059             return matched.groups()
00060         self.assertTrue(False, 'No match:\n%r\n%r' % (pattern, data) )
00061 
00062     def recv_atleast(self, bufsize):
00063         recvhead = []
00064         rl = bufsize
00065         while rl > 0:
00066             buf = self.cd.recv(rl)
00067             bl = len(buf)
00068             if bl==0: break
00069             recvhead.append( buf )
00070             rl -= bl
00071         return ''.join(recvhead)
00072 
00073 
00074     @connect(['cd'])
00075     def test_newline_after_nul(self):
00076         self.cd.sendall('\n'
00077                         'SUBSCRIBE\n'
00078                         'destination:/exchange/amq.fanout\n'
00079                         '\n\x00\n'
00080                         'SEND\n'
00081                         'content-type:text/plain\n'
00082                         'destination:/exchange/amq.fanout\n\n'
00083                         'hello\n\x00\n')
00084         resp = ('MESSAGE\n'
00085                 'content-type:text/plain\n'
00086                 'destination:/exchange/amq.fanout\n'
00087                 'message-id:Q_/exchange/amq.fanout@@session-(.*)\n'
00088                 'content-length:6\n'
00089                 '\n'
00090                 'hello\n\0')
00091         self.match(resp, self.cd.recv(4096))
00092 
00093     @connect(['cd'])
00094     def test_send_without_content_type(self):
00095         self.cd.sendall('\n'
00096                         'SUBSCRIBE\n'
00097                         'destination:/exchange/amq.fanout\n'
00098                         '\n\x00\n'
00099                         'SEND\n'
00100                         'destination:/exchange/amq.fanout\n\n'
00101                         'hello\n\x00')
00102         resp = ('MESSAGE\n'
00103                 'destination:/exchange/amq.fanout\n'
00104                 'message-id:Q_/exchange/amq.fanout@@session-(.*)\n'
00105                 'content-length:6\n'
00106                 '\n'
00107                 'hello\n\0')
00108         self.match(resp, self.cd.recv(4096))
00109 
00110     @connect(['cd'])
00111     def test_send_without_content_type_binary(self):
00112         msg = u'\u0ca0\ufffd\x00\n\x01hello\x00'.encode('utf-8')
00113         self.cd.sendall('\n'
00114                         'SUBSCRIBE\n'
00115                         'destination:/exchange/amq.fanout\n'
00116                         '\n\x00\n'
00117                         'SEND\n'
00118                         'destination:/exchange/amq.fanout\n'
00119                         'content-length:'+str(len(msg))+'\n\n'
00120                         + msg + '\x00')
00121         resp = ('MESSAGE\n'
00122                 'destination:/exchange/amq.fanout\n'
00123                 'message-id:Q_/exchange/amq.fanout@@session-(.*)\n'
00124                 'content-length:'+str(len(msg))+'\n'
00125                 '\n'
00126                 + msg + '\0')
00127         self.match(resp, self.cd.recv(4096))
00128 
00129     @connect(['cd'])
00130     def test_newline_after_nul_and_leading_nul(self):
00131         self.cd.sendall('\n'
00132                         '\x00SUBSCRIBE\n'
00133                         'destination:/exchange/amq.fanout\n'
00134                         '\n\x00\n'
00135                         '\x00SEND\n'
00136                         'destination:/exchange/amq.fanout\n'
00137                         'content-type:text/plain\n'
00138                         '\nhello\n\x00\n')
00139         resp = ('MESSAGE\n'
00140                 'content-type:text/plain\n'
00141                 'destination:/exchange/amq.fanout\n'
00142                 'message-id:Q_/exchange/amq.fanout@@session-(.*)\n'
00143                 'content-length:6\n'
00144                 '\n'
00145                 'hello\n\0')
00146         self.match(resp, self.cd.recv(4096))
00147 
00148     @connect(['cd'])
00149     def test_bad_command(self):
00150         ''' Trigger an error message. '''
00151         self.cd.sendall('WRONGCOMMAND\n'
00152                         'destination:a\n'
00153                         'exchange:amq.fanout\n'
00154                         '\n\0')
00155         resp = ('ERROR\n'
00156                 'message:Bad command\n'
00157                 'content-type:text/plain\n'
00158                 'version:1.0,1.1\n'
00159                 'content-length:43\n'
00160                 '\n'
00161                 'Could not interpret command "WRONGCOMMAND"\n'
00162                 '\0')
00163         self.match(resp, self.cd.recv(4096))
00164 
00165     @connect(['sd', 'cd1', 'cd2'])
00166     def test_broadcast(self):
00167         ''' Single message should be delivered to two consumers:
00168             amq.topic --routing_key--> first_queue --> first_connection
00169                      \--routing_key--> second_queue--> second_connection
00170         '''
00171         subscribe=( 'SUBSCRIBE\n'
00172                     'id: XsKNhAf\n'
00173                     'destination:/exchange/amq.topic/da9d4779\n'
00174                     '\n\0')
00175         for cd in [self.cd1, self.cd2]:
00176             cd.sendall(subscribe)
00177 
00178         time.sleep(0.1)
00179 
00180         self.sd.sendall('SEND\n'
00181                         'content-type:text/plain\n'
00182                         'destination:/exchange/amq.topic/da9d4779\n'
00183                         '\n'
00184                         'message'
00185                         '\n\0')
00186 
00187         resp=('MESSAGE\n'
00188             'content-type:text/plain\n'
00189             'subscription:(.*)\n'
00190             'destination:/topic/da9d4779\n'
00191             'message-id:(.*)\n'
00192             'content-length:8\n'
00193             '\n'
00194             'message'
00195             '\n\x00')
00196         for cd in [self.cd1, self.cd2]:
00197             self.match(resp, cd.recv(4096))
00198 
00199 
00200     @connect(['cd'])
00201     def test_huge_message(self):
00202         ''' Test sending/receiving huge (16MB) message. '''
00203         subscribe=( 'SUBSCRIBE\n'
00204                     'id: xxx\n'
00205                     'destination:/exchange/amq.topic/test_huge_message\n'
00206                     '\n\0')
00207         self.cd.sendall(subscribe)
00208 
00209         message = 'x' * 1024*1024*16
00210 
00211         self.cd.sendall('SEND\n'
00212                         'destination:/exchange/amq.topic/test_huge_message\n'
00213                         'content-type:text/plain\n'
00214                         '\n'
00215                         '%s'
00216                         '\0' % message)
00217 
00218         resp=('MESSAGE\n'
00219             'content-type:text/plain\n'
00220             'subscription:(.*)\n'
00221             'destination:/topic/test_huge_message\n'
00222             'message-id:(.*)\n'
00223             'content-length:%i\n'
00224             '\n'
00225             '%s(.*)'
00226              % (len(message), message[:8000]) )
00227 
00228         recv = []
00229         s = 0
00230         while len(recv) < 1 or recv[-1][-1] != '\0':
00231             buf =  self.cd.recv(4096*16)
00232             s += len(buf)
00233             recv.append( buf )
00234         buf = ''.join(recv)
00235 
00236         # matching 100MB regexp is way too expensive.
00237         self.match(resp, buf[:8192])
00238         self.assertEqual(len(buf) > len(message), True)
00239 
00240     @connect(['cd'])
00241     def test_message_with_embedded_nulls(self):
00242         ''' Test sending/receiving message with embedded nulls. '''
00243         dest='destination:/exchange/amq.topic/test_embed_nulls_message\n'
00244         resp_dest='destination:/topic/test_embed_nulls_message\n'
00245         subscribe=( 'SUBSCRIBE\n'
00246                     'id:xxx\n'
00247                     +dest+
00248                     '\n\0')
00249         self.cd.sendall(subscribe)
00250 
00251         boilerplate = '0123456789'*1024 # large enough boilerplate
00252         message = '01'
00253         oldi = 2
00254         for i in [5, 90, 256-1, 384-1, 512, 1024, 1024+256+64+32]:
00255             message = message + '\0' + boilerplate[oldi+1:i]
00256             oldi = i
00257         msg_len = len(message)
00258 
00259         self.cd.sendall('SEND\n'
00260                         +dest+
00261                         'content-type:text/plain\n'
00262                         'content-length:%i\n'
00263                         '\n'
00264                         '%s'
00265                         '\0' % (len(message), message) )
00266 
00267         headresp=('MESSAGE\n'            # 8
00268             'content-type:text/plain\n'  # 24
00269             'subscription:(.*)\n'        # 14 + subscription
00270             +resp_dest+                  # 44
00271             'message-id:(.*)\n'          # 12 + message-id
00272             'content-length:%i\n'        # 16 + 4==len('1024')
00273             '\n'                         # 1
00274             '(.*)$'                      # prefix of body+null (potentially)
00275              % len(message) )
00276         headlen = 8 + 24 + 14 + (3) + 44 + 12 + (48) + 16 + (4) + 1 + (1)
00277 
00278         headbuf = self.recv_atleast(headlen)
00279         self.assertFalse(len(headbuf) == 0)
00280 
00281         (sub, msg_id, bodyprefix) = self.match(headresp, headbuf)
00282         bodyresp=( '%s\0' % message )
00283         bodylen = len(bodyresp);
00284 
00285         bodybuf = ''.join([bodyprefix,
00286                            self.recv_atleast(bodylen - len(bodyprefix))])
00287 
00288         self.assertEqual(len(bodybuf), msg_len+1,
00289             "body received not the same length as message sent")
00290         self.assertEqual(bodybuf, bodyresp,
00291             "   body (...'%s')\nincorrectly returned as (...'%s')"
00292             % (bodyresp[-10:], bodybuf[-10:]))
00293 
00294     @connect(['cd'])
00295     def test_message_in_packets(self):
00296         ''' Test sending/receiving message in packets. '''
00297         base_dest='topic/test_embed_nulls_message\n'
00298         dest='destination:/exchange/amq.' + base_dest
00299         resp_dest='destination:/'+ base_dest
00300         subscribe=( 'SUBSCRIBE\n'
00301                     'id:xxx\n'
00302                     +dest+
00303                     '\n\0')
00304         self.cd.sendall(subscribe)
00305 
00306         boilerplate = '0123456789'*1024 # large enough boilerplate
00307 
00308         message = boilerplate[:1024 + 512 + 256 + 32]
00309         msg_len = len(message)
00310 
00311         msg_to_send = ('SEND\n'
00312                        +dest+
00313                        'content-type:text/plain\n'
00314                        '\n'
00315                        '%s'
00316                        '\0' % (message) )
00317         packet_size = 191
00318         part_index = 0
00319         msg_to_send_len = len(msg_to_send)
00320         while part_index < msg_to_send_len:
00321             part = msg_to_send[part_index:part_index+packet_size]
00322             time.sleep(0.1)
00323             self.cd.sendall(part)
00324             part_index += packet_size
00325 
00326         headresp=('MESSAGE\n'           # 8
00327             'content-type:text/plain\n' # 24
00328             'subscription:(.*)\n'       # 14 + subscription
00329             +resp_dest+                 # 44
00330             'message-id:(.*)\n'         # 12 + message-id
00331             'content-length:%i\n'       # 16 + 4==len('1024')
00332             '\n'                        # 1
00333             '(.*)$'                     # prefix of body+null (potentially)
00334              % len(message) )
00335         headlen = 8 + 24 + 14 + (3) + 44 + 12 + (48) + 16 + (4) + 1 + (1)
00336 
00337         headbuf = self.recv_atleast(headlen)
00338         self.assertFalse(len(headbuf) == 0)
00339 
00340         (sub, msg_id, bodyprefix) = self.match(headresp, headbuf)
00341         bodyresp=( '%s\0' % message )
00342         bodylen = len(bodyresp);
00343 
00344         bodybuf = ''.join([bodyprefix,
00345                            self.recv_atleast(bodylen - len(bodyprefix))])
00346 
00347         self.assertEqual(len(bodybuf), msg_len+1,
00348             "body received not the same length as message sent")
00349         self.assertEqual(bodybuf, bodyresp,
00350             "   body ('%s')\nincorrectly returned as ('%s')"
00351             % (bodyresp, bodybuf))