Merge "Fix target resolution mismatch in neutron, nova, heat"
This commit is contained in:
commit
07c328255b
oslo_messaging/_drivers
impl_zmq.py
zmq_driver
broker
client
matchmaker
poller
server
zmq_address.pyzmq_names.pyzmq_socket.py@ -229,7 +229,7 @@ class ZmqDriver(base.BaseDriver):
|
|||||||
:param target: Message destination target
|
:param target: Message destination target
|
||||||
:type target: oslo_messaging.Target
|
:type target: oslo_messaging.Target
|
||||||
"""
|
"""
|
||||||
server = self.server.get()
|
server = zmq_server.ZmqServer(self, self.conf, self.matchmaker)
|
||||||
server.listen(target)
|
server.listen(target)
|
||||||
return server
|
return server
|
||||||
|
|
||||||
|
@ -16,7 +16,6 @@ import logging
|
|||||||
import os
|
import os
|
||||||
|
|
||||||
from oslo_utils import excutils
|
from oslo_utils import excutils
|
||||||
import six
|
|
||||||
from stevedore import driver
|
from stevedore import driver
|
||||||
|
|
||||||
from oslo_messaging._drivers.zmq_driver.broker import zmq_queue_proxy
|
from oslo_messaging._drivers.zmq_driver.broker import zmq_queue_proxy
|
||||||
@ -51,11 +50,8 @@ class ZmqBroker(object):
|
|||||||
).driver(self.conf)
|
).driver(self.conf)
|
||||||
|
|
||||||
self.context = zmq.Context()
|
self.context = zmq.Context()
|
||||||
self.queue = six.moves.queue.Queue()
|
self.proxies = [zmq_queue_proxy.UniversalQueueProxy(
|
||||||
self.proxies = [zmq_queue_proxy.OutgoingQueueProxy(
|
conf, self.context, self.matchmaker)
|
||||||
conf, self.context, self.queue, self.matchmaker),
|
|
||||||
zmq_queue_proxy.IncomingQueueProxy(
|
|
||||||
conf, self.context, self.queue)
|
|
||||||
]
|
]
|
||||||
|
|
||||||
def _create_ipc_dirs(self):
|
def _create_ipc_dirs(self):
|
||||||
|
@ -14,65 +14,69 @@
|
|||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
import six
|
|
||||||
|
|
||||||
from oslo_messaging._drivers.zmq_driver.broker import zmq_base_proxy
|
from oslo_messaging._drivers.zmq_driver.broker import zmq_base_proxy
|
||||||
from oslo_messaging._drivers.zmq_driver.client.publishers\
|
from oslo_messaging._drivers.zmq_driver.client.publishers\
|
||||||
import zmq_dealer_publisher
|
import zmq_dealer_publisher
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||||
from oslo_messaging._i18n import _LI
|
from oslo_messaging._i18n import _LI
|
||||||
|
|
||||||
zmq = zmq_async.import_zmq(zmq_concurrency='native')
|
zmq = zmq_async.import_zmq(zmq_concurrency='native')
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class OutgoingQueueProxy(zmq_base_proxy.BaseProxy):
|
class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
|
||||||
|
|
||||||
|
def __init__(self, conf, context, matchmaker):
|
||||||
|
super(UniversalQueueProxy, self).__init__(conf, context)
|
||||||
|
self.poller = zmq_async.get_poller(zmq_concurrency='native')
|
||||||
|
|
||||||
|
self.router_socket = context.socket(zmq.ROUTER)
|
||||||
|
self.router_socket.bind(zmq_address.get_broker_address(conf))
|
||||||
|
|
||||||
|
self.poller.register(self.router_socket, self._receive_in_request)
|
||||||
|
LOG.info(_LI("Polling at universal proxy"))
|
||||||
|
|
||||||
def __init__(self, conf, context, queue, matchmaker):
|
|
||||||
super(OutgoingQueueProxy, self).__init__(conf, context)
|
|
||||||
self.queue = queue
|
|
||||||
self.matchmaker = matchmaker
|
self.matchmaker = matchmaker
|
||||||
self.publisher = zmq_dealer_publisher.DealerPublisher(
|
reply_receiver = zmq_dealer_publisher.ReplyReceiver(self.poller)
|
||||||
conf, matchmaker)
|
self.publisher = zmq_dealer_publisher.DealerPublisherProxy(
|
||||||
LOG.info(_LI("Polling at outgoing proxy ..."))
|
conf, matchmaker, reply_receiver)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
try:
|
message, socket = self.poller.poll(self.conf.rpc_poll_timeout)
|
||||||
request = self.queue.get(timeout=self.conf.rpc_poll_timeout)
|
if message is None:
|
||||||
LOG.info(_LI("Redirecting request %s to TCP publisher ...")
|
|
||||||
% request)
|
|
||||||
self.publisher.send_request(request)
|
|
||||||
except six.moves.queue.Empty:
|
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if socket == self.router_socket:
|
||||||
|
self._redirect_in_request(message)
|
||||||
|
else:
|
||||||
|
self._redirect_reply(message)
|
||||||
|
|
||||||
class IncomingQueueProxy(zmq_base_proxy.BaseProxy):
|
def _redirect_in_request(self, request):
|
||||||
|
LOG.info(_LI("-> Redirecting request %s to TCP publisher")
|
||||||
|
% request)
|
||||||
|
self.publisher.send_request(request)
|
||||||
|
|
||||||
def __init__(self, conf, context, queue):
|
def _redirect_reply(self, reply):
|
||||||
super(IncomingQueueProxy, self).__init__(conf, context)
|
LOG.info(_LI("Reply proxy %s") % reply)
|
||||||
self.poller = zmq_async.get_poller(
|
if reply[zmq_names.IDX_REPLY_TYPE] == zmq_names.ACK_TYPE:
|
||||||
zmq_concurrency='native')
|
LOG.info(_LI("Acknowledge dropped %s") % reply)
|
||||||
|
|
||||||
self.queue = queue
|
|
||||||
|
|
||||||
self.socket = context.socket(zmq.ROUTER)
|
|
||||||
self.socket.bind(zmq_address.get_broker_address(conf))
|
|
||||||
self.poller.register(self.socket, self.receive_request)
|
|
||||||
LOG.info(_LI("Polling at incoming proxy ..."))
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
request, socket = self.poller.poll(self.conf.rpc_poll_timeout)
|
|
||||||
if request is None:
|
|
||||||
return
|
return
|
||||||
|
|
||||||
LOG.info(_LI("Received request and queue it: %s") % str(request))
|
LOG.info(_LI("<- Redirecting reply to ROUTER: reply: %s")
|
||||||
|
% reply[zmq_names.IDX_REPLY_BODY:])
|
||||||
|
|
||||||
self.queue.put(request)
|
self.router_socket.send_multipart(reply[zmq_names.IDX_REPLY_BODY:])
|
||||||
|
|
||||||
def receive_request(self, socket):
|
def _receive_in_request(self, socket):
|
||||||
reply_id = socket.recv()
|
reply_id = socket.recv()
|
||||||
assert reply_id is not None, "Valid id expected"
|
assert reply_id is not None, "Valid id expected"
|
||||||
empty = socket.recv()
|
empty = socket.recv()
|
||||||
assert empty == b'', "Empty delimiter expected"
|
assert empty == b'', "Empty delimiter expected"
|
||||||
return socket.recv_pyobj()
|
envelope = socket.recv_pyobj()
|
||||||
|
if envelope[zmq_names.FIELD_MSG_TYPE] == zmq_names.CALL_TYPE:
|
||||||
|
envelope[zmq_names.FIELD_REPLY_ID] = reply_id
|
||||||
|
payload = socket.recv_multipart()
|
||||||
|
payload.insert(0, envelope)
|
||||||
|
return payload
|
||||||
|
@ -29,12 +29,10 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend):
|
|||||||
|
|
||||||
def __init__(self, conf, matchmaker):
|
def __init__(self, conf, matchmaker):
|
||||||
super(DealerPublisher, self).__init__(conf, matchmaker, zmq.DEALER)
|
super(DealerPublisher, self).__init__(conf, matchmaker, zmq.DEALER)
|
||||||
self.ack_receiver = AcknowledgementReceiver()
|
|
||||||
|
|
||||||
def send_request(self, request):
|
def send_request(self, request):
|
||||||
|
|
||||||
if request.msg_type == zmq_names.CALL_TYPE:
|
self._check_request_pattern(request)
|
||||||
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
|
|
||||||
|
|
||||||
dealer_socket, hosts = self._check_hosts_connections(request.target)
|
dealer_socket, hosts = self._check_hosts_connections(request.target)
|
||||||
|
|
||||||
@ -47,25 +45,26 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend):
|
|||||||
% request.msg_type)
|
% request.msg_type)
|
||||||
return
|
return
|
||||||
|
|
||||||
self.ack_receiver.track_socket(dealer_socket.handle)
|
|
||||||
|
|
||||||
if request.msg_type in zmq_names.MULTISEND_TYPES:
|
if request.msg_type in zmq_names.MULTISEND_TYPES:
|
||||||
for _ in range(dealer_socket.connections_count()):
|
for _ in range(dealer_socket.connections_count()):
|
||||||
self._send_request(dealer_socket, request)
|
self._send_request(dealer_socket, request)
|
||||||
else:
|
else:
|
||||||
self._send_request(dealer_socket, request)
|
self._send_request(dealer_socket, request)
|
||||||
|
|
||||||
|
def _check_request_pattern(self, request):
|
||||||
|
if request.msg_type == zmq_names.CALL_TYPE:
|
||||||
|
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
|
||||||
|
|
||||||
def _send_request(self, socket, request):
|
def _send_request(self, socket, request):
|
||||||
|
|
||||||
socket.send(b'', zmq.SNDMORE)
|
socket.send(b'', zmq.SNDMORE)
|
||||||
socket.send_pyobj(request)
|
socket.send_pyobj(request)
|
||||||
|
|
||||||
LOG.info(_LI("Sending message %(message)s to a target %(target)s")
|
LOG.info(_LI("Sending message_id %(message)s to a target %(target)s")
|
||||||
% {"message": request.message,
|
% {"message": request.message_id,
|
||||||
"target": request.target})
|
"target": request.target})
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
self.ack_receiver.cleanup()
|
|
||||||
super(DealerPublisher, self).cleanup()
|
super(DealerPublisher, self).cleanup()
|
||||||
|
|
||||||
|
|
||||||
@ -81,7 +80,10 @@ class DealerPublisherLight(zmq_publisher_base.PublisherBase):
|
|||||||
if request.msg_type == zmq_names.CALL_TYPE:
|
if request.msg_type == zmq_names.CALL_TYPE:
|
||||||
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
|
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
|
||||||
|
|
||||||
|
envelope = request.create_envelope()
|
||||||
|
|
||||||
self.socket.send(b'', zmq.SNDMORE)
|
self.socket.send(b'', zmq.SNDMORE)
|
||||||
|
self.socket.send_pyobj(envelope, zmq.SNDMORE)
|
||||||
self.socket.send_pyobj(request)
|
self.socket.send_pyobj(request)
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
@ -89,6 +91,67 @@ class DealerPublisherLight(zmq_publisher_base.PublisherBase):
|
|||||||
self.socket.close()
|
self.socket.close()
|
||||||
|
|
||||||
|
|
||||||
|
class DealerPublisherProxy(DealerPublisher):
|
||||||
|
|
||||||
|
def __init__(self, conf, matchmaker, reply_receiver):
|
||||||
|
super(DealerPublisherProxy, self).__init__(conf, matchmaker)
|
||||||
|
self.reply_receiver = reply_receiver
|
||||||
|
|
||||||
|
def send_request(self, multipart_message):
|
||||||
|
|
||||||
|
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
|
||||||
|
|
||||||
|
LOG.info(_LI("Envelope: %s") % envelope)
|
||||||
|
|
||||||
|
target = envelope[zmq_names.FIELD_TARGET]
|
||||||
|
dealer_socket, hosts = self._check_hosts_connections(target)
|
||||||
|
|
||||||
|
if not dealer_socket.connections:
|
||||||
|
# NOTE(ozamiatin): Here we can provide
|
||||||
|
# a queue for keeping messages to send them later
|
||||||
|
# when some listener appears. However such approach
|
||||||
|
# being more reliable will consume additional memory.
|
||||||
|
LOG.warning(_LW("Request %s was dropped because no connection")
|
||||||
|
% envelope[zmq_names.FIELD_MSG_TYPE])
|
||||||
|
return
|
||||||
|
|
||||||
|
self.reply_receiver.track_socket(dealer_socket.handle)
|
||||||
|
|
||||||
|
LOG.info(_LI("Sending message %(message)s to a target %(target)s")
|
||||||
|
% {"message": envelope[zmq_names.FIELD_MSG_ID],
|
||||||
|
"target": envelope[zmq_names.FIELD_TARGET]})
|
||||||
|
|
||||||
|
if envelope[zmq_names.FIELD_MSG_TYPE] in zmq_names.MULTISEND_TYPES:
|
||||||
|
for _ in range(dealer_socket.connections_count()):
|
||||||
|
self._send_request(dealer_socket, multipart_message)
|
||||||
|
else:
|
||||||
|
self._send_request(dealer_socket, multipart_message)
|
||||||
|
|
||||||
|
def _send_request(self, socket, multipart_message):
|
||||||
|
|
||||||
|
socket.send(b'', zmq.SNDMORE)
|
||||||
|
socket.send_pyobj(
|
||||||
|
multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE],
|
||||||
|
zmq.SNDMORE)
|
||||||
|
socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY])
|
||||||
|
|
||||||
|
|
||||||
|
class ReplyReceiver(object):
|
||||||
|
|
||||||
|
def __init__(self, poller):
|
||||||
|
self.poller = poller
|
||||||
|
LOG.info(_LI("Reply waiter created in broker"))
|
||||||
|
|
||||||
|
def _receive_reply(self, socket):
|
||||||
|
return socket.recv_multipart()
|
||||||
|
|
||||||
|
def track_socket(self, socket):
|
||||||
|
self.poller.register(socket, self._receive_reply)
|
||||||
|
|
||||||
|
def cleanup(self):
|
||||||
|
self.poller.close()
|
||||||
|
|
||||||
|
|
||||||
class AcknowledgementReceiver(object):
|
class AcknowledgementReceiver(object):
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
@ -89,6 +89,13 @@ class PublisherBase(object):
|
|||||||
:param request: Message data and destination container object
|
:param request: Message data and destination container object
|
||||||
:type request: zmq_request.Request
|
:type request: zmq_request.Request
|
||||||
"""
|
"""
|
||||||
|
LOG.info(_LI("Sending %(type)s message_id %(message)s to a target"
|
||||||
|
"%(target)s key: %(key)s, host:%(host)s")
|
||||||
|
% {"type": request.msg_type,
|
||||||
|
"message": request.message_id,
|
||||||
|
"target": request.target,
|
||||||
|
"key": zmq_address.target_to_key(request.target),
|
||||||
|
"host": request.host})
|
||||||
socket.send_pyobj(request)
|
socket.send_pyobj(request)
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
|
@ -14,6 +14,9 @@
|
|||||||
|
|
||||||
import contextlib
|
import contextlib
|
||||||
import logging
|
import logging
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
import six
|
||||||
|
|
||||||
import oslo_messaging
|
import oslo_messaging
|
||||||
from oslo_messaging._drivers import common as rpc_common
|
from oslo_messaging._drivers import common as rpc_common
|
||||||
@ -40,24 +43,34 @@ class ReqPublisher(zmq_publisher_base.PublisherBase):
|
|||||||
if request.msg_type != zmq_names.CALL_TYPE:
|
if request.msg_type != zmq_names.CALL_TYPE:
|
||||||
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
|
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
|
||||||
|
|
||||||
socket = self._connect_to_host(request.target, request.timeout)
|
socket, connect_address = self._connect_to_host(request.target,
|
||||||
|
request.timeout)
|
||||||
|
request.host = connect_address
|
||||||
self._send_request(socket, request)
|
self._send_request(socket, request)
|
||||||
return self._receive_reply(socket, request)
|
return self._receive_reply(socket, request)
|
||||||
|
|
||||||
|
def _resolve_host_address(self, target, timeout=0):
|
||||||
|
host = self.matchmaker.get_single_host(target, timeout)
|
||||||
|
return zmq_address.get_tcp_direct_address(host)
|
||||||
|
|
||||||
def _connect_to_host(self, target, timeout=0):
|
def _connect_to_host(self, target, timeout=0):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.zmq_context = zmq.Context()
|
self.zmq_context = zmq.Context()
|
||||||
socket = self.zmq_context.socket(zmq.REQ)
|
socket = self.zmq_context.socket(zmq.REQ)
|
||||||
|
|
||||||
host = self.matchmaker.get_single_host(target, timeout)
|
if six.PY3:
|
||||||
connect_address = zmq_address.get_tcp_direct_address(host)
|
socket.setsockopt_string(zmq.IDENTITY, str(uuid.uuid1()))
|
||||||
|
else:
|
||||||
|
socket.identity = str(uuid.uuid1())
|
||||||
|
|
||||||
|
connect_address = self._resolve_host_address(target, timeout)
|
||||||
|
|
||||||
LOG.info(_LI("Connecting REQ to %s") % connect_address)
|
LOG.info(_LI("Connecting REQ to %s") % connect_address)
|
||||||
|
|
||||||
socket.connect(connect_address)
|
socket.connect(connect_address)
|
||||||
self.outbound_sockets[str(target)] = socket
|
self.outbound_sockets[str(target)] = socket
|
||||||
return socket
|
return socket, connect_address
|
||||||
|
|
||||||
except zmq.ZMQError as e:
|
except zmq.ZMQError as e:
|
||||||
errmsg = _LE("Error connecting to socket: %s") % str(e)
|
errmsg = _LE("Error connecting to socket: %s") % str(e)
|
||||||
@ -77,6 +90,7 @@ class ReqPublisher(zmq_publisher_base.PublisherBase):
|
|||||||
if reply is None:
|
if reply is None:
|
||||||
raise oslo_messaging.MessagingTimeout(
|
raise oslo_messaging.MessagingTimeout(
|
||||||
"Timeout %s seconds was reached" % request.timeout)
|
"Timeout %s seconds was reached" % request.timeout)
|
||||||
|
LOG.info(_LI("Received reply %s") % reply)
|
||||||
if reply[zmq_names.FIELD_FAILURE]:
|
if reply[zmq_names.FIELD_FAILURE]:
|
||||||
raise rpc_common.deserialize_remote_exception(
|
raise rpc_common.deserialize_remote_exception(
|
||||||
reply[zmq_names.FIELD_FAILURE],
|
reply[zmq_names.FIELD_FAILURE],
|
||||||
@ -87,3 +101,26 @@ class ReqPublisher(zmq_publisher_base.PublisherBase):
|
|||||||
def close(self):
|
def close(self):
|
||||||
# For contextlib compatibility
|
# For contextlib compatibility
|
||||||
self.cleanup()
|
self.cleanup()
|
||||||
|
|
||||||
|
|
||||||
|
class ReqPublisherLight(ReqPublisher):
|
||||||
|
|
||||||
|
def __init__(self, conf, matchmaker):
|
||||||
|
super(ReqPublisherLight, self).__init__(conf, matchmaker)
|
||||||
|
|
||||||
|
def _resolve_host_address(self, target, timeout=0):
|
||||||
|
return zmq_address.get_broker_address(self.conf)
|
||||||
|
|
||||||
|
def _send_request(self, socket, request):
|
||||||
|
|
||||||
|
LOG.info(_LI("Sending %(type)s message_id %(message)s"
|
||||||
|
" to a target %(target)s, host:%(host)s")
|
||||||
|
% {"type": request.msg_type,
|
||||||
|
"message": request.message_id,
|
||||||
|
"target": request.target,
|
||||||
|
"host": request.host})
|
||||||
|
|
||||||
|
envelope = request.create_envelope()
|
||||||
|
|
||||||
|
socket.send_pyobj(envelope, zmq.SNDMORE)
|
||||||
|
socket.send_pyobj(request)
|
||||||
|
@ -37,16 +37,18 @@ class ZmqClient(object):
|
|||||||
if self.conf.zmq_use_broker:
|
if self.conf.zmq_use_broker:
|
||||||
self.dealer_publisher = zmq_dealer_publisher.DealerPublisherLight(
|
self.dealer_publisher = zmq_dealer_publisher.DealerPublisherLight(
|
||||||
conf, zmq_address.get_broker_address(self.conf))
|
conf, zmq_address.get_broker_address(self.conf))
|
||||||
|
self.req_publisher_cls = zmq_req_publisher.ReqPublisherLight
|
||||||
else:
|
else:
|
||||||
self.dealer_publisher = zmq_dealer_publisher.DealerPublisher(
|
self.dealer_publisher = zmq_dealer_publisher.DealerPublisher(
|
||||||
conf, matchmaker)
|
conf, matchmaker)
|
||||||
|
self.req_publisher_cls = zmq_req_publisher.ReqPublisher
|
||||||
|
|
||||||
def send_call(self, target, context, message, timeout=None, retry=None):
|
def send_call(self, target, context, message, timeout=None, retry=None):
|
||||||
with contextlib.closing(zmq_request.CallRequest(
|
with contextlib.closing(zmq_request.CallRequest(
|
||||||
target, context=context, message=message,
|
target, context=context, message=message,
|
||||||
timeout=timeout, retry=retry,
|
timeout=timeout, retry=retry,
|
||||||
allowed_remote_exmods=self.allowed_remote_exmods)) as request:
|
allowed_remote_exmods=self.allowed_remote_exmods)) as request:
|
||||||
with contextlib.closing(zmq_req_publisher.ReqPublisher(
|
with contextlib.closing(self.req_publisher_cls(
|
||||||
self.conf, self.matchmaker)) as req_publisher:
|
self.conf, self.matchmaker)) as req_publisher:
|
||||||
return req_publisher.send_request(request)
|
return req_publisher.send_request(request)
|
||||||
|
|
||||||
|
@ -63,6 +63,12 @@ class Request(object):
|
|||||||
self.message = message
|
self.message = message
|
||||||
self.retry = retry
|
self.retry = retry
|
||||||
self.message_id = str(uuid.uuid1())
|
self.message_id = str(uuid.uuid1())
|
||||||
|
self.proxy_reply_id = None
|
||||||
|
|
||||||
|
def create_envelope(self):
|
||||||
|
return {'msg_type': self.msg_type,
|
||||||
|
'message_id': self.message_id,
|
||||||
|
'target': self.target}
|
||||||
|
|
||||||
@abc.abstractproperty
|
@abc.abstractproperty
|
||||||
def msg_type(self):
|
def msg_type(self):
|
||||||
@ -86,6 +92,11 @@ class RpcRequest(Request):
|
|||||||
|
|
||||||
super(RpcRequest, self).__init__(*args, **kwargs)
|
super(RpcRequest, self).__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
def create_envelope(self):
|
||||||
|
envelope = super(RpcRequest, self).create_envelope()
|
||||||
|
envelope['timeout'] = self.timeout
|
||||||
|
return envelope
|
||||||
|
|
||||||
|
|
||||||
class CallRequest(RpcRequest):
|
class CallRequest(RpcRequest):
|
||||||
|
|
||||||
|
@ -17,6 +17,7 @@ from oslo_config import cfg
|
|||||||
from oslo_utils import importutils
|
from oslo_utils import importutils
|
||||||
|
|
||||||
from oslo_messaging._drivers.zmq_driver.matchmaker import base
|
from oslo_messaging._drivers.zmq_driver.matchmaker import base
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||||
|
|
||||||
redis = importutils.try_import('redis')
|
redis = importutils.try_import('redis')
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
@ -48,34 +49,30 @@ class RedisMatchMaker(base.MatchMakerBase):
|
|||||||
password=self.conf.matchmaker_redis.password,
|
password=self.conf.matchmaker_redis.password,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _target_to_key(self, target):
|
|
||||||
attributes = ['topic', 'exchange', 'server']
|
|
||||||
prefix = "ZMQ-target"
|
|
||||||
key = ":".join((getattr(target, attr) or "*") for attr in attributes)
|
|
||||||
return "%s-%s" % (prefix, key)
|
|
||||||
|
|
||||||
def _get_keys_by_pattern(self, pattern):
|
|
||||||
return self._redis.keys(pattern)
|
|
||||||
|
|
||||||
def _get_hosts_by_key(self, key):
|
def _get_hosts_by_key(self, key):
|
||||||
return self._redis.lrange(key, 0, -1)
|
return self._redis.lrange(key, 0, -1)
|
||||||
|
|
||||||
def register(self, target, hostname):
|
def register(self, target, hostname):
|
||||||
key = self._target_to_key(target)
|
|
||||||
if hostname not in self._get_hosts_by_key(key):
|
if target.topic and target.server:
|
||||||
self._redis.lpush(key, hostname)
|
key = zmq_address.target_to_key(target)
|
||||||
|
if hostname not in self._get_hosts_by_key(key):
|
||||||
|
self._redis.lpush(key, hostname)
|
||||||
|
|
||||||
|
if target.topic:
|
||||||
|
if hostname not in self._get_hosts_by_key(target.topic):
|
||||||
|
self._redis.lpush(target.topic, hostname)
|
||||||
|
|
||||||
|
if target.server:
|
||||||
|
if hostname not in self._get_hosts_by_key(target.server):
|
||||||
|
self._redis.lpush(target.server, hostname)
|
||||||
|
|
||||||
def unregister(self, target, hostname):
|
def unregister(self, target, hostname):
|
||||||
key = self._target_to_key(target)
|
key = zmq_address.target_to_key(target)
|
||||||
self._redis.lrem(key, 0, hostname)
|
self._redis.lrem(key, 0, hostname)
|
||||||
|
|
||||||
def get_hosts(self, target):
|
def get_hosts(self, target):
|
||||||
pattern = self._target_to_key(target)
|
|
||||||
if "*" not in pattern:
|
|
||||||
# pattern have no placeholders, so this is valid key
|
|
||||||
return self._get_hosts_by_key(pattern)
|
|
||||||
|
|
||||||
hosts = []
|
hosts = []
|
||||||
for key in self._get_keys_by_pattern(pattern):
|
key = zmq_address.target_to_key(target)
|
||||||
hosts.extend(self._get_hosts_by_key(key))
|
hosts.extend(self._get_hosts_by_key(key))
|
||||||
return hosts
|
return hosts
|
||||||
|
@ -38,12 +38,17 @@ class ThreadingPoller(zmq_poller.ZmqPoller):
|
|||||||
self.recv_methods = {}
|
self.recv_methods = {}
|
||||||
|
|
||||||
def register(self, socket, recv_method=None):
|
def register(self, socket, recv_method=None):
|
||||||
|
if socket in self.recv_methods:
|
||||||
|
return
|
||||||
if recv_method is not None:
|
if recv_method is not None:
|
||||||
self.recv_methods[socket] = recv_method
|
self.recv_methods[socket] = recv_method
|
||||||
self.poller.register(socket, zmq.POLLIN)
|
self.poller.register(socket, zmq.POLLIN)
|
||||||
|
|
||||||
def poll(self, timeout=None):
|
def poll(self, timeout=None):
|
||||||
timeout *= 1000 # zmq poller waits milliseconds
|
|
||||||
|
if timeout:
|
||||||
|
timeout *= 1000 # zmq poller waits milliseconds
|
||||||
|
|
||||||
sockets = None
|
sockets = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -19,6 +19,7 @@ import six
|
|||||||
|
|
||||||
from oslo_messaging._drivers import common as rpc_common
|
from oslo_messaging._drivers import common as rpc_common
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
||||||
from oslo_messaging._i18n import _LE, _LI
|
from oslo_messaging._i18n import _LE, _LI
|
||||||
|
|
||||||
@ -44,7 +45,7 @@ class ConsumerBase(object):
|
|||||||
self.sockets.append(socket)
|
self.sockets.append(socket)
|
||||||
self.poller.register(socket, self.receive_message)
|
self.poller.register(socket, self.receive_message)
|
||||||
LOG.info(_LI("Run %(stype)s consumer on %(addr)s:%(port)d"),
|
LOG.info(_LI("Run %(stype)s consumer on %(addr)s:%(port)d"),
|
||||||
{"stype": socket_type,
|
{"stype": zmq_names.socket_type_str(socket_type),
|
||||||
"addr": socket.bind_address,
|
"addr": socket.bind_address,
|
||||||
"port": socket.port})
|
"port": socket.port})
|
||||||
return socket
|
return socket
|
||||||
|
@ -43,11 +43,7 @@ class RouterIncomingMessage(base.IncomingMessage):
|
|||||||
"""Reply is not needed for non-call messages"""
|
"""Reply is not needed for non-call messages"""
|
||||||
|
|
||||||
def acknowledge(self):
|
def acknowledge(self):
|
||||||
LOG.info("Sending acknowledge for %s", self.msg_id)
|
LOG.info("Not sending acknowledge for %s", self.msg_id)
|
||||||
ack_message = {zmq_names.FIELD_ID: self.msg_id}
|
|
||||||
self.socket.send(self.reply_id, zmq.SNDMORE)
|
|
||||||
self.socket.send(b'', zmq.SNDMORE)
|
|
||||||
self.socket.send_pyobj(ack_message)
|
|
||||||
|
|
||||||
def requeue(self):
|
def requeue(self):
|
||||||
"""Requeue is not supported"""
|
"""Requeue is not supported"""
|
||||||
@ -61,11 +57,11 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
|
|||||||
self.targets = []
|
self.targets = []
|
||||||
self.host = zmq_address.combine_address(self.conf.rpc_zmq_host,
|
self.host = zmq_address.combine_address(self.conf.rpc_zmq_host,
|
||||||
self.port)
|
self.port)
|
||||||
|
LOG.info("[%s] Run ROUTER consumer" % self.host)
|
||||||
|
|
||||||
def listen(self, target):
|
def listen(self, target):
|
||||||
|
|
||||||
LOG.info("Listen to target %s on %s:%d" %
|
LOG.info("[%s] Listen to target %s" % (self.host, target))
|
||||||
(target, self.address, self.port))
|
|
||||||
|
|
||||||
self.targets.append(target)
|
self.targets.append(target)
|
||||||
self.matchmaker.register(target=target,
|
self.matchmaker.register(target=target,
|
||||||
@ -76,21 +72,25 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
|
|||||||
for target in self.targets:
|
for target in self.targets:
|
||||||
self.matchmaker.unregister(target, self.host)
|
self.matchmaker.unregister(target, self.host)
|
||||||
|
|
||||||
|
def _receive_request(self, socket):
|
||||||
|
reply_id = socket.recv()
|
||||||
|
empty = socket.recv()
|
||||||
|
assert empty == b'', 'Bad format: empty delimiter expected'
|
||||||
|
request = socket.recv_pyobj()
|
||||||
|
return request, reply_id
|
||||||
|
|
||||||
def receive_message(self, socket):
|
def receive_message(self, socket):
|
||||||
try:
|
try:
|
||||||
reply_id = socket.recv()
|
request, reply_id = self._receive_request(socket)
|
||||||
empty = socket.recv()
|
LOG.info(_LI("[%(host)s] Received %(type)s, %(id)s, %(target)s")
|
||||||
assert empty == b'', 'Bad format: empty delimiter expected'
|
% {"host": self.host,
|
||||||
request = socket.recv_pyobj()
|
"type": request.msg_type,
|
||||||
|
"id": request.message_id,
|
||||||
LOG.info(_LI("Received %(msg_type)s message %(msg)s")
|
"target": request.target})
|
||||||
% {"msg_type": request.msg_type,
|
|
||||||
"msg": str(request.message)})
|
|
||||||
|
|
||||||
if request.msg_type == zmq_names.CALL_TYPE:
|
if request.msg_type == zmq_names.CALL_TYPE:
|
||||||
return zmq_incoming_message.ZmqIncomingRequest(
|
return zmq_incoming_message.ZmqIncomingRequest(
|
||||||
self.server, request.context, request.message, socket,
|
self.server, socket, reply_id, request, self.poller)
|
||||||
reply_id, self.poller)
|
|
||||||
elif request.msg_type in zmq_names.NON_BLOCKING_TYPES:
|
elif request.msg_type in zmq_names.NON_BLOCKING_TYPES:
|
||||||
return RouterIncomingMessage(
|
return RouterIncomingMessage(
|
||||||
self.server, request.context, request.message, socket,
|
self.server, request.context, request.message, socket,
|
||||||
@ -100,3 +100,20 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
|
|||||||
|
|
||||||
except zmq.ZMQError as e:
|
except zmq.ZMQError as e:
|
||||||
LOG.error(_LE("Receiving message failed: %s") % str(e))
|
LOG.error(_LE("Receiving message failed: %s") % str(e))
|
||||||
|
|
||||||
|
|
||||||
|
class RouterConsumerBroker(RouterConsumer):
|
||||||
|
|
||||||
|
def __init__(self, conf, poller, server):
|
||||||
|
super(RouterConsumerBroker, self).__init__(conf, poller, server)
|
||||||
|
|
||||||
|
def _receive_request(self, socket):
|
||||||
|
reply_id = socket.recv()
|
||||||
|
empty = socket.recv()
|
||||||
|
assert empty == b'', 'Bad format: empty delimiter expected'
|
||||||
|
envelope = socket.recv_pyobj()
|
||||||
|
request = socket.recv_pyobj()
|
||||||
|
|
||||||
|
if zmq_names.FIELD_REPLY_ID in envelope:
|
||||||
|
request.proxy_reply_id = envelope[zmq_names.FIELD_REPLY_ID]
|
||||||
|
return request, reply_id
|
||||||
|
@ -28,10 +28,12 @@ zmq = zmq_async.import_zmq()
|
|||||||
|
|
||||||
class ZmqIncomingRequest(base.IncomingMessage):
|
class ZmqIncomingRequest(base.IncomingMessage):
|
||||||
|
|
||||||
def __init__(self, listener, context, message, socket, rep_id, poller):
|
def __init__(self, listener, socket, rep_id, request, poller):
|
||||||
super(ZmqIncomingRequest, self).__init__(listener, context, message)
|
super(ZmqIncomingRequest, self).__init__(listener, request.context,
|
||||||
|
request.message)
|
||||||
self.reply_socket = socket
|
self.reply_socket = socket
|
||||||
self.reply_id = rep_id
|
self.reply_id = rep_id
|
||||||
|
self.request = request
|
||||||
self.received = None
|
self.received = None
|
||||||
self.poller = poller
|
self.poller = poller
|
||||||
|
|
||||||
@ -39,15 +41,21 @@ class ZmqIncomingRequest(base.IncomingMessage):
|
|||||||
if failure is not None:
|
if failure is not None:
|
||||||
failure = rpc_common.serialize_remote_exception(failure,
|
failure = rpc_common.serialize_remote_exception(failure,
|
||||||
log_failure)
|
log_failure)
|
||||||
message_reply = {zmq_names.FIELD_REPLY: reply,
|
message_reply = {zmq_names.FIELD_TYPE: zmq_names.REPLY_TYPE,
|
||||||
|
zmq_names.FIELD_REPLY: reply,
|
||||||
zmq_names.FIELD_FAILURE: failure,
|
zmq_names.FIELD_FAILURE: failure,
|
||||||
zmq_names.FIELD_LOG_FAILURE: log_failure}
|
zmq_names.FIELD_LOG_FAILURE: log_failure,
|
||||||
|
zmq_names.FIELD_ID: self.request.proxy_reply_id}
|
||||||
|
|
||||||
LOG.info("Replying %s REP", (str(message_reply)))
|
LOG.info("Replying %s REP", (str(self.request.message_id)))
|
||||||
|
|
||||||
self.received = True
|
self.received = True
|
||||||
self.reply_socket.send(self.reply_id, zmq.SNDMORE)
|
self.reply_socket.send(self.reply_id, zmq.SNDMORE)
|
||||||
self.reply_socket.send(b'', zmq.SNDMORE)
|
self.reply_socket.send(b'', zmq.SNDMORE)
|
||||||
|
if self.request.proxy_reply_id:
|
||||||
|
self.reply_socket.send_string(zmq_names.REPLY_TYPE, zmq.SNDMORE)
|
||||||
|
self.reply_socket.send(self.request.proxy_reply_id, zmq.SNDMORE)
|
||||||
|
self.reply_socket.send(b'', zmq.SNDMORE)
|
||||||
self.reply_socket.send_pyobj(message_reply)
|
self.reply_socket.send_pyobj(message_reply)
|
||||||
self.poller.resume_polling(self.reply_socket)
|
self.poller.resume_polling(self.reply_socket)
|
||||||
|
|
||||||
|
@ -31,8 +31,12 @@ class ZmqServer(base.Listener):
|
|||||||
super(ZmqServer, self).__init__(driver)
|
super(ZmqServer, self).__init__(driver)
|
||||||
self.matchmaker = matchmaker
|
self.matchmaker = matchmaker
|
||||||
self.poller = zmq_async.get_poller()
|
self.poller = zmq_async.get_poller()
|
||||||
self.rpc_consumer = zmq_router_consumer.RouterConsumer(
|
if conf.zmq_use_broker:
|
||||||
conf, self.poller, self)
|
self.rpc_consumer = zmq_router_consumer.RouterConsumerBroker(
|
||||||
|
conf, self.poller, self)
|
||||||
|
else:
|
||||||
|
self.rpc_consumer = zmq_router_consumer.RouterConsumer(
|
||||||
|
conf, self.poller, self)
|
||||||
self.notify_consumer = self.rpc_consumer
|
self.notify_consumer = self.rpc_consumer
|
||||||
self.consumers = [self.rpc_consumer]
|
self.consumers = [self.rpc_consumer]
|
||||||
|
|
||||||
|
@ -27,3 +27,14 @@ def get_tcp_random_address(conf):
|
|||||||
|
|
||||||
def get_broker_address(conf):
|
def get_broker_address(conf):
|
||||||
return "ipc://%s/zmq-broker" % conf.rpc_zmq_ipc_dir
|
return "ipc://%s/zmq-broker" % conf.rpc_zmq_ipc_dir
|
||||||
|
|
||||||
|
|
||||||
|
def target_to_key(target):
|
||||||
|
if target.topic and target.server:
|
||||||
|
attributes = ['topic', 'server']
|
||||||
|
key = ".".join(getattr(target, attr) for attr in attributes)
|
||||||
|
return key
|
||||||
|
if target.topic:
|
||||||
|
return target.topic
|
||||||
|
if target.server:
|
||||||
|
return target.server
|
||||||
|
@ -17,10 +17,23 @@ from oslo_messaging._drivers.zmq_driver import zmq_async
|
|||||||
zmq = zmq_async.import_zmq()
|
zmq = zmq_async.import_zmq()
|
||||||
|
|
||||||
|
|
||||||
|
FIELD_TYPE = 'type'
|
||||||
FIELD_FAILURE = 'failure'
|
FIELD_FAILURE = 'failure'
|
||||||
FIELD_REPLY = 'reply'
|
FIELD_REPLY = 'reply'
|
||||||
FIELD_LOG_FAILURE = 'log_failure'
|
FIELD_LOG_FAILURE = 'log_failure'
|
||||||
FIELD_ID = 'id'
|
FIELD_ID = 'id'
|
||||||
|
FIELD_MSG_ID = 'message_id'
|
||||||
|
FIELD_MSG_TYPE = 'msg_type'
|
||||||
|
FIELD_REPLY_ID = 'reply_id'
|
||||||
|
FIELD_TARGET = 'target'
|
||||||
|
|
||||||
|
|
||||||
|
IDX_REPLY_TYPE = 1
|
||||||
|
IDX_REPLY_BODY = 2
|
||||||
|
|
||||||
|
MULTIPART_IDX_ENVELOPE = 0
|
||||||
|
MULTIPART_IDX_BODY = 1
|
||||||
|
|
||||||
|
|
||||||
CALL_TYPE = 'call'
|
CALL_TYPE = 'call'
|
||||||
CAST_TYPE = 'cast'
|
CAST_TYPE = 'cast'
|
||||||
@ -28,6 +41,9 @@ CAST_FANOUT_TYPE = 'cast-f'
|
|||||||
NOTIFY_TYPE = 'notify'
|
NOTIFY_TYPE = 'notify'
|
||||||
NOTIFY_FANOUT_TYPE = 'notify-f'
|
NOTIFY_FANOUT_TYPE = 'notify-f'
|
||||||
|
|
||||||
|
REPLY_TYPE = 'reply'
|
||||||
|
ACK_TYPE = 'ack'
|
||||||
|
|
||||||
MESSAGE_TYPES = (CALL_TYPE,
|
MESSAGE_TYPES = (CALL_TYPE,
|
||||||
CAST_TYPE,
|
CAST_TYPE,
|
||||||
CAST_FANOUT_TYPE,
|
CAST_FANOUT_TYPE,
|
||||||
|
@ -57,6 +57,9 @@ class ZmqSocket(object):
|
|||||||
def send_pyobj(self, *args, **kwargs):
|
def send_pyobj(self, *args, **kwargs):
|
||||||
self.handle.send_pyobj(*args, **kwargs)
|
self.handle.send_pyobj(*args, **kwargs)
|
||||||
|
|
||||||
|
def send_multipart(self, *args, **kwargs):
|
||||||
|
self.handle.send_multipart(*args, **kwargs)
|
||||||
|
|
||||||
def recv(self, *args, **kwargs):
|
def recv(self, *args, **kwargs):
|
||||||
return self.handle.recv(*args, **kwargs)
|
return self.handle.recv(*args, **kwargs)
|
||||||
|
|
||||||
@ -69,6 +72,9 @@ class ZmqSocket(object):
|
|||||||
def recv_pyobj(self, *args, **kwargs):
|
def recv_pyobj(self, *args, **kwargs):
|
||||||
return self.handle.recv_pyobj(*args, **kwargs)
|
return self.handle.recv_pyobj(*args, **kwargs)
|
||||||
|
|
||||||
|
def recv_multipart(self, *args, **kwargs):
|
||||||
|
return self.handle.recv_multipart(*args, **kwargs)
|
||||||
|
|
||||||
def close(self, *args, **kwargs):
|
def close(self, *args, **kwargs):
|
||||||
self.handle.close(*args, **kwargs)
|
self.handle.close(*args, **kwargs)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user