diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py index 69c6958db..d413a988c 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py @@ -40,6 +40,10 @@ class ConsumerBase(object): self.sockets = [] self.context = zmq.Context() + def stop(self): + """Stop consumer polling/updates""" + pass + @abc.abstractmethod def receive_message(self, target): """Method for poller - receiving message routine""" @@ -63,6 +67,9 @@ class SingleSocketConsumer(ConsumerBase): self.target_updater = TargetUpdater( conf, self.matchmaker, self.target, self.host, socket_type) + def stop(self): + self.target_updater.stop() + def subscribe_socket(self, socket_type): try: socket = zmq_socket.ZmqRandomPortSocket( @@ -113,3 +120,9 @@ class TargetUpdater(zmq_updater.UpdaterBase): self.target, self.host, zmq_names.socket_type_str(self.socket_type), expire=self.conf.zmq_target_expire) + + def stop(self): + super(TargetUpdater, self).stop() + self.matchmaker.unregister( + self.target, self.host, + zmq_names.socket_type_str(self.socket_type)) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py index f0fd11177..c7792df82 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py @@ -77,24 +77,27 @@ class DealerIncomingRequest(base.RpcIncomingMessage): """Requeue is not supported""" -class DealerConsumer(zmq_consumer_base.ConsumerBase): +class DealerConsumer(zmq_consumer_base.SingleSocketConsumer): def __init__(self, conf, poller, server): - super(DealerConsumer, self).__init__(conf, poller, server) - self.matchmaker = server.matchmaker - self.target = server.target self.sockets_manager = zmq_publisher_base.SocketsManager( - conf, self.matchmaker, zmq.ROUTER, zmq.DEALER) - self.socket = self.sockets_manager.get_socket_to_routers() - self.poller.register(self.socket, self.receive_message) - self.host = self.socket.handle.identity - self.target_updater = zmq_consumer_base.TargetUpdater( - conf, self.matchmaker, self.target, self.host, - zmq.DEALER) + conf, server.matchmaker, zmq.ROUTER, zmq.DEALER) + self.host = None + super(DealerConsumer, self).__init__(conf, poller, server, zmq.DEALER) self.connection_updater = ConsumerConnectionUpdater( conf, self.matchmaker, self.socket) LOG.info(_LI("[%s] Run DEALER consumer"), self.host) + def subscribe_socket(self, socket_type): + try: + socket = self.sockets_manager.get_socket_to_routers() + self.host = socket.handle.identity + self.poller.register(socket, self.receive_message) + return socket + except zmq.ZMQError as e: + LOG.error(_LE("Failed connecting to ROUTER socket %(e)s") % e) + raise rpc_common.RPCException(str(e)) + def receive_message(self, socket): try: empty = socket.recv() @@ -120,8 +123,6 @@ class DealerConsumer(zmq_consumer_base.ConsumerBase): def cleanup(self): LOG.info(_LI("[%s] Destroy DEALER consumer"), self.host) super(DealerConsumer, self).cleanup() - self.matchmaker.unregister(self.target, self.host, - zmq_names.socket_type_str(zmq.DEALER)) class ConsumerConnectionUpdater(zmq_updater.ConnectionUpdater): diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py index c963c452f..fa7b0bc9c 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py @@ -62,10 +62,10 @@ class ZmqServer(base.PollStyleListener): return message def stop(self): - if self.router_consumer: - LOG.info(_LI("Stop server %(address)s:%(port)s"), - {'address': self.router_consumer.address, - 'port': self.router_consumer.port}) + self.poller.close() + LOG.info(_LI("Stop server %(target)s"), {'target': self.target}) + for consumer in self.consumers: + consumer.stop() def cleanup(self): self.poller.close() diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_updater.py b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py index 0b4594a33..a8ea82279 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_updater.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py @@ -34,6 +34,9 @@ class UpdaterBase(object): self.executor = zmq_async.get_executor(method=self._update_loop) self.executor.execute() + def stop(self): + self.executor.stop() + def _update_loop(self): self.update_method() time.sleep(self.conf.zmq_target_update)