[zmq] Remove redundant Envelope class
Change-Id: Ia5f28fab6fa3baeb4f4c48ff6019df128cef042b Partial-Bug: #1582207
This commit is contained in:
parent
e57afac051
commit
eb02de8223
oslo_messaging/_drivers/zmq_driver
@ -86,10 +86,8 @@ class CallSender(zmq_publisher_base.QueuedSender):
|
||||
self.reply_waiter = reply_waiter
|
||||
|
||||
def _do_send_request(self, socket, request):
|
||||
envelope = request.create_envelope()
|
||||
# DEALER socket specific envelope empty delimiter
|
||||
socket.send(b'', zmq.SNDMORE)
|
||||
socket.send_pyobj(envelope, zmq.SNDMORE)
|
||||
socket.send_pyobj(request)
|
||||
|
||||
LOG.debug("Sent message_id %(message)s to a target %(target)s",
|
||||
|
@ -30,7 +30,6 @@ class DealerPublisher(zmq_publisher_base.QueuedSender):
|
||||
|
||||
def _send_message_data(socket, request):
|
||||
socket.send(b'', zmq.SNDMORE)
|
||||
socket.send_pyobj(request.create_envelope(), zmq.SNDMORE)
|
||||
socket.send_pyobj(request)
|
||||
|
||||
LOG.debug("Sent message_id %(message)s to a target %(target)s",
|
||||
@ -69,7 +68,6 @@ class DealerPublisherAsync(object):
|
||||
@staticmethod
|
||||
def _send_message_data(socket, request):
|
||||
socket.send(b'', zmq.SNDMORE)
|
||||
socket.send_pyobj(request.create_envelope(), zmq.SNDMORE)
|
||||
socket.send_pyobj(request)
|
||||
|
||||
LOG.debug("Sent message_id %(message)s to a target %(target)s",
|
||||
|
@ -47,10 +47,8 @@ class ReplyWaiter(object):
|
||||
def receive_method(self, socket):
|
||||
empty = socket.recv()
|
||||
assert empty == b'', "Empty expected!"
|
||||
envelope = socket.recv_pyobj()
|
||||
assert envelope is not None, "Invalid envelope!"
|
||||
reply = socket.recv_pyobj()
|
||||
LOG.debug("Received reply %s", envelope)
|
||||
LOG.debug("Received reply %s", reply.message_id)
|
||||
return reply
|
||||
|
||||
def run_loop(self):
|
||||
|
@ -55,8 +55,8 @@ class PubPublisherProxy(object):
|
||||
assert message_type in (zmq_names.CAST_FANOUT_TYPE,
|
||||
zmq_names.NOTIFY_TYPE), "Fanout expected!"
|
||||
topic_filter = multipart_message.pop(0)
|
||||
message_id = multipart_message.pop(0)
|
||||
reply_id = multipart_message.pop(0)
|
||||
message_id = multipart_message.pop(0)
|
||||
assert reply_id is not None, "Reply id expected!"
|
||||
|
||||
self.socket.send(topic_filter, zmq.SNDMORE)
|
||||
|
@ -1,89 +0,0 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
|
||||
|
||||
class Envelope(object):
|
||||
|
||||
def __init__(self, msg_type=None, message_id=None, target=None,
|
||||
routing_key=None, **kwargs):
|
||||
self._msg_type = msg_type
|
||||
self._message_id = message_id
|
||||
self._target = target
|
||||
self._reply_id = None
|
||||
self._routing_key = routing_key
|
||||
self._kwargs = kwargs
|
||||
|
||||
@property
|
||||
def reply_id(self):
|
||||
return self._reply_id
|
||||
|
||||
@reply_id.setter
|
||||
def reply_id(self, value):
|
||||
self._reply_id = value
|
||||
|
||||
@property
|
||||
def routing_key(self):
|
||||
return self._routing_key
|
||||
|
||||
@routing_key.setter
|
||||
def routing_key(self, value):
|
||||
self._routing_key = value
|
||||
|
||||
@property
|
||||
def msg_type(self):
|
||||
return self._msg_type
|
||||
|
||||
@msg_type.setter
|
||||
def msg_type(self, value):
|
||||
self._msg_type = value
|
||||
|
||||
@property
|
||||
def message_id(self):
|
||||
return self._message_id
|
||||
|
||||
@property
|
||||
def target(self):
|
||||
return self._target
|
||||
|
||||
@property
|
||||
def is_mult_send(self):
|
||||
return self._msg_type in zmq_names.MULTISEND_TYPES
|
||||
|
||||
@property
|
||||
def topic_filter(self):
|
||||
return zmq_address.target_to_subscribe_filter(self._target)
|
||||
|
||||
def has(self, key):
|
||||
return key in self._kwargs
|
||||
|
||||
def set(self, key, value):
|
||||
self._kwargs[key] = value
|
||||
|
||||
def get(self, key):
|
||||
self._kwargs.get(key)
|
||||
|
||||
def to_dict(self):
|
||||
envelope = {zmq_names.FIELD_MSG_TYPE: self._msg_type,
|
||||
zmq_names.FIELD_MSG_ID: self._message_id,
|
||||
zmq_names.FIELD_TARGET: self._target,
|
||||
zmq_names.FIELD_ROUTING_KEY: self._routing_key}
|
||||
envelope.update({k: v for k, v in self._kwargs.items()
|
||||
if v is not None})
|
||||
return envelope
|
||||
|
||||
def __str__(self):
|
||||
return str(self.to_dict())
|
@ -18,7 +18,6 @@ import uuid
|
||||
|
||||
import six
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_envelope
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._i18n import _LE
|
||||
@ -70,14 +69,6 @@ class Request(object):
|
||||
|
||||
self.message_id = str(uuid.uuid1())
|
||||
|
||||
def create_envelope(self, routing_key=None, reply_id=None):
|
||||
envelope = zmq_envelope.Envelope(msg_type=self.msg_type,
|
||||
message_id=self.message_id,
|
||||
target=self.target,
|
||||
routing_key=routing_key)
|
||||
envelope.reply_id = reply_id
|
||||
return envelope
|
||||
|
||||
@abc.abstractproperty
|
||||
def msg_type(self):
|
||||
"""ZMQ message type"""
|
||||
@ -112,12 +103,6 @@ class CallRequest(RpcRequest):
|
||||
|
||||
super(CallRequest, self).__init__(*args, **kwargs)
|
||||
|
||||
def create_envelope(self, routing_key=None, reply_id=None):
|
||||
envelope = super(CallRequest, self).create_envelope(
|
||||
routing_key, reply_id)
|
||||
envelope.set('timeout', self.timeout)
|
||||
return envelope
|
||||
|
||||
|
||||
class CastRequest(RpcRequest):
|
||||
|
||||
|
@ -57,13 +57,12 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
|
||||
reply_id = socket.recv()
|
||||
empty = socket.recv()
|
||||
assert empty == b'', 'Bad format: empty delimiter expected'
|
||||
envelope = socket.recv_pyobj()
|
||||
request = socket.recv_pyobj()
|
||||
return request, envelope, reply_id
|
||||
return request, reply_id
|
||||
|
||||
def receive_message(self, socket):
|
||||
try:
|
||||
request, envelope, reply_id = self._receive_request(socket)
|
||||
request, reply_id = self._receive_request(socket)
|
||||
LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s",
|
||||
{"host": self.host,
|
||||
"type": request.msg_type,
|
||||
@ -72,7 +71,7 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
|
||||
|
||||
if request.msg_type == zmq_names.CALL_TYPE:
|
||||
return zmq_incoming_message.ZmqIncomingRequest(
|
||||
socket, reply_id, request, envelope, self.poller)
|
||||
socket, reply_id, request, self.poller)
|
||||
elif request.msg_type in zmq_names.NON_BLOCKING_TYPES:
|
||||
return RouterIncomingMessage(
|
||||
request.context, request.message, socket, reply_id,
|
||||
|
@ -29,13 +29,12 @@ zmq = zmq_async.import_zmq()
|
||||
|
||||
class ZmqIncomingRequest(base.RpcIncomingMessage):
|
||||
|
||||
def __init__(self, socket, rep_id, request, envelope, poller):
|
||||
def __init__(self, socket, rep_id, request, poller):
|
||||
super(ZmqIncomingRequest, self).__init__(request.context,
|
||||
request.message)
|
||||
self.reply_socket = socket
|
||||
self.reply_id = rep_id
|
||||
self.request = request
|
||||
self.envelope = envelope
|
||||
self.received = None
|
||||
self.poller = poller
|
||||
|
||||
@ -53,7 +52,6 @@ class ZmqIncomingRequest(base.RpcIncomingMessage):
|
||||
self.received = True
|
||||
self.reply_socket.send(self.reply_id, zmq.SNDMORE)
|
||||
self.reply_socket.send(b'', zmq.SNDMORE)
|
||||
self.reply_socket.send_pyobj(self.envelope, zmq.SNDMORE)
|
||||
self.reply_socket.send_pyobj(response)
|
||||
|
||||
def requeue(self):
|
||||
|
Loading…
x
Reference in New Issue
Block a user