Merge "Add transport_options parameter"

This commit is contained in:
Zuul 2019-06-17 16:35:55 +00:00 committed by Gerrit Code Review
commit e7420afa2f
8 changed files with 102 additions and 46 deletions

@ -580,7 +580,7 @@ class AMQPDriverBase(base.BaseDriver):
def _send(self, target, ctxt, message,
wait_for_reply=None, timeout=None, call_monitor_timeout=None,
envelope=True, notify=False, retry=None):
envelope=True, notify=False, retry=None, transport_options=None):
msg = message
@ -626,7 +626,8 @@ class AMQPDriverBase(base.BaseDriver):
" topic '%(topic)s'", {'exchange': exchange,
'topic': topic})
conn.topic_send(exchange_name=exchange, topic=topic,
msg=msg, timeout=timeout, retry=retry)
msg=msg, timeout=timeout, retry=retry,
transport_options=transport_options)
if wait_for_reply:
result = self._waiter.wait(msg_id, timeout,
@ -639,9 +640,10 @@ class AMQPDriverBase(base.BaseDriver):
self._waiter.unlisten(msg_id)
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
call_monitor_timeout=None, retry=None):
call_monitor_timeout=None, retry=None, transport_options=None):
return self._send(target, ctxt, message, wait_for_reply, timeout,
call_monitor_timeout, retry=retry)
call_monitor_timeout, retry=retry,
transport_options=transport_options)
def send_notification(self, target, ctxt, message, version, retry=None):
return self._send(target, ctxt, message,

@ -20,7 +20,6 @@ from oslo_utils import excutils
from oslo_utils import timeutils
import six
from oslo_messaging import exceptions
base_opts = [
@ -41,6 +40,7 @@ def batch_poll_helper(func):
:py:meth:`PollStyleListener.poll` implementation that only polls for a
single message per call.
"""
def wrapper(in_self, timeout=None, batch_size=1, batch_timeout=None):
incomings = []
driver_prefetch = in_self.prefetch_size
@ -57,6 +57,7 @@ def batch_poll_helper(func):
break
return incomings
return wrapper
@ -244,9 +245,9 @@ class Listener(object):
all backend implementations.
:type prefetch_size: int
"""
def __init__(self, batch_size, batch_timeout,
prefetch_size=-1):
self.on_incoming_callback = None
self.batch_timeout = batch_timeout
self.prefetch_size = prefetch_size
@ -283,6 +284,7 @@ class PollStyleListenerAdapter(Listener):
"""A Listener that uses a PollStyleListener for message transfer. A
dedicated thread is created to do message polling.
"""
def __init__(self, poll_style_listener, batch_size, batch_timeout):
super(PollStyleListenerAdapter, self).__init__(
batch_size, batch_timeout, poll_style_listener.prefetch_size
@ -364,7 +366,7 @@ class BaseDriver(object):
@abc.abstractmethod
def send(self, target, ctxt, message,
wait_for_reply=None, timeout=None, call_monitor_timeout=None,
retry=None):
retry=None, transport_options=None):
"""Send a message to the given target and optionally wait for a reply.
This method is used by the RPC client when sending RPC requests to a
server.
@ -434,6 +436,10 @@ class BaseDriver(object):
:type call_monitor_timeout: float
:param retry: maximum message send attempts permitted
:type retry: int
:param transport_options: additional parameters to configure the driver
for example to send parameters as "mandatory"
flag in RabbitMQ
:type transport_options: dictionary
:returns: A reply message or None if no reply expected
:raises: :py:exc:`MessagingException`, any exception thrown by the
remote server when executing the RPC call.

@ -188,7 +188,8 @@ class FakeDriver(base.BaseDriver):
"""
json.dumps(message)
def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
transport_options=None):
self._check_serialize(message)
exchange = self._exchange_manager.get_exchange(target.exchange)
@ -216,10 +217,11 @@ class FakeDriver(base.BaseDriver):
return None
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
call_monitor_timeout=None, retry=None):
call_monitor_timeout=None, retry=None, transport_options=None):
# NOTE(sileht): retry doesn't need to be implemented, the fake
# transport always works
return self._send(target, ctxt, message, wait_for_reply, timeout)
return self._send(target, ctxt, message, wait_for_reply, timeout,
transport_options)
def send_notification(self, target, ctxt, message, version, retry=None):
# NOTE(sileht): retry doesn't need to be implemented, the fake

