diff --git a/.zuul.yaml b/.zuul.yaml index da20f4a64..e7f95b797 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -18,27 +18,6 @@ tox_envlist: py27-func-rabbit bindep_profile: rabbit -- job: - name: oslo.messaging-tox-py27-func-zmq - parent: openstack-tox-py27 - vars: - tox_envlist: py27-func-zmq - bindep_profile: zmq - -- job: - name: oslo.messaging-tox-py27-func-zmq-proxy - parent: openstack-tox-py27 - vars: - tox_envlist: py27-func-zmq-proxy - bindep_profile: zmq - -- job: - name: oslo.messaging-tox-py27-func-zmq-pubsub - parent: openstack-tox-py27 - vars: - tox_envlist: py27-func-zmq-pubsub - bindep_profile: zmq - - job: name: oslo.messaging-tox-py35-func-amqp1 parent: openstack-tox-py35 @@ -52,13 +31,6 @@ tox_envlist: py35-func-rabbit bindep_profile: rabbit -- job: - name: oslo.messaging-tox-py35-func-zmq - parent: openstack-tox-py35 - vars: - tox_envlist: py35-func-zmq - bindep_profile: zmq - - job: name: oslo.messaging-src-dsvm-full-rabbit-default parent: legacy-dsvm-base @@ -115,17 +87,6 @@ - openstack/devstack-plugin-kafka - openstack/oslo.messaging -- job: - name: oslo.messaging-src-dsvm-full-zmq-default - parent: legacy-dsvm-base - run: playbooks/oslo.messaging-src-dsvm-full-zmq-default/run.yaml - post-run: playbooks/oslo.messaging-src-dsvm-full-zmq-default/post.yaml - timeout: 10800 - required-projects: - - openstack-infra/devstack-gate - - openstack/devstack-plugin-zmq - - openstack/oslo.messaging - - job: name: oslo.messaging-src-grenade-dsvm parent: legacy-dsvm-base @@ -194,24 +155,6 @@ - openstack/dib-utils - openstack/diskimage-builder -- job: - name: oslo.messaging-telemetry-dsvm-integration-zmq - parent: legacy-dsvm-base - run: playbooks/oslo.messaging-telemetry-dsvm-integration-zmq/run.yaml - post-run: playbooks/oslo.messaging-telemetry-dsvm-integration-zmq/post.yaml - timeout: 4200 - required-projects: - - openstack-infra/devstack-gate - - openstack/aodh - - openstack/ceilometer - - openstack/devstack-plugin-zmq - - openstack/oslo.messaging - - openstack/panko - # following are required when DEVSTACK_GATE_HEAT, which this - # job turns on - - openstack/dib-utils - - openstack/diskimage-builder - - job: name: oslo.messaging-telemetry-dsvm-integration-rabbit parent: legacy-dsvm-base @@ -270,19 +213,6 @@ - openstack/oslo.messaging - openstack/tempest -- job: - name: oslo.messaging-tempest-neutron-dsvm-src-zmq-default - parent: legacy-dsvm-base - run: playbooks/oslo.messaging-tempest-neutron-dsvm-src-zmq-default/run.yaml - post-run: playbooks/oslo.messaging-tempest-neutron-dsvm-src-zmq-default/post.yaml - timeout: 7800 - required-projects: - - openstack-infra/devstack-gate - - openstack/devstack-plugin-zmq - - openstack/neutron - - openstack/oslo.messaging - - openstack/tempest - - project: check: @@ -292,18 +222,10 @@ - oslo.messaging-tox-py27-func-kafka: voting: false - oslo.messaging-tox-py27-func-rabbit - - oslo.messaging-tox-py27-func-zmq-proxy: - voting: false - - oslo.messaging-tox-py27-func-zmq-pubsub: - voting: false - - oslo.messaging-tox-py27-func-zmq: - voting: false - oslo.messaging-tox-py35-func-amqp1: voting: false - oslo.messaging-tox-py35-func-rabbit: voting: false - - oslo.messaging-tox-py35-func-zmq: - voting: false - oslo.messaging-src-dsvm-full-rabbit-default - oslo.messaging-src-dsvm-full-amqp1-hybrid: @@ -316,8 +238,6 @@ voting: false - oslo.messaging-src-dsvm-full-kafka-default: voting: false - - oslo.messaging-src-dsvm-full-zmq-default: - voting: false - oslo.messaging-src-grenade-dsvm: voting: false @@ -329,8 +249,6 @@ voting: false - oslo.messaging-telemetry-dsvm-integration-kafka: voting: false - - oslo.messaging-telemetry-dsvm-integration-zmq: - voting: false - oslo.messaging-tempest-neutron-dsvm-src-rabbit-default - oslo.messaging-tempest-neutron-dsvm-src-amqp1-hybrid: @@ -338,8 +256,6 @@ branches: ^(?!stable/ocata).*$ - oslo.messaging-tempest-neutron-dsvm-src-kafka-default: voting: false - - oslo.messaging-tempest-neutron-dsvm-src-zmq-default: - voting: false gate: jobs: diff --git a/oslo_messaging/tests/drivers/zmq/__init__.py b/oslo_messaging/tests/drivers/zmq/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/oslo_messaging/tests/drivers/zmq/matchmaker/__init__.py b/oslo_messaging/tests/drivers/zmq/matchmaker/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py b/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py deleted file mode 100755 index a20cdb4c7..000000000 --- a/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py +++ /dev/null @@ -1,140 +0,0 @@ -# Copyright 2014 Canonical, Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import inspect -from stevedore import driver -import testscenarios - -import oslo_messaging -from oslo_messaging.tests import utils as test_utils -from oslo_utils import importutils - -redis = importutils.try_import('redis') - - -def redis_available(): - '''Helper to see if local redis server is running''' - if not redis: - return False - try: - redis.StrictRedis(socket_timeout=1).ping() - return True - except redis.exceptions.ConnectionError: - return False - - -load_tests = testscenarios.load_tests_apply_scenarios - - -class TestImplMatchmaker(test_utils.BaseTestCase): - - scenarios = [ - ("dummy", {"rpc_zmq_matchmaker": "dummy"}), - ("redis", {"rpc_zmq_matchmaker": "redis"}), - ] - - def setUp(self): - super(TestImplMatchmaker, self).setUp() - - if self.rpc_zmq_matchmaker == "redis": - if not redis_available(): - self.skipTest("redis unavailable") - - self.test_matcher = driver.DriverManager( - 'oslo.messaging.zmq.matchmaker', - self.rpc_zmq_matchmaker, - ).driver(self.conf) - - if self.rpc_zmq_matchmaker == "redis": - for redis_instance in self.test_matcher._redis_instances: - self.addCleanup(redis_instance.flushdb) - - self.target = oslo_messaging.Target(topic="test_topic") - self.host1 = b"test_host1" - self.host2 = b"test_host2" - - def test_register(self): - self.test_matcher.register( - self.target, - self.host1, - "test", - expire=self.conf.oslo_messaging_zmq.zmq_target_expire) - - self.assertEqual([self.host1], - self.test_matcher.get_hosts(self.target, "test")) - - def test_register_two_hosts(self): - self.test_matcher.register( - self.target, - self.host1, - "test", - expire=self.conf.oslo_messaging_zmq.zmq_target_expire) - self.test_matcher.register( - self.target, - self.host2, - "test", - expire=self.conf.oslo_messaging_zmq.zmq_target_expire) - - self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"), - [self.host1, self.host2]) - - def test_register_unregister(self): - self.test_matcher.register( - self.target, - self.host1, - "test", - expire=self.conf.oslo_messaging_zmq.zmq_target_expire) - self.test_matcher.register( - self.target, - self.host2, - "test", - expire=self.conf.oslo_messaging_zmq.zmq_target_expire) - - self.test_matcher.unregister(self.target, self.host2, "test") - - self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"), - [self.host1]) - - def test_register_two_same_hosts(self): - self.test_matcher.register( - self.target, - self.host1, - "test", - expire=self.conf.oslo_messaging_zmq.zmq_target_expire) - self.test_matcher.register( - self.target, - self.host1, - "test", - expire=self.conf.oslo_messaging_zmq.zmq_target_expire) - - self.assertEqual([self.host1], - self.test_matcher.get_hosts(self.target, "test")) - - def test_get_hosts_wrong_topic(self): - target = oslo_messaging.Target(topic="no_such_topic") - self.assertEqual([], self.test_matcher.get_hosts(target, "test")) - - def test_handle_redis_package_error(self): - if self.rpc_zmq_matchmaker == "redis": - # move 'redis' variable to prevent this case affect others - module = inspect.getmodule(self.test_matcher) - redis_package = module.redis - - # 'redis' variable is set to None, when package importing is failed - module.redis = None - self.assertRaises(ImportError, self.test_matcher.__init__, - self.conf) - - # retrieve 'redis' variable which is set originally - module.redis = redis_package diff --git a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py deleted file mode 100644 index 5c2d7e49e..000000000 --- a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py +++ /dev/null @@ -1,129 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import testtools - -import oslo_messaging -from oslo_messaging._drivers import impl_zmq -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_socket -from oslo_messaging.tests.drivers.zmq import zmq_common -from oslo_messaging.tests import utils as test_utils - - -zmq = zmq_async.import_zmq() - - -class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase): - - @testtools.skipIf(zmq is None, "zmq not available") - def setUp(self): - super(ZmqTestPortsRange, self).setUp() - - # Set config values - kwargs = {'rpc_zmq_min_port': 5555, - 'rpc_zmq_max_port': 5560} - self.config(group='oslo_messaging_zmq', **kwargs) - - def test_ports_range(self): - listeners = [] - - for i in range(10): - try: - target = oslo_messaging.Target(topic='testtopic_' + str(i)) - new_listener = self.driver.listen(target, None, None) - listeners.append(new_listener) - except zmq_socket.ZmqPortBusy: - pass - - self.assertLessEqual(len(listeners), 5) - - for l in listeners: - l.cleanup() - - -class TestConfZmqDriverLoad(test_utils.BaseTestCase): - - @testtools.skipIf(zmq is None, "zmq not available") - def setUp(self): - super(TestConfZmqDriverLoad, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - - def test_driver_load(self): - transport = oslo_messaging.get_transport(self.conf) - self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver) - - -class TestZmqBasics(zmq_common.ZmqBaseTestCase): - - @testtools.skipIf(zmq is None, "zmq not available") - def setUp(self): - super(TestZmqBasics, self).setUp() - self.target = oslo_messaging.Target(topic='topic') - self.ctxt = {'key': 'value'} - self.message = {'method': 'qwerty', 'args': {'int': 1, 'bool': True}} - - def test_send_call_without_method_failure(self): - self.message.pop('method') - self.listener.listen(self.target) - self.assertRaises(KeyError, self.driver.send, - self.target, self.ctxt, self.message, - wait_for_reply=True, timeout=10) - - def _check_listener_received(self): - self.assertTrue(self.listener._received.isSet()) - self.assertEqual(self.ctxt, self.listener.message.ctxt) - self.assertEqual(self.message, self.listener.message.message) - - def test_send_call_success(self): - self.listener.listen(self.target) - result = self.driver.send(self.target, self.ctxt, self.message, - wait_for_reply=True, timeout=10) - self.assertTrue(result) - self._check_listener_received() - - def test_send_call_direct_success(self): - self.target.server = 'server' - self.listener.listen(self.target) - result = self.driver.send(self.target, self.ctxt, self.message, - wait_for_reply=True, timeout=10) - self.assertTrue(result) - self._check_listener_received() - - def test_send_cast_direct_success(self): - self.target.server = 'server' - self.listener.listen(self.target) - result = self.driver.send(self.target, self.ctxt, self.message, - wait_for_reply=False) - self.listener._received.wait(5) - self.assertIsNone(result) - self._check_listener_received() - - def test_send_fanout_success(self): - self.target.fanout = True - self.listener.listen(self.target) - result = self.driver.send(self.target, self.ctxt, self.message, - wait_for_reply=False) - self.listener._received.wait(5) - self.assertIsNone(result) - self._check_listener_received() - - def test_send_notify_success(self): - self.listener.listen_notifications([(self.target, 'info')]) - self.target.topic += '.info' - result = self.driver.send_notification(self.target, self.ctxt, - self.message, '3.0') - self.listener._received.wait(5) - self.assertIsNone(result) - self._check_listener_received() diff --git a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py deleted file mode 100755 index cc72608a8..000000000 --- a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py +++ /dev/null @@ -1,150 +0,0 @@ -# Copyright 2015-2016 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import json -import time - -import msgpack -import six -import testscenarios - -from oslo_config import cfg - -import oslo_messaging -from oslo_messaging._drivers.zmq_driver.proxy.central \ - import zmq_publisher_proxy -from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy -from oslo_messaging._drivers.zmq_driver import zmq_address -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_names -from oslo_messaging._drivers.zmq_driver import zmq_version -from oslo_messaging.tests.drivers.zmq import zmq_common - -load_tests = testscenarios.load_tests_apply_scenarios - -zmq = zmq_async.import_zmq() - -opt_group = cfg.OptGroup(name='zmq_proxy_opts', - title='ZeroMQ proxy options') -cfg.CONF.register_opts(zmq_proxy.zmq_proxy_opts, group=opt_group) - - -class TestPubSub(zmq_common.ZmqBaseTestCase): - - LISTENERS_COUNT = 3 - - scenarios = [ - ('json', {'serialization': 'json', - 'dumps': lambda obj: six.b(json.dumps(obj))}), - ('msgpack', {'serialization': 'msgpack', - 'dumps': msgpack.dumps}) - ] - - def setUp(self): - super(TestPubSub, self).setUp() - - kwargs = {'use_pub_sub': True, - 'rpc_zmq_serialization': self.serialization} - self.config(group='oslo_messaging_zmq', **kwargs) - - self.config(host="127.0.0.1", group="zmq_proxy_opts") - self.config(publisher_port=0, group="zmq_proxy_opts") - - self.publisher = zmq_publisher_proxy.PublisherProxy( - self.conf, self.driver.matchmaker) - self.driver.matchmaker.register_publisher( - (self.publisher.host, ''), - expire=self.conf.oslo_messaging_zmq.zmq_target_expire) - - self.listeners = [] - for _ in range(self.LISTENERS_COUNT): - self.listeners.append(zmq_common.TestServerListener(self.driver)) - - def tearDown(self): - super(TestPubSub, self).tearDown() - self.publisher.cleanup() - for listener in self.listeners: - listener.stop() - - def _send_request(self, target): - # Needed only in test env to give listener a chance to connect - # before request fires - time.sleep(1) - context = {} - message = {'method': 'hello-world'} - - self.publisher.send_request( - [b"reply_id", - b'', - six.b(zmq_version.MESSAGE_VERSION), - six.b(str(zmq_names.CAST_FANOUT_TYPE)), - zmq_address.target_to_subscribe_filter(target), - b"message_id", - self.dumps([context, message])] - ) - - def _check_listener(self, listener): - listener._received.wait(timeout=5) - self.assertTrue(listener._received.isSet()) - method = listener.message.message[u'method'] - self.assertEqual(u'hello-world', method) - - def _check_listener_negative(self, listener): - listener._received.wait(timeout=1) - self.assertFalse(listener._received.isSet()) - - def test_single_listener(self): - target = oslo_messaging.Target(topic='testtopic', fanout=True) - self.listener.listen(target) - - self._send_request(target) - - self._check_listener(self.listener) - - def test_all_listeners(self): - target = oslo_messaging.Target(topic='testtopic', fanout=True) - - for listener in self.listeners: - listener.listen(target) - - self._send_request(target) - - for listener in self.listeners: - self._check_listener(listener) - - def test_filtered(self): - target = oslo_messaging.Target(topic='testtopic', fanout=True) - target_wrong = oslo_messaging.Target(topic='wrong', fanout=True) - - self.listeners[0].listen(target) - self.listeners[1].listen(target) - self.listeners[2].listen(target_wrong) - - self._send_request(target) - - self._check_listener(self.listeners[0]) - self._check_listener(self.listeners[1]) - self._check_listener_negative(self.listeners[2]) - - def test_topic_part_matching(self): - target = oslo_messaging.Target(topic='testtopic', server='server') - target_part = oslo_messaging.Target(topic='testtopic', fanout=True) - - self.listeners[0].listen(target) - self.listeners[1].listen(target) - - self._send_request(target_part) - - self._check_listener(self.listeners[0]) - self._check_listener(self.listeners[1]) diff --git a/oslo_messaging/tests/drivers/zmq/test_routing_table.py b/oslo_messaging/tests/drivers/zmq/test_routing_table.py deleted file mode 100644 index 508a161e4..000000000 --- a/oslo_messaging/tests/drivers/zmq/test_routing_table.py +++ /dev/null @@ -1,80 +0,0 @@ -# Copyright 2016 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging.tests import utils as test_utils - - -zmq = zmq_async.import_zmq() - - -class TestRoutingTable(test_utils.BaseTestCase): - - def setUp(self): - super(TestRoutingTable, self).setUp() - - def test_get_next_while_origin_changed(self): - table = zmq_routing_table.RoutingTable(self.conf) - table.register("topic1.server1", "1") - table.register("topic1.server1", "2") - table.register("topic1.server1", "3") - - rr_gen = table.get_hosts_round_robin("topic1.server1") - - result = [] - for i in range(3): - result.append(next(rr_gen)) - - self.assertEqual(3, len(result)) - self.assertIn("1", result) - self.assertIn("2", result) - self.assertIn("3", result) - - table.register("topic1.server1", "4") - table.register("topic1.server1", "5") - table.register("topic1.server1", "6") - - result = [] - for i in range(6): - result.append(next(rr_gen)) - - self.assertEqual(6, len(result)) - self.assertIn("1", result) - self.assertIn("2", result) - self.assertIn("3", result) - self.assertIn("4", result) - self.assertIn("5", result) - self.assertIn("6", result) - - def test_no_targets(self): - table = zmq_routing_table.RoutingTable(self.conf) - rr_gen = table.get_hosts_round_robin("topic1.server1") - - result = [] - for t in rr_gen: - result.append(t) - self.assertEqual(0, len(result)) - - def test_target_unchanged(self): - table = zmq_routing_table.RoutingTable(self.conf) - table.register("topic1.server1", "1") - - rr_gen = table.get_hosts_round_robin("topic1.server1") - - result = [] - for i in range(3): - result.append(next(rr_gen)) - - self.assertEqual(["1", "1", "1"], result) diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py b/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py deleted file mode 100644 index a0264cf01..000000000 --- a/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py +++ /dev/null @@ -1,226 +0,0 @@ -# Copyright 2016 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from six.moves import mock -import testtools -import time - -import oslo_messaging -from oslo_messaging._drivers.zmq_driver.client import zmq_receivers -from oslo_messaging._drivers.zmq_driver.client import zmq_senders -from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy -from oslo_messaging._drivers.zmq_driver.server.consumers.zmq_dealer_consumer \ - import DealerConsumerWithAcks -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_options -from oslo_messaging.tests.drivers.zmq import zmq_common -from oslo_messaging.tests import utils as test_utils - -zmq = zmq_async.import_zmq() - - -class TestZmqAckManager(test_utils.BaseTestCase): - - @testtools.skipIf(zmq is None, "zmq not available") - def setUp(self): - super(TestZmqAckManager, self).setUp() - - # register and set necessary config opts - self.messaging_conf.transport_driver = 'zmq' - zmq_options.register_opts(self.conf, mock.MagicMock()) - kwargs = {'rpc_zmq_matchmaker': 'dummy', - 'use_pub_sub': False, - 'use_router_proxy': True, - 'rpc_thread_pool_size': 1, - 'rpc_use_acks': True, - 'rpc_ack_timeout_base': 5, - 'rpc_ack_timeout_multiplier': 1, - 'rpc_retry_attempts': 2} - self.config(group='oslo_messaging_zmq', **kwargs) - self.conf.register_opts(zmq_proxy.zmq_proxy_opts, - group='zmq_proxy_opts') - - # mock set_result method of futures - self.set_result_patcher = mock.patch.object( - zmq_receivers.futurist.Future, 'set_result', - side_effect=zmq_receivers.futurist.Future.set_result, autospec=True - ) - self.set_result = self.set_result_patcher.start() - - # mock send method of senders - self.send_patcher = mock.patch.object( - zmq_senders.RequestSenderProxy, 'send', - side_effect=zmq_senders.RequestSenderProxy.send, autospec=True - ) - self.send = self.send_patcher.start() - - # get driver - transport = oslo_messaging.get_transport(self.conf) - self.driver = transport._driver - - # prepare and launch proxy - self.proxy = zmq_proxy.ZmqProxy(self.conf) - vars(self.driver.matchmaker).update(vars(self.proxy.matchmaker)) - self.executor = zmq_async.get_executor(self.proxy.run) - self.executor.execute() - - # create listener - self.listener = zmq_common.TestServerListener(self.driver) - - # create target and message - self.target = oslo_messaging.Target(topic='topic', server='server') - self.message = {'method': 'xyz', 'args': {'x': 1, 'y': 2, 'z': 3}} - - # start listening to target - self.listener.listen(self.target) - - # get ack manager - self.ack_manager = self.driver.client.get().publishers['default'] - - self.addCleanup( - zmq_common.StopRpc( - self, [('listener', 'stop'), ('executor', 'stop'), - ('proxy', 'close'), ('driver', 'cleanup'), - ('send_patcher', 'stop'), - ('set_result_patcher', 'stop')] - ) - ) - - # wait for all connections to be established - # and all parties to be ready for messaging - time.sleep(1) - - @mock.patch.object(DealerConsumerWithAcks, '_acknowledge', - side_effect=DealerConsumerWithAcks._acknowledge, - autospec=True) - def test_cast_success_without_retries(self, received_ack_mock): - result = self.driver.send( - self.target, {}, self.message, wait_for_reply=False - ) - self.assertIsNone(result) - self.ack_manager.pool.shutdown(wait=True) - self.assertTrue(self.listener._received.isSet()) - self.assertEqual(self.message, self.listener.message.message) - self.assertEqual(1, self.send.call_count) - self.assertEqual(1, received_ack_mock.call_count) - self.assertEqual(2, self.set_result.call_count) - - def test_cast_success_with_one_retry(self): - with mock.patch.object(DealerConsumerWithAcks, - '_acknowledge') as lost_ack_mock: - result = self.driver.send( - self.target, {}, self.message, wait_for_reply=False - ) - self.assertIsNone(result) - self.listener._received.wait(5) - self.assertTrue(self.listener._received.isSet()) - self.assertEqual(self.message, self.listener.message.message) - self.assertEqual(1, self.send.call_count) - self.assertEqual(1, lost_ack_mock.call_count) - self.assertEqual(0, self.set_result.call_count) - self.listener._received.clear() - with mock.patch.object(DealerConsumerWithAcks, '_acknowledge', - side_effect=DealerConsumerWithAcks._acknowledge, - autospec=True) as received_ack_mock: - self.ack_manager.pool.shutdown(wait=True) - self.assertFalse(self.listener._received.isSet()) - self.assertEqual(2, self.send.call_count) - self.assertEqual(1, received_ack_mock.call_count) - self.assertEqual(2, self.set_result.call_count) - - def test_cast_success_with_two_retries(self): - with mock.patch.object(DealerConsumerWithAcks, - '_acknowledge') as lost_ack_mock: - result = self.driver.send( - self.target, {}, self.message, wait_for_reply=False - ) - self.assertIsNone(result) - self.listener._received.wait(5) - self.assertTrue(self.listener._received.isSet()) - self.assertEqual(self.message, self.listener.message.message) - self.assertEqual(1, self.send.call_count) - self.assertEqual(1, lost_ack_mock.call_count) - self.assertEqual(0, self.set_result.call_count) - self.listener._received.clear() - self.listener._received.wait(7.5) - self.assertFalse(self.listener._received.isSet()) - self.assertEqual(2, self.send.call_count) - self.assertEqual(2, lost_ack_mock.call_count) - self.assertEqual(0, self.set_result.call_count) - with mock.patch.object(DealerConsumerWithAcks, '_acknowledge', - side_effect=DealerConsumerWithAcks._acknowledge, - autospec=True) as received_ack_mock: - self.ack_manager.pool.shutdown(wait=True) - self.assertFalse(self.listener._received.isSet()) - self.assertEqual(3, self.send.call_count) - self.assertEqual(1, received_ack_mock.call_count) - self.assertEqual(2, self.set_result.call_count) - - @mock.patch.object(DealerConsumerWithAcks, '_acknowledge') - def test_cast_failure_exhausted_retries(self, lost_ack_mock): - result = self.driver.send( - self.target, {}, self.message, wait_for_reply=False - ) - self.assertIsNone(result) - self.ack_manager.pool.shutdown(wait=True) - self.assertTrue(self.listener._received.isSet()) - self.assertEqual(self.message, self.listener.message.message) - self.assertEqual(3, self.send.call_count) - self.assertEqual(3, lost_ack_mock.call_count) - self.assertEqual(1, self.set_result.call_count) - - @mock.patch.object(DealerConsumerWithAcks, '_acknowledge', - side_effect=DealerConsumerWithAcks._acknowledge, - autospec=True) - @mock.patch.object(DealerConsumerWithAcks, '_reply', - side_effect=DealerConsumerWithAcks._reply, - autospec=True) - @mock.patch.object(DealerConsumerWithAcks, '_reply_from_cache', - side_effect=DealerConsumerWithAcks._reply_from_cache, - autospec=True) - def test_call_success_without_retries(self, unused_reply_from_cache_mock, - received_reply_mock, - received_ack_mock): - result = self.driver.send( - self.target, {}, self.message, wait_for_reply=True, timeout=10 - ) - self.assertIsNotNone(result) - self.ack_manager.pool.shutdown(wait=True) - self.assertTrue(self.listener._received.isSet()) - self.assertEqual(self.message, self.listener.message.message) - self.assertEqual(1, self.send.call_count) - self.assertEqual(1, received_ack_mock.call_count) - self.assertEqual(3, self.set_result.call_count) - received_reply_mock.assert_called_once_with(mock.ANY, mock.ANY, - reply=True, failure=None) - self.assertEqual(0, unused_reply_from_cache_mock.call_count) - - @mock.patch.object(DealerConsumerWithAcks, '_acknowledge') - @mock.patch.object(DealerConsumerWithAcks, '_reply') - @mock.patch.object(DealerConsumerWithAcks, '_reply_from_cache') - def test_call_failure_exhausted_retries(self, lost_reply_from_cache_mock, - lost_reply_mock, lost_ack_mock): - self.assertRaises(oslo_messaging.MessagingTimeout, - self.driver.send, - self.target, {}, self.message, - wait_for_reply=True, timeout=20) - self.ack_manager.pool.shutdown(wait=True) - self.assertTrue(self.listener._received.isSet()) - self.assertEqual(self.message, self.listener.message.message) - self.assertEqual(3, self.send.call_count) - self.assertEqual(3, lost_ack_mock.call_count) - self.assertEqual(2, self.set_result.call_count) - lost_reply_mock.assert_called_once_with(mock.ANY, - reply=True, failure=None) - self.assertEqual(2, lost_reply_from_cache_mock.call_count) diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_address.py b/oslo_messaging/tests/drivers/zmq/test_zmq_address.py deleted file mode 100644 index 519c294cc..000000000 --- a/oslo_messaging/tests/drivers/zmq/test_zmq_address.py +++ /dev/null @@ -1,67 +0,0 @@ -# Copyright 2016 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import testscenarios -import testtools - -import oslo_messaging -from oslo_messaging._drivers.zmq_driver import zmq_address -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_names -from oslo_messaging.tests import utils as test_utils - - -zmq = zmq_async.import_zmq() - -load_tests = testscenarios.load_tests_apply_scenarios - - -class TestZmqAddress(test_utils.BaseTestCase): - - scenarios = [ - ('router', {'listener_type': zmq_names.socket_type_str(zmq.ROUTER)}), - ('dealer', {'listener_type': zmq_names.socket_type_str(zmq.DEALER)}) - ] - - @testtools.skipIf(zmq is None, "zmq not available") - def test_target_to_key_topic_only(self): - target = oslo_messaging.Target(topic='topic') - key = zmq_address.target_to_key(target, self.listener_type) - self.assertEqual(self.listener_type + '/topic', key) - - @testtools.skipIf(zmq is None, "zmq not available") - def test_target_to_key_topic_server_round_robin(self): - target = oslo_messaging.Target(topic='topic', server='server') - key = zmq_address.target_to_key(target, self.listener_type) - self.assertEqual(self.listener_type + '/topic/server', key) - - @testtools.skipIf(zmq is None, "zmq not available") - def test_target_to_key_topic_fanout(self): - target = oslo_messaging.Target(topic='topic', fanout=True) - key = zmq_address.target_to_key(target, self.listener_type) - self.assertEqual(self.listener_type + '/topic', key) - - @testtools.skipIf(zmq is None, "zmq not available") - def test_target_to_key_topic_server_fanout(self): - target = oslo_messaging.Target(topic='topic', server='server', - fanout=True) - key = zmq_address.target_to_key(target, self.listener_type) - self.assertEqual(self.listener_type + '/topic', key) - - @testtools.skipIf(zmq is None, "zmq not available") - def test_target_to_key_topic_server_fanout_no_prefix(self): - target = oslo_messaging.Target(topic='topic', server='server', - fanout=True) - key = zmq_address.target_to_key(target) - self.assertEqual('topic', key) diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_async.py b/oslo_messaging/tests/drivers/zmq/test_zmq_async.py deleted file mode 100644 index a4dccd9ec..000000000 --- a/oslo_messaging/tests/drivers/zmq/test_zmq_async.py +++ /dev/null @@ -1,93 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from six.moves import mock -import testtools - -from oslo_messaging._drivers.zmq_driver.poller import green_poller -from oslo_messaging._drivers.zmq_driver.poller import threading_poller -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging.tests import utils as test_utils - -zmq = zmq_async.import_zmq() - - -class TestImportZmq(test_utils.BaseTestCase): - - @testtools.skipIf(zmq is None, "zmq not available") - def setUp(self): - super(TestImportZmq, self).setUp() - - def test_when_eventlet_is_available_then_load_eventlet_green_zmq(self): - zmq_async.eventletutils.is_monkey_patched = lambda _: True - - mock_try_import = mock.Mock() - zmq_async.importutils.try_import = mock_try_import - - zmq_async.import_zmq() - - mock_try_import.assert_called_with('eventlet.green.zmq', default=None) - - def test_when_evetlet_is_unavailable_then_load_zmq(self): - zmq_async.eventletutils.is_monkey_patched = lambda _: False - - mock_try_import = mock.Mock() - zmq_async.importutils.try_import = mock_try_import - - zmq_async.import_zmq() - - mock_try_import.assert_called_with('zmq', default=None) - - -class TestGetPoller(test_utils.BaseTestCase): - - @testtools.skipIf(zmq is None, "zmq not available") - def setUp(self): - super(TestGetPoller, self).setUp() - - def test_when_eventlet_is_available_then_return_GreenPoller(self): - zmq_async.eventletutils.is_monkey_patched = lambda _: True - - poller = zmq_async.get_poller() - - self.assertIsInstance(poller, green_poller.GreenPoller) - - def test_when_eventlet_is_unavailable_then_return_ThreadingPoller(self): - zmq_async.eventletutils.is_monkey_patched = lambda _: False - - poller = zmq_async.get_poller() - - self.assertIsInstance(poller, threading_poller.ThreadingPoller) - - -class TestGetExecutor(test_utils.BaseTestCase): - - @testtools.skipIf(zmq is None, "zmq not available") - def setUp(self): - super(TestGetExecutor, self).setUp() - - def test_when_eventlet_module_is_available_then_return_GreenExecutor(self): - zmq_async.eventletutils.is_monkey_patched = lambda _: True - - executor = zmq_async.get_executor('any method') - - self.assertIsInstance(executor, green_poller.GreenExecutor) - self.assertEqual('any method', executor._method) - - def test_when_eventlet_is_unavailable_then_return_ThreadingExecutor(self): - zmq_async.eventletutils.is_monkey_patched = lambda _: False - - executor = zmq_async.get_executor('any method') - - self.assertIsInstance(executor, - threading_poller.ThreadingExecutor) - self.assertEqual('any method', executor._method) diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py b/oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py deleted file mode 100644 index 45df7967b..000000000 --- a/oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py +++ /dev/null @@ -1,127 +0,0 @@ -# Copyright 2016 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from six.moves import mock -import testtools - -import oslo_messaging -from oslo_messaging._drivers import common -from oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_base \ - import MatchmakerDummy -from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_redis -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging.tests import utils as test_utils - -zmq = zmq_async.import_zmq() - -redis = zmq_matchmaker_redis.redis -sentinel = zmq_matchmaker_redis.redis_sentinel - - -class TestZmqTransportUrl(test_utils.BaseTestCase): - - @testtools.skipIf(zmq is None, "zmq not available") - def setUp(self): - super(TestZmqTransportUrl, self).setUp() - - def setup_url(self, url): - transport = oslo_messaging.get_transport(self.conf, url) - self.addCleanup(transport.cleanup) - driver = transport._driver - return driver, url - - def mock_redis(self): - if redis is None: - self.skipTest("redis not available") - else: - redis_patcher = mock.patch.object(redis, 'StrictRedis') - self.addCleanup(redis_patcher.stop) - return redis_patcher.start() - - def mock_sentinel(self): - if sentinel is None: - self.skipTest("sentinel not available") - else: - sentinel_patcher = mock.patch.object(sentinel, 'Sentinel') - self.addCleanup(sentinel_patcher.stop) - return sentinel_patcher.start() - - def test_empty_url(self): - self.mock_redis() - driver, url = self.setup_url("zmq:///") - self.assertIs(zmq_matchmaker_redis.MatchmakerRedis, - driver.matchmaker.__class__) - self.assertEqual('zmq', driver.matchmaker.url.transport) - - def test_error_url(self): - self.assertRaises(common.RPCException, self.setup_url, "zmq+error:///") - - def test_dummy_url(self): - driver, url = self.setup_url("zmq+dummy:///") - self.assertIs(MatchmakerDummy, - driver.matchmaker.__class__) - self.assertEqual('zmq+dummy', driver.matchmaker.url.transport) - - def test_redis_url(self): - self.mock_redis() - driver, url = self.setup_url("zmq+redis:///") - self.assertIs(zmq_matchmaker_redis.MatchmakerRedis, - driver.matchmaker.__class__) - self.assertEqual('zmq+redis', driver.matchmaker.url.transport) - - def test_sentinel_url(self): - self.mock_sentinel() - driver, url = self.setup_url("zmq+sentinel:///") - self.assertIs(zmq_matchmaker_redis.MatchmakerSentinel, - driver.matchmaker.__class__) - self.assertEqual('zmq+sentinel', driver.matchmaker.url.transport) - - def test_host_with_credentials_url(self): - self.mock_redis() - driver, url = self.setup_url("zmq://:password@host:60000/") - self.assertIs(zmq_matchmaker_redis.MatchmakerRedis, - driver.matchmaker.__class__) - self.assertEqual('zmq', driver.matchmaker.url.transport) - self.assertEqual( - [{"host": "host", "port": 60000, "password": "password"}], - driver.matchmaker._redis_hosts - ) - - def test_redis_multiple_hosts_url(self): - self.mock_redis() - driver, url = self.setup_url( - "zmq+redis://host1:60001,host2:60002,host3:60003/" - ) - self.assertIs(zmq_matchmaker_redis.MatchmakerRedis, - driver.matchmaker.__class__) - self.assertEqual('zmq+redis', driver.matchmaker.url.transport) - self.assertEqual( - [{"host": "host1", "port": 60001, "password": None}, - {"host": "host2", "port": 60002, "password": None}, - {"host": "host3", "port": 60003, "password": None}], - driver.matchmaker._redis_hosts - ) - - def test_sentinel_multiple_hosts_url(self): - self.mock_sentinel() - driver, url = self.setup_url( - "zmq+sentinel://host1:20001,host2:20002,host3:20003/" - ) - self.assertIs(zmq_matchmaker_redis.MatchmakerSentinel, - driver.matchmaker.__class__) - self.assertEqual('zmq+sentinel', driver.matchmaker.url.transport) - self.assertEqual( - [("host1", 20001), ("host2", 20002), ("host3", 20003)], - driver.matchmaker._sentinel_hosts - ) diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_ttl_cache.py b/oslo_messaging/tests/drivers/zmq/test_zmq_ttl_cache.py deleted file mode 100644 index 772a97ded..000000000 --- a/oslo_messaging/tests/drivers/zmq/test_zmq_ttl_cache.py +++ /dev/null @@ -1,132 +0,0 @@ -# Copyright 2016 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import time - -from oslo_messaging._drivers.zmq_driver.server import zmq_ttl_cache -from oslo_messaging.tests import utils as test_utils - - -class TestZmqTTLCache(test_utils.BaseTestCase): - - def setUp(self): - super(TestZmqTTLCache, self).setUp() - - def call_count_decorator(unbound_method): - def wrapper(self, *args, **kwargs): - wrapper.call_count += 1 - return unbound_method(self, *args, **kwargs) - wrapper.call_count = 0 - return wrapper - - zmq_ttl_cache.TTLCache._update_cache = \ - call_count_decorator(zmq_ttl_cache.TTLCache._update_cache) - - self.cache = zmq_ttl_cache.TTLCache(ttl=1) - - self.addCleanup(lambda: self.cache.cleanup()) - - def _test_add_get(self): - self.cache.add('x', 'a') - - self.assertEqual(self.cache.get('x'), 'a') - self.assertEqual(self.cache.get('x', 'b'), 'a') - self.assertIsNone(self.cache.get('y')) - self.assertEqual(self.cache.get('y', 'b'), 'b') - - time.sleep(1) - - self.assertIsNone(self.cache.get('x')) - self.assertEqual(self.cache.get('x', 'b'), 'b') - - def test_add_get_with_executor(self): - self._test_add_get() - - def test_add_get_without_executor(self): - self.cache._executor.stop() - self._test_add_get() - - def _test_in_operator(self): - self.cache.add(1) - - self.assertIn(1, self.cache) - - time.sleep(0.5) - - self.cache.add(2) - - self.assertIn(1, self.cache) - self.assertIn(2, self.cache) - - time.sleep(0.75) - - self.cache.add(3) - - self.assertNotIn(1, self.cache) - self.assertIn(2, self.cache) - self.assertIn(3, self.cache) - - time.sleep(0.5) - - self.assertNotIn(2, self.cache) - self.assertIn(3, self.cache) - - def test_in_operator_with_executor(self): - self._test_in_operator() - - def test_in_operator_without_executor(self): - self.cache._executor.stop() - self._test_in_operator() - - def _is_expired(self, key): - with self.cache._lock: - _, expiration_time = self.cache._cache[key] - return self.cache._is_expired(expiration_time, time.time()) - - def test_executor(self): - self.cache.add(1) - - self.assertEqual([1], sorted(self.cache._cache.keys())) - self.assertFalse(self._is_expired(1)) - - time.sleep(0.75) - - self.assertEqual(1, self.cache._update_cache.call_count) - - self.cache.add(2) - - self.assertEqual([1, 2], sorted(self.cache._cache.keys())) - self.assertFalse(self._is_expired(1)) - self.assertFalse(self._is_expired(2)) - - time.sleep(0.75) - - self.assertEqual(2, self.cache._update_cache.call_count) - - self.cache.add(3) - - if 1 in self.cache: - self.assertEqual([1, 2, 3], sorted(self.cache._cache.keys())) - self.assertTrue(self._is_expired(1)) - else: - self.assertEqual([2, 3], sorted(self.cache._cache.keys())) - self.assertFalse(self._is_expired(2)) - self.assertFalse(self._is_expired(3)) - - time.sleep(0.75) - - self.assertEqual(3, self.cache._update_cache.call_count) - - self.assertEqual([3], sorted(self.cache._cache.keys())) - self.assertFalse(self._is_expired(3)) diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_version.py b/oslo_messaging/tests/drivers/zmq/test_zmq_version.py deleted file mode 100644 index 9b0189403..000000000 --- a/oslo_messaging/tests/drivers/zmq/test_zmq_version.py +++ /dev/null @@ -1,63 +0,0 @@ -# Copyright 2016 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_messaging._drivers.zmq_driver import zmq_version -from oslo_messaging.tests import utils as test_utils - - -class Doer(object): - - def __init__(self): - self.x = 1 - self.y = 2 - self.z = 3 - - def _sudo(self): - pass - - def do(self): - pass - - def _do_v_1_1(self): - pass - - def _do_v_2_2(self): - pass - - def _do_v_3_3(self): - pass - - -class TestZmqVersion(test_utils.BaseTestCase): - - def setUp(self): - super(TestZmqVersion, self).setUp() - self.doer = Doer() - - def test_get_unknown_attr_versions(self): - self.assertRaises(AssertionError, zmq_version.get_method_versions, - self.doer, 'qwerty') - - def test_get_non_method_attr_versions(self): - for attr_name in vars(self.doer): - self.assertRaises(AssertionError, zmq_version.get_method_versions, - self.doer, attr_name) - - def test_get_private_method_versions(self): - self.assertRaises(AssertionError, zmq_version.get_method_versions, - self.doer, '_sudo') - - def test_get_public_method_versions(self): - do_versions = zmq_version.get_method_versions(self.doer, 'do') - self.assertEqual(['1.1', '2.2', '3.3'], sorted(do_versions.keys())) diff --git a/oslo_messaging/tests/drivers/zmq/zmq_common.py b/oslo_messaging/tests/drivers/zmq/zmq_common.py deleted file mode 100644 index 2e3699933..000000000 --- a/oslo_messaging/tests/drivers/zmq/zmq_common.py +++ /dev/null @@ -1,111 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging -import threading - -import fixtures -from six.moves import mock -import testtools - -import oslo_messaging -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_options -from oslo_messaging._i18n import _LE -from oslo_messaging.tests import utils as test_utils - -LOG = logging.getLogger(__name__) - -zmq = zmq_async.import_zmq() - - -class TestServerListener(object): - - def __init__(self, driver): - self.driver = driver - self.listener = None - self.executor = zmq_async.get_executor(self._run) - self._stop = threading.Event() - self._received = threading.Event() - self.message = None - - def listen(self, target): - self.listener = self.driver.listen(target, None, - None)._poll_style_listener - self.executor.execute() - - def listen_notifications(self, targets_and_priorities): - self.listener = self.driver.listen_for_notifications( - targets_and_priorities, None, None, None)._poll_style_listener - self.executor.execute() - - def _run(self): - try: - messages = self.listener.poll() - if messages: - message = messages[0] - message.acknowledge() - self._received.set() - self.message = message - message.reply(reply=True) - except Exception: - LOG.exception(_LE("Unexpected exception occurred.")) - - def stop(self): - self.executor.stop() - - -class ZmqBaseTestCase(test_utils.BaseTestCase): - """Base test case for all ZMQ tests """ - - @testtools.skipIf(zmq is None, "zmq not available") - def setUp(self): - super(ZmqBaseTestCase, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - zmq_options.register_opts(self.conf, mock.MagicMock()) - - # Set config values - self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path - kwargs = {'rpc_zmq_bind_address': '127.0.0.1', - 'rpc_zmq_host': '127.0.0.1', - 'rpc_zmq_ipc_dir': self.internal_ipc_dir, - 'use_pub_sub': False, - 'use_router_proxy': False, - 'rpc_zmq_matchmaker': 'dummy'} - self.config(group='oslo_messaging_zmq', **kwargs) - self.config(rpc_response_timeout=5) - - # Get driver - transport = oslo_messaging.get_transport(self.conf) - self.driver = transport._driver - - self.listener = TestServerListener(self.driver) - - self.addCleanup( - StopRpc(self, [('listener', 'stop'), ('driver', 'cleanup')]) - ) - - -class StopRpc(object): - def __init__(self, obj, attrs_and_stops): - self.obj = obj - self.attrs_and_stops = attrs_and_stops - - def __call__(self): - for attr, stop in self.attrs_and_stops: - if hasattr(self.obj, attr): - obj_attr = getattr(self.obj, attr) - if hasattr(obj_attr, stop): - obj_attr_stop = getattr(obj_attr, stop) - obj_attr_stop() diff --git a/oslo_messaging/tests/functional/zmq/__init__.py b/oslo_messaging/tests/functional/zmq/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/oslo_messaging/tests/functional/zmq/multiproc_utils.py b/oslo_messaging/tests/functional/zmq/multiproc_utils.py deleted file mode 100644 index cf3b6e3a5..000000000 --- a/oslo_messaging/tests/functional/zmq/multiproc_utils.py +++ /dev/null @@ -1,232 +0,0 @@ -# Copyright 2016 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging -import logging.handlers -import multiprocessing -import os -import sys -import threading -import time -import uuid - -from oslo_config import cfg - -import oslo_messaging -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging.tests.functional import utils - -LOG = logging.getLogger(__name__) - -zmq = zmq_async.import_zmq() - - -class QueueHandler(logging.Handler): - """This is a logging handler which sends events to a multiprocessing queue. - - The plan is to add it to Python 3.2, but this can be copy pasted into - user code for use with earlier Python versions. - """ - - def __init__(self, queue): - """Initialise an instance, using the passed queue.""" - logging.Handler.__init__(self) - self.queue = queue - - def emit(self, record): - """Emit a record. - - Writes the LogRecord to the queue. - """ - try: - ei = record.exc_info - if ei: - # just to get traceback text into record.exc_text - dummy = self.format(record) # noqa - record.exc_info = None # not needed any more - self.queue.put_nowait(record) - except (KeyboardInterrupt, SystemExit): - raise - except Exception: - self.handleError(record) - - -def listener_configurer(conf): - root = logging.getLogger() - h = logging.StreamHandler(sys.stdout) - f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s ' - '%(levelname)-8s %(message)s') - h.setFormatter(f) - root.addHandler(h) - log_path = conf.oslo_messaging_zmq.rpc_zmq_ipc_dir + \ - "/" + "zmq_multiproc.log" - file_handler = logging.StreamHandler(open(log_path, 'w')) - file_handler.setFormatter(f) - root.addHandler(file_handler) - - -def server_configurer(queue): - h = QueueHandler(queue) - root = logging.getLogger() - root.addHandler(h) - root.setLevel(logging.DEBUG) - - -def listener_thread(queue, configurer, conf): - configurer(conf) - while True: - time.sleep(0.3) - try: - record = queue.get() - if record is None: - break - logger = logging.getLogger(record.name) - logger.handle(record) - except (KeyboardInterrupt, SystemExit): - raise - - -class Client(oslo_messaging.RPCClient): - - def __init__(self, transport, topic): - super(Client, self).__init__( - transport=transport, target=oslo_messaging.Target(topic=topic)) - self.replies = [] - - def call_a(self): - LOG.warning("call_a - client side") - rep = self.call({}, 'call_a') - LOG.warning("after call_a - client side") - self.replies.append(rep) - return rep - - -class ReplyServerEndpoint(object): - - def call_a(self, *args, **kwargs): - LOG.warning("call_a - Server endpoint reached!") - return "OK" - - -class Server(object): - - def __init__(self, conf, log_queue, transport_url, name, topic=None): - self.conf = conf - self.log_queue = log_queue - self.transport_url = transport_url - self.name = name - self.topic = topic or str(uuid.uuid4()) - self.ready = multiprocessing.Value('b', False) - self._stop = multiprocessing.Event() - - def start(self): - self.process = multiprocessing.Process(target=self._run_server, - name=self.name, - args=(self.conf, - self.transport_url, - self.log_queue, - self.ready)) - self.process.start() - LOG.debug("Server process started: pid: %d", self.process.pid) - - def _run_server(self, conf, url, log_queue, ready): - server_configurer(log_queue) - LOG.debug("Starting RPC server") - - transport = oslo_messaging.get_transport(conf, url=url) - target = oslo_messaging.Target(topic=self.topic, server=self.name) - self.rpc_server = oslo_messaging.get_rpc_server( - transport=transport, target=target, - endpoints=[ReplyServerEndpoint()], - executor='eventlet') - self.rpc_server.start() - ready.value = True - LOG.debug("RPC server being started") - while not self._stop.is_set(): - LOG.debug("Waiting for the stop signal ...") - time.sleep(1) - self.rpc_server.stop() - self.rpc_server.wait() - LOG.debug("Leaving process T:%s Pid:%d", str(target), os.getpid()) - - def cleanup(self): - LOG.debug("Stopping server") - self.shutdown() - - def shutdown(self): - self._stop.set() - - def restart(self, time_for_restart=1): - pass - - def hang(self): - pass - - def crash(self): - pass - - def ping(self): - pass - - -class MultiprocTestCase(utils.SkipIfNoTransportURL): - - def setUp(self): - super(MultiprocTestCase, self).setUp(conf=cfg.ConfigOpts()) - - if not self.url.startswith("zmq"): - self.skipTest("ZeroMQ specific skipped...") - - self.transport = oslo_messaging.get_transport(self.conf, url=self.url) - - LOG.debug("Start log queue") - - self.log_queue = multiprocessing.Queue() - self.log_listener = threading.Thread(target=listener_thread, - args=(self.log_queue, - listener_configurer, - self.conf)) - self.log_listener.start() - self.spawned = [] - - self.conf.prog = "test_prog" - self.conf.project = "test_project" - - def tearDown(self): - for process in self.spawned: - process.cleanup() - super(MultiprocTestCase, self).tearDown() - - def get_client(self, topic): - return Client(self.transport, topic) - - def spawn_server(self, wait_for_server=False, topic=None): - name = "server_%d_%s" % (len(self.spawned), str(uuid.uuid4())[:8]) - server = Server(self.conf, self.log_queue, self.url, name, topic) - LOG.debug("[SPAWN] %s (starting)...", server.name) - server.start() - if wait_for_server: - while not server.ready.value: - LOG.debug("[SPAWN] %s (waiting for server ready)...", - server.name) - time.sleep(1) - LOG.debug("[SPAWN] Server %s:%d started.", - server.name, server.process.pid) - self.spawned.append(server) - return server - - def spawn_servers(self, number, wait_for_server=False, common_topic=True): - topic = str(uuid.uuid4()) if common_topic else None - for _ in range(number): - self.spawn_server(wait_for_server, topic) diff --git a/oslo_messaging/tests/functional/zmq/test_startup.py b/oslo_messaging/tests/functional/zmq/test_startup.py deleted file mode 100644 index ebea76ec7..000000000 --- a/oslo_messaging/tests/functional/zmq/test_startup.py +++ /dev/null @@ -1,49 +0,0 @@ -# Copyright 2016 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import os -import sys - -from oslo_messaging.tests.functional.zmq import multiproc_utils - - -class StartupOrderTestCase(multiproc_utils.MultiprocTestCase): - - def setUp(self): - super(StartupOrderTestCase, self).setUp() - - self.conf.prog = "test_prog" - self.conf.project = "test_project" - - self.config(rpc_response_timeout=10) - - log_path = os.path.join(self.conf.oslo_messaging_zmq.rpc_zmq_ipc_dir, - str(os.getpid()) + ".log") - sys.stdout = open(log_path, "wb", buffering=0) - - def test_call_client_wait_for_server(self): - server = self.spawn_server(wait_for_server=True) - client = self.get_client(server.topic) - for _ in range(3): - reply = client.call_a() - self.assertIsNotNone(reply) - self.assertEqual(3, len(client.replies)) - - def test_call_client_dont_wait_for_server(self): - server = self.spawn_server(wait_for_server=False) - client = self.get_client(server.topic) - for _ in range(3): - reply = client.call_a() - self.assertIsNotNone(reply) - self.assertEqual(3, len(client.replies))