Merge "Properly reconnect subscribing clients when QPID broker restarts"
This commit is contained in:
commit
e58636c26a
@ -17,7 +17,6 @@ import functools
|
||||
import itertools
|
||||
import logging
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import eventlet
|
||||
import greenlet
|
||||
@ -123,7 +122,6 @@ class ConsumerBase(object):
|
||||
},
|
||||
},
|
||||
"link": {
|
||||
"name": link_name,
|
||||
"durable": True,
|
||||
"x-declare": {
|
||||
"durable": False,
|
||||
@ -132,6 +130,8 @@ class ConsumerBase(object):
|
||||
},
|
||||
},
|
||||
}
|
||||
if link_name:
|
||||
addr_opts["link"]["name"] = link_name
|
||||
addr_opts["node"]["x-declare"].update(node_opts)
|
||||
elif conf.qpid_topology_version == 2:
|
||||
addr_opts = {
|
||||
@ -278,30 +278,16 @@ class FanoutConsumer(ConsumerBase):
|
||||
if conf.qpid_topology_version == 1:
|
||||
node_name = "%s_fanout" % topic
|
||||
node_opts = {"durable": False, "type": "fanout"}
|
||||
link_name = "%s_fanout_%s" % (topic, uuid.uuid4().hex)
|
||||
elif conf.qpid_topology_version == 2:
|
||||
node_name = "amq.topic/fanout/%s" % topic
|
||||
node_opts = {}
|
||||
link_name = ""
|
||||
else:
|
||||
raise_invalid_topology_version(conf)
|
||||
|
||||
super(FanoutConsumer, self).__init__(conf, session, callback,
|
||||
node_name, node_opts, link_name,
|
||||
node_name, node_opts, None,
|
||||
link_opts)
|
||||
|
||||
def reconnect(self, session):
|
||||
topic = self.get_node_name().rpartition('_fanout')[0]
|
||||
params = {
|
||||
'session': session,
|
||||
'topic': topic,
|
||||
'callback': self.callback,
|
||||
}
|
||||
|
||||
self.__init__(conf=self.conf, **params)
|
||||
|
||||
super(FanoutConsumer, self).reconnect(session)
|
||||
|
||||
|
||||
class Publisher(object):
|
||||
"""Base Publisher class."""
|
||||
|
Loading…
x
Reference in New Issue
Block a user