Merge "The executor doesn't need to set the timeout"
This commit is contained in:
commit
cb4c5bce0b
oslo/messaging
_drivers
_executors
tests/executors
@ -17,7 +17,6 @@ __all__ = ['AMQPDriverBase']
|
|||||||
|
|
||||||
import logging
|
import logging
|
||||||
import threading
|
import threading
|
||||||
import time
|
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
from six import moves
|
from six import moves
|
||||||
@ -94,6 +93,7 @@ class AMQPListener(base.Listener):
|
|||||||
self.conn = conn
|
self.conn = conn
|
||||||
self.msg_id_cache = rpc_amqp._MsgIdCache()
|
self.msg_id_cache = rpc_amqp._MsgIdCache()
|
||||||
self.incoming = []
|
self.incoming = []
|
||||||
|
self._stopped = threading.Event()
|
||||||
|
|
||||||
def __call__(self, message):
|
def __call__(self, message):
|
||||||
# FIXME(markmc): logging isn't driver specific
|
# FIXME(markmc): logging isn't driver specific
|
||||||
@ -110,23 +110,17 @@ class AMQPListener(base.Listener):
|
|||||||
ctxt.reply_q))
|
ctxt.reply_q))
|
||||||
|
|
||||||
def poll(self, timeout=None):
|
def poll(self, timeout=None):
|
||||||
if timeout is not None:
|
while not self._stopped.is_set():
|
||||||
deadline = time.time() + timeout
|
|
||||||
else:
|
|
||||||
deadline = None
|
|
||||||
while True:
|
|
||||||
if self.incoming:
|
if self.incoming:
|
||||||
return self.incoming.pop(0)
|
return self.incoming.pop(0)
|
||||||
if deadline is not None:
|
try:
|
||||||
timeout = deadline - time.time()
|
self.conn.consume(limit=1, timeout=timeout)
|
||||||
if timeout < 0:
|
except rpc_common.Timeout:
|
||||||
return None
|
return None
|
||||||
try:
|
|
||||||
self.conn.consume(limit=1, timeout=timeout)
|
def stop(self):
|
||||||
except rpc_common.Timeout:
|
self._stopped.set()
|
||||||
return None
|
self.conn.stop_consuming()
|
||||||
else:
|
|
||||||
self.conn.consume(limit=1)
|
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
# Closes listener connection
|
# Closes listener connection
|
||||||
|
@ -56,9 +56,15 @@ class Listener(object):
|
|||||||
def poll(self, timeout=None):
|
def poll(self, timeout=None):
|
||||||
"""Blocking until a message is pending and return IncomingMessage.
|
"""Blocking until a message is pending and return IncomingMessage.
|
||||||
Return None after timeout seconds if timeout is set and no message is
|
Return None after timeout seconds if timeout is set and no message is
|
||||||
ending.
|
ending or if the listener have been stopped.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""Stop listener.
|
||||||
|
Stop the listener message polling
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
"""Cleanup listener.
|
"""Cleanup listener.
|
||||||
Close connection used by listener if any. For some listeners like
|
Close connection used by listener if any. For some listeners like
|
||||||
|
@ -46,6 +46,7 @@ class FakeListener(base.Listener):
|
|||||||
self._exchange_manager = exchange_manager
|
self._exchange_manager = exchange_manager
|
||||||
self._targets = targets
|
self._targets = targets
|
||||||
self._pool = pool
|
self._pool = pool
|
||||||
|
self._stopped = threading.Event()
|
||||||
|
|
||||||
# NOTE(sileht): Ensure that all needed queues exists even the listener
|
# NOTE(sileht): Ensure that all needed queues exists even the listener
|
||||||
# have not been polled yet
|
# have not been polled yet
|
||||||
@ -58,7 +59,7 @@ class FakeListener(base.Listener):
|
|||||||
deadline = time.time() + timeout
|
deadline = time.time() + timeout
|
||||||
else:
|
else:
|
||||||
deadline = None
|
deadline = None
|
||||||
while True:
|
while not self._stopped.is_set():
|
||||||
for target in self._targets:
|
for target in self._targets:
|
||||||
exchange = self._exchange_manager.get_exchange(target.exchange)
|
exchange = self._exchange_manager.get_exchange(target.exchange)
|
||||||
(ctxt, message, reply_q, requeue) = exchange.poll(target,
|
(ctxt, message, reply_q, requeue) = exchange.poll(target,
|
||||||
@ -77,6 +78,9 @@ class FakeListener(base.Listener):
|
|||||||
time.sleep(pause)
|
time.sleep(pause)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self._stopped.set()
|
||||||
|
|
||||||
|
|
||||||
class FakeExchange(object):
|
class FakeExchange(object):
|
||||||
|
|
||||||
|
@ -460,6 +460,8 @@ class Connection(object):
|
|||||||
self.consumers = {}
|
self.consumers = {}
|
||||||
self.conf = conf
|
self.conf = conf
|
||||||
|
|
||||||
|
self._consume_loop_stopped = False
|
||||||
|
|
||||||
self.brokers_params = []
|
self.brokers_params = []
|
||||||
if url.hosts:
|
if url.hosts:
|
||||||
for host in url.hosts:
|
for host in url.hosts:
|
||||||
@ -651,8 +653,16 @@ class Connection(object):
|
|||||||
LOG.exception(_('Failed to consume message from queue: %s'), exc)
|
LOG.exception(_('Failed to consume message from queue: %s'), exc)
|
||||||
|
|
||||||
def _consume():
|
def _consume():
|
||||||
|
# NOTE(sileht):
|
||||||
|
# maximun value choosen according the best practice from kombu:
|
||||||
|
# http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
|
||||||
poll_timeout = 1 if timeout is None else min(timeout, 1)
|
poll_timeout = 1 if timeout is None else min(timeout, 1)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
|
if self._consume_loop_stopped:
|
||||||
|
self._consume_loop_stopped = False
|
||||||
|
raise StopIteration
|
||||||
|
|
||||||
try:
|
try:
|
||||||
nxt_receiver = self.session.next_receiver(
|
nxt_receiver = self.session.next_receiver(
|
||||||
timeout=poll_timeout)
|
timeout=poll_timeout)
|
||||||
@ -745,6 +755,9 @@ class Connection(object):
|
|||||||
except StopIteration:
|
except StopIteration:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
def stop_consuming(self):
|
||||||
|
self._consume_loop_stopped = True
|
||||||
|
|
||||||
|
|
||||||
class QpidDriver(amqpdriver.AMQPDriverBase):
|
class QpidDriver(amqpdriver.AMQPDriverBase):
|
||||||
|
|
||||||
|
@ -497,6 +497,7 @@ class Connection(object):
|
|||||||
self._initial_pid = os.getpid()
|
self._initial_pid = os.getpid()
|
||||||
|
|
||||||
self.do_consume = True
|
self.do_consume = True
|
||||||
|
self._consume_loop_stopped = False
|
||||||
|
|
||||||
self.channel = None
|
self.channel = None
|
||||||
self.connection = kombu.connection.Connection(
|
self.connection = kombu.connection.Connection(
|
||||||
@ -715,8 +716,15 @@ class Connection(object):
|
|||||||
queues_tail.consume(nowait=False)
|
queues_tail.consume(nowait=False)
|
||||||
self.do_consume = False
|
self.do_consume = False
|
||||||
|
|
||||||
|
# NOTE(sileht):
|
||||||
|
# maximun value choosen according the best practice from kombu:
|
||||||
|
# http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
|
||||||
poll_timeout = 1 if timeout is None else min(timeout, 1)
|
poll_timeout = 1 if timeout is None else min(timeout, 1)
|
||||||
while True:
|
while True:
|
||||||
|
if self._consume_loop_stopped:
|
||||||
|
self._consume_loop_stopped = False
|
||||||
|
raise StopIteration
|
||||||
|
|
||||||
try:
|
try:
|
||||||
return self.connection.drain_events(timeout=poll_timeout)
|
return self.connection.drain_events(timeout=poll_timeout)
|
||||||
except socket.timeout as exc:
|
except socket.timeout as exc:
|
||||||
@ -790,6 +798,9 @@ class Connection(object):
|
|||||||
except StopIteration:
|
except StopIteration:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
def stop_consuming(self):
|
||||||
|
self._consume_loop_stopped = True
|
||||||
|
|
||||||
|
|
||||||
class RabbitDriver(amqpdriver.AMQPDriverBase):
|
class RabbitDriver(amqpdriver.AMQPDriverBase):
|
||||||
|
|
||||||
|
@ -16,10 +16,6 @@ import abc
|
|||||||
|
|
||||||
import six
|
import six
|
||||||
|
|
||||||
# NOTE(sileht): value choosen according the best practice from kombu
|
|
||||||
# http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
|
|
||||||
POLL_TIMEOUT = 1
|
|
||||||
|
|
||||||
|
|
||||||
@six.add_metaclass(abc.ABCMeta)
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
class ExecutorBase(object):
|
class ExecutorBase(object):
|
||||||
|
@ -42,7 +42,7 @@ class BlockingExecutor(base.ExecutorBase):
|
|||||||
self._running = True
|
self._running = True
|
||||||
while self._running:
|
while self._running:
|
||||||
try:
|
try:
|
||||||
incoming = self.listener.poll(timeout=base.POLL_TIMEOUT)
|
incoming = self.listener.poll()
|
||||||
if incoming is not None:
|
if incoming is not None:
|
||||||
with self.dispatcher(incoming) as callback:
|
with self.dispatcher(incoming) as callback:
|
||||||
callback()
|
callback()
|
||||||
@ -51,6 +51,7 @@ class BlockingExecutor(base.ExecutorBase):
|
|||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self._running = False
|
self._running = False
|
||||||
|
self.listener.stop()
|
||||||
|
|
||||||
def wait(self):
|
def wait(self):
|
||||||
pass
|
pass
|
||||||
|
@ -85,7 +85,7 @@ class EventletExecutor(base.ExecutorBase):
|
|||||||
def _executor_thread():
|
def _executor_thread():
|
||||||
try:
|
try:
|
||||||
while self._running:
|
while self._running:
|
||||||
incoming = self.listener.poll(timeout=base.POLL_TIMEOUT)
|
incoming = self.listener.poll()
|
||||||
if incoming is not None:
|
if incoming is not None:
|
||||||
spawn_with(ctxt=self.dispatcher(incoming),
|
spawn_with(ctxt=self.dispatcher(incoming),
|
||||||
pool=self._greenpool)
|
pool=self._greenpool)
|
||||||
@ -99,6 +99,7 @@ class EventletExecutor(base.ExecutorBase):
|
|||||||
if self._thread is None:
|
if self._thread is None:
|
||||||
return
|
return
|
||||||
self._running = False
|
self._running = False
|
||||||
|
self.listener.stop()
|
||||||
self._thread.cancel()
|
self._thread.cancel()
|
||||||
|
|
||||||
def wait(self):
|
def wait(self):
|
||||||
|
@ -39,12 +39,10 @@ class TestExecutor(test_utils.BaseTestCase):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def generate_scenarios(cls):
|
def generate_scenarios(cls):
|
||||||
impl = [('blocking', dict(executor=impl_blocking.BlockingExecutor,
|
impl = [('blocking', dict(executor=impl_blocking.BlockingExecutor))]
|
||||||
stop_before_return=True))]
|
|
||||||
if impl_eventlet is not None:
|
if impl_eventlet is not None:
|
||||||
impl.append(
|
impl.append(
|
||||||
('eventlet', dict(executor=impl_eventlet.EventletExecutor,
|
('eventlet', dict(executor=impl_eventlet.EventletExecutor)))
|
||||||
stop_before_return=False)))
|
|
||||||
cls.scenarios = testscenarios.multiply_scenarios(impl)
|
cls.scenarios = testscenarios.multiply_scenarios(impl)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@ -72,13 +70,9 @@ class TestExecutor(test_utils.BaseTestCase):
|
|||||||
message={'payload': 'data'})
|
message={'payload': 'data'})
|
||||||
|
|
||||||
def fake_poll(timeout=None):
|
def fake_poll(timeout=None):
|
||||||
if self.stop_before_return:
|
if listener.poll.call_count == 1:
|
||||||
executor.stop()
|
|
||||||
return incoming_message
|
return incoming_message
|
||||||
else:
|
executor.stop()
|
||||||
if listener.poll.call_count == 1:
|
|
||||||
return incoming_message
|
|
||||||
executor.stop()
|
|
||||||
|
|
||||||
listener.poll.side_effect = fake_poll
|
listener.poll.side_effect = fake_poll
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user