@ -1104,7 +1104,7 @@ class Connection(object):
self.declare_consumer(consumer)
def _ensure_publishing(self, method, exchange, msg, routing_key=None,
timeout=None, retry=None):
timeout=None, retry=None, transport_options=None):
"""Send to a publisher based on the publisher class."""
def _error_callback(exc):
@ -1113,7 +1113,8 @@ class Connection(object):
"'%(topic)s': %(err_str)s", log_info)
LOG.debug('Exception', exc_info=exc)
method = functools.partial(method, exchange, msg, routing_key, timeout)
method = functools.partial(method, exchange, msg, routing_key,
timeout, transport_options)
with self._connection_lock:
self.ensure(method, retry=retry, error_callback=_error_callback)
@ -1135,7 +1136,8 @@ class Connection(object):
'connection_id': self.connection_id})
return info
def _publish(self, exchange, msg, routing_key=None, timeout=None):
def _publish(self, exchange, msg, routing_key=None, timeout=None,
transport_options=None):
"""Publish a message."""
if not (exchange.passive or exchange.name in self._declared_exchanges):
@ -1144,7 +1146,8 @@ class Connection(object):
log_info = {'msg': msg,
'who': exchange or 'default',
'key': routing_key}
'key': routing_key,
'transport_options': str(transport_options)}
LOG.trace('Connection._publish: sending message %(msg)s to'
' %(who)s with routing key %(key)s', log_info)
@ -1158,7 +1161,8 @@ class Connection(object):
compression=self.kombu_compression)
def _publish_and_creates_default_queue(self, exchange, msg,
routing_key=None, timeout=None):
routing_key=None, timeout=None,
transport_options=None):
"""Publisher that declares a default queue
When the exchange is missing instead of silently creates an exchange
@ -1195,7 +1199,8 @@ class Connection(object):
def _publish_and_raises_on_missing_exchange(self, exchange, msg,
routing_key=None,
timeout=None):
timeout=None,
transport_options=None):
"""Publisher that raises exception if exchange is missing."""
if not exchange.passive:
raise RuntimeError("_publish_and_retry_on_missing_exchange() must "
@ -1203,7 +1208,7 @@ class Connection(object):
try:
self._publish(exchange, msg, routing_key=routing_key,
timeout=timeout)
timeout=timeout, transport_options=transport_options)
return
except self.connection.channel_errors as exc:
if exc.code == 404:
@ -1230,7 +1235,8 @@ class Connection(object):
self._ensure_publishing(self._publish_and_raises_on_missing_exchange,
exchange, msg, routing_key=msg_id)
def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None,
transport_options=None):
"""Send a 'topic' message."""
exchange = kombu.entity.Exchange(
name=exchange_name,
@ -1240,7 +1246,8 @@ class Connection(object):
self._ensure_publishing(self._publish, exchange, msg,
routing_key=topic, timeout=timeout,
retry=retry)
retry=retry,
transport_options=transport_options)
def fanout_send(self, topic, msg, retry=None):
"""Send a 'fanout' message."""

@ -1,4 +1,3 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
@ -45,7 +44,6 @@ _client_opts = [
class RemoteError(exceptions.MessagingException):
"""Signifies that a remote endpoint method has raised an exception.
Contains a string representation of the type of the original exception,
@ -94,7 +92,7 @@ class _BaseCallContext(object):
def __init__(self, transport, target, serializer,
timeout=None, version_cap=None, retry=None,
call_monitor_timeout=None):
call_monitor_timeout=None, transport_options=None):
self.conf = transport.conf
self.transport = transport
@ -104,6 +102,7 @@ class _BaseCallContext(object):
self.call_monitor_timeout = call_monitor_timeout
self.retry = retry
self.version_cap = version_cap
self.transport_options = transport_options
super(_BaseCallContext, self).__init__()
@ -150,7 +149,9 @@ class _BaseCallContext(object):
self._check_version_cap(msg.get('version'))
try:
self.transport._send(self.target, msg_ctxt, msg, retry=self.retry)
self.transport._send(self.target, msg_ctxt, msg,
retry=self.retry,
transport_options=self.transport_options)
except driver_base.TransportDriverError as ex:
raise ClientSendError(self.target, ex)
@ -172,10 +173,12 @@ class _BaseCallContext(object):
self._check_version_cap(msg.get('version'))
try:
result = self.transport._send(self.target, msg_ctxt, msg,
wait_for_reply=True, timeout=timeout,
call_monitor_timeout=cm_timeout,
retry=self.retry)
result = \
self.transport._send(self.target, msg_ctxt, msg,
wait_for_reply=True, timeout=timeout,
call_monitor_timeout=cm_timeout,
retry=self.retry,
transport_options=self.transport_options)
except driver_base.TransportDriverError as ex:
raise ClientSendError(self.target, ex)
@ -190,7 +193,6 @@ class _BaseCallContext(object):
class _CallContext(_BaseCallContext):
_marker = _BaseCallContext._marker
@classmethod
@ -198,7 +200,7 @@ class _CallContext(_BaseCallContext):
exchange=_marker, topic=_marker, namespace=_marker,
version=_marker, server=_marker, fanout=_marker,
timeout=_marker, version_cap=_marker, retry=_marker,
call_monitor_timeout=_marker):
call_monitor_timeout=_marker, transport_options=_marker):
cls._check_version(version)
kwargs = dict(
exchange=exchange,
@ -219,11 +221,13 @@ class _CallContext(_BaseCallContext):
retry = call_context.retry
if call_monitor_timeout is cls._marker:
call_monitor_timeout = call_context.call_monitor_timeout
if transport_options is cls._marker:
transport_options = call_context.transport_options
return _CallContext(call_context.transport, target,
call_context.serializer,
timeout, version_cap, retry,
call_monitor_timeout)
call_monitor_timeout, transport_options)
def prepare(self, exchange=_marker, topic=_marker, namespace=_marker,
version=_marker, server=_marker, fanout=_marker,
@ -237,7 +241,6 @@ class _CallContext(_BaseCallContext):
class RPCClient(_BaseCallContext):
"""A class for invoking methods on remote RPC servers.
The RPCClient class is responsible for sending method invocations to and
@ -326,7 +329,7 @@ class RPCClient(_BaseCallContext):
def __init__(self, transport, target,
timeout=None, version_cap=None, serializer=None, retry=None,
call_monitor_timeout=None):
call_monitor_timeout=None, transport_options=None):
"""Construct an RPC client.
:param transport: a messaging transport handle
@ -362,7 +365,7 @@ class RPCClient(_BaseCallContext):
super(RPCClient, self).__init__(
transport, target, serializer, timeout, version_cap, retry,
call_monitor_timeout
call_monitor_timeout, transport_options
)
self.conf.register_opts(_client_opts)
@ -370,7 +373,7 @@ class RPCClient(_BaseCallContext):
def prepare(self, exchange=_marker, topic=_marker, namespace=_marker,
version=_marker, server=_marker, fanout=_marker,
timeout=_marker, version_cap=_marker, retry=_marker,
call_monitor_timeout=_marker):
call_monitor_timeout=_marker, transport_options=_marker):
"""Prepare a method invocation context.
Use this method to override client properties for an individual method
@ -401,6 +404,10 @@ class RPCClient(_BaseCallContext):
0 means no retry is attempted.
N means attempt at most N retries.
:type retry: int
:param transport_options: additional parameters to configure the driver
for example to send parameters as "mandatory"
flag in RabbitMQ
:type transport_options: dictionary
:param call_monitor_timeout: an optional timeout (in seconds) for
active call heartbeating. If specified,
requires the server to heartbeat
@ -413,7 +420,7 @@ class RPCClient(_BaseCallContext):
exchange, topic, namespace,
version, server, fanout,
timeout, version_cap, retry,
call_monitor_timeout)
call_monitor_timeout, transport_options)
def cast(self, ctxt, method, **kwargs):
"""Invoke a method without blocking for a return value.

