diff --git a/neutron/api/rpc/handlers/resources_rpc.py b/neutron/api/rpc/handlers/resources_rpc.py index 850c4b3676f..c7799f470c5 100644 --- a/neutron/api/rpc/handlers/resources_rpc.py +++ b/neutron/api/rpc/handlers/resources_rpc.py @@ -49,6 +49,14 @@ def _validate_resource_type(resource_type): raise InvalidResourceTypeClass(resource_type=resource_type) +def _resource_to_class(resource_type): + _validate_resource_type(resource_type) + + # we've already validated the resource type, so we are pretty sure the + # class is there => no need to validate it specifically + return resources.get_resource_cls(resource_type) + + def resource_type_versioned_topic(resource_type, version=None): """Return the topic for a resource type. @@ -74,19 +82,14 @@ class ResourcesPullRpcApi(object): if not hasattr(cls, '_instance'): cls._instance = super(ResourcesPullRpcApi, cls).__new__(cls) target = oslo_messaging.Target( - topic=topics.PLUGIN, version='1.0', + topic=topics.PLUGIN, version='1.1', namespace=constants.RPC_NAMESPACE_RESOURCES) cls._instance.client = n_rpc.get_client(target) return cls._instance @log_helpers.log_method_call def pull(self, context, resource_type, resource_id): - _validate_resource_type(resource_type) - - # we've already validated the resource type, so we are pretty sure the - # class is there => no need to validate it specifically - resource_type_cls = resources.get_resource_cls(resource_type) - + resource_type_cls = _resource_to_class(resource_type) cctxt = self.client.prepare() primitive = cctxt.call(context, 'pull', resource_type=resource_type, @@ -95,9 +98,18 @@ class ResourcesPullRpcApi(object): if primitive is None: raise ResourceNotFound(resource_type=resource_type, resource_id=resource_id) - return resource_type_cls.clean_obj_from_primitive(primitive) + @log_helpers.log_method_call + def bulk_pull(self, context, resource_type, filter_kwargs=None): + resource_type_cls = _resource_to_class(resource_type) + cctxt = self.client.prepare() + primitives = cctxt.call(context, 'bulk_pull', + resource_type=resource_type, + version=resource_type_cls.VERSION, filter_kwargs=filter_kwargs) + return [resource_type_cls.clean_obj_from_primitive(primitive) + for primitive in primitives] + class ResourcesPullRpcCallback(object): """Plugin-side RPC (implementation) for agent-to-plugin interaction. @@ -109,9 +121,10 @@ class ResourcesPullRpcCallback(object): # History # 1.0 Initial version + # 1.1 Added bulk_pull target = oslo_messaging.Target( - version='1.0', namespace=constants.RPC_NAMESPACE_RESOURCES) + version='1.1', namespace=constants.RPC_NAMESPACE_RESOURCES) @oslo_messaging.expected_exceptions(rpc_exc.CallbackNotFound) def pull(self, context, resource_type, version, resource_id): @@ -119,6 +132,16 @@ class ResourcesPullRpcCallback(object): if obj: return obj.obj_to_primitive(target_version=version) + @oslo_messaging.expected_exceptions(rpc_exc.CallbackNotFound) + def bulk_pull(self, context, resource_type, version, filter_kwargs=None): + filter_kwargs = filter_kwargs or {} + resource_type_cls = _resource_to_class(resource_type) + # TODO(kevinbenton): add in producer registry so producers can add + # hooks to mangle these things like they can with 'pull'. + return [obj.obj_to_primitive(target_version=version) + for obj in resource_type_cls.get_objects(context, _pager=None, + **filter_kwargs)] + class ResourcesPushToServersRpcApi(object): """Publisher-side RPC (stub) for plugin-to-plugin fanout interaction. diff --git a/neutron/plugins/ml2/ovo_rpc.py b/neutron/plugins/ml2/ovo_rpc.py index 0187a1efbbc..2255f884985 100644 --- a/neutron/plugins/ml2/ovo_rpc.py +++ b/neutron/plugins/ml2/ovo_rpc.py @@ -98,8 +98,6 @@ class OVOServerRpcInterface(object): """ML2 server-side RPC interface. Generates RPC callback notifications on ML2 object changes. - - TODO(kevinbenton): interface to query server for these objects """ def __init__(self): diff --git a/neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py b/neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py index 8ccf40ff297..843ced2c948 100644 --- a/neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py +++ b/neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py @@ -151,6 +151,23 @@ class ResourcesPullRpcApiTestCase(ResourcesRpcBaseTestCase): version=TEST_VERSION, resource_id=resource_id) self.assertEqual(expected_obj, result) + def test_bulk_pull(self): + self.obj_registry.register(FakeResource) + expected_objs = [_create_test_resource(self.context), + _create_test_resource(self.context)] + self.cctxt_mock.call.return_value = [ + e.obj_to_primitive() for e in expected_objs] + + filter_kwargs = {'a': 'b', 'c': 'd'} + result = self.rpc.bulk_pull( + self.context, FakeResource.obj_name(), + filter_kwargs=filter_kwargs) + + self.cctxt_mock.call.assert_called_once_with( + self.context, 'bulk_pull', resource_type='FakeResource', + version=TEST_VERSION, filter_kwargs=filter_kwargs) + self.assertEqual(expected_objs, result) + def test_pull_resource_not_found(self): resource_dict = _create_test_dict() resource_id = resource_dict['id'] @@ -198,6 +215,29 @@ class ResourcesPullRpcCallbackTestCase(ResourcesRpcBaseTestCase): primitive['versioned_object.data']) self.assertEqual(self.resource_obj.obj_to_primitive(), primitive) + def test_bulk_pull(self): + r1 = self.resource_obj + r2 = _create_test_resource(self.context) + + @classmethod + def get_objs(*args, **kwargs): + if 'id' not in kwargs: + return [r1, r2] + return [r for r in [r1, r2] if r.id == kwargs['id']] + + # the bulk interface currently retrieves directly from the registry + with mock.patch.object(FakeResource, 'get_objects', new=get_objs): + objs = self.callbacks.bulk_pull( + self.context, resource_type=FakeResource.obj_name(), + version=TEST_VERSION) + self.assertItemsEqual([r1.obj_to_primitive(), + r2.obj_to_primitive()], + objs) + objs = self.callbacks.bulk_pull( + self.context, resource_type=FakeResource.obj_name(), + version=TEST_VERSION, filter_kwargs={'id': r1.id}) + self.assertEqual([r1.obj_to_primitive()], objs) + @mock.patch.object(FakeResource, 'obj_to_primitive') def test_pull_backports_to_older_version(self, to_prim_mock): with mock.patch.object(resources_rpc.prod_registry, 'pull',