Merge "Implement get_rpc_client function"

This commit is contained in:
Zuul 2022-12-01 18:45:46 +00:00 committed by Gerrit Code Review
commit 2e81fac973
7 changed files with 74 additions and 29 deletions

View File

@ -30,6 +30,7 @@ __all__ = [
'expected_exceptions', 'expected_exceptions',
'get_rpc_transport', 'get_rpc_transport',
'get_rpc_server', 'get_rpc_server',
'get_rpc_client',
'expose' 'expose'
] ]

View File

@ -32,6 +32,7 @@ __all__ = [
'RPCClient', 'RPCClient',
'RPCVersionCapError', 'RPCVersionCapError',
'RemoteError', 'RemoteError',
'get_rpc_client',
] ]
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -263,6 +264,9 @@ class RPCClient(_BaseCallContext):
The RPCClient class is responsible for sending method invocations to and The RPCClient class is responsible for sending method invocations to and
receiving return values from remote RPC servers via a messaging transport. receiving return values from remote RPC servers via a messaging transport.
The class should always be instantiated by using the get_rpc_client
function and not constructing the class directly.
Two RPC patterns are supported: RPC calls and RPC casts. Two RPC patterns are supported: RPC calls and RPC casts.
An RPC cast is used when an RPC method does *not* return a value to An RPC cast is used when an RPC method does *not* return a value to
@ -295,7 +299,7 @@ class RPCClient(_BaseCallContext):
def __init__(self, transport): def __init__(self, transport):
target = messaging.Target(topic='test', version='2.0') target = messaging.Target(topic='test', version='2.0')
self._client = messaging.RPCClient(transport, target) self._client = messaging.get_rpc_client(transport, target)
def test(self, ctxt, arg): def test(self, ctxt, arg):
return self._client.call(ctxt, 'test', arg=arg) return self._client.call(ctxt, 'test', arg=arg)
@ -320,7 +324,7 @@ class RPCClient(_BaseCallContext):
transport = messaging.get_rpc_transport(cfg.CONF) transport = messaging.get_rpc_transport(cfg.CONF)
target = messaging.Target(topic='test', version='2.0') target = messaging.Target(topic='test', version='2.0')
client = messaging.RPCClient(transport, target) client = messaging.get_rpc_client(transport, target)
client.call(ctxt, 'test', arg=arg) client.call(ctxt, 'test', arg=arg)
but this is probably only useful in limited circumstances as a wrapper but this is probably only useful in limited circumstances as a wrapper
@ -334,7 +338,7 @@ class RPCClient(_BaseCallContext):
have the RPC request fail with a MessageDeliveryFailure after the given have the RPC request fail with a MessageDeliveryFailure after the given
number of retries. For example:: number of retries. For example::
client = messaging.RPCClient(transport, target, retry=None) client = messaging.get_rpc_client(transport, target, retry=None)
client.call(ctxt, 'sync') client.call(ctxt, 'sync')
try: try:
client.prepare(retry=0).cast(ctxt, 'ping') client.prepare(retry=0).cast(ctxt, 'ping')
@ -346,9 +350,13 @@ class RPCClient(_BaseCallContext):
def __init__(self, transport, target, def __init__(self, transport, target,
timeout=None, version_cap=None, serializer=None, retry=None, timeout=None, version_cap=None, serializer=None, retry=None,
call_monitor_timeout=None, transport_options=None): call_monitor_timeout=None, transport_options=None,
_manual_load=True):
"""Construct an RPC client. """Construct an RPC client.
This should not be called directly, use the get_rpc_client function
to instantiate this class.
:param transport: a messaging transport handle :param transport: a messaging transport handle
:type transport: Transport :type transport: Transport
:param target: the default target for invocations :param target: the default target for invocations
@ -371,7 +379,17 @@ class RPCClient(_BaseCallContext):
(less than the overall timeout (less than the overall timeout
parameter). parameter).
:type call_monitor_timeout: int :type call_monitor_timeout: int
:param transport_options: Transport options passed to client.
:type transport_options: TransportOptions
:param _manual_load: Internal use only to check if class was
manually instantiated or not.
:type _manual_load: bool
""" """
if _manual_load:
LOG.warning("Using RPCClient manually to instantiate client. "
"Please use get_rpc_client to obtain an RPC client "
"instance.")
if serializer is None: if serializer is None:
serializer = msg_serializer.NoOpSerializer() serializer = msg_serializer.NoOpSerializer()
@ -530,3 +548,16 @@ class RPCClient(_BaseCallContext):
def can_send_version(self, version=_marker): def can_send_version(self, version=_marker):
"""Check to see if a version is compatible with the version cap.""" """Check to see if a version is compatible with the version cap."""
return self.prepare(version=version).can_send_version() return self.prepare(version=version).can_send_version()
def get_rpc_client(transport, target, **kwargs):
"""Construct an RPC client.
:param transport: the messaging transport
:type transport: Transport
:param target: the exchange, topic and server to listen on
:type target: Target
:param **kwargs: The kwargs will be passed down to the
RPCClient constructor
"""
return RPCClient(transport, target, _manual_load=False, **kwargs)

