[zmq] Reduce object serialization on router proxy
Switched from envelope object to binary routing key. Change-Id: I1a5501771d33ecc2211b9d72e88f15871f838da3 Closes-Bug: #1580448
This commit is contained in:
parent
9cdc9e006b
commit
4efd2d98cc
@ -14,6 +14,8 @@
|
|||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
import six
|
||||||
|
|
||||||
from oslo_messaging._drivers.zmq_driver.client.publishers \
|
from oslo_messaging._drivers.zmq_driver.client.publishers \
|
||||||
import zmq_pub_publisher
|
import zmq_pub_publisher
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||||
@ -67,11 +69,11 @@ class UniversalQueueProxy(object):
|
|||||||
if message is None:
|
if message is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
envelope = message[zmq_names.MULTIPART_IDX_ENVELOPE]
|
msg_type = message[0]
|
||||||
if self.conf.use_pub_sub and envelope.is_mult_send:
|
if self.conf.use_pub_sub and msg_type in (zmq_names.CAST_FANOUT_TYPE,
|
||||||
LOG.debug("-> Redirecting request %s to TCP publisher", envelope)
|
zmq_names.NOTIFY_TYPE):
|
||||||
self.pub_publisher.send_request(message)
|
self.pub_publisher.send_request(message)
|
||||||
elif not envelope.is_mult_send:
|
elif msg_type in zmq_names.DIRECT_TYPES:
|
||||||
self._redirect_message(self.be_router_socket
|
self._redirect_message(self.be_router_socket
|
||||||
if socket is self.fe_router_socket
|
if socket is self.fe_router_socket
|
||||||
else self.fe_router_socket, message)
|
else self.fe_router_socket, message)
|
||||||
@ -83,9 +85,12 @@ class UniversalQueueProxy(object):
|
|||||||
assert reply_id is not None, "Valid id expected"
|
assert reply_id is not None, "Valid id expected"
|
||||||
empty = socket.recv()
|
empty = socket.recv()
|
||||||
assert empty == b'', "Empty delimiter expected"
|
assert empty == b'', "Empty delimiter expected"
|
||||||
envelope = socket.recv_pyobj()
|
msg_type = int(socket.recv())
|
||||||
|
routing_key = socket.recv()
|
||||||
payload = socket.recv_multipart()
|
payload = socket.recv_multipart()
|
||||||
payload.insert(zmq_names.MULTIPART_IDX_ENVELOPE, envelope)
|
payload.insert(0, reply_id)
|
||||||
|
payload.insert(0, routing_key)
|
||||||
|
payload.insert(0, msg_type)
|
||||||
return payload
|
return payload
|
||||||
except (AssertionError, zmq.ZMQError):
|
except (AssertionError, zmq.ZMQError):
|
||||||
LOG.error("Received message with wrong format")
|
LOG.error("Received message with wrong format")
|
||||||
@ -93,14 +98,16 @@ class UniversalQueueProxy(object):
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _redirect_message(socket, multipart_message):
|
def _redirect_message(socket, multipart_message):
|
||||||
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
|
message_type = multipart_message.pop(0)
|
||||||
LOG.debug("<-> Dispatch message: %s", envelope)
|
routing_key = multipart_message.pop(0)
|
||||||
response_binary = multipart_message[zmq_names.MULTIPART_IDX_BODY]
|
reply_id = multipart_message.pop(0)
|
||||||
|
message_id = multipart_message[0]
|
||||||
socket.send(envelope.routing_key, zmq.SNDMORE)
|
socket.send(routing_key, zmq.SNDMORE)
|
||||||
socket.send(b'', zmq.SNDMORE)
|
socket.send(b'', zmq.SNDMORE)
|
||||||
socket.send_pyobj(envelope, zmq.SNDMORE)
|
socket.send(reply_id, zmq.SNDMORE)
|
||||||
socket.send(response_binary)
|
socket.send(six.b(str(message_type)), zmq.SNDMORE)
|
||||||
|
LOG.debug("Redirecting message %s" % message_id)
|
||||||
|
socket.send_multipart(multipart_message)
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
self.fe_router_socket.close()
|
self.fe_router_socket.close()
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
import six
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||||
@ -21,6 +22,7 @@ from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
|||||||
import zmq_reply_waiter
|
import zmq_reply_waiter
|
||||||
from oslo_messaging._drivers.zmq_driver.client.publishers \
|
from oslo_messaging._drivers.zmq_driver.client.publishers \
|
||||||
import zmq_publisher_base
|
import zmq_publisher_base
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||||
|
|
||||||
@ -43,13 +45,16 @@ class DealerPublisherProxy(object):
|
|||||||
raise zmq_publisher_base.UnsupportedSendPattern(
|
raise zmq_publisher_base.UnsupportedSendPattern(
|
||||||
request.msg_type)
|
request.msg_type)
|
||||||
|
|
||||||
envelope = request.create_envelope(
|
routing_key = self.routing_table.get_routable_host(request.target) \
|
||||||
routing_key=self.routing_table.get_routable_host(request.target)
|
if request.msg_type in zmq_names.DIRECT_TYPES else \
|
||||||
if request.msg_type in zmq_names.DIRECT_TYPES else None)
|
zmq_address.target_to_subscribe_filter(request.target)
|
||||||
|
|
||||||
self.socket.send(b'', zmq.SNDMORE)
|
self.socket.send(b'', zmq.SNDMORE)
|
||||||
self.socket.send_pyobj(envelope, zmq.SNDMORE)
|
self.socket.send(six.b(str(request.msg_type)), zmq.SNDMORE)
|
||||||
self.socket.send_pyobj(request)
|
self.socket.send(six.b(routing_key), zmq.SNDMORE)
|
||||||
|
self.socket.send(six.b(request.message_id), zmq.SNDMORE)
|
||||||
|
self.socket.send_pyobj(request.context, zmq.SNDMORE)
|
||||||
|
self.socket.send_pyobj(request.message)
|
||||||
|
|
||||||
LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to "
|
LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to "
|
||||||
"a target %(target)s",
|
"a target %(target)s",
|
||||||
@ -64,7 +69,7 @@ class DealerPublisherProxy(object):
|
|||||||
class DealerCallPublisherProxy(zmq_dealer_call_publisher.DealerCallPublisher):
|
class DealerCallPublisherProxy(zmq_dealer_call_publisher.DealerCallPublisher):
|
||||||
|
|
||||||
def __init__(self, conf, matchmaker, sockets_manager):
|
def __init__(self, conf, matchmaker, sockets_manager):
|
||||||
reply_waiter = zmq_reply_waiter.ReplyWaiter(conf)
|
reply_waiter = ReplyWaiterProxy(conf)
|
||||||
sender = CallSenderProxy(conf, matchmaker, sockets_manager,
|
sender = CallSenderProxy(conf, matchmaker, sockets_manager,
|
||||||
reply_waiter)
|
reply_waiter)
|
||||||
super(DealerCallPublisherProxy, self).__init__(
|
super(DealerCallPublisherProxy, self).__init__(
|
||||||
@ -84,19 +89,36 @@ class CallSenderProxy(zmq_dealer_call_publisher.CallSender):
|
|||||||
return self.socket
|
return self.socket
|
||||||
|
|
||||||
def _do_send_request(self, socket, request):
|
def _do_send_request(self, socket, request):
|
||||||
envelope = request.create_envelope(
|
routing_key = self.routing_table.get_routable_host(request.target)
|
||||||
routing_key=self.routing_table.get_routable_host(request.target),
|
|
||||||
reply_id=self.socket.handle.identity)
|
|
||||||
# DEALER socket specific envelope empty delimiter
|
# DEALER socket specific envelope empty delimiter
|
||||||
socket.send(b'', zmq.SNDMORE)
|
socket.send(b'', zmq.SNDMORE)
|
||||||
socket.send_pyobj(envelope, zmq.SNDMORE)
|
socket.send(six.b(str(request.msg_type)), zmq.SNDMORE)
|
||||||
socket.send_pyobj(request)
|
socket.send(six.b(routing_key), zmq.SNDMORE)
|
||||||
|
socket.send(six.b(request.message_id), zmq.SNDMORE)
|
||||||
|
socket.send_pyobj(request.context, zmq.SNDMORE)
|
||||||
|
socket.send_pyobj(request.message)
|
||||||
|
|
||||||
LOG.debug("Sent message_id %(message)s to a target %(target)s",
|
LOG.debug("Sent message_id %(message)s to a target %(target)s",
|
||||||
{"message": request.message_id,
|
{"message": request.message_id,
|
||||||
"target": request.target})
|
"target": request.target})
|
||||||
|
|
||||||
|
|
||||||
|
class ReplyWaiterProxy(zmq_reply_waiter.ReplyWaiter):
|
||||||
|
|
||||||
|
def receive_method(self, socket):
|
||||||
|
empty = socket.recv()
|
||||||
|
assert empty == b'', "Empty expected!"
|
||||||
|
reply_id = socket.recv()
|
||||||
|
assert reply_id is not None, "Reply ID expected!"
|
||||||
|
message_type = int(socket.recv())
|
||||||
|
assert message_type == zmq_names.REPLY_TYPE, "Reply is expected!"
|
||||||
|
message_id = socket.recv()
|
||||||
|
reply = socket.recv_pyobj()
|
||||||
|
LOG.debug("Received reply %s", message_id)
|
||||||
|
return reply
|
||||||
|
|
||||||
|
|
||||||
class RoutingTable(object):
|
class RoutingTable(object):
|
||||||
"""This class implements local routing-table cache
|
"""This class implements local routing-table cache
|
||||||
taken from matchmaker. Its purpose is to give the next routable
|
taken from matchmaker. Its purpose is to give the next routable
|
||||||
|
@ -42,17 +42,16 @@ class ReplyWaiter(object):
|
|||||||
self.replies.pop(message_id)
|
self.replies.pop(message_id)
|
||||||
|
|
||||||
def poll_socket(self, socket):
|
def poll_socket(self, socket):
|
||||||
|
self.poller.register(socket, recv_method=self.receive_method)
|
||||||
|
|
||||||
def _receive_method(socket):
|
def receive_method(self, socket):
|
||||||
empty = socket.recv()
|
empty = socket.recv()
|
||||||
assert empty == b'', "Empty expected!"
|
assert empty == b'', "Empty expected!"
|
||||||
envelope = socket.recv_pyobj()
|
envelope = socket.recv_pyobj()
|
||||||
assert envelope is not None, "Invalid envelope!"
|
assert envelope is not None, "Invalid envelope!"
|
||||||
reply = socket.recv_pyobj()
|
reply = socket.recv_pyobj()
|
||||||
LOG.debug("Received reply %s", envelope)
|
LOG.debug("Received reply %s", envelope)
|
||||||
return reply
|
return reply
|
||||||
|
|
||||||
self.poller.register(socket, recv_method=_receive_method)
|
|
||||||
|
|
||||||
def run_loop(self):
|
def run_loop(self):
|
||||||
reply, socket = self.poller.poll(
|
reply, socket = self.poller.poll(
|
||||||
|
@ -14,8 +14,6 @@
|
|||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from oslo_messaging._drivers.zmq_driver.client.publishers\
|
|
||||||
import zmq_publisher_base
|
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||||
@ -53,21 +51,21 @@ class PubPublisherProxy(object):
|
|||||||
self.socket.port)
|
self.socket.port)
|
||||||
|
|
||||||
def send_request(self, multipart_message):
|
def send_request(self, multipart_message):
|
||||||
|
message_type = multipart_message.pop(0)
|
||||||
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
|
assert message_type in (zmq_names.CAST_FANOUT_TYPE,
|
||||||
if not envelope.is_mult_send:
|
zmq_names.NOTIFY_TYPE), "Fanout expected!"
|
||||||
raise zmq_publisher_base.UnsupportedSendPattern(envelope.msg_type)
|
topic_filter = multipart_message.pop(0)
|
||||||
|
message_id = multipart_message.pop(0)
|
||||||
topic_filter = envelope.topic_filter
|
reply_id = multipart_message.pop(0)
|
||||||
|
assert reply_id is not None, "Reply id expected!"
|
||||||
|
|
||||||
self.socket.send(topic_filter, zmq.SNDMORE)
|
self.socket.send(topic_filter, zmq.SNDMORE)
|
||||||
self.socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY])
|
self.socket.send(message_id, zmq.SNDMORE)
|
||||||
|
self.socket.send_multipart(multipart_message)
|
||||||
|
|
||||||
LOG.debug("Publishing message [%(topic)s] %(message_id)s to "
|
LOG.debug("Publishing message %(message_id)s on [%(topic)s]",
|
||||||
"a target %(target)s ",
|
{"topic": topic_filter,
|
||||||
{"message_id": envelope.message_id,
|
"message_id": message_id})
|
||||||
"target": envelope.target,
|
|
||||||
"topic": topic_filter})
|
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
self.socket.close()
|
self.socket.close()
|
||||||
|
@ -14,6 +14,8 @@
|
|||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
import six
|
||||||
|
|
||||||
from oslo_messaging._drivers import base
|
from oslo_messaging._drivers import base
|
||||||
from oslo_messaging._drivers import common as rpc_common
|
from oslo_messaging._drivers import common as rpc_common
|
||||||
from oslo_messaging._drivers.zmq_driver.client.publishers\
|
from oslo_messaging._drivers.zmq_driver.client.publishers\
|
||||||
@ -32,15 +34,14 @@ zmq = zmq_async.import_zmq()
|
|||||||
|
|
||||||
class DealerIncomingMessage(base.RpcIncomingMessage):
|
class DealerIncomingMessage(base.RpcIncomingMessage):
|
||||||
|
|
||||||
def __init__(self, context, message, msg_id):
|
def __init__(self, context, message):
|
||||||
super(DealerIncomingMessage, self).__init__(context, message)
|
super(DealerIncomingMessage, self).__init__(context, message)
|
||||||
self.msg_id = msg_id
|
|
||||||
|
|
||||||
def reply(self, reply=None, failure=None, log_failure=True):
|
def reply(self, reply=None, failure=None, log_failure=True):
|
||||||
"""Reply is not needed for non-call messages"""
|
"""Reply is not needed for non-call messages"""
|
||||||
|
|
||||||
def acknowledge(self):
|
def acknowledge(self):
|
||||||
LOG.debug("Not sending acknowledge for %s", self.msg_id)
|
"""Not sending acknowledge"""
|
||||||
|
|
||||||
def requeue(self):
|
def requeue(self):
|
||||||
"""Requeue is not supported"""
|
"""Requeue is not supported"""
|
||||||
@ -48,31 +49,29 @@ class DealerIncomingMessage(base.RpcIncomingMessage):
|
|||||||
|
|
||||||
class DealerIncomingRequest(base.RpcIncomingMessage):
|
class DealerIncomingRequest(base.RpcIncomingMessage):
|
||||||
|
|
||||||
def __init__(self, socket, request, envelope):
|
def __init__(self, socket, reply_id, message_id, context, message):
|
||||||
super(DealerIncomingRequest, self).__init__(request.context,
|
super(DealerIncomingRequest, self).__init__(context, message)
|
||||||
request.message)
|
|
||||||
self.reply_socket = socket
|
self.reply_socket = socket
|
||||||
self.request = request
|
self.reply_id = reply_id
|
||||||
self.envelope = envelope
|
self.message_id = message_id
|
||||||
|
|
||||||
def reply(self, reply=None, failure=None, log_failure=True):
|
def reply(self, reply=None, failure=None, log_failure=True):
|
||||||
if failure is not None:
|
if failure is not None:
|
||||||
failure = rpc_common.serialize_remote_exception(failure,
|
failure = rpc_common.serialize_remote_exception(failure,
|
||||||
log_failure)
|
log_failure)
|
||||||
response = zmq_response.Response(type=zmq_names.REPLY_TYPE,
|
response = zmq_response.Response(type=zmq_names.REPLY_TYPE,
|
||||||
message_id=self.request.message_id,
|
message_id=self.message_id,
|
||||||
reply_id=self.envelope.reply_id,
|
reply_id=self.reply_id,
|
||||||
reply_body=reply,
|
reply_body=reply,
|
||||||
failure=failure,
|
failure=failure,
|
||||||
log_failure=log_failure)
|
log_failure=log_failure)
|
||||||
|
|
||||||
LOG.debug("Replying %s", (str(self.request.message_id)))
|
LOG.debug("Replying %s", self.message_id)
|
||||||
|
|
||||||
self.envelope.routing_key = self.envelope.reply_id
|
|
||||||
self.envelope.msg_type = zmq_names.REPLY_TYPE
|
|
||||||
|
|
||||||
self.reply_socket.send(b'', zmq.SNDMORE)
|
self.reply_socket.send(b'', zmq.SNDMORE)
|
||||||
self.reply_socket.send_pyobj(self.envelope, zmq.SNDMORE)
|
self.reply_socket.send(six.b(str(zmq_names.REPLY_TYPE)), zmq.SNDMORE)
|
||||||
|
self.reply_socket.send(self.reply_id, zmq.SNDMORE)
|
||||||
|
self.reply_socket.send(self.message_id, zmq.SNDMORE)
|
||||||
self.reply_socket.send_pyobj(response)
|
self.reply_socket.send_pyobj(response)
|
||||||
|
|
||||||
def requeue(self):
|
def requeue(self):
|
||||||
@ -95,29 +94,25 @@ class DealerConsumer(zmq_consumer_base.ConsumerBase):
|
|||||||
zmq.DEALER)
|
zmq.DEALER)
|
||||||
LOG.info(_LI("[%s] Run DEALER consumer"), self.host)
|
LOG.info(_LI("[%s] Run DEALER consumer"), self.host)
|
||||||
|
|
||||||
def _receive_request(self, socket):
|
|
||||||
empty = socket.recv()
|
|
||||||
assert empty == b'', 'Bad format: empty delimiter expected'
|
|
||||||
envelope = socket.recv_pyobj()
|
|
||||||
request = socket.recv_pyobj()
|
|
||||||
return request, envelope
|
|
||||||
|
|
||||||
def receive_message(self, socket):
|
def receive_message(self, socket):
|
||||||
try:
|
try:
|
||||||
request, envelope = self._receive_request(socket)
|
empty = socket.recv()
|
||||||
LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s",
|
assert empty == b'', 'Bad format: empty delimiter expected'
|
||||||
{"host": self.host,
|
reply_id = socket.recv()
|
||||||
"type": request.msg_type,
|
message_type = int(socket.recv())
|
||||||
"id": request.message_id,
|
message_id = socket.recv()
|
||||||
"target": request.target})
|
context = socket.recv_pyobj()
|
||||||
|
message = socket.recv_pyobj()
|
||||||
if request.msg_type == zmq_names.CALL_TYPE:
|
LOG.debug("[%(host)s] Received message %(id)s",
|
||||||
return DealerIncomingRequest(socket, request, envelope)
|
{"host": self.host, "id": message_id})
|
||||||
elif request.msg_type in zmq_names.NON_BLOCKING_TYPES:
|
if message_type == zmq_names.CALL_TYPE:
|
||||||
return DealerIncomingMessage(request.context, request.message,
|
return DealerIncomingRequest(
|
||||||
request.message_id)
|
socket, reply_id, message_id, context, message)
|
||||||
|
elif message_type in zmq_names.NON_BLOCKING_TYPES:
|
||||||
|
return DealerIncomingMessage(context, message)
|
||||||
else:
|
else:
|
||||||
LOG.error(_LE("Unknown message type: %s"), request.msg_type)
|
LOG.error(_LE("Unknown message type: %s"),
|
||||||
|
zmq_names.message_type_str(message_type))
|
||||||
|
|
||||||
except (zmq.ZMQError, AssertionError) as e:
|
except (zmq.ZMQError, AssertionError) as e:
|
||||||
LOG.error(_LE("Receiving message failure: %s"), str(e))
|
LOG.error(_LE("Receiving message failure: %s"), str(e))
|
||||||
|
@ -13,7 +13,6 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import uuid
|
|
||||||
|
|
||||||
import six
|
import six
|
||||||
|
|
||||||
@ -22,7 +21,6 @@ from oslo_messaging._drivers.zmq_driver.server.consumers\
|
|||||||
import zmq_consumer_base
|
import zmq_consumer_base
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
||||||
from oslo_messaging._i18n import _LE
|
from oslo_messaging._i18n import _LE
|
||||||
|
|
||||||
@ -33,17 +31,14 @@ zmq = zmq_async.import_zmq()
|
|||||||
|
|
||||||
class SubIncomingMessage(base.RpcIncomingMessage):
|
class SubIncomingMessage(base.RpcIncomingMessage):
|
||||||
|
|
||||||
def __init__(self, request, socket):
|
def __init__(self, context, message):
|
||||||
super(SubIncomingMessage, self).__init__(
|
super(SubIncomingMessage, self).__init__(context, message)
|
||||||
request.context, request.message)
|
|
||||||
self.socket = socket
|
|
||||||
self.msg_id = request.message_id
|
|
||||||
|
|
||||||
def reply(self, reply=None, failure=None, log_failure=True):
|
def reply(self, reply=None, failure=None, log_failure=True):
|
||||||
"""Reply is not needed for non-call messages."""
|
"""Reply is not needed for non-call messages."""
|
||||||
|
|
||||||
def acknowledge(self):
|
def acknowledge(self):
|
||||||
LOG.debug("Not sending acknowledge for %s", self.msg_id)
|
"""Requeue is not supported"""
|
||||||
|
|
||||||
def requeue(self):
|
def requeue(self):
|
||||||
"""Requeue is not supported"""
|
"""Requeue is not supported"""
|
||||||
@ -57,7 +52,6 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
|
|||||||
self.target = server.target
|
self.target = server.target
|
||||||
self.socket = zmq_socket.ZmqSocket(self.conf, self.context, zmq.SUB)
|
self.socket = zmq_socket.ZmqSocket(self.conf, self.context, zmq.SUB)
|
||||||
self.sockets.append(self.socket)
|
self.sockets.append(self.socket)
|
||||||
self.id = uuid.uuid4()
|
|
||||||
self._subscribe_on_target(self.target)
|
self._subscribe_on_target(self.target)
|
||||||
self.on_publishers(self.matchmaker.get_publishers())
|
self.on_publishers(self.matchmaker.get_publishers())
|
||||||
self.poller.register(self.socket, self.receive_message)
|
self.poller.register(self.socket, self.receive_message)
|
||||||
@ -66,11 +60,9 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
|
|||||||
for host, sync in publishers:
|
for host, sync in publishers:
|
||||||
self.socket.connect(zmq_address.get_tcp_direct_address(host))
|
self.socket.connect(zmq_address.get_tcp_direct_address(host))
|
||||||
LOG.debug("[%s] SUB consumer connected to publishers %s",
|
LOG.debug("[%s] SUB consumer connected to publishers %s",
|
||||||
self.id, publishers)
|
self.socket.handle.identity, publishers)
|
||||||
|
|
||||||
def _subscribe_on_target(self, target):
|
def _subscribe_on_target(self, target):
|
||||||
# NOTE(ozamiatin): No locks needed here, because this is called
|
|
||||||
# before the async updater loop started
|
|
||||||
topic_filter = zmq_address.target_to_subscribe_filter(target)
|
topic_filter = zmq_address.target_to_subscribe_filter(target)
|
||||||
if target.topic:
|
if target.topic:
|
||||||
self.socket.setsockopt(zmq.SUBSCRIBE, six.b(target.topic))
|
self.socket.setsockopt(zmq.SUBSCRIBE, six.b(target.topic))
|
||||||
@ -78,31 +70,27 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
|
|||||||
self.socket.setsockopt(zmq.SUBSCRIBE, six.b(target.server))
|
self.socket.setsockopt(zmq.SUBSCRIBE, six.b(target.server))
|
||||||
if target.topic and target.server:
|
if target.topic and target.server:
|
||||||
self.socket.setsockopt(zmq.SUBSCRIBE, topic_filter)
|
self.socket.setsockopt(zmq.SUBSCRIBE, topic_filter)
|
||||||
|
|
||||||
LOG.debug("[%(host)s] Subscribing to topic %(filter)s",
|
LOG.debug("[%(host)s] Subscribing to topic %(filter)s",
|
||||||
{"host": self.id, "filter": topic_filter})
|
{"host": self.socket.handle.identity,
|
||||||
|
"filter": topic_filter})
|
||||||
|
|
||||||
def _receive_request(self, socket):
|
@staticmethod
|
||||||
|
def _receive_request(socket):
|
||||||
topic_filter = socket.recv()
|
topic_filter = socket.recv()
|
||||||
LOG.debug("[%(id)s] Received %(topic_filter)s topic",
|
message_id = socket.recv()
|
||||||
{'id': self.id, 'topic_filter': topic_filter})
|
context = socket.recv_pyobj()
|
||||||
request = socket.recv_pyobj()
|
message = socket.recv_pyobj()
|
||||||
return request
|
LOG.debug("Received %(topic_filter)s topic message %(id)s",
|
||||||
|
{'id': message_id, 'topic_filter': topic_filter})
|
||||||
|
return context, message
|
||||||
|
|
||||||
def receive_message(self, socket):
|
def receive_message(self, socket):
|
||||||
try:
|
try:
|
||||||
request = self._receive_request(socket)
|
context, message = self._receive_request(socket)
|
||||||
if not request:
|
if not message:
|
||||||
return None
|
return None
|
||||||
LOG.debug("Received %(type)s, %(id)s, %(target)s",
|
|
||||||
{"type": request.msg_type,
|
|
||||||
"id": request.message_id,
|
|
||||||
"target": request.target})
|
|
||||||
|
|
||||||
if request.msg_type not in zmq_names.MULTISEND_TYPES:
|
return SubIncomingMessage(context, message)
|
||||||
LOG.error(_LE("Unknown message type: %s"), request.msg_type)
|
|
||||||
else:
|
|
||||||
return SubIncomingMessage(request, socket)
|
|
||||||
except (zmq.ZMQError, AssertionError) as e:
|
except (zmq.ZMQError, AssertionError) as e:
|
||||||
LOG.error(_LE("Receiving message failed: %s"), str(e))
|
LOG.error(_LE("Receiving message failed: %s"), str(e))
|
||||||
|
|
||||||
|
@ -36,13 +36,12 @@ MULTIPART_IDX_ENVELOPE = 0
|
|||||||
MULTIPART_IDX_BODY = 1
|
MULTIPART_IDX_BODY = 1
|
||||||
|
|
||||||
|
|
||||||
CALL_TYPE = 'call'
|
CALL_TYPE = 1
|
||||||
CAST_TYPE = 'cast'
|
CAST_TYPE = 2
|
||||||
CAST_FANOUT_TYPE = 'cast-f'
|
CAST_FANOUT_TYPE = 3
|
||||||
NOTIFY_TYPE = 'notify'
|
NOTIFY_TYPE = 4
|
||||||
|
REPLY_TYPE = 5
|
||||||
REPLY_TYPE = 'reply'
|
ACK_TYPE = 6
|
||||||
ACK_TYPE = 'ack'
|
|
||||||
|
|
||||||
MESSAGE_TYPES = (CALL_TYPE,
|
MESSAGE_TYPES = (CALL_TYPE,
|
||||||
CAST_TYPE,
|
CAST_TYPE,
|
||||||
@ -50,7 +49,7 @@ MESSAGE_TYPES = (CALL_TYPE,
|
|||||||
NOTIFY_TYPE)
|
NOTIFY_TYPE)
|
||||||
|
|
||||||
MULTISEND_TYPES = (CAST_FANOUT_TYPE, NOTIFY_TYPE)
|
MULTISEND_TYPES = (CAST_FANOUT_TYPE, NOTIFY_TYPE)
|
||||||
DIRECT_TYPES = (CALL_TYPE, CAST_TYPE)
|
DIRECT_TYPES = (CALL_TYPE, CAST_TYPE, REPLY_TYPE)
|
||||||
CAST_TYPES = (CAST_TYPE, CAST_FANOUT_TYPE)
|
CAST_TYPES = (CAST_TYPE, CAST_FANOUT_TYPE)
|
||||||
NOTIFY_TYPES = (NOTIFY_TYPE,)
|
NOTIFY_TYPES = (NOTIFY_TYPE,)
|
||||||
NON_BLOCKING_TYPES = CAST_TYPES + NOTIFY_TYPES
|
NON_BLOCKING_TYPES = CAST_TYPES + NOTIFY_TYPES
|
||||||
@ -66,3 +65,13 @@ def socket_type_str(socket_type):
|
|||||||
zmq.PUB: "PUB",
|
zmq.PUB: "PUB",
|
||||||
zmq.SUB: "SUB"}
|
zmq.SUB: "SUB"}
|
||||||
return zmq_socket_str[socket_type]
|
return zmq_socket_str[socket_type]
|
||||||
|
|
||||||
|
|
||||||
|
def message_type_str(message_type):
|
||||||
|
msg_type_str = {CALL_TYPE: "CALL",
|
||||||
|
CAST_TYPE: "CAST",
|
||||||
|
CAST_FANOUT_TYPE: "CAST_FANOUT_TYPE",
|
||||||
|
NOTIFY_TYPE: "NOTIFY_TYPE",
|
||||||
|
REPLY_TYPE: "REPLY_TYPE",
|
||||||
|
ACK_TYPE: "ACK_TYPE"}
|
||||||
|
return msg_type_str[message_type]
|
||||||
|
@ -15,13 +15,12 @@
|
|||||||
import pickle
|
import pickle
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import contextlib
|
|
||||||
|
|
||||||
import oslo_messaging
|
import oslo_messaging
|
||||||
from oslo_messaging._drivers.zmq_driver.client.publishers \
|
from oslo_messaging._drivers.zmq_driver.client.publishers \
|
||||||
import zmq_pub_publisher
|
import zmq_pub_publisher
|
||||||
from oslo_messaging._drivers.zmq_driver.client import zmq_request
|
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||||
from oslo_messaging.tests.drivers.zmq import zmq_common
|
from oslo_messaging.tests.drivers.zmq import zmq_common
|
||||||
|
|
||||||
|
|
||||||
@ -51,11 +50,16 @@ class TestPubSub(zmq_common.ZmqBaseTestCase):
|
|||||||
# Needed only in test env to give listener a chance to connect
|
# Needed only in test env to give listener a chance to connect
|
||||||
# before request fires
|
# before request fires
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
with contextlib.closing(zmq_request.FanoutRequest(
|
context = {}
|
||||||
target, context={}, message={'method': 'hello-world'},
|
message = {'method': 'hello-world'}
|
||||||
retry=None)) as request:
|
|
||||||
self.publisher.send_request([request.create_envelope(),
|
self.publisher.send_request(
|
||||||
pickle.dumps(request)])
|
[zmq_names.CAST_FANOUT_TYPE,
|
||||||
|
zmq_address.target_to_subscribe_filter(target),
|
||||||
|
b"message",
|
||||||
|
b"0000-0000",
|
||||||
|
pickle.dumps(context),
|
||||||
|
pickle.dumps(message)])
|
||||||
|
|
||||||
def _check_listener(self, listener):
|
def _check_listener(self, listener):
|
||||||
listener._received.wait(timeout=5)
|
listener._received.wait(timeout=5)
|
||||||
|
@ -16,7 +16,7 @@ cat > ${DATADIR}/zmq.conf <<EOF
|
|||||||
transport_url=${TRANSPORT_URL}
|
transport_url=${TRANSPORT_URL}
|
||||||
rpc_zmq_matchmaker=${ZMQ_MATCHMAKER}
|
rpc_zmq_matchmaker=${ZMQ_MATCHMAKER}
|
||||||
rpc_zmq_ipc_dir=${ZMQ_IPC_DIR}
|
rpc_zmq_ipc_dir=${ZMQ_IPC_DIR}
|
||||||
use_router_proxy=True
|
use_router_proxy=true
|
||||||
[matchmaker_redis]
|
[matchmaker_redis]
|
||||||
port=${ZMQ_REDIS_PORT}
|
port=${ZMQ_REDIS_PORT}
|
||||||
EOF
|
EOF
|
||||||
|
Loading…
x
Reference in New Issue
Block a user