diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 30b4d2bf8..1cdea3a27 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -580,7 +580,7 @@ class AMQPDriverBase(base.BaseDriver): def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None, call_monitor_timeout=None, - envelope=True, notify=False, retry=None): + envelope=True, notify=False, retry=None, transport_options=None): msg = message @@ -626,7 +626,8 @@ class AMQPDriverBase(base.BaseDriver): " topic '%(topic)s'", {'exchange': exchange, 'topic': topic}) conn.topic_send(exchange_name=exchange, topic=topic, - msg=msg, timeout=timeout, retry=retry) + msg=msg, timeout=timeout, retry=retry, + transport_options=transport_options) if wait_for_reply: result = self._waiter.wait(msg_id, timeout, @@ -639,9 +640,10 @@ class AMQPDriverBase(base.BaseDriver): self._waiter.unlisten(msg_id) def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, - call_monitor_timeout=None, retry=None): + call_monitor_timeout=None, retry=None, transport_options=None): return self._send(target, ctxt, message, wait_for_reply, timeout, - call_monitor_timeout, retry=retry) + call_monitor_timeout, retry=retry, + transport_options=transport_options) def send_notification(self, target, ctxt, message, version, retry=None): return self._send(target, ctxt, message, diff --git a/oslo_messaging/_drivers/base.py b/oslo_messaging/_drivers/base.py index 82dcda5b7..fb44540e9 100644 --- a/oslo_messaging/_drivers/base.py +++ b/oslo_messaging/_drivers/base.py @@ -20,7 +20,6 @@ from oslo_utils import excutils from oslo_utils import timeutils import six - from oslo_messaging import exceptions base_opts = [ @@ -41,6 +40,7 @@ def batch_poll_helper(func): :py:meth:`PollStyleListener.poll` implementation that only polls for a single message per call. """ + def wrapper(in_self, timeout=None, batch_size=1, batch_timeout=None): incomings = [] driver_prefetch = in_self.prefetch_size @@ -57,6 +57,7 @@ def batch_poll_helper(func): break return incomings + return wrapper @@ -244,9 +245,9 @@ class Listener(object): all backend implementations. :type prefetch_size: int """ + def __init__(self, batch_size, batch_timeout, prefetch_size=-1): - self.on_incoming_callback = None self.batch_timeout = batch_timeout self.prefetch_size = prefetch_size @@ -283,6 +284,7 @@ class PollStyleListenerAdapter(Listener): """A Listener that uses a PollStyleListener for message transfer. A dedicated thread is created to do message polling. """ + def __init__(self, poll_style_listener, batch_size, batch_timeout): super(PollStyleListenerAdapter, self).__init__( batch_size, batch_timeout, poll_style_listener.prefetch_size @@ -364,7 +366,7 @@ class BaseDriver(object): @abc.abstractmethod def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, call_monitor_timeout=None, - retry=None): + retry=None, transport_options=None): """Send a message to the given target and optionally wait for a reply. This method is used by the RPC client when sending RPC requests to a server. @@ -434,6 +436,10 @@ class BaseDriver(object): :type call_monitor_timeout: float :param retry: maximum message send attempts permitted :type retry: int + :param transport_options: additional parameters to configure the driver + for example to send parameters as "mandatory" + flag in RabbitMQ + :type transport_options: dictionary :returns: A reply message or None if no reply expected :raises: :py:exc:`MessagingException`, any exception thrown by the remote server when executing the RPC call. diff --git a/oslo_messaging/_drivers/impl_fake.py b/oslo_messaging/_drivers/impl_fake.py index fccfebe08..05300e7b1 100644 --- a/oslo_messaging/_drivers/impl_fake.py +++ b/oslo_messaging/_drivers/impl_fake.py @@ -188,7 +188,8 @@ class FakeDriver(base.BaseDriver): """ json.dumps(message) - def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None): + def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None, + transport_options=None): self._check_serialize(message) exchange = self._exchange_manager.get_exchange(target.exchange) @@ -216,10 +217,11 @@ class FakeDriver(base.BaseDriver): return None def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, - call_monitor_timeout=None, retry=None): + call_monitor_timeout=None, retry=None, transport_options=None): # NOTE(sileht): retry doesn't need to be implemented, the fake # transport always works - return self._send(target, ctxt, message, wait_for_reply, timeout) + return self._send(target, ctxt, message, wait_for_reply, timeout, + transport_options) def send_notification(self, target, ctxt, message, version, retry=None): # NOTE(sileht): retry doesn't need to be implemented, the fake diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index b08b6efa3..aedb163c3 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -1104,7 +1104,7 @@ class Connection(object): self.declare_consumer(consumer) def _ensure_publishing(self, method, exchange, msg, routing_key=None, - timeout=None, retry=None): + timeout=None, retry=None, transport_options=None): """Send to a publisher based on the publisher class.""" def _error_callback(exc): @@ -1113,7 +1113,8 @@ class Connection(object): "'%(topic)s': %(err_str)s", log_info) LOG.debug('Exception', exc_info=exc) - method = functools.partial(method, exchange, msg, routing_key, timeout) + method = functools.partial(method, exchange, msg, routing_key, + timeout, transport_options) with self._connection_lock: self.ensure(method, retry=retry, error_callback=_error_callback) @@ -1135,7 +1136,8 @@ class Connection(object): 'connection_id': self.connection_id}) return info - def _publish(self, exchange, msg, routing_key=None, timeout=None): + def _publish(self, exchange, msg, routing_key=None, timeout=None, + transport_options=None): """Publish a message.""" if not (exchange.passive or exchange.name in self._declared_exchanges): @@ -1144,7 +1146,8 @@ class Connection(object): log_info = {'msg': msg, 'who': exchange or 'default', - 'key': routing_key} + 'key': routing_key, + 'transport_options': str(transport_options)} LOG.trace('Connection._publish: sending message %(msg)s to' ' %(who)s with routing key %(key)s', log_info) @@ -1158,7 +1161,8 @@ class Connection(object): compression=self.kombu_compression) def _publish_and_creates_default_queue(self, exchange, msg, - routing_key=None, timeout=None): + routing_key=None, timeout=None, + transport_options=None): """Publisher that declares a default queue When the exchange is missing instead of silently creates an exchange @@ -1195,7 +1199,8 @@ class Connection(object): def _publish_and_raises_on_missing_exchange(self, exchange, msg, routing_key=None, - timeout=None): + timeout=None, + transport_options=None): """Publisher that raises exception if exchange is missing.""" if not exchange.passive: raise RuntimeError("_publish_and_retry_on_missing_exchange() must " @@ -1203,7 +1208,7 @@ class Connection(object): try: self._publish(exchange, msg, routing_key=routing_key, - timeout=timeout) + timeout=timeout, transport_options=transport_options) return except self.connection.channel_errors as exc: if exc.code == 404: @@ -1230,7 +1235,8 @@ class Connection(object): self._ensure_publishing(self._publish_and_raises_on_missing_exchange, exchange, msg, routing_key=msg_id) - def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None): + def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None, + transport_options=None): """Send a 'topic' message.""" exchange = kombu.entity.Exchange( name=exchange_name, @@ -1240,7 +1246,8 @@ class Connection(object): self._ensure_publishing(self._publish, exchange, msg, routing_key=topic, timeout=timeout, - retry=retry) + retry=retry, + transport_options=transport_options) def fanout_send(self, topic, msg, retry=None): """Send a 'fanout' message.""" diff --git a/oslo_messaging/rpc/client.py b/oslo_messaging/rpc/client.py index 636b2a194..ea5f54e2c 100644 --- a/oslo_messaging/rpc/client.py +++ b/oslo_messaging/rpc/client.py @@ -1,4 +1,3 @@ - # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. @@ -45,7 +44,6 @@ _client_opts = [ class RemoteError(exceptions.MessagingException): - """Signifies that a remote endpoint method has raised an exception. Contains a string representation of the type of the original exception, @@ -94,7 +92,7 @@ class _BaseCallContext(object): def __init__(self, transport, target, serializer, timeout=None, version_cap=None, retry=None, - call_monitor_timeout=None): + call_monitor_timeout=None, transport_options=None): self.conf = transport.conf self.transport = transport @@ -104,6 +102,7 @@ class _BaseCallContext(object): self.call_monitor_timeout = call_monitor_timeout self.retry = retry self.version_cap = version_cap + self.transport_options = transport_options super(_BaseCallContext, self).__init__() @@ -150,7 +149,9 @@ class _BaseCallContext(object): self._check_version_cap(msg.get('version')) try: - self.transport._send(self.target, msg_ctxt, msg, retry=self.retry) + self.transport._send(self.target, msg_ctxt, msg, + retry=self.retry, + transport_options=self.transport_options) except driver_base.TransportDriverError as ex: raise ClientSendError(self.target, ex) @@ -172,10 +173,12 @@ class _BaseCallContext(object): self._check_version_cap(msg.get('version')) try: - result = self.transport._send(self.target, msg_ctxt, msg, - wait_for_reply=True, timeout=timeout, - call_monitor_timeout=cm_timeout, - retry=self.retry) + result = \ + self.transport._send(self.target, msg_ctxt, msg, + wait_for_reply=True, timeout=timeout, + call_monitor_timeout=cm_timeout, + retry=self.retry, + transport_options=self.transport_options) except driver_base.TransportDriverError as ex: raise ClientSendError(self.target, ex) @@ -190,7 +193,6 @@ class _BaseCallContext(object): class _CallContext(_BaseCallContext): - _marker = _BaseCallContext._marker @classmethod @@ -198,7 +200,7 @@ class _CallContext(_BaseCallContext): exchange=_marker, topic=_marker, namespace=_marker, version=_marker, server=_marker, fanout=_marker, timeout=_marker, version_cap=_marker, retry=_marker, - call_monitor_timeout=_marker): + call_monitor_timeout=_marker, transport_options=_marker): cls._check_version(version) kwargs = dict( exchange=exchange, @@ -219,11 +221,13 @@ class _CallContext(_BaseCallContext): retry = call_context.retry if call_monitor_timeout is cls._marker: call_monitor_timeout = call_context.call_monitor_timeout + if transport_options is cls._marker: + transport_options = call_context.transport_options return _CallContext(call_context.transport, target, call_context.serializer, timeout, version_cap, retry, - call_monitor_timeout) + call_monitor_timeout, transport_options) def prepare(self, exchange=_marker, topic=_marker, namespace=_marker, version=_marker, server=_marker, fanout=_marker, @@ -237,7 +241,6 @@ class _CallContext(_BaseCallContext): class RPCClient(_BaseCallContext): - """A class for invoking methods on remote RPC servers. The RPCClient class is responsible for sending method invocations to and @@ -326,7 +329,7 @@ class RPCClient(_BaseCallContext): def __init__(self, transport, target, timeout=None, version_cap=None, serializer=None, retry=None, - call_monitor_timeout=None): + call_monitor_timeout=None, transport_options=None): """Construct an RPC client. :param transport: a messaging transport handle @@ -362,7 +365,7 @@ class RPCClient(_BaseCallContext): super(RPCClient, self).__init__( transport, target, serializer, timeout, version_cap, retry, - call_monitor_timeout + call_monitor_timeout, transport_options ) self.conf.register_opts(_client_opts) @@ -370,7 +373,7 @@ class RPCClient(_BaseCallContext): def prepare(self, exchange=_marker, topic=_marker, namespace=_marker, version=_marker, server=_marker, fanout=_marker, timeout=_marker, version_cap=_marker, retry=_marker, - call_monitor_timeout=_marker): + call_monitor_timeout=_marker, transport_options=_marker): """Prepare a method invocation context. Use this method to override client properties for an individual method @@ -401,6 +404,10 @@ class RPCClient(_BaseCallContext): 0 means no retry is attempted. N means attempt at most N retries. :type retry: int + :param transport_options: additional parameters to configure the driver + for example to send parameters as "mandatory" + flag in RabbitMQ + :type transport_options: dictionary :param call_monitor_timeout: an optional timeout (in seconds) for active call heartbeating. If specified, requires the server to heartbeat @@ -413,7 +420,7 @@ class RPCClient(_BaseCallContext): exchange, topic, namespace, version, server, fanout, timeout, version_cap, retry, - call_monitor_timeout) + call_monitor_timeout, transport_options) def cast(self, ctxt, method, **kwargs): """Invoke a method without blocking for a return value. diff --git a/oslo_messaging/tests/rpc/test_client.py b/oslo_messaging/tests/rpc/test_client.py index 9fa40db05..ec22d705c 100755 --- a/oslo_messaging/tests/rpc/test_client.py +++ b/oslo_messaging/tests/rpc/test_client.py @@ -49,7 +49,31 @@ class TestCastCall(test_utils.BaseTestCase): transport._send = mock.Mock() msg = dict(method='foo', args=self.args) - kwargs = {'retry': None} + kwargs = {'retry': None, 'transport_options': None} + if self.call: + kwargs['wait_for_reply'] = True + kwargs['timeout'] = None + kwargs['call_monitor_timeout'] = None + + method = client.call if self.call else client.cast + method(self.ctxt, 'foo', **self.args) + + transport._send.assert_called_once_with(oslo_messaging.Target(), + self.ctxt, + msg, + **kwargs) + + def test_cast_call_with_transport_options(self): + self.config(rpc_response_timeout=None) + + transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') + client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), + transport_options={'my_k': 'my_val'}) + + transport._send = mock.Mock() + + msg = dict(method='foo', args=self.args) + kwargs = {'retry': None, 'transport_options': {'my_k': 'my_val'}} if self.call: kwargs['wait_for_reply'] = True kwargs['timeout'] = None @@ -203,7 +227,8 @@ class TestCastToTarget(test_utils.BaseTestCase): transport._send.assert_called_once_with(expect_target, {}, msg, - retry=None) + retry=None, + transport_options=None) TestCastToTarget.generate_scenarios() @@ -245,7 +270,7 @@ class TestCallTimeout(test_utils.BaseTestCase): msg = dict(method='foo', args={}) kwargs = dict(wait_for_reply=True, timeout=self.expect, retry=None, - call_monitor_timeout=self.cm) + call_monitor_timeout=self.cm, transport_options=None) if self.prepare is not _notset: client = client.prepare(timeout=self.prepare) @@ -277,7 +302,8 @@ class TestCallRetry(test_utils.BaseTestCase): msg = dict(method='foo', args={}) kwargs = dict(wait_for_reply=True, timeout=60, - retry=self.expect, call_monitor_timeout=None) + retry=self.expect, call_monitor_timeout=None, + transport_options=None) if self.prepare is not _notset: client = client.prepare(retry=self.prepare) @@ -334,8 +360,8 @@ class TestSerializer(test_utils.BaseTestCase): serializer=serializer) transport._send = mock.Mock() - - kwargs = dict(wait_for_reply=True, timeout=None) if self.call else {} + kwargs = dict(wait_for_reply=True, + timeout=None) if self.call else {} kwargs['retry'] = None if self.call: kwargs['call_monitor_timeout'] = None @@ -367,6 +393,7 @@ class TestSerializer(test_utils.BaseTestCase): transport._send.assert_called_once_with(oslo_messaging.Target(), dict(user='alice'), msg, + transport_options=None, **kwargs) expected_calls = [mock.call(self.ctxt, arg) for arg in self.args] self.assertEqual(expected_calls, @@ -466,7 +493,9 @@ class TestVersionCap(test_utils.BaseTestCase): self.assertFalse(self.success) else: self.assertTrue(self.success) - transport._send.assert_called_once_with(target, {}, msg, **kwargs) + transport._send.assert_called_once_with(target, {}, msg, + transport_options=None, + **kwargs) TestVersionCap.generate_scenarios() diff --git a/oslo_messaging/tests/test_transport.py b/oslo_messaging/tests/test_transport.py index a2c17f9c4..02a19f784 100755 --- a/oslo_messaging/tests/test_transport.py +++ b/oslo_messaging/tests/test_transport.py @@ -231,7 +231,8 @@ class TestTransportMethodArgs(test_utils.BaseTestCase): wait_for_reply=None, timeout=None, call_monitor_timeout=None, - retry=None) + retry=None, + transport_options=None) def test_send_all_args(self): t = transport.Transport(_FakeDriver(cfg.CONF)) @@ -250,7 +251,8 @@ class TestTransportMethodArgs(test_utils.BaseTestCase): wait_for_reply='wait_for_reply', timeout='timeout', call_monitor_timeout='cm_timeout', - retry='retry') + retry='retry', + transport_options=None) def test_send_notification(self): t = transport.Transport(_FakeDriver(cfg.CONF)) diff --git a/oslo_messaging/transport.py b/oslo_messaging/transport.py index e44cb26b6..c263e3541 100644 --- a/oslo_messaging/transport.py +++ b/oslo_messaging/transport.py @@ -116,7 +116,7 @@ class Transport(object): self._driver.require_features(requeue=requeue) def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None, - call_monitor_timeout=None, retry=None): + call_monitor_timeout=None, retry=None, transport_options=None): if not target.topic: raise exceptions.InvalidTarget('A topic is required to send', target) @@ -124,7 +124,8 @@ class Transport(object): wait_for_reply=wait_for_reply, timeout=timeout, call_monitor_timeout=call_monitor_timeout, - retry=retry) + retry=retry, + transport_options=transport_options) def _send_notification(self, target, ctxt, message, version, retry=None): if not target.topic: