Get rid of proxy process in zmq

As far as we use redis as a name service we don't need a proxy,
becase we can pass binded port over name service too.

Change-Id: I59bbe2b34dcedfeef113ef06d6a988e1c413405e
This commit is contained in:
Oleksii Zamiatin 2015-07-23 14:05:04 +03:00 committed by Victor Sergeyev
parent e2c3e36d75
commit 315e56ae2b
24 changed files with 135 additions and 744 deletions

View File

@ -1 +0,0 @@

View File

@ -1,44 +0,0 @@
#!/usr/bin/env python
# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import logging
import sys
from oslo_config import cfg
from oslo_messaging._drivers import impl_zmq
from oslo_messaging._drivers.zmq_driver.broker import zmq_broker
from oslo_messaging._executors import base # FIXME(markmc)
CONF = cfg.CONF
CONF.register_opts(impl_zmq.zmq_opts)
CONF.register_opts(base._pool_opts)
# TODO(ozamiatin): Move this option assignment to an external config file
# Use efficient zmq poller in real-world deployment
CONF.rpc_zmq_native = True
def main():
CONF(sys.argv[1:], project='oslo')
logging.basicConfig(level=logging.DEBUG)
with contextlib.closing(zmq_broker.ZmqBroker(CONF)) as reactor:
reactor.start()
reactor.wait()
if __name__ == "__main__":
main()

View File

@ -1 +0,0 @@
__author__ = 'ozamiatin'

View File

@ -1,163 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import logging
import six
from oslo_messaging._drivers.common import RPCException
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_target
from oslo_messaging._i18n import _LE, _LI
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
@six.add_metaclass(abc.ABCMeta)
class BaseProxy(object):
"""Base TCP-proxy.
TCP-proxy redirects messages received by TCP from clients to servers
over IPC. Consists of TCP-frontend and IPC-backend objects. Runs
in async executor.
"""
def __init__(self, conf, context):
super(BaseProxy, self).__init__()
self.conf = conf
self.context = context
self.executor = zmq_async.get_executor(
self.run, native_zmq=conf.rpc_zmq_native)
@abc.abstractmethod
def run(self):
"""Main execution point of the proxy"""
def start(self):
self.executor.execute()
def stop(self):
self.executor.stop()
def wait(self):
self.executor.wait()
@six.add_metaclass(abc.ABCMeta)
class BaseTcpFrontend(object):
"""Base frontend clause.
TCP-frontend is a part of TCP-proxy which receives incoming
messages from clients.
"""
def __init__(self, conf, poller, context,
socket_type=None,
port_number=None,
receive_meth=None):
"""Construct a TCP-frontend.
Its attributes are:
:param conf: Driver configuration object.
:type conf: ConfigOpts
:param poller: Messages poller-object green or threading.
:type poller: ZmqPoller
:param context: ZeroMQ context object.
:type context: zmq.Context
:param socket_type: ZeroMQ socket type.
:type socket_type: int
:param port_number: Current messaging pipeline port.
:type port_number: int
:param receive_meth: Receive method for poller.
:type receive_meth: method
"""
self.conf = conf
self.poller = poller
self.context = context
try:
self.frontend = self.context.socket(socket_type)
bind_address = zmq_target.get_tcp_bind_address(port_number)
LOG.info(_LI("Binding to TCP %s") % bind_address)
self.frontend.bind(bind_address)
self.poller.register(self.frontend, receive_meth)
except zmq.ZMQError as e:
errmsg = _LE("Could not create ZeroMQ receiver daemon. "
"Socket may already be in use: %s") % str(e)
LOG.error(errmsg)
raise RPCException(errmsg)
def receive_incoming(self):
message, socket = self.poller.poll(1)
LOG.info(_LI("Message %s received."), message)
return message
def close(self):
self.frontend.close()
@six.add_metaclass(abc.ABCMeta)
class BaseBackendMatcher(object):
def __init__(self, conf, poller, context):
self.conf = conf
self.context = context
self.backends = {}
self.poller = poller
@abc.abstractmethod
def redirect_to_backend(self, message):
"""Redirect message"""
def close(self):
if self.backends:
for backend in self.backends.values():
backend.close()
@six.add_metaclass(abc.ABCMeta)
class DirectBackendMatcher(BaseBackendMatcher):
def redirect_to_backend(self, message):
backend, target = self._match_backend(message)
self._send_message(backend, message, target)
def _match_backend(self, message):
target = self._get_target(message)
ipc_address = self._get_ipc_address(target)
backend = self._create_backend(ipc_address)
return backend, target
@abc.abstractmethod
def _get_target(self, message):
"""Extract topic from message"""
@abc.abstractmethod
def _get_ipc_address(self, target):
"""Get ipc backend address from topic"""
@abc.abstractmethod
def _send_message(self, backend, message, target):
"""Backend specific sending logic"""
@abc.abstractmethod
def _create_backend(self, ipc_address):
"""Backend specific socket opening logic"""

