diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py index a8f2d71f8..7d1cdf13a 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py @@ -86,10 +86,8 @@ class CallSender(zmq_publisher_base.QueuedSender): self.reply_waiter = reply_waiter def _do_send_request(self, socket, request): - envelope = request.create_envelope() # DEALER socket specific envelope empty delimiter socket.send(b'', zmq.SNDMORE) - socket.send_pyobj(envelope, zmq.SNDMORE) socket.send_pyobj(request) LOG.debug("Sent message_id %(message)s to a target %(target)s", diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py index 0cd13ff07..593451967 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py @@ -30,7 +30,6 @@ class DealerPublisher(zmq_publisher_base.QueuedSender): def _send_message_data(socket, request): socket.send(b'', zmq.SNDMORE) - socket.send_pyobj(request.create_envelope(), zmq.SNDMORE) socket.send_pyobj(request) LOG.debug("Sent message_id %(message)s to a target %(target)s", @@ -69,7 +68,6 @@ class DealerPublisherAsync(object): @staticmethod def _send_message_data(socket, request): socket.send(b'', zmq.SNDMORE) - socket.send_pyobj(request.create_envelope(), zmq.SNDMORE) socket.send_pyobj(request) LOG.debug("Sent message_id %(message)s to a target %(target)s", diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py index 027bc7baf..bb15fb2d9 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py @@ -47,10 +47,8 @@ class ReplyWaiter(object): def receive_method(self, socket): empty = socket.recv() assert empty == b'', "Empty expected!" - envelope = socket.recv_pyobj() - assert envelope is not None, "Invalid envelope!" reply = socket.recv_pyobj() - LOG.debug("Received reply %s", envelope) + LOG.debug("Received reply %s", reply.message_id) return reply def run_loop(self): diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py index dbe995b24..68b9de234 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py @@ -55,8 +55,8 @@ class PubPublisherProxy(object): assert message_type in (zmq_names.CAST_FANOUT_TYPE, zmq_names.NOTIFY_TYPE), "Fanout expected!" topic_filter = multipart_message.pop(0) - message_id = multipart_message.pop(0) reply_id = multipart_message.pop(0) + message_id = multipart_message.pop(0) assert reply_id is not None, "Reply id expected!" self.socket.send(topic_filter, zmq.SNDMORE) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_envelope.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_envelope.py deleted file mode 100644 index d1913b430..000000000 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_envelope.py +++ /dev/null @@ -1,89 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_messaging._drivers.zmq_driver import zmq_address -from oslo_messaging._drivers.zmq_driver import zmq_names - - -class Envelope(object): - - def __init__(self, msg_type=None, message_id=None, target=None, - routing_key=None, **kwargs): - self._msg_type = msg_type - self._message_id = message_id - self._target = target - self._reply_id = None - self._routing_key = routing_key - self._kwargs = kwargs - - @property - def reply_id(self): - return self._reply_id - - @reply_id.setter - def reply_id(self, value): - self._reply_id = value - - @property - def routing_key(self): - return self._routing_key - - @routing_key.setter - def routing_key(self, value): - self._routing_key = value - - @property - def msg_type(self): - return self._msg_type - - @msg_type.setter - def msg_type(self, value): - self._msg_type = value - - @property - def message_id(self): - return self._message_id - - @property - def target(self): - return self._target - - @property - def is_mult_send(self): - return self._msg_type in zmq_names.MULTISEND_TYPES - - @property - def topic_filter(self): - return zmq_address.target_to_subscribe_filter(self._target) - - def has(self, key): - return key in self._kwargs - - def set(self, key, value): - self._kwargs[key] = value - - def get(self, key): - self._kwargs.get(key) - - def to_dict(self): - envelope = {zmq_names.FIELD_MSG_TYPE: self._msg_type, - zmq_names.FIELD_MSG_ID: self._message_id, - zmq_names.FIELD_TARGET: self._target, - zmq_names.FIELD_ROUTING_KEY: self._routing_key} - envelope.update({k: v for k, v in self._kwargs.items() - if v is not None}) - return envelope - - def __str__(self): - return str(self.to_dict()) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py index a9ba36e48..b3f8aae86 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py @@ -18,7 +18,6 @@ import uuid import six -from oslo_messaging._drivers.zmq_driver.client import zmq_envelope from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._i18n import _LE @@ -70,14 +69,6 @@ class Request(object): self.message_id = str(uuid.uuid1()) - def create_envelope(self, routing_key=None, reply_id=None): - envelope = zmq_envelope.Envelope(msg_type=self.msg_type, - message_id=self.message_id, - target=self.target, - routing_key=routing_key) - envelope.reply_id = reply_id - return envelope - @abc.abstractproperty def msg_type(self): """ZMQ message type""" @@ -112,12 +103,6 @@ class CallRequest(RpcRequest): super(CallRequest, self).__init__(*args, **kwargs) - def create_envelope(self, routing_key=None, reply_id=None): - envelope = super(CallRequest, self).create_envelope( - routing_key, reply_id) - envelope.set('timeout', self.timeout) - return envelope - class CastRequest(RpcRequest): diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index da487f5c9..0e40d5cae 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -57,13 +57,12 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): reply_id = socket.recv() empty = socket.recv() assert empty == b'', 'Bad format: empty delimiter expected' - envelope = socket.recv_pyobj() request = socket.recv_pyobj() - return request, envelope, reply_id + return request, reply_id def receive_message(self, socket): try: - request, envelope, reply_id = self._receive_request(socket) + request, reply_id = self._receive_request(socket) LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s", {"host": self.host, "type": request.msg_type, @@ -72,7 +71,7 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): if request.msg_type == zmq_names.CALL_TYPE: return zmq_incoming_message.ZmqIncomingRequest( - socket, reply_id, request, envelope, self.poller) + socket, reply_id, request, self.poller) elif request.msg_type in zmq_names.NON_BLOCKING_TYPES: return RouterIncomingMessage( request.context, request.message, socket, reply_id, diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py index 2dc8ec309..51c83e2c2 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py @@ -29,13 +29,12 @@ zmq = zmq_async.import_zmq() class ZmqIncomingRequest(base.RpcIncomingMessage): - def __init__(self, socket, rep_id, request, envelope, poller): + def __init__(self, socket, rep_id, request, poller): super(ZmqIncomingRequest, self).__init__(request.context, request.message) self.reply_socket = socket self.reply_id = rep_id self.request = request - self.envelope = envelope self.received = None self.poller = poller @@ -53,7 +52,6 @@ class ZmqIncomingRequest(base.RpcIncomingMessage): self.received = True self.reply_socket.send(self.reply_id, zmq.SNDMORE) self.reply_socket.send(b'', zmq.SNDMORE) - self.reply_socket.send_pyobj(self.envelope, zmq.SNDMORE) self.reply_socket.send_pyobj(response) def requeue(self):