The executor doesn't need to set the timeout
It's up to the driver to set a suitable timeout for polling the broker, this one can be different that the one requested by the driver caller as long as the caller timeout is respected. This change also adds a new driver listener API, to be able to stop it cleanly, specially in case of timeout=None. Closes bug: #1400268 Closes bug: #1399257 Change-Id: I674c0def1efb420c293897d49683593a0b10e291
This commit is contained in:
parent
43a9dc1de5
commit
15aa5cbda8
oslo/messaging
_drivers
_executors
tests/executors
@ -17,7 +17,6 @@ __all__ = ['AMQPDriverBase']
|
|||||||
|
|
||||||
import logging
|
import logging
|
||||||
import threading
|
import threading
|
||||||
import time
|
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
from six import moves
|
from six import moves
|
||||||
@ -94,6 +93,7 @@ class AMQPListener(base.Listener):
|
|||||||
self.conn = conn
|
self.conn = conn
|
||||||
self.msg_id_cache = rpc_amqp._MsgIdCache()
|
self.msg_id_cache = rpc_amqp._MsgIdCache()
|
||||||
self.incoming = []
|
self.incoming = []
|
||||||
|
self._stopped = threading.Event()
|
||||||
|
|
||||||
def __call__(self, message):
|
def __call__(self, message):
|
||||||
# FIXME(markmc): logging isn't driver specific
|
# FIXME(markmc): logging isn't driver specific
|
||||||
@ -110,23 +110,17 @@ class AMQPListener(base.Listener):
|
|||||||
ctxt.reply_q))
|
ctxt.reply_q))
|
||||||
|
|
||||||
def poll(self, timeout=None):
|
def poll(self, timeout=None):
|
||||||
if timeout is not None:
|
while not self._stopped.is_set():
|
||||||
deadline = time.time() + timeout
|
|
||||||
else:
|
|
||||||
deadline = None
|
|
||||||
while True:
|
|
||||||
if self.incoming:
|
if self.incoming:
|
||||||
return self.incoming.pop(0)
|
return self.incoming.pop(0)
|
||||||
if deadline is not None:
|
try:
|
||||||
timeout = deadline - time.time()
|
self.conn.consume(limit=1, timeout=timeout)
|
||||||
if timeout < 0:
|
except rpc_common.Timeout:
|
||||||
return None
|
return None
|
||||||
try:
|
|
||||||
self.conn.consume(limit=1, timeout=timeout)
|
def stop(self):
|
||||||
except rpc_common.Timeout:
|
self._stopped.set()
|
||||||
return None
|
self.conn.stop_consuming()
|
||||||
else:
|
|
||||||
self.conn.consume(limit=1)
|
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
# Closes listener connection
|
# Closes listener connection
|
||||||
|
@ -56,9 +56,15 @@ class Listener(object):
|
|||||||
def poll(self, timeout=None):
|
def poll(self, timeout=None):
|
||||||
"""Blocking until a message is pending and return IncomingMessage.
|
"""Blocking until a message is pending and return IncomingMessage.
|
||||||
Return None after timeout seconds if timeout is set and no message is
|
Return None after timeout seconds if timeout is set and no message is
|
||||||
ending.
|
ending or if the listener have been stopped.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""Stop listener.
|
||||||
|
Stop the listener message polling
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
"""Cleanup listener.
|
"""Cleanup listener.
|
||||||
Close connection used by listener if any. For some listeners like
|
Close connection used by listener if any. For some listeners like
|
||||||
|
@ -46,6 +46,7 @@ class FakeListener(base.Listener):
|
|||||||
self._exchange_manager = exchange_manager
|
self._exchange_manager = exchange_manager
|
||||||
self._targets = targets
|
self._targets = targets
|
||||||
self._pool = pool
|
self._pool = pool
|
||||||
|
self._stopped = threading.Event()
|
||||||
|
|
||||||
# NOTE(sileht): Ensure that all needed queues exists even the listener
|
# NOTE(sileht): Ensure that all needed queues exists even the listener
|
||||||
# have not been polled yet
|
# have not been polled yet
|
||||||
@ -58,7 +59,7 @@ class FakeListener(base.Listener):
|
|||||||
deadline = time.time() + timeout
|
deadline = time.time() + timeout
|
||||||
else:
|
else:
|
||||||
deadline = None
|
deadline = None
|
||||||
while True:
|
while not self._stopped.is_set():
|
||||||
for target in self._targets:
|
for target in self._targets:
|
||||||
exchange = self._exchange_manager.get_exchange(target.exchange)
|
exchange = self._exchange_manager.get_exchange(target.exchange)
|
||||||
(ctxt, message, reply_q, requeue) = exchange.poll(target,
|
(ctxt, message, reply_q, requeue) = exchange.poll(target,
|
||||||
@ -77,6 +78,9 @@ class FakeListener(base.Listener):
|
|||||||
time.sleep(pause)
|
time.sleep(pause)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self._stopped.set()
|
||||||
|
|
||||||
|
|
||||||
class FakeExchange(object):
|
class FakeExchange(object):
|
||||||
|
|
||||||
|
@ -460,6 +460,8 @@ class Connection(object):
|
|||||||
self.consumers = {}
|
self.consumers = {}
|
||||||
self.conf = conf
|
self.conf = conf
|
||||||
|
|
||||||
|
self._consume_loop_stopped = False
|
||||||
|
|
||||||
self.brokers_params = []
|
self.brokers_params = []
|
||||||
if url.hosts:
|
if url.hosts:
|
||||||
for host in url.hosts:
|
for host in url.hosts:
|
||||||
@ -651,8 +653,16 @@ class Connection(object):
|
|||||||
LOG.exception(_('Failed to consume message from queue: %s'), exc)
|
LOG.exception(_('Failed to consume message from queue: %s'), exc)
|
||||||
|
|
||||||
def _consume():
|
def _consume():
|
||||||
|
# NOTE(sileht):
|
||||||
|
# maximun value choosen according the best practice from kombu:
|
||||||
|
# http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
|
||||||
poll_timeout = 1 if timeout is None else min(timeout, 1)
|
poll_timeout = 1 if timeout is None else min(timeout, 1)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
|
if self._consume_loop_stopped:
|
||||||
|
self._consume_loop_stopped = False
|
||||||
|
raise StopIteration
|
||||||
|
|
||||||
try:
|
try:
|
||||||
nxt_receiver = self.session.next_receiver(
|
nxt_receiver = self.session.next_receiver(
|
||||||
timeout=poll_timeout)
|
timeout=poll_timeout)
|
||||||
@ -745,6 +755,9 @@ class Connection(object):
|
|||||||
except StopIteration:
|
except StopIteration:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
def stop_consuming(self):
|
||||||
|
self._consume_loop_stopped = True
|
||||||
|
|
||||||
|
|
||||||
class QpidDriver(amqpdriver.AMQPDriverBase):
|
class QpidDriver(amqpdriver.AMQPDriverBase):
|
||||||
|
|
||||||
|
@ -497,6 +497,7 @@ class Connection(object):
|
|||||||
self._initial_pid = os.getpid()
|
self._initial_pid = os.getpid()
|
||||||
|
|
||||||
self.do_consume = True
|
self.do_consume = True
|
||||||
|
self._consume_loop_stopped = False
|
||||||
|
|
||||||
self.channel = None
|
self.channel = None
|
||||||
self.connection = kombu.connection.Connection(
|
self.connection = kombu.connection.Connection(
|
||||||
@ -715,8 +716,15 @@ class Connection(object):
|
|||||||
queues_tail.consume(nowait=False)
|
queues_tail.consume(nowait=False)
|
||||||
self.do_consume = False
|
self.do_consume = False
|
||||||
|
|
||||||
|
# NOTE(sileht):
|
||||||
|
# maximun value choosen according the best practice from kombu:
|
||||||
|
# http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
|
||||||
poll_timeout = 1 if timeout is None else min(timeout, 1)
|
poll_timeout = 1 if timeout is None else min(timeout, 1)
|
||||||
while True:
|
while True:
|
||||||
|
if self._consume_loop_stopped:
|
||||||
|
self._consume_loop_stopped = False
|
||||||
|
raise StopIteration
|
||||||
|
|
||||||
try:
|
try:
|
||||||
return self.connection.drain_events(timeout=poll_timeout)
|
return self.connection.drain_events(timeout=poll_timeout)
|
||||||
except socket.timeout as exc:
|
except socket.timeout as exc:
|
||||||
@ -790,6 +798,9 @@ class Connection(object):
|
|||||||
except StopIteration:
|
except StopIteration:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
def stop_consuming(self):
|
||||||
|
self._consume_loop_stopped = True
|
||||||
|
|
||||||
|
|
||||||
class RabbitDriver(amqpdriver.AMQPDriverBase):
|
class RabbitDriver(amqpdriver.AMQPDriverBase):
|
||||||
|
|
||||||
|
@ -16,10 +16,6 @@ import abc
|
|||||||
|
|
||||||
import six
|
import six
|
||||||
|
|
||||||
# NOTE(sileht): value choosen according the best practice from kombu
|
|
||||||
# http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
|
|
||||||
POLL_TIMEOUT = 1
|
|
||||||
|
|
||||||
|
|
||||||
@six.add_metaclass(abc.ABCMeta)
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
class ExecutorBase(object):
|
class ExecutorBase(object):
|
||||||
|
@ -42,7 +42,7 @@ class BlockingExecutor(base.ExecutorBase):
|
|||||||
self._running = True
|
self._running = True
|
||||||
while self._running:
|
while self._running:
|
||||||
try:
|
try:
|
||||||
incoming = self.listener.poll(timeout=base.POLL_TIMEOUT)
|
incoming = self.listener.poll()
|
||||||
if incoming is not None:
|
if incoming is not None:
|
||||||
with self.dispatcher(incoming) as callback:
|
with self.dispatcher(incoming) as callback:
|
||||||
callback()
|
callback()
|
||||||
@ -51,6 +51,7 @@ class BlockingExecutor(base.ExecutorBase):
|
|||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self._running = False
|
self._running = False
|
||||||
|
self.listener.stop()
|
||||||
|
|
||||||
def wait(self):
|
def wait(self):
|
||||||
pass
|
pass
|
||||||
|
@ -85,7 +85,7 @@ class EventletExecutor(base.ExecutorBase):
|
|||||||
def _executor_thread():
|
def _executor_thread():
|
||||||
try:
|
try:
|
||||||
while self._running:
|
while self._running:
|
||||||
incoming = self.listener.poll(timeout=base.POLL_TIMEOUT)
|
incoming = self.listener.poll()
|
||||||
if incoming is not None:
|
if incoming is not None:
|
||||||
spawn_with(ctxt=self.dispatcher(incoming),
|
spawn_with(ctxt=self.dispatcher(incoming),
|
||||||
pool=self._greenpool)
|
pool=self._greenpool)
|
||||||
@ -99,6 +99,7 @@ class EventletExecutor(base.ExecutorBase):
|
|||||||
if self._thread is None:
|
if self._thread is None:
|
||||||
return
|
return
|
||||||
self._running = False
|
self._running = False
|
||||||
|
self.listener.stop()
|
||||||
self._thread.cancel()
|
self._thread.cancel()
|
||||||
|
|
||||||
def wait(self):
|
def wait(self):
|
||||||
|
@ -39,12 +39,10 @@ class TestExecutor(test_utils.BaseTestCase):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def generate_scenarios(cls):
|
def generate_scenarios(cls):
|
||||||
impl = [('blocking', dict(executor=impl_blocking.BlockingExecutor,
|
impl = [('blocking', dict(executor=impl_blocking.BlockingExecutor))]
|
||||||
stop_before_return=True))]
|
|
||||||
if impl_eventlet is not None:
|
if impl_eventlet is not None:
|
||||||
impl.append(
|
impl.append(
|
||||||
('eventlet', dict(executor=impl_eventlet.EventletExecutor,
|
('eventlet', dict(executor=impl_eventlet.EventletExecutor)))
|
||||||
stop_before_return=False)))
|
|
||||||
cls.scenarios = testscenarios.multiply_scenarios(impl)
|
cls.scenarios = testscenarios.multiply_scenarios(impl)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@ -72,13 +70,9 @@ class TestExecutor(test_utils.BaseTestCase):
|
|||||||
message={'payload': 'data'})
|
message={'payload': 'data'})
|
||||||
|
|
||||||
def fake_poll(timeout=None):
|
def fake_poll(timeout=None):
|
||||||
if self.stop_before_return:
|
if listener.poll.call_count == 1:
|
||||||
executor.stop()
|
|
||||||
return incoming_message
|
return incoming_message
|
||||||
else:
|
executor.stop()
|
||||||
if listener.poll.call_count == 1:
|
|
||||||
return incoming_message
|
|
||||||
executor.stop()
|
|
||||||
|
|
||||||
listener.poll.side_effect = fake_poll
|
listener.poll.side_effect = fake_poll
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user