[zmq] Add acks from proxy for PUB/SUB messages

Change-Id: I7e870154c7f45dac44a8bd5c6811616292232f04
This commit is contained in:
Gevorg Davoian 2016-09-26 17:15:24 +03:00
parent 2b47281a7e
commit 22c93b299a
6 changed files with 87 additions and 33 deletions

@ -13,6 +13,9 @@
# under the License.
import logging
import uuid
import six
from oslo_messaging._drivers.zmq_driver.proxy.central \
import zmq_publisher_proxy
@ -23,9 +26,10 @@ from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._drivers.zmq_driver import zmq_updater
from oslo_messaging._i18n import _LI, _LE
zmq = zmq_async.import_zmq()
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
def check_message_format(func):
def _check_message_format(*args, **kwargs):
@ -39,27 +43,27 @@ def check_message_format(func):
class SingleRouterProxy(object):
PROXY_TYPE = "ROUTER"
def __init__(self, conf, context, matchmaker):
self.conf = conf
self.context = context
super(SingleRouterProxy, self).__init__()
self.matchmaker = matchmaker
host = conf.zmq_proxy_opts.host
LOG.info(_LI("Running %s proxy") % self.PROXY_TYPE)
self.poller = zmq_async.get_poller()
port = conf.zmq_proxy_opts.frontend_port
self.fe_router_socket = zmq_socket.ZmqFixedPortSocket(
conf, context, zmq.ROUTER, host,
conf.zmq_proxy_opts.frontend_port) if port != 0 else \
zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER,
host)
self.fe_router_socket = self._create_router_socket(conf, context, port)
self.poller.register(self.fe_router_socket, self._receive_message)
self.publisher = zmq_publisher_proxy.PublisherProxy(
conf, matchmaker)
self.publisher = zmq_publisher_proxy.PublisherProxy(conf, matchmaker)
self.router_sender = zmq_sender.CentralRouterSender()
self.ack_sender = zmq_sender.CentralAckSender()
self._router_updater = self._create_router_updater()
def run(self):
@ -67,15 +71,28 @@ class SingleRouterProxy(object):
if message is None:
return
msg_type = int(message[zmq_names.MESSAGE_TYPE_IDX])
message_type = int(message[zmq_names.MESSAGE_TYPE_IDX])
if self.conf.oslo_messaging_zmq.use_pub_sub and \
msg_type in (zmq_names.CAST_FANOUT_TYPE,
zmq_names.NOTIFY_TYPE):
message_type in zmq_names.MULTISEND_TYPES:
self.publisher.send_request(message)
if socket is self.fe_router_socket and \
self.conf.zmq_proxy_opts.ack_pub_sub:
self.ack_sender.send_message(socket, message)
else:
self.router_sender.send_message(
self._get_socket_to_dispatch_on(socket), message)
@staticmethod
def _create_router_socket(conf, context, port):
host = conf.zmq_proxy_opts.host
identity = six.b(host) + b"/zmq-proxy/" + six.b(str(uuid.uuid4()))
if port != 0:
return zmq_socket.ZmqFixedPortSocket(conf, context, zmq.ROUTER,
host, port, identity=identity)
else:
return zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER,
host, identity=identity)
def _create_router_updater(self):
return RouterUpdater(
self.conf, self.matchmaker, self.publisher.host,
@ -105,15 +122,11 @@ class SingleRouterProxy(object):
class DoubleRouterProxy(SingleRouterProxy):
PROXY_TYPE = "ROUTER-ROUTER"
def __init__(self, conf, context, matchmaker):
LOG.info(_LI('Running double router proxy'))
port = conf.zmq_proxy_opts.backend_port
host = conf.zmq_proxy_opts.host
self.be_router_socket = zmq_socket.ZmqFixedPortSocket(
conf, context, zmq.ROUTER, host,
conf.zmq_proxy_opts.backend_port) if port != 0 else \
zmq_socket.ZmqRandomPortSocket(
conf, context, zmq.ROUTER, host)
self.be_router_socket = self._create_router_socket(conf, context, port)
super(DoubleRouterProxy, self).__init__(conf, context, matchmaker)
self.poller.register(self.be_router_socket, self._receive_message)

@ -23,9 +23,10 @@ from oslo_messaging._drivers.zmq_driver.proxy.central import zmq_central_proxy
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._i18n import _LI
zmq = zmq_async.import_zmq()
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
USAGE = """ Usage: ./zmq-proxy.py [-h] [] ...
@ -46,6 +47,11 @@ zmq_proxy_opts = [
cfg.IntOpt('publisher_port', default=0,
help='Publisher port number. Zero means random.'),
cfg.BoolOpt('ack_pub_sub', default=False,
help='Use acknowledgements for notifying senders about '
'receiving their fanout messages. '
'The option is ignored if PUB/SUB is disabled.')
]
@ -70,9 +76,11 @@ def parse_command_line_args(conf):
parser.add_argument('-p', '--publisher-port', dest='publisher_port',
type=int,
help='Front-end PUBLISHER port number')
parser.add_argument('-a', '--ack-pub-sub', dest='ack_pub_sub',
action='store_true',
help='Acknowledge PUB/SUB messages')
parser.add_argument('-d', '--debug', dest='debug', type=bool,
default=False,
parser.add_argument('-d', '--debug', dest='debug', action='store_true',
help='Turn on DEBUG logging level instead of INFO')
args = parser.parse_args()
@ -87,7 +95,7 @@ def parse_command_line_args(conf):
logging.basicConfig(**log_kwargs)
if args.host:
conf.zmq_proxy_opts.host = args.host
conf.set_override('host', args.host, group='zmq_proxy_opts')
if args.frontend_port:
conf.set_override('frontend_port', args.frontend_port,
group='zmq_proxy_opts')
@ -97,6 +105,9 @@ def parse_command_line_args(conf):
if args.publisher_port:
conf.set_override('publisher_port', args.publisher_port,
group='zmq_proxy_opts')
if args.ack_pub_sub:
conf.set_override('ack_pub_sub', args.ack_pub_sub,
group='zmq_proxy_opts')
class ZmqProxy(object):

@ -20,9 +20,10 @@ import six
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
zmq = zmq_async.import_zmq()
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
@six.add_metaclass(abc.ABCMeta)
class Sender(object):
@ -39,25 +40,48 @@ class CentralRouterSender(Sender):
routing_key = multipart_message[zmq_names.ROUTING_KEY_IDX]
reply_id = multipart_message[zmq_names.REPLY_ID_IDX]
message_id = multipart_message[zmq_names.MESSAGE_ID_IDX]
socket.send(routing_key, zmq.SNDMORE)
socket.send(b'', zmq.SNDMORE)
socket.send(reply_id, zmq.SNDMORE)
socket.send(multipart_message[zmq_names.MESSAGE_TYPE_IDX], zmq.SNDMORE)
socket.send_multipart(multipart_message[zmq_names.MESSAGE_ID_IDX:])
LOG.debug("Dispatching %(msg_type)s message %(msg_id)s - from %(rid)s "
"to -> %(rkey)s" %
"-> to %(rkey)s",
{"msg_type": zmq_names.message_type_str(message_type),
"msg_id": message_id,
"rkey": routing_key,
"rid": reply_id})
socket.send_multipart(multipart_message[zmq_names.MESSAGE_ID_IDX:])
class CentralAckSender(Sender):
def send_message(self, socket, multipart_message):
message_type = zmq_names.ACK_TYPE
message_id = multipart_message[zmq_names.MESSAGE_ID_IDX]
routing_key = socket.handle.identity
reply_id = multipart_message[zmq_names.REPLY_ID_IDX]
socket.send(reply_id, zmq.SNDMORE)
socket.send(b'', zmq.SNDMORE)
socket.send(routing_key, zmq.SNDMORE)
socket.send(six.b(str(message_type)), zmq.SNDMORE)
socket.send_string(message_id)
LOG.debug("Sending %(msg_type)s for %(msg_id)s to %(rid)s "
"[from %(rkey)s]",
{"msg_type": zmq_names.message_type_str(message_type),
"msg_id": message_id,
"rid": reply_id,
"rkey": routing_key})
class CentralPublisherSender(Sender):
def send_message(self, socket, multipart_message):
message_type = int(multipart_message[zmq_names.MESSAGE_TYPE_IDX])
assert message_type in (zmq_names.CAST_FANOUT_TYPE,
zmq_names.NOTIFY_TYPE), "Fanout expected!"
assert message_type in zmq_names.MULTISEND_TYPES, "Fanout expected!"
topic_filter = multipart_message[zmq_names.ROUTING_KEY_IDX]
message_id = multipart_message[zmq_names.MESSAGE_ID_IDX]

@ -108,6 +108,9 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
message_id, socket, message_type)
except (zmq.ZMQError, AssertionError, ValueError) as e:
LOG.error(_LE("Receiving message failure: %s"), str(e))
# NOTE(gdavoian): drop the left parts of a broken message
if socket.getsockopt(zmq.RCVMORE):
socket.recv_multipart()
def cleanup(self):
LOG.info(_LI("[%s] Destroy DEALER consumer"), self.host)

@ -79,6 +79,9 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
message_id, socket, message_type)
except (zmq.ZMQError, AssertionError, ValueError) as e:
LOG.error(_LE("Receiving message failed: %s"), str(e))
# NOTE(gdavoian): drop the left parts of a broken message
if socket.getsockopt(zmq.RCVMORE):
socket.recv_multipart()
def cleanup(self):
LOG.info(_LI("[%s] Destroy ROUTER consumer"), self.host)

@ -210,10 +210,10 @@ class ZmqPortBusy(exceptions.MessagingException):
class ZmqRandomPortSocket(ZmqSocket):
def __init__(self, conf, context, socket_type, host=None,
high_watermark=0):
high_watermark=0, identity=None):
super(ZmqRandomPortSocket, self).__init__(
conf, context, socket_type, immediate=False,
high_watermark=high_watermark)
high_watermark=high_watermark, identity=identity)
self.bind_address = zmq_address.get_tcp_random_address(self.conf)
if host is None:
host = conf.oslo_messaging_zmq.rpc_zmq_host
@ -232,10 +232,10 @@ class ZmqRandomPortSocket(ZmqSocket):
class ZmqFixedPortSocket(ZmqSocket):
def __init__(self, conf, context, socket_type, host, port,
high_watermark=0):
high_watermark=0, identity=None):
super(ZmqFixedPortSocket, self).__init__(
conf, context, socket_type, immediate=False,
high_watermark=high_watermark)
high_watermark=high_watermark, identity=identity)
self.connect_address = zmq_address.combine_address(host, port)
self.bind_address = zmq_address.get_tcp_direct_address(
zmq_address.combine_address(