From 930e6189e25425576ed3a2f570bda3fd9d310bd6 Mon Sep 17 00:00:00 2001
From: Dan Smith <dansmith@redhat.com>
Date: Sat, 3 Mar 2018 13:48:11 -0800
Subject: [PATCH] Add heartbeat() method to RpcIncomingMessage

This adds a heartbeat() method to RpcIncomingMessage to be used by a
subsequent patch implementation of active-call heartbeating. This is
unimplemented in all drivers for the moment.

Change-Id: If8ab0dc16e3bef69d5a826c31c0fe35e403ac6a1
---
 oslo_messaging/_drivers/amqpdriver.py               |  3 +++
 oslo_messaging/_drivers/base.py                     | 13 +++++++++++++
 oslo_messaging/_drivers/impl_amqp1.py               |  3 +++
 oslo_messaging/_drivers/impl_fake.py                |  3 +++
 oslo_messaging/_drivers/impl_kafka.py               |  3 +++
 .../zmq_driver/server/zmq_incoming_message.py       |  3 +++
 6 files changed, 28 insertions(+)

diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index 8fc72dbbb..abda9b0ba 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -178,6 +178,9 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
         # the end.
         self._message_operations_handler.do(self.message.requeue)
 
+    def heartbeat(self):
+        LOG.debug("Message heartbeat not implemented")
+
 
 class ObsoleteReplyQueuesCache(object):
     """Cache of reply queue id that doesn't exists anymore.
diff --git a/oslo_messaging/_drivers/base.py b/oslo_messaging/_drivers/base.py
index c09ab6f42..f02344126 100644
--- a/oslo_messaging/_drivers/base.py
+++ b/oslo_messaging/_drivers/base.py
@@ -154,6 +154,19 @@ class RpcIncomingMessage(IncomingMessage):
         :raises: Does not raise an exception
         """
 
+    @abc.abstractmethod
+    def heartbeat(self):
+        """Called by the server to send an RPC heartbeat message back to
+        the calling client.
+
+        If the client (is new enough to have) passed its timeout value during
+        the RPC call, this method will be called periodically by the server
+        to update the client's timeout timer while a long-running call is
+        executing.
+
+        :raises: Does not raise an exception
+        """
+
 
 @six.add_metaclass(abc.ABCMeta)
 class PollStyleListener(object):
diff --git a/oslo_messaging/_drivers/impl_amqp1.py b/oslo_messaging/_drivers/impl_amqp1.py
index 5c1973d02..5714a6124 100644
--- a/oslo_messaging/_drivers/impl_amqp1.py
+++ b/oslo_messaging/_drivers/impl_amqp1.py
@@ -98,6 +98,9 @@ class ProtonIncomingMessage(base.RpcIncomingMessage):
         self._correlation_id = message.id
         self._disposition = disposition
 
+    def heartbeat(self):
+        LOG.debug("Message heartbeat not implemented")
+
     def reply(self, reply=None, failure=None):
         """Schedule an RPCReplyTask to send the reply."""
         if self._reply_to:
diff --git a/oslo_messaging/_drivers/impl_fake.py b/oslo_messaging/_drivers/impl_fake.py
index 6898350e7..fd66133e9 100644
--- a/oslo_messaging/_drivers/impl_fake.py
+++ b/oslo_messaging/_drivers/impl_fake.py
@@ -38,6 +38,9 @@ class FakeIncomingMessage(base.RpcIncomingMessage):
     def requeue(self):
         self.requeue_callback()
 
+    def heartbeat(self):
+        """Heartbeat is not supported."""
+
 
 class FakeListener(base.PollStyleListener):
 
diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py
index 007727c9f..c63fe1b0a 100644
--- a/oslo_messaging/_drivers/impl_kafka.py
+++ b/oslo_messaging/_drivers/impl_kafka.py
@@ -315,6 +315,9 @@ class OsloKafkaMessage(base.RpcIncomingMessage):
     def reply(self, reply=None, failure=None):
         LOG.warning(_LW("reply is not supported"))
 
+    def heartbeat(self):
+        LOG.warning(_LW("heartbeat is not supported"))
+
 
 class KafkaListener(base.PollStyleListener):
 
diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py
index a7ddd091c..9810388de 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py
@@ -36,3 +36,6 @@ class ZmqIncomingMessage(base.RpcIncomingMessage):
 
     def requeue(self):
         """Requeue is not supported."""
+
+    def heartbeat(self):
+        """Heartbeat is not supported."""