Back to index

rabbitmq-server  2.8.4
Public Member Functions
rabbitmqadmin-test.TestRabbitMQAdmin Class Reference

List of all members.

Public Member Functions

def test_no_args
def test_help
def test_host
def test_port
def test_user
def test_fmt_long
def test_fmt_kvp
def test_fmt_tsv
def test_fmt_table
def test_fmt_bash
def test_vhosts
def test_users
def test_permissions
def test_alt_vhost
def test_exchanges
def test_queues
def test_bindings
def test_publish
def test_ignore_vhost
def run_success
def run_fail
def assert_output
def assert_list
def assert_table

Detailed Description

Definition at line 12 of file rabbitmqadmin-test.py.


Member Function Documentation

def rabbitmqadmin-test.TestRabbitMQAdmin.assert_list (   self,
  expected,
  args0 
)

Definition at line 159 of file rabbitmqadmin-test.py.

00159 
00160     def assert_list(self, expected, args0):
00161         args = ['-f', 'tsv', '-q']
00162         args.extend(args0)
00163         self.assertEqual(expected, run(args)[0].splitlines())

Here is the call graph for this function:

Here is the caller graph for this function:

def rabbitmqadmin-test.TestRabbitMQAdmin.assert_output (   self,
  expected,
  args 
)

Definition at line 156 of file rabbitmqadmin-test.py.

00156 
00157     def assert_output(self, expected, args):
00158         self.assertEqual(expected, run(args)[0])

Here is the call graph for this function:

Here is the caller graph for this function:

def rabbitmqadmin-test.TestRabbitMQAdmin.assert_table (   self,
  expected,
  args0 
)

Definition at line 164 of file rabbitmqadmin-test.py.

00164 
00165     def assert_table(self, expected, args0):
00166         args = ['-f', 'tsv', '-q']
00167         args.extend(args0)
00168         self.assertEqual(expected, [l.split('\t') for l in run(args)[0].splitlines()])

Here is the call graph for this function:

Here is the caller graph for this function:

def rabbitmqadmin-test.TestRabbitMQAdmin.run_fail (   self,
  args 
)

Definition at line 153 of file rabbitmqadmin-test.py.

00153 
00154     def run_fail(self, args):
00155         self.assertNotEqual(0, run(args)[1])

Here is the call graph for this function:

Here is the caller graph for this function:

def rabbitmqadmin-test.TestRabbitMQAdmin.run_success (   self,
  args,
  kwargs 
)

Definition at line 150 of file rabbitmqadmin-test.py.

00150 
00151     def run_success(self, args, **kwargs):
00152         self.assertEqual(0, run(args, **kwargs)[1])

Here is the call graph for this function:

Here is the caller graph for this function:

def rabbitmqadmin-test.TestRabbitMQAdmin.test_alt_vhost (   self)

Definition at line 96 of file rabbitmqadmin-test.py.

00096 
00097     def test_alt_vhost(self):
00098         self.run_success(['declare', 'vhost', 'name=foo'])
00099         self.run_success(['declare', 'permission', 'user=guest', 'vhost=foo', 'configure=.*', 'write=.*', 'read=.*'])
00100         self.run_success(['declare', 'queue', 'name=in_/'])
00101         self.run_success(['--vhost', 'foo', 'declare', 'queue', 'name=in_foo'])
00102         self.assert_table([['/', 'in_/'], ['foo', 'in_foo']], ['list', 'queues', 'vhost', 'name'])
00103         self.run_success(['--vhost', 'foo', 'delete', 'queue', 'name=in_foo'])
00104         self.run_success(['delete', 'queue', 'name=in_/'])
00105         self.run_success(['delete', 'vhost', 'name=foo'])

Here is the call graph for this function:

def rabbitmqadmin-test.TestRabbitMQAdmin.test_bindings (   self)

Definition at line 116 of file rabbitmqadmin-test.py.

00116 
00117     def test_bindings(self):
00118         self.run_success(['declare', 'queue', 'name=foo'])
00119         self.run_success(['declare', 'binding', 'source=amq.direct', 'destination=foo', 'destination_type=queue', 'routing_key=test'])
00120         self.assert_table([['', 'foo', 'queue', 'foo'], ['amq.direct', 'foo', 'queue', 'test']], ['list', 'bindings', 'source', 'destination', 'destination_type', 'routing_key'])
00121         self.run_success(['delete', 'queue', 'name=foo'])

Here is the call graph for this function:

def rabbitmqadmin-test.TestRabbitMQAdmin.test_exchanges (   self)

Definition at line 106 of file rabbitmqadmin-test.py.