View File

@ -1,71 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
from oslo_utils import excutils
from oslo_messaging._drivers.zmq_driver.broker import zmq_universal_proxy
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._i18n import _LE, _LI
LOG = logging.getLogger(__name__)
class ZmqBroker(object):
"""Local messaging IPC broker (nodes are still peers).
The main purpose is to have one TCP connection
(one TCP port assigned for ZMQ messaging) per node.
There could be a number of services running on a node.
Without such broker a number of opened TCP ports used for
messaging become unpredictable for the engine.
All messages are coming to TCP ROUTER socket and then
distributed between their targets by topic via IPC.
"""
def __init__(self, conf):
super(ZmqBroker, self).__init__()
zmq = zmq_async.import_zmq(native_zmq=conf.rpc_zmq_native)
self.conf = conf
self.context = zmq.Context()
proxy = zmq_universal_proxy.UniversalProxy(conf, self.context)
self.proxies = [proxy]
self._create_ipc_dirs()
def _create_ipc_dirs(self):
ipc_dir = self.conf.rpc_zmq_ipc_dir
try:
os.makedirs("%s/fanout" % ipc_dir)
except os.error:
if not os.path.isdir(ipc_dir):
with excutils.save_and_reraise_exception():
LOG.error(_LE("Required IPC directory does not exist at"
" %s"), ipc_dir)
def start(self):
for proxy in self.proxies:
proxy.start()
def wait(self):
for proxy in self.proxies:
proxy.wait()
def close(self):
LOG.info(_LI("Broker shutting down ..."))
for proxy in self.proxies:
proxy.stop()

View File

@ -1,106 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_messaging._drivers.common import RPCException
import oslo_messaging._drivers.zmq_driver.broker.zmq_base_proxy as base_proxy
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_serializer
from oslo_messaging._drivers.zmq_driver import zmq_target
from oslo_messaging._i18n import _LE, _LI
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class DealerBackend(base_proxy.DirectBackendMatcher):
def __init__(self, conf, context, poller=None):
if poller is None:
poller = zmq_async.get_poller(
native_zmq=conf.rpc_zmq_native)
super(DealerBackend, self).__init__(conf, poller, context)
def _get_target(self, message):
return zmq_serializer.get_target_from_call_message(message)
def _get_ipc_address(self, target):
return zmq_target.get_ipc_address_call(self.conf, target)
def _send_message(self, backend, message, topic):
# Empty needed for awaiting REP socket to work properly
# (DEALER-REP usage specific)
backend.send(b'', zmq.SNDMORE)
backend.send(message.pop(0), zmq.SNDMORE)
backend.send_string(message.pop(0), zmq.SNDMORE)
message.pop(0) # Drop target unneeded any more
backend.send_multipart(message)
def _create_backend(self, ipc_address):
if ipc_address in self.backends:
return self.backends[ipc_address]
backend = self.context.socket(zmq.DEALER)
backend.connect(ipc_address)
self.poller.register(backend)
self.backends[ipc_address] = backend
return backend
class FrontendTcpRouter(base_proxy.BaseTcpFrontend):
def __init__(self, conf, context, poller=None):
if poller is None:
poller = zmq_async.get_poller(
native_zmq=conf.rpc_zmq_native)
super(FrontendTcpRouter, self).__init__(
conf, poller, context,
socket_type=zmq.ROUTER,
port_number=conf.rpc_zmq_port,
receive_meth=self._receive_message)
def _receive_message(self, socket):
try:
reply_id = socket.recv()
empty = socket.recv()
assert empty == b'', "Empty delimiter expected"
msg_type = socket.recv_string()
target_dict = socket.recv_json()
target = zmq_target.target_from_dict(target_dict)
other = socket.recv_multipart()
except zmq.ZMQError as e:
LOG.error(_LE("Error receiving message %s") % str(e))
return None
if msg_type == zmq_serializer.FANOUT_TYPE:
other.insert(0, zmq_target.target_to_str(target).encode("utf-8"))
return [reply_id, msg_type, target] + other
@staticmethod
def _reduce_empty(reply):
reply.pop(0)
return reply
def redirect_outgoing_reply(self, reply):
self._reduce_empty(reply)
try:
self.frontend.send_multipart(reply)
LOG.info(_LI("Redirecting reply to client %s") % reply)
except zmq.ZMQError:
errmsg = _LE("Failed redirecting reply to client %s") % reply
LOG.error(errmsg)
raise RPCException(errmsg)

View File

@ -1,38 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import oslo_messaging._drivers.zmq_driver.broker.zmq_base_proxy as base_proxy
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_serializer
from oslo_messaging._drivers.zmq_driver import zmq_target
zmq = zmq_async.import_zmq()
class PublisherBackend(base_proxy.BaseBackendMatcher):
def __init__(self, conf, context):
poller = zmq_async.get_poller(native_zmq=conf.rpc_zmq_native)
super(PublisherBackend, self).__init__(conf, poller, context)
self.backend = self.context.socket(zmq.PUB)
self.backend.bind(zmq_target.get_ipc_address_fanout(conf))
def redirect_to_backend(self, message):
target_pos = zmq_serializer.MESSAGE_CALL_TARGET_POSITION + 1
msg = message[target_pos:]
self.backend.send_multipart(msg)
def close(self):
self.backend.close()

View File

@ -1,72 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import oslo_messaging._drivers.zmq_driver.broker.zmq_base_proxy as base_proxy
from oslo_messaging._drivers.zmq_driver.broker import zmq_call_proxy
from oslo_messaging._drivers.zmq_driver.broker import zmq_fanout_proxy
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_serializer
from oslo_messaging._i18n import _LI
LOG = logging.getLogger(__name__)
class UniversalProxy(base_proxy.BaseProxy):
def __init__(self, conf, context):
super(UniversalProxy, self).__init__(conf, context)
self.poller = zmq_async.get_poller(
native_zmq=conf.rpc_zmq_native)
self.tcp_frontend = zmq_call_proxy.FrontendTcpRouter(
conf, context, poller=self.poller)
self.backend_matcher = BackendMatcher(
conf, context, poller=self.poller)
call = zmq_serializer.CALL_TYPE
self.call_backend = self.backend_matcher.backends[call]
LOG.info(_LI("Starting universal-proxy thread"))
def run(self):
message, socket = self.poller.poll(self.conf.rpc_poll_timeout)
if message is None:
return
LOG.info(_LI("Received message at universal proxy: %s") % str(message))
if socket == self.tcp_frontend.frontend:
self.backend_matcher.redirect_to_backend(message)
else:
self.tcp_frontend.redirect_outgoing_reply(message)
def stop(self):
self.poller.close()
super(UniversalProxy, self).stop()
self.tcp_frontend.close()
self.backend_matcher.close()
class BackendMatcher(base_proxy.BaseBackendMatcher):
def __init__(self, conf, context, poller=None):
super(BackendMatcher, self).__init__(conf, poller, context)
direct_backend = zmq_call_proxy.DealerBackend(conf, context, poller)
self.backends[zmq_serializer.CALL_TYPE] = direct_backend
self.backends[zmq_serializer.CAST_TYPE] = direct_backend
fanout_backend = zmq_fanout_proxy.PublisherBackend(conf, context)
self.backends[zmq_serializer.FANOUT_TYPE] = fanout_backend
def redirect_to_backend(self, message):
message_type = zmq_serializer.get_msg_type(message)
self.backends[message_type].redirect_to_backend(message)

View File

@ -46,7 +46,7 @@ class MatchMakerBase(object):
if len(hosts) == 0:
LOG.warning(_LW("No hosts were found for target %s. Using "
"localhost") % target)
return "localhost"
return "localhost:" + str(self.conf.rpc_zmq_port)
elif len(hosts) == 1:
LOG.info(_LI("A single host found for target %s.") % target)
return hosts[0]

View File

@ -62,12 +62,15 @@ class GreenPoller(zmq_poller.ZmqPoller):
for thread in self.threads:
thread.kill()
self.threads = []
class HoldReplyPoller(GreenPoller):
def __init__(self):
super(HoldReplyPoller, self).__init__()
self.event_by_socket = {}
self._is_running = threading.Event()
def register(self, socket, recv_method=None):
super(HoldReplyPoller, self).register(socket, recv_method)
@ -79,7 +82,7 @@ class HoldReplyPoller(GreenPoller):
def _socket_receive(self, socket, recv_method=None):
pause = self.event_by_socket[socket]
while True:
while not self._is_running.is_set():
pause.clear()
if recv_method:
incoming = recv_method(socket)
@ -88,6 +91,14 @@ class HoldReplyPoller(GreenPoller):
self.incoming_queue.put((incoming, socket))
pause.wait()
def close(self):
self._is_running.set()
for pause in self.event_by_socket.values():
pause.set()
eventlet.sleep()
super(HoldReplyPoller, self).close()
class GreenExecutor(zmq_poller.Executor):

View File

@ -42,25 +42,26 @@ class CallRequest(Request):
message, socket,
zmq_serializer.CALL_TYPE,
timeout, retry)
self.host = self.matchmaker.get_single_host(self.target)
self.connect_address = zmq_target.get_tcp_address_call(conf,
self.connect_address = zmq_target.get_tcp_direct_address(
self.host)
LOG.info(_LI("Connecting REQ to %s") % self.connect_address)
self.socket.connect(self.connect_address)
self.reply_poller.register(
self.socket, recv_method=lambda socket: socket.recv_json())
except zmq.ZMQError as e:
LOG.error(_LE("Error connecting to socket: %s") % str(e))
raise
errmsg = _LE("Error connecting to socket: %s") % str(e)
LOG.error(errmsg)
raise rpc_common.RPCException(errmsg)
def close(self):
self.reply_poller.close()
self.socket.setsockopt(zmq.LINGER, 0)
self.socket.close()
def receive_reply(self):
# NOTE(ozamiatin): Check for retry here (no retries now)
self.reply_poller.register(
self.socket, recv_method=lambda socket: socket.recv_json())
reply, socket = self.reply_poller.poll(timeout=self.timeout)
if reply is None:
raise oslo_messaging.MessagingTimeout(

View File

@ -14,6 +14,7 @@
import logging
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_cast_publisher
from oslo_messaging._drivers.zmq_driver.rpc.client.zmq_request import Request
from oslo_messaging._drivers.zmq_driver import zmq_async
@ -58,7 +59,7 @@ class DealerCastPublisher(zmq_cast_publisher.CastPublisherBase):
def cast(self, target, context,
message, timeout=None, retry=None):
host = self.matchmaker.get_single_host(target)
connect_address = zmq_target.get_tcp_address_call(self.conf, host)
connect_address = zmq_target.get_tcp_direct_address(host)
dealer_socket = self._create_socket(connect_address)
request = CastRequest(self.conf, target, context, message,
dealer_socket, connect_address, timeout, retry)
@ -73,11 +74,14 @@ class DealerCastPublisher(zmq_cast_publisher.CastPublisherBase):
dealer_socket.connect(address)
self.outbound_sockets[address] = dealer_socket
return dealer_socket
except zmq.ZMQError:
LOG.error(_LE("Failed connecting DEALER to %s") % address)
raise
except zmq.ZMQError as e:
errmsg = _LE("Failed connecting DEALER to %(address)s: %(e)s")\
% (address, e)
LOG.error(errmsg)
raise rpc_common.RPCException(errmsg)
def cleanup(self):
if self.outbound_sockets:
for socket in self.outbound_sockets.values():
socket.setsockopt(zmq.LINGER, 0)
socket.close()

View File

@ -62,7 +62,6 @@ class Request(object):
def send_request(self):
self.socket.send_string(self.msg_type, zmq.SNDMORE)
self.socket.send_json(self.target.__dict__, zmq.SNDMORE)
self.socket.send_string(self.msg_id, zmq.SNDMORE)
self.socket.send_json(self.context, zmq.SNDMORE)
self.socket.send_json(self.message)

View File

@ -1,37 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class ConsumerBase(object):
def __init__(self, listener, conf, zmq_poller, context):
self.listener = listener
self.conf = conf
self.poller = zmq_poller
self.context = context
self.sockets_per_target = {}
def cleanup(self):
if self.sockets_per_target:
for socket in self.sockets_per_target.values():
socket.close()
@abc.abstractmethod
def listen(self, target):
"""Listen for target"""

View File

@ -1,74 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import six
from oslo_messaging._drivers import base
from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_base_consumer
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_target as topic_utils
from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class ZmqFanoutMessage(base.IncomingMessage):
def __init__(self, listener, context, message, socket, poller):
super(ZmqFanoutMessage, self).__init__(listener, context, message)
poller.resume_polling(socket)
def reply(self, reply=None, failure=None, log_failure=True):
"""Reply is not needed for fanout(cast) messages"""
def acknowledge(self):
pass
def requeue(self):
pass
class FanoutConsumer(zmq_base_consumer.ConsumerBase):
def _receive_message(self, socket):
try:
topic = socket.recv_string()
assert topic is not None, 'Bad format: Topic is expected'
msg_id = socket.recv_string()
assert msg_id is not None, 'Bad format: message ID expected'
context = socket.recv_json()
message = socket.recv_json()
LOG.debug("[Server] REP Received message %s" % str(message))
incoming = ZmqFanoutMessage(self.listener, context, message,
socket, self.poller)
return incoming
except zmq.ZMQError as e:
LOG.error(_LE("Receiving message failed ... {}"), e)
def listen(self, target):
topic = topic_utils.target_to_str(target)
ipc_address = topic_utils.get_ipc_address_fanout(self.conf)
sub_socket = self.context.socket(zmq.SUB)
sub_socket.connect(ipc_address)
if six.PY3:
sub_socket.setsockopt_string(zmq.SUBSCRIBE, str(topic))
else:
sub_socket.setsockopt(zmq.SUBSCRIBE, str(topic))
self.poller.register(sub_socket, self._receive_message)

View File

@ -17,11 +17,8 @@ import logging
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_base_consumer
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_serializer
from oslo_messaging._drivers.zmq_driver import zmq_target
from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
@ -59,31 +56,17 @@ class ZmqIncomingRequest(base.IncomingMessage):
pass
class CallResponder(zmq_base_consumer.ConsumerBase):
class ZmqFanoutMessage(base.IncomingMessage):
def _receive_message(self, socket):
try:
reply_id = socket.recv()
msg_type = socket.recv_string()
assert msg_type is not None, 'Bad format: msg type expected'
msg_id = socket.recv_string()
assert msg_id is not None, 'Bad format: message ID expected'
context = socket.recv_json()
message = socket.recv_json()
LOG.debug("[Server] REP Received message %s" % str(message))
incoming = ZmqIncomingRequest(self.listener,
context,
message, socket,
reply_id,
self.poller)
return incoming
except zmq.ZMQError as e:
LOG.error(_LE("Receiving message failed: %s") % str(e))
def __init__(self, listener, context, message, socket, poller):
super(ZmqFanoutMessage, self).__init__(listener, context, message)
poller.resume_polling(socket)
def listen(self, target):
ipc_rep_address = zmq_target.get_ipc_address_call(self.conf, target)
rep_socket = self.context.socket(zmq.REP)
rep_socket.bind(ipc_rep_address)
str_target = zmq_target.target_to_str(target)
self.sockets_per_target[str_target] = rep_socket
self.poller.register(rep_socket, self._receive_message)
def reply(self, reply=None, failure=None, log_failure=True):
"""Reply is not needed for fanout(cast) messages"""
def acknowledge(self):
pass
def requeue(self):
pass

View File

@ -15,9 +15,12 @@
import logging
from oslo_messaging._drivers import base
from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_call_responder
from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_fanout_consumer
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_incoming_message
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_serializer
from oslo_messaging._drivers.zmq_driver import zmq_target
from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
@ -27,36 +30,64 @@ zmq = zmq_async.import_zmq()
class ZmqServer(base.Listener):
def __init__(self, conf, matchmaker=None):
LOG.info("[Server] __init__")
self.conf = conf
try:
self.context = zmq.Context()
self.poller = zmq_async.get_reply_poller()
self.socket = self.context.socket(zmq.ROUTER)
self.address = zmq_target.get_tcp_random_address(conf)
self.port = self.socket.bind_to_random_port(self.address)
LOG.info("Run server on tcp://%s:%d" %
(self.address, self.port))
except zmq.ZMQError as e:
errmsg = _LE("Failed binding to port %(port)d: %(e)s")\
% (self.port, e)
LOG.error(errmsg)
raise rpc_common.RPCException(errmsg)
self.poller = zmq_async.get_poller()
self.poller.register(self.socket, self._receive_message)
self.matchmaker = matchmaker
self.call_resp = zmq_call_responder.CallResponder(self, conf,
self.poller,
self.context)
self.fanout_resp = zmq_fanout_consumer.FanoutConsumer(self, conf,
self.poller,
self.context)
def poll(self, timeout=None):
incoming = self.poller.poll(timeout or self.conf.rpc_poll_timeout)
return incoming[0]
def stop(self):
LOG.info("[Server] Stop")
LOG.info("Stop server tcp://%s:%d" % (self.address, self.port))
def cleanup(self):
self.poller.close()
self.call_resp.cleanup()
self.fanout_resp.cleanup()
if not self.socket.closed:
self.socket.setsockopt(zmq.LINGER, 0)
self.socket.close()
def listen(self, target):
LOG.info("[Server] Listen to Target %s" % target)
LOG.info("Listen to Target %s on tcp://%s:%d" %
(target, self.address, self.port))
host = zmq_target.combine_address(self.conf.rpc_zmq_host, self.port)
self.matchmaker.register(target=target,
hostname=self.conf.rpc_zmq_host)
if target.fanout:
self.fanout_resp.listen(target)
else:
self.call_resp.listen(target)
hostname=host)
def _receive_message(self, socket):
try:
reply_id = socket.recv()
empty = socket.recv()
assert empty == b'', 'Bad format: empty delimiter expected'
msg_type = socket.recv_string()
assert msg_type is not None, 'Bad format: msg type expected'
target_dict = socket.recv_json()
assert target_dict is not None, 'Bad format: target expected'
context = socket.recv_json()
message = socket.recv_json()
LOG.debug("Received CALL message %s" % str(message))
direct_type = (zmq_serializer.CALL_TYPE, zmq_serializer.CAST_TYPE)
if msg_type in direct_type:
return zmq_incoming_message.ZmqIncomingRequest(
self, context, message, socket, reply_id, self.poller)
elif msg_type == zmq_serializer.FANOUT_TYPE:
return zmq_incoming_message.ZmqFanoutMessage(
self, context, message, socket, self.poller)
except zmq.ZMQError as e:
LOG.error(_LE("Receiving message failed: %s") % str(e))

View File

@ -32,6 +32,9 @@ class ZmqPoller(object):
def close(self):
"""Terminate polling"""
def resume_polling(self, socket):
"""Resume with polling"""
@six.add_metaclass(abc.ABCMeta)
class Executor(object):

View File

@ -44,7 +44,7 @@ def get_msg_type(message):
if type not in MESSAGE_TYPES:
errmsg = _LE("Unknown message type: %s") % str(type)
LOG.error(errmsg)
rpc_common.RPCException(errmsg)
raise rpc_common.RPCException(errmsg)
return type

View File

@ -15,11 +15,6 @@
from oslo_messaging import target
def get_ipc_address_call(conf, target):
target_addr = target_to_str(target)
return "ipc://%s/%s" % (conf.rpc_zmq_ipc_dir, target_addr)
def get_tcp_bind_address(port):
return "tcp://*:%s" % port
@ -28,19 +23,27 @@ def get_tcp_address_call(conf, host):
return "tcp://%s:%s" % (host, conf.rpc_zmq_port)
def get_ipc_address_cast(conf, target):
target_addr = target_to_str(target)
return "ipc://%s/fanout/%s" % (conf.rpc_zmq_ipc_dir, target_addr)
def combine_address(host, port):
return "%s:%s" % (host, port)
def get_ipc_address_fanout(conf):
return "ipc://%s/fanout_general" % conf.rpc_zmq_ipc_dir
def get_tcp_direct_address(host):
return "tcp://%s" % (host)
def get_tcp_random_address(conf):
return "tcp://*"
def target_to_str(target):
if target.server is None:
return target.topic
return "%s.%s" % (target.server, target.topic)
items = []
if target.topic:
items.append(target.topic)
if target.exchange:
items.append(target.exchange)
if target.server:
items.append(target.server)
return '.'.join(items)
def target_from_dict(target_dict):

View File

@ -77,4 +77,4 @@ class TestImplMatchmaker(test_utils.BaseTestCase):
def test_get_single_host_wrong_topic(self):
target = oslo_messaging.Target(topic="no_such_topic")
self.assertEqual(self.test_matcher.get_single_host(target),
"localhost")
"localhost:9501")

