Adds comment for pika_pooler.py
Also this patch: 1) fixed import's order 2) removes PikaDriverCompatibleWithOldRabbit (tests, show that after retry implementation all works fine) Change-Id: Ib34f6db569cadb5c27d8865f13ba32ef9a6c73e9
This commit is contained in:
parent
438a808c91
commit
bee303cf6f
oslo_messaging/_drivers
setup.cfg@ -12,8 +12,13 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_messaging import exceptions
|
||||
import pika_pool
|
||||
import retrying
|
||||
|
||||
from oslo_messaging._drivers.pika_driver import pika_engine as pika_drv_engine
|
||||
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
|
||||
@ -21,13 +26,6 @@ from oslo_messaging._drivers.pika_driver import pika_listener as pika_drv_lstnr
|
||||
from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
|
||||
from oslo_messaging._drivers.pika_driver import pika_poller as pika_drv_poller
|
||||
|
||||
from oslo_messaging import exceptions
|
||||
|
||||
import pika_pool
|
||||
|
||||
import retrying
|
||||
import time
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
pika_opts = [
|
||||
@ -144,7 +142,6 @@ class PikaDriver(object):
|
||||
conf.register_opts(notification_opts, group=opt_group)
|
||||
|
||||
self.conf = conf
|
||||
self._allowed_remote_exmods = allowed_remote_exmods
|
||||
|
||||
self._pika_engine = pika_drv_engine.PikaEngine(
|
||||
conf, url, default_exchange, allowed_remote_exmods
|
||||
@ -277,27 +274,3 @@ class PikaDriver(object):
|
||||
|
||||
def cleanup(self):
|
||||
self._reply_listener.cleanup()
|
||||
|
||||
|
||||
class PikaDriverCompatibleWithRabbitDriver(PikaDriver):
|
||||
"""Old RabbitMQ driver creates exchange before sending message.
|
||||
In this case if no rpc service listen this exchange message will be sent
|
||||
to /dev/null but client will know anything about it. That is strange.
|
||||
But for now we need to keep original behaviour
|
||||
"""
|
||||
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
|
||||
retry=None):
|
||||
try:
|
||||
return super(PikaDriverCompatibleWithRabbitDriver, self).send(
|
||||
target=target,
|
||||
ctxt=ctxt,
|
||||
message=message,
|
||||
wait_for_reply=wait_for_reply,
|
||||
timeout=timeout,
|
||||
retry=retry
|
||||
)
|
||||
except exceptions.MessageDeliveryFailure:
|
||||
if wait_for_reply:
|
||||
raise exceptions.MessagingTimeout()
|
||||
else:
|
||||
return None
|
||||
|
@ -11,22 +11,20 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import socket
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
|
||||
|
||||
import pika
|
||||
from pika.adapters import select_connection
|
||||
from pika import credentials as pika_credentials
|
||||
|
||||
import pika_pool
|
||||
|
||||
import six
|
||||
import socket
|
||||
import sys
|
||||
|
||||
import threading
|
||||
import time
|
||||
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
@ -12,15 +12,15 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
|
||||
from oslo_messaging._drivers.pika_driver import pika_poller as pika_drv_poller
|
||||
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from concurrent import futures
|
||||
from oslo_log import log as logging
|
||||
|
||||
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
|
||||
from oslo_messaging._drivers.pika_driver import pika_poller as pika_drv_poller
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -111,13 +111,16 @@ class RpcReplyPikaListener(object):
|
||||
except BaseException:
|
||||
LOG.exception("Unexpected exception during reply polling")
|
||||
|
||||
def register_reply_waiter(self, msg_id, future):
|
||||
def register_reply_waiter(self, msg_id):
|
||||
"""Register reply waiter. Should be called before message sending to
|
||||
the server
|
||||
:param msg_id: String, message_id of expected reply
|
||||
:param future: Future, container for expected reply to be returned over
|
||||
:return future: Future, container for expected reply to be returned
|
||||
over
|
||||
"""
|
||||
future = futures.Future()
|
||||
self._reply_waiting_futures[msg_id] = future
|
||||
return future
|
||||
|
||||
def unregister_reply_waiter(self, msg_id):
|
||||
"""Unregister reply waiter. Should be called if client has not got
|
||||
|
@ -564,11 +564,8 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage):
|
||||
msg_dict["_reply_q"] = reply_listener.get_reply_qname(
|
||||
expiration_time - time.time()
|
||||
)
|
||||
future = futures.Future()
|
||||
|
||||
reply_listener.register_reply_waiter(
|
||||
msg_id=msg_id, future=future
|
||||
)
|
||||
future = reply_listener.register_reply_waiter(msg_id=msg_id)
|
||||
|
||||
self._do_send(
|
||||
exchange=exchange, routing_key=queue, msg_dict=msg_dict,
|
||||
|
@ -13,21 +13,32 @@
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
import threading
|
||||
import time
|
||||
|
||||
from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
|
||||
from oslo_log import log as logging
|
||||
import pika_pool
|
||||
import retrying
|
||||
|
||||
from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PikaPoller(object):
|
||||
"""Provides user friendly functionality for RabbitMQ message consuming,
|
||||
handles low level connectivity problems and restore connection if some
|
||||
connectivity related problem detected
|
||||
"""
|
||||
|
||||
def __init__(self, pika_engine, prefetch_count):
|
||||
"""Initialize required fields
|
||||
|
||||
:param pika_engine: PikaEngine, shared object with configuration and
|
||||
shared driver functionality
|
||||
:param prefetch_count: Integer, maximum count of unacknowledged
|
||||
messages which RabbitMQ broker sends to this consumer
|
||||
"""
|
||||
self._pika_engine = pika_engine
|
||||
|
||||
self._connection = None
|
||||
@ -43,6 +54,9 @@ class PikaPoller(object):
|
||||
self._message_queue = collections.deque()
|
||||
|
||||
def _reconnect(self):
|
||||
"""Performs reconnection to the broker. It is unsafe method for
|
||||
internal use only
|
||||
"""
|
||||
self._connection = self._pika_engine.create_connection(
|
||||
for_listening=True
|
||||
)
|
||||
@ -50,17 +64,31 @@ class PikaPoller(object):
|
||||
self._channel.basic_qos(prefetch_count=self._prefetch_count)
|
||||
|
||||
if self._queues_to_consume is None:
|
||||
self._declare_queue_binding()
|
||||
self._queues_to_consume = self._declare_queue_binding()
|
||||
|
||||
for queue, no_ack in self._queues_to_consume.iteritems():
|
||||
self._start_consuming(queue, no_ack)
|
||||
|
||||
def _declare_queue_binding(self):
|
||||
"""Is called by recovering connection logic if target RabbitMQ
|
||||
exchange and (or) queue do not exist. Should be overridden in child
|
||||
classes
|
||||
|
||||
:return Dictionary, declared_queue_name -> no_ack_mode
|
||||
"""
|
||||
raise NotImplementedError(
|
||||
"It is base class. Please declare exchanges and queues here"
|
||||
)
|
||||
|
||||
def _start_consuming(self, queue, no_ack):
|
||||
"""Is called by recovering connection logic for starting consumption
|
||||
of the RabbitMQ queue
|
||||
|
||||
:param queue: String, RabbitMQ queue name for consuming
|
||||
:param no_ack: Boolean, Choose consuming acknowledgement mode. If True,
|
||||
acknowledges are not needed. RabbitMQ considers message consumed
|
||||
after sending it to consumer immediately
|
||||
"""
|
||||
on_message_no_ack_callback = (
|
||||
self._on_message_no_ack_callback if no_ack
|
||||
else self._on_message_with_ack_callback
|
||||
@ -74,16 +102,25 @@ class PikaPoller(object):
|
||||
raise
|
||||
|
||||
def _on_message_no_ack_callback(self, unused, method, properties, body):
|
||||
"""Is called by Pika when message was received from queue listened with
|
||||
no_ack=True mode
|
||||
"""
|
||||
self._message_queue.append(
|
||||
(self._channel, method, properties, body, True)
|
||||
)
|
||||
|
||||
def _on_message_with_ack_callback(self, unused, method, properties, body):
|
||||
"""Is called by Pika when message was received from queue listened with
|
||||
no_ack=False mode
|
||||
"""
|
||||
self._message_queue.append(
|
||||
(self._channel, method, properties, body, False)
|
||||
)
|
||||
|
||||
def _cleanup(self):
|
||||
"""Cleanup allocated resources (channel, connection, etc). It is unsafe
|
||||
method for internal use only
|
||||
"""
|
||||
if self._channel:
|
||||
try:
|
||||
self._channel.close()
|
||||
@ -101,6 +138,13 @@ class PikaPoller(object):
|
||||
self._connection = None
|
||||
|
||||
def poll(self, timeout=None):
|
||||
"""Main method of this class - consumes message from RabbitMQ
|
||||
|
||||
:param: timeout: float, seconds, timeout for waiting new incoming
|
||||
message, None means wait forever
|
||||
:return: tuple, RabbitMQ message related data
|
||||
(channel, method, properties, body, no_ack)
|
||||
"""
|
||||
expiration_time = time.time() + timeout if timeout else None
|
||||
|
||||
while not self._message_queue:
|
||||
@ -129,9 +173,16 @@ class PikaPoller(object):
|
||||
return self._message_queue.popleft()
|
||||
|
||||
def start(self):
|
||||
"""Starts poller. Should be called before polling to allow message
|
||||
consuming
|
||||
"""
|
||||
self._started = True
|
||||
|
||||
def stop(self):
|
||||
"""Stops poller. Should be called when polling is not needed anymore to
|
||||
stop new message consuming. After that it is necessary to poll already
|
||||
prefetched messages
|
||||
"""
|
||||
with self._lock:
|
||||
if not self._started:
|
||||
return
|
||||
@ -140,6 +191,7 @@ class PikaPoller(object):
|
||||
self._cleanup()
|
||||
|
||||
def reconnect(self):
|
||||
"""Safe version of _reconnect. Performs reconnection to the broker."""
|
||||
with self._lock:
|
||||
self._cleanup()
|
||||
try:
|
||||
@ -149,18 +201,39 @@ class PikaPoller(object):
|
||||
raise
|
||||
|
||||
def cleanup(self):
|
||||
"""Safe version of _cleanup. Cleans up allocated resources (channel,
|
||||
connection, etc).
|
||||
"""
|
||||
with self._lock:
|
||||
self._cleanup()
|
||||
|
||||
|
||||
class RpcServicePikaPoller(PikaPoller):
|
||||
"""PikaPoller implementation for polling RPC messages. Overrides base
|
||||
functionality according to RPC specific
|
||||
"""
|
||||
def __init__(self, pika_engine, target, prefetch_count):
|
||||
"""Adds target parameter for declaring RPC specific exchanges and
|
||||
queues
|
||||
|
||||
:param pika_engine: PikaEngine, shared object with configuration and
|
||||
shared driver functionality
|
||||
:param target: Target, oslo.messaging Target object which defines RPC
|
||||
endpoint
|
||||
:param prefetch_count: Integer, maximum count of unacknowledged
|
||||
messages which RabbitMQ broker sends to this consumer
|
||||
"""
|
||||
self._target = target
|
||||
|
||||
super(RpcServicePikaPoller, self).__init__(
|
||||
pika_engine, prefetch_count=prefetch_count)
|
||||
|
||||
def _declare_queue_binding(self):
|
||||
"""Overrides base method and perform declaration of RabbitMQ exchanges
|
||||
and queues which correspond to oslo.messaging RPC target
|
||||
|
||||
:return Dictionary, declared_queue_name -> no_ack_mode
|
||||
"""
|
||||
queue_expiration = (
|
||||
self._pika_engine.conf.oslo_messaging_pika.rpc_queue_expiration
|
||||
)
|
||||
@ -199,9 +272,16 @@ class RpcServicePikaPoller(PikaPoller):
|
||||
queue=server_queue, routing_key="", exchange_type='fanout',
|
||||
queue_expiration=queue_expiration
|
||||
)
|
||||
self._queues_to_consume = queues_to_consume
|
||||
return queues_to_consume
|
||||
|
||||
def poll(self, timeout=None):
|
||||
"""Overrides base method and wrap RabbitMQ message into
|
||||
RpcPikaIncomingMessage
|
||||
|
||||
:param: timeout: float, seconds, timeout for waiting new incoming
|
||||
message, None means wait forever
|
||||
:return: RpcPikaIncomingMessage, consumed RPC message
|
||||
"""
|
||||
msg = super(RpcServicePikaPoller, self).poll(timeout)
|
||||
if msg is None:
|
||||
return None
|
||||
@ -211,7 +291,20 @@ class RpcServicePikaPoller(PikaPoller):
|
||||
|
||||
|
||||
class RpcReplyPikaPoller(PikaPoller):
|
||||
"""PikaPoller implementation for polling RPC reply messages. Overrides
|
||||
base functionality according to RPC reply specific
|
||||
"""
|
||||
def __init__(self, pika_engine, exchange, queue, prefetch_count):
|
||||
"""Adds exchange and queue parameter for declaring exchange and queue
|
||||
used for RPC reply delivery
|
||||
|
||||
:param pika_engine: PikaEngine, shared object with configuration and
|
||||
shared driver functionality
|
||||
:param exchange: String, exchange name used for RPC reply delivery
|
||||
:param queue: String, queue name used for RPC reply delivery
|
||||
:param prefetch_count: Integer, maximum count of unacknowledged
|
||||
messages which RabbitMQ broker sends to this consumer
|
||||
"""
|
||||
self._exchange = exchange
|
||||
self._queue = queue
|
||||
|
||||
@ -220,6 +313,11 @@ class RpcReplyPikaPoller(PikaPoller):
|
||||
)
|
||||
|
||||
def _declare_queue_binding(self):
|
||||
"""Overrides base method and perform declaration of RabbitMQ exchange
|
||||
and queue used for RPC reply delivery
|
||||
|
||||
:return Dictionary, declared_queue_name -> no_ack_mode
|
||||
"""
|
||||
queue_expiration = (
|
||||
self._pika_engine.conf.oslo_messaging_pika.rpc_queue_expiration
|
||||
)
|
||||
@ -232,9 +330,15 @@ class RpcReplyPikaPoller(PikaPoller):
|
||||
durable=False
|
||||
)
|
||||
|
||||
self._queues_to_consume = {self._queue: False}
|
||||
return {self._queue: False}
|
||||
|
||||
def start(self, timeout=None):
|
||||
"""Overrides default behaviour of start method. Base start method
|
||||
does not create connection to RabbitMQ during start method (uses
|
||||
lazy connecting during first poll method call). This class should be
|
||||
connected after start call to ensure that exchange and queue for reply
|
||||
delivery are created before RPC request sending
|
||||
"""
|
||||
super(RpcReplyPikaPoller, self).start()
|
||||
|
||||
def on_exception(ex):
|
||||
@ -252,6 +356,13 @@ class RpcReplyPikaPoller(PikaPoller):
|
||||
retrier(self.reconnect)()
|
||||
|
||||
def poll(self, timeout=None):
|
||||
"""Overrides base method and wrap RabbitMQ message into
|
||||
RpcReplyPikaIncomingMessage
|
||||
|
||||
:param: timeout: float, seconds, timeout for waiting new incoming
|
||||
message, None means wait forever
|
||||
:return: RpcReplyPikaIncomingMessage, consumed RPC reply message
|
||||
"""
|
||||
msg = super(RpcReplyPikaPoller, self).poll(timeout)
|
||||
if msg is None:
|
||||
return None
|
||||
@ -261,8 +372,23 @@ class RpcReplyPikaPoller(PikaPoller):
|
||||
|
||||
|
||||
class NotificationPikaPoller(PikaPoller):
|
||||
"""PikaPoller implementation for polling Notification messages. Overrides
|
||||
base functionality according to Notification specific
|
||||
"""
|
||||
def __init__(self, pika_engine, targets_and_priorities,
|
||||
queue_name=None, prefetch_count=100):
|
||||
"""Adds exchange and queue parameter for declaring exchange and queue
|
||||
used for RPC reply delivery
|
||||
|
||||
:param pika_engine: PikaEngine, shared object with configuration and
|
||||
shared driver functionality
|
||||
:param targets_and_priorities: list of (target, priority), defines
|
||||
default queue names for corresponding notification types
|
||||
:param queue: String, alternative queue name used for this poller
|
||||
instead of default queue name
|
||||
:param prefetch_count: Integer, maximum count of unacknowledged
|
||||
messages which RabbitMQ broker sends to this consumer
|
||||
"""
|
||||
self._targets_and_priorities = targets_and_priorities
|
||||
self._queue_name = queue_name
|
||||
|
||||
@ -271,6 +397,11 @@ class NotificationPikaPoller(PikaPoller):
|
||||
)
|
||||
|
||||
def _declare_queue_binding(self):
|
||||
"""Overrides base method and perform declaration of RabbitMQ exchanges
|
||||
and queues used for notification delivery
|
||||
|
||||
:return Dictionary, declared_queue_name -> no_ack_mode
|
||||
"""
|
||||
queues_to_consume = {}
|
||||
for target, priority in self._targets_and_priorities:
|
||||
routing_key = '%s.%s' % (target.topic, priority)
|
||||
@ -289,9 +420,16 @@ class NotificationPikaPoller(PikaPoller):
|
||||
)
|
||||
queues_to_consume[queue] = False
|
||||
|
||||
self._queues_to_consume = queues_to_consume
|
||||
return queues_to_consume
|
||||
|
||||
def poll(self, timeout=None):
|
||||
"""Overrides base method and wrap RabbitMQ message into
|
||||
PikaIncomingMessage
|
||||
|
||||
:param: timeout: float, seconds, timeout for waiting new incoming
|
||||
message, None means wait forever
|
||||
:return: PikaIncomingMessage, consumed Notification message
|
||||
"""
|
||||
msg = super(NotificationPikaPoller, self).poll(timeout)
|
||||
if msg is None:
|
||||
return None
|
||||
|
@ -35,7 +35,7 @@ oslo.messaging.drivers =
|
||||
|
||||
# This is just for internal testing
|
||||
fake = oslo_messaging._drivers.impl_fake:FakeDriver
|
||||
pika = oslo_messaging._drivers.impl_pika:PikaDriverCompatibleWithRabbitDriver
|
||||
pika = oslo_messaging._drivers.impl_pika:PikaDriver
|
||||
|
||||
oslo.messaging.executors =
|
||||
aioeventlet = oslo_messaging._executors.impl_aioeventlet:AsyncioEventletExecutor
|
||||
|
Loading…
x
Reference in New Issue
Block a user