Merge "Fix rabbitmq driver with blocking executor"
This commit is contained in:
commit
570fb93a2e
oslo_messaging
@ -56,6 +56,13 @@ class MessageOperationsHandler(object):
|
||||
target=self._process_in_background)
|
||||
self._shutdown_thread.daemon = True
|
||||
|
||||
# HACK(sileht): this is set by the server.Server temporary
|
||||
# to not have to rewrite the entire internal API to pass
|
||||
# executor everywhere to make Listener aware of the server
|
||||
# executor. All this hack is only for the blocking executor.
|
||||
# And it's deprecated so...
|
||||
self._executor = None
|
||||
|
||||
def stop(self):
|
||||
self._shutdown.set()
|
||||
|
||||
@ -85,9 +92,16 @@ class MessageOperationsHandler(object):
|
||||
|
||||
def do(self, task):
|
||||
"Put the task in the queue and waits until the task is completed."
|
||||
event = threading.Event()
|
||||
self._tasks.put((task, event))
|
||||
event.wait()
|
||||
if self._executor is None:
|
||||
raise RuntimeError("Unexpected error, no executor is setuped")
|
||||
elif self._executor == "blocking":
|
||||
# NOTE(sileht): Blocking will hang forever if we waiting the
|
||||
# polling thread
|
||||
task()
|
||||
else:
|
||||
event = threading.Event()
|
||||
self._tasks.put((task, event))
|
||||
event.wait()
|
||||
|
||||
|
||||
class AMQPIncomingMessage(base.RpcIncomingMessage):
|
||||
|
@ -417,6 +417,18 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
|
||||
except driver_base.TransportDriverError as ex:
|
||||
raise ServerListenError(self.target, ex)
|
||||
|
||||
# HACK(sileht): We temporary pass the executor to the rabbit
|
||||
# listener to fix a race with the deprecated blocking executor.
|
||||
# We do this hack because this is need only for 'synchronous'
|
||||
# executor like blocking. And this one is deprecated. Making
|
||||
# driver working in an sync and an async way is complicated
|
||||
# and blocking have 0% tests coverage.
|
||||
if hasattr(self.listener, '_poll_style_listener'):
|
||||
l = self.listener._poll_style_listener
|
||||
if hasattr(l, "_message_operations_handler"):
|
||||
l._message_operations_handler._executor = (
|
||||
self.executor_type)
|
||||
|
||||
self.listener.start(self._on_incoming)
|
||||
|
||||
@ordered(after='start')
|
||||
|
Loading…
x
Reference in New Issue
Block a user