Adds tests for pika_message.py
Also small mistakes were fixed, msg_id, unique_id and reply_q fields were moved to corresponding AMQP properties Change-Id: I5147c35c1a2ce0205e08ca81db164a3cc879fb0a
This commit is contained in:
parent
3976a2ff81
commit
5149461fd2
@ -198,8 +198,8 @@ class PikaDriver(object):
|
||||
"Timeout for current operation was expired."
|
||||
)
|
||||
try:
|
||||
with self._pika_engine.connection_pool.acquire(
|
||||
timeout=timeout) as conn:
|
||||
with (self._pika_engine.connection_without_confirmation_pool
|
||||
.acquire)(timeout=timeout) as conn:
|
||||
self._pika_engine.declare_queue_binding_by_channel(
|
||||
conn.channel,
|
||||
exchange=(
|
||||
|
@ -12,6 +12,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import random
|
||||
import socket
|
||||
import sys
|
||||
import threading
|
||||
@ -44,7 +45,7 @@ def _is_eventlet_monkey_patched(module):
|
||||
return eventlet.patcher.is_monkey_patched(module)
|
||||
|
||||
|
||||
def _create__select_poller_connection_impl(
|
||||
def _create_select_poller_connection_impl(
|
||||
parameters, on_open_callback, on_open_error_callback,
|
||||
on_close_callback, stop_ioloop_on_close):
|
||||
"""Used for disabling autochoise of poller ('select', 'poll', 'epool', etc)
|
||||
@ -198,7 +199,6 @@ class PikaEngine(object):
|
||||
|
||||
self._connection_host_param_list = []
|
||||
self._connection_host_status_list = []
|
||||
self._next_connection_host_num = 0
|
||||
|
||||
for transport_host in url.hosts:
|
||||
pika_params = common_pika_params.copy()
|
||||
@ -215,9 +215,13 @@ class PikaEngine(object):
|
||||
self.HOST_CONNECTION_LAST_SUCCESS_TRY_TIME: 0
|
||||
})
|
||||
|
||||
self._next_connection_host_num = random.randint(
|
||||
0, len(self._connection_host_param_list) - 1
|
||||
)
|
||||
|
||||
# initializing 2 connection pools: 1st for connections without
|
||||
# confirmations, 2nd - with confirmations
|
||||
self.connection_pool = pika_pool.QueuedPool(
|
||||
self.connection_without_confirmation_pool = pika_pool.QueuedPool(
|
||||
create=self.create_connection,
|
||||
max_size=self.conf.oslo_messaging_pika.pool_max_size,
|
||||
max_overflow=self.conf.oslo_messaging_pika.pool_max_overflow,
|
||||
@ -336,7 +340,7 @@ class PikaEngine(object):
|
||||
),
|
||||
**base_host_params
|
||||
),
|
||||
_impl_class=(_create__select_poller_connection_impl
|
||||
_impl_class=(_create_select_poller_connection_impl
|
||||
if self._force_select_poller_use else None)
|
||||
)
|
||||
|
||||
|
@ -95,40 +95,36 @@ class PikaIncomingMessage(object):
|
||||
self._pika_engine = pika_engine
|
||||
self._no_ack = no_ack
|
||||
self._channel = channel
|
||||
self.delivery_tag = method.delivery_tag
|
||||
self._delivery_tag = method.delivery_tag
|
||||
|
||||
self.version = version
|
||||
self._version = version
|
||||
|
||||
self.content_type = getattr(properties, "content_type",
|
||||
"application/json")
|
||||
self.content_encoding = getattr(properties, "content_encoding",
|
||||
"utf-8")
|
||||
self._content_type = properties.content_type
|
||||
self._content_encoding = properties.content_encoding
|
||||
self.unique_id = properties.message_id
|
||||
|
||||
self.expiration_time = (
|
||||
None if properties.expiration is None else
|
||||
time.time() + float(properties.expiration) / 1000
|
||||
)
|
||||
|
||||
if self.content_type != "application/json":
|
||||
if self._content_type != "application/json":
|
||||
raise NotImplementedError(
|
||||
"Content-type['{}'] is not valid, "
|
||||
"'application/json' only is supported.".format(
|
||||
self.content_type
|
||||
self._content_type
|
||||
)
|
||||
)
|
||||
|
||||
message_dict = jsonutils.loads(body, encoding=self.content_encoding)
|
||||
message_dict = jsonutils.loads(body, encoding=self._content_encoding)
|
||||
|
||||
context_dict = {}
|
||||
|
||||
for key in list(message_dict.keys()):
|
||||
key = six.text_type(key)
|
||||
if key.startswith('_context_'):
|
||||
if key.startswith('_$_'):
|
||||
value = message_dict.pop(key)
|
||||
context_dict[key[9:]] = value
|
||||
elif key.startswith('_'):
|
||||
value = message_dict.pop(key)
|
||||
setattr(self, key[1:], value)
|
||||
context_dict[key[3:]] = value
|
||||
self.message = message_dict
|
||||
self.ctxt = context_dict
|
||||
|
||||
@ -138,7 +134,7 @@ class PikaIncomingMessage(object):
|
||||
message anymore)
|
||||
"""
|
||||
if not self._no_ack:
|
||||
self._channel.basic_ack(delivery_tag=self.delivery_tag)
|
||||
self._channel.basic_ack(delivery_tag=self._delivery_tag)
|
||||
|
||||
def requeue(self):
|
||||
"""Rollback the message. Should be called by message processing logic
|
||||
@ -146,7 +142,7 @@ class PikaIncomingMessage(object):
|
||||
later if it is possible
|
||||
"""
|
||||
if not self._no_ack:
|
||||
return self._channel.basic_nack(delivery_tag=self.delivery_tag,
|
||||
return self._channel.basic_nack(delivery_tag=self._delivery_tag,
|
||||
requeue=True)
|
||||
|
||||
|
||||
@ -170,58 +166,30 @@ class RpcPikaIncomingMessage(PikaIncomingMessage):
|
||||
:param no_ack: Boolean, defines should this message be acked by
|
||||
consumer or not
|
||||
"""
|
||||
self.msg_id = None
|
||||
self.reply_q = None
|
||||
|
||||
super(RpcPikaIncomingMessage, self).__init__(
|
||||
pika_engine, channel, method, properties, body, no_ack
|
||||
)
|
||||
self.reply_q = properties.reply_to
|
||||
self.msg_id = properties.correlation_id
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
"""Send back reply to the RPC client
|
||||
:param reply - Dictionary, reply. In case of exception should be None
|
||||
:param failure - Exception, exception, raised during processing RPC
|
||||
request. Should be None if RPC request was successfully processed
|
||||
:param log_failure, Boolean, not used in this implementation.
|
||||
:param reply: Dictionary, reply. In case of exception should be None
|
||||
:param failure: Tuple, should be a sys.exc_info() tuple.
|
||||
Should be None if RPC request was successfully processed.
|
||||
:param log_failure: Boolean, not used in this implementation.
|
||||
It present here to be compatible with driver API
|
||||
|
||||
:return RpcReplyPikaIncomingMessage, message with reply
|
||||
"""
|
||||
if not (self.msg_id and self.reply_q):
|
||||
|
||||
if self.reply_q is None:
|
||||
return
|
||||
|
||||
msg = {
|
||||
'_msg_id': self.msg_id,
|
||||
}
|
||||
|
||||
if failure is not None:
|
||||
if isinstance(failure, RemoteExceptionMixin):
|
||||
failure_data = {
|
||||
'class': failure.clazz,
|
||||
'module': failure.module,
|
||||
'message': failure.message,
|
||||
'tb': failure.trace
|
||||
}
|
||||
else:
|
||||
tb = traceback.format_exception(*failure)
|
||||
failure = failure[1]
|
||||
|
||||
cls_name = six.text_type(failure.__class__.__name__)
|
||||
mod_name = six.text_type(failure.__class__.__module__)
|
||||
|
||||
failure_data = {
|
||||
'class': cls_name,
|
||||
'module': mod_name,
|
||||
'message': six.text_type(failure),
|
||||
'tb': tb
|
||||
}
|
||||
|
||||
msg['_failure'] = failure_data
|
||||
|
||||
if reply is not None:
|
||||
msg['_result'] = reply
|
||||
|
||||
reply_outgoing_message = PikaOutgoingMessage(
|
||||
self._pika_engine, msg, self.ctxt, content_type=self.content_type,
|
||||
content_encoding=self.content_encoding
|
||||
reply_outgoing_message = RpcReplyPikaOutgoingMessage(
|
||||
self._pika_engine, self.msg_id, reply=reply, failure_info=failure,
|
||||
content_type=self._content_type,
|
||||
content_encoding=self._content_encoding
|
||||
)
|
||||
|
||||
def on_exception(ex):
|
||||
@ -242,11 +210,7 @@ class RpcPikaIncomingMessage(PikaIncomingMessage):
|
||||
|
||||
try:
|
||||
reply_outgoing_message.send(
|
||||
exchange=self._pika_engine.rpc_reply_exchange,
|
||||
routing_key=self.reply_q,
|
||||
confirm=True,
|
||||
mandatory=False,
|
||||
persistent=False,
|
||||
reply_q=self.reply_q,
|
||||
expiration_time=self.expiration_time,
|
||||
retrier=retrier
|
||||
)
|
||||
@ -282,18 +246,20 @@ class RpcReplyPikaIncomingMessage(PikaIncomingMessage):
|
||||
:param no_ack: Boolean, defines should this message be acked by
|
||||
consumer or not
|
||||
"""
|
||||
self.result = None
|
||||
self.failure = None
|
||||
|
||||
super(RpcReplyPikaIncomingMessage, self).__init__(
|
||||
pika_engine, channel, method, properties, body, no_ack
|
||||
)
|
||||
|
||||
self.msg_id = properties.correlation_id
|
||||
|
||||
self.result = self.message.get("s", None)
|
||||
self.failure = self.message.get("e", None)
|
||||
|
||||
if self.failure is not None:
|
||||
trace = self.failure.get('tb', [])
|
||||
message = self.failure.get('message', "")
|
||||
class_name = self.failure.get('class')
|
||||
module_name = self.failure.get('module')
|
||||
trace = self.failure.get('t', [])
|
||||
message = self.failure.get('s', "")
|
||||
class_name = self.failure.get('c')
|
||||
module_name = self.failure.get('m')
|
||||
|
||||
res_exc = None
|
||||
|
||||
@ -343,14 +309,14 @@ class PikaOutgoingMessage(object):
|
||||
|
||||
self._pika_engine = pika_engine
|
||||
|
||||
self.content_type = content_type
|
||||
self.content_encoding = content_encoding
|
||||
self._content_type = content_type
|
||||
self._content_encoding = content_encoding
|
||||
|
||||
if self.content_type != "application/json":
|
||||
if self._content_type != "application/json":
|
||||
raise NotImplementedError(
|
||||
"Content-type['{}'] is not valid, "
|
||||
"'application/json' only is supported.".format(
|
||||
self.content_type
|
||||
self._content_type
|
||||
)
|
||||
)
|
||||
|
||||
@ -362,23 +328,21 @@ class PikaOutgoingMessage(object):
|
||||
def _prepare_message_to_send(self):
|
||||
"""Combine user's message fields an system fields (_unique_id,
|
||||
context's data etc)
|
||||
|
||||
:param pika_engine: PikaEngine, shared object with configuration and
|
||||
shared driver functionality
|
||||
:param message: Dictionary, user's message fields
|
||||
:param context: Dictionary, request context's fields
|
||||
:param content_type: String, content-type header, defines serialization
|
||||
mechanism
|
||||
:param content_encoding: String, defines encoding for text data
|
||||
"""
|
||||
msg = self.message.copy()
|
||||
|
||||
msg['_unique_id'] = self.unique_id
|
||||
if self.context:
|
||||
for key, value in six.iteritems(self.context):
|
||||
key = six.text_type(key)
|
||||
msg['_$_' + key] = value
|
||||
|
||||
for key, value in self.context.iteritems():
|
||||
key = six.text_type(key)
|
||||
msg['_context_' + key] = value
|
||||
return msg
|
||||
props = pika_spec.BasicProperties(
|
||||
content_encoding=self._content_encoding,
|
||||
content_type=self._content_type,
|
||||
headers={_VERSION_HEADER: _VERSION},
|
||||
message_id=self.unique_id,
|
||||
)
|
||||
return msg, props
|
||||
|
||||
@staticmethod
|
||||
def _publish(pool, exchange, routing_key, body, properties, mandatory,
|
||||
@ -456,14 +420,15 @@ class PikaOutgoingMessage(object):
|
||||
"Socket timeout exceeded."
|
||||
)
|
||||
|
||||
def _do_send(self, exchange, routing_key, msg_dict, confirm=True,
|
||||
mandatory=True, persistent=False, expiration_time=None,
|
||||
retrier=None):
|
||||
def _do_send(self, exchange, routing_key, msg_dict, msg_props,
|
||||
confirm=True, mandatory=True, persistent=False,
|
||||
expiration_time=None, retrier=None):
|
||||
"""Send prepared message with configured retrying
|
||||
|
||||
:param exchange: String, RabbitMQ exchange name for message sending
|
||||
:param routing_key: String, RabbitMQ routing key for message routing
|
||||
:param msg_dict: Dictionary, message payload
|
||||
:param msg_props: Properties, message properties
|
||||
:param confirm: Boolean, enable publisher confirmation if True
|
||||
:param mandatory: Boolean, RabbitMQ publish mandatory flag (raise
|
||||
exception if it is not possible to deliver message to any queue)
|
||||
@ -474,29 +439,26 @@ class PikaOutgoingMessage(object):
|
||||
:param retrier: retrying.Retrier, configured retrier object for sending
|
||||
message, if None no retrying is performed
|
||||
"""
|
||||
properties = pika_spec.BasicProperties(
|
||||
content_encoding=self.content_encoding,
|
||||
content_type=self.content_type,
|
||||
headers={_VERSION_HEADER: _VERSION},
|
||||
delivery_mode=2 if persistent else 1
|
||||
)
|
||||
msg_props.delivery_mode = 2 if persistent else 1
|
||||
|
||||
pool = (self._pika_engine.connection_with_confirmation_pool
|
||||
if confirm else self._pika_engine.connection_pool)
|
||||
if confirm else
|
||||
self._pika_engine.connection_without_confirmation_pool)
|
||||
|
||||
body = jsonutils.dumps(msg_dict, encoding=self.content_encoding)
|
||||
body = jsonutils.dump_as_bytes(msg_dict,
|
||||
encoding=self._content_encoding)
|
||||
|
||||
LOG.debug(
|
||||
"Sending message:[body:{}; properties: {}] to target: "
|
||||
"[exchange:{}; routing_key:{}]".format(
|
||||
body, properties, exchange, routing_key
|
||||
body, msg_props, exchange, routing_key
|
||||
)
|
||||
)
|
||||
|
||||
publish = (self._publish if retrier is None else
|
||||
retrier(self._publish))
|
||||
|
||||
return publish(pool, exchange, routing_key, body, properties,
|
||||
return publish(pool, exchange, routing_key, body, msg_props,
|
||||
mandatory, expiration_time)
|
||||
|
||||
def send(self, exchange, routing_key='', confirm=True, mandatory=True,
|
||||
@ -515,10 +477,11 @@ class PikaOutgoingMessage(object):
|
||||
:param retrier: retrying.Retrier, configured retrier object for sending
|
||||
message, if None no retrying is performed
|
||||
"""
|
||||
msg_dict = self._prepare_message_to_send()
|
||||
msg_dict, msg_props = self._prepare_message_to_send()
|
||||
|
||||
return self._do_send(exchange, routing_key, msg_dict, confirm,
|
||||
mandatory, persistent, expiration_time, retrier)
|
||||
return self._do_send(exchange, routing_key, msg_dict, msg_props,
|
||||
confirm, mandatory, persistent, expiration_time,
|
||||
retrier)
|
||||
|
||||
|
||||
class RpcPikaOutgoingMessage(PikaOutgoingMessage):
|
||||
@ -554,23 +517,25 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage):
|
||||
target.topic, target.server, retrier is None
|
||||
)
|
||||
|
||||
msg_dict = self._prepare_message_to_send()
|
||||
msg_dict, msg_props = self._prepare_message_to_send()
|
||||
|
||||
if reply_listener:
|
||||
msg_id = uuid.uuid4().hex
|
||||
msg_dict["_msg_id"] = msg_id
|
||||
LOG.debug('MSG_ID is %s', msg_id)
|
||||
self.msg_id = uuid.uuid4().hex
|
||||
msg_props.correlation_id = self.msg_id
|
||||
LOG.debug('MSG_ID is %s', self.msg_id)
|
||||
|
||||
msg_dict["_reply_q"] = reply_listener.get_reply_qname(
|
||||
self.reply_q = reply_listener.get_reply_qname(
|
||||
expiration_time - time.time()
|
||||
)
|
||||
msg_props.reply_to = self.reply_q
|
||||
|
||||
future = reply_listener.register_reply_waiter(msg_id=msg_id)
|
||||
future = reply_listener.register_reply_waiter(msg_id=self.msg_id)
|
||||
|
||||
self._do_send(
|
||||
exchange=exchange, routing_key=queue, msg_dict=msg_dict,
|
||||
confirm=True, mandatory=True, persistent=False,
|
||||
expiration_time=expiration_time, retrier=retrier
|
||||
msg_props=msg_props, confirm=True, mandatory=True,
|
||||
persistent=False, expiration_time=expiration_time,
|
||||
retrier=retrier
|
||||
)
|
||||
|
||||
try:
|
||||
@ -580,10 +545,78 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage):
|
||||
if isinstance(e, futures.TimeoutError):
|
||||
e = exceptions.MessagingTimeout()
|
||||
raise e
|
||||
|
||||
else:
|
||||
self._do_send(
|
||||
exchange=exchange, routing_key=queue, msg_dict=msg_dict,
|
||||
confirm=True, mandatory=True, persistent=False,
|
||||
expiration_time=expiration_time, retrier=retrier
|
||||
msg_props=msg_props, confirm=True, mandatory=True,
|
||||
persistent=False, expiration_time=expiration_time,
|
||||
retrier=retrier
|
||||
)
|
||||
|
||||
|
||||
class RpcReplyPikaOutgoingMessage(PikaOutgoingMessage):
|
||||
"""PikaOutgoingMessage implementation for RPC reply messages. It sets
|
||||
correlation_id AMQP property to link this reply with response
|
||||
"""
|
||||
def __init__(self, pika_engine, msg_id, reply=None, failure_info=None,
|
||||
content_type="application/json", content_encoding="utf-8"):
|
||||
"""Initialize with reply information for sending
|
||||
|
||||
:param pika_engine: PikaEngine, shared object with configuration and
|
||||
shared driver functionality
|
||||
:param msg_id: String, msg_id of RPC request, which waits for reply
|
||||
:param reply: Dictionary, reply. In case of exception should be None
|
||||
:param failure_info: Tuple, should be a sys.exc_info() tuple.
|
||||
Should be None if RPC request was successfully processed.
|
||||
:param content_type: String, content-type header, defines serialization
|
||||
mechanism
|
||||
:param content_encoding: String, defines encoding for text data
|
||||
"""
|
||||
self.msg_id = msg_id
|
||||
|
||||
if failure_info is not None:
|
||||
ex_class = failure_info[0]
|
||||
ex = failure_info[1]
|
||||
tb = traceback.format_exception(*failure_info)
|
||||
if issubclass(ex_class, RemoteExceptionMixin):
|
||||
failure_data = {
|
||||
'c': ex.clazz,
|
||||
'm': ex.module,
|
||||
's': ex.message,
|
||||
't': tb
|
||||
}
|
||||
else:
|
||||
failure_data = {
|
||||
'c': six.text_type(ex_class.__name__),
|
||||
'm': six.text_type(ex_class.__module__),
|
||||
's': six.text_type(ex),
|
||||
't': tb
|
||||
}
|
||||
|
||||
msg = {'e': failure_data}
|
||||
else:
|
||||
msg = {'s': reply}
|
||||
|
||||
super(RpcReplyPikaOutgoingMessage, self).__init__(
|
||||
pika_engine, msg, None, content_type, content_encoding
|
||||
)
|
||||
|
||||
def send(self, reply_q, expiration_time=None, retrier=None):
|
||||
"""Send RPC message with configured retrying
|
||||
|
||||
:param reply_q: String, queue name for sending reply
|
||||
:param expiration_time: Float, expiration time in seconds
|
||||
(like time.time())
|
||||
:param retrier: retrying.Retrier, configured retrier object for sending
|
||||
message, if None no retrying is performed
|
||||
"""
|
||||
|
||||
msg_dict, msg_props = self._prepare_message_to_send()
|
||||
msg_props.correlation_id = self.msg_id
|
||||
|
||||
self._do_send(
|
||||
exchange=self._pika_engine.rpc_reply_exchange, routing_key=reply_q,
|
||||
msg_dict=msg_dict, msg_props=msg_props, confirm=True,
|
||||
mandatory=True, persistent=False, expiration_time=expiration_time,
|
||||
retrier=retrier
|
||||
)
|
||||
|
@ -18,6 +18,7 @@ import time
|
||||
from oslo_log import log as logging
|
||||
import pika_pool
|
||||
import retrying
|
||||
import six
|
||||
|
||||
from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
|
||||
|
||||
@ -68,7 +69,7 @@ class PikaPoller(object):
|
||||
if self._queues_to_consume is None:
|
||||
self._queues_to_consume = self._declare_queue_binding()
|
||||
|
||||
for queue, no_ack in self._queues_to_consume.iteritems():
|
||||
for queue, no_ack in six.iteritems(self._queues_to_consume):
|
||||
self._start_consuming(queue, no_ack)
|
||||
|
||||
def _declare_queue_binding(self):
|
||||
|
0
oslo_messaging/tests/drivers/pika/__init__.py
Normal file
0
oslo_messaging/tests/drivers/pika/__init__.py
Normal file
622
oslo_messaging/tests/drivers/pika/test_message.py
Normal file
622
oslo_messaging/tests/drivers/pika/test_message.py
Normal file
@ -0,0 +1,622 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import functools
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from concurrent import futures
|
||||
from mock import mock, patch
|
||||
from oslo_serialization import jsonutils
|
||||
import pika
|
||||
from pika import spec
|
||||
|
||||
import oslo_messaging
|
||||
from oslo_messaging._drivers.pika_driver import pika_engine
|
||||
from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
|
||||
|
||||
|
||||
class PikaIncomingMessageTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self._pika_engine = mock.Mock()
|
||||
self._channel = mock.Mock()
|
||||
|
||||
self._delivery_tag = 12345
|
||||
|
||||
self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag)
|
||||
self._properties = pika.BasicProperties(
|
||||
content_type="application/json",
|
||||
headers={"version": "1.0"},
|
||||
)
|
||||
self._body = (
|
||||
b'{"_$_key_context":"context_value",'
|
||||
b'"payload_key": "payload_value"}'
|
||||
)
|
||||
|
||||
def test_message_body_parsing(self):
|
||||
message = pika_drv_msg.PikaIncomingMessage(
|
||||
self._pika_engine, self._channel, self._method, self._properties,
|
||||
self._body, True
|
||||
)
|
||||
|
||||
self.assertEqual(message.ctxt.get("key_context", None),
|
||||
"context_value")
|
||||
self.assertEqual(message.message.get("payload_key", None),
|
||||
"payload_value")
|
||||
|
||||
def test_message_acknowledge(self):
|
||||
message = pika_drv_msg.PikaIncomingMessage(
|
||||
self._pika_engine, self._channel, self._method, self._properties,
|
||||
self._body, False
|
||||
)
|
||||
|
||||
message.acknowledge()
|
||||
|
||||
self.assertEqual(1, self._channel.basic_ack.call_count)
|
||||
self.assertEqual({"delivery_tag": self._delivery_tag},
|
||||
self._channel.basic_ack.call_args[1])
|
||||
|
||||
def test_message_acknowledge_no_ack(self):
|
||||
message = pika_drv_msg.PikaIncomingMessage(
|
||||
self._pika_engine, self._channel, self._method, self._properties,
|
||||
self._body, True
|
||||
)
|
||||
|
||||
message.acknowledge()
|
||||
|
||||
self.assertEqual(0, self._channel.basic_ack.call_count)
|
||||
|
||||
def test_message_requeue(self):
|
||||
message = pika_drv_msg.PikaIncomingMessage(
|
||||
self._pika_engine, self._channel, self._method, self._properties,
|
||||
self._body, False
|
||||
)
|
||||
|
||||
message.requeue()
|
||||
|
||||
self.assertEqual(1, self._channel.basic_nack.call_count)
|
||||
self.assertEqual({"delivery_tag": self._delivery_tag, 'requeue': True},
|
||||
self._channel.basic_nack.call_args[1])
|
||||
|
||||
def test_message_requeue_no_ack(self):
|
||||
message = pika_drv_msg.PikaIncomingMessage(
|
||||
self._pika_engine, self._channel, self._method, self._properties,
|
||||
self._body, True
|
||||
)
|
||||
|
||||
message.requeue()
|
||||
|
||||
self.assertEqual(0, self._channel.basic_nack.call_count)
|
||||
|
||||
|
||||
class RpcPikaIncomingMessageTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self._pika_engine = mock.Mock()
|
||||
self._pika_engine.rpc_reply_retry_attempts = 3
|
||||
self._pika_engine.rpc_reply_retry_delay = 0.25
|
||||
|
||||
self._channel = mock.Mock()
|
||||
|
||||
self._delivery_tag = 12345
|
||||
|
||||
self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag)
|
||||
self._body = (
|
||||
b'{"_$_key_context":"context_value",'
|
||||
b'"payload_key":"payload_value"}'
|
||||
)
|
||||
self._properties = pika.BasicProperties(
|
||||
content_type="application/json",
|
||||
content_encoding="utf-8",
|
||||
headers={"version": "1.0"},
|
||||
)
|
||||
|
||||
def test_call_message_body_parsing(self):
|
||||
self._properties.correlation_id = 123456789
|
||||
self._properties.reply_to = "reply_queue"
|
||||
|
||||
message = pika_drv_msg.RpcPikaIncomingMessage(
|
||||
self._pika_engine, self._channel, self._method, self._properties,
|
||||
self._body, True
|
||||
)
|
||||
|
||||
self.assertEqual(message.ctxt.get("key_context", None),
|
||||
"context_value")
|
||||
self.assertEqual(message.msg_id, 123456789)
|
||||
self.assertEqual(message.reply_q, "reply_queue")
|
||||
|
||||
self.assertEqual(message.message.get("payload_key", None),
|
||||
"payload_value")
|
||||
|
||||
def test_cast_message_body_parsing(self):
|
||||
message = pika_drv_msg.RpcPikaIncomingMessage(
|
||||
self._pika_engine, self._channel, self._method, self._properties,
|
||||
self._body, True
|
||||
)
|
||||
|
||||
self.assertEqual(message.ctxt.get("key_context", None),
|
||||
"context_value")
|
||||
self.assertEqual(message.msg_id, None)
|
||||
self.assertEqual(message.reply_q, None)
|
||||
|
||||
self.assertEqual(message.message.get("payload_key", None),
|
||||
"payload_value")
|
||||
|
||||
@patch(("oslo_messaging._drivers.pika_driver.pika_message."
|
||||
"PikaOutgoingMessage.send"))
|
||||
def test_reply_for_cast_message(self, send_reply_mock):
|
||||
message = pika_drv_msg.RpcPikaIncomingMessage(
|
||||
self._pika_engine, self._channel, self._method, self._properties,
|
||||
self._body, True
|
||||
)
|
||||
|
||||
self.assertEqual(message.ctxt.get("key_context", None),
|
||||
"context_value")
|
||||
self.assertEqual(message.msg_id, None)
|
||||
self.assertEqual(message.reply_q, None)
|
||||
|
||||
self.assertEqual(message.message.get("payload_key", None),
|
||||
"payload_value")
|
||||
|
||||
message.reply(reply=object())
|
||||
|
||||
self.assertEqual(send_reply_mock.call_count, 0)
|
||||
|
||||
@patch("oslo_messaging._drivers.pika_driver.pika_message."
|
||||
"RpcReplyPikaOutgoingMessage")
|
||||
@patch("retrying.retry")
|
||||
def test_positive_reply_for_call_message(self,
|
||||
retry_mock,
|
||||
outgoing_message_mock):
|
||||
self._properties.correlation_id = 123456789
|
||||
self._properties.reply_to = "reply_queue"
|
||||
|
||||
message = pika_drv_msg.RpcPikaIncomingMessage(
|
||||
self._pika_engine, self._channel, self._method, self._properties,
|
||||
self._body, True
|
||||
)
|
||||
|
||||
self.assertEqual(message.ctxt.get("key_context", None),
|
||||
"context_value")
|
||||
self.assertEqual(message.msg_id, 123456789)
|
||||
self.assertEqual(message.reply_q, "reply_queue")
|
||||
|
||||
self.assertEqual(message.message.get("payload_key", None),
|
||||
"payload_value")
|
||||
reply = "all_fine"
|
||||
message.reply(reply=reply)
|
||||
|
||||
outgoing_message_mock.assert_called_once_with(
|
||||
self._pika_engine, 123456789, failure_info=None, reply='all_fine',
|
||||
content_encoding='utf-8', content_type='application/json'
|
||||
)
|
||||
outgoing_message_mock().send.assert_called_once_with(
|
||||
expiration_time=None, reply_q='reply_queue', retrier=mock.ANY
|
||||
)
|
||||
retry_mock.assert_called_once_with(
|
||||
retry_on_exception=mock.ANY, stop_max_attempt_number=3,
|
||||
wait_fixed=250.0
|
||||
)
|
||||
|
||||
@patch("oslo_messaging._drivers.pika_driver.pika_message."
|
||||
"RpcReplyPikaOutgoingMessage")
|
||||
@patch("retrying.retry")
|
||||
def test_negative_reply_for_call_message(self,
|
||||
retry_mock,
|
||||
outgoing_message_mock):
|
||||
self._properties.correlation_id = 123456789
|
||||
self._properties.reply_to = "reply_queue"
|
||||
|
||||
message = pika_drv_msg.RpcPikaIncomingMessage(
|
||||
self._pika_engine, self._channel, self._method, self._properties,
|
||||
self._body, True
|
||||
)
|
||||
|
||||
self.assertEqual(message.ctxt.get("key_context", None),
|
||||
"context_value")
|
||||
self.assertEqual(message.msg_id, 123456789)
|
||||
self.assertEqual(message.reply_q, "reply_queue")
|
||||
|
||||
self.assertEqual(message.message.get("payload_key", None),
|
||||
"payload_value")
|
||||
|
||||
failure_info = object()
|
||||
message.reply(failure=failure_info)
|
||||
|
||||
outgoing_message_mock.assert_called_once_with(
|
||||
self._pika_engine, 123456789,
|
||||
failure_info=failure_info,
|
||||
reply=None,
|
||||
content_encoding='utf-8',
|
||||
content_type='application/json'
|
||||
)
|
||||
outgoing_message_mock().send.assert_called_once_with(
|
||||
expiration_time=None, reply_q='reply_queue', retrier=mock.ANY
|
||||
)
|
||||
retry_mock.assert_called_once_with(
|
||||
retry_on_exception=mock.ANY, stop_max_attempt_number=3,
|
||||
wait_fixed=250.0
|
||||
)
|
||||
|
||||
|
||||
class RpcReplyPikaIncomingMessageTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self._pika_engine = mock.Mock()
|
||||
self._pika_engine.allowed_remote_exmods = [
|
||||
pika_engine._EXCEPTIONS_MODULE, "oslo_messaging.exceptions"
|
||||
]
|
||||
|
||||
self._channel = mock.Mock()
|
||||
|
||||
self._delivery_tag = 12345
|
||||
|
||||
self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag)
|
||||
|
||||
self._properties = pika.BasicProperties(
|
||||
content_type="application/json",
|
||||
content_encoding="utf-8",
|
||||
headers={"version": "1.0"},
|
||||
correlation_id=123456789
|
||||
)
|
||||
|
||||
def test_positive_reply_message_body_parsing(self):
|
||||
|
||||
body = b'{"s": "all fine"}'
|
||||
|
||||
message = pika_drv_msg.RpcReplyPikaIncomingMessage(
|
||||
self._pika_engine, self._channel, self._method, self._properties,
|
||||
body, True
|
||||
)
|
||||
|
||||
self.assertEqual(message.msg_id, 123456789)
|
||||
self.assertIsNone(message.failure)
|
||||
self.assertEquals(message.result, "all fine")
|
||||
|
||||
def test_negative_reply_message_body_parsing(self):
|
||||
|
||||
body = (b'{'
|
||||
b' "e": {'
|
||||
b' "s": "Error message",'
|
||||
b' "t": ["TRACE HERE"],'
|
||||
b' "c": "MessagingException",'
|
||||
b' "m": "oslo_messaging.exceptions"'
|
||||
b' }'
|
||||
b'}')
|
||||
|
||||
message = pika_drv_msg.RpcReplyPikaIncomingMessage(
|
||||
self._pika_engine, self._channel, self._method, self._properties,
|
||||
body, True
|
||||
)
|
||||
|
||||
self.assertEqual(message.msg_id, 123456789)
|
||||
self.assertIsNone(message.result)
|
||||
self.assertEquals(
|
||||
str(message.failure),
|
||||
'Error message\n'
|
||||
'TRACE HERE'
|
||||
)
|
||||
self.assertIsInstance(message.failure,
|
||||
oslo_messaging.MessagingException)
|
||||
|
||||
|
||||
class PikaOutgoingMessageTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self._pika_engine = mock.MagicMock()
|
||||
self._exchange = "it is exchange"
|
||||
self._routing_key = "it is routing key"
|
||||
self._expiration = 1
|
||||
self._expiration_time = time.time() + self._expiration
|
||||
self._mandatory = object()
|
||||
|
||||
self._message = {"msg_type": 1, "msg_str": "hello"}
|
||||
self._context = {"request_id": 555, "token": "it is a token"}
|
||||
|
||||
@patch("oslo_serialization.jsonutils.dumps",
|
||||
new=functools.partial(jsonutils.dumps, sort_keys=True))
|
||||
def test_send_with_confirmation(self):
|
||||
message = pika_drv_msg.PikaOutgoingMessage(
|
||||
self._pika_engine, self._message, self._context
|
||||
)
|
||||
|
||||
message.send(
|
||||
exchange=self._exchange,
|
||||
routing_key=self._routing_key,
|
||||
confirm=True,
|
||||
mandatory=self._mandatory,
|
||||
persistent=True,
|
||||
expiration_time=self._expiration_time,
|
||||
retrier=None
|
||||
)
|
||||
|
||||
self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.assert_called_once_with(
|
||||
body=mock.ANY,
|
||||
exchange=self._exchange, mandatory=self._mandatory,
|
||||
properties=mock.ANY,
|
||||
routing_key=self._routing_key
|
||||
)
|
||||
|
||||
body = self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.call_args[1]["body"]
|
||||
|
||||
self.assertEqual(
|
||||
b'{"_$_request_id": 555, "_$_token": "it is a token", '
|
||||
b'"msg_str": "hello", "msg_type": 1}',
|
||||
body
|
||||
)
|
||||
|
||||
props = self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.call_args[1]["properties"]
|
||||
|
||||
self.assertEqual(props.content_encoding, 'utf-8')
|
||||
self.assertEqual(props.content_type, 'application/json')
|
||||
self.assertEqual(props.delivery_mode, 2)
|
||||
self.assertTrue(self._expiration * 1000 - float(props.expiration) <
|
||||
100)
|
||||
self.assertEqual(props.headers, {'version': '1.0'})
|
||||
self.assertTrue(props.message_id)
|
||||
|
||||
@patch("oslo_serialization.jsonutils.dumps",
|
||||
new=functools.partial(jsonutils.dumps, sort_keys=True))
|
||||
def test_send_without_confirmation(self):
|
||||
message = pika_drv_msg.PikaOutgoingMessage(
|
||||
self._pika_engine, self._message, self._context
|
||||
)
|
||||
|
||||
message.send(
|
||||
exchange=self._exchange,
|
||||
routing_key=self._routing_key,
|
||||
confirm=False,
|
||||
mandatory=self._mandatory,
|
||||
persistent=False,
|
||||
expiration_time=self._expiration_time,
|
||||
retrier=None
|
||||
)
|
||||
|
||||
self._pika_engine.connection_without_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.assert_called_once_with(
|
||||
body=mock.ANY,
|
||||
exchange=self._exchange, mandatory=self._mandatory,
|
||||
properties=mock.ANY,
|
||||
routing_key=self._routing_key
|
||||
)
|
||||
|
||||
body = self._pika_engine.connection_without_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.call_args[1]["body"]
|
||||
|
||||
self.assertEqual(
|
||||
b'{"_$_request_id": 555, "_$_token": "it is a token", '
|
||||
b'"msg_str": "hello", "msg_type": 1}',
|
||||
body
|
||||
)
|
||||
|
||||
props = self._pika_engine.connection_without_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.call_args[1]["properties"]
|
||||
|
||||
self.assertEqual(props.content_encoding, 'utf-8')
|
||||
self.assertEqual(props.content_type, 'application/json')
|
||||
self.assertEqual(props.delivery_mode, 1)
|
||||
self.assertTrue(self._expiration * 1000 - float(props.expiration)
|
||||
< 100)
|
||||
self.assertEqual(props.headers, {'version': '1.0'})
|
||||
self.assertTrue(props.message_id)
|
||||
|
||||
|
||||
class RpcPikaOutgoingMessageTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self._exchange = "it is exchange"
|
||||
self._routing_key = "it is routing key"
|
||||
|
||||
self._pika_engine = mock.MagicMock()
|
||||
self._pika_engine.get_rpc_exchange_name.return_value = self._exchange
|
||||
self._pika_engine.get_rpc_queue_name.return_value = self._routing_key
|
||||
|
||||
self._message = {"msg_type": 1, "msg_str": "hello"}
|
||||
self._context = {"request_id": 555, "token": "it is a token"}
|
||||
|
||||
@patch("oslo_serialization.jsonutils.dumps",
|
||||
new=functools.partial(jsonutils.dumps, sort_keys=True))
|
||||
def test_send_cast_message(self):
|
||||
message = pika_drv_msg.RpcPikaOutgoingMessage(
|
||||
self._pika_engine, self._message, self._context
|
||||
)
|
||||
|
||||
expiration = 1
|
||||
expiration_time = time.time() + expiration
|
||||
|
||||
message.send(
|
||||
target=oslo_messaging.Target(exchange=self._exchange,
|
||||
topic=self._routing_key),
|
||||
reply_listener=None,
|
||||
expiration_time=expiration_time,
|
||||
retrier=None
|
||||
)
|
||||
|
||||
self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.assert_called_once_with(
|
||||
body=mock.ANY,
|
||||
exchange=self._exchange, mandatory=True,
|
||||
properties=mock.ANY,
|
||||
routing_key=self._routing_key
|
||||
)
|
||||
|
||||
body = self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.call_args[1]["body"]
|
||||
|
||||
self.assertEqual(
|
||||
b'{"_$_request_id": 555, "_$_token": "it is a token", '
|
||||
b'"msg_str": "hello", "msg_type": 1}',
|
||||
body
|
||||
)
|
||||
|
||||
props = self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.call_args[1]["properties"]
|
||||
|
||||
self.assertEqual(props.content_encoding, 'utf-8')
|
||||
self.assertEqual(props.content_type, 'application/json')
|
||||
self.assertEqual(props.delivery_mode, 1)
|
||||
self.assertTrue(expiration * 1000 - float(props.expiration) < 100)
|
||||
self.assertEqual(props.headers, {'version': '1.0'})
|
||||
self.assertIsNone(props.correlation_id)
|
||||
self.assertIsNone(props.reply_to)
|
||||
self.assertTrue(props.message_id)
|
||||
|
||||
@patch("oslo_serialization.jsonutils.dumps",
|
||||
new=functools.partial(jsonutils.dumps, sort_keys=True))
|
||||
def test_send_call_message(self):
|
||||
message = pika_drv_msg.RpcPikaOutgoingMessage(
|
||||
self._pika_engine, self._message, self._context
|
||||
)
|
||||
|
||||
expiration = 1
|
||||
expiration_time = time.time() + expiration
|
||||
|
||||
result = "it is a result"
|
||||
reply_queue_name = "reply_queue_name"
|
||||
|
||||
future = futures.Future()
|
||||
future.set_result(result)
|
||||
reply_listener = mock.Mock()
|
||||
reply_listener.register_reply_waiter.return_value = future
|
||||
reply_listener.get_reply_qname.return_value = reply_queue_name
|
||||
|
||||
res = message.send(
|
||||
target=oslo_messaging.Target(exchange=self._exchange,
|
||||
topic=self._routing_key),
|
||||
reply_listener=reply_listener,
|
||||
expiration_time=expiration_time,
|
||||
retrier=None
|
||||
)
|
||||
|
||||
self.assertEqual(result, res)
|
||||
|
||||
self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.assert_called_once_with(
|
||||
body=mock.ANY,
|
||||
exchange=self._exchange, mandatory=True,
|
||||
properties=mock.ANY,
|
||||
routing_key=self._routing_key
|
||||
)
|
||||
|
||||
body = self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.call_args[1]["body"]
|
||||
|
||||
self.assertEqual(
|
||||
b'{"_$_request_id": 555, "_$_token": "it is a token", '
|
||||
b'"msg_str": "hello", "msg_type": 1}',
|
||||
body
|
||||
)
|
||||
|
||||
props = self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.call_args[1]["properties"]
|
||||
|
||||
self.assertEqual(props.content_encoding, 'utf-8')
|
||||
self.assertEqual(props.content_type, 'application/json')
|
||||
self.assertEqual(props.delivery_mode, 1)
|
||||
self.assertTrue(expiration * 1000 - float(props.expiration) < 100)
|
||||
self.assertEqual(props.headers, {'version': '1.0'})
|
||||
self.assertEqual(props.correlation_id, message.msg_id)
|
||||
self.assertEquals(props.reply_to, reply_queue_name)
|
||||
self.assertTrue(props.message_id)
|
||||
|
||||
|
||||
class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self._reply_q = "reply_queue_name"
|
||||
|
||||
self._expiration = 1
|
||||
self._expiration_time = time.time() + self._expiration
|
||||
|
||||
self._pika_engine = mock.MagicMock()
|
||||
|
||||
self._rpc_reply_exchange = "rpc_reply_exchange"
|
||||
self._pika_engine.rpc_reply_exchange = self._rpc_reply_exchange
|
||||
|
||||
self._msg_id = 12345567
|
||||
|
||||
@patch("oslo_serialization.jsonutils.dumps",
|
||||
new=functools.partial(jsonutils.dumps, sort_keys=True))
|
||||
def test_success_message_send(self):
|
||||
message = pika_drv_msg.RpcReplyPikaOutgoingMessage(
|
||||
self._pika_engine, self._msg_id, reply="all_fine"
|
||||
)
|
||||
|
||||
message.send(self._reply_q, expiration_time=self._expiration_time,
|
||||
retrier=None)
|
||||
|
||||
self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.assert_called_once_with(
|
||||
body=b'{"s": "all_fine"}',
|
||||
exchange=self._rpc_reply_exchange, mandatory=True,
|
||||
properties=mock.ANY,
|
||||
routing_key=self._reply_q
|
||||
)
|
||||
|
||||
props = self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.call_args[1]["properties"]
|
||||
|
||||
self.assertEqual(props.content_encoding, 'utf-8')
|
||||
self.assertEqual(props.content_type, 'application/json')
|
||||
self.assertEqual(props.delivery_mode, 1)
|
||||
self.assertTrue(self._expiration * 1000 - float(props.expiration) <
|
||||
100)
|
||||
self.assertEqual(props.headers, {'version': '1.0'})
|
||||
self.assertEqual(props.correlation_id, message.msg_id)
|
||||
self.assertIsNone(props.reply_to)
|
||||
self.assertTrue(props.message_id)
|
||||
|
||||
@patch("traceback.format_exception", new=lambda x,y,z:z)
|
||||
@patch("oslo_serialization.jsonutils.dumps",
|
||||
new=functools.partial(jsonutils.dumps, sort_keys=True))
|
||||
def test_failure_message_send(self):
|
||||
failure_info = (oslo_messaging.MessagingException,
|
||||
oslo_messaging.MessagingException("Error message"),
|
||||
['It is a trace'])
|
||||
|
||||
|
||||
message = pika_drv_msg.RpcReplyPikaOutgoingMessage(
|
||||
self._pika_engine, self._msg_id, failure_info=failure_info
|
||||
)
|
||||
|
||||
message.send(self._reply_q, expiration_time=self._expiration_time,
|
||||
retrier=None)
|
||||
|
||||
self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.assert_called_once_with(
|
||||
body=mock.ANY,
|
||||
exchange=self._rpc_reply_exchange,
|
||||
mandatory=True,
|
||||
properties=mock.ANY,
|
||||
routing_key=self._reply_q
|
||||
)
|
||||
|
||||
body = self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.call_args[1]["body"]
|
||||
self.assertEqual(
|
||||
b'{"e": {"c": "MessagingException", '
|
||||
b'"m": "oslo_messaging.exceptions", "s": "Error message", '
|
||||
b'"t": ["It is a trace"]}}',
|
||||
body
|
||||
)
|
||||
|
||||
props = self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.call_args[1]["properties"]
|
||||
|
||||
self.assertEqual(props.content_encoding, 'utf-8')
|
||||
self.assertEqual(props.content_type, 'application/json')
|
||||
self.assertEqual(props.delivery_mode, 1)
|
||||
self.assertTrue(self._expiration * 1000 - float(props.expiration) <
|
||||
100)
|
||||
self.assertEqual(props.headers, {'version': '1.0'})
|
||||
self.assertEqual(props.correlation_id, message.msg_id)
|
||||
self.assertIsNone(props.reply_to)
|
||||
self.assertTrue(props.message_id)
|
Loading…
x
Reference in New Issue
Block a user