rabbit: Add logging on blocked connection
When the broker will block the connection for a server-side issue like disk full, it notifies the client. This change adds the callback methods when this occurs to inform the deployer about the reason of this blocking. Change-Id: I5164b9e1b720f022b45a5718258df036ba8808ed Closes-bug: #1454449
This commit is contained in:
parent
85c069e154
commit
1f8ccd3ac5
@ -477,7 +477,12 @@ class Connection(object):
|
|||||||
login_method=self.login_method,
|
login_method=self.login_method,
|
||||||
failover_strategy="shuffle",
|
failover_strategy="shuffle",
|
||||||
heartbeat=self.heartbeat_timeout_threshold,
|
heartbeat=self.heartbeat_timeout_threshold,
|
||||||
transport_options={'confirm_publish': True})
|
transport_options={
|
||||||
|
'confirm_publish': True,
|
||||||
|
'on_blocked': self._on_connection_blocked,
|
||||||
|
'on_unblocked': self._on_connection_unblocked,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
LOG.info(_LI('Connecting to AMQP server on %(hostname)s:%(port)s'),
|
LOG.info(_LI('Connecting to AMQP server on %(hostname)s:%(port)s'),
|
||||||
self.connection.info())
|
self.connection.info())
|
||||||
@ -581,6 +586,14 @@ class Connection(object):
|
|||||||
return ssl_params or True
|
return ssl_params or True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _on_connection_blocked(reason):
|
||||||
|
LOG.error(_LE("The broker has blocked the connection: %s"), reason)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _on_connection_unblocked():
|
||||||
|
LOG.info(_LI("The broker has unblocked the connection"))
|
||||||
|
|
||||||
def ensure_connection(self):
|
def ensure_connection(self):
|
||||||
self.ensure(method=lambda: True)
|
self.ensure(method=lambda: True)
|
||||||
|
|
||||||
@ -829,7 +842,7 @@ class Connection(object):
|
|||||||
def _connect_error(exc):
|
def _connect_error(exc):
|
||||||
log_info = {'topic': consumer.routing_key, 'err_str': exc}
|
log_info = {'topic': consumer.routing_key, 'err_str': exc}
|
||||||
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
|
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
|
||||||
"%(err_str)s"), log_info)
|
"%(err_str)s"), log_info)
|
||||||
|
|
||||||
def _declare_consumer():
|
def _declare_consumer():
|
||||||
consumer.declare(self)
|
consumer.declare(self)
|
||||||
|
@ -169,7 +169,9 @@ class TestRabbitDriverLoadSSL(test_utils.BaseTestCase):
|
|||||||
|
|
||||||
transport._driver._get_connection()
|
transport._driver._get_connection()
|
||||||
connection_klass.assert_called_once_with(
|
connection_klass.assert_called_once_with(
|
||||||
'memory:///', transport_options={'confirm_publish': True},
|
'memory:///', transport_options={'confirm_publish': True,
|
||||||
|
'on_blocked': mock.ANY,
|
||||||
|
'on_unblocked': mock.ANY},
|
||||||
ssl=self.expected, login_method='AMQPLAIN',
|
ssl=self.expected, login_method='AMQPLAIN',
|
||||||
heartbeat=0, failover_strategy="shuffle")
|
heartbeat=0, failover_strategy="shuffle")
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user