00106 
00107     def test_exchanges(self):
00108         self.run_success(['declare', 'exchange', 'name=foo', 'type=direct'])
00109         self.assert_list(['', 'amq.direct', 'amq.fanout', 'amq.headers', 'amq.match', 'amq.rabbitmq.log', 'amq.rabbitmq.trace', 'amq.topic', 'foo'], l('exchanges'))
00110         self.run_success(['delete', 'exchange', 'name=foo'])

Here is the call graph for this function:

def rabbitmqadmin-test.TestRabbitMQAdmin.test_fmt_bash (   self)

Definition at line 68 of file rabbitmqadmin-test.py.

00068 
00069     def test_fmt_bash(self):
00070         self.assert_output("""/
00071 """, ['--format', 'bash', 'list', 'vhosts'])

Here is the call graph for this function:

def rabbitmqadmin-test.TestRabbitMQAdmin.test_fmt_kvp (   self)

Definition at line 49 of file rabbitmqadmin-test.py.

00049 
00050     def test_fmt_kvp(self):
00051         self.assert_output("""name="/" tracing="False"
00052 """, ['--format', 'kvp', 'list', 'vhosts'])

Here is the call graph for this function:

def rabbitmqadmin-test.TestRabbitMQAdmin.test_fmt_long (   self)

Definition at line 38 of file rabbitmqadmin-test.py.

00038 
00039     def test_fmt_long(self):
00040         self.assert_output("""
00041 --------------------------------------------------------------------------------
00042 
00043    name: /
00044 tracing: False
00045 
00046 --------------------------------------------------------------------------------
00047 
00048 """, ['--format', 'long', 'list', 'vhosts'])

Here is the call graph for this function:

def rabbitmqadmin-test.TestRabbitMQAdmin.test_fmt_table (   self)

Definition at line 58 of file rabbitmqadmin-test.py.

00058 
00059     def test_fmt_table(self):
00060         out = """+------+---------+
00061 | name | tracing |
00062 +------+---------+
00063 | /    | False   |
00064 +------+---------+
00065 """
00066         self.assert_output(out, ['list', 'vhosts'])
00067         self.assert_output(out, ['--format', 'table', 'list', 'vhosts'])

Here is the call graph for this function:

def rabbitmqadmin-test.TestRabbitMQAdmin.test_fmt_tsv (   self)

Definition at line 53 of file rabbitmqadmin-test.py.

00053 
00054     def test_fmt_tsv(self):
00055         self.assert_output("""name tracing
00056 /      False
00057 """, ['--format', 'tsv', 'list', 'vhosts'])

Here is the call graph for this function:

def rabbitmqadmin-test.TestRabbitMQAdmin.test_help (   self)

Definition at line 16 of file rabbitmqadmin-test.py.

00016 
00017     def test_help(self):
00018         self.run_success(['--help'])
00019         self.run_success(['help', 'subcommands'])
00020         self.run_success(['help', 'config'])
00021         self.run_fail(['help', 'astronomy'])

Here is the call graph for this function:

def rabbitmqadmin-test.TestRabbitMQAdmin.test_host (   self)

Definition at line 22 of file rabbitmqadmin-test.py.

00022 
00023     def test_host(self):
00024         self.run_success(['show', 'overview'])
00025         self.run_success(['--host', 'localhost', 'show', 'overview'])
00026         self.run_success(['--host', socket.gethostname(), 'show', 'overview'])
00027         self.run_fail(['--host', 'some-host-that-does-not-exist', 'show', 'overview'])

Here is the call graph for this function:

def rabbitmqadmin-test.TestRabbitMQAdmin.test_ignore_vhost (   self)

Definition at line 139 of file rabbitmqadmin-test.py.

00139 
00140     def test_ignore_vhost(self):
00141         self.run_success(['--vhost', '/', 'show', 'overview'])
00142         self.run_success(['--vhost', '/', 'list', 'users'])
00143         self.run_success(['--vhost', '/', 'list', 'vhosts'])
00144         self.run_success(['--vhost', '/', 'list', 'nodes'])
00145         self.run_success(['--vhost', '/', 'list', 'permissions'])
00146         self.run_success(['--vhost', '/', 'declare', 'user', 'name=foo', 'password=pass', 'tags='])
00147         self.run_success(['delete', 'user', 'name=foo'])

Here is the call graph for this function:

def rabbitmqadmin-test.TestRabbitMQAdmin.test_no_args (   self)

Definition at line 13 of file rabbitmqadmin-test.py.

00013 
00014     def test_no_args(self):
00015         self.run_fail([])

Here is the call graph for this function:

def rabbitmqadmin-test.TestRabbitMQAdmin.test_permissions (   self)

Definition at line 87 of file rabbitmqadmin-test.py.

00087 
00088     def test_permissions(self):
00089         self.run_success(['declare', 'vhost', 'name=foo'])
00090         self.run_success(['declare', 'user', 'name=bar', 'password=pass', 'tags='])
00091         self.assert_table([['guest', '/']], ['list', 'permissions', 'user', 'vhost'])
00092         self.run_success(['declare', 'permission', 'user=bar', 'vhost=foo', 'configure=.*', 'write=.*', 'read=.*'])
00093         self.assert_table([['guest', '/'], ['bar', 'foo']], ['list', 'permissions', 'user', 'vhost'])
00094         self.run_success(['delete', 'user', 'name=bar'])
00095         self.run_success(['delete', 'vhost', 'name=foo'])

Here is the call graph for this function:

def rabbitmqadmin-test.TestRabbitMQAdmin.test_port (   self)

Definition at line 28 of file rabbitmqadmin-test.py.

00028 
00029     def test_port(self):
00030         self.run_success(['--port', '55672', 'show', 'overview'])
00031         self.run_fail(['--port', '55673', 'show', 'overview'])
00032         self.run_fail(['--port', '5672', 'show', 'overview'])

Here is the call graph for this function:

def rabbitmqadmin-test.TestRabbitMQAdmin.test_publish (   self)

Definition at line 122 of file rabbitmqadmin-test.py.

00122 
00123     def test_publish(self):
00124         self.run_success(['declare', 'queue', 'name=test'])
00125         self.run_success(['publish', 'routing_key=test', 'payload=test_1'])
00126         self.run_success(['publish', 'routing_key=test', 'payload=test_2'])
00127         self.run_success(['publish', 'routing_key=test'], stdin='test_3')
00128         self.assert_table([exp_msg('test', 2, False, 'test_1')], ['get', 'queue=test', 'requeue=false'])
00129         self.assert_table([exp_msg('test', 1, False, 'test_2')], ['get', 'queue=test', 'requeue=true'])
00130         self.assert_table([exp_msg('test', 1, True,  'test_2')], ['get', 'queue=test', 'requeue=false'])
00131         self.assert_table([exp_msg('test', 0, False, 'test_3')], ['get', 'queue=test', 'requeue=false'])
00132         self.run_success(['publish', 'routing_key=test'], stdin='test_4')
00133         filename = '/tmp/rabbitmq-test/get.txt'
00134         self.run_success(['get', 'queue=test', 'requeue=false', 'payload_file=' + filename])
00135         with open(filename) as f:
00136             self.assertEqual('test_4', f.read())
00137         os.remove(filename)
00138         self.run_success(['delete', 'queue', 'name=test'])

Here is the call graph for this function:

def rabbitmqadmin-test.TestRabbitMQAdmin.test_queues (   self)

Definition at line 111 of file rabbitmqadmin-test.py.

00111 
00112     def test_queues(self):
00113         self.run_success(['declare', 'queue', 'name=foo'])
00114         self.assert_list(['foo'], l('queues'))
00115         self.run_success(['delete', 'queue', 'name=foo'])

Here is the call graph for this function:

def rabbitmqadmin-test.TestRabbitMQAdmin.test_user (   self)

Definition at line 33 of file rabbitmqadmin-test.py.

00033 
00034     def test_user(self):
00035         self.run_success(['--user', 'guest', '--password', 'guest', 'show', 'overview'])
00036         self.run_fail(['--user', 'no', '--password', 'guest', 'show', 'overview'])
00037         self.run_fail(['--user', 'guest', '--password', 'no', 'show', 'overview'])

Here is the call graph for this function:

def rabbitmqadmin-test.TestRabbitMQAdmin.test_users (   self)

Definition at line 79 of file rabbitmqadmin-test.py.

00079 
00080     def test_users(self):
00081         self.assert_list(['guest'], l('users'))
00082         self.run_fail(['declare', 'user', 'name=foo'])
00083         self.run_success(['declare', 'user', 'name=foo', 'password=pass', 'tags='])
00084         self.assert_list(['foo', 'guest'], l('users'))
00085         self.run_success(['delete', 'user', 'name=foo'])
00086         self.assert_list(['guest'], l('users'))

Here is the call graph for this function:

def rabbitmqadmin-test.TestRabbitMQAdmin.test_vhosts (   self)

Definition at line 72 of file rabbitmqadmin-test.py.

00072 
00073     def test_vhosts(self):
00074         self.assert_list(['/'], l('vhosts'))
00075         self.run_success(['declare', 'vhost', 'name=foo'])
00076         self.assert_list(['/', 'foo'], l('vhosts'))
00077         self.run_success(['delete', 'vhost', 'name=foo'])
00078         self.assert_list(['/'], l('vhosts'))

Here is the call graph for this function:


The documentation for this class was generated from the following file: