[zmq] Properly stop ZmqServer

Each service being stopped should become
invisible for clients. For zmq listener that
means to cleanup it's redis target record
and stop updating it.

Starting of all consumers happen in init method.

Change-Id: Icd2d2fc73871de1d4845280432f5dec9eab8d381
Closes-Bug: #1590000
This commit is contained in:
ozamiatin 2016-06-13 16:43:59 +03:00
parent d8b6bb0884
commit 33c3f16f01
4 changed files with 34 additions and 17 deletions
oslo_messaging/_drivers/zmq_driver

@ -40,6 +40,10 @@ class ConsumerBase(object):
self.sockets = []
self.context = zmq.Context()
def stop(self):
"""Stop consumer polling/updates"""
pass
@abc.abstractmethod
def receive_message(self, target):
"""Method for poller - receiving message routine"""
@ -63,6 +67,9 @@ class SingleSocketConsumer(ConsumerBase):
self.target_updater = TargetUpdater(
conf, self.matchmaker, self.target, self.host, socket_type)
def stop(self):
self.target_updater.stop()
def subscribe_socket(self, socket_type):
try:
socket = zmq_socket.ZmqRandomPortSocket(
@ -113,3 +120,9 @@ class TargetUpdater(zmq_updater.UpdaterBase):
self.target, self.host,
zmq_names.socket_type_str(self.socket_type),
expire=self.conf.zmq_target_expire)
def stop(self):
super(TargetUpdater, self).stop()
self.matchmaker.unregister(
self.target, self.host,
zmq_names.socket_type_str(self.socket_type))

@ -77,24 +77,27 @@ class DealerIncomingRequest(base.RpcIncomingMessage):
"""Requeue is not supported"""
class DealerConsumer(zmq_consumer_base.ConsumerBase):
class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
def __init__(self, conf, poller, server):
super(DealerConsumer, self).__init__(conf, poller, server)
self.matchmaker = server.matchmaker
self.target = server.target
self.sockets_manager = zmq_publisher_base.SocketsManager(
conf, self.matchmaker, zmq.ROUTER, zmq.DEALER)
self.socket = self.sockets_manager.get_socket_to_routers()
self.poller.register(self.socket, self.receive_message)
self.host = self.socket.handle.identity
self.target_updater = zmq_consumer_base.TargetUpdater(
conf, self.matchmaker, self.target, self.host,
zmq.DEALER)
conf, server.matchmaker, zmq.ROUTER, zmq.DEALER)
self.host = None
super(DealerConsumer, self).__init__(conf, poller, server, zmq.DEALER)
self.connection_updater = ConsumerConnectionUpdater(
conf, self.matchmaker, self.socket)
LOG.info(_LI("[%s] Run DEALER consumer"), self.host)
def subscribe_socket(self, socket_type):
try:
socket = self.sockets_manager.get_socket_to_routers()
self.host = socket.handle.identity
self.poller.register(socket, self.receive_message)
return socket
except zmq.ZMQError as e:
LOG.error(_LE("Failed connecting to ROUTER socket %(e)s") % e)
raise rpc_common.RPCException(str(e))
def receive_message(self, socket):
try:
empty = socket.recv()
@ -120,8 +123,6 @@ class DealerConsumer(zmq_consumer_base.ConsumerBase):
def cleanup(self):
LOG.info(_LI("[%s] Destroy DEALER consumer"), self.host)
super(DealerConsumer, self).cleanup()
self.matchmaker.unregister(self.target, self.host,
zmq_names.socket_type_str(zmq.DEALER))
class ConsumerConnectionUpdater(zmq_updater.ConnectionUpdater):

@ -62,10 +62,10 @@ class ZmqServer(base.PollStyleListener):
return message
def stop(self):
if self.router_consumer:
LOG.info(_LI("Stop server %(address)s:%(port)s"),
{'address': self.router_consumer.address,
'port': self.router_consumer.port})
self.poller.close()
LOG.info(_LI("Stop server %(target)s"), {'target': self.target})
for consumer in self.consumers:
consumer.stop()
def cleanup(self):
self.poller.close()

@ -34,6 +34,9 @@ class UpdaterBase(object):
self.executor = zmq_async.get_executor(method=self._update_loop)
self.executor.execute()
def stop(self):
self.executor.stop()
def _update_loop(self):
self.update_method()
time.sleep(self.conf.zmq_target_update)