From 33c3f16f0101ae3ef9966f9188bff2429d0d76d8 Mon Sep 17 00:00:00 2001
From: ozamiatin <ozamiatin@mirantis.com>
Date: Mon, 13 Jun 2016 16:43:59 +0300
Subject: [PATCH] [zmq] Properly stop ZmqServer

Each service being stopped should become
invisible for clients. For zmq listener that
means to cleanup it's redis target record
and stop updating it.

Starting of all consumers happen in init method.

Change-Id: Icd2d2fc73871de1d4845280432f5dec9eab8d381
Closes-Bug: #1590000
---
 .../server/consumers/zmq_consumer_base.py     | 13 +++++++++
 .../server/consumers/zmq_dealer_consumer.py   | 27 ++++++++++---------
 .../_drivers/zmq_driver/server/zmq_server.py  |  8 +++---
 .../_drivers/zmq_driver/zmq_updater.py        |  3 +++
 4 files changed, 34 insertions(+), 17 deletions(-)

diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py
index 69c6958db..d413a988c 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py
@@ -40,6 +40,10 @@ class ConsumerBase(object):
         self.sockets = []
         self.context = zmq.Context()
 
+    def stop(self):
+        """Stop consumer polling/updates"""
+        pass
+
     @abc.abstractmethod
     def receive_message(self, target):
         """Method for poller - receiving message routine"""
@@ -63,6 +67,9 @@ class SingleSocketConsumer(ConsumerBase):
         self.target_updater = TargetUpdater(
             conf, self.matchmaker, self.target, self.host, socket_type)
 
+    def stop(self):
+        self.target_updater.stop()
+
     def subscribe_socket(self, socket_type):
         try:
             socket = zmq_socket.ZmqRandomPortSocket(
@@ -113,3 +120,9 @@ class TargetUpdater(zmq_updater.UpdaterBase):
             self.target, self.host,
             zmq_names.socket_type_str(self.socket_type),
             expire=self.conf.zmq_target_expire)
+
+    def stop(self):
+        super(TargetUpdater, self).stop()
+        self.matchmaker.unregister(
+            self.target, self.host,
+            zmq_names.socket_type_str(self.socket_type))
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
index f0fd11177..c7792df82 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
@@ -77,24 +77,27 @@ class DealerIncomingRequest(base.RpcIncomingMessage):
         """Requeue is not supported"""
 
 
-class DealerConsumer(zmq_consumer_base.ConsumerBase):
+class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
 
     def __init__(self, conf, poller, server):
-        super(DealerConsumer, self).__init__(conf, poller, server)
-        self.matchmaker = server.matchmaker
-        self.target = server.target
         self.sockets_manager = zmq_publisher_base.SocketsManager(
-            conf, self.matchmaker, zmq.ROUTER, zmq.DEALER)
-        self.socket = self.sockets_manager.get_socket_to_routers()
-        self.poller.register(self.socket, self.receive_message)
-        self.host = self.socket.handle.identity
-        self.target_updater = zmq_consumer_base.TargetUpdater(
-            conf, self.matchmaker, self.target, self.host,
-            zmq.DEALER)
+            conf, server.matchmaker, zmq.ROUTER, zmq.DEALER)
+        self.host = None
+        super(DealerConsumer, self).__init__(conf, poller, server, zmq.DEALER)
         self.connection_updater = ConsumerConnectionUpdater(
             conf, self.matchmaker, self.socket)
         LOG.info(_LI("[%s] Run DEALER consumer"), self.host)
 
+    def subscribe_socket(self, socket_type):
+        try:
+            socket = self.sockets_manager.get_socket_to_routers()
+            self.host = socket.handle.identity
+            self.poller.register(socket, self.receive_message)
+            return socket
+        except zmq.ZMQError as e:
+            LOG.error(_LE("Failed connecting to ROUTER socket %(e)s") % e)
+            raise rpc_common.RPCException(str(e))
+
     def receive_message(self, socket):
         try:
             empty = socket.recv()
@@ -120,8 +123,6 @@ class DealerConsumer(zmq_consumer_base.ConsumerBase):
     def cleanup(self):
         LOG.info(_LI("[%s] Destroy DEALER consumer"), self.host)
         super(DealerConsumer, self).cleanup()
-        self.matchmaker.unregister(self.target, self.host,
-                                   zmq_names.socket_type_str(zmq.DEALER))
 
 
 class ConsumerConnectionUpdater(zmq_updater.ConnectionUpdater):
diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py
index c963c452f..fa7b0bc9c 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py
@@ -62,10 +62,10 @@ class ZmqServer(base.PollStyleListener):
         return message
 
     def stop(self):
-        if self.router_consumer:
-            LOG.info(_LI("Stop server %(address)s:%(port)s"),
-                     {'address': self.router_consumer.address,
-                      'port': self.router_consumer.port})
+        self.poller.close()
+        LOG.info(_LI("Stop server %(target)s"), {'target': self.target})
+        for consumer in self.consumers:
+            consumer.stop()
 
     def cleanup(self):
         self.poller.close()
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_updater.py b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py
index 0b4594a33..a8ea82279 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_updater.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py
@@ -34,6 +34,9 @@ class UpdaterBase(object):
         self.executor = zmq_async.get_executor(method=self._update_loop)
         self.executor.execute()
 
+    def stop(self):
+        self.executor.stop()
+
     def _update_loop(self):
         self.update_method()
         time.sleep(self.conf.zmq_target_update)