Merge "remove zmq tests"
This commit is contained in:
commit
415e9b991b
.zuul.yaml
oslo_messaging/tests
84
.zuul.yaml
84
.zuul.yaml
@ -18,27 +18,6 @@
|
||||
tox_envlist: py27-func-rabbit
|
||||
bindep_profile: rabbit
|
||||
|
||||
- job:
|
||||
name: oslo.messaging-tox-py27-func-zmq
|
||||
parent: openstack-tox-py27
|
||||
vars:
|
||||
tox_envlist: py27-func-zmq
|
||||
bindep_profile: zmq
|
||||
|
||||
- job:
|
||||
name: oslo.messaging-tox-py27-func-zmq-proxy
|
||||
parent: openstack-tox-py27
|
||||
vars:
|
||||
tox_envlist: py27-func-zmq-proxy
|
||||
bindep_profile: zmq
|
||||
|
||||
- job:
|
||||
name: oslo.messaging-tox-py27-func-zmq-pubsub
|
||||
parent: openstack-tox-py27
|
||||
vars:
|
||||
tox_envlist: py27-func-zmq-pubsub
|
||||
bindep_profile: zmq
|
||||
|
||||
- job:
|
||||
name: oslo.messaging-tox-py35-func-amqp1
|
||||
parent: openstack-tox-py35
|
||||
@ -52,13 +31,6 @@
|
||||
tox_envlist: py35-func-rabbit
|
||||
bindep_profile: rabbit
|
||||
|
||||
- job:
|
||||
name: oslo.messaging-tox-py35-func-zmq
|
||||
parent: openstack-tox-py35
|
||||
vars:
|
||||
tox_envlist: py35-func-zmq
|
||||
bindep_profile: zmq
|
||||
|
||||
- job:
|
||||
name: oslo.messaging-src-dsvm-full-rabbit-default
|
||||
parent: legacy-dsvm-base
|
||||
@ -115,17 +87,6 @@
|
||||
- openstack/devstack-plugin-kafka
|
||||
- openstack/oslo.messaging
|
||||
|
||||
- job:
|
||||
name: oslo.messaging-src-dsvm-full-zmq-default
|
||||
parent: legacy-dsvm-base
|
||||
run: playbooks/oslo.messaging-src-dsvm-full-zmq-default/run.yaml
|
||||
post-run: playbooks/oslo.messaging-src-dsvm-full-zmq-default/post.yaml
|
||||
timeout: 10800
|
||||
required-projects:
|
||||
- openstack-infra/devstack-gate
|
||||
- openstack/devstack-plugin-zmq
|
||||
- openstack/oslo.messaging
|
||||
|
||||
- job:
|
||||
name: oslo.messaging-src-grenade-dsvm
|
||||
parent: legacy-dsvm-base
|
||||
@ -194,24 +155,6 @@
|
||||
- openstack/dib-utils
|
||||
- openstack/diskimage-builder
|
||||
|
||||
- job:
|
||||
name: oslo.messaging-telemetry-dsvm-integration-zmq
|
||||
parent: legacy-dsvm-base
|
||||
run: playbooks/oslo.messaging-telemetry-dsvm-integration-zmq/run.yaml
|
||||
post-run: playbooks/oslo.messaging-telemetry-dsvm-integration-zmq/post.yaml
|
||||
timeout: 4200
|
||||
required-projects:
|
||||
- openstack-infra/devstack-gate
|
||||
- openstack/aodh
|
||||
- openstack/ceilometer
|
||||
- openstack/devstack-plugin-zmq
|
||||
- openstack/oslo.messaging
|
||||
- openstack/panko
|
||||
# following are required when DEVSTACK_GATE_HEAT, which this
|
||||
# job turns on
|
||||
- openstack/dib-utils
|
||||
- openstack/diskimage-builder
|
||||
|
||||
- job:
|
||||
name: oslo.messaging-telemetry-dsvm-integration-rabbit
|
||||
parent: legacy-dsvm-base
|
||||
@ -270,19 +213,6 @@
|
||||
- openstack/oslo.messaging
|
||||
- openstack/tempest
|
||||
|
||||
- job:
|
||||
name: oslo.messaging-tempest-neutron-dsvm-src-zmq-default
|
||||
parent: legacy-dsvm-base
|
||||
run: playbooks/oslo.messaging-tempest-neutron-dsvm-src-zmq-default/run.yaml
|
||||
post-run: playbooks/oslo.messaging-tempest-neutron-dsvm-src-zmq-default/post.yaml
|
||||
timeout: 7800
|
||||
required-projects:
|
||||
- openstack-infra/devstack-gate
|
||||
- openstack/devstack-plugin-zmq
|
||||
- openstack/neutron
|
||||
- openstack/oslo.messaging
|
||||
- openstack/tempest
|
||||
|
||||
|
||||
- project:
|
||||
check:
|
||||
@ -292,18 +222,10 @@
|
||||
- oslo.messaging-tox-py27-func-kafka:
|
||||
voting: false
|
||||
- oslo.messaging-tox-py27-func-rabbit
|
||||
- oslo.messaging-tox-py27-func-zmq-proxy:
|
||||
voting: false
|
||||
- oslo.messaging-tox-py27-func-zmq-pubsub:
|
||||
voting: false
|
||||
- oslo.messaging-tox-py27-func-zmq:
|
||||
voting: false
|
||||
- oslo.messaging-tox-py35-func-amqp1:
|
||||
voting: false
|
||||
- oslo.messaging-tox-py35-func-rabbit:
|
||||
voting: false
|
||||
- oslo.messaging-tox-py35-func-zmq:
|
||||
voting: false
|
||||
|
||||
- oslo.messaging-src-dsvm-full-rabbit-default
|
||||
- oslo.messaging-src-dsvm-full-amqp1-hybrid:
|
||||
@ -316,8 +238,6 @@
|
||||
voting: false
|
||||
- oslo.messaging-src-dsvm-full-kafka-default:
|
||||
voting: false
|
||||
- oslo.messaging-src-dsvm-full-zmq-default:
|
||||
voting: false
|
||||
|
||||
- oslo.messaging-src-grenade-dsvm:
|
||||
voting: false
|
||||
@ -329,8 +249,6 @@
|
||||
voting: false
|
||||
- oslo.messaging-telemetry-dsvm-integration-kafka:
|
||||
voting: false
|
||||
- oslo.messaging-telemetry-dsvm-integration-zmq:
|
||||
voting: false
|
||||
|
||||
- oslo.messaging-tempest-neutron-dsvm-src-rabbit-default
|
||||
- oslo.messaging-tempest-neutron-dsvm-src-amqp1-hybrid:
|
||||
@ -338,8 +256,6 @@
|
||||
branches: ^(?!stable/ocata).*$
|
||||
- oslo.messaging-tempest-neutron-dsvm-src-kafka-default:
|
||||
voting: false
|
||||
- oslo.messaging-tempest-neutron-dsvm-src-zmq-default:
|
||||
voting: false
|
||||
|
||||
gate:
|
||||
jobs:
|
||||
|
@ -1,140 +0,0 @@
|
||||
# Copyright 2014 Canonical, Ltd.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import inspect
|
||||
from stevedore import driver
|
||||
import testscenarios
|
||||
|
||||
import oslo_messaging
|
||||
from oslo_messaging.tests import utils as test_utils
|
||||
from oslo_utils import importutils
|
||||
|
||||
redis = importutils.try_import('redis')
|
||||
|
||||
|
||||
def redis_available():
|
||||
'''Helper to see if local redis server is running'''
|
||||
if not redis:
|
||||
return False
|
||||
try:
|
||||
redis.StrictRedis(socket_timeout=1).ping()
|
||||
return True
|
||||
except redis.exceptions.ConnectionError:
|
||||
return False
|
||||
|
||||
|
||||
load_tests = testscenarios.load_tests_apply_scenarios
|
||||
|
||||
|
||||
class TestImplMatchmaker(test_utils.BaseTestCase):
|
||||
|
||||
scenarios = [
|
||||
("dummy", {"rpc_zmq_matchmaker": "dummy"}),
|
||||
("redis", {"rpc_zmq_matchmaker": "redis"}),
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
super(TestImplMatchmaker, self).setUp()
|
||||
|
||||
if self.rpc_zmq_matchmaker == "redis":
|
||||
if not redis_available():
|
||||
self.skipTest("redis unavailable")
|
||||
|
||||
self.test_matcher = driver.DriverManager(
|
||||
'oslo.messaging.zmq.matchmaker',
|
||||
self.rpc_zmq_matchmaker,
|
||||
).driver(self.conf)
|
||||
|
||||
if self.rpc_zmq_matchmaker == "redis":
|
||||
for redis_instance in self.test_matcher._redis_instances:
|
||||
self.addCleanup(redis_instance.flushdb)
|
||||
|
||||
self.target = oslo_messaging.Target(topic="test_topic")
|
||||
self.host1 = b"test_host1"
|
||||
self.host2 = b"test_host2"
|
||||
|
||||
def test_register(self):
|
||||
self.test_matcher.register(
|
||||
self.target,
|
||||
self.host1,
|
||||
"test",
|
||||
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
|
||||
|
||||
self.assertEqual([self.host1],
|
||||
self.test_matcher.get_hosts(self.target, "test"))
|
||||
|
||||
def test_register_two_hosts(self):
|
||||
self.test_matcher.register(
|
||||
self.target,
|
||||
self.host1,
|
||||
"test",
|
||||
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
|
||||
self.test_matcher.register(
|
||||
self.target,
|
||||
self.host2,
|
||||
"test",
|
||||
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
|
||||
|
||||
self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"),
|
||||
[self.host1, self.host2])
|
||||
|
||||
def test_register_unregister(self):
|
||||
self.test_matcher.register(
|
||||
self.target,
|
||||
self.host1,
|
||||
"test",
|
||||
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
|
||||
self.test_matcher.register(
|
||||
self.target,
|
||||
self.host2,
|
||||
"test",
|
||||
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
|
||||
|
||||
self.test_matcher.unregister(self.target, self.host2, "test")
|
||||
|
||||
self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"),
|
||||
[self.host1])
|
||||
|
||||
def test_register_two_same_hosts(self):
|
||||
self.test_matcher.register(
|
||||
self.target,
|
||||
self.host1,
|
||||
"test",
|
||||
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
|
||||
self.test_matcher.register(
|
||||
self.target,
|
||||
self.host1,
|
||||
"test",
|
||||
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
|
||||
|
||||
self.assertEqual([self.host1],
|
||||
self.test_matcher.get_hosts(self.target, "test"))
|
||||
|
||||
def test_get_hosts_wrong_topic(self):
|
||||
target = oslo_messaging.Target(topic="no_such_topic")
|
||||
self.assertEqual([], self.test_matcher.get_hosts(target, "test"))
|
||||
|
||||
def test_handle_redis_package_error(self):
|
||||
if self.rpc_zmq_matchmaker == "redis":
|
||||
# move 'redis' variable to prevent this case affect others
|
||||
module = inspect.getmodule(self.test_matcher)
|
||||
redis_package = module.redis
|
||||
|
||||
# 'redis' variable is set to None, when package importing is failed
|
||||
module.redis = None
|
||||
self.assertRaises(ImportError, self.test_matcher.__init__,
|
||||
self.conf)
|
||||
|
||||
# retrieve 'redis' variable which is set originally
|
||||
module.redis = redis_package
|
@ -1,129 +0,0 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import testtools
|
||||
|
||||
import oslo_messaging
|
||||
from oslo_messaging._drivers import impl_zmq
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
||||
from oslo_messaging.tests.drivers.zmq import zmq_common
|
||||
from oslo_messaging.tests import utils as test_utils
|
||||
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase):
|
||||
|
||||
@testtools.skipIf(zmq is None, "zmq not available")
|
||||
def setUp(self):
|
||||
super(ZmqTestPortsRange, self).setUp()
|
||||
|
||||
# Set config values
|
||||
kwargs = {'rpc_zmq_min_port': 5555,
|
||||
'rpc_zmq_max_port': 5560}
|
||||
self.config(group='oslo_messaging_zmq', **kwargs)
|
||||
|
||||
def test_ports_range(self):
|
||||
listeners = []
|
||||
|
||||
for i in range(10):
|
||||
try:
|
||||
target = oslo_messaging.Target(topic='testtopic_' + str(i))
|
||||
new_listener = self.driver.listen(target, None, None)
|
||||
listeners.append(new_listener)
|
||||
except zmq_socket.ZmqPortBusy:
|
||||
pass
|
||||
|
||||
self.assertLessEqual(len(listeners), 5)
|
||||
|
||||
for l in listeners:
|
||||
l.cleanup()
|
||||
|
||||
|
||||
class TestConfZmqDriverLoad(test_utils.BaseTestCase):
|
||||
|
||||
@testtools.skipIf(zmq is None, "zmq not available")
|
||||
def setUp(self):
|
||||
super(TestConfZmqDriverLoad, self).setUp()
|
||||
self.messaging_conf.transport_driver = 'zmq'
|
||||
|
||||
def test_driver_load(self):
|
||||
transport = oslo_messaging.get_transport(self.conf)
|
||||
self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver)
|
||||
|
||||
|
||||
class TestZmqBasics(zmq_common.ZmqBaseTestCase):
|
||||
|
||||
@testtools.skipIf(zmq is None, "zmq not available")
|
||||
def setUp(self):
|
||||
super(TestZmqBasics, self).setUp()
|
||||
self.target = oslo_messaging.Target(topic='topic')
|
||||
self.ctxt = {'key': 'value'}
|
||||
self.message = {'method': 'qwerty', 'args': {'int': 1, 'bool': True}}
|
||||
|
||||
def test_send_call_without_method_failure(self):
|
||||
self.message.pop('method')
|
||||
self.listener.listen(self.target)
|
||||
self.assertRaises(KeyError, self.driver.send,
|
||||
self.target, self.ctxt, self.message,
|
||||
wait_for_reply=True, timeout=10)
|
||||
|
||||
def _check_listener_received(self):
|
||||
self.assertTrue(self.listener._received.isSet())
|
||||
self.assertEqual(self.ctxt, self.listener.message.ctxt)
|
||||
self.assertEqual(self.message, self.listener.message.message)
|
||||
|
||||
def test_send_call_success(self):
|
||||
self.listener.listen(self.target)
|
||||
result = self.driver.send(self.target, self.ctxt, self.message,
|
||||
wait_for_reply=True, timeout=10)
|
||||
self.assertTrue(result)
|
||||
self._check_listener_received()
|
||||
|
||||
def test_send_call_direct_success(self):
|
||||
self.target.server = 'server'
|
||||
self.listener.listen(self.target)
|
||||
result = self.driver.send(self.target, self.ctxt, self.message,
|
||||
wait_for_reply=True, timeout=10)
|
||||
self.assertTrue(result)
|
||||
self._check_listener_received()
|
||||
|
||||
def test_send_cast_direct_success(self):
|
||||
self.target.server = 'server'
|
||||
self.listener.listen(self.target)
|
||||
result = self.driver.send(self.target, self.ctxt, self.message,
|
||||
wait_for_reply=False)
|
||||
self.listener._received.wait(5)
|
||||
self.assertIsNone(result)
|
||||
self._check_listener_received()
|
||||
|
||||
def test_send_fanout_success(self):
|
||||
self.target.fanout = True
|
||||
self.listener.listen(self.target)
|
||||
result = self.driver.send(self.target, self.ctxt, self.message,
|
||||
wait_for_reply=False)
|
||||
self.listener._received.wait(5)
|
||||
self.assertIsNone(result)
|
||||
self._check_listener_received()
|
||||
|
||||
def test_send_notify_success(self):
|
||||
self.listener.listen_notifications([(self.target, 'info')])
|
||||
self.target.topic += '.info'
|
||||
result = self.driver.send_notification(self.target, self.ctxt,
|
||||
self.message, '3.0')
|
||||
self.listener._received.wait(5)
|
||||
self.assertIsNone(result)
|
||||
self._check_listener_received()
|
@ -1,150 +0,0 @@
|
||||
# Copyright 2015-2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
import time
|
||||
|
||||
import msgpack
|
||||
import six
|
||||
import testscenarios
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
import oslo_messaging
|
||||
from oslo_messaging._drivers.zmq_driver.proxy.central \
|
||||
import zmq_publisher_proxy
|
||||
from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_version
|
||||
from oslo_messaging.tests.drivers.zmq import zmq_common
|
||||
|
||||
load_tests = testscenarios.load_tests_apply_scenarios
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
opt_group = cfg.OptGroup(name='zmq_proxy_opts',
|
||||
title='ZeroMQ proxy options')
|
||||
cfg.CONF.register_opts(zmq_proxy.zmq_proxy_opts, group=opt_group)
|
||||
|
||||
|
||||
class TestPubSub(zmq_common.ZmqBaseTestCase):
|
||||
|
||||
LISTENERS_COUNT = 3
|
||||
|
||||
scenarios = [
|
||||
('json', {'serialization': 'json',
|
||||
'dumps': lambda obj: six.b(json.dumps(obj))}),
|
||||
('msgpack', {'serialization': 'msgpack',
|
||||
'dumps': msgpack.dumps})
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
super(TestPubSub, self).setUp()
|
||||
|
||||
kwargs = {'use_pub_sub': True,
|
||||
'rpc_zmq_serialization': self.serialization}
|
||||
self.config(group='oslo_messaging_zmq', **kwargs)
|
||||
|
||||
self.config(host="127.0.0.1", group="zmq_proxy_opts")
|
||||
self.config(publisher_port=0, group="zmq_proxy_opts")
|
||||
|
||||
self.publisher = zmq_publisher_proxy.PublisherProxy(
|
||||
self.conf, self.driver.matchmaker)
|
||||
self.driver.matchmaker.register_publisher(
|
||||
(self.publisher.host, ''),
|
||||
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
|
||||
|
||||
self.listeners = []
|
||||
for _ in range(self.LISTENERS_COUNT):
|
||||
self.listeners.append(zmq_common.TestServerListener(self.driver))
|
||||
|
||||
def tearDown(self):
|
||||
super(TestPubSub, self).tearDown()
|
||||
self.publisher.cleanup()
|
||||
for listener in self.listeners:
|
||||
listener.stop()
|
||||
|
||||
def _send_request(self, target):
|
||||
# Needed only in test env to give listener a chance to connect
|
||||
# before request fires
|
||||
time.sleep(1)
|
||||
context = {}
|
||||
message = {'method': 'hello-world'}
|
||||
|
||||
self.publisher.send_request(
|
||||
[b"reply_id",
|
||||
b'',
|
||||
six.b(zmq_version.MESSAGE_VERSION),
|
||||
six.b(str(zmq_names.CAST_FANOUT_TYPE)),
|
||||
zmq_address.target_to_subscribe_filter(target),
|
||||
b"message_id",
|
||||
self.dumps([context, message])]
|
||||
)
|
||||
|
||||
def _check_listener(self, listener):
|
||||
listener._received.wait(timeout=5)
|
||||
self.assertTrue(listener._received.isSet())
|
||||
method = listener.message.message[u'method']
|
||||
self.assertEqual(u'hello-world', method)
|
||||
|
||||
def _check_listener_negative(self, listener):
|
||||
listener._received.wait(timeout=1)
|
||||
self.assertFalse(listener._received.isSet())
|
||||
|
||||
def test_single_listener(self):
|
||||
target = oslo_messaging.Target(topic='testtopic', fanout=True)
|
||||
self.listener.listen(target)
|
||||
|
||||
self._send_request(target)
|
||||
|
||||
self._check_listener(self.listener)
|
||||
|
||||
def test_all_listeners(self):
|
||||
target = oslo_messaging.Target(topic='testtopic', fanout=True)
|
||||
|
||||
for listener in self.listeners:
|
||||
listener.listen(target)
|
||||
|
||||
self._send_request(target)
|
||||
|
||||
for listener in self.listeners:
|
||||
self._check_listener(listener)
|
||||
|
||||
def test_filtered(self):
|
||||
target = oslo_messaging.Target(topic='testtopic', fanout=True)
|
||||
target_wrong = oslo_messaging.Target(topic='wrong', fanout=True)
|
||||
|
||||
self.listeners[0].listen(target)
|
||||
self.listeners[1].listen(target)
|
||||
self.listeners[2].listen(target_wrong)
|
||||
|
||||
self._send_request(target)
|
||||
|
||||
self._check_listener(self.listeners[0])
|
||||
self._check_listener(self.listeners[1])
|
||||
self._check_listener_negative(self.listeners[2])
|
||||
|
||||
def test_topic_part_matching(self):
|
||||
target = oslo_messaging.Target(topic='testtopic', server='server')
|
||||
target_part = oslo_messaging.Target(topic='testtopic', fanout=True)
|
||||
|
||||
self.listeners[0].listen(target)
|
||||
self.listeners[1].listen(target)
|
||||
|
||||
self._send_request(target_part)
|
||||
|
||||
self._check_listener(self.listeners[0])
|
||||
self._check_listener(self.listeners[1])
|
@ -1,80 +0,0 @@
|
||||
# Copyright 2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging.tests import utils as test_utils
|
||||
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
class TestRoutingTable(test_utils.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestRoutingTable, self).setUp()
|
||||
|
||||
def test_get_next_while_origin_changed(self):
|
||||
table = zmq_routing_table.RoutingTable(self.conf)
|
||||
table.register("topic1.server1", "1")
|
||||
table.register("topic1.server1", "2")
|
||||
table.register("topic1.server1", "3")
|
||||
|
||||
rr_gen = table.get_hosts_round_robin("topic1.server1")
|
||||
|
||||
result = []
|
||||
for i in range(3):
|
||||
result.append(next(rr_gen))
|
||||
|
||||
self.assertEqual(3, len(result))
|
||||
self.assertIn("1", result)
|
||||
self.assertIn("2", result)
|
||||
self.assertIn("3", result)
|
||||
|
||||
table.register("topic1.server1", "4")
|
||||
table.register("topic1.server1", "5")
|
||||
table.register("topic1.server1", "6")
|
||||
|
||||
result = []
|
||||
for i in range(6):
|
||||
result.append(next(rr_gen))
|
||||
|
||||
self.assertEqual(6, len(result))
|
||||
self.assertIn("1", result)
|
||||
self.assertIn("2", result)
|
||||
self.assertIn("3", result)
|
||||
self.assertIn("4", result)
|
||||
self.assertIn("5", result)
|
||||
self.assertIn("6", result)
|
||||
|
||||
def test_no_targets(self):
|
||||
table = zmq_routing_table.RoutingTable(self.conf)
|
||||
rr_gen = table.get_hosts_round_robin("topic1.server1")
|
||||
|
||||
result = []
|
||||
for t in rr_gen:
|
||||
result.append(t)
|
||||
self.assertEqual(0, len(result))
|
||||
|
||||
def test_target_unchanged(self):
|
||||
table = zmq_routing_table.RoutingTable(self.conf)
|
||||
table.register("topic1.server1", "1")
|
||||
|
||||
rr_gen = table.get_hosts_round_robin("topic1.server1")
|
||||
|
||||
result = []
|
||||
for i in range(3):
|
||||
result.append(next(rr_gen))
|
||||
|
||||
self.assertEqual(["1", "1", "1"], result)
|
@ -1,226 +0,0 @@
|
||||
# Copyright 2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from six.moves import mock
|
||||
import testtools
|
||||
import time
|
||||
|
||||
import oslo_messaging
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_senders
|
||||
from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
|
||||
from oslo_messaging._drivers.zmq_driver.server.consumers.zmq_dealer_consumer \
|
||||
import DealerConsumerWithAcks
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_options
|
||||
from oslo_messaging.tests.drivers.zmq import zmq_common
|
||||
from oslo_messaging.tests import utils as test_utils
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
class TestZmqAckManager(test_utils.BaseTestCase):
|
||||
|
||||
@testtools.skipIf(zmq is None, "zmq not available")
|
||||
def setUp(self):
|
||||
super(TestZmqAckManager, self).setUp()
|
||||
|
||||
# register and set necessary config opts
|
||||
self.messaging_conf.transport_driver = 'zmq'
|
||||
zmq_options.register_opts(self.conf, mock.MagicMock())
|
||||
kwargs = {'rpc_zmq_matchmaker': 'dummy',
|
||||
'use_pub_sub': False,
|
||||
'use_router_proxy': True,
|
||||
'rpc_thread_pool_size': 1,
|
||||
'rpc_use_acks': True,
|
||||
'rpc_ack_timeout_base': 5,
|
||||
'rpc_ack_timeout_multiplier': 1,
|
||||
'rpc_retry_attempts': 2}
|
||||
self.config(group='oslo_messaging_zmq', **kwargs)
|
||||
self.conf.register_opts(zmq_proxy.zmq_proxy_opts,
|
||||
group='zmq_proxy_opts')
|
||||
|
||||
# mock set_result method of futures
|
||||
self.set_result_patcher = mock.patch.object(
|
||||
zmq_receivers.futurist.Future, 'set_result',
|
||||
side_effect=zmq_receivers.futurist.Future.set_result, autospec=True
|
||||
)
|
||||
self.set_result = self.set_result_patcher.start()
|
||||
|
||||
# mock send method of senders
|
||||
self.send_patcher = mock.patch.object(
|
||||
zmq_senders.RequestSenderProxy, 'send',
|
||||
side_effect=zmq_senders.RequestSenderProxy.send, autospec=True
|
||||
)
|
||||
self.send = self.send_patcher.start()
|
||||
|
||||
# get driver
|
||||
transport = oslo_messaging.get_transport(self.conf)
|
||||
self.driver = transport._driver
|
||||
|
||||
# prepare and launch proxy
|
||||
self.proxy = zmq_proxy.ZmqProxy(self.conf)
|
||||
vars(self.driver.matchmaker).update(vars(self.proxy.matchmaker))
|
||||
self.executor = zmq_async.get_executor(self.proxy.run)
|
||||
self.executor.execute()
|
||||
|
||||
# create listener
|
||||
self.listener = zmq_common.TestServerListener(self.driver)
|
||||
|
||||
# create target and message
|
||||
self.target = oslo_messaging.Target(topic='topic', server='server')
|
||||
self.message = {'method': 'xyz', 'args': {'x': 1, 'y': 2, 'z': 3}}
|
||||
|
||||
# start listening to target
|
||||
self.listener.listen(self.target)
|
||||
|
||||
# get ack manager
|
||||
self.ack_manager = self.driver.client.get().publishers['default']
|
||||
|
||||
self.addCleanup(
|
||||
zmq_common.StopRpc(
|
||||
self, [('listener', 'stop'), ('executor', 'stop'),
|
||||
('proxy', 'close'), ('driver', 'cleanup'),
|
||||
('send_patcher', 'stop'),
|
||||
('set_result_patcher', 'stop')]
|
||||
)
|
||||
)
|
||||
|
||||
# wait for all connections to be established
|
||||
# and all parties to be ready for messaging
|
||||
time.sleep(1)
|
||||
|
||||
@mock.patch.object(DealerConsumerWithAcks, '_acknowledge',
|
||||
side_effect=DealerConsumerWithAcks._acknowledge,
|
||||
autospec=True)
|
||||
def test_cast_success_without_retries(self, received_ack_mock):
|
||||
result = self.driver.send(
|
||||
self.target, {}, self.message, wait_for_reply=False
|
||||
)
|
||||
self.assertIsNone(result)
|
||||
self.ack_manager.pool.shutdown(wait=True)
|
||||
self.assertTrue(self.listener._received.isSet())
|
||||
self.assertEqual(self.message, self.listener.message.message)
|
||||
self.assertEqual(1, self.send.call_count)
|
||||
self.assertEqual(1, received_ack_mock.call_count)
|
||||
self.assertEqual(2, self.set_result.call_count)
|
||||
|
||||
def test_cast_success_with_one_retry(self):
|
||||
with mock.patch.object(DealerConsumerWithAcks,
|
||||
'_acknowledge') as lost_ack_mock:
|
||||
result = self.driver.send(
|
||||
self.target, {}, self.message, wait_for_reply=False
|
||||
)
|
||||
self.assertIsNone(result)
|
||||
self.listener._received.wait(5)
|
||||
self.assertTrue(self.listener._received.isSet())
|
||||
self.assertEqual(self.message, self.listener.message.message)
|
||||
self.assertEqual(1, self.send.call_count)
|
||||
self.assertEqual(1, lost_ack_mock.call_count)
|
||||
self.assertEqual(0, self.set_result.call_count)
|
||||
self.listener._received.clear()
|
||||
with mock.patch.object(DealerConsumerWithAcks, '_acknowledge',
|
||||
side_effect=DealerConsumerWithAcks._acknowledge,
|
||||
autospec=True) as received_ack_mock:
|
||||
self.ack_manager.pool.shutdown(wait=True)
|
||||
self.assertFalse(self.listener._received.isSet())
|
||||
self.assertEqual(2, self.send.call_count)
|
||||
self.assertEqual(1, received_ack_mock.call_count)
|
||||
self.assertEqual(2, self.set_result.call_count)
|
||||
|
||||
def test_cast_success_with_two_retries(self):
|
||||
with mock.patch.object(DealerConsumerWithAcks,
|
||||
'_acknowledge') as lost_ack_mock:
|
||||
result = self.driver.send(
|
||||
self.target, {}, self.message, wait_for_reply=False
|
||||
)
|
||||
self.assertIsNone(result)
|
||||
self.listener._received.wait(5)
|
||||
self.assertTrue(self.listener._received.isSet())
|
||||
self.assertEqual(self.message, self.listener.message.message)
|
||||
self.assertEqual(1, self.send.call_count)
|
||||
self.assertEqual(1, lost_ack_mock.call_count)
|
||||
self.assertEqual(0, self.set_result.call_count)
|
||||
self.listener._received.clear()
|
||||
self.listener._received.wait(7.5)
|
||||
self.assertFalse(self.listener._received.isSet())
|
||||
self.assertEqual(2, self.send.call_count)
|
||||
self.assertEqual(2, lost_ack_mock.call_count)
|
||||
self.assertEqual(0, self.set_result.call_count)
|
||||
with mock.patch.object(DealerConsumerWithAcks, '_acknowledge',
|
||||
side_effect=DealerConsumerWithAcks._acknowledge,
|
||||
autospec=True) as received_ack_mock:
|
||||
self.ack_manager.pool.shutdown(wait=True)
|
||||
self.assertFalse(self.listener._received.isSet())
|
||||
self.assertEqual(3, self.send.call_count)
|
||||
self.assertEqual(1, received_ack_mock.call_count)
|
||||
self.assertEqual(2, self.set_result.call_count)
|
||||
|
||||
@mock.patch.object(DealerConsumerWithAcks, '_acknowledge')
|
||||
def test_cast_failure_exhausted_retries(self, lost_ack_mock):
|
||||
result = self.driver.send(
|
||||
self.target, {}, self.message, wait_for_reply=False
|
||||
)
|
||||
self.assertIsNone(result)
|
||||
self.ack_manager.pool.shutdown(wait=True)
|
||||
self.assertTrue(self.listener._received.isSet())
|
||||
self.assertEqual(self.message, self.listener.message.message)
|
||||
self.assertEqual(3, self.send.call_count)
|
||||
self.assertEqual(3, lost_ack_mock.call_count)
|
||||
self.assertEqual(1, self.set_result.call_count)
|
||||
|
||||
@mock.patch.object(DealerConsumerWithAcks, '_acknowledge',
|
||||
side_effect=DealerConsumerWithAcks._acknowledge,
|
||||
autospec=True)
|
||||
@mock.patch.object(DealerConsumerWithAcks, '_reply',
|
||||
side_effect=DealerConsumerWithAcks._reply,
|
||||
autospec=True)
|
||||
@mock.patch.object(DealerConsumerWithAcks, '_reply_from_cache',
|
||||
side_effect=DealerConsumerWithAcks._reply_from_cache,
|
||||
autospec=True)
|
||||
def test_call_success_without_retries(self, unused_reply_from_cache_mock,
|
||||
received_reply_mock,
|
||||
received_ack_mock):
|
||||
result = self.driver.send(
|
||||
self.target, {}, self.message, wait_for_reply=True, timeout=10
|
||||
)
|
||||
self.assertIsNotNone(result)
|
||||
self.ack_manager.pool.shutdown(wait=True)
|
||||
self.assertTrue(self.listener._received.isSet())
|
||||
self.assertEqual(self.message, self.listener.message.message)
|
||||
self.assertEqual(1, self.send.call_count)
|
||||
self.assertEqual(1, received_ack_mock.call_count)
|
||||
self.assertEqual(3, self.set_result.call_count)
|
||||
received_reply_mock.assert_called_once_with(mock.ANY, mock.ANY,
|
||||
reply=True, failure=None)
|
||||
self.assertEqual(0, unused_reply_from_cache_mock.call_count)
|
||||
|
||||
@mock.patch.object(DealerConsumerWithAcks, '_acknowledge')
|
||||
@mock.patch.object(DealerConsumerWithAcks, '_reply')
|
||||
@mock.patch.object(DealerConsumerWithAcks, '_reply_from_cache')
|
||||
def test_call_failure_exhausted_retries(self, lost_reply_from_cache_mock,
|
||||
lost_reply_mock, lost_ack_mock):
|
||||
self.assertRaises(oslo_messaging.MessagingTimeout,
|
||||
self.driver.send,
|
||||
self.target, {}, self.message,
|
||||
wait_for_reply=True, timeout=20)
|
||||
self.ack_manager.pool.shutdown(wait=True)
|
||||
self.assertTrue(self.listener._received.isSet())
|
||||
self.assertEqual(self.message, self.listener.message.message)
|
||||
self.assertEqual(3, self.send.call_count)
|
||||
self.assertEqual(3, lost_ack_mock.call_count)
|
||||
self.assertEqual(2, self.set_result.call_count)
|
||||
lost_reply_mock.assert_called_once_with(mock.ANY,
|
||||
reply=True, failure=None)
|
||||
self.assertEqual(2, lost_reply_from_cache_mock.call_count)
|
@ -1,67 +0,0 @@
|
||||
# Copyright 2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import testscenarios
|
||||
import testtools
|
||||
|
||||
import oslo_messaging
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging.tests import utils as test_utils
|
||||
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
load_tests = testscenarios.load_tests_apply_scenarios
|
||||
|
||||
|
||||
class TestZmqAddress(test_utils.BaseTestCase):
|
||||
|
||||
scenarios = [
|
||||
('router', {'listener_type': zmq_names.socket_type_str(zmq.ROUTER)}),
|
||||
('dealer', {'listener_type': zmq_names.socket_type_str(zmq.DEALER)})
|
||||
]
|
||||
|
||||
@testtools.skipIf(zmq is None, "zmq not available")
|
||||
def test_target_to_key_topic_only(self):
|
||||
target = oslo_messaging.Target(topic='topic')
|
||||
key = zmq_address.target_to_key(target, self.listener_type)
|
||||
self.assertEqual(self.listener_type + '/topic', key)
|
||||
|
||||
@testtools.skipIf(zmq is None, "zmq not available")
|
||||
def test_target_to_key_topic_server_round_robin(self):
|
||||
target = oslo_messaging.Target(topic='topic', server='server')
|
||||
key = zmq_address.target_to_key(target, self.listener_type)
|
||||
self.assertEqual(self.listener_type + '/topic/server', key)
|
||||
|
||||
@testtools.skipIf(zmq is None, "zmq not available")
|
||||
def test_target_to_key_topic_fanout(self):
|
||||
target = oslo_messaging.Target(topic='topic', fanout=True)
|
||||
key = zmq_address.target_to_key(target, self.listener_type)
|
||||
self.assertEqual(self.listener_type + '/topic', key)
|
||||
|
||||
@testtools.skipIf(zmq is None, "zmq not available")
|
||||
def test_target_to_key_topic_server_fanout(self):
|
||||
target = oslo_messaging.Target(topic='topic', server='server',
|
||||
fanout=True)
|
||||
key = zmq_address.target_to_key(target, self.listener_type)
|
||||
self.assertEqual(self.listener_type + '/topic', key)
|
||||
|
||||
@testtools.skipIf(zmq is None, "zmq not available")
|
||||
def test_target_to_key_topic_server_fanout_no_prefix(self):
|
||||
target = oslo_messaging.Target(topic='topic', server='server',
|
||||
fanout=True)
|
||||
key = zmq_address.target_to_key(target)
|
||||
self.assertEqual('topic', key)
|
@ -1,93 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from six.moves import mock
|
||||
import testtools
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.poller import green_poller
|
||||
from oslo_messaging._drivers.zmq_driver.poller import threading_poller
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging.tests import utils as test_utils
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
class TestImportZmq(test_utils.BaseTestCase):
|
||||
|
||||
@testtools.skipIf(zmq is None, "zmq not available")
|
||||
def setUp(self):
|
||||
super(TestImportZmq, self).setUp()
|
||||
|
||||
def test_when_eventlet_is_available_then_load_eventlet_green_zmq(self):
|
||||
zmq_async.eventletutils.is_monkey_patched = lambda _: True
|
||||
|
||||
mock_try_import = mock.Mock()
|
||||
zmq_async.importutils.try_import = mock_try_import
|
||||
|
||||
zmq_async.import_zmq()
|
||||
|
||||
mock_try_import.assert_called_with('eventlet.green.zmq', default=None)
|
||||
|
||||
def test_when_evetlet_is_unavailable_then_load_zmq(self):
|
||||
zmq_async.eventletutils.is_monkey_patched = lambda _: False
|
||||
|
||||
mock_try_import = mock.Mock()
|
||||
zmq_async.importutils.try_import = mock_try_import
|
||||
|
||||
zmq_async.import_zmq()
|
||||
|
||||
mock_try_import.assert_called_with('zmq', default=None)
|
||||
|
||||
|
||||
class TestGetPoller(test_utils.BaseTestCase):
|
||||
|
||||
@testtools.skipIf(zmq is None, "zmq not available")
|
||||
def setUp(self):
|
||||
super(TestGetPoller, self).setUp()
|
||||
|
||||
def test_when_eventlet_is_available_then_return_GreenPoller(self):
|
||||
zmq_async.eventletutils.is_monkey_patched = lambda _: True
|
||||
|
||||
poller = zmq_async.get_poller()
|
||||
|
||||
self.assertIsInstance(poller, green_poller.GreenPoller)
|
||||
|
||||
def test_when_eventlet_is_unavailable_then_return_ThreadingPoller(self):
|
||||
zmq_async.eventletutils.is_monkey_patched = lambda _: False
|
||||
|
||||
poller = zmq_async.get_poller()
|
||||
|
||||
self.assertIsInstance(poller, threading_poller.ThreadingPoller)
|
||||
|
||||
|
||||
class TestGetExecutor(test_utils.BaseTestCase):
|
||||
|
||||
@testtools.skipIf(zmq is None, "zmq not available")
|
||||
def setUp(self):
|
||||
super(TestGetExecutor, self).setUp()
|
||||
|
||||
def test_when_eventlet_module_is_available_then_return_GreenExecutor(self):
|
||||
zmq_async.eventletutils.is_monkey_patched = lambda _: True
|
||||
|
||||
executor = zmq_async.get_executor('any method')
|
||||
|
||||
self.assertIsInstance(executor, green_poller.GreenExecutor)
|
||||
self.assertEqual('any method', executor._method)
|
||||
|
||||
def test_when_eventlet_is_unavailable_then_return_ThreadingExecutor(self):
|
||||
zmq_async.eventletutils.is_monkey_patched = lambda _: False
|
||||
|
||||
executor = zmq_async.get_executor('any method')
|
||||
|
||||
self.assertIsInstance(executor,
|
||||
threading_poller.ThreadingExecutor)
|
||||
self.assertEqual('any method', executor._method)
|
@ -1,127 +0,0 @@
|
||||
# Copyright 2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from six.moves import mock
|
||||
import testtools
|
||||
|
||||
import oslo_messaging
|
||||
from oslo_messaging._drivers import common
|
||||
from oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_base \
|
||||
import MatchmakerDummy
|
||||
from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_redis
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging.tests import utils as test_utils
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
redis = zmq_matchmaker_redis.redis
|
||||
sentinel = zmq_matchmaker_redis.redis_sentinel
|
||||
|
||||
|
||||
class TestZmqTransportUrl(test_utils.BaseTestCase):
|
||||
|
||||
@testtools.skipIf(zmq is None, "zmq not available")
|
||||
def setUp(self):
|
||||
super(TestZmqTransportUrl, self).setUp()
|
||||
|
||||
def setup_url(self, url):
|
||||
transport = oslo_messaging.get_transport(self.conf, url)
|
||||
self.addCleanup(transport.cleanup)
|
||||
driver = transport._driver
|
||||
return driver, url
|
||||
|
||||
def mock_redis(self):
|
||||
if redis is None:
|
||||
self.skipTest("redis not available")
|
||||
else:
|
||||
redis_patcher = mock.patch.object(redis, 'StrictRedis')
|
||||
self.addCleanup(redis_patcher.stop)
|
||||
return redis_patcher.start()
|
||||
|
||||
def mock_sentinel(self):
|
||||
if sentinel is None:
|
||||
self.skipTest("sentinel not available")
|
||||
else:
|
||||
sentinel_patcher = mock.patch.object(sentinel, 'Sentinel')
|
||||
self.addCleanup(sentinel_patcher.stop)
|
||||
return sentinel_patcher.start()
|
||||
|
||||
def test_empty_url(self):
|
||||
self.mock_redis()
|
||||
driver, url = self.setup_url("zmq:///")
|
||||
self.assertIs(zmq_matchmaker_redis.MatchmakerRedis,
|
||||
driver.matchmaker.__class__)
|
||||
self.assertEqual('zmq', driver.matchmaker.url.transport)
|
||||
|
||||
def test_error_url(self):
|
||||
self.assertRaises(common.RPCException, self.setup_url, "zmq+error:///")
|
||||
|
||||
def test_dummy_url(self):
|
||||
driver, url = self.setup_url("zmq+dummy:///")
|
||||
self.assertIs(MatchmakerDummy,
|
||||
driver.matchmaker.__class__)
|
||||
self.assertEqual('zmq+dummy', driver.matchmaker.url.transport)
|
||||
|
||||
def test_redis_url(self):
|
||||
self.mock_redis()
|
||||
driver, url = self.setup_url("zmq+redis:///")
|
||||
self.assertIs(zmq_matchmaker_redis.MatchmakerRedis,
|
||||
driver.matchmaker.__class__)
|
||||
self.assertEqual('zmq+redis', driver.matchmaker.url.transport)
|
||||
|
||||
def test_sentinel_url(self):
|
||||
self.mock_sentinel()
|
||||
driver, url = self.setup_url("zmq+sentinel:///")
|
||||
self.assertIs(zmq_matchmaker_redis.MatchmakerSentinel,
|
||||
driver.matchmaker.__class__)
|
||||
self.assertEqual('zmq+sentinel', driver.matchmaker.url.transport)
|
||||
|
||||
def test_host_with_credentials_url(self):
|
||||
self.mock_redis()
|
||||
driver, url = self.setup_url("zmq://:password@host:60000/")
|
||||
self.assertIs(zmq_matchmaker_redis.MatchmakerRedis,
|
||||
driver.matchmaker.__class__)
|
||||
self.assertEqual('zmq', driver.matchmaker.url.transport)
|
||||
self.assertEqual(
|
||||
[{"host": "host", "port": 60000, "password": "password"}],
|
||||
driver.matchmaker._redis_hosts
|
||||
)
|
||||
|
||||
def test_redis_multiple_hosts_url(self):
|
||||
self.mock_redis()
|
||||
driver, url = self.setup_url(
|
||||
"zmq+redis://host1:60001,host2:60002,host3:60003/"
|
||||
)
|
||||
self.assertIs(zmq_matchmaker_redis.MatchmakerRedis,
|
||||
driver.matchmaker.__class__)
|
||||
self.assertEqual('zmq+redis', driver.matchmaker.url.transport)
|
||||
self.assertEqual(
|
||||
[{"host": "host1", "port": 60001, "password": None},
|
||||
{"host": "host2", "port": 60002, "password": None},
|
||||
{"host": "host3", "port": 60003, "password": None}],
|
||||
driver.matchmaker._redis_hosts
|
||||
)
|
||||
|
||||
def test_sentinel_multiple_hosts_url(self):
|
||||
self.mock_sentinel()
|
||||
driver, url = self.setup_url(
|
||||
"zmq+sentinel://host1:20001,host2:20002,host3:20003/"
|
||||
)
|
||||
self.assertIs(zmq_matchmaker_redis.MatchmakerSentinel,
|
||||
driver.matchmaker.__class__)
|
||||
self.assertEqual('zmq+sentinel', driver.matchmaker.url.transport)
|
||||
self.assertEqual(
|
||||
[("host1", 20001), ("host2", 20002), ("host3", 20003)],
|
||||
driver.matchmaker._sentinel_hosts
|
||||
)
|
@ -1,132 +0,0 @@
|
||||
# Copyright 2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.server import zmq_ttl_cache
|
||||
from oslo_messaging.tests import utils as test_utils
|
||||
|
||||
|
||||
class TestZmqTTLCache(test_utils.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestZmqTTLCache, self).setUp()
|
||||
|
||||
def call_count_decorator(unbound_method):
|
||||
def wrapper(self, *args, **kwargs):
|
||||
wrapper.call_count += 1
|
||||
return unbound_method(self, *args, **kwargs)
|
||||
wrapper.call_count = 0
|
||||
return wrapper
|
||||
|
||||
zmq_ttl_cache.TTLCache._update_cache = \
|
||||
call_count_decorator(zmq_ttl_cache.TTLCache._update_cache)
|
||||
|
||||
self.cache = zmq_ttl_cache.TTLCache(ttl=1)
|
||||
|
||||
self.addCleanup(lambda: self.cache.cleanup())
|
||||
|
||||
def _test_add_get(self):
|
||||
self.cache.add('x', 'a')
|
||||
|
||||
self.assertEqual(self.cache.get('x'), 'a')
|
||||
self.assertEqual(self.cache.get('x', 'b'), 'a')
|
||||
self.assertIsNone(self.cache.get('y'))
|
||||
self.assertEqual(self.cache.get('y', 'b'), 'b')
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
self.assertIsNone(self.cache.get('x'))
|
||||
self.assertEqual(self.cache.get('x', 'b'), 'b')
|
||||
|
||||
def test_add_get_with_executor(self):
|
||||
self._test_add_get()
|
||||
|
||||
def test_add_get_without_executor(self):
|
||||
self.cache._executor.stop()
|
||||
self._test_add_get()
|
||||
|
||||
def _test_in_operator(self):
|
||||
self.cache.add(1)
|
||||
|
||||
self.assertIn(1, self.cache)
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
self.cache.add(2)
|
||||
|
||||
self.assertIn(1, self.cache)
|
||||
self.assertIn(2, self.cache)
|
||||
|
||||
time.sleep(0.75)
|
||||
|
||||
self.cache.add(3)
|
||||
|
||||
self.assertNotIn(1, self.cache)
|
||||
self.assertIn(2, self.cache)
|
||||
self.assertIn(3, self.cache)
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
self.assertNotIn(2, self.cache)
|
||||
self.assertIn(3, self.cache)
|
||||
|
||||
def test_in_operator_with_executor(self):
|
||||
self._test_in_operator()
|
||||
|
||||
def test_in_operator_without_executor(self):
|
||||
self.cache._executor.stop()
|
||||
self._test_in_operator()
|
||||
|
||||
def _is_expired(self, key):
|
||||
with self.cache._lock:
|
||||
_, expiration_time = self.cache._cache[key]
|
||||
return self.cache._is_expired(expiration_time, time.time())
|
||||
|
||||
def test_executor(self):
|
||||
self.cache.add(1)
|
||||
|
||||
self.assertEqual([1], sorted(self.cache._cache.keys()))
|
||||
self.assertFalse(self._is_expired(1))
|
||||
|
||||
time.sleep(0.75)
|
||||
|
||||
self.assertEqual(1, self.cache._update_cache.call_count)
|
||||
|
||||
self.cache.add(2)
|
||||
|
||||
self.assertEqual([1, 2], sorted(self.cache._cache.keys()))
|
||||
self.assertFalse(self._is_expired(1))
|
||||
self.assertFalse(self._is_expired(2))
|
||||
|
||||
time.sleep(0.75)
|
||||
|
||||
self.assertEqual(2, self.cache._update_cache.call_count)
|
||||
|
||||
self.cache.add(3)
|
||||
|
||||
if 1 in self.cache:
|
||||
self.assertEqual([1, 2, 3], sorted(self.cache._cache.keys()))
|
||||
self.assertTrue(self._is_expired(1))
|
||||
else:
|
||||
self.assertEqual([2, 3], sorted(self.cache._cache.keys()))
|
||||
self.assertFalse(self._is_expired(2))
|
||||
self.assertFalse(self._is_expired(3))
|
||||
|
||||
time.sleep(0.75)
|
||||
|
||||
self.assertEqual(3, self.cache._update_cache.call_count)
|
||||
|
||||
self.assertEqual([3], sorted(self.cache._cache.keys()))
|
||||
self.assertFalse(self._is_expired(3))
|
@ -1,63 +0,0 @@
|
||||
# Copyright 2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_version
|
||||
from oslo_messaging.tests import utils as test_utils
|
||||
|
||||
|
||||
class Doer(object):
|
||||
|
||||
def __init__(self):
|
||||
self.x = 1
|
||||
self.y = 2
|
||||
self.z = 3
|
||||
|
||||
def _sudo(self):
|
||||
pass
|
||||
|
||||
def do(self):
|
||||
pass
|
||||
|
||||
def _do_v_1_1(self):
|
||||
pass
|
||||
|
||||
def _do_v_2_2(self):
|
||||
pass
|
||||
|
||||
def _do_v_3_3(self):
|
||||
pass
|
||||
|
||||
|
||||
class TestZmqVersion(test_utils.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestZmqVersion, self).setUp()
|
||||
self.doer = Doer()
|
||||
|
||||
def test_get_unknown_attr_versions(self):
|
||||
self.assertRaises(AssertionError, zmq_version.get_method_versions,
|
||||
self.doer, 'qwerty')
|
||||
|
||||
def test_get_non_method_attr_versions(self):
|
||||
for attr_name in vars(self.doer):
|
||||
self.assertRaises(AssertionError, zmq_version.get_method_versions,
|
||||
self.doer, attr_name)
|
||||
|
||||
def test_get_private_method_versions(self):
|
||||
self.assertRaises(AssertionError, zmq_version.get_method_versions,
|
||||
self.doer, '_sudo')
|
||||
|
||||
def test_get_public_method_versions(self):
|
||||
do_versions = zmq_version.get_method_versions(self.doer, 'do')
|
||||
self.assertEqual(['1.1', '2.2', '3.3'], sorted(do_versions.keys()))
|
@ -1,111 +0,0 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import threading
|
||||
|
||||
import fixtures
|
||||
from six.moves import mock
|
||||
import testtools
|
||||
|
||||
import oslo_messaging
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_options
|
||||
from oslo_messaging._i18n import _LE
|
||||
from oslo_messaging.tests import utils as test_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
class TestServerListener(object):
|
||||
|
||||
def __init__(self, driver):
|
||||
self.driver = driver
|
||||
self.listener = None
|
||||
self.executor = zmq_async.get_executor(self._run)
|
||||
self._stop = threading.Event()
|
||||
self._received = threading.Event()
|
||||
self.message = None
|
||||
|
||||
def listen(self, target):
|
||||
self.listener = self.driver.listen(target, None,
|
||||
None)._poll_style_listener
|
||||
self.executor.execute()
|
||||
|
||||
def listen_notifications(self, targets_and_priorities):
|
||||
self.listener = self.driver.listen_for_notifications(
|
||||
targets_and_priorities, None, None, None)._poll_style_listener
|
||||
self.executor.execute()
|
||||
|
||||
def _run(self):
|
||||
try:
|
||||
messages = self.listener.poll()
|
||||
if messages:
|
||||
message = messages[0]
|
||||
message.acknowledge()
|
||||
self._received.set()
|
||||
self.message = message
|
||||
message.reply(reply=True)
|
||||
except Exception:
|
||||
LOG.exception(_LE("Unexpected exception occurred."))
|
||||
|
||||
def stop(self):
|
||||
self.executor.stop()
|
||||
|
||||
|
||||
class ZmqBaseTestCase(test_utils.BaseTestCase):
|
||||
"""Base test case for all ZMQ tests """
|
||||
|
||||
@testtools.skipIf(zmq is None, "zmq not available")
|
||||
def setUp(self):
|
||||
super(ZmqBaseTestCase, self).setUp()
|
||||
self.messaging_conf.transport_driver = 'zmq'
|
||||
zmq_options.register_opts(self.conf, mock.MagicMock())
|
||||
|
||||
# Set config values
|
||||
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
|
||||
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
|
||||
'rpc_zmq_host': '127.0.0.1',
|
||||
'rpc_zmq_ipc_dir': self.internal_ipc_dir,
|
||||
'use_pub_sub': False,
|
||||
'use_router_proxy': False,
|
||||
'rpc_zmq_matchmaker': 'dummy'}
|
||||
self.config(group='oslo_messaging_zmq', **kwargs)
|
||||
self.config(rpc_response_timeout=5)
|
||||
|
||||
# Get driver
|
||||
transport = oslo_messaging.get_transport(self.conf)
|
||||
self.driver = transport._driver
|
||||
|
||||
self.listener = TestServerListener(self.driver)
|
||||
|
||||
self.addCleanup(
|
||||
StopRpc(self, [('listener', 'stop'), ('driver', 'cleanup')])
|
||||
)
|
||||
|
||||
|
||||
class StopRpc(object):
|
||||
def __init__(self, obj, attrs_and_stops):
|
||||
self.obj = obj
|
||||
self.attrs_and_stops = attrs_and_stops
|
||||
|
||||
def __call__(self):
|
||||
for attr, stop in self.attrs_and_stops:
|
||||
if hasattr(self.obj, attr):
|
||||
obj_attr = getattr(self.obj, attr)
|
||||
if hasattr(obj_attr, stop):
|
||||
obj_attr_stop = getattr(obj_attr, stop)
|
||||
obj_attr_stop()
|
@ -1,232 +0,0 @@
|
||||
# Copyright 2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import logging.handlers
|
||||
import multiprocessing
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
import oslo_messaging
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging.tests.functional import utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
class QueueHandler(logging.Handler):
|
||||
"""This is a logging handler which sends events to a multiprocessing queue.
|
||||
|
||||
The plan is to add it to Python 3.2, but this can be copy pasted into
|
||||
user code for use with earlier Python versions.
|
||||
"""
|
||||
|
||||
def __init__(self, queue):
|
||||
"""Initialise an instance, using the passed queue."""
|
||||
logging.Handler.__init__(self)
|
||||
self.queue = queue
|
||||
|
||||
def emit(self, record):
|
||||
"""Emit a record.
|
||||
|
||||
Writes the LogRecord to the queue.
|
||||
"""
|
||||
try:
|
||||
ei = record.exc_info
|
||||
if ei:
|
||||
# just to get traceback text into record.exc_text
|
||||
dummy = self.format(record) # noqa
|
||||
record.exc_info = None # not needed any more
|
||||
self.queue.put_nowait(record)
|
||||
except (KeyboardInterrupt, SystemExit):
|
||||
raise
|
||||
except Exception:
|
||||
self.handleError(record)
|
||||
|
||||
|
||||
def listener_configurer(conf):
|
||||
root = logging.getLogger()
|
||||
h = logging.StreamHandler(sys.stdout)
|
||||
f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s '
|
||||
'%(levelname)-8s %(message)s')
|
||||
h.setFormatter(f)
|
||||
root.addHandler(h)
|
||||
log_path = conf.oslo_messaging_zmq.rpc_zmq_ipc_dir + \
|
||||
"/" + "zmq_multiproc.log"
|
||||
file_handler = logging.StreamHandler(open(log_path, 'w'))
|
||||
file_handler.setFormatter(f)
|
||||
root.addHandler(file_handler)
|
||||
|
||||
|
||||
def server_configurer(queue):
|
||||
h = QueueHandler(queue)
|
||||
root = logging.getLogger()
|
||||
root.addHandler(h)
|
||||
root.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
def listener_thread(queue, configurer, conf):
|
||||
configurer(conf)
|
||||
while True:
|
||||
time.sleep(0.3)
|
||||
try:
|
||||
record = queue.get()
|
||||
if record is None:
|
||||
break
|
||||
logger = logging.getLogger(record.name)
|
||||
logger.handle(record)
|
||||
except (KeyboardInterrupt, SystemExit):
|
||||
raise
|
||||
|
||||
|
||||
class Client(oslo_messaging.RPCClient):
|
||||
|
||||
def __init__(self, transport, topic):
|
||||
super(Client, self).__init__(
|
||||
transport=transport, target=oslo_messaging.Target(topic=topic))
|
||||
self.replies = []
|
||||
|
||||
def call_a(self):
|
||||
LOG.warning("call_a - client side")
|
||||
rep = self.call({}, 'call_a')
|
||||
LOG.warning("after call_a - client side")
|
||||
self.replies.append(rep)
|
||||
return rep
|
||||
|
||||
|
||||
class ReplyServerEndpoint(object):
|
||||
|
||||
def call_a(self, *args, **kwargs):
|
||||
LOG.warning("call_a - Server endpoint reached!")
|
||||
return "OK"
|
||||
|
||||
|
||||
class Server(object):
|
||||
|
||||
def __init__(self, conf, log_queue, transport_url, name, topic=None):
|
||||
self.conf = conf
|
||||
self.log_queue = log_queue
|
||||
self.transport_url = transport_url
|
||||
self.name = name
|
||||
self.topic = topic or str(uuid.uuid4())
|
||||
self.ready = multiprocessing.Value('b', False)
|
||||
self._stop = multiprocessing.Event()
|
||||
|
||||
def start(self):
|
||||
self.process = multiprocessing.Process(target=self._run_server,
|
||||
name=self.name,
|
||||
args=(self.conf,
|
||||
self.transport_url,
|
||||
self.log_queue,
|
||||
self.ready))
|
||||
self.process.start()
|
||||
LOG.debug("Server process started: pid: %d", self.process.pid)
|
||||
|
||||
def _run_server(self, conf, url, log_queue, ready):
|
||||
server_configurer(log_queue)
|
||||
LOG.debug("Starting RPC server")
|
||||
|
||||
transport = oslo_messaging.get_transport(conf, url=url)
|
||||
target = oslo_messaging.Target(topic=self.topic, server=self.name)
|
||||
self.rpc_server = oslo_messaging.get_rpc_server(
|
||||
transport=transport, target=target,
|
||||
endpoints=[ReplyServerEndpoint()],
|
||||
executor='eventlet')
|
||||
self.rpc_server.start()
|
||||
ready.value = True
|
||||
LOG.debug("RPC server being started")
|
||||
while not self._stop.is_set():
|
||||
LOG.debug("Waiting for the stop signal ...")
|
||||
time.sleep(1)
|
||||
self.rpc_server.stop()
|
||||
self.rpc_server.wait()
|
||||
LOG.debug("Leaving process T:%s Pid:%d", str(target), os.getpid())
|
||||
|
||||
def cleanup(self):
|
||||
LOG.debug("Stopping server")
|
||||
self.shutdown()
|
||||
|
||||
def shutdown(self):
|
||||
self._stop.set()
|
||||
|
||||
def restart(self, time_for_restart=1):
|
||||
pass
|
||||
|
||||
def hang(self):
|
||||
pass
|
||||
|
||||
def crash(self):
|
||||
pass
|
||||
|
||||
def ping(self):
|
||||
pass
|
||||
|
||||
|
||||
class MultiprocTestCase(utils.SkipIfNoTransportURL):
|
||||
|
||||
def setUp(self):
|
||||
super(MultiprocTestCase, self).setUp(conf=cfg.ConfigOpts())
|
||||
|
||||
if not self.url.startswith("zmq"):
|
||||
self.skipTest("ZeroMQ specific skipped...")
|
||||
|
||||
self.transport = oslo_messaging.get_transport(self.conf, url=self.url)
|
||||
|
||||
LOG.debug("Start log queue")
|
||||
|
||||
self.log_queue = multiprocessing.Queue()
|
||||
self.log_listener = threading.Thread(target=listener_thread,
|
||||
args=(self.log_queue,
|
||||
listener_configurer,
|
||||
self.conf))
|
||||
self.log_listener.start()
|
||||
self.spawned = []
|
||||
|
||||
self.conf.prog = "test_prog"
|
||||
self.conf.project = "test_project"
|
||||
|
||||
def tearDown(self):
|
||||
for process in self.spawned:
|
||||
process.cleanup()
|
||||
super(MultiprocTestCase, self).tearDown()
|
||||
|
||||
def get_client(self, topic):
|
||||
return Client(self.transport, topic)
|
||||
|
||||
def spawn_server(self, wait_for_server=False, topic=None):
|
||||
name = "server_%d_%s" % (len(self.spawned), str(uuid.uuid4())[:8])
|
||||
server = Server(self.conf, self.log_queue, self.url, name, topic)
|
||||
LOG.debug("[SPAWN] %s (starting)...", server.name)
|
||||
server.start()
|
||||
if wait_for_server:
|
||||
while not server.ready.value:
|
||||
LOG.debug("[SPAWN] %s (waiting for server ready)...",
|
||||
server.name)
|
||||
time.sleep(1)
|
||||
LOG.debug("[SPAWN] Server %s:%d started.",
|
||||
server.name, server.process.pid)
|
||||
self.spawned.append(server)
|
||||
return server
|
||||
|
||||
def spawn_servers(self, number, wait_for_server=False, common_topic=True):
|
||||
topic = str(uuid.uuid4()) if common_topic else None
|
||||
for _ in range(number):
|
||||
self.spawn_server(wait_for_server, topic)
|
@ -1,49 +0,0 @@
|
||||
# Copyright 2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
from oslo_messaging.tests.functional.zmq import multiproc_utils
|
||||
|
||||
|
||||
class StartupOrderTestCase(multiproc_utils.MultiprocTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(StartupOrderTestCase, self).setUp()
|
||||
|
||||
self.conf.prog = "test_prog"
|
||||
self.conf.project = "test_project"
|
||||
|
||||
self.config(rpc_response_timeout=10)
|
||||
|
||||
log_path = os.path.join(self.conf.oslo_messaging_zmq.rpc_zmq_ipc_dir,
|
||||
str(os.getpid()) + ".log")
|
||||
sys.stdout = open(log_path, "wb", buffering=0)
|
||||
|
||||
def test_call_client_wait_for_server(self):
|
||||
server = self.spawn_server(wait_for_server=True)
|
||||
client = self.get_client(server.topic)
|
||||
for _ in range(3):
|
||||
reply = client.call_a()
|
||||
self.assertIsNotNone(reply)
|
||||
self.assertEqual(3, len(client.replies))
|
||||
|
||||
def test_call_client_dont_wait_for_server(self):
|
||||
server = self.spawn_server(wait_for_server=False)
|
||||
client = self.get_client(server.topic)
|
||||
for _ in range(3):
|
||||
reply = client.call_a()
|
||||
self.assertIsNotNone(reply)
|
||||
self.assertEqual(3, len(client.replies))
|
Loading…
x
Reference in New Issue
Block a user