diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py
index eaa4a7685..3d633a5b1 100644
--- a/oslo_messaging/_drivers/impl_pika.py
+++ b/oslo_messaging/_drivers/impl_pika.py
@@ -198,8 +198,8 @@ class PikaDriver(object):
                 "Timeout for current operation was expired."
             )
         try:
-            with self._pika_engine.connection_pool.acquire(
-                    timeout=timeout) as conn:
+            with (self._pika_engine.connection_without_confirmation_pool
+                    .acquire)(timeout=timeout) as conn:
                 self._pika_engine.declare_queue_binding_by_channel(
                     conn.channel,
                     exchange=(
diff --git a/oslo_messaging/_drivers/pika_driver/pika_engine.py b/oslo_messaging/_drivers/pika_driver/pika_engine.py
index 06dfcdbf1..6e877bb2e 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_engine.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_engine.py
@@ -12,6 +12,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import random
 import socket
 import sys
 import threading
@@ -44,7 +45,7 @@ def _is_eventlet_monkey_patched(module):
     return eventlet.patcher.is_monkey_patched(module)
 
 
-def _create__select_poller_connection_impl(
+def _create_select_poller_connection_impl(
         parameters, on_open_callback, on_open_error_callback,
         on_close_callback, stop_ioloop_on_close):
     """Used for disabling autochoise of poller ('select', 'poll', 'epool', etc)
@@ -198,7 +199,6 @@ class PikaEngine(object):
 
         self._connection_host_param_list = []
         self._connection_host_status_list = []
-        self._next_connection_host_num = 0
 
         for transport_host in url.hosts:
             pika_params = common_pika_params.copy()
@@ -215,9 +215,13 @@ class PikaEngine(object):
                 self.HOST_CONNECTION_LAST_SUCCESS_TRY_TIME: 0
             })
 
+        self._next_connection_host_num = random.randint(
+            0, len(self._connection_host_param_list) - 1
+        )
+
         # initializing 2 connection pools: 1st for connections without
         # confirmations, 2nd - with confirmations
-        self.connection_pool = pika_pool.QueuedPool(
+        self.connection_without_confirmation_pool = pika_pool.QueuedPool(
             create=self.create_connection,
             max_size=self.conf.oslo_messaging_pika.pool_max_size,
             max_overflow=self.conf.oslo_messaging_pika.pool_max_overflow,
@@ -336,7 +340,7 @@ class PikaEngine(object):
                         ),
                         **base_host_params
                     ),
-                    _impl_class=(_create__select_poller_connection_impl
+                    _impl_class=(_create_select_poller_connection_impl
                                  if self._force_select_poller_use else None)
                 )
 
diff --git a/oslo_messaging/_drivers/pika_driver/pika_message.py b/oslo_messaging/_drivers/pika_driver/pika_message.py
index 9bf9febdb..edd5c7328 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_message.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_message.py
@@ -95,40 +95,36 @@ class PikaIncomingMessage(object):
         self._pika_engine = pika_engine
         self._no_ack = no_ack
         self._channel = channel
-        self.delivery_tag = method.delivery_tag
+        self._delivery_tag = method.delivery_tag
 
-        self.version = version
+        self._version = version
 
-        self.content_type = getattr(properties, "content_type",
-                                    "application/json")
-        self.content_encoding = getattr(properties, "content_encoding",
-                                        "utf-8")
+        self._content_type = properties.content_type
+        self._content_encoding = properties.content_encoding
+        self.unique_id = properties.message_id
 
         self.expiration_time = (
             None if properties.expiration is None else
             time.time() + float(properties.expiration) / 1000
         )
 
-        if self.content_type != "application/json":
+        if self._content_type != "application/json":
             raise NotImplementedError(
                 "Content-type['{}'] is not valid, "
                 "'application/json' only is supported.".format(
-                    self.content_type
+                    self._content_type
                 )
             )
 
-        message_dict = jsonutils.loads(body, encoding=self.content_encoding)
+        message_dict = jsonutils.loads(body, encoding=self._content_encoding)
 
         context_dict = {}
 
         for key in list(message_dict.keys()):
             key = six.text_type(key)
-            if key.startswith('_context_'):
+            if key.startswith('_$_'):
                 value = message_dict.pop(key)
-                context_dict[key[9:]] = value
-            elif key.startswith('_'):
-                value = message_dict.pop(key)
-                setattr(self, key[1:], value)
+                context_dict[key[3:]] = value
         self.message = message_dict
         self.ctxt = context_dict
 
@@ -138,7 +134,7 @@ class PikaIncomingMessage(object):
         message anymore)
         """
         if not self._no_ack:
-            self._channel.basic_ack(delivery_tag=self.delivery_tag)
+            self._channel.basic_ack(delivery_tag=self._delivery_tag)
 
     def requeue(self):
         """Rollback the message. Should be called by message processing logic
@@ -146,7 +142,7 @@ class PikaIncomingMessage(object):
         later if it is possible
         """
         if not self._no_ack:
-            return self._channel.basic_nack(delivery_tag=self.delivery_tag,
+            return self._channel.basic_nack(delivery_tag=self._delivery_tag,
                                             requeue=True)
 
 
@@ -170,58 +166,30 @@ class RpcPikaIncomingMessage(PikaIncomingMessage):
         :param no_ack: Boolean, defines should this message be acked by
             consumer or not
         """
-        self.msg_id = None
-        self.reply_q = None
-
         super(RpcPikaIncomingMessage, self).__init__(
             pika_engine, channel, method, properties, body, no_ack
         )
+        self.reply_q = properties.reply_to
+        self.msg_id = properties.correlation_id
 
     def reply(self, reply=None, failure=None, log_failure=True):
         """Send back reply to the RPC client
-        :param reply - Dictionary, reply. In case of exception should be None
-        :param failure - Exception, exception, raised during processing RPC
-            request. Should be None if RPC request was successfully processed
-        :param log_failure, Boolean, not used in this implementation.
+        :param reply: Dictionary, reply. In case of exception should be None
+        :param failure: Tuple, should be a sys.exc_info() tuple.
+            Should be None if RPC request was successfully processed.
+        :param log_failure: Boolean, not used in this implementation.
             It present here to be compatible with driver API
+
+        :return RpcReplyPikaIncomingMessage, message with reply
         """
-        if not (self.msg_id and self.reply_q):
+
+        if self.reply_q is None:
             return
 
-        msg = {
-            '_msg_id': self.msg_id,
-        }
-
-        if failure is not None:
-            if isinstance(failure, RemoteExceptionMixin):
-                failure_data = {
-                    'class': failure.clazz,
-                    'module': failure.module,
-                    'message': failure.message,
-                    'tb': failure.trace
-                }
-            else:
-                tb = traceback.format_exception(*failure)
-                failure = failure[1]
-
-                cls_name = six.text_type(failure.__class__.__name__)
-                mod_name = six.text_type(failure.__class__.__module__)
-
-                failure_data = {
-                    'class': cls_name,
-                    'module': mod_name,
-                    'message': six.text_type(failure),
-                    'tb': tb
-                }
-
-            msg['_failure'] = failure_data
-
-        if reply is not None:
-            msg['_result'] = reply
-
-        reply_outgoing_message = PikaOutgoingMessage(
-            self._pika_engine, msg, self.ctxt, content_type=self.content_type,
-            content_encoding=self.content_encoding
+        reply_outgoing_message = RpcReplyPikaOutgoingMessage(
+            self._pika_engine, self.msg_id, reply=reply, failure_info=failure,
+            content_type=self._content_type,
+            content_encoding=self._content_encoding
         )
 
         def on_exception(ex):
@@ -242,11 +210,7 @@ class RpcPikaIncomingMessage(PikaIncomingMessage):
 
         try:
             reply_outgoing_message.send(
-                exchange=self._pika_engine.rpc_reply_exchange,
-                routing_key=self.reply_q,
-                confirm=True,
-                mandatory=False,
-                persistent=False,
+                reply_q=self.reply_q,
                 expiration_time=self.expiration_time,
                 retrier=retrier
             )
@@ -282,18 +246,20 @@ class RpcReplyPikaIncomingMessage(PikaIncomingMessage):
         :param no_ack: Boolean, defines should this message be acked by
             consumer or not
         """
-        self.result = None
-        self.failure = None
-
         super(RpcReplyPikaIncomingMessage, self).__init__(
             pika_engine, channel, method, properties, body, no_ack
         )
 
+        self.msg_id = properties.correlation_id
+
+        self.result = self.message.get("s", None)
+        self.failure = self.message.get("e", None)
+
         if self.failure is not None:
-            trace = self.failure.get('tb', [])
-            message = self.failure.get('message', "")
-            class_name = self.failure.get('class')
-            module_name = self.failure.get('module')
+            trace = self.failure.get('t', [])
+            message = self.failure.get('s', "")
+            class_name = self.failure.get('c')
+            module_name = self.failure.get('m')
 
             res_exc = None
 
@@ -343,14 +309,14 @@ class PikaOutgoingMessage(object):
 
         self._pika_engine = pika_engine
 
-        self.content_type = content_type
-        self.content_encoding = content_encoding
+        self._content_type = content_type
+        self._content_encoding = content_encoding
 
-        if self.content_type != "application/json":
+        if self._content_type != "application/json":
             raise NotImplementedError(
                 "Content-type['{}'] is not valid, "
                 "'application/json' only is supported.".format(
-                    self.content_type
+                    self._content_type
                 )
             )
 
@@ -362,23 +328,21 @@ class PikaOutgoingMessage(object):
     def _prepare_message_to_send(self):
         """Combine user's message fields an system fields (_unique_id,
         context's data etc)
-
-        :param pika_engine: PikaEngine, shared object with configuration and
-            shared driver functionality
-        :param message: Dictionary, user's message fields
-        :param context: Dictionary, request context's fields
-        :param content_type: String, content-type header, defines serialization
-            mechanism
-        :param content_encoding: String, defines encoding for text data
         """
         msg = self.message.copy()
 
-        msg['_unique_id'] = self.unique_id
+        if self.context:
+            for key, value in six.iteritems(self.context):
+                key = six.text_type(key)
+                msg['_$_' + key] = value
 
-        for key, value in self.context.iteritems():
-            key = six.text_type(key)
-            msg['_context_' + key] = value
-        return msg
+        props = pika_spec.BasicProperties(
+            content_encoding=self._content_encoding,
+            content_type=self._content_type,
+            headers={_VERSION_HEADER: _VERSION},
+            message_id=self.unique_id,
+        )
+        return msg, props
 
     @staticmethod
     def _publish(pool, exchange, routing_key, body, properties, mandatory,
@@ -456,14 +420,15 @@ class PikaOutgoingMessage(object):
                 "Socket timeout exceeded."
             )
 
-    def _do_send(self, exchange, routing_key, msg_dict, confirm=True,
-                 mandatory=True, persistent=False, expiration_time=None,
-                 retrier=None):
+    def _do_send(self, exchange, routing_key, msg_dict, msg_props,
+                 confirm=True, mandatory=True, persistent=False,
+                 expiration_time=None, retrier=None):
         """Send prepared message with configured retrying
 
         :param exchange: String, RabbitMQ exchange name for message sending
         :param routing_key: String, RabbitMQ routing key for message routing
         :param msg_dict: Dictionary, message payload
+        :param msg_props: Properties, message properties
         :param confirm: Boolean, enable publisher confirmation if True
         :param mandatory: Boolean, RabbitMQ publish mandatory flag (raise
             exception if it is not possible to deliver message to any queue)
@@ -474,29 +439,26 @@ class PikaOutgoingMessage(object):
         :param retrier: retrying.Retrier, configured retrier object for sending
             message, if None no retrying is performed
         """
-        properties = pika_spec.BasicProperties(
-            content_encoding=self.content_encoding,
-            content_type=self.content_type,
-            headers={_VERSION_HEADER: _VERSION},
-            delivery_mode=2 if persistent else 1
-        )
+        msg_props.delivery_mode = 2 if persistent else 1
 
         pool = (self._pika_engine.connection_with_confirmation_pool
-                if confirm else self._pika_engine.connection_pool)
+                if confirm else
+                self._pika_engine.connection_without_confirmation_pool)
 
-        body = jsonutils.dumps(msg_dict, encoding=self.content_encoding)
+        body = jsonutils.dump_as_bytes(msg_dict,
+                                       encoding=self._content_encoding)
 
         LOG.debug(
             "Sending message:[body:{}; properties: {}] to target: "
             "[exchange:{}; routing_key:{}]".format(
-                body, properties, exchange, routing_key
+                body, msg_props, exchange, routing_key
             )
         )
 
         publish = (self._publish if retrier is None else
                    retrier(self._publish))
 
-        return publish(pool, exchange, routing_key, body, properties,
+        return publish(pool, exchange, routing_key, body, msg_props,
                        mandatory, expiration_time)
 
     def send(self, exchange, routing_key='', confirm=True, mandatory=True,
@@ -515,10 +477,11 @@ class PikaOutgoingMessage(object):
         :param retrier: retrying.Retrier, configured retrier object for sending
             message, if None no retrying is performed
         """
-        msg_dict = self._prepare_message_to_send()
+        msg_dict, msg_props = self._prepare_message_to_send()
 
-        return self._do_send(exchange, routing_key, msg_dict, confirm,
-                             mandatory, persistent, expiration_time, retrier)
+        return self._do_send(exchange, routing_key, msg_dict, msg_props,
+                             confirm, mandatory, persistent, expiration_time,
+                             retrier)
 
 
 class RpcPikaOutgoingMessage(PikaOutgoingMessage):
@@ -554,23 +517,25 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage):
             target.topic, target.server, retrier is None
         )
 
-        msg_dict = self._prepare_message_to_send()
+        msg_dict, msg_props = self._prepare_message_to_send()
 
         if reply_listener:
-            msg_id = uuid.uuid4().hex
-            msg_dict["_msg_id"] = msg_id
-            LOG.debug('MSG_ID is %s', msg_id)
+            self.msg_id = uuid.uuid4().hex
+            msg_props.correlation_id = self.msg_id
+            LOG.debug('MSG_ID is %s', self.msg_id)
 
-            msg_dict["_reply_q"] = reply_listener.get_reply_qname(
+            self.reply_q = reply_listener.get_reply_qname(
                 expiration_time - time.time()
             )
+            msg_props.reply_to = self.reply_q
 
-            future = reply_listener.register_reply_waiter(msg_id=msg_id)
+            future = reply_listener.register_reply_waiter(msg_id=self.msg_id)
 
             self._do_send(
                 exchange=exchange, routing_key=queue, msg_dict=msg_dict,
-                confirm=True, mandatory=True, persistent=False,
-                expiration_time=expiration_time, retrier=retrier
+                msg_props=msg_props, confirm=True, mandatory=True,
+                persistent=False, expiration_time=expiration_time,
+                retrier=retrier
             )
 
             try:
@@ -580,10 +545,78 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage):
                 if isinstance(e, futures.TimeoutError):
                     e = exceptions.MessagingTimeout()
                 raise e
-
         else:
             self._do_send(
                 exchange=exchange, routing_key=queue, msg_dict=msg_dict,
-                confirm=True, mandatory=True, persistent=False,
-                expiration_time=expiration_time, retrier=retrier
+                msg_props=msg_props, confirm=True, mandatory=True,
+                persistent=False, expiration_time=expiration_time,
+                retrier=retrier
             )
+
+
+class RpcReplyPikaOutgoingMessage(PikaOutgoingMessage):
+    """PikaOutgoingMessage implementation for RPC reply messages. It sets
+    correlation_id AMQP property to link this reply with response
+    """
+    def __init__(self, pika_engine, msg_id, reply=None, failure_info=None,
+                 content_type="application/json", content_encoding="utf-8"):
+        """Initialize with reply information for sending
+
+        :param pika_engine: PikaEngine, shared object with configuration and
+            shared driver functionality
+        :param msg_id: String, msg_id of RPC request, which waits for reply
+        :param reply: Dictionary, reply. In case of exception should be None
+        :param failure_info: Tuple, should be a sys.exc_info() tuple.
+            Should be None if RPC request was successfully processed.
+        :param content_type: String, content-type header, defines serialization
+            mechanism
+        :param content_encoding: String, defines encoding for text data
+        """
+        self.msg_id = msg_id
+
+        if failure_info is not None:
+            ex_class = failure_info[0]
+            ex = failure_info[1]
+            tb = traceback.format_exception(*failure_info)
+            if issubclass(ex_class, RemoteExceptionMixin):
+                failure_data = {
+                    'c': ex.clazz,
+                    'm': ex.module,
+                    's': ex.message,
+                    't': tb
+                }
+            else:
+                failure_data = {
+                    'c': six.text_type(ex_class.__name__),
+                    'm': six.text_type(ex_class.__module__),
+                    's': six.text_type(ex),
+                    't': tb
+                }
+
+            msg = {'e': failure_data}
+        else:
+            msg = {'s': reply}
+
+        super(RpcReplyPikaOutgoingMessage, self).__init__(
+            pika_engine, msg, None, content_type, content_encoding
+        )
+
+    def send(self, reply_q, expiration_time=None, retrier=None):
+        """Send RPC message with configured retrying
+
+        :param reply_q: String, queue name for sending reply
+        :param expiration_time: Float, expiration time in seconds
+            (like time.time())
+        :param retrier: retrying.Retrier, configured retrier object for sending
+            message, if None no retrying is performed
+        """
+
+        msg_dict, msg_props = self._prepare_message_to_send()
+        msg_props.correlation_id = self.msg_id
+
+        self._do_send(
+            exchange=self._pika_engine.rpc_reply_exchange, routing_key=reply_q,
+            msg_dict=msg_dict, msg_props=msg_props, confirm=True,
+            mandatory=True, persistent=False, expiration_time=expiration_time,
+            retrier=retrier
+        )
diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py
index 1390ced75..5aa948a2e 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_poller.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_poller.py
@@ -18,6 +18,7 @@ import time
 from oslo_log import log as logging
 import pika_pool
 import retrying
+import six
 
 from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
 
@@ -68,7 +69,7 @@ class PikaPoller(object):
         if self._queues_to_consume is None:
             self._queues_to_consume = self._declare_queue_binding()
 
-        for queue, no_ack in self._queues_to_consume.iteritems():
+        for queue, no_ack in six.iteritems(self._queues_to_consume):
             self._start_consuming(queue, no_ack)
 
     def _declare_queue_binding(self):
diff --git a/oslo_messaging/tests/drivers/pika/__init__.py b/oslo_messaging/tests/drivers/pika/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/oslo_messaging/tests/drivers/pika/test_message.py b/oslo_messaging/tests/drivers/pika/test_message.py
new file mode 100644
index 000000000..5008ce36e
--- /dev/null
+++ b/oslo_messaging/tests/drivers/pika/test_message.py
@@ -0,0 +1,622 @@
+#    Copyright 2015 Mirantis, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+import functools
+import time
+import unittest
+
+from concurrent import futures
+from mock import mock, patch
+from oslo_serialization import jsonutils
+import pika
+from pika import spec
+
+import oslo_messaging
+from oslo_messaging._drivers.pika_driver import pika_engine
+from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
+
+
+class PikaIncomingMessageTestCase(unittest.TestCase):
+    def setUp(self):
+        self._pika_engine = mock.Mock()
+        self._channel = mock.Mock()
+
+        self._delivery_tag = 12345
+
+        self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag)
+        self._properties = pika.BasicProperties(
+            content_type="application/json",
+            headers={"version": "1.0"},
+        )
+        self._body = (
+            b'{"_$_key_context":"context_value",'
+            b'"payload_key": "payload_value"}'
+        )
+
+    def test_message_body_parsing(self):
+        message = pika_drv_msg.PikaIncomingMessage(
+            self._pika_engine, self._channel, self._method, self._properties,
+            self._body, True
+        )
+
+        self.assertEqual(message.ctxt.get("key_context", None),
+                         "context_value")
+        self.assertEqual(message.message.get("payload_key", None),
+                         "payload_value")
+
+    def test_message_acknowledge(self):
+        message = pika_drv_msg.PikaIncomingMessage(
+            self._pika_engine, self._channel, self._method, self._properties,
+            self._body, False
+        )
+
+        message.acknowledge()
+
+        self.assertEqual(1,  self._channel.basic_ack.call_count)
+        self.assertEqual({"delivery_tag": self._delivery_tag},
+                         self._channel.basic_ack.call_args[1])
+
+    def test_message_acknowledge_no_ack(self):
+        message = pika_drv_msg.PikaIncomingMessage(
+            self._pika_engine, self._channel, self._method, self._properties,
+            self._body, True
+        )
+
+        message.acknowledge()
+
+        self.assertEqual(0,  self._channel.basic_ack.call_count)
+
+    def test_message_requeue(self):
+        message = pika_drv_msg.PikaIncomingMessage(
+            self._pika_engine, self._channel, self._method, self._properties,
+            self._body, False
+        )
+
+        message.requeue()
+
+        self.assertEqual(1, self._channel.basic_nack.call_count)
+        self.assertEqual({"delivery_tag": self._delivery_tag, 'requeue': True},
+                         self._channel.basic_nack.call_args[1])
+
+    def test_message_requeue_no_ack(self):
+        message = pika_drv_msg.PikaIncomingMessage(
+            self._pika_engine, self._channel, self._method, self._properties,
+            self._body, True
+        )
+
+        message.requeue()
+
+        self.assertEqual(0, self._channel.basic_nack.call_count)
+
+
+class RpcPikaIncomingMessageTestCase(unittest.TestCase):
+    def setUp(self):
+        self._pika_engine = mock.Mock()
+        self._pika_engine.rpc_reply_retry_attempts = 3
+        self._pika_engine.rpc_reply_retry_delay = 0.25
+
+        self._channel = mock.Mock()
+
+        self._delivery_tag = 12345
+
+        self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag)
+        self._body = (
+            b'{"_$_key_context":"context_value",'
+            b'"payload_key":"payload_value"}'
+        )
+        self._properties = pika.BasicProperties(
+            content_type="application/json",
+            content_encoding="utf-8",
+            headers={"version": "1.0"},
+        )
+
+    def test_call_message_body_parsing(self):
+        self._properties.correlation_id = 123456789
+        self._properties.reply_to = "reply_queue"
+
+        message = pika_drv_msg.RpcPikaIncomingMessage(
+            self._pika_engine, self._channel, self._method, self._properties,
+            self._body, True
+        )
+
+        self.assertEqual(message.ctxt.get("key_context", None),
+                         "context_value")
+        self.assertEqual(message.msg_id, 123456789)
+        self.assertEqual(message.reply_q, "reply_queue")
+
+        self.assertEqual(message.message.get("payload_key", None),
+                         "payload_value")
+
+    def test_cast_message_body_parsing(self):
+        message = pika_drv_msg.RpcPikaIncomingMessage(
+            self._pika_engine, self._channel, self._method, self._properties,
+            self._body, True
+        )
+
+        self.assertEqual(message.ctxt.get("key_context", None),
+                         "context_value")
+        self.assertEqual(message.msg_id, None)
+        self.assertEqual(message.reply_q, None)
+
+        self.assertEqual(message.message.get("payload_key", None),
+                         "payload_value")
+
+    @patch(("oslo_messaging._drivers.pika_driver.pika_message."
+            "PikaOutgoingMessage.send"))
+    def test_reply_for_cast_message(self, send_reply_mock):
+        message = pika_drv_msg.RpcPikaIncomingMessage(
+            self._pika_engine, self._channel, self._method, self._properties,
+            self._body, True
+        )
+
+        self.assertEqual(message.ctxt.get("key_context", None),
+                         "context_value")
+        self.assertEqual(message.msg_id, None)
+        self.assertEqual(message.reply_q, None)
+
+        self.assertEqual(message.message.get("payload_key", None),
+                         "payload_value")
+
+        message.reply(reply=object())
+
+        self.assertEqual(send_reply_mock.call_count, 0)
+
+    @patch("oslo_messaging._drivers.pika_driver.pika_message."
+           "RpcReplyPikaOutgoingMessage")
+    @patch("retrying.retry")
+    def test_positive_reply_for_call_message(self,
+                                             retry_mock,
+                                             outgoing_message_mock):
+        self._properties.correlation_id = 123456789
+        self._properties.reply_to = "reply_queue"
+
+        message = pika_drv_msg.RpcPikaIncomingMessage(
+            self._pika_engine, self._channel, self._method, self._properties,
+            self._body, True
+        )
+
+        self.assertEqual(message.ctxt.get("key_context", None),
+                         "context_value")
+        self.assertEqual(message.msg_id, 123456789)
+        self.assertEqual(message.reply_q, "reply_queue")
+
+        self.assertEqual(message.message.get("payload_key", None),
+                         "payload_value")
+        reply = "all_fine"
+        message.reply(reply=reply)
+
+        outgoing_message_mock.assert_called_once_with(
+            self._pika_engine, 123456789, failure_info=None, reply='all_fine',
+            content_encoding='utf-8', content_type='application/json'
+        )
+        outgoing_message_mock().send.assert_called_once_with(
+            expiration_time=None, reply_q='reply_queue', retrier=mock.ANY
+        )
+        retry_mock.assert_called_once_with(
+            retry_on_exception=mock.ANY, stop_max_attempt_number=3,
+            wait_fixed=250.0
+        )
+
+    @patch("oslo_messaging._drivers.pika_driver.pika_message."
+           "RpcReplyPikaOutgoingMessage")
+    @patch("retrying.retry")
+    def test_negative_reply_for_call_message(self,
+                                             retry_mock,
+                                             outgoing_message_mock):
+        self._properties.correlation_id = 123456789
+        self._properties.reply_to = "reply_queue"
+
+        message = pika_drv_msg.RpcPikaIncomingMessage(
+            self._pika_engine, self._channel, self._method, self._properties,
+            self._body, True
+        )
+
+        self.assertEqual(message.ctxt.get("key_context", None),
+                         "context_value")
+        self.assertEqual(message.msg_id, 123456789)
+        self.assertEqual(message.reply_q, "reply_queue")
+
+        self.assertEqual(message.message.get("payload_key", None),
+                         "payload_value")
+
+        failure_info = object()
+        message.reply(failure=failure_info)
+
+        outgoing_message_mock.assert_called_once_with(
+            self._pika_engine, 123456789,
+            failure_info=failure_info,
+            reply=None,
+            content_encoding='utf-8',
+            content_type='application/json'
+        )
+        outgoing_message_mock().send.assert_called_once_with(
+            expiration_time=None, reply_q='reply_queue', retrier=mock.ANY
+        )
+        retry_mock.assert_called_once_with(
+            retry_on_exception=mock.ANY, stop_max_attempt_number=3,
+            wait_fixed=250.0
+        )
+
+
+class RpcReplyPikaIncomingMessageTestCase(unittest.TestCase):
+    def setUp(self):
+        self._pika_engine = mock.Mock()
+        self._pika_engine.allowed_remote_exmods = [
+            pika_engine._EXCEPTIONS_MODULE, "oslo_messaging.exceptions"
+        ]
+
+        self._channel = mock.Mock()
+
+        self._delivery_tag = 12345
+
+        self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag)
+
+        self._properties = pika.BasicProperties(
+            content_type="application/json",
+            content_encoding="utf-8",
+            headers={"version": "1.0"},
+            correlation_id=123456789
+        )
+
+    def test_positive_reply_message_body_parsing(self):
+
+        body = b'{"s": "all fine"}'
+
+        message = pika_drv_msg.RpcReplyPikaIncomingMessage(
+            self._pika_engine, self._channel, self._method, self._properties,
+            body, True
+        )
+
+        self.assertEqual(message.msg_id, 123456789)
+        self.assertIsNone(message.failure)
+        self.assertEquals(message.result, "all fine")
+
+    def test_negative_reply_message_body_parsing(self):
+
+        body = (b'{'
+                b'    "e": {'
+                b'         "s": "Error message",'
+                b'         "t": ["TRACE HERE"],'
+                b'         "c": "MessagingException",'
+                b'         "m": "oslo_messaging.exceptions"'
+                b'     }'
+                b'}')
+
+        message = pika_drv_msg.RpcReplyPikaIncomingMessage(
+            self._pika_engine, self._channel, self._method, self._properties,
+            body, True
+        )
+
+        self.assertEqual(message.msg_id, 123456789)
+        self.assertIsNone(message.result)
+        self.assertEquals(
+            str(message.failure),
+            'Error message\n'
+            'TRACE HERE'
+        )
+        self.assertIsInstance(message.failure,
+                              oslo_messaging.MessagingException)
+
+
+class PikaOutgoingMessageTestCase(unittest.TestCase):
+    def setUp(self):
+        self._pika_engine = mock.MagicMock()
+        self._exchange = "it is exchange"
+        self._routing_key = "it is routing key"
+        self._expiration = 1
+        self._expiration_time = time.time() + self._expiration
+        self._mandatory = object()
+
+        self._message = {"msg_type": 1, "msg_str": "hello"}
+        self._context = {"request_id": 555, "token": "it is a token"}
+
+    @patch("oslo_serialization.jsonutils.dumps",
+           new=functools.partial(jsonutils.dumps, sort_keys=True))
+    def test_send_with_confirmation(self):
+        message = pika_drv_msg.PikaOutgoingMessage(
+            self._pika_engine, self._message, self._context
+        )
+
+        message.send(
+            exchange=self._exchange,
+            routing_key=self._routing_key,
+            confirm=True,
+            mandatory=self._mandatory,
+            persistent=True,
+            expiration_time=self._expiration_time,
+            retrier=None
+        )
+
+        self._pika_engine.connection_with_confirmation_pool.acquire(
+        ).__enter__().channel.publish.assert_called_once_with(
+            body=mock.ANY,
+            exchange=self._exchange, mandatory=self._mandatory,
+            properties=mock.ANY,
+            routing_key=self._routing_key
+        )
+
+        body = self._pika_engine.connection_with_confirmation_pool.acquire(
+        ).__enter__().channel.publish.call_args[1]["body"]
+
+        self.assertEqual(
+            b'{"_$_request_id": 555, "_$_token": "it is a token", '
+            b'"msg_str": "hello", "msg_type": 1}',
+            body
+        )
+
+        props = self._pika_engine.connection_with_confirmation_pool.acquire(
+        ).__enter__().channel.publish.call_args[1]["properties"]
+
+        self.assertEqual(props.content_encoding, 'utf-8')
+        self.assertEqual(props.content_type, 'application/json')
+        self.assertEqual(props.delivery_mode, 2)
+        self.assertTrue(self._expiration * 1000 - float(props.expiration) <
+                        100)
+        self.assertEqual(props.headers, {'version': '1.0'})
+        self.assertTrue(props.message_id)
+
+    @patch("oslo_serialization.jsonutils.dumps",
+           new=functools.partial(jsonutils.dumps, sort_keys=True))
+    def test_send_without_confirmation(self):
+        message = pika_drv_msg.PikaOutgoingMessage(
+            self._pika_engine, self._message, self._context
+        )
+
+        message.send(
+            exchange=self._exchange,
+            routing_key=self._routing_key,
+            confirm=False,
+            mandatory=self._mandatory,
+            persistent=False,
+            expiration_time=self._expiration_time,
+            retrier=None
+        )
+
+        self._pika_engine.connection_without_confirmation_pool.acquire(
+        ).__enter__().channel.publish.assert_called_once_with(
+            body=mock.ANY,
+            exchange=self._exchange, mandatory=self._mandatory,
+            properties=mock.ANY,
+            routing_key=self._routing_key
+        )
+
+        body = self._pika_engine.connection_without_confirmation_pool.acquire(
+        ).__enter__().channel.publish.call_args[1]["body"]
+
+        self.assertEqual(
+            b'{"_$_request_id": 555, "_$_token": "it is a token", '
+            b'"msg_str": "hello", "msg_type": 1}',
+            body
+        )
+
+        props = self._pika_engine.connection_without_confirmation_pool.acquire(
+        ).__enter__().channel.publish.call_args[1]["properties"]
+
+        self.assertEqual(props.content_encoding, 'utf-8')
+        self.assertEqual(props.content_type, 'application/json')
+        self.assertEqual(props.delivery_mode, 1)
+        self.assertTrue(self._expiration * 1000 - float(props.expiration)
+                        < 100)
+        self.assertEqual(props.headers, {'version': '1.0'})
+        self.assertTrue(props.message_id)
+
+
+class RpcPikaOutgoingMessageTestCase(unittest.TestCase):
+    def setUp(self):
+        self._exchange = "it is exchange"
+        self._routing_key = "it is routing key"
+
+        self._pika_engine = mock.MagicMock()
+        self._pika_engine.get_rpc_exchange_name.return_value = self._exchange
+        self._pika_engine.get_rpc_queue_name.return_value = self._routing_key
+
+        self._message = {"msg_type": 1, "msg_str": "hello"}
+        self._context = {"request_id": 555, "token": "it is a token"}
+
+    @patch("oslo_serialization.jsonutils.dumps",
+           new=functools.partial(jsonutils.dumps, sort_keys=True))
+    def test_send_cast_message(self):
+        message = pika_drv_msg.RpcPikaOutgoingMessage(
+            self._pika_engine, self._message, self._context
+        )
+
+        expiration = 1
+        expiration_time = time.time() + expiration
+
+        message.send(
+            target=oslo_messaging.Target(exchange=self._exchange,
+                                         topic=self._routing_key),
+            reply_listener=None,
+            expiration_time=expiration_time,
+            retrier=None
+        )
+
+        self._pika_engine.connection_with_confirmation_pool.acquire(
+        ).__enter__().channel.publish.assert_called_once_with(
+            body=mock.ANY,
+            exchange=self._exchange, mandatory=True,
+            properties=mock.ANY,
+            routing_key=self._routing_key
+        )
+
+        body = self._pika_engine.connection_with_confirmation_pool.acquire(
+        ).__enter__().channel.publish.call_args[1]["body"]
+
+        self.assertEqual(
+            b'{"_$_request_id": 555, "_$_token": "it is a token", '
+            b'"msg_str": "hello", "msg_type": 1}',
+            body
+        )
+
+        props = self._pika_engine.connection_with_confirmation_pool.acquire(
+        ).__enter__().channel.publish.call_args[1]["properties"]
+
+        self.assertEqual(props.content_encoding, 'utf-8')
+        self.assertEqual(props.content_type, 'application/json')
+        self.assertEqual(props.delivery_mode, 1)
+        self.assertTrue(expiration * 1000 - float(props.expiration) < 100)
+        self.assertEqual(props.headers, {'version': '1.0'})
+        self.assertIsNone(props.correlation_id)
+        self.assertIsNone(props.reply_to)
+        self.assertTrue(props.message_id)
+
+    @patch("oslo_serialization.jsonutils.dumps",
+           new=functools.partial(jsonutils.dumps, sort_keys=True))
+    def test_send_call_message(self):
+        message = pika_drv_msg.RpcPikaOutgoingMessage(
+            self._pika_engine, self._message, self._context
+        )
+
+        expiration = 1
+        expiration_time = time.time() + expiration
+
+        result = "it is a result"
+        reply_queue_name = "reply_queue_name"
+
+        future = futures.Future()
+        future.set_result(result)
+        reply_listener = mock.Mock()
+        reply_listener.register_reply_waiter.return_value = future
+        reply_listener.get_reply_qname.return_value = reply_queue_name
+
+        res = message.send(
+            target=oslo_messaging.Target(exchange=self._exchange,
+                                         topic=self._routing_key),
+            reply_listener=reply_listener,
+            expiration_time=expiration_time,
+            retrier=None
+        )
+
+        self.assertEqual(result, res)
+
+        self._pika_engine.connection_with_confirmation_pool.acquire(
+        ).__enter__().channel.publish.assert_called_once_with(
+            body=mock.ANY,
+            exchange=self._exchange, mandatory=True,
+            properties=mock.ANY,
+            routing_key=self._routing_key
+        )
+
+        body = self._pika_engine.connection_with_confirmation_pool.acquire(
+        ).__enter__().channel.publish.call_args[1]["body"]
+
+        self.assertEqual(
+            b'{"_$_request_id": 555, "_$_token": "it is a token", '
+            b'"msg_str": "hello", "msg_type": 1}',
+            body
+        )
+
+        props = self._pika_engine.connection_with_confirmation_pool.acquire(
+        ).__enter__().channel.publish.call_args[1]["properties"]
+
+        self.assertEqual(props.content_encoding, 'utf-8')
+        self.assertEqual(props.content_type, 'application/json')
+        self.assertEqual(props.delivery_mode, 1)
+        self.assertTrue(expiration * 1000 - float(props.expiration) < 100)
+        self.assertEqual(props.headers, {'version': '1.0'})
+        self.assertEqual(props.correlation_id, message.msg_id)
+        self.assertEquals(props.reply_to, reply_queue_name)
+        self.assertTrue(props.message_id)
+
+
+class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase):
+    def setUp(self):
+        self._reply_q = "reply_queue_name"
+
+        self._expiration = 1
+        self._expiration_time = time.time() + self._expiration
+
+        self._pika_engine = mock.MagicMock()
+
+        self._rpc_reply_exchange = "rpc_reply_exchange"
+        self._pika_engine.rpc_reply_exchange = self._rpc_reply_exchange
+
+        self._msg_id = 12345567
+
+    @patch("oslo_serialization.jsonutils.dumps",
+           new=functools.partial(jsonutils.dumps, sort_keys=True))
+    def test_success_message_send(self):
+        message = pika_drv_msg.RpcReplyPikaOutgoingMessage(
+            self._pika_engine, self._msg_id, reply="all_fine"
+        )
+
+        message.send(self._reply_q, expiration_time=self._expiration_time,
+                     retrier=None)
+
+        self._pika_engine.connection_with_confirmation_pool.acquire(
+        ).__enter__().channel.publish.assert_called_once_with(
+            body=b'{"s": "all_fine"}',
+            exchange=self._rpc_reply_exchange, mandatory=True,
+            properties=mock.ANY,
+            routing_key=self._reply_q
+        )
+
+        props = self._pika_engine.connection_with_confirmation_pool.acquire(
+        ).__enter__().channel.publish.call_args[1]["properties"]
+
+        self.assertEqual(props.content_encoding, 'utf-8')
+        self.assertEqual(props.content_type, 'application/json')
+        self.assertEqual(props.delivery_mode, 1)
+        self.assertTrue(self._expiration * 1000 - float(props.expiration) <
+                        100)
+        self.assertEqual(props.headers, {'version': '1.0'})
+        self.assertEqual(props.correlation_id, message.msg_id)
+        self.assertIsNone(props.reply_to)
+        self.assertTrue(props.message_id)
+
+    @patch("traceback.format_exception", new=lambda x,y,z:z)
+    @patch("oslo_serialization.jsonutils.dumps",
+           new=functools.partial(jsonutils.dumps, sort_keys=True))
+    def test_failure_message_send(self):
+        failure_info = (oslo_messaging.MessagingException,
+                        oslo_messaging.MessagingException("Error message"),
+                        ['It is a trace'])
+
+
+        message = pika_drv_msg.RpcReplyPikaOutgoingMessage(
+            self._pika_engine, self._msg_id, failure_info=failure_info
+        )
+
+        message.send(self._reply_q, expiration_time=self._expiration_time,
+                     retrier=None)
+
+        self._pika_engine.connection_with_confirmation_pool.acquire(
+        ).__enter__().channel.publish.assert_called_once_with(
+            body=mock.ANY,
+            exchange=self._rpc_reply_exchange,
+            mandatory=True,
+            properties=mock.ANY,
+            routing_key=self._reply_q
+        )
+
+        body = self._pika_engine.connection_with_confirmation_pool.acquire(
+        ).__enter__().channel.publish.call_args[1]["body"]
+        self.assertEqual(
+            b'{"e": {"c": "MessagingException", '
+            b'"m": "oslo_messaging.exceptions", "s": "Error message", '
+            b'"t": ["It is a trace"]}}',
+            body
+        )
+
+        props = self._pika_engine.connection_with_confirmation_pool.acquire(
+        ).__enter__().channel.publish.call_args[1]["properties"]
+
+        self.assertEqual(props.content_encoding, 'utf-8')
+        self.assertEqual(props.content_type, 'application/json')
+        self.assertEqual(props.delivery_mode, 1)
+        self.assertTrue(self._expiration * 1000 - float(props.expiration) <
+                        100)
+        self.assertEqual(props.headers, {'version': '1.0'})
+        self.assertEqual(props.correlation_id, message.msg_id)
+        self.assertIsNone(props.reply_to)
+        self.assertTrue(props.message_id)