Merge "Add parameter to customize Qpid receiver capacity"
This commit is contained in:
commit
ecddb22cc5
@ -69,6 +69,9 @@ qpid_opts = [
|
|||||||
cfg.BoolOpt('qpid_tcp_nodelay',
|
cfg.BoolOpt('qpid_tcp_nodelay',
|
||||||
default=True,
|
default=True,
|
||||||
help='Whether to disable the Nagle algorithm.'),
|
help='Whether to disable the Nagle algorithm.'),
|
||||||
|
cfg.IntOpt('qpid_receiver_capacity',
|
||||||
|
default=1,
|
||||||
|
help='The number of prefetched messages held by receiver.'),
|
||||||
# NOTE(russellb) If any additional versions are added (beyond 1 and 2),
|
# NOTE(russellb) If any additional versions are added (beyond 1 and 2),
|
||||||
# this file could probably use some additional refactoring so that the
|
# this file could probably use some additional refactoring so that the
|
||||||
# differences between each version are split into different classes.
|
# differences between each version are split into different classes.
|
||||||
@ -125,6 +128,7 @@ class ConsumerBase(object):
|
|||||||
"""
|
"""
|
||||||
self.callback = callback
|
self.callback = callback
|
||||||
self.receiver = None
|
self.receiver = None
|
||||||
|
self.rcv_capacity = conf.qpid_receiver_capacity
|
||||||
self.session = None
|
self.session = None
|
||||||
|
|
||||||
if conf.qpid_topology_version == 1:
|
if conf.qpid_topology_version == 1:
|
||||||
@ -178,7 +182,7 @@ class ConsumerBase(object):
|
|||||||
def _declare_receiver(self, session):
|
def _declare_receiver(self, session):
|
||||||
self.session = session
|
self.session = session
|
||||||
self.receiver = session.receiver(self.address)
|
self.receiver = session.receiver(self.address)
|
||||||
self.receiver.capacity = 1
|
self.receiver.capacity = self.rcv_capacity
|
||||||
|
|
||||||
def _unpack_json_msg(self, msg):
|
def _unpack_json_msg(self, msg):
|
||||||
"""Load the JSON data in msg if msg.content_type indicates that it
|
"""Load the JSON data in msg if msg.content_type indicates that it
|
||||||
|
Loading…
x
Reference in New Issue
Block a user