diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_options.py b/oslo_messaging/_drivers/zmq_driver/zmq_options.py index 5cf92999e..e754c86e1 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_options.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_options.py @@ -122,6 +122,39 @@ zmq_opts = [ 'even if server is disconnected, when the server ' 'appears we send all accumulated messages to it.'), + cfg.IntOpt('zmq_tcp_keepalive', default=-1, + help='Enable/disable TCP keepalive (KA) mechanism. ' + 'The default value of -1 (or any other negative value) ' + 'means to skip any overrides and leave it to OS default; ' + '0 and 1 (or any other positive value) mean to ' + 'disable and enable the option respectively.'), + + cfg.IntOpt('zmq_tcp_keepalive_idle', default=-1, + help='The duration between two keepalive transmissions in ' + 'idle condition. ' + 'The unit is platform dependent, for example, ' + 'seconds in Linux, milliseconds in Windows etc. ' + 'The default value of -1 (or any other negative value ' + 'and 0) means to skip any overrides and leave it ' + 'to OS default.'), + + cfg.IntOpt('zmq_tcp_keepalive_cnt', default=-1, + help='The number of retransmissions to be carried out before ' + 'declaring that remote end is not available. ' + 'The default value of -1 (or any other negative value ' + 'and 0) means to skip any overrides and leave it ' + 'to OS default.'), + + cfg.IntOpt('zmq_tcp_keepalive_intvl', default=-1, + help='The duration between two successive keepalive ' + 'retransmissions, if acknowledgement to the previous ' + 'keepalive transmission is not received. ' + 'The unit is platform dependent, for example, ' + 'seconds in Linux, milliseconds in Windows etc. ' + 'The default value of -1 (or any other negative value ' + 'and 0) means to skip any overrides and leave it ' + 'to OS default.'), + cfg.IntOpt('rpc_thread_pool_size', default=100, help='Maximum number of (green) threads to work concurrently.'), diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py index 92e80785b..567d8ceed 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py @@ -52,10 +52,36 @@ class ZmqSocket(object): self.close_linger = \ self.conf.oslo_messaging_zmq.rpc_zmq_linger * 1000 self.handle.setsockopt(zmq.LINGER, self.close_linger) + # Put messages to only connected queues self.handle.setsockopt(zmq.IMMEDIATE, 1 if immediate else 0) - self.handle.identity = six.b(str(uuid.uuid4())) if identity is None \ - else identity + + # Configure TCP KA + keepalive = self.conf.oslo_messaging_zmq.zmq_tcp_keepalive + if keepalive < 0: + keepalive = -1 + elif keepalive > 0: + keepalive = 1 + self.handle.setsockopt(zmq.TCP_KEEPALIVE, keepalive) + + keepalive_idle = self.conf.oslo_messaging_zmq.zmq_tcp_keepalive_idle + if keepalive_idle <= 0: + keepalive_idle = -1 + self.handle.setsockopt(zmq.TCP_KEEPALIVE_IDLE, keepalive_idle) + + keepalive_cnt = self.conf.oslo_messaging_zmq.zmq_tcp_keepalive_cnt + if keepalive_cnt <= 0: + keepalive_cnt = -1 + self.handle.setsockopt(zmq.TCP_KEEPALIVE_CNT, keepalive_cnt) + + keepalive_intvl = self.conf.oslo_messaging_zmq.zmq_tcp_keepalive_intvl + if keepalive_intvl <= 0: + keepalive_intvl = -1 + self.handle.setsockopt(zmq.TCP_KEEPALIVE_INTVL, keepalive_intvl) + + self.handle.identity = \ + six.b(str(uuid.uuid4())) if identity is None else identity + self.connections = set() def _get_serializer(self, serialization):