diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py index 511ea91dc..9935690d5 100644 --- a/oslo/messaging/_drivers/amqpdriver.py +++ b/oslo/messaging/_drivers/amqpdriver.py @@ -66,8 +66,8 @@ class AMQPIncomingMessage(base.IncomingMessage): class AMQPListener(base.Listener): - def __init__(self, driver, target, conn): - super(AMQPListener, self).__init__(driver, target) + def __init__(self, driver, conn): + super(AMQPListener, self).__init__(driver) self.conn = conn self.msg_id_cache = rpc_amqp._MsgIdCache() self.incoming = [] @@ -395,7 +395,7 @@ class AMQPDriverBase(base.BaseDriver): def listen(self, target): conn = self._get_connection(pooled=False) - listener = AMQPListener(self, target, conn) + listener = AMQPListener(self, conn) conn.declare_topic_consumer(target.topic, listener) conn.declare_topic_consumer('%s.%s' % (target.topic, target.server), diff --git a/oslo/messaging/_drivers/base.py b/oslo/messaging/_drivers/base.py index 15f8d32f6..b34997f4a 100644 --- a/oslo/messaging/_drivers/base.py +++ b/oslo/messaging/_drivers/base.py @@ -41,10 +41,9 @@ class IncomingMessage(object): @six.add_metaclass(abc.ABCMeta) class Listener(object): - def __init__(self, driver, target): + def __init__(self, driver): self.conf = driver.conf self.driver = driver - self.target = target @abc.abstractmethod def poll(self): diff --git a/oslo/messaging/_drivers/impl_fake.py b/oslo/messaging/_drivers/impl_fake.py index 913f6a739..06943f6a2 100644 --- a/oslo/messaging/_drivers/impl_fake.py +++ b/oslo/messaging/_drivers/impl_fake.py @@ -39,15 +39,17 @@ class FakeIncomingMessage(base.IncomingMessage): class FakeListener(base.Listener): - def __init__(self, driver, target, exchange): - super(FakeListener, self).__init__(driver, target) + def __init__(self, driver, exchange, targets): + super(FakeListener, self).__init__(driver) self._exchange = exchange + self._targets = targets def poll(self): while True: - (ctxt, message, reply_q) = self._exchange.poll(self.target) - if message is not None: - return FakeIncomingMessage(self, ctxt, message, reply_q) + for target in self._targets: + (ctxt, message, reply_q) = self._exchange.poll(target) + if message is not None: + return FakeIncomingMessage(self, ctxt, message, reply_q) time.sleep(.05) @@ -80,8 +82,9 @@ class FakeExchange(object): def poll(self, target): with self._queues_lock: - queue = self._get_server_queue(target.topic, target.server) - if not queue: + if target.server: + queue = self._get_server_queue(target.topic, target.server) + else: queue = self._get_topic_queue(target.topic) return queue.pop(0) if queue else (None, None, None) @@ -152,7 +155,11 @@ class FakeDriver(base.BaseDriver): exchange = self._get_exchange(target.exchange or self._default_exchange) - return FakeListener(self, target, exchange) + listener = FakeListener(self, exchange, + [messaging.Target(topic=target.topic, + server=target.server), + messaging.Target(topic=target.topic)]) + return listener def cleanup(self): pass diff --git a/oslo/messaging/_drivers/impl_zmq.py b/oslo/messaging/_drivers/impl_zmq.py index ca0ae8499..0e65f9e66 100644 --- a/oslo/messaging/_drivers/impl_zmq.py +++ b/oslo/messaging/_drivers/impl_zmq.py @@ -846,8 +846,8 @@ class ZmqIncomingMessage(base.IncomingMessage): class ZmqListener(base.Listener): - def __init__(self, driver, target): - super(ZmqListener, self).__init__(driver, target) + def __init__(self, driver): + super(ZmqListener, self).__init__(driver) self.incoming_queue = moves.queue.Queue() def dispatch(self, ctxt, version, method, namespace, **kwargs): @@ -948,7 +948,7 @@ class ZmqDriver(base.BaseDriver): def listen(self, target): conn = create_connection(self.conf) - listener = ZmqListener(self, target) + listener = ZmqListener(self) conn.create_consumer(target.topic, listener) conn.create_consumer('%s.%s' % (target.topic, target.server),