Merge "[zmq] Properly stop ZmqServer"
This commit is contained in:
commit
cca3091ba4
oslo_messaging/_drivers/zmq_driver
@ -40,6 +40,10 @@ class ConsumerBase(object):
|
|||||||
self.sockets = []
|
self.sockets = []
|
||||||
self.context = zmq.Context()
|
self.context = zmq.Context()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""Stop consumer polling/updates"""
|
||||||
|
pass
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def receive_message(self, target):
|
def receive_message(self, target):
|
||||||
"""Method for poller - receiving message routine"""
|
"""Method for poller - receiving message routine"""
|
||||||
@ -63,6 +67,9 @@ class SingleSocketConsumer(ConsumerBase):
|
|||||||
self.target_updater = TargetUpdater(
|
self.target_updater = TargetUpdater(
|
||||||
conf, self.matchmaker, self.target, self.host, socket_type)
|
conf, self.matchmaker, self.target, self.host, socket_type)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self.target_updater.stop()
|
||||||
|
|
||||||
def subscribe_socket(self, socket_type):
|
def subscribe_socket(self, socket_type):
|
||||||
try:
|
try:
|
||||||
socket = zmq_socket.ZmqRandomPortSocket(
|
socket = zmq_socket.ZmqRandomPortSocket(
|
||||||
@ -113,3 +120,9 @@ class TargetUpdater(zmq_updater.UpdaterBase):
|
|||||||
self.target, self.host,
|
self.target, self.host,
|
||||||
zmq_names.socket_type_str(self.socket_type),
|
zmq_names.socket_type_str(self.socket_type),
|
||||||
expire=self.conf.zmq_target_expire)
|
expire=self.conf.zmq_target_expire)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
super(TargetUpdater, self).stop()
|
||||||
|
self.matchmaker.unregister(
|
||||||
|
self.target, self.host,
|
||||||
|
zmq_names.socket_type_str(self.socket_type))
|
||||||
|
@ -77,24 +77,27 @@ class DealerIncomingRequest(base.RpcIncomingMessage):
|
|||||||
"""Requeue is not supported"""
|
"""Requeue is not supported"""
|
||||||
|
|
||||||
|
|
||||||
class DealerConsumer(zmq_consumer_base.ConsumerBase):
|
class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
|
||||||
|
|
||||||
def __init__(self, conf, poller, server):
|
def __init__(self, conf, poller, server):
|
||||||
super(DealerConsumer, self).__init__(conf, poller, server)
|
|
||||||
self.matchmaker = server.matchmaker
|
|
||||||
self.target = server.target
|
|
||||||
self.sockets_manager = zmq_publisher_base.SocketsManager(
|
self.sockets_manager = zmq_publisher_base.SocketsManager(
|
||||||
conf, self.matchmaker, zmq.ROUTER, zmq.DEALER)
|
conf, server.matchmaker, zmq.ROUTER, zmq.DEALER)
|
||||||
self.socket = self.sockets_manager.get_socket_to_routers()
|
self.host = None
|
||||||
self.poller.register(self.socket, self.receive_message)
|
super(DealerConsumer, self).__init__(conf, poller, server, zmq.DEALER)
|
||||||
self.host = self.socket.handle.identity
|
|
||||||
self.target_updater = zmq_consumer_base.TargetUpdater(
|
|
||||||
conf, self.matchmaker, self.target, self.host,
|
|
||||||
zmq.DEALER)
|
|
||||||
self.connection_updater = ConsumerConnectionUpdater(
|
self.connection_updater = ConsumerConnectionUpdater(
|
||||||
conf, self.matchmaker, self.socket)
|
conf, self.matchmaker, self.socket)
|
||||||
LOG.info(_LI("[%s] Run DEALER consumer"), self.host)
|
LOG.info(_LI("[%s] Run DEALER consumer"), self.host)
|
||||||
|
|
||||||
|
def subscribe_socket(self, socket_type):
|
||||||
|
try:
|
||||||
|
socket = self.sockets_manager.get_socket_to_routers()
|
||||||
|
self.host = socket.handle.identity
|
||||||
|
self.poller.register(socket, self.receive_message)
|
||||||
|
return socket
|
||||||
|
except zmq.ZMQError as e:
|
||||||
|
LOG.error(_LE("Failed connecting to ROUTER socket %(e)s") % e)
|
||||||
|
raise rpc_common.RPCException(str(e))
|
||||||
|
|
||||||
def receive_message(self, socket):
|
def receive_message(self, socket):
|
||||||
try:
|
try:
|
||||||
empty = socket.recv()
|
empty = socket.recv()
|
||||||
@ -120,8 +123,6 @@ class DealerConsumer(zmq_consumer_base.ConsumerBase):
|
|||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
LOG.info(_LI("[%s] Destroy DEALER consumer"), self.host)
|
LOG.info(_LI("[%s] Destroy DEALER consumer"), self.host)
|
||||||
super(DealerConsumer, self).cleanup()
|
super(DealerConsumer, self).cleanup()
|
||||||
self.matchmaker.unregister(self.target, self.host,
|
|
||||||
zmq_names.socket_type_str(zmq.DEALER))
|
|
||||||
|
|
||||||
|
|
||||||
class ConsumerConnectionUpdater(zmq_updater.ConnectionUpdater):
|
class ConsumerConnectionUpdater(zmq_updater.ConnectionUpdater):
|
||||||
|
@ -62,10 +62,10 @@ class ZmqServer(base.PollStyleListener):
|
|||||||
return message
|
return message
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
if self.router_consumer:
|
self.poller.close()
|
||||||
LOG.info(_LI("Stop server %(address)s:%(port)s"),
|
LOG.info(_LI("Stop server %(target)s"), {'target': self.target})
|
||||||
{'address': self.router_consumer.address,
|
for consumer in self.consumers:
|
||||||
'port': self.router_consumer.port})
|
consumer.stop()
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
self.poller.close()
|
self.poller.close()
|
||||||
|
@ -34,6 +34,9 @@ class UpdaterBase(object):
|
|||||||
self.executor = zmq_async.get_executor(method=self._update_loop)
|
self.executor = zmq_async.get_executor(method=self._update_loop)
|
||||||
self.executor.execute()
|
self.executor.execute()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self.executor.stop()
|
||||||
|
|
||||||
def _update_loop(self):
|
def _update_loop(self):
|
||||||
self.update_method()
|
self.update_method()
|
||||||
time.sleep(self.conf.zmq_target_update)
|
time.sleep(self.conf.zmq_target_update)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user