diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py index b5e71c13d..6d92465c5 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py @@ -54,9 +54,7 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy): LOG.debug("-> Redirecting request %s to TCP publisher", multipart_message) envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] - if self.conf.use_pub_sub and \ - envelope[zmq_names.FIELD_MSG_TYPE] \ - in zmq_names.MULTISEND_TYPES: + if self.conf.use_pub_sub and envelope.is_mult_send: self.pub_publisher.send_request(multipart_message) def _receive_in_request(self, socket): @@ -65,9 +63,9 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy): empty = socket.recv() assert empty == b'', "Empty delimiter expected" envelope = socket.recv_pyobj() - if envelope[zmq_names.FIELD_MSG_TYPE] not in zmq_names.MULTISEND_TYPES: + if not envelope.is_mult_send: LOG.error(_LE("Message type %s is not supported by proxy"), - envelope[zmq_names.FIELD_MSG_TYPE]) + envelope.msg_type) payload = socket.recv_multipart() payload.insert(0, envelope) return payload diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py index 2491311dd..383c41485 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py @@ -23,7 +23,6 @@ from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver.client.publishers \ import zmq_publisher_base from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._i18n import _LW LOG = logging.getLogger(__name__) @@ -69,12 +68,12 @@ class DealerCallPublisher(object): self.reply_waiter.untrack_id(request.message_id) LOG.debug("Received reply %s", reply) - if reply[zmq_names.FIELD_FAILURE]: + if reply.failure: raise rpc_common.deserialize_remote_exception( - reply[zmq_names.FIELD_FAILURE], + reply.failure, request.allowed_remote_exmods) else: - return reply[zmq_names.FIELD_REPLY] + return reply.reply_body def cleanup(self): self.reply_waiter.cleanup() @@ -135,12 +134,12 @@ class ReplyWaiter(object): reply, socket = self.poller.poll( timeout=self.conf.rpc_poll_timeout) if reply is not None: - reply_id = reply[zmq_names.FIELD_MSG_ID] - call_future = self.replies.get(reply_id) + call_future = self.replies.get(reply.message_id) if call_future: call_future.set_result(reply) else: - LOG.warning(_LW("Received timed out reply: %s"), reply_id) + LOG.warning(_LW("Received timed out reply: %s"), + reply.message_id) def cleanup(self): self.poller.close() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py index 7d1f6b95a..14c6444bf 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py @@ -65,21 +65,18 @@ class PubPublisherProxy(object): def send_request(self, multipart_message): envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] - msg_type = envelope[zmq_names.FIELD_MSG_TYPE] - target = envelope[zmq_names.FIELD_TARGET] - message_id = envelope[zmq_names.FIELD_MSG_ID] - if msg_type not in zmq_names.MULTISEND_TYPES: - raise zmq_publisher_base.UnsupportedSendPattern(msg_type) + if not envelope.is_mult_send: + raise zmq_publisher_base.UnsupportedSendPattern(envelope.msg_type) - topic_filter = zmq_address.target_to_subscribe_filter(target) + topic_filter = envelope.topic_filter self.socket.send(topic_filter, zmq.SNDMORE) self.socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY]) LOG.debug("Publishing message [%(topic)s] %(message_id)s to " "a target %(target)s ", - {"message_id": message_id, - "target": target, + {"message_id": envelope.message_id, + "target": envelope.target, "topic": topic_filter}) def cleanup(self): diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_envelope.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_envelope.py new file mode 100644 index 000000000..1b3d023a7 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_envelope.py @@ -0,0 +1,62 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_messaging._drivers.zmq_driver import zmq_address +from oslo_messaging._drivers.zmq_driver import zmq_names + + +class Envelope(object): + + def __init__(self, msg_type=None, message_id=None, target=None, **kwargs): + self._msg_type = msg_type + self._message_id = message_id + self._target = target + self._kwargs = kwargs + + @property + def msg_type(self): + return self._msg_type + + @property + def message_id(self): + return self._message_id + + @property + def target(self): + return self._target + + @property + def is_mult_send(self): + return self._msg_type in zmq_names.MULTISEND_TYPES + + @property + def topic_filter(self): + return zmq_address.target_to_subscribe_filter(self._target) + + def set(self, key, value): + self._kwargs[key] = value + + def get(self, key): + self._kwargs.get(key) + + def to_dict(self): + envelope = {zmq_names.FIELD_MSG_TYPE: self._msg_type, + zmq_names.FIELD_MSG_ID: self._message_id, + zmq_names.FIELD_TARGET: self._target} + envelope.update({k: v for k, v in self._kwargs.items() + if v is not None}) + return envelope + + def __str__(self): + return str(self.to_dict()) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py index f457136e0..d957288bf 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py @@ -18,6 +18,7 @@ import uuid import six +from oslo_messaging._drivers.zmq_driver.client import zmq_envelope from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._i18n import _LE @@ -70,9 +71,9 @@ class Request(object): self.message_id = str(uuid.uuid1()) def create_envelope(self): - return {'msg_type': self.msg_type, - 'message_id': self.message_id, - 'target': self.target} + return zmq_envelope.Envelope(msg_type=self.msg_type, + message_id=self.message_id, + target=self.target) @abc.abstractproperty def msg_type(self): @@ -113,7 +114,7 @@ class CallRequest(RpcRequest): def create_envelope(self): envelope = super(CallRequest, self).create_envelope() - envelope['timeout'] = self.timeout + envelope.set('timeout', self.timeout) return envelope diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py new file mode 100644 index 000000000..9342bafb0 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py @@ -0,0 +1,70 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_messaging._drivers.zmq_driver import zmq_names + + +class Response(object): + + def __init__(self, id=None, type=None, message_id=None, + reply_id=None, reply_body=None, + failure=None, log_failure=None): + + self._id = id + self._type = type + self._message_id = message_id + self._reply_id = reply_id + self._reply_body = reply_body + self._failure = failure + self._log_failure = log_failure + + @property + def id_(self): + return self._id + + @property + def type_(self): + return self._type + + @property + def message_id(self): + return self._message_id + + @property + def reply_id(self): + return self._reply_id + + @property + def reply_body(self): + return self._reply_body + + @property + def failure(self): + return self._failure + + @property + def log_failure(self): + return self._log_failure + + def to_dict(self): + return {zmq_names.FIELD_ID: self._id, + zmq_names.FIELD_TYPE: self._type, + zmq_names.FIELD_MSG_ID: self._message_id, + zmq_names.FIELD_REPLY_ID: self._reply_id, + zmq_names.FIELD_REPLY: self._reply_body, + zmq_names.FIELD_FAILURE: self._failure, + zmq_names.FIELD_LOG_FAILURE: self._log_failure} + + def __str__(self): + return str(self.to_dict()) diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py index b17b2d289..d0d3b0470 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py @@ -17,6 +17,7 @@ import logging from oslo_messaging._drivers import base from oslo_messaging._drivers import common as rpc_common +from oslo_messaging._drivers.zmq_driver.client import zmq_response from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names @@ -41,18 +42,19 @@ class ZmqIncomingRequest(base.RpcIncomingMessage): if failure is not None: failure = rpc_common.serialize_remote_exception(failure, log_failure) - message_reply = {zmq_names.FIELD_TYPE: zmq_names.REPLY_TYPE, - zmq_names.FIELD_REPLY: reply, - zmq_names.FIELD_FAILURE: failure, - zmq_names.FIELD_LOG_FAILURE: log_failure, - zmq_names.FIELD_MSG_ID: self.request.message_id} + response = zmq_response.Response(type=zmq_names.REPLY_TYPE, + message_id=self.request.message_id, + reply_id=self.reply_id, + reply_body=reply, + failure=failure, + log_failure=log_failure) LOG.debug("Replying %s", (str(self.request.message_id))) self.received = True self.reply_socket.send(self.reply_id, zmq.SNDMORE) self.reply_socket.send(b'', zmq.SNDMORE) - self.reply_socket.send_pyobj(message_reply) + self.reply_socket.send_pyobj(response) self.poller.resume_polling(self.reply_socket) def requeue(self):