[zmq] Fix TestZmqAckManager periodic failure
Change-Id: I011b80ae2db094cd34615b80e71a6545833d8ace Closes-Bug: #1617560
This commit is contained in:
parent
0706361141
commit
09816f0451
@ -14,6 +14,7 @@
|
|||||||
|
|
||||||
import mock
|
import mock
|
||||||
import testtools
|
import testtools
|
||||||
|
import time
|
||||||
|
|
||||||
import oslo_messaging
|
import oslo_messaging
|
||||||
from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
|
from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
|
||||||
@ -34,10 +35,10 @@ class TestZmqAckManager(test_utils.BaseTestCase):
|
|||||||
@testtools.skipIf(zmq is None, "zmq not available")
|
@testtools.skipIf(zmq is None, "zmq not available")
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestZmqAckManager, self).setUp()
|
super(TestZmqAckManager, self).setUp()
|
||||||
|
|
||||||
|
# register and set necessary config opts
|
||||||
self.messaging_conf.transport_driver = 'zmq'
|
self.messaging_conf.transport_driver = 'zmq'
|
||||||
zmq_options.register_opts(self.conf)
|
zmq_options.register_opts(self.conf)
|
||||||
|
|
||||||
# set config opts
|
|
||||||
kwargs = {'rpc_zmq_matchmaker': 'dummy',
|
kwargs = {'rpc_zmq_matchmaker': 'dummy',
|
||||||
'use_pub_sub': False,
|
'use_pub_sub': False,
|
||||||
'use_router_proxy': True,
|
'use_router_proxy': True,
|
||||||
@ -68,9 +69,6 @@ class TestZmqAckManager(test_utils.BaseTestCase):
|
|||||||
transport = oslo_messaging.get_transport(self.conf)
|
transport = oslo_messaging.get_transport(self.conf)
|
||||||
self.driver = transport._driver
|
self.driver = transport._driver
|
||||||
|
|
||||||
# get ack manager
|
|
||||||
self.ack_manager = self.driver.client.get().publishers['default']
|
|
||||||
|
|
||||||
# prepare and launch proxy
|
# prepare and launch proxy
|
||||||
self.proxy = zmq_proxy.ZmqProxy(self.conf,
|
self.proxy = zmq_proxy.ZmqProxy(self.conf,
|
||||||
zmq_queue_proxy.UniversalQueueProxy)
|
zmq_queue_proxy.UniversalQueueProxy)
|
||||||
@ -85,6 +83,12 @@ class TestZmqAckManager(test_utils.BaseTestCase):
|
|||||||
self.target = oslo_messaging.Target(topic='topic', server='server')
|
self.target = oslo_messaging.Target(topic='topic', server='server')
|
||||||
self.message = {'method': 'xyz', 'args': {'x': 1, 'y': 2, 'z': 3}}
|
self.message = {'method': 'xyz', 'args': {'x': 1, 'y': 2, 'z': 3}}
|
||||||
|
|
||||||
|
# start listening to target
|
||||||
|
self.listener.listen(self.target)
|
||||||
|
|
||||||
|
# get ack manager
|
||||||
|
self.ack_manager = self.driver.client.get().publishers['default']
|
||||||
|
|
||||||
self.addCleanup(
|
self.addCleanup(
|
||||||
zmq_common.StopRpc(
|
zmq_common.StopRpc(
|
||||||
self, [('listener', 'stop'), ('executor', 'stop'),
|
self, [('listener', 'stop'), ('executor', 'stop'),
|
||||||
@ -94,13 +98,16 @@ class TestZmqAckManager(test_utils.BaseTestCase):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# wait for all connections to be established
|
||||||
|
# and all parties to be ready for messaging
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
@mock.patch.object(
|
@mock.patch.object(
|
||||||
zmq_incoming_message.ZmqIncomingMessage, 'acknowledge',
|
zmq_incoming_message.ZmqIncomingMessage, 'acknowledge',
|
||||||
side_effect=zmq_incoming_message.ZmqIncomingMessage.acknowledge,
|
side_effect=zmq_incoming_message.ZmqIncomingMessage.acknowledge,
|
||||||
autospec=True
|
autospec=True
|
||||||
)
|
)
|
||||||
def test_cast_success_without_retries(self, received_ack_mock):
|
def test_cast_success_without_retries(self, received_ack_mock):
|
||||||
self.listener.listen(self.target)
|
|
||||||
result = self.driver.send(
|
result = self.driver.send(
|
||||||
self.target, {}, self.message, wait_for_reply=False
|
self.target, {}, self.message, wait_for_reply=False
|
||||||
)
|
)
|
||||||
@ -113,7 +120,6 @@ class TestZmqAckManager(test_utils.BaseTestCase):
|
|||||||
self.assertEqual(2, self.set_result.call_count)
|
self.assertEqual(2, self.set_result.call_count)
|
||||||
|
|
||||||
def test_cast_success_with_one_retry(self):
|
def test_cast_success_with_one_retry(self):
|
||||||
self.listener.listen(self.target)
|
|
||||||
with mock.patch.object(zmq_incoming_message.ZmqIncomingMessage,
|
with mock.patch.object(zmq_incoming_message.ZmqIncomingMessage,
|
||||||
'acknowledge') as lost_ack_mock:
|
'acknowledge') as lost_ack_mock:
|
||||||
result = self.driver.send(
|
result = self.driver.send(
|
||||||
@ -139,7 +145,6 @@ class TestZmqAckManager(test_utils.BaseTestCase):
|
|||||||
self.assertEqual(2, self.set_result.call_count)
|
self.assertEqual(2, self.set_result.call_count)
|
||||||
|
|
||||||
def test_cast_success_with_two_retries(self):
|
def test_cast_success_with_two_retries(self):
|
||||||
self.listener.listen(self.target)
|
|
||||||
with mock.patch.object(zmq_incoming_message.ZmqIncomingMessage,
|
with mock.patch.object(zmq_incoming_message.ZmqIncomingMessage,
|
||||||
'acknowledge') as lost_ack_mock:
|
'acknowledge') as lost_ack_mock:
|
||||||
result = self.driver.send(
|
result = self.driver.send(
|
||||||
@ -171,7 +176,6 @@ class TestZmqAckManager(test_utils.BaseTestCase):
|
|||||||
|
|
||||||
@mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, 'acknowledge')
|
@mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, 'acknowledge')
|
||||||
def test_cast_failure_exhausted_retries(self, lost_ack_mock):
|
def test_cast_failure_exhausted_retries(self, lost_ack_mock):
|
||||||
self.listener.listen(self.target)
|
|
||||||
result = self.driver.send(
|
result = self.driver.send(
|
||||||
self.target, {}, self.message, wait_for_reply=False
|
self.target, {}, self.message, wait_for_reply=False
|
||||||
)
|
)
|
||||||
@ -195,7 +199,6 @@ class TestZmqAckManager(test_utils.BaseTestCase):
|
|||||||
)
|
)
|
||||||
def test_call_success_without_retries(self, received_reply_mock,
|
def test_call_success_without_retries(self, received_reply_mock,
|
||||||
received_ack_mock):
|
received_ack_mock):
|
||||||
self.listener.listen(self.target)
|
|
||||||
result = self.driver.send(
|
result = self.driver.send(
|
||||||
self.target, {}, self.message, wait_for_reply=True, timeout=10
|
self.target, {}, self.message, wait_for_reply=True, timeout=10
|
||||||
)
|
)
|
||||||
@ -213,7 +216,6 @@ class TestZmqAckManager(test_utils.BaseTestCase):
|
|||||||
def test_call_failure_exhausted_retries_and_timeout_error(self,
|
def test_call_failure_exhausted_retries_and_timeout_error(self,
|
||||||
lost_reply_mock,
|
lost_reply_mock,
|
||||||
lost_ack_mock):
|
lost_ack_mock):
|
||||||
self.listener.listen(self.target)
|
|
||||||
self.assertRaises(oslo_messaging.MessagingTimeout,
|
self.assertRaises(oslo_messaging.MessagingTimeout,
|
||||||
self.driver.send,
|
self.driver.send,
|
||||||
self.target, {}, self.message,
|
self.target, {}, self.message,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user