diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index 43e0cec5d..0a8c02ca2 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -38,6 +38,11 @@ from oslo_messaging._i18n import _LE from oslo_messaging._i18n import _LW from oslo_serialization import jsonutils +import logging as l +l.basicConfig(level=l.INFO) +l.getLogger("kafka").setLevel(l.WARN) +l.getLogger("stevedore").setLevel(l.WARN) + if eventletutils.is_monkey_patched('select'): # monkeypatch the vendored SelectSelector._select like eventlet does # https://github.com/eventlet/eventlet/blob/master/eventlet/green/selectors.py#L32 @@ -55,18 +60,11 @@ LOG = logging.getLogger(__name__) def unpack_message(msg): context = {} message = None - try: - if msg: - msg = json.loads(msg) - message = driver_common.deserialize_msg(msg) - if 'context' in message: - context = message['context'] - del message['context'] - except ValueError as e: - LOG.info("Invalid format of consumed message: %s" % e) - except Exception: - LOG.warning(_LW("Exception during message unpacking")) - return message, context + msg = json.loads(msg) + message = driver_common.deserialize_msg(msg) + context = message['_context'] + del message['_context'] + return context, message def pack_message(ctxt, msg): @@ -76,7 +74,7 @@ def pack_message(ctxt, msg): context_d = ctxt else: context_d = ctxt.to_dict() - msg['context'] = context_d + msg['_context'] = context_d msg = driver_common.serialize_msg(msg) @@ -181,7 +179,15 @@ class Connection(object): @with_reconnect() def _poll_messages(self, timeout): - return self.consumer.poll(timeout * 1000.0) + messages = self.consumer.poll(timeout * 1000.0) + messages = [record.value + for records in messages.values() if records + for record in records] + if not messages: + # NOTE(sileht): really ? you return payload but no messages... + # simulate timeout to consume message again + raise kafka.errors.ConsumerTimeout() + return messages def consume(self, timeout=None): """Receive up to 'max_fetch_messages' messages. @@ -275,26 +281,17 @@ class KafkaListener(base.PollStyleListener): @base.batch_poll_helper def poll(self, timeout=None): - # TODO(sileht): use batch capability of kafka while not self._stopped.is_set(): if self.incoming_queue: return self.incoming_queue.pop(0) try: - messages = self.conn.consume(timeout=timeout) - if messages: - self._put_messages_to_queue(messages) + messages = self.conn.consume(timeout=timeout) or [] + for message in messages: + msg = OsloKafkaMessage(*unpack_message(message)) + self.incoming_queue.append(msg) except driver_common.Timeout: return None - def _put_messages_to_queue(self, messages): - for topic, records in messages.items(): - if records: - for record in records: - message, context = unpack_message(record.value) - if message: - self.incoming_queue.append( - OsloKafkaMessage(ctxt=context, message=message)) - def stop(self): self._stopped.set() self.conn.stop_consuming()