rabbit: Set timeout on the underlying socket
They are some case where the underlying can be stuck until the system socket timeout is reached, but in oslo.messaging we very often known that is not needed to wait for ever because the upper layer (usualy the application) expect to return after a certain period. So this change set the timeout on the underlying socket each we can determine that is not needed to wait more. Closes-bug: #1436788 Change-Id: Ie71ab8147c56eaf672585da107bec8b22af9da6c
This commit is contained in:
parent
8af6b2ff82
commit
77f952a1f7
@ -282,7 +282,21 @@ class Publisher(object):
|
||||
# NOTE(sileht): this amqp header doesn't exists ... LP#1444854
|
||||
headers['ttl'] = timeout * 1000
|
||||
|
||||
producer.publish(msg, headers=headers)
|
||||
# NOTE(sileht): no need to wait more, caller expects
|
||||
# a answer before timeout is reached
|
||||
transport_timeout = timeout
|
||||
|
||||
heartbeat_timeout = conn.driver_conf.heartbeat_timeout_threshold
|
||||
if (conn._heartbeat_supported_and_enabled() and (
|
||||
transport_timeout is None or
|
||||
transport_timeout > heartbeat_timeout)):
|
||||
# NOTE(sileht): we are supposed to send heartbeat every
|
||||
# heartbeat_timeout, no need to wait more otherwise will
|
||||
# disconnect us, so raise timeout earlier ourself
|
||||
transport_timeout = heartbeat_timeout
|
||||
|
||||
with conn._transport_socket_timeout(transport_timeout):
|
||||
producer.publish(msg, headers=headers)
|
||||
|
||||
|
||||
class DeclareQueuePublisher(Publisher):
|
||||
@ -580,10 +594,14 @@ class Connection(object):
|
||||
LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)s'),
|
||||
self.connection.info())
|
||||
|
||||
# NOTE(sileht):
|
||||
# value choosen according the best practice from kombu:
|
||||
# NOTE(sileht): value choosen according the best practice from kombu
|
||||
# http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
|
||||
self._poll_timeout = 1
|
||||
# For heatbeat, we can set a bigger timeout, and check we receive the
|
||||
# heartbeat packets regulary
|
||||
if self._heartbeat_supported_and_enabled():
|
||||
self._poll_timeout = self._heartbeat_wait_timeout
|
||||
else:
|
||||
self._poll_timeout = 1
|
||||
|
||||
if self._url.startswith('memory://'):
|
||||
# Kludge to speed up tests.
|
||||
@ -814,6 +832,28 @@ class Connection(object):
|
||||
self._heartbeat_support_log_emitted = True
|
||||
return False
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _transport_socket_timeout(self, timeout):
|
||||
# NOTE(sileht): they are some case where the heartbeat check
|
||||
# or the producer.send return only when the system socket
|
||||
# timeout if reach. kombu doesn't allow use to customise this
|
||||
# timeout so for py-amqp we tweak ourself
|
||||
sock = getattr(self.connection.transport, 'sock', None)
|
||||
if sock:
|
||||
orig_timeout = sock.gettimeout()
|
||||
sock.settimeout(timeout)
|
||||
yield
|
||||
if sock:
|
||||
sock.settimeout(orig_timeout)
|
||||
|
||||
def _heartbeat_check(self):
|
||||
# NOTE(sileht): we are suposed to send at least one heartbeat
|
||||
# every heartbeat_timeout_threshold, so no need to way more
|
||||
with self._transport_socket_timeout(
|
||||
self.driver_conf.heartbeat_timeout_threshold):
|
||||
self.connection.heartbeat_check(
|
||||
rate=self.driver_conf.heartbeat_rate)
|
||||
|
||||
def _heartbeat_start(self):
|
||||
if self._heartbeat_supported_and_enabled():
|
||||
self._heartbeat_exit_event = threading.Event()
|
||||
@ -842,8 +882,7 @@ class Connection(object):
|
||||
|
||||
try:
|
||||
try:
|
||||
self.connection.heartbeat_check(
|
||||
rate=self.driver_conf.heartbeat_rate)
|
||||
self._heartbeat_check()
|
||||
# NOTE(sileht): We need to drain event to receive
|
||||
# heartbeat from the broker but don't hold the
|
||||
# connection too much times. In amqpdriver a connection
|
||||
@ -927,8 +966,8 @@ class Connection(object):
|
||||
return
|
||||
|
||||
if self._heartbeat_supported_and_enabled():
|
||||
self.connection.heartbeat_check(
|
||||
rate=self.driver_conf.heartbeat_rate)
|
||||
self._heartbeat_check()
|
||||
|
||||
try:
|
||||
self.connection.drain_events(timeout=poll_timeout)
|
||||
return
|
||||
|
Loading…
x
Reference in New Issue
Block a user