diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py b/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py index 744b9ba2b..7f5f434ee 100644 --- a/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py +++ b/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py @@ -14,6 +14,7 @@ import mock import testtools +import time import oslo_messaging from oslo_messaging._drivers.zmq_driver.client import zmq_receivers @@ -34,10 +35,10 @@ class TestZmqAckManager(test_utils.BaseTestCase): @testtools.skipIf(zmq is None, "zmq not available") def setUp(self): super(TestZmqAckManager, self).setUp() + + # register and set necessary config opts self.messaging_conf.transport_driver = 'zmq' zmq_options.register_opts(self.conf) - - # set config opts kwargs = {'rpc_zmq_matchmaker': 'dummy', 'use_pub_sub': False, 'use_router_proxy': True, @@ -68,9 +69,6 @@ class TestZmqAckManager(test_utils.BaseTestCase): transport = oslo_messaging.get_transport(self.conf) self.driver = transport._driver - # get ack manager - self.ack_manager = self.driver.client.get().publishers['default'] - # prepare and launch proxy self.proxy = zmq_proxy.ZmqProxy(self.conf, zmq_queue_proxy.UniversalQueueProxy) @@ -85,6 +83,12 @@ class TestZmqAckManager(test_utils.BaseTestCase): self.target = oslo_messaging.Target(topic='topic', server='server') self.message = {'method': 'xyz', 'args': {'x': 1, 'y': 2, 'z': 3}} + # start listening to target + self.listener.listen(self.target) + + # get ack manager + self.ack_manager = self.driver.client.get().publishers['default'] + self.addCleanup( zmq_common.StopRpc( self, [('listener', 'stop'), ('executor', 'stop'), @@ -94,13 +98,16 @@ class TestZmqAckManager(test_utils.BaseTestCase): ) ) + # wait for all connections to be established + # and all parties to be ready for messaging + time.sleep(1) + @mock.patch.object( zmq_incoming_message.ZmqIncomingMessage, 'acknowledge', side_effect=zmq_incoming_message.ZmqIncomingMessage.acknowledge, autospec=True ) def test_cast_success_without_retries(self, received_ack_mock): - self.listener.listen(self.target) result = self.driver.send( self.target, {}, self.message, wait_for_reply=False ) @@ -113,7 +120,6 @@ class TestZmqAckManager(test_utils.BaseTestCase): self.assertEqual(2, self.set_result.call_count) def test_cast_success_with_one_retry(self): - self.listener.listen(self.target) with mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, 'acknowledge') as lost_ack_mock: result = self.driver.send( @@ -139,7 +145,6 @@ class TestZmqAckManager(test_utils.BaseTestCase): self.assertEqual(2, self.set_result.call_count) def test_cast_success_with_two_retries(self): - self.listener.listen(self.target) with mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, 'acknowledge') as lost_ack_mock: result = self.driver.send( @@ -171,7 +176,6 @@ class TestZmqAckManager(test_utils.BaseTestCase): @mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, 'acknowledge') def test_cast_failure_exhausted_retries(self, lost_ack_mock): - self.listener.listen(self.target) result = self.driver.send( self.target, {}, self.message, wait_for_reply=False ) @@ -195,7 +199,6 @@ class TestZmqAckManager(test_utils.BaseTestCase): ) def test_call_success_without_retries(self, received_reply_mock, received_ack_mock): - self.listener.listen(self.target) result = self.driver.send( self.target, {}, self.message, wait_for_reply=True, timeout=10 ) @@ -213,7 +216,6 @@ class TestZmqAckManager(test_utils.BaseTestCase): def test_call_failure_exhausted_retries_and_timeout_error(self, lost_reply_mock, lost_ack_mock): - self.listener.listen(self.target) self.assertRaises(oslo_messaging.MessagingTimeout, self.driver.send, self.target, {}, self.message,