diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index a8825e4c8..61655deee 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -276,6 +276,11 @@ class KafkaListener(base.PollStyleListener): self.conn = conn self.incoming_queue = [] + # FIXME(sileht): We do a first poll to ensure we topics are created + # This is a workaround mainly for functional tests, in real life + # this is fine if topics are not created synchroneously + self.poll(5) + @base.batch_poll_helper def poll(self, timeout=None): while not self._stopped.is_set(): diff --git a/oslo_messaging/tests/utils.py b/oslo_messaging/tests/utils.py index d8c0f4dba..c0459c8ec 100644 --- a/oslo_messaging/tests/utils.py +++ b/oslo_messaging/tests/utils.py @@ -69,9 +69,15 @@ class ServerThreadHelper(threading.Thread): self.daemon = True self._server = server self._stop_event = threading.Event() + self._start_event = threading.Event() + + def start(self): + super(ServerThreadHelper, self).start() + self._start_event.wait() def run(self): self._server.start() + self._start_event.set() self._stop_event.wait() # Check start() does nothing with a running listener self._server.start()