Merge "kafka: ensure topics are created"
This commit is contained in:
commit
4ec746dfba
oslo_messaging
@ -276,6 +276,11 @@ class KafkaListener(base.PollStyleListener):
|
|||||||
self.conn = conn
|
self.conn = conn
|
||||||
self.incoming_queue = []
|
self.incoming_queue = []
|
||||||
|
|
||||||
|
# FIXME(sileht): We do a first poll to ensure we topics are created
|
||||||
|
# This is a workaround mainly for functional tests, in real life
|
||||||
|
# this is fine if topics are not created synchroneously
|
||||||
|
self.poll(5)
|
||||||
|
|
||||||
@base.batch_poll_helper
|
@base.batch_poll_helper
|
||||||
def poll(self, timeout=None):
|
def poll(self, timeout=None):
|
||||||
while not self._stopped.is_set():
|
while not self._stopped.is_set():
|
||||||
|
@ -69,9 +69,15 @@ class ServerThreadHelper(threading.Thread):
|
|||||||
self.daemon = True
|
self.daemon = True
|
||||||
self._server = server
|
self._server = server
|
||||||
self._stop_event = threading.Event()
|
self._stop_event = threading.Event()
|
||||||
|
self._start_event = threading.Event()
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
super(ServerThreadHelper, self).start()
|
||||||
|
self._start_event.wait()
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self._server.start()
|
self._server.start()
|
||||||
|
self._start_event.set()
|
||||||
self._stop_event.wait()
|
self._stop_event.wait()
|
||||||
# Check start() does nothing with a running listener
|
# Check start() does nothing with a running listener
|
||||||
self._server.start()
|
self._server.start()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user