View File

@ -13,7 +13,6 @@
# under the License.
import logging
import socket
import threading
import fixtures
@ -21,7 +20,6 @@ import testtools
import oslo_messaging
from oslo_messaging._drivers import impl_zmq
from oslo_messaging._drivers.zmq_driver.broker.zmq_broker import ZmqBroker
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._i18n import _
from oslo_messaging.tests import utils as test_utils
@ -61,40 +59,26 @@ class TestRPCServerListener(object):
self.executor.stop()
def get_unused_port():
"""Returns an unused port on localhost."""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('localhost', 0))
port = s.getsockname()[1]
s.close()
return port
class ZmqBaseTestCase(test_utils.BaseTestCase):
"""Base test case for all ZMQ tests that make use of the ZMQ Proxy"""
"""Base test case for all ZMQ tests """
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(ZmqBaseTestCase, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
'rpc_response_timeout': 5,
'rpc_zmq_port': get_unused_port(),
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
'rpc_zmq_ipc_dir': self.internal_ipc_dir,
'rpc_zmq_matchmaker': 'dummy'}
self.config(**kwargs)
# Start RPC
LOG.info("Running internal zmq receiver.")
self.broker = ZmqBroker(self.conf)
self.broker.start()
# Get driver
transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
self.listener = TestRPCServerListener(self.driver)
@ -118,8 +102,6 @@ class stopRpc(object):
self.attrs = attrs
def __call__(self):
if self.attrs['broker']:
self.attrs['broker'].close()
if self.attrs['driver']:
self.attrs['driver'].cleanup()
if self.attrs['listener']:
@ -151,7 +133,7 @@ class TestZmqBasics(ZmqBaseTestCase):
def test_send_noreply(self):
"""Cast() with topic."""
target = oslo_messaging.Target(topic='testtopic', server="127.0.0.1")
target = oslo_messaging.Target(topic='testtopic', server="my@server")
self.listener.listen(target)
result = self.driver.send(
target, {},

View File

@ -22,6 +22,4 @@ EOF
redis-server --port $ZMQ_REDIS_PORT &
oslo-messaging-zmq-receiver --config-file ${DATADIR}/zmq.conf > ${DATADIR}/receiver.log 2>&1 &
$*

View File

@ -13,7 +13,6 @@
# under the License.
import logging
import socket
import threading
import fixtures
@ -21,7 +20,6 @@ import testtools
import oslo_messaging
from oslo_messaging._drivers import impl_zmq
from oslo_messaging._drivers.zmq_driver.broker.zmq_broker import ZmqBroker
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._i18n import _
from oslo_messaging.tests import utils as test_utils
@ -61,40 +59,26 @@ class TestRPCServerListener(object):
self.executor.stop()
def get_unused_port():
"""Returns an unused port on localhost."""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('localhost', 0))
port = s.getsockname()[1]
s.close()
return port
class ZmqBaseTestCase(test_utils.BaseTestCase):
"""Base test case for all ZMQ tests that make use of the ZMQ Proxy"""
"""Base test case for all ZMQ tests """
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(ZmqBaseTestCase, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
'rpc_response_timeout': 5,
'rpc_zmq_port': get_unused_port(),
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
'rpc_zmq_ipc_dir': self.internal_ipc_dir,
'rpc_zmq_matchmaker': 'dummy'}
self.config(**kwargs)
# Start RPC
LOG.info("Running internal zmq receiver.")
self.broker = ZmqBroker(self.conf)
self.broker.start()
# Get driver
transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
self.listener = TestRPCServerListener(self.driver)
@ -118,8 +102,6 @@ class stopRpc(object):
self.attrs = attrs
def __call__(self):
if self.attrs['broker']:
self.attrs['broker'].close()
if self.attrs['driver']:
self.attrs['driver'].cleanup()
if self.attrs['listener']:
@ -146,12 +128,12 @@ class TestZmqBasics(ZmqBaseTestCase):
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=True)
self.assertIsNotNone(result)
self.assertTrue(result)
def test_send_noreply(self):
"""Cast() with topic."""
target = oslo_messaging.Target(topic='testtopic', server="127.0.0.1")
target = oslo_messaging.Target(topic='testtopic', server="my@server")
self.listener.listen(target)
result = self.driver.send(
target, {},
@ -165,20 +147,21 @@ class TestZmqBasics(ZmqBaseTestCase):
method = self.listener.message.message[u'method']
self.assertEqual(u'hello-world', method)
@testtools.skip("Not implemented feature")
def test_send_fanout(self):
target = oslo_messaging.Target(topic='testtopic', fanout=True)
self.driver.listen(target)
self.listener.listen(target)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=False)
self.listener._received.wait()
self.assertIsNone(result)
self.assertEqual(True, self.listener._received.isSet())
msg_pattern = "{'method': 'hello-world', 'tx_id': 1}"
self.assertEqual(msg_pattern, self.listener.message)
method = self.listener.message.message[u'method']
self.assertEqual(u'hello-world', method)
def test_send_receive_direct(self):
"""Call() without topic."""