Merge "[zmq] Implement Response and Envelope classes"

This commit is contained in:
Jenkins 2016-02-27 16:14:32 +00:00 committed by Gerrit Code Review
commit ed84711596
7 changed files with 159 additions and 30 deletions

@ -54,9 +54,7 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
LOG.debug("-> Redirecting request %s to TCP publisher",
multipart_message)
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
if self.conf.use_pub_sub and \
envelope[zmq_names.FIELD_MSG_TYPE] \
in zmq_names.MULTISEND_TYPES:
if self.conf.use_pub_sub and envelope.is_mult_send:
self.pub_publisher.send_request(multipart_message)
def _receive_in_request(self, socket):
@ -65,9 +63,9 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
empty = socket.recv()
assert empty == b'', "Empty delimiter expected"
envelope = socket.recv_pyobj()
if envelope[zmq_names.FIELD_MSG_TYPE] not in zmq_names.MULTISEND_TYPES:
if not envelope.is_mult_send:
LOG.error(_LE("Message type %s is not supported by proxy"),
envelope[zmq_names.FIELD_MSG_TYPE])
envelope.msg_type)
payload = socket.recv_multipart()
payload.insert(0, envelope)
return payload

@ -23,7 +23,6 @@ from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client.publishers \
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LW
LOG = logging.getLogger(__name__)
@ -69,12 +68,12 @@ class DealerCallPublisher(object):
self.reply_waiter.untrack_id(request.message_id)
LOG.debug("Received reply %s", reply)
if reply[zmq_names.FIELD_FAILURE]:
if reply.failure:
raise rpc_common.deserialize_remote_exception(
reply[zmq_names.FIELD_FAILURE],
reply.failure,
request.allowed_remote_exmods)
else:
return reply[zmq_names.FIELD_REPLY]
return reply.reply_body
def cleanup(self):
self.reply_waiter.cleanup()
@ -135,12 +134,12 @@ class ReplyWaiter(object):
reply, socket = self.poller.poll(
timeout=self.conf.rpc_poll_timeout)
if reply is not None:
reply_id = reply[zmq_names.FIELD_MSG_ID]
call_future = self.replies.get(reply_id)
call_future = self.replies.get(reply.message_id)
if call_future:
call_future.set_result(reply)
else:
LOG.warning(_LW("Received timed out reply: %s"), reply_id)
LOG.warning(_LW("Received timed out reply: %s"),
reply.message_id)
def cleanup(self):
self.poller.close()

@ -65,21 +65,18 @@ class PubPublisherProxy(object):
def send_request(self, multipart_message):
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
msg_type = envelope[zmq_names.FIELD_MSG_TYPE]
target = envelope[zmq_names.FIELD_TARGET]
message_id = envelope[zmq_names.FIELD_MSG_ID]
if msg_type not in zmq_names.MULTISEND_TYPES:
raise zmq_publisher_base.UnsupportedSendPattern(msg_type)
if not envelope.is_mult_send:
raise zmq_publisher_base.UnsupportedSendPattern(envelope.msg_type)
topic_filter = zmq_address.target_to_subscribe_filter(target)
topic_filter = envelope.topic_filter
self.socket.send(topic_filter, zmq.SNDMORE)
self.socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY])
LOG.debug("Publishing message [%(topic)s] %(message_id)s to "
"a target %(target)s ",
{"message_id": message_id,
"target": target,
{"message_id": envelope.message_id,
"target": envelope.target,
"topic": topic_filter})
def cleanup(self):

@ -0,0 +1,62 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_names
class Envelope(object):
def __init__(self, msg_type=None, message_id=None, target=None, **kwargs):
self._msg_type = msg_type
self._message_id = message_id
self._target = target
self._kwargs = kwargs
@property
def msg_type(self):
return self._msg_type
@property
def message_id(self):
return self._message_id
@property
def target(self):
return self._target
@property
def is_mult_send(self):
return self._msg_type in zmq_names.MULTISEND_TYPES
@property
def topic_filter(self):
return zmq_address.target_to_subscribe_filter(self._target)
def set(self, key, value):
self._kwargs[key] = value
def get(self, key):
self._kwargs.get(key)
def to_dict(self):
envelope = {zmq_names.FIELD_MSG_TYPE: self._msg_type,
zmq_names.FIELD_MSG_ID: self._message_id,
zmq_names.FIELD_TARGET: self._target}
envelope.update({k: v for k, v in self._kwargs.items()
if v is not None})
return envelope
def __str__(self):
return str(self.to_dict())

@ -18,6 +18,7 @@ import uuid
import six
from oslo_messaging._drivers.zmq_driver.client import zmq_envelope
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LE
@ -70,9 +71,9 @@ class Request(object):
self.message_id = str(uuid.uuid1())
def create_envelope(self):
return {'msg_type': self.msg_type,
'message_id': self.message_id,
'target': self.target}
return zmq_envelope.Envelope(msg_type=self.msg_type,
message_id=self.message_id,
target=self.target)
@abc.abstractproperty
def msg_type(self):
@ -113,7 +114,7 @@ class CallRequest(RpcRequest):
def create_envelope(self):
envelope = super(CallRequest, self).create_envelope()
envelope['timeout'] = self.timeout
envelope.set('timeout', self.timeout)
return envelope

@ -0,0 +1,70 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging._drivers.zmq_driver import zmq_names
class Response(object):
def __init__(self, id=None, type=None, message_id=None,
reply_id=None, reply_body=None,
failure=None, log_failure=None):
self._id = id
self._type = type
self._message_id = message_id
self._reply_id = reply_id
self._reply_body = reply_body
self._failure = failure
self._log_failure = log_failure
@property
def id_(self):
return self._id
@property
def type_(self):
return self._type
@property
def message_id(self):
return self._message_id
@property
def reply_id(self):
return self._reply_id
@property
def reply_body(self):
return self._reply_body
@property
def failure(self):
return self._failure
@property
def log_failure(self):
return self._log_failure
def to_dict(self):
return {zmq_names.FIELD_ID: self._id,
zmq_names.FIELD_TYPE: self._type,
zmq_names.FIELD_MSG_ID: self._message_id,
zmq_names.FIELD_REPLY_ID: self._reply_id,
zmq_names.FIELD_REPLY: self._reply_body,
zmq_names.FIELD_FAILURE: self._failure,
zmq_names.FIELD_LOG_FAILURE: self._log_failure}
def __str__(self):
return str(self.to_dict())

@ -17,6 +17,7 @@ import logging
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client import zmq_response
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
@ -41,18 +42,19 @@ class ZmqIncomingRequest(base.RpcIncomingMessage):
if failure is not None:
failure = rpc_common.serialize_remote_exception(failure,
log_failure)
message_reply = {zmq_names.FIELD_TYPE: zmq_names.REPLY_TYPE,
zmq_names.FIELD_REPLY: reply,
zmq_names.FIELD_FAILURE: failure,
zmq_names.FIELD_LOG_FAILURE: log_failure,
zmq_names.FIELD_MSG_ID: self.request.message_id}
response = zmq_response.Response(type=zmq_names.REPLY_TYPE,
message_id=self.request.message_id,
reply_id=self.reply_id,
reply_body=reply,
failure=failure,
log_failure=log_failure)
LOG.debug("Replying %s", (str(self.request.message_id)))
self.received = True
self.reply_socket.send(self.reply_id, zmq.SNDMORE)
self.reply_socket.send(b'', zmq.SNDMORE)
self.reply_socket.send_pyobj(message_reply)
self.reply_socket.send_pyobj(response)
self.poller.resume_polling(self.reply_socket)
def requeue(self):