diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index f91c34166..aed6563a9 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -229,7 +229,7 @@ class ZmqDriver(base.BaseDriver): :param target: Message destination target :type target: oslo_messaging.Target """ - server = self.server.get() + server = zmq_server.ZmqServer(self, self.conf, self.matchmaker) server.listen(target) return server diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py index 5f20b807d..8351e2ef9 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py @@ -16,7 +16,6 @@ import logging import os from oslo_utils import excutils -import six from stevedore import driver from oslo_messaging._drivers.zmq_driver.broker import zmq_queue_proxy @@ -51,11 +50,8 @@ class ZmqBroker(object): ).driver(self.conf) self.context = zmq.Context() - self.queue = six.moves.queue.Queue() - self.proxies = [zmq_queue_proxy.OutgoingQueueProxy( - conf, self.context, self.queue, self.matchmaker), - zmq_queue_proxy.IncomingQueueProxy( - conf, self.context, self.queue) + self.proxies = [zmq_queue_proxy.UniversalQueueProxy( + conf, self.context, self.matchmaker) ] def _create_ipc_dirs(self): diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py index 11114d008..dd6665a03 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py @@ -14,65 +14,69 @@ import logging -import six - from oslo_messaging._drivers.zmq_driver.broker import zmq_base_proxy from oslo_messaging._drivers.zmq_driver.client.publishers\ import zmq_dealer_publisher from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._i18n import _LI zmq = zmq_async.import_zmq(zmq_concurrency='native') LOG = logging.getLogger(__name__) -class OutgoingQueueProxy(zmq_base_proxy.BaseProxy): +class UniversalQueueProxy(zmq_base_proxy.BaseProxy): + + def __init__(self, conf, context, matchmaker): + super(UniversalQueueProxy, self).__init__(conf, context) + self.poller = zmq_async.get_poller(zmq_concurrency='native') + + self.router_socket = context.socket(zmq.ROUTER) + self.router_socket.bind(zmq_address.get_broker_address(conf)) + + self.poller.register(self.router_socket, self._receive_in_request) + LOG.info(_LI("Polling at universal proxy")) - def __init__(self, conf, context, queue, matchmaker): - super(OutgoingQueueProxy, self).__init__(conf, context) - self.queue = queue self.matchmaker = matchmaker - self.publisher = zmq_dealer_publisher.DealerPublisher( - conf, matchmaker) - LOG.info(_LI("Polling at outgoing proxy ...")) + reply_receiver = zmq_dealer_publisher.ReplyReceiver(self.poller) + self.publisher = zmq_dealer_publisher.DealerPublisherProxy( + conf, matchmaker, reply_receiver) def run(self): - try: - request = self.queue.get(timeout=self.conf.rpc_poll_timeout) - LOG.info(_LI("Redirecting request %s to TCP publisher ...") - % request) - self.publisher.send_request(request) - except six.moves.queue.Empty: + message, socket = self.poller.poll(self.conf.rpc_poll_timeout) + if message is None: return + if socket == self.router_socket: + self._redirect_in_request(message) + else: + self._redirect_reply(message) -class IncomingQueueProxy(zmq_base_proxy.BaseProxy): + def _redirect_in_request(self, request): + LOG.info(_LI("-> Redirecting request %s to TCP publisher") + % request) + self.publisher.send_request(request) - def __init__(self, conf, context, queue): - super(IncomingQueueProxy, self).__init__(conf, context) - self.poller = zmq_async.get_poller( - zmq_concurrency='native') - - self.queue = queue - - self.socket = context.socket(zmq.ROUTER) - self.socket.bind(zmq_address.get_broker_address(conf)) - self.poller.register(self.socket, self.receive_request) - LOG.info(_LI("Polling at incoming proxy ...")) - - def run(self): - request, socket = self.poller.poll(self.conf.rpc_poll_timeout) - if request is None: + def _redirect_reply(self, reply): + LOG.info(_LI("Reply proxy %s") % reply) + if reply[zmq_names.IDX_REPLY_TYPE] == zmq_names.ACK_TYPE: + LOG.info(_LI("Acknowledge dropped %s") % reply) return - LOG.info(_LI("Received request and queue it: %s") % str(request)) + LOG.info(_LI("<- Redirecting reply to ROUTER: reply: %s") + % reply[zmq_names.IDX_REPLY_BODY:]) - self.queue.put(request) + self.router_socket.send_multipart(reply[zmq_names.IDX_REPLY_BODY:]) - def receive_request(self, socket): + def _receive_in_request(self, socket): reply_id = socket.recv() assert reply_id is not None, "Valid id expected" empty = socket.recv() assert empty == b'', "Empty delimiter expected" - return socket.recv_pyobj() + envelope = socket.recv_pyobj() + if envelope[zmq_names.FIELD_MSG_TYPE] == zmq_names.CALL_TYPE: + envelope[zmq_names.FIELD_REPLY_ID] = reply_id + payload = socket.recv_multipart() + payload.insert(0, envelope) + return payload diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py index 2c8fc5ec5..602e5a99d 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py @@ -29,12 +29,10 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend): def __init__(self, conf, matchmaker): super(DealerPublisher, self).__init__(conf, matchmaker, zmq.DEALER) - self.ack_receiver = AcknowledgementReceiver() def send_request(self, request): - if request.msg_type == zmq_names.CALL_TYPE: - raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) + self._check_request_pattern(request) dealer_socket, hosts = self._check_hosts_connections(request.target) @@ -47,25 +45,26 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend): % request.msg_type) return - self.ack_receiver.track_socket(dealer_socket.handle) - if request.msg_type in zmq_names.MULTISEND_TYPES: for _ in range(dealer_socket.connections_count()): self._send_request(dealer_socket, request) else: self._send_request(dealer_socket, request) + def _check_request_pattern(self, request): + if request.msg_type == zmq_names.CALL_TYPE: + raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) + def _send_request(self, socket, request): socket.send(b'', zmq.SNDMORE) socket.send_pyobj(request) - LOG.info(_LI("Sending message %(message)s to a target %(target)s") - % {"message": request.message, + LOG.info(_LI("Sending message_id %(message)s to a target %(target)s") + % {"message": request.message_id, "target": request.target}) def cleanup(self): - self.ack_receiver.cleanup() super(DealerPublisher, self).cleanup() @@ -81,7 +80,10 @@ class DealerPublisherLight(zmq_publisher_base.PublisherBase): if request.msg_type == zmq_names.CALL_TYPE: raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) + envelope = request.create_envelope() + self.socket.send(b'', zmq.SNDMORE) + self.socket.send_pyobj(envelope, zmq.SNDMORE) self.socket.send_pyobj(request) def cleanup(self): @@ -89,6 +91,67 @@ class DealerPublisherLight(zmq_publisher_base.PublisherBase): self.socket.close() +class DealerPublisherProxy(DealerPublisher): + + def __init__(self, conf, matchmaker, reply_receiver): + super(DealerPublisherProxy, self).__init__(conf, matchmaker) + self.reply_receiver = reply_receiver + + def send_request(self, multipart_message): + + envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] + + LOG.info(_LI("Envelope: %s") % envelope) + + target = envelope[zmq_names.FIELD_TARGET] + dealer_socket, hosts = self._check_hosts_connections(target) + + if not dealer_socket.connections: + # NOTE(ozamiatin): Here we can provide + # a queue for keeping messages to send them later + # when some listener appears. However such approach + # being more reliable will consume additional memory. + LOG.warning(_LW("Request %s was dropped because no connection") + % envelope[zmq_names.FIELD_MSG_TYPE]) + return + + self.reply_receiver.track_socket(dealer_socket.handle) + + LOG.info(_LI("Sending message %(message)s to a target %(target)s") + % {"message": envelope[zmq_names.FIELD_MSG_ID], + "target": envelope[zmq_names.FIELD_TARGET]}) + + if envelope[zmq_names.FIELD_MSG_TYPE] in zmq_names.MULTISEND_TYPES: + for _ in range(dealer_socket.connections_count()): + self._send_request(dealer_socket, multipart_message) + else: + self._send_request(dealer_socket, multipart_message) + + def _send_request(self, socket, multipart_message): + + socket.send(b'', zmq.SNDMORE) + socket.send_pyobj( + multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE], + zmq.SNDMORE) + socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY]) + + +class ReplyReceiver(object): + + def __init__(self, poller): + self.poller = poller + LOG.info(_LI("Reply waiter created in broker")) + + def _receive_reply(self, socket): + return socket.recv_multipart() + + def track_socket(self, socket): + self.poller.register(socket, self._receive_reply) + + def cleanup(self): + self.poller.close() + + class AcknowledgementReceiver(object): def __init__(self): diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py index faee64d25..0a8098af3 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py @@ -89,6 +89,13 @@ class PublisherBase(object): :param request: Message data and destination container object :type request: zmq_request.Request """ + LOG.info(_LI("Sending %(type)s message_id %(message)s to a target" + "%(target)s key: %(key)s, host:%(host)s") + % {"type": request.msg_type, + "message": request.message_id, + "target": request.target, + "key": zmq_address.target_to_key(request.target), + "host": request.host}) socket.send_pyobj(request) def cleanup(self): diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py index d4dbaa9ab..c6063b8ab 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py @@ -14,6 +14,9 @@ import contextlib import logging +import uuid + +import six import oslo_messaging from oslo_messaging._drivers import common as rpc_common @@ -40,24 +43,34 @@ class ReqPublisher(zmq_publisher_base.PublisherBase): if request.msg_type != zmq_names.CALL_TYPE: raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) - socket = self._connect_to_host(request.target, request.timeout) + socket, connect_address = self._connect_to_host(request.target, + request.timeout) + request.host = connect_address self._send_request(socket, request) return self._receive_reply(socket, request) + def _resolve_host_address(self, target, timeout=0): + host = self.matchmaker.get_single_host(target, timeout) + return zmq_address.get_tcp_direct_address(host) + def _connect_to_host(self, target, timeout=0): try: self.zmq_context = zmq.Context() socket = self.zmq_context.socket(zmq.REQ) - host = self.matchmaker.get_single_host(target, timeout) - connect_address = zmq_address.get_tcp_direct_address(host) + if six.PY3: + socket.setsockopt_string(zmq.IDENTITY, str(uuid.uuid1())) + else: + socket.identity = str(uuid.uuid1()) + + connect_address = self._resolve_host_address(target, timeout) LOG.info(_LI("Connecting REQ to %s") % connect_address) socket.connect(connect_address) self.outbound_sockets[str(target)] = socket - return socket + return socket, connect_address except zmq.ZMQError as e: errmsg = _LE("Error connecting to socket: %s") % str(e) @@ -77,6 +90,7 @@ class ReqPublisher(zmq_publisher_base.PublisherBase): if reply is None: raise oslo_messaging.MessagingTimeout( "Timeout %s seconds was reached" % request.timeout) + LOG.info(_LI("Received reply %s") % reply) if reply[zmq_names.FIELD_FAILURE]: raise rpc_common.deserialize_remote_exception( reply[zmq_names.FIELD_FAILURE], @@ -87,3 +101,26 @@ class ReqPublisher(zmq_publisher_base.PublisherBase): def close(self): # For contextlib compatibility self.cleanup() + + +class ReqPublisherLight(ReqPublisher): + + def __init__(self, conf, matchmaker): + super(ReqPublisherLight, self).__init__(conf, matchmaker) + + def _resolve_host_address(self, target, timeout=0): + return zmq_address.get_broker_address(self.conf) + + def _send_request(self, socket, request): + + LOG.info(_LI("Sending %(type)s message_id %(message)s" + " to a target %(target)s, host:%(host)s") + % {"type": request.msg_type, + "message": request.message_id, + "target": request.target, + "host": request.host}) + + envelope = request.create_envelope() + + socket.send_pyobj(envelope, zmq.SNDMORE) + socket.send_pyobj(request) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py index 3e7888d5f..9fb38224d 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py @@ -37,16 +37,18 @@ class ZmqClient(object): if self.conf.zmq_use_broker: self.dealer_publisher = zmq_dealer_publisher.DealerPublisherLight( conf, zmq_address.get_broker_address(self.conf)) + self.req_publisher_cls = zmq_req_publisher.ReqPublisherLight else: self.dealer_publisher = zmq_dealer_publisher.DealerPublisher( conf, matchmaker) + self.req_publisher_cls = zmq_req_publisher.ReqPublisher def send_call(self, target, context, message, timeout=None, retry=None): with contextlib.closing(zmq_request.CallRequest( target, context=context, message=message, timeout=timeout, retry=retry, allowed_remote_exmods=self.allowed_remote_exmods)) as request: - with contextlib.closing(zmq_req_publisher.ReqPublisher( + with contextlib.closing(self.req_publisher_cls( self.conf, self.matchmaker)) as req_publisher: return req_publisher.send_request(request) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py index 92d444a33..455b7ba5a 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py @@ -63,6 +63,12 @@ class Request(object): self.message = message self.retry = retry self.message_id = str(uuid.uuid1()) + self.proxy_reply_id = None + + def create_envelope(self): + return {'msg_type': self.msg_type, + 'message_id': self.message_id, + 'target': self.target} @abc.abstractproperty def msg_type(self): @@ -86,6 +92,11 @@ class RpcRequest(Request): super(RpcRequest, self).__init__(*args, **kwargs) + def create_envelope(self): + envelope = super(RpcRequest, self).create_envelope() + envelope['timeout'] = self.timeout + return envelope + class CallRequest(RpcRequest): diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py index c8402c6c8..cbf4e1066 100644 --- a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py @@ -17,6 +17,7 @@ from oslo_config import cfg from oslo_utils import importutils from oslo_messaging._drivers.zmq_driver.matchmaker import base +from oslo_messaging._drivers.zmq_driver import zmq_address redis = importutils.try_import('redis') LOG = logging.getLogger(__name__) @@ -48,34 +49,30 @@ class RedisMatchMaker(base.MatchMakerBase): password=self.conf.matchmaker_redis.password, ) - def _target_to_key(self, target): - attributes = ['topic', 'exchange', 'server'] - prefix = "ZMQ-target" - key = ":".join((getattr(target, attr) or "*") for attr in attributes) - return "%s-%s" % (prefix, key) - - def _get_keys_by_pattern(self, pattern): - return self._redis.keys(pattern) - def _get_hosts_by_key(self, key): return self._redis.lrange(key, 0, -1) def register(self, target, hostname): - key = self._target_to_key(target) - if hostname not in self._get_hosts_by_key(key): - self._redis.lpush(key, hostname) + + if target.topic and target.server: + key = zmq_address.target_to_key(target) + if hostname not in self._get_hosts_by_key(key): + self._redis.lpush(key, hostname) + + if target.topic: + if hostname not in self._get_hosts_by_key(target.topic): + self._redis.lpush(target.topic, hostname) + + if target.server: + if hostname not in self._get_hosts_by_key(target.server): + self._redis.lpush(target.server, hostname) def unregister(self, target, hostname): - key = self._target_to_key(target) + key = zmq_address.target_to_key(target) self._redis.lrem(key, 0, hostname) def get_hosts(self, target): - pattern = self._target_to_key(target) - if "*" not in pattern: - # pattern have no placeholders, so this is valid key - return self._get_hosts_by_key(pattern) - hosts = [] - for key in self._get_keys_by_pattern(pattern): - hosts.extend(self._get_hosts_by_key(key)) + key = zmq_address.target_to_key(target) + hosts.extend(self._get_hosts_by_key(key)) return hosts diff --git a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py index c0a46d981..518d32e49 100644 --- a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py @@ -38,12 +38,17 @@ class ThreadingPoller(zmq_poller.ZmqPoller): self.recv_methods = {} def register(self, socket, recv_method=None): + if socket in self.recv_methods: + return if recv_method is not None: self.recv_methods[socket] = recv_method self.poller.register(socket, zmq.POLLIN) def poll(self, timeout=None): - timeout *= 1000 # zmq poller waits milliseconds + + if timeout: + timeout *= 1000 # zmq poller waits milliseconds + sockets = None try: diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py index 8bb2461e7..f002c45af 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py @@ -19,6 +19,7 @@ import six from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_socket from oslo_messaging._i18n import _LE, _LI @@ -44,7 +45,7 @@ class ConsumerBase(object): self.sockets.append(socket) self.poller.register(socket, self.receive_message) LOG.info(_LI("Run %(stype)s consumer on %(addr)s:%(port)d"), - {"stype": socket_type, + {"stype": zmq_names.socket_type_str(socket_type), "addr": socket.bind_address, "port": socket.port}) return socket diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index f6016607e..9c529dac0 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -43,11 +43,7 @@ class RouterIncomingMessage(base.IncomingMessage): """Reply is not needed for non-call messages""" def acknowledge(self): - LOG.info("Sending acknowledge for %s", self.msg_id) - ack_message = {zmq_names.FIELD_ID: self.msg_id} - self.socket.send(self.reply_id, zmq.SNDMORE) - self.socket.send(b'', zmq.SNDMORE) - self.socket.send_pyobj(ack_message) + LOG.info("Not sending acknowledge for %s", self.msg_id) def requeue(self): """Requeue is not supported""" @@ -61,11 +57,11 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): self.targets = [] self.host = zmq_address.combine_address(self.conf.rpc_zmq_host, self.port) + LOG.info("[%s] Run ROUTER consumer" % self.host) def listen(self, target): - LOG.info("Listen to target %s on %s:%d" % - (target, self.address, self.port)) + LOG.info("[%s] Listen to target %s" % (self.host, target)) self.targets.append(target) self.matchmaker.register(target=target, @@ -76,21 +72,25 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): for target in self.targets: self.matchmaker.unregister(target, self.host) + def _receive_request(self, socket): + reply_id = socket.recv() + empty = socket.recv() + assert empty == b'', 'Bad format: empty delimiter expected' + request = socket.recv_pyobj() + return request, reply_id + def receive_message(self, socket): try: - reply_id = socket.recv() - empty = socket.recv() - assert empty == b'', 'Bad format: empty delimiter expected' - request = socket.recv_pyobj() - - LOG.info(_LI("Received %(msg_type)s message %(msg)s") - % {"msg_type": request.msg_type, - "msg": str(request.message)}) + request, reply_id = self._receive_request(socket) + LOG.info(_LI("[%(host)s] Received %(type)s, %(id)s, %(target)s") + % {"host": self.host, + "type": request.msg_type, + "id": request.message_id, + "target": request.target}) if request.msg_type == zmq_names.CALL_TYPE: return zmq_incoming_message.ZmqIncomingRequest( - self.server, request.context, request.message, socket, - reply_id, self.poller) + self.server, socket, reply_id, request, self.poller) elif request.msg_type in zmq_names.NON_BLOCKING_TYPES: return RouterIncomingMessage( self.server, request.context, request.message, socket, @@ -100,3 +100,20 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): except zmq.ZMQError as e: LOG.error(_LE("Receiving message failed: %s") % str(e)) + + +class RouterConsumerBroker(RouterConsumer): + + def __init__(self, conf, poller, server): + super(RouterConsumerBroker, self).__init__(conf, poller, server) + + def _receive_request(self, socket): + reply_id = socket.recv() + empty = socket.recv() + assert empty == b'', 'Bad format: empty delimiter expected' + envelope = socket.recv_pyobj() + request = socket.recv_pyobj() + + if zmq_names.FIELD_REPLY_ID in envelope: + request.proxy_reply_id = envelope[zmq_names.FIELD_REPLY_ID] + return request, reply_id diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py index f43ec2325..5e932493f 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py @@ -28,10 +28,12 @@ zmq = zmq_async.import_zmq() class ZmqIncomingRequest(base.IncomingMessage): - def __init__(self, listener, context, message, socket, rep_id, poller): - super(ZmqIncomingRequest, self).__init__(listener, context, message) + def __init__(self, listener, socket, rep_id, request, poller): + super(ZmqIncomingRequest, self).__init__(listener, request.context, + request.message) self.reply_socket = socket self.reply_id = rep_id + self.request = request self.received = None self.poller = poller @@ -39,15 +41,21 @@ class ZmqIncomingRequest(base.IncomingMessage): if failure is not None: failure = rpc_common.serialize_remote_exception(failure, log_failure) - message_reply = {zmq_names.FIELD_REPLY: reply, + message_reply = {zmq_names.FIELD_TYPE: zmq_names.REPLY_TYPE, + zmq_names.FIELD_REPLY: reply, zmq_names.FIELD_FAILURE: failure, - zmq_names.FIELD_LOG_FAILURE: log_failure} + zmq_names.FIELD_LOG_FAILURE: log_failure, + zmq_names.FIELD_ID: self.request.proxy_reply_id} - LOG.info("Replying %s REP", (str(message_reply))) + LOG.info("Replying %s REP", (str(self.request.message_id))) self.received = True self.reply_socket.send(self.reply_id, zmq.SNDMORE) self.reply_socket.send(b'', zmq.SNDMORE) + if self.request.proxy_reply_id: + self.reply_socket.send_string(zmq_names.REPLY_TYPE, zmq.SNDMORE) + self.reply_socket.send(self.request.proxy_reply_id, zmq.SNDMORE) + self.reply_socket.send(b'', zmq.SNDMORE) self.reply_socket.send_pyobj(message_reply) self.poller.resume_polling(self.reply_socket) diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py index afe03b81b..0680e7fa5 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py @@ -31,8 +31,12 @@ class ZmqServer(base.Listener): super(ZmqServer, self).__init__(driver) self.matchmaker = matchmaker self.poller = zmq_async.get_poller() - self.rpc_consumer = zmq_router_consumer.RouterConsumer( - conf, self.poller, self) + if conf.zmq_use_broker: + self.rpc_consumer = zmq_router_consumer.RouterConsumerBroker( + conf, self.poller, self) + else: + self.rpc_consumer = zmq_router_consumer.RouterConsumer( + conf, self.poller, self) self.notify_consumer = self.rpc_consumer self.consumers = [self.rpc_consumer] diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_address.py b/oslo_messaging/_drivers/zmq_driver/zmq_address.py index e8c48291b..afc92490f 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_address.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_address.py @@ -27,3 +27,14 @@ def get_tcp_random_address(conf): def get_broker_address(conf): return "ipc://%s/zmq-broker" % conf.rpc_zmq_ipc_dir + + +def target_to_key(target): + if target.topic and target.server: + attributes = ['topic', 'server'] + key = ".".join(getattr(target, attr) for attr in attributes) + return key + if target.topic: + return target.topic + if target.server: + return target.server diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_names.py b/oslo_messaging/_drivers/zmq_driver/zmq_names.py index a317456e7..f7401ab21 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_names.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_names.py @@ -17,10 +17,23 @@ from oslo_messaging._drivers.zmq_driver import zmq_async zmq = zmq_async.import_zmq() +FIELD_TYPE = 'type' FIELD_FAILURE = 'failure' FIELD_REPLY = 'reply' FIELD_LOG_FAILURE = 'log_failure' FIELD_ID = 'id' +FIELD_MSG_ID = 'message_id' +FIELD_MSG_TYPE = 'msg_type' +FIELD_REPLY_ID = 'reply_id' +FIELD_TARGET = 'target' + + +IDX_REPLY_TYPE = 1 +IDX_REPLY_BODY = 2 + +MULTIPART_IDX_ENVELOPE = 0 +MULTIPART_IDX_BODY = 1 + CALL_TYPE = 'call' CAST_TYPE = 'cast' @@ -28,6 +41,9 @@ CAST_FANOUT_TYPE = 'cast-f' NOTIFY_TYPE = 'notify' NOTIFY_FANOUT_TYPE = 'notify-f' +REPLY_TYPE = 'reply' +ACK_TYPE = 'ack' + MESSAGE_TYPES = (CALL_TYPE, CAST_TYPE, CAST_FANOUT_TYPE, diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py index 2a4144c5a..8e51e30f1 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py @@ -57,6 +57,9 @@ class ZmqSocket(object): def send_pyobj(self, *args, **kwargs): self.handle.send_pyobj(*args, **kwargs) + def send_multipart(self, *args, **kwargs): + self.handle.send_multipart(*args, **kwargs) + def recv(self, *args, **kwargs): return self.handle.recv(*args, **kwargs) @@ -69,6 +72,9 @@ class ZmqSocket(object): def recv_pyobj(self, *args, **kwargs): return self.handle.recv_pyobj(*args, **kwargs) + def recv_multipart(self, *args, **kwargs): + return self.handle.recv_multipart(*args, **kwargs) + def close(self, *args, **kwargs): self.handle.close(*args, **kwargs)