diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index c5613b07a..539e48b0b 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -56,6 +56,13 @@ class MessageOperationsHandler(object): target=self._process_in_background) self._shutdown_thread.daemon = True + # HACK(sileht): this is set by the server.Server temporary + # to not have to rewrite the entire internal API to pass + # executor everywhere to make Listener aware of the server + # executor. All this hack is only for the blocking executor. + # And it's deprecated so... + self._executor = None + def stop(self): self._shutdown.set() @@ -85,9 +92,16 @@ class MessageOperationsHandler(object): def do(self, task): "Put the task in the queue and waits until the task is completed." - event = threading.Event() - self._tasks.put((task, event)) - event.wait() + if self._executor is None: + raise RuntimeError("Unexpected error, no executor is setuped") + elif self._executor == "blocking": + # NOTE(sileht): Blocking will hang forever if we waiting the + # polling thread + task() + else: + event = threading.Event() + self._tasks.put((task, event)) + event.wait() class AMQPIncomingMessage(base.RpcIncomingMessage): diff --git a/oslo_messaging/server.py b/oslo_messaging/server.py index d2e50ac46..c8e77a673 100644 --- a/oslo_messaging/server.py +++ b/oslo_messaging/server.py @@ -417,6 +417,18 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): except driver_base.TransportDriverError as ex: raise ServerListenError(self.target, ex) + # HACK(sileht): We temporary pass the executor to the rabbit + # listener to fix a race with the deprecated blocking executor. + # We do this hack because this is need only for 'synchronous' + # executor like blocking. And this one is deprecated. Making + # driver working in an sync and an async way is complicated + # and blocking have 0% tests coverage. + if hasattr(self.listener, '_poll_style_listener'): + l = self.listener._poll_style_listener + if hasattr(l, "_message_operations_handler"): + l._message_operations_handler._executor = ( + self.executor_type) + self.listener.start(self._on_incoming) @ordered(after='start')