@ -49,7 +49,31 @@ class TestCastCall(test_utils.BaseTestCase):
transport._send = mock.Mock()
msg = dict(method='foo', args=self.args)
kwargs = {'retry': None}
kwargs = {'retry': None, 'transport_options': None}
if self.call:
kwargs['wait_for_reply'] = True
kwargs['timeout'] = None
kwargs['call_monitor_timeout'] = None
method = client.call if self.call else client.cast
method(self.ctxt, 'foo', **self.args)
transport._send.assert_called_once_with(oslo_messaging.Target(),
self.ctxt,
msg,
**kwargs)
def test_cast_call_with_transport_options(self):
self.config(rpc_response_timeout=None)
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
transport_options={'my_k': 'my_val'})
transport._send = mock.Mock()
msg = dict(method='foo', args=self.args)
kwargs = {'retry': None, 'transport_options': {'my_k': 'my_val'}}
if self.call:
kwargs['wait_for_reply'] = True
kwargs['timeout'] = None
@ -203,7 +227,8 @@ class TestCastToTarget(test_utils.BaseTestCase):
transport._send.assert_called_once_with(expect_target,
{},
msg,
retry=None)
retry=None,
transport_options=None)
TestCastToTarget.generate_scenarios()
@ -245,7 +270,7 @@ class TestCallTimeout(test_utils.BaseTestCase):
msg = dict(method='foo', args={})
kwargs = dict(wait_for_reply=True, timeout=self.expect, retry=None,
call_monitor_timeout=self.cm)
call_monitor_timeout=self.cm, transport_options=None)
if self.prepare is not _notset:
client = client.prepare(timeout=self.prepare)
@ -277,7 +302,8 @@ class TestCallRetry(test_utils.BaseTestCase):
msg = dict(method='foo', args={})
kwargs = dict(wait_for_reply=True, timeout=60,
retry=self.expect, call_monitor_timeout=None)
retry=self.expect, call_monitor_timeout=None,
transport_options=None)
if self.prepare is not _notset:
client = client.prepare(retry=self.prepare)
@ -334,8 +360,8 @@ class TestSerializer(test_utils.BaseTestCase):
serializer=serializer)
transport._send = mock.Mock()
kwargs = dict(wait_for_reply=True, timeout=None) if self.call else {}
kwargs = dict(wait_for_reply=True,
timeout=None) if self.call else {}
kwargs['retry'] = None
if self.call:
kwargs['call_monitor_timeout'] = None
@ -367,6 +393,7 @@ class TestSerializer(test_utils.BaseTestCase):
transport._send.assert_called_once_with(oslo_messaging.Target(),
dict(user='alice'),
msg,
transport_options=None,
**kwargs)
expected_calls = [mock.call(self.ctxt, arg) for arg in self.args]
self.assertEqual(expected_calls,
@ -466,7 +493,9 @@ class TestVersionCap(test_utils.BaseTestCase):
self.assertFalse(self.success)
else:
self.assertTrue(self.success)
transport._send.assert_called_once_with(target, {}, msg, **kwargs)
transport._send.assert_called_once_with(target, {}, msg,
transport_options=None,
**kwargs)
TestVersionCap.generate_scenarios()

@ -231,7 +231,8 @@ class TestTransportMethodArgs(test_utils.BaseTestCase):
wait_for_reply=None,
timeout=None,
call_monitor_timeout=None,
retry=None)
retry=None,
transport_options=None)
def test_send_all_args(self):
t = transport.Transport(_FakeDriver(cfg.CONF))
@ -250,7 +251,8 @@ class TestTransportMethodArgs(test_utils.BaseTestCase):
wait_for_reply='wait_for_reply',
timeout='timeout',
call_monitor_timeout='cm_timeout',
retry='retry')
retry='retry',
transport_options=None)
def test_send_notification(self):
t = transport.Transport(_FakeDriver(cfg.CONF))

@ -116,7 +116,7 @@ class Transport(object):
self._driver.require_features(requeue=requeue)
def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
call_monitor_timeout=None, retry=None):
call_monitor_timeout=None, retry=None, transport_options=None):
if not target.topic:
raise exceptions.InvalidTarget('A topic is required to send',
target)
@ -124,7 +124,8 @@ class Transport(object):
wait_for_reply=wait_for_reply,
timeout=timeout,
call_monitor_timeout=call_monitor_timeout,
retry=retry)
retry=retry,
transport_options=transport_options)
def _send_notification(self, target, ctxt, message, version, retry=None):
if not target.topic: