Merge "tests: rabbitmq failover tests"
This commit is contained in:
commit
eef2bd058a
@ -49,11 +49,11 @@ class ConfFixture(fixtures.Fixture):
|
||||
'oslo_messaging._drivers.impl_rabbit', 'rabbit_opts',
|
||||
'oslo_messaging_rabbit')
|
||||
_import_opts(self.conf,
|
||||
'oslo_messaging._drivers.amqp', 'amqp_opts',
|
||||
'oslo_messaging._drivers.base', 'base_opts',
|
||||
'oslo_messaging_rabbit')
|
||||
_import_opts(self.conf,
|
||||
'oslo_messaging._drivers.amqp', 'amqp_opts',
|
||||
'oslo_messaging_qpid')
|
||||
'oslo_messaging_rabbit')
|
||||
_import_opts(self.conf,
|
||||
'oslo_messaging._drivers.amqp1_driver.opts',
|
||||
'amqp1_opts', 'oslo_messaging_amqp')
|
||||
|
140
oslo_messaging/tests/functional/test_rabbitmq.py
Normal file
140
oslo_messaging/tests/functional/test_rabbitmq.py
Normal file
@ -0,0 +1,140 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
import os
|
||||
import signal
|
||||
import time
|
||||
|
||||
import fixtures
|
||||
from pifpaf.drivers import rabbitmq
|
||||
|
||||
from oslo_messaging.tests.functional import utils
|
||||
from oslo_messaging.tests import utils as test_utils
|
||||
|
||||
|
||||
class ConnectedPortMatcher(object):
|
||||
def __init__(self, port):
|
||||
self.port = port
|
||||
|
||||
def __eq__(self, data):
|
||||
return data.get("port") == self.port
|
||||
|
||||
def __repr__(self):
|
||||
return "<ConnectedPortMatcher port=%d>" % self.port
|
||||
|
||||
|
||||
class RabbitMQFailoverTests(test_utils.BaseTestCase):
|
||||
def test_failover_scenario(self):
|
||||
# NOTE(sileht): run this test only if functionnal suite run of a driver
|
||||
# that use rabbitmq as backend
|
||||
self.driver = os.environ.get('TRANSPORT_DRIVER')
|
||||
if self.driver not in ["pika", "rabbit"]:
|
||||
self.skipTest("TRANSPORT_DRIVER is not set to a rabbit driver")
|
||||
|
||||
# NOTE(sileht): Allow only one response at a time, to
|
||||
# have only one tcp connection for reply and ensure it will failover
|
||||
# correctly
|
||||
self.config(heartbeat_timeout_threshold=1,
|
||||
rpc_conn_pool_size=1,
|
||||
kombu_reconnect_delay=0,
|
||||
rabbit_retry_interval=0,
|
||||
rabbit_retry_backoff=0,
|
||||
group='oslo_messaging_rabbit')
|
||||
|
||||
#
|
||||
self.pifpaf = self.useFixture(rabbitmq.RabbitMQDriver(cluster=True,
|
||||
port=5692))
|
||||
|
||||
self.url = self.pifpaf.env["PIFPAF_URL"]
|
||||
self.n1 = self.pifpaf.env["PIFPAF_RABBITMQ_NODENAME1"]
|
||||
self.n2 = self.pifpaf.env["PIFPAF_RABBITMQ_NODENAME2"]
|
||||
self.n3 = self.pifpaf.env["PIFPAF_RABBITMQ_NODENAME3"]
|
||||
|
||||
# ensure connections will be establish to the first node
|
||||
self.pifpaf.stop_node(self.n2)
|
||||
self.pifpaf.stop_node(self.n3)
|
||||
|
||||
self.servers = self.useFixture(utils.RpcServerGroupFixture(
|
||||
self.conf, self.url, endpoint=self, names=["server"]))
|
||||
|
||||
# Don't randomize rabbit hosts
|
||||
self.useFixture(fixtures.MockPatch(
|
||||
'oslo_messaging._drivers.impl_rabbit.random',
|
||||
side_effect=lambda x: x))
|
||||
|
||||
# NOTE(sileht): this connects server connections and reply
|
||||
# connection to nodename n1
|
||||
self.client = self.servers.client(0)
|
||||
self.client.ping()
|
||||
self._check_ports(self.pifpaf.port)
|
||||
|
||||
# Switch to node n2
|
||||
self.pifpaf.start_node(self.n2)
|
||||
self.assertEqual("callback done", self.client.kill_and_process())
|
||||
self.assertEqual("callback done", self.client.just_process())
|
||||
self._check_ports(self.pifpaf.get_port(self.n2))
|
||||
|
||||
# Switch to node n3
|
||||
self.pifpaf.start_node(self.n3)
|
||||
time.sleep(0.1)
|
||||
self.pifpaf.kill_node(self.n2, signal=signal.SIGKILL)
|
||||
time.sleep(0.1)
|
||||
self.assertEqual("callback done", self.client.just_process())
|
||||
self._check_ports(self.pifpaf.get_port(self.n3))
|
||||
|
||||
self.pifpaf.start_node(self.n1)
|
||||
time.sleep(0.1)
|
||||
self.pifpaf.kill_node(self.n3, signal=signal.SIGKILL)
|
||||
time.sleep(0.1)
|
||||
self.assertEqual("callback done", self.client.just_process())
|
||||
self._check_ports(self.pifpaf.get_port(self.n1))
|
||||
|
||||
def kill_and_process(self, *args, **kargs):
|
||||
self.pifpaf.kill_node(self.n1, signal=signal.SIGKILL)
|
||||
time.sleep(0.1)
|
||||
return "callback done"
|
||||
|
||||
def just_process(self, *args, **kargs):
|
||||
return "callback done"
|
||||
|
||||
def _get_log_call_startswith(self, filter):
|
||||
return [call for call in self.logger.debug.mock_calls
|
||||
if call[1][0].startswith(filter)]
|
||||
|
||||
def _check_ports(self, port):
|
||||
getattr(self, '_check_ports_%s_driver' % self.driver)(port)
|
||||
|
||||
def _check_ports_pika_driver(self, port):
|
||||
rpc_server = self.servers.servers[0].server
|
||||
# FIXME(sileht): Check other connections
|
||||
connections = [
|
||||
rpc_server.listener._poll_style_listener._connection
|
||||
]
|
||||
for conn in connections:
|
||||
self.assertEqual(
|
||||
port, conn._impl.socket.getpeername()[1])
|
||||
|
||||
def _check_ports_rabbit_driver(self, port):
|
||||
rpc_server = self.servers.servers[0].server
|
||||
connection_contexts = [
|
||||
# rpc server
|
||||
rpc_server.listener._poll_style_listener.conn,
|
||||
# rpc client
|
||||
self.client.client.transport._driver._get_connection(),
|
||||
# rpc client replies waiter
|
||||
self.client.client.transport._driver._reply_q_conn,
|
||||
]
|
||||
|
||||
for cctxt in connection_contexts:
|
||||
socket = cctxt.connection.channel.connection.sock
|
||||
self.assertEqual(port, socket.getpeername()[1])
|
@ -122,7 +122,7 @@ class RpcServerFixture(fixtures.Fixture):
|
||||
|
||||
class RpcServerGroupFixture(fixtures.Fixture):
|
||||
def __init__(self, conf, url, topic=None, names=None, exchange=None,
|
||||
use_fanout_ctrl=False):
|
||||
use_fanout_ctrl=False, endpoint=None):
|
||||
self.conf = conf
|
||||
self.url = url
|
||||
# NOTE(sileht): topic and servier_name must be uniq
|
||||
@ -133,6 +133,7 @@ class RpcServerGroupFixture(fixtures.Fixture):
|
||||
self.exchange = exchange
|
||||
self.targets = [self._target(server=n) for n in self.names]
|
||||
self.use_fanout_ctrl = use_fanout_ctrl
|
||||
self.endpoint = endpoint
|
||||
|
||||
def setUp(self):
|
||||
super(RpcServerGroupFixture, self).setUp()
|
||||
@ -149,6 +150,7 @@ class RpcServerGroupFixture(fixtures.Fixture):
|
||||
if self.use_fanout_ctrl:
|
||||
ctrl = self._target(fanout=True)
|
||||
server = RpcServerFixture(self.conf, self.url, target,
|
||||
endpoint=self.endpoint,
|
||||
ctrl_target=ctrl)
|
||||
return server
|
||||
|
||||
@ -277,7 +279,15 @@ class IsValidDistributionOf(object):
|
||||
class SkipIfNoTransportURL(test_utils.BaseTestCase):
|
||||
def setUp(self, conf=cfg.CONF):
|
||||
super(SkipIfNoTransportURL, self).setUp(conf=conf)
|
||||
self.url = os.environ.get('TRANSPORT_URL')
|
||||
|
||||
driver = os.environ.get("TRANSPORT_DRIVER")
|
||||
if driver:
|
||||
self.url = os.environ.get('PIFPAF_URL')
|
||||
if driver == "pika" and self.url:
|
||||
self.url = self.url.replace("rabbit://", "pika://")
|
||||
else:
|
||||
self.url = os.environ.get('TRANSPORT_URL')
|
||||
|
||||
if not self.url:
|
||||
self.skipTest("No transport url configured")
|
||||
|
||||
|
@ -1,32 +0,0 @@
|
||||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
. tools/functions.sh
|
||||
|
||||
DATADIR=$(mktemp -d /tmp/OSLOMSG-RABBIT.XXXXX)
|
||||
trap "clean_exit $DATADIR" EXIT
|
||||
|
||||
export RABBITMQ_NODE_IP_ADDRESS=127.0.0.1
|
||||
export RABBITMQ_NODE_PORT=65123
|
||||
export RABBITMQ_NODENAME=oslomsg-test@localhost
|
||||
export RABBITMQ_LOG_BASE=$DATADIR
|
||||
export RABBITMQ_MNESIA_BASE=$DATADIR
|
||||
export RABBITMQ_PID_FILE=$DATADIR/pid
|
||||
export HOME=$DATADIR
|
||||
|
||||
# NOTE(sileht): We directly use the rabbitmq scripts
|
||||
# to avoid distribution check, like running as root/rabbitmq
|
||||
# enforcing.
|
||||
export PATH=/usr/lib/rabbitmq/bin/:$PATH
|
||||
|
||||
|
||||
mkfifo ${DATADIR}/out
|
||||
rabbitmq-server &> ${DATADIR}/out &
|
||||
wait_for_line "Starting broker... completed" "ERROR:" ${DATADIR}/out
|
||||
|
||||
rabbitmqctl add_user oslomsg oslosecret
|
||||
rabbitmqctl set_permissions "oslomsg" ".*" ".*" ".*"
|
||||
|
||||
|
||||
export TRANSPORT_URL=pika://oslomsg:oslosecret@127.0.0.1:65123//
|
||||
$*
|
@ -1,32 +0,0 @@
|
||||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
. tools/functions.sh
|
||||
|
||||
DATADIR=$(mktemp -d /tmp/OSLOMSG-RABBIT.XXXXX)
|
||||
trap "clean_exit $DATADIR" EXIT
|
||||
|
||||
export RABBITMQ_NODE_IP_ADDRESS=127.0.0.1
|
||||
export RABBITMQ_NODE_PORT=65123
|
||||
export RABBITMQ_NODENAME=oslomsg-test@localhost
|
||||
export RABBITMQ_LOG_BASE=$DATADIR
|
||||
export RABBITMQ_MNESIA_BASE=$DATADIR
|
||||
export RABBITMQ_PID_FILE=$DATADIR/pid
|
||||
export HOME=$DATADIR
|
||||
|
||||
# NOTE(sileht): We directly use the rabbitmq scripts
|
||||
# to avoid distribution check, like running as root/rabbitmq
|
||||
# enforcing.
|
||||
export PATH=/usr/lib/rabbitmq/bin/:$PATH
|
||||
|
||||
|
||||
mkfifo ${DATADIR}/out
|
||||
rabbitmq-server &> ${DATADIR}/out &
|
||||
wait_for_line "Starting broker... completed" "ERROR:" ${DATADIR}/out
|
||||
|
||||
rabbitmqctl add_user oslomsg oslosecret
|
||||
rabbitmqctl set_permissions "oslomsg" ".*" ".*" ".*"
|
||||
|
||||
|
||||
export TRANSPORT_URL=rabbit://oslomsg:oslosecret@127.0.0.1:65123//
|
||||
$*
|
@ -14,7 +14,7 @@ testrepository>=0.0.18 # Apache-2.0/BSD
|
||||
testscenarios>=0.4 # Apache-2.0/BSD
|
||||
testtools>=1.4.0 # MIT
|
||||
oslotest>=1.10.0 # Apache-2.0
|
||||
|
||||
pifpaf>=0.4.0 # Apache-2.0
|
||||
# for test_matchmaker_redis
|
||||
redis>=2.10.0 # MIT
|
||||
|
||||
|
9
tox.ini
9
tox.ini
@ -25,14 +25,17 @@ commands = {posargs}
|
||||
commands = python setup.py build_sphinx
|
||||
|
||||
[testenv:py27-func-rabbit]
|
||||
commands = {toxinidir}/setup-test-env-rabbit.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
|
||||
setenv = TRANSPORT_DRIVER=rabbit
|
||||
commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
|
||||
|
||||
[testenv:py34-func-rabbit]
|
||||
setenv = TRANSPORT_DRIVER=rabbit
|
||||
basepython = python3.4
|
||||
commands = {toxinidir}/setup-test-env-rabbit.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
|
||||
commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
|
||||
|
||||
[testenv:py27-func-pika]
|
||||
commands = {toxinidir}/setup-test-env-pika.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
|
||||
setenv = TRANSPORT_DRIVER=pika
|
||||
commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
|
||||
|
||||
[testenv:py27-func-amqp1]
|
||||
setenv = TRANSPORT_URL=amqp://stackqpid:secretqpid@127.0.0.1:65123//
|
||||
|
Loading…
x
Reference in New Issue
Block a user