Merge "Add bulk pull OVO interface"
This commit is contained in:
commit
265cec7b0f
@ -49,6 +49,14 @@ def _validate_resource_type(resource_type):
|
|||||||
raise InvalidResourceTypeClass(resource_type=resource_type)
|
raise InvalidResourceTypeClass(resource_type=resource_type)
|
||||||
|
|
||||||
|
|
||||||
|
def _resource_to_class(resource_type):
|
||||||
|
_validate_resource_type(resource_type)
|
||||||
|
|
||||||
|
# we've already validated the resource type, so we are pretty sure the
|
||||||
|
# class is there => no need to validate it specifically
|
||||||
|
return resources.get_resource_cls(resource_type)
|
||||||
|
|
||||||
|
|
||||||
def resource_type_versioned_topic(resource_type, version=None):
|
def resource_type_versioned_topic(resource_type, version=None):
|
||||||
"""Return the topic for a resource type.
|
"""Return the topic for a resource type.
|
||||||
|
|
||||||
@ -74,19 +82,14 @@ class ResourcesPullRpcApi(object):
|
|||||||
if not hasattr(cls, '_instance'):
|
if not hasattr(cls, '_instance'):
|
||||||
cls._instance = super(ResourcesPullRpcApi, cls).__new__(cls)
|
cls._instance = super(ResourcesPullRpcApi, cls).__new__(cls)
|
||||||
target = oslo_messaging.Target(
|
target = oslo_messaging.Target(
|
||||||
topic=topics.PLUGIN, version='1.0',
|
topic=topics.PLUGIN, version='1.1',
|
||||||
namespace=constants.RPC_NAMESPACE_RESOURCES)
|
namespace=constants.RPC_NAMESPACE_RESOURCES)
|
||||||
cls._instance.client = n_rpc.get_client(target)
|
cls._instance.client = n_rpc.get_client(target)
|
||||||
return cls._instance
|
return cls._instance
|
||||||
|
|
||||||
@log_helpers.log_method_call
|
@log_helpers.log_method_call
|
||||||
def pull(self, context, resource_type, resource_id):
|
def pull(self, context, resource_type, resource_id):
|
||||||
_validate_resource_type(resource_type)
|
resource_type_cls = _resource_to_class(resource_type)
|
||||||
|
|
||||||
# we've already validated the resource type, so we are pretty sure the
|
|
||||||
# class is there => no need to validate it specifically
|
|
||||||
resource_type_cls = resources.get_resource_cls(resource_type)
|
|
||||||
|
|
||||||
cctxt = self.client.prepare()
|
cctxt = self.client.prepare()
|
||||||
primitive = cctxt.call(context, 'pull',
|
primitive = cctxt.call(context, 'pull',
|
||||||
resource_type=resource_type,
|
resource_type=resource_type,
|
||||||
@ -95,9 +98,18 @@ class ResourcesPullRpcApi(object):
|
|||||||
if primitive is None:
|
if primitive is None:
|
||||||
raise ResourceNotFound(resource_type=resource_type,
|
raise ResourceNotFound(resource_type=resource_type,
|
||||||
resource_id=resource_id)
|
resource_id=resource_id)
|
||||||
|
|
||||||
return resource_type_cls.clean_obj_from_primitive(primitive)
|
return resource_type_cls.clean_obj_from_primitive(primitive)
|
||||||
|
|
||||||
|
@log_helpers.log_method_call
|
||||||
|
def bulk_pull(self, context, resource_type, filter_kwargs=None):
|
||||||
|
resource_type_cls = _resource_to_class(resource_type)
|
||||||
|
cctxt = self.client.prepare()
|
||||||
|
primitives = cctxt.call(context, 'bulk_pull',
|
||||||
|
resource_type=resource_type,
|
||||||
|
version=resource_type_cls.VERSION, filter_kwargs=filter_kwargs)
|
||||||
|
return [resource_type_cls.clean_obj_from_primitive(primitive)
|
||||||
|
for primitive in primitives]
|
||||||
|
|
||||||
|
|
||||||
class ResourcesPullRpcCallback(object):
|
class ResourcesPullRpcCallback(object):
|
||||||
"""Plugin-side RPC (implementation) for agent-to-plugin interaction.
|
"""Plugin-side RPC (implementation) for agent-to-plugin interaction.
|
||||||
@ -109,9 +121,10 @@ class ResourcesPullRpcCallback(object):
|
|||||||
|
|
||||||
# History
|
# History
|
||||||
# 1.0 Initial version
|
# 1.0 Initial version
|
||||||
|
# 1.1 Added bulk_pull
|
||||||
|
|
||||||
target = oslo_messaging.Target(
|
target = oslo_messaging.Target(
|
||||||
version='1.0', namespace=constants.RPC_NAMESPACE_RESOURCES)
|
version='1.1', namespace=constants.RPC_NAMESPACE_RESOURCES)
|
||||||
|
|
||||||
@oslo_messaging.expected_exceptions(rpc_exc.CallbackNotFound)
|
@oslo_messaging.expected_exceptions(rpc_exc.CallbackNotFound)
|
||||||
def pull(self, context, resource_type, version, resource_id):
|
def pull(self, context, resource_type, version, resource_id):
|
||||||
@ -119,6 +132,16 @@ class ResourcesPullRpcCallback(object):
|
|||||||
if obj:
|
if obj:
|
||||||
return obj.obj_to_primitive(target_version=version)
|
return obj.obj_to_primitive(target_version=version)
|
||||||
|
|
||||||
|
@oslo_messaging.expected_exceptions(rpc_exc.CallbackNotFound)
|
||||||
|
def bulk_pull(self, context, resource_type, version, filter_kwargs=None):
|
||||||
|
filter_kwargs = filter_kwargs or {}
|
||||||
|
resource_type_cls = _resource_to_class(resource_type)
|
||||||
|
# TODO(kevinbenton): add in producer registry so producers can add
|
||||||
|
# hooks to mangle these things like they can with 'pull'.
|
||||||
|
return [obj.obj_to_primitive(target_version=version)
|
||||||
|
for obj in resource_type_cls.get_objects(context, _pager=None,
|
||||||
|
**filter_kwargs)]
|
||||||
|
|
||||||
|
|
||||||
class ResourcesPushToServersRpcApi(object):
|
class ResourcesPushToServersRpcApi(object):
|
||||||
"""Publisher-side RPC (stub) for plugin-to-plugin fanout interaction.
|
"""Publisher-side RPC (stub) for plugin-to-plugin fanout interaction.
|
||||||
|
@ -98,8 +98,6 @@ class OVOServerRpcInterface(object):
|
|||||||
"""ML2 server-side RPC interface.
|
"""ML2 server-side RPC interface.
|
||||||
|
|
||||||
Generates RPC callback notifications on ML2 object changes.
|
Generates RPC callback notifications on ML2 object changes.
|
||||||
|
|
||||||
TODO(kevinbenton): interface to query server for these objects
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
@ -151,6 +151,23 @@ class ResourcesPullRpcApiTestCase(ResourcesRpcBaseTestCase):
|
|||||||
version=TEST_VERSION, resource_id=resource_id)
|
version=TEST_VERSION, resource_id=resource_id)
|
||||||
self.assertEqual(expected_obj, result)
|
self.assertEqual(expected_obj, result)
|
||||||
|
|
||||||
|
def test_bulk_pull(self):
|
||||||
|
self.obj_registry.register(FakeResource)
|
||||||
|
expected_objs = [_create_test_resource(self.context),
|
||||||
|
_create_test_resource(self.context)]
|
||||||
|
self.cctxt_mock.call.return_value = [
|
||||||
|
e.obj_to_primitive() for e in expected_objs]
|
||||||
|
|
||||||
|
filter_kwargs = {'a': 'b', 'c': 'd'}
|
||||||
|
result = self.rpc.bulk_pull(
|
||||||
|
self.context, FakeResource.obj_name(),
|
||||||
|
filter_kwargs=filter_kwargs)
|
||||||
|
|
||||||
|
self.cctxt_mock.call.assert_called_once_with(
|
||||||
|
self.context, 'bulk_pull', resource_type='FakeResource',
|
||||||
|
version=TEST_VERSION, filter_kwargs=filter_kwargs)
|
||||||
|
self.assertEqual(expected_objs, result)
|
||||||
|
|
||||||
def test_pull_resource_not_found(self):
|
def test_pull_resource_not_found(self):
|
||||||
resource_dict = _create_test_dict()
|
resource_dict = _create_test_dict()
|
||||||
resource_id = resource_dict['id']
|
resource_id = resource_dict['id']
|
||||||
@ -198,6 +215,29 @@ class ResourcesPullRpcCallbackTestCase(ResourcesRpcBaseTestCase):
|
|||||||
primitive['versioned_object.data'])
|
primitive['versioned_object.data'])
|
||||||
self.assertEqual(self.resource_obj.obj_to_primitive(), primitive)
|
self.assertEqual(self.resource_obj.obj_to_primitive(), primitive)
|
||||||
|
|
||||||
|
def test_bulk_pull(self):
|
||||||
|
r1 = self.resource_obj
|
||||||
|
r2 = _create_test_resource(self.context)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_objs(*args, **kwargs):
|
||||||
|
if 'id' not in kwargs:
|
||||||
|
return [r1, r2]
|
||||||
|
return [r for r in [r1, r2] if r.id == kwargs['id']]
|
||||||
|
|
||||||
|
# the bulk interface currently retrieves directly from the registry
|
||||||
|
with mock.patch.object(FakeResource, 'get_objects', new=get_objs):
|
||||||
|
objs = self.callbacks.bulk_pull(
|
||||||
|
self.context, resource_type=FakeResource.obj_name(),
|
||||||
|
version=TEST_VERSION)
|
||||||
|
self.assertItemsEqual([r1.obj_to_primitive(),
|
||||||
|
r2.obj_to_primitive()],
|
||||||
|
objs)
|
||||||
|
objs = self.callbacks.bulk_pull(
|
||||||
|
self.context, resource_type=FakeResource.obj_name(),
|
||||||
|
version=TEST_VERSION, filter_kwargs={'id': r1.id})
|
||||||
|
self.assertEqual([r1.obj_to_primitive()], objs)
|
||||||
|
|
||||||
@mock.patch.object(FakeResource, 'obj_to_primitive')
|
@mock.patch.object(FakeResource, 'obj_to_primitive')
|
||||||
def test_pull_backports_to_older_version(self, to_prim_mock):
|
def test_pull_backports_to_older_version(self, to_prim_mock):
|
||||||
with mock.patch.object(resources_rpc.prod_registry, 'pull',
|
with mock.patch.object(resources_rpc.prod_registry, 'pull',
|
||||||
|
Loading…
x
Reference in New Issue
Block a user