diff --git a/doc/source/devref/rpc_callbacks.rst b/doc/source/devref/rpc_callbacks.rst index 1b1111eefa2..e3cdd8345b3 100644 --- a/doc/source/devref/rpc_callbacks.rst +++ b/doc/source/devref/rpc_callbacks.rst @@ -211,10 +211,10 @@ The agent code processing port updates may look like:: from neutron.api.rpc.callbacks import resources - def process_resource_updates(resource_type, resource, event_type): + def process_resource_updates(resource_type, resource_list, event_type): # send to the right handler which will update any control plane - # details related to the updated resource... + # details related to the updated resources... def subscribe_resources(): @@ -238,7 +238,7 @@ The relevant function is: The callback function will receive the following arguments: * resource_type: the type of resource which is receiving the update. -* resource: resource of supported object +* resource_list: list of resources which have been pushed by server. * event_type: will be one of CREATED, UPDATED, or DELETED, see neutron.api.rpc.callbacks.events for details. @@ -263,9 +263,22 @@ Sending resource events ----------------------- On the server side, resource updates could come from anywhere, a service plugin, -an extension, anything that updates, creates, or destroys the resource and that +an extension, anything that updates, creates, or destroys the resources and that is of any interest to subscribed agents. +A callback is expected to receive a list of resources. When resources in the list +belong to the same resource type, a single push RPC message is sent; if the list +contains objects of different resource types, resources of each type are grouped +and sent separately, one push RPC message per type. On the receiver side, +resources in a list always belong to the same type. In other words, a server-side +push of a list of heterogenous objects will result into N messages on bus and +N client-side callback invocations, where N is the number of unique resource +types in the given list, e.g. L(A, A, B, C, C, C) would be fragmented into +L1(A, A), L2(B), L3(C, C, C), and each list pushed separately. + +Note: there is no guarantee in terms of order in which separate resource lists +will be delivered to consumers. + The server/publisher side may look like:: from neutron.api.rpc.callbacks.producer import registry @@ -274,17 +287,17 @@ The server/publisher side may look like:: def create_qos_policy(...): policy = fetch_policy(...) update_the_db(...) - registry.push(policy, events.CREATED) + registry.push([policy], events.CREATED) def update_qos_policy(...): policy = fetch_policy(...) update_the_db(...) - registry.push(policy, events.UPDATED) + registry.push([policy], events.UPDATED) def delete_qos_policy(...): policy = fetch_policy(...) update_the_db(...) - registry.push(policy, events.DELETED) + registry.push([policy], events.DELETED) References diff --git a/neutron/agent/l2/extensions/qos.py b/neutron/agent/l2/extensions/qos.py index bcf4b06be2c..ddd80e2fc4e 100644 --- a/neutron/agent/l2/extensions/qos.py +++ b/neutron/agent/l2/extensions/qos.py @@ -220,13 +220,14 @@ class QosAgentExtension(l2_agent_extension.L2AgentExtension): connection.create_consumer(topic, endpoints, fanout=True) @lockutils.synchronized('qos-port') - def _handle_notification(self, qos_policy, event_type): + def _handle_notification(self, qos_policies, event_type): # server does not allow to remove a policy that is attached to any # port, so we ignore DELETED events. Also, if we receive a CREATED # event for a policy, it means that there are no ports so far that are # attached to it. That's why we are interested in UPDATED events only if event_type == events.UPDATED: - self._process_update_policy(qos_policy) + for qos_policy in qos_policies: + self._process_update_policy(qos_policy) @lockutils.synchronized('qos-port') def handle_port(self, context, port): diff --git a/neutron/api/rpc/callbacks/consumer/registry.py b/neutron/api/rpc/callbacks/consumer/registry.py index a59b5db9e9c..9418b51cd4a 100644 --- a/neutron/api/rpc/callbacks/consumer/registry.py +++ b/neutron/api/rpc/callbacks/consumer/registry.py @@ -27,12 +27,12 @@ def unsubscribe(callback, resource_type): _get_manager().unregister(callback, resource_type) -def push(resource_type, resource, event_type): - """Push resource events into all registered callbacks for the type.""" +def push(resource_type, resource_list, event_type): + """Push resource list into all registered callbacks for the event type.""" callbacks = _get_manager().get_callbacks(resource_type) for callback in callbacks: - callback(resource, event_type) + callback(resource_list, event_type) def clear(): diff --git a/neutron/api/rpc/handlers/resources_rpc.py b/neutron/api/rpc/handlers/resources_rpc.py index e3f6acb4d95..a83a6af5bdc 100644 --- a/neutron/api/rpc/handlers/resources_rpc.py +++ b/neutron/api/rpc/handlers/resources_rpc.py @@ -13,6 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. +import collections + from neutron_lib import exceptions from oslo_log import helpers as log_helpers from oslo_log import log as logging @@ -179,27 +181,63 @@ class ResourcesPushRpcApi(object): def __init__(self): target = oslo_messaging.Target( - version='1.0', namespace=constants.RPC_NAMESPACE_RESOURCES) self.client = n_rpc.get_client(target) - def _prepare_object_fanout_context(self, obj, version): + def _prepare_object_fanout_context(self, obj, resource_version, + rpc_version): """Prepare fanout context, one topic per object type.""" - obj_topic = resource_type_versioned_topic(obj.obj_name(), version) - return self.client.prepare(fanout=True, topic=obj_topic) + obj_topic = resource_type_versioned_topic(obj.obj_name(), + resource_version) + return self.client.prepare(fanout=True, topic=obj_topic, + version=rpc_version) + + @staticmethod + def _classify_resources_by_type(resource_list): + resources_by_type = collections.defaultdict(list) + for resource in resource_list: + resource_type = resources.get_resource_type(resource) + resources_by_type[resource_type].append(resource) + return resources_by_type @log_helpers.log_method_call - def push(self, context, resource, event_type): - resource_type = resources.get_resource_type(resource) + def push(self, context, resource_list, event_type): + """Push an event and list of resources to agents, batched per type. + When a list of different resource types is passed to this method, + the push will be sent as separate individual list pushes, one per + resource type. + """ + + resources_by_type = self._classify_resources_by_type(resource_list) + for resource_type, type_resources in resources_by_type.items(): + self._push(context, resource_type, type_resources, event_type) + + def _push(self, context, resource_type, resource_list, event_type): + """Push an event and list of resources of the same type to agents.""" _validate_resource_type(resource_type) - versions = version_manager.get_resource_versions(resource_type) - for version in versions: - cctxt = self._prepare_object_fanout_context(resource, version) - dehydrated_resource = resource.obj_to_primitive( - target_version=version) - cctxt.cast(context, 'push', - resource=dehydrated_resource, - event_type=event_type) + compat_call = len(resource_list) == 1 + + for version in version_manager.get_resource_versions(resource_type): + cctxt = self._prepare_object_fanout_context( + resource_list[0], version, + rpc_version='1.0' if compat_call else '1.1') + + dehydrated_resources = [ + resource.obj_to_primitive(target_version=version) + for resource in resource_list] + + if compat_call: + #TODO(mangelajo): remove in Ocata, backwards compatibility + # for agents expecting a single element as + # a single element instead of a list, this + # is only relevant to the QoSPolicy topic queue + cctxt.cast(context, 'push', + resource=dehydrated_resources[0], + event_type=event_type) + else: + cctxt.cast(context, 'push', + resource_list=dehydrated_resources, + event_type=event_type) class ResourcesPushRpcCallback(object): @@ -211,14 +249,22 @@ class ResourcesPushRpcCallback(object): """ # History # 1.0 Initial version + # 1.1 push method introduces resource_list support - target = oslo_messaging.Target(version='1.0', + target = oslo_messaging.Target(version='1.1', namespace=constants.RPC_NAMESPACE_RESOURCES) - def push(self, context, resource, event_type): - resource_obj = obj_base.NeutronObject.clean_obj_from_primitive( - resource) - LOG.debug("Resources notification (%(event_type)s): %(resource)s", - {'event_type': event_type, 'resource': repr(resource_obj)}) - resource_type = resources.get_resource_type(resource_obj) - cons_registry.push(resource_type, resource_obj, event_type) + def push(self, context, **kwargs): + """Push receiver, will always receive resources of the same type.""" + # TODO(mangelajo): accept single 'resource' parameter for backwards + # compatibility during Newton, remove in Ocata + resource_list = ([kwargs['resource']] if 'resource' in kwargs else + kwargs['resource_list']) + event_type = kwargs['event_type'] + + resource_objs = [ + obj_base.NeutronObject.clean_obj_from_primitive(resource) + for resource in resource_list] + + resource_type = resources.get_resource_type(resource_objs[0]) + cons_registry.push(resource_type, resource_objs, event_type) diff --git a/neutron/services/qos/notification_drivers/message_queue.py b/neutron/services/qos/notification_drivers/message_queue.py index 7e7f1e533eb..5cfcb7a9117 100644 --- a/neutron/services/qos/notification_drivers/message_queue.py +++ b/neutron/services/qos/notification_drivers/message_queue.py @@ -53,7 +53,7 @@ class RpcQosServiceNotificationDriver( pass def update_policy(self, context, policy): - self.notification_api.push(context, policy, events.UPDATED) + self.notification_api.push(context, [policy], events.UPDATED) def delete_policy(self, context, policy): - self.notification_api.push(context, policy, events.DELETED) + self.notification_api.push(context, [policy], events.DELETED) diff --git a/neutron/tests/functional/agent/l2/extensions/test_ovs_agent_qos_extension.py b/neutron/tests/functional/agent/l2/extensions/test_ovs_agent_qos_extension.py index cb2237dfcbe..7492be4d364 100644 --- a/neutron/tests/functional/agent/l2/extensions/test_ovs_agent_qos_extension.py +++ b/neutron/tests/functional/agent/l2/extensions/test_ovs_agent_qos_extension.py @@ -229,7 +229,7 @@ class TestOVSAgentQosExtension(OVSAgentQoSExtensionTestFramework): policy_copy.rules[0].max_kbps = 500 policy_copy.rules[0].max_burst_kbps = 5 policy_copy.rules[1].dscp_mark = TEST_DSCP_MARK_2 - consumer_reg.push(resources.QOS_POLICY, policy_copy, events.UPDATED) + consumer_reg.push(resources.QOS_POLICY, [policy_copy], events.UPDATED) self.wait_until_bandwidth_limit_rule_applied(self.ports[0], policy_copy.rules[0]) self._assert_bandwidth_limit_rule_is_set(self.ports[0], @@ -265,6 +265,6 @@ class TestOVSAgentQosExtension(OVSAgentQoSExtensionTestFramework): policy_copy = copy.deepcopy(self.qos_policies[TEST_POLICY_ID1]) policy_copy.rules = list() - consumer_reg.push(resources.QOS_POLICY, policy_copy, events.UPDATED) + consumer_reg.push(resources.QOS_POLICY, [policy_copy], events.UPDATED) self.wait_until_bandwidth_limit_rule_applied(port_dict, None) diff --git a/neutron/tests/unit/agent/l2/extensions/test_qos.py b/neutron/tests/unit/agent/l2/extensions/test_qos.py index 371d71930f2..90327b28f9b 100644 --- a/neutron/tests/unit/agent/l2/extensions/test_qos.py +++ b/neutron/tests/unit/agent/l2/extensions/test_qos.py @@ -238,7 +238,7 @@ class QosExtensionRpcTestCase(QosExtensionBaseTestCase): self.qos_ext, '_process_update_policy') as update_mock: policy_obj = mock.Mock() - self.qos_ext._handle_notification(policy_obj, events.UPDATED) + self.qos_ext._handle_notification([policy_obj], events.UPDATED) update_mock.assert_called_with(policy_obj) def test__process_update_policy(self): diff --git a/neutron/tests/unit/api/rpc/callbacks/consumer/test_registry.py b/neutron/tests/unit/api/rpc/callbacks/consumer/test_registry.py index d07b49c2fd5..bf3685a42e2 100644 --- a/neutron/tests/unit/api/rpc/callbacks/consumer/test_registry.py +++ b/neutron/tests/unit/api/rpc/callbacks/consumer/test_registry.py @@ -51,6 +51,6 @@ class ConsumerRegistryTestCase(base.BaseTestCase): callback2 = mock.Mock() callbacks = {callback1, callback2} manager_mock().get_callbacks.return_value = callbacks - registry.push(resource_type_, resource_, event_type_) + registry.push(resource_type_, [resource_], event_type_) for callback in callbacks: - callback.assert_called_with(resource_, event_type_) + callback.assert_called_with([resource_], event_type_) diff --git a/neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py b/neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py index 5bbdca1a2e5..6467436338b 100644 --- a/neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py +++ b/neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py @@ -28,30 +28,45 @@ from neutron.objects import base as objects_base from neutron.tests import base +TEST_EVENT = 'test_event' +TEST_VERSION = '1.0' + + def _create_test_dict(uuid=None): return {'id': uuid or uuidutils.generate_uuid(), 'field': 'foo'} -def _create_test_resource(context=None): +def _create_test_resource(context=None, resource_cls=None): + resource_cls = resource_cls or FakeResource resource_dict = _create_test_dict() - resource = FakeResource(context, **resource_dict) + resource = resource_cls(context, **resource_dict) resource.obj_reset_changes() return resource -class FakeResource(objects_base.NeutronObject): - # Version 1.0: Initial version - VERSION = '1.0' +class BaseFakeResource(objects_base.NeutronObject): + @classmethod + def get_objects(cls, context, **kwargs): + return list() + + +class FakeResource(BaseFakeResource): + VERSION = TEST_VERSION fields = { 'id': obj_fields.UUIDField(), 'field': obj_fields.StringField() } - @classmethod - def get_objects(cls, context, **kwargs): - return list() + +class FakeResource2(BaseFakeResource): + VERSION = TEST_VERSION + + fields = { + 'id': obj_fields.UUIDField(), + 'field': obj_fields.StringField() + } class ResourcesRpcBaseTestCase(base.BaseTestCase): @@ -63,6 +78,21 @@ class ResourcesRpcBaseTestCase(base.BaseTestCase): fixture.VersionedObjectRegistryFixture()) self.context = context.get_admin_context() + mock.patch.object(resources_rpc.resources, + 'is_valid_resource_type').start() + mock.patch.object(resources_rpc.resources, 'get_resource_cls', + side_effect=self._get_resource_cls).start() + + self.resource_objs = [_create_test_resource(self.context) + for _ in range(2)] + self.resource_objs2 = [_create_test_resource(self.context, + FakeResource2) + for _ in range(2)] + + @staticmethod + def _get_resource_cls(resource_type): + return {FakeResource.obj_name(): FakeResource, + FakeResource2.obj_name(): FakeResource2}.get(resource_type) class _ValidateResourceTypeTestCase(base.BaseTestCase): @@ -99,9 +129,6 @@ class ResourcesPullRpcApiTestCase(ResourcesRpcBaseTestCase): def setUp(self): super(ResourcesPullRpcApiTestCase, self).setUp() - mock.patch.object(resources_rpc, '_validate_resource_type').start() - mock.patch('neutron.api.rpc.callbacks.resources.get_resource_cls', - return_value=FakeResource).start() self.rpc = resources_rpc.ResourcesPullRpcApi() mock.patch.object(self.rpc, 'client').start() self.cctxt_mock = self.rpc.client.prepare.return_value @@ -120,7 +147,7 @@ class ResourcesPullRpcApiTestCase(ResourcesRpcBaseTestCase): self.cctxt_mock.call.assert_called_once_with( self.context, 'pull', resource_type='FakeResource', - version=FakeResource.VERSION, resource_id=resource_id) + version=TEST_VERSION, resource_id=resource_id) self.assertEqual(expected_obj, result) def test_pull_resource_not_found(self): @@ -162,7 +189,7 @@ class ResourcesPullRpcCallbackTestCase(ResourcesRpcBaseTestCase): return_value=self.resource_obj) as registry_mock: primitive = self.callbacks.pull( self.context, resource_type=FakeResource.obj_name(), - version=FakeResource.VERSION, + version=TEST_VERSION, resource_id=self.resource_obj.id) registry_mock.assert_called_once_with( 'FakeResource', self.resource_obj.id, context=self.context) @@ -182,58 +209,96 @@ class ResourcesPullRpcCallbackTestCase(ResourcesRpcBaseTestCase): class ResourcesPushRpcApiTestCase(ResourcesRpcBaseTestCase): + """Tests the neutron server side of the RPC interface.""" def setUp(self): super(ResourcesPushRpcApiTestCase, self).setUp() mock.patch.object(resources_rpc.n_rpc, 'get_client').start() - mock.patch.object(resources_rpc, '_validate_resource_type').start() self.rpc = resources_rpc.ResourcesPushRpcApi() self.cctxt_mock = self.rpc.client.prepare.return_value - self.resource_obj = _create_test_resource(self.context) + mock.patch.object(version_manager, 'get_resource_versions', + return_value=set([TEST_VERSION])).start() def test__prepare_object_fanout_context(self): expected_topic = topics.RESOURCE_TOPIC_PATTERN % { - 'resource_type': resources.get_resource_type(self.resource_obj), - 'version': self.resource_obj.VERSION} + 'resource_type': resources.get_resource_type( + self.resource_objs[0]), + 'version': TEST_VERSION} - with mock.patch.object(resources_rpc.resources, 'get_resource_cls', - return_value=FakeResource): - observed = self.rpc._prepare_object_fanout_context( - self.resource_obj, self.resource_obj.VERSION) + observed = self.rpc._prepare_object_fanout_context( + self.resource_objs[0], self.resource_objs[0].VERSION, '1.0') self.rpc.client.prepare.assert_called_once_with( - fanout=True, topic=expected_topic) + fanout=True, topic=expected_topic, version='1.0') self.assertEqual(self.cctxt_mock, observed) - def test_pushy(self): - with mock.patch.object(resources_rpc.resources, 'get_resource_cls', - return_value=FakeResource): - with mock.patch.object(version_manager, 'get_resource_versions', - return_value=set([FakeResource.VERSION])): - self.rpc.push( - self.context, self.resource_obj, 'TYPE') + def test_push_single_type(self): + self.rpc.push( + self.context, self.resource_objs, TEST_EVENT) self.cctxt_mock.cast.assert_called_once_with( self.context, 'push', - resource=self.resource_obj.obj_to_primitive(), - event_type='TYPE') + resource_list=[resource.obj_to_primitive() + for resource in self.resource_objs], + event_type=TEST_EVENT) + + def test_push_mixed(self): + self.rpc.push( + self.context, self.resource_objs + self.resource_objs2, + event_type=TEST_EVENT) + + self.cctxt_mock.cast.assert_any_call( + self.context, 'push', + resource_list=[resource.obj_to_primitive() + for resource in self.resource_objs], + event_type=TEST_EVENT) + + self.cctxt_mock.cast.assert_any_call( + self.context, 'push', + resource_list=[resource.obj_to_primitive() + for resource in self.resource_objs2], + event_type=TEST_EVENT) + + def test_push_mitaka_backwardscompat(self): + #TODO(mangelajo) remove in Ocata, since the 'resource' parameter + # is just for backwards compatibility with Mitaka + # agents. + self.rpc.push( + self.context, [self.resource_objs[0]], TEST_EVENT) + + self.cctxt_mock.cast.assert_called_once_with( + self.context, 'push', + resource=self.resource_objs[0].obj_to_primitive(), + event_type=TEST_EVENT) class ResourcesPushRpcCallbackTestCase(ResourcesRpcBaseTestCase): + """Tests the agent-side of the RPC interface.""" def setUp(self): super(ResourcesPushRpcCallbackTestCase, self).setUp() - mock.patch.object(resources_rpc, '_validate_resource_type').start() - mock.patch.object( - resources_rpc.resources, - 'get_resource_cls', return_value=FakeResource).start() - self.resource_obj = _create_test_resource(self.context) - self.resource_prim = self.resource_obj.obj_to_primitive() self.callbacks = resources_rpc.ResourcesPushRpcCallback() @mock.patch.object(resources_rpc.cons_registry, 'push') def test_push(self, reg_push_mock): self.obj_registry.register(FakeResource) - self.callbacks.push(self.context, self.resource_prim, 'TYPE') - reg_push_mock.assert_called_once_with(self.resource_obj.obj_name(), - self.resource_obj, 'TYPE') + self.callbacks.push(self.context, + resource_list=[resource.obj_to_primitive() + for resource in self.resource_objs], + event_type=TEST_EVENT) + reg_push_mock.assert_called_once_with(self.resource_objs[0].obj_name(), + self.resource_objs, + TEST_EVENT) + + @mock.patch.object(resources_rpc.cons_registry, 'push') + def test_push_mitaka_backwardscompat(self, reg_push_mock): + #TODO(mangelajo) remove in Ocata, since the 'resource' parameter + # is just for backwards compatibility with Mitaka + # agents. + self.obj_registry.register(FakeResource) + self.callbacks.push(self.context, + resource=self.resource_objs[0].obj_to_primitive(), + event_type=TEST_EVENT) + reg_push_mock.assert_called_once_with(self.resource_objs[0].obj_name(), + [self.resource_objs[0]], + TEST_EVENT) diff --git a/neutron/tests/unit/services/qos/notification_drivers/test_manager.py b/neutron/tests/unit/services/qos/notification_drivers/test_manager.py index acf1024c934..bb5d836b379 100644 --- a/neutron/tests/unit/services/qos/notification_drivers/test_manager.py +++ b/neutron/tests/unit/services/qos/notification_drivers/test_manager.py @@ -66,7 +66,7 @@ class TestQosDriversManager(TestQosDriversManagerBase): self.driver_manager = driver_mgr.QosServiceNotificationDriverManager() def _validate_registry_params(self, event_type, policy): - self.rpc_api.push.assert_called_with(self.context, policy, + self.rpc_api.push.assert_called_with(self.context, [policy], event_type) def test_create_policy_default_configuration(self): diff --git a/neutron/tests/unit/services/qos/notification_drivers/test_message_queue.py b/neutron/tests/unit/services/qos/notification_drivers/test_message_queue.py index 32530ebc3f2..0aab19caf8f 100644 --- a/neutron/tests/unit/services/qos/notification_drivers/test_message_queue.py +++ b/neutron/tests/unit/services/qos/notification_drivers/test_message_queue.py @@ -53,7 +53,7 @@ class TestQosRpcNotificationDriver(base.BaseQosTestCase): **self.rule_data['bandwidth_limit_rule']) def _validate_push_params(self, event_type, policy): - self.rpc_api.push.assert_called_once_with(self.context, policy, + self.rpc_api.push.assert_called_once_with(self.context, [policy], event_type) def test_create_policy(self):