View File

@ -114,7 +114,7 @@ class RpcServerFixture(fixtures.Fixture):
target=self.target, target=self.target,
endpoints=endpoints, endpoints=endpoints,
executor=self.executor) executor=self.executor)
self._ctrl = oslo_messaging.RPCClient(transport.transport, self._ctrl = oslo_messaging.get_rpc_client(transport.transport,
self.ctrl_target) self.ctrl_target)
self._start() self._start()
transport.wait() transport.wait()
@ -230,7 +230,7 @@ class ClientStub(object):
transport_options=None, **kwargs): transport_options=None, **kwargs):
self.name = name or "functional-tests" self.name = name or "functional-tests"
self.cast = cast self.cast = cast
self.client = oslo_messaging.RPCClient( self.client = oslo_messaging.get_rpc_client(
transport=transport, transport=transport,
target=target, target=target,
transport_options=transport_options, transport_options=transport_options,

View File

@ -44,7 +44,8 @@ class TestCastCall(test_utils.BaseTestCase):
self.config(rpc_response_timeout=None) self.config(rpc_response_timeout=None)
transport_options = oslo_messaging.TransportOptions() transport_options = oslo_messaging.TransportOptions()
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), client = oslo_messaging.get_rpc_client(
transport, oslo_messaging.Target(),
transport_options=transport_options) transport_options=transport_options)
transport._send = mock.Mock() transport._send = mock.Mock()
@ -70,7 +71,7 @@ class TestCastCall(test_utils.BaseTestCase):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
transport_options = oslo_messaging.TransportOptions(at_least_once=True) transport_options = oslo_messaging.TransportOptions(at_least_once=True)
client = oslo_messaging.RPCClient( client = oslo_messaging.get_rpc_client(
transport, transport,
oslo_messaging.Target(), oslo_messaging.Target(),
transport_options=transport_options) transport_options=transport_options)
@ -215,7 +216,7 @@ class TestCastToTarget(test_utils.BaseTestCase):
expect_target = oslo_messaging.Target(**self.expect) expect_target = oslo_messaging.Target(**self.expect)
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, target) client = oslo_messaging.get_rpc_client(transport, target)
transport._send = mock.Mock() transport._send = mock.Mock()
@ -269,8 +270,8 @@ class TestCallTimeout(test_utils.BaseTestCase):
self.config(rpc_response_timeout=self.confval) self.config(rpc_response_timeout=self.confval)
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), client = oslo_messaging.get_rpc_client(
timeout=self.ctor, transport, oslo_messaging.Target(), timeout=self.ctor,
call_monitor_timeout=self.cm) call_monitor_timeout=self.cm)
transport._send = mock.Mock() transport._send = mock.Mock()
@ -302,7 +303,8 @@ class TestCallRetry(test_utils.BaseTestCase):
def test_call_retry(self): def test_call_retry(self):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), client = oslo_messaging.get_rpc_client(
transport, oslo_messaging.Target(),
retry=self.ctor) retry=self.ctor)
transport._send = mock.Mock() transport._send = mock.Mock()
@ -332,8 +334,8 @@ class TestCallFanout(test_utils.BaseTestCase):
def test_call_fanout(self): def test_call_fanout(self):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, client = oslo_messaging.get_rpc_client(
oslo_messaging.Target(**self.target)) transport, oslo_messaging.Target(**self.target))
if self.prepare is not _notset: if self.prepare is not _notset:
client = client.prepare(**self.prepare) client = client.prepare(**self.prepare)
@ -363,8 +365,8 @@ class TestSerializer(test_utils.BaseTestCase):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
serializer = msg_serializer.NoOpSerializer() serializer = msg_serializer.NoOpSerializer()
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), client = oslo_messaging.get_rpc_client(
serializer=serializer) transport, oslo_messaging.Target(), serializer=serializer)
transport._send = mock.Mock() transport._send = mock.Mock()
kwargs = dict(wait_for_reply=True, kwargs = dict(wait_for_reply=True,
@ -465,7 +467,7 @@ class TestVersionCap(test_utils.BaseTestCase):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(version=self.version) target = oslo_messaging.Target(version=self.version)
client = oslo_messaging.RPCClient(transport, target, client = oslo_messaging.get_rpc_client(transport, target,
version_cap=self.cap) version_cap=self.cap)
if self.success: if self.success:
@ -574,7 +576,7 @@ class TestCanSendVersion(test_utils.BaseTestCase):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(version=self.version) target = oslo_messaging.Target(version=self.version)
client = oslo_messaging.RPCClient(transport, target, client = oslo_messaging.get_rpc_client(transport, target,
version_cap=self.cap) version_cap=self.cap)
prep_kwargs = {} prep_kwargs = {}
@ -598,7 +600,7 @@ class TestCanSendVersion(test_utils.BaseTestCase):
def test_invalid_version_type(self): def test_invalid_version_type(self):
target = oslo_messaging.Target(topic='sometopic') target = oslo_messaging.Target(topic='sometopic')
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, target) client = oslo_messaging.get_rpc_client(transport, target)
self.assertRaises(exceptions.MessagingException, self.assertRaises(exceptions.MessagingException,
client.prepare, version='5') client.prepare, version='5')
self.assertRaises(exceptions.MessagingException, self.assertRaises(exceptions.MessagingException,
@ -612,7 +614,7 @@ class TestTransportWarning(test_utils.BaseTestCase):
@mock.patch('oslo_messaging.rpc.client.LOG') @mock.patch('oslo_messaging.rpc.client.LOG')
def test_warning_when_notifier_transport(self, log): def test_warning_when_notifier_transport(self, log):
transport = oslo_messaging.get_notification_transport(self.conf) transport = oslo_messaging.get_notification_transport(self.conf)
oslo_messaging.RPCClient(transport, oslo_messaging.Target()) oslo_messaging.get_rpc_client(transport, oslo_messaging.Target())
log.warning.assert_called_once_with( log.warning.assert_called_once_with(
"Using notification transport for RPC. Please use " "Using notification transport for RPC. Please use "
"get_rpc_transport to obtain an RPC transport " "get_rpc_transport to obtain an RPC transport "

View File

@ -102,7 +102,7 @@ class ServerSetupMixin(object):
def _setup_client(self, transport, topic='testtopic', exchange=None): def _setup_client(self, transport, topic='testtopic', exchange=None):
target = oslo_messaging.Target(topic=topic, exchange=exchange) target = oslo_messaging.Target(topic=topic, exchange=exchange)
return oslo_messaging.RPCClient(transport, target=target, return oslo_messaging.get_rpc_client(transport, target=target,
serializer=self.serializer) serializer=self.serializer)

View File

@ -0,0 +1,11 @@
---
features:
- |
Added new ``get_rpc_client`` function to instantiate the RPCClient
class
deprecations:
- |
Instantiating the RPCClient class directly is deprecated in favor
of using the new ``get_rpc_client`` function to expose a more
common API similar to existing functions such as ``get_rpc_server``
and ``get_rpc_transport``

View File

@ -410,7 +410,7 @@ class RPCClient(Client):
def __init__(self, client_id, transport, target, timeout, is_cast, def __init__(self, client_id, transport, target, timeout, is_cast,
wait_after_msg, sync_mode=False): wait_after_msg, sync_mode=False):
client = rpc.RPCClient(transport, target) client = rpc.get_rpc_client(transport, target)
method = _rpc_cast if is_cast else _rpc_call method = _rpc_cast if is_cast else _rpc_call
super(RPCClient, self).__init__(client_id, super(RPCClient, self).__init__(client_id,