Merge "kafka: Remove Producer singleton"
commit 654be38629
oslo_messaging
@@ -115,35 +115,6 @@ def with_reconnect(retries=None):
     return decorator


-class Producer(object):
-    _producer = None
-    _servers = None
-    _lock = threading.Lock()
-
-    @staticmethod
-    @with_reconnect()
-    def connect(servers, **kwargs):
-        return kafka.KafkaProducer(
-            bootstrap_servers=servers,
-            selector=KAFKA_SELECTOR,
-            **kwargs)
-
-    @classmethod
-    def producer(cls, servers, **kwargs):
-        with cls._lock:
-            if not cls._producer or cls._servers != servers:
-                cls._servers = servers
-                cls._producer = cls.connect(servers, **kwargs)
-            return cls._producer
-
-    @classmethod
-    def cleanup(cls):
-        with cls._lock:
-            if cls._producer:
-                cls._producer.close()
-                cls._producer = None
-
-
 class Connection(object):

     def __init__(self, conf, url, purpose):
@@ -154,6 +125,7 @@ class Connection(object):
         self.linger_ms = driver_conf.producer_batch_timeout * 1000
         self.conf = conf
         self.producer = None
+        self.producer_lock = threading.Lock()
         self.consumer = None
         self.consumer_timeout = float(driver_conf.kafka_consumer_timeout)
         self.max_fetch_bytes = driver_conf.kafka_max_fetch_bytes
@@ -189,25 +161,24 @@ class Connection(object):
         :param msg: messages for publishing
         :param retry: the number of retry
         """
-
-        message = pack_message(ctxt, msg)
-        self._ensure_connection()
-        self._send_and_retry(message, topic, retry)
-
-    def _send_and_retry(self, message, topic, retry):
-        if not isinstance(message, str):
-            message = jsonutils.dumps(message)
         retry = retry if retry >= 0 else None
+        message = pack_message(ctxt, msg)
+        message = jsonutils.dumps(message)

         @with_reconnect(retries=retry)
-        def _send(topic, message):
+        def wrapped_with_reconnect():
+            self._ensure_producer()
+            # NOTE(sileht): This returns a future, we can use get()
+            # if we want to block like other driver
             self.producer.send(topic, message)

         try:
-            _send(topic, message)
+            wrapped_with_reconnect()
         except Exception:
-            Producer.cleanup()
             LOG.exception(_LE("Failed to send message"))
+            # NOTE(sileht): if something goes wrong close the producer
+            # connection
+            self._close_producer()
             raise

     @with_reconnect()
     def _poll_messages(self, timeout):
@@ -239,12 +210,10 @@ class Connection(object):
         pass

     def close(self):
-        if self.producer:
-            self.producer.close()
-            self.producer = None
+        self._close_producer()
         if self.consumer:
             self.consumer.close()
-            self.consumer = None
+        self.consumer = None

     def commit(self):
         """Commit is used by subscribers belonging to the same group.
@@ -257,14 +226,23 @@
         """
         self.consumer.commit()

-    def _ensure_connection(self):
-        try:
-            self.producer = Producer.producer(self.hostaddrs,
-                                              linger_ms=self.linger_ms,
-                                              batch_size=self.batch_size)
-        except kafka.errors.KafkaError as e:
-            LOG.exception(_LE("KafkaProducer could not be initialized: %s"), e)
-            raise
+    def _close_producer(self):
+        with self.producer_lock:
+            if self.producer:
+                self.producer.close()
+                self.producer = None
+
+    def _ensure_producer(self):
+        if self.producer:
+            return
+        with self.producer_lock:
+            if self.producer:
+                return
+            self.producer = kafka.KafkaProducer(
+                bootstrap_servers=self.hostaddrs,
+                linger_ms=self.linger_ms,
+                batch_size=self.batch_size,
+                selector=KAFKA_SELECTOR)

     @with_reconnect()
     def declare_topic_consumer(self, topics, group=None):
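For illustration: the new _ensure_producer()/_close_producer() helpers above replace the module-level Producer singleton with a producer owned by each Connection and guarded by self.producer_lock, using a check / lock / re-check ("double-checked") pattern so the common already-connected path never takes the lock. A minimal standalone sketch of that lifecycle follows; the LazyProducer class and its factory argument are hypothetical stand-ins for kafka.KafkaProducer, not part of the patch.

import threading


class LazyProducer(object):
    """Illustrative sketch only -- mirrors the lazy, lock-guarded
    lifecycle Connection now gives its producer: created on first use,
    closed and reset on error so the next send reconnects."""

    def __init__(self, factory):
        self._factory = factory      # callable that builds the real producer
        self._producer = None
        self._lock = threading.Lock()

    def ensure(self):
        # Fast path: already created, avoid taking the lock.
        if self._producer:
            return self._producer
        with self._lock:
            # Re-check under the lock; another thread may have won the race.
            if self._producer is None:
                self._producer = self._factory()
            return self._producer

    def close(self):
        with self._lock:
            if self._producer:
                self._producer.close()
                self._producer = None

The close() path here mirrors the except branch in notify_send above: on failure the producer is discarded so a later call rebuilds it.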
@@ -74,7 +74,6 @@ class TestKafkaDriver(test_utils.BaseTestCase):
         self.messaging_conf.transport_driver = 'kafka'
         transport = oslo_messaging.get_transport(self.conf)
         self.driver = transport._driver
-        self.addCleanup(kafka_driver.Producer.cleanup)

     def test_send(self):
         target = oslo_messaging.Target(topic="topic_test")
@@ -87,8 +86,10 @@ class TestKafkaDriver(test_utils.BaseTestCase):
         with mock.patch("kafka.KafkaProducer") as fake_producer_class:
             fake_producer = fake_producer_class.return_value
             fake_producer.send.side_effect = kafka.errors.NoBrokersAvailable
-            self.driver.send_notification(target, {}, {"payload": ["test_1"]},
-                                          None, retry=3)
+            self.assertRaises(kafka.errors.NoBrokersAvailable,
+                              self.driver.send_notification,
+                              target, {}, {"payload": ["test_1"]},
+                              None, retry=3)
             self.assertEqual(3, fake_producer.send.call_count)

     def test_listen(self):
@@ -127,10 +128,11 @@ class TestKafkaConnection(test_utils.BaseTestCase):
         transport = oslo_messaging.get_transport(self.conf)
         self.driver = transport._driver

-    @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
-    @mock.patch.object(kafka_driver.Connection, '_send_and_retry')
-    def test_notify(self, fake_send, fake_ensure_connection):
+    def test_notify(self):
         conn = self.driver._get_connection(common_driver.PURPOSE_SEND)
-        conn.notify_send("fake_topic", {"fake_ctxt": "fake_param"},
-                         {"fake_text": "fake_message_1"}, 10)
-        self.assertEqual(1, len(fake_send.mock_calls))
+
+        with mock.patch("kafka.KafkaProducer") as fake_producer_class:
+            fake_producer = fake_producer_class.return_value
+            conn.notify_send("fake_topic", {"fake_ctxt": "fake_param"},
+                             {"fake_text": "fake_message_1"}, 10)
+            self.assertEqual(1, len(fake_producer.send.mock_calls))
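For illustration: the reworked TestKafkaDriver test above now expects send_notification to raise kafka.errors.NoBrokersAvailable while still recording three producer.send() attempts, because the driver wraps the send in @with_reconnect(retries=retry). The toy decorator below is only a stand-in for that behavior (it is not the driver's actual with_reconnect implementation) and assumes retries caps the number of attempts before the last error is re-raised.

import functools


class NoBrokersAvailable(Exception):
    """Stand-in for kafka.errors.NoBrokersAvailable."""


def with_retries(retries):
    """Toy bounded-retry decorator, illustrative only."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exc = None
            for _ in range(retries):
                try:
                    return func(*args, **kwargs)
                except NoBrokersAvailable as exc:
                    last_exc = exc
            raise last_exc
        return wrapper
    return decorator


attempts = []


@with_retries(3)
def send():
    # Every attempt fails, so the decorator retries until the cap is hit.
    attempts.append(1)
    raise NoBrokersAvailable()


try:
    send()
except NoBrokersAvailable:
    pass

# Mirrors assertEqual(3, fake_producer.send.call_count) in the test above.
assert len(attempts) == 3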