Merge "[zmq] Add acks from proxy for PUB/SUB messages"
This commit is contained in:
commit
1aa35c1f62
oslo_messaging/_drivers/zmq_driver
@ -13,6 +13,9 @@
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
import six
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.proxy.central \
|
||||
import zmq_publisher_proxy
|
||||
@ -23,9 +26,10 @@ from oslo_messaging._drivers.zmq_driver import zmq_socket
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_updater
|
||||
from oslo_messaging._i18n import _LI, _LE
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
def check_message_format(func):
|
||||
def _check_message_format(*args, **kwargs):
|
||||
@ -39,27 +43,27 @@ def check_message_format(func):
|
||||
|
||||
class SingleRouterProxy(object):
|
||||
|
||||
PROXY_TYPE = "ROUTER"
|
||||
|
||||
def __init__(self, conf, context, matchmaker):
|
||||
self.conf = conf
|
||||
self.context = context
|
||||
super(SingleRouterProxy, self).__init__()
|
||||
self.matchmaker = matchmaker
|
||||
host = conf.zmq_proxy_opts.host
|
||||
|
||||
LOG.info(_LI("Running %s proxy") % self.PROXY_TYPE)
|
||||
|
||||
self.poller = zmq_async.get_poller()
|
||||
|
||||
port = conf.zmq_proxy_opts.frontend_port
|
||||
self.fe_router_socket = zmq_socket.ZmqFixedPortSocket(
|
||||
conf, context, zmq.ROUTER, host,
|
||||
conf.zmq_proxy_opts.frontend_port) if port != 0 else \
|
||||
zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER,
|
||||
host)
|
||||
self.fe_router_socket = self._create_router_socket(conf, context, port)
|
||||
|
||||
self.poller.register(self.fe_router_socket, self._receive_message)
|
||||
|
||||
self.publisher = zmq_publisher_proxy.PublisherProxy(
|
||||
conf, matchmaker)
|
||||
self.publisher = zmq_publisher_proxy.PublisherProxy(conf, matchmaker)
|
||||
|
||||
self.router_sender = zmq_sender.CentralRouterSender()
|
||||
self.ack_sender = zmq_sender.CentralAckSender()
|
||||
|
||||
self._router_updater = self._create_router_updater()
|
||||
|
||||
def run(self):
|
||||
@ -67,15 +71,28 @@ class SingleRouterProxy(object):
|
||||
if message is None:
|
||||
return
|
||||
|
||||
msg_type = int(message[zmq_names.MESSAGE_TYPE_IDX])
|
||||
message_type = int(message[zmq_names.MESSAGE_TYPE_IDX])
|
||||
if self.conf.oslo_messaging_zmq.use_pub_sub and \
|
||||
msg_type in (zmq_names.CAST_FANOUT_TYPE,
|
||||
zmq_names.NOTIFY_TYPE):
|
||||
message_type in zmq_names.MULTISEND_TYPES:
|
||||
self.publisher.send_request(message)
|
||||
if socket is self.fe_router_socket and \
|
||||
self.conf.zmq_proxy_opts.ack_pub_sub:
|
||||
self.ack_sender.send_message(socket, message)
|
||||
else:
|
||||
self.router_sender.send_message(
|
||||
self._get_socket_to_dispatch_on(socket), message)
|
||||
|
||||
@staticmethod
|
||||
def _create_router_socket(conf, context, port):
|
||||
host = conf.zmq_proxy_opts.host
|
||||
identity = six.b(host) + b"/zmq-proxy/" + six.b(str(uuid.uuid4()))
|
||||
if port != 0:
|
||||
return zmq_socket.ZmqFixedPortSocket(conf, context, zmq.ROUTER,
|
||||
host, port, identity=identity)
|
||||
else:
|
||||
return zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER,
|
||||
host, identity=identity)
|
||||
|
||||
def _create_router_updater(self):
|
||||
return RouterUpdater(
|
||||
self.conf, self.matchmaker, self.publisher.host,
|
||||
@ -105,15 +122,11 @@ class SingleRouterProxy(object):
|
||||
|
||||
class DoubleRouterProxy(SingleRouterProxy):
|
||||
|
||||
PROXY_TYPE = "ROUTER-ROUTER"
|
||||
|
||||
def __init__(self, conf, context, matchmaker):
|
||||
LOG.info(_LI('Running double router proxy'))
|
||||
port = conf.zmq_proxy_opts.backend_port
|
||||
host = conf.zmq_proxy_opts.host
|
||||
self.be_router_socket = zmq_socket.ZmqFixedPortSocket(
|
||||
conf, context, zmq.ROUTER, host,
|
||||
conf.zmq_proxy_opts.backend_port) if port != 0 else \
|
||||
zmq_socket.ZmqRandomPortSocket(
|
||||
conf, context, zmq.ROUTER, host)
|
||||
self.be_router_socket = self._create_router_socket(conf, context, port)
|
||||
super(DoubleRouterProxy, self).__init__(conf, context, matchmaker)
|
||||
self.poller.register(self.be_router_socket, self._receive_message)
|
||||
|
||||
|
@ -23,9 +23,10 @@ from oslo_messaging._drivers.zmq_driver.proxy.central import zmq_central_proxy
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._i18n import _LI
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
USAGE = """ Usage: ./zmq-proxy.py [-h] [] ...
|
||||
|
||||
@ -46,6 +47,11 @@ zmq_proxy_opts = [
|
||||
|
||||
cfg.IntOpt('publisher_port', default=0,
|
||||
help='Publisher port number. Zero means random.'),
|
||||
|
||||
cfg.BoolOpt('ack_pub_sub', default=False,
|
||||
help='Use acknowledgements for notifying senders about '
|
||||
'receiving their fanout messages. '
|
||||
'The option is ignored if PUB/SUB is disabled.')
|
||||
]
|
||||
|
||||
|
||||
@ -70,9 +76,11 @@ def parse_command_line_args(conf):
|
||||
parser.add_argument('-p', '--publisher-port', dest='publisher_port',
|
||||
type=int,
|
||||
help='Front-end PUBLISHER port number')
|
||||
parser.add_argument('-a', '--ack-pub-sub', dest='ack_pub_sub',
|
||||
action='store_true',
|
||||
help='Acknowledge PUB/SUB messages')
|
||||
|
||||
parser.add_argument('-d', '--debug', dest='debug', type=bool,
|
||||
default=False,
|
||||
parser.add_argument('-d', '--debug', dest='debug', action='store_true',
|
||||
help='Turn on DEBUG logging level instead of INFO')
|
||||
|
||||
args = parser.parse_args()
|
||||
@ -87,7 +95,7 @@ def parse_command_line_args(conf):
|
||||
logging.basicConfig(**log_kwargs)
|
||||
|
||||
if args.host:
|
||||
conf.zmq_proxy_opts.host = args.host
|
||||
conf.set_override('host', args.host, group='zmq_proxy_opts')
|
||||
if args.frontend_port:
|
||||
conf.set_override('frontend_port', args.frontend_port,
|
||||
group='zmq_proxy_opts')
|
||||
@ -97,6 +105,9 @@ def parse_command_line_args(conf):
|
||||
if args.publisher_port:
|
||||
conf.set_override('publisher_port', args.publisher_port,
|
||||
group='zmq_proxy_opts')
|
||||
if args.ack_pub_sub:
|
||||
conf.set_override('ack_pub_sub', args.ack_pub_sub,
|
||||
group='zmq_proxy_opts')
|
||||
|
||||
|
||||
class ZmqProxy(object):
|
||||
|
@ -20,9 +20,10 @@ import six
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class Sender(object):
|
||||
@ -39,25 +40,48 @@ class CentralRouterSender(Sender):
|
||||
routing_key = multipart_message[zmq_names.ROUTING_KEY_IDX]
|
||||
reply_id = multipart_message[zmq_names.REPLY_ID_IDX]
|
||||
message_id = multipart_message[zmq_names.MESSAGE_ID_IDX]
|
||||
|
||||
socket.send(routing_key, zmq.SNDMORE)
|
||||
socket.send(b'', zmq.SNDMORE)
|
||||
socket.send(reply_id, zmq.SNDMORE)
|
||||
socket.send(multipart_message[zmq_names.MESSAGE_TYPE_IDX], zmq.SNDMORE)
|
||||
socket.send_multipart(multipart_message[zmq_names.MESSAGE_ID_IDX:])
|
||||
|
||||
LOG.debug("Dispatching %(msg_type)s message %(msg_id)s - from %(rid)s "
|
||||
"to -> %(rkey)s" %
|
||||
"-> to %(rkey)s",
|
||||
{"msg_type": zmq_names.message_type_str(message_type),
|
||||
"msg_id": message_id,
|
||||
"rkey": routing_key,
|
||||
"rid": reply_id})
|
||||
socket.send_multipart(multipart_message[zmq_names.MESSAGE_ID_IDX:])
|
||||
|
||||
|
||||
class CentralAckSender(Sender):
|
||||
|
||||
def send_message(self, socket, multipart_message):
|
||||
message_type = zmq_names.ACK_TYPE
|
||||
message_id = multipart_message[zmq_names.MESSAGE_ID_IDX]
|
||||
routing_key = socket.handle.identity
|
||||
reply_id = multipart_message[zmq_names.REPLY_ID_IDX]
|
||||
|
||||
socket.send(reply_id, zmq.SNDMORE)
|
||||
socket.send(b'', zmq.SNDMORE)
|
||||
socket.send(routing_key, zmq.SNDMORE)
|
||||
socket.send(six.b(str(message_type)), zmq.SNDMORE)
|
||||
socket.send_string(message_id)
|
||||
|
||||
LOG.debug("Sending %(msg_type)s for %(msg_id)s to %(rid)s "
|
||||
"[from %(rkey)s]",
|
||||
{"msg_type": zmq_names.message_type_str(message_type),
|
||||
"msg_id": message_id,
|
||||
"rid": reply_id,
|
||||
"rkey": routing_key})
|
||||
|
||||
|
||||
class CentralPublisherSender(Sender):
|
||||
|
||||
def send_message(self, socket, multipart_message):
|
||||
message_type = int(multipart_message[zmq_names.MESSAGE_TYPE_IDX])
|
||||
assert message_type in (zmq_names.CAST_FANOUT_TYPE,
|
||||
zmq_names.NOTIFY_TYPE), "Fanout expected!"
|
||||
assert message_type in zmq_names.MULTISEND_TYPES, "Fanout expected!"
|
||||
topic_filter = multipart_message[zmq_names.ROUTING_KEY_IDX]
|
||||
message_id = multipart_message[zmq_names.MESSAGE_ID_IDX]
|
||||
|
||||
|
@ -108,6 +108,9 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
|
||||
message_id, socket, message_type)
|
||||
except (zmq.ZMQError, AssertionError, ValueError) as e:
|
||||
LOG.error(_LE("Receiving message failure: %s"), str(e))
|
||||
# NOTE(gdavoian): drop the left parts of a broken message
|
||||
if socket.getsockopt(zmq.RCVMORE):
|
||||
socket.recv_multipart()
|
||||
|
||||
def cleanup(self):
|
||||
LOG.info(_LI("[%s] Destroy DEALER consumer"), self.host)
|
||||
|
@ -79,6 +79,9 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
|
||||
message_id, socket, message_type)
|
||||
except (zmq.ZMQError, AssertionError, ValueError) as e:
|
||||
LOG.error(_LE("Receiving message failed: %s"), str(e))
|
||||
# NOTE(gdavoian): drop the left parts of a broken message
|
||||
if socket.getsockopt(zmq.RCVMORE):
|
||||
socket.recv_multipart()
|
||||
|
||||
def cleanup(self):
|
||||
LOG.info(_LI("[%s] Destroy ROUTER consumer"), self.host)
|
||||
|
@ -213,10 +213,10 @@ class ZmqPortBusy(exceptions.MessagingException):
|
||||
class ZmqRandomPortSocket(ZmqSocket):
|
||||
|
||||
def __init__(self, conf, context, socket_type, host=None,
|
||||
high_watermark=0):
|
||||
high_watermark=0, identity=None):
|
||||
super(ZmqRandomPortSocket, self).__init__(
|
||||
conf, context, socket_type, immediate=False,
|
||||
high_watermark=high_watermark)
|
||||
high_watermark=high_watermark, identity=identity)
|
||||
self.bind_address = zmq_address.get_tcp_random_address(self.conf)
|
||||
if host is None:
|
||||
host = conf.oslo_messaging_zmq.rpc_zmq_host
|
||||
@ -235,10 +235,10 @@ class ZmqRandomPortSocket(ZmqSocket):
|
||||
class ZmqFixedPortSocket(ZmqSocket):
|
||||
|
||||
def __init__(self, conf, context, socket_type, host, port,
|
||||
high_watermark=0):
|
||||
high_watermark=0, identity=None):
|
||||
super(ZmqFixedPortSocket, self).__init__(
|
||||
conf, context, socket_type, immediate=False,
|
||||
high_watermark=high_watermark)
|
||||
high_watermark=high_watermark, identity=identity)
|
||||
self.connect_address = zmq_address.combine_address(host, port)
|
||||
self.bind_address = zmq_address.get_tcp_direct_address(
|
||||
zmq_address.combine_address(
|
||||
|
Loading…
x
Reference in New Issue
Block a user