diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py
index 8414bbb03..d29131914 100644
--- a/oslo_messaging/_drivers/impl_pika.py
+++ b/oslo_messaging/_drivers/impl_pika.py
@@ -50,6 +50,13 @@ pika_pool_opts = [
                     "staleness. Stale connections are closed on acquire.")
 ]
 
+message_opts = [
+    cfg.StrOpt('default_serializer_type', default='json',
+               choices=('json', 'msgpack'),
+               help="Default serialization mechanism for "
+                    "serializing/deserializing outgoing/incoming messages")
+]
+
 notification_opts = [
     cfg.BoolOpt('notification_persistence', default=False,
                 help="Persist notification messages."),
@@ -133,6 +140,7 @@ class PikaDriver(base.BaseDriver):
         conf.register_group(opt_group)
         conf.register_opts(pika_drv_conn_factory.pika_opts, group=opt_group)
         conf.register_opts(pika_pool_opts, group=opt_group)
+        conf.register_opts(message_opts, group=opt_group)
         conf.register_opts(rpc_opts, group=opt_group)
         conf.register_opts(notification_opts, group=opt_group)
 
diff --git a/oslo_messaging/_drivers/pika_driver/pika_commons.py b/oslo_messaging/_drivers/pika_driver/pika_commons.py
index 0737043bd..f5e9086cf 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_commons.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_commons.py
@@ -15,6 +15,8 @@
 import select
 import socket
 
+from oslo_serialization.serializer import json_serializer
+from oslo_serialization.serializer import msgpack_serializer
 from oslo_utils import timeutils
 from pika import exceptions as pika_exceptions
 import six
@@ -31,3 +33,8 @@ PIKA_CONNECTIVITY_ERRORS = (
 EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'
 
 INFINITE_STOP_WATCH = timeutils.StopWatch(duration=None).start()
+
+MESSAGE_SERIALIZERS = {
+    'application/json': json_serializer.JSONSerializer(),
+    'application/msgpack': msgpack_serializer.MessagePackSerializer()
+}
diff --git a/oslo_messaging/_drivers/pika_driver/pika_engine.py b/oslo_messaging/_drivers/pika_driver/pika_engine.py
index 97b6792d2..b31751f4c 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_engine.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_engine.py
@@ -137,6 +137,10 @@ class PikaEngine(object):
             raise ValueError("notification_retry_delay should be non-negative "
                              "integer")
 
+        self.default_content_type = (
+            'application/' + conf.oslo_messaging_pika.default_serializer_type
+        )
+
     def _init_if_needed(self):
         cur_pid = os.getpid()
 
diff --git a/oslo_messaging/_drivers/pika_driver/pika_message.py b/oslo_messaging/_drivers/pika_driver/pika_message.py
index 2802bedb1..86ede6af1 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_message.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_message.py
@@ -20,7 +20,6 @@ import uuid
 
 from concurrent import futures
 from oslo_log import log as logging
-from oslo_serialization import jsonutils
 from oslo_utils import importutils
 from oslo_utils import timeutils
 from pika import exceptions as pika_exceptions
@@ -101,7 +100,6 @@ class PikaIncomingMessage(base.IncomingMessage):
         self._version = version
 
         self._content_type = properties.content_type
-        self._content_encoding = properties.content_encoding
         self.unique_id = properties.message_id
 
         self.expiration_time = (
@@ -109,15 +107,16 @@ class PikaIncomingMessage(base.IncomingMessage):
             time.time() + float(properties.expiration) / 1000
         )
 
-        if self._content_type != "application/json":
+        try:
+            serializer = pika_drv_cmns.MESSAGE_SERIALIZERS[self._content_type]
+        except KeyError:
             raise NotImplementedError(
-                "Content-type['{}'] is not valid, "
-                "'application/json' only is supported.".format(
+                "Content-type['{}'] is not supported.".format(
                     self._content_type
                 )
             )
 
-        message_dict = jsonutils.loads(body, encoding=self._content_encoding)
+        message_dict = serializer.load_from_bytes(body)
 
         context_dict = {}
 
@@ -190,7 +189,6 @@ class RpcPikaIncomingMessage(PikaIncomingMessage, base.RpcIncomingMessage):
         reply_outgoing_message = RpcReplyPikaOutgoingMessage(
             self._pika_engine, self.msg_id, reply=reply, failure_info=failure,
             content_type=self._content_type,
-            content_encoding=self._content_encoding
         )
 
         def on_exception(ex):
@@ -297,8 +295,7 @@ class PikaOutgoingMessage(object):
     and send it
     """
 
-    def __init__(self, pika_engine, message, context,
-                 content_type="application/json", content_encoding="utf-8"):
+    def __init__(self, pika_engine, message, context, content_type=None):
         """Parse RabbitMQ message
 
         :param pika_engine: PikaEngine, shared object with configuration and
@@ -306,19 +303,23 @@ class PikaOutgoingMessage(object):
         :param message: Dictionary, user's message fields
         :param context: Dictionary, request context's fields
         :param content_type: String, content-type header, defines serialization
-            mechanism
-        :param content_encoding: String, defines encoding for text data
+            mechanism, if None default content-type from pika_engine is used
         """
 
         self._pika_engine = pika_engine
 
-        self._content_type = content_type
-        self._content_encoding = content_encoding
+        self._content_type = (
+            content_type if content_type is not None else
+            self._pika_engine.default_content_type
+        )
 
-        if self._content_type != "application/json":
+        try:
+            self._serializer = pika_drv_cmns.MESSAGE_SERIALIZERS[
+                self._content_type
+            ]
+        except KeyError:
             raise NotImplementedError(
-                "Content-type['{}'] is not valid, "
-                "'application/json' only is supported.".format(
+                "Content-type['{}'] is not supported.".format(
                     self._content_type
                 )
             )
@@ -340,7 +341,6 @@ class PikaOutgoingMessage(object):
                 msg['_$_' + key] = value
 
         props = pika_spec.BasicProperties(
-            content_encoding=self._content_encoding,
             content_type=self._content_type,
             headers={_VERSION_HEADER: _VERSION},
             message_id=self.unique_id,
@@ -447,8 +447,7 @@ class PikaOutgoingMessage(object):
                 if confirm else
                 self._pika_engine.connection_without_confirmation_pool)
 
-        body = jsonutils.dump_as_bytes(msg_dict,
-                                       encoding=self._content_encoding)
+        body = self._serializer.dump_as_bytes(msg_dict)
 
         LOG.debug(
             "Sending message:[body:%s; properties: %s] to target: "
@@ -490,10 +489,9 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage):
     """PikaOutgoingMessage implementation for RPC messages. It adds
     possibility to wait and receive RPC reply
     """
-    def __init__(self, pika_engine, message, context,
-                 content_type="application/json", content_encoding="utf-8"):
+    def __init__(self, pika_engine, message, context, content_type=None):
         super(RpcPikaOutgoingMessage, self).__init__(
-            pika_engine, message, context, content_type, content_encoding
+            pika_engine, message, context, content_type
         )
         self.msg_id = None
         self.reply_q = None
@@ -549,7 +547,7 @@ class RpcReplyPikaOutgoingMessage(PikaOutgoingMessage):
     correlation_id AMQP property to link this reply with response
     """
     def __init__(self, pika_engine, msg_id, reply=None, failure_info=None,
-                 content_type="application/json", content_encoding="utf-8"):
+                 content_type=None):
         """Initialize with reply information for sending
 
         :param pika_engine: PikaEngine, shared object with configuration and
@@ -559,8 +557,7 @@ class RpcReplyPikaOutgoingMessage(PikaOutgoingMessage):
         :param failure_info: Tuple, should be a sys.exc_info() tuple.
             Should be None if RPC request was successfully processed.
         :param content_type: String, content-type header, defines serialization
-            mechanism
-        :param content_encoding: String, defines encoding for text data
+            mechanism, if None default content-type from pika_engine is used
         """
         self.msg_id = msg_id
 
@@ -588,7 +585,7 @@ class RpcReplyPikaOutgoingMessage(PikaOutgoingMessage):
             msg = {'s': reply}
 
         super(RpcReplyPikaOutgoingMessage, self).__init__(
-            pika_engine, msg, None, content_type, content_encoding
+            pika_engine, msg, None, content_type
         )
 
     def send(self, reply_q, stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH,
diff --git a/oslo_messaging/opts.py b/oslo_messaging/opts.py
index 9fa87f118..7c373c353 100644
--- a/oslo_messaging/opts.py
+++ b/oslo_messaging/opts.py
@@ -51,8 +51,8 @@ _opts = [
     ('oslo_messaging_rabbit', list(
         itertools.chain(amqp.amqp_opts, impl_rabbit.rabbit_opts,
                         pika_connection_factory.pika_opts,
-                        impl_pika.pika_pool_opts, impl_pika.notification_opts,
-                        impl_pika.rpc_opts))),
+                        impl_pika.pika_pool_opts, impl_pika.message_opts,
+                        impl_pika.notification_opts, impl_pika.rpc_opts))),
 ]
 
 
diff --git a/oslo_messaging/tests/drivers/pika/test_message.py b/oslo_messaging/tests/drivers/pika/test_message.py
index 5d29c8ab5..354f3a399 100644
--- a/oslo_messaging/tests/drivers/pika/test_message.py
+++ b/oslo_messaging/tests/drivers/pika/test_message.py
@@ -116,7 +116,6 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase):
         )
         self._properties = pika.BasicProperties(
             content_type="application/json",
-            content_encoding="utf-8",
             headers={"version": "1.0"},
         )
 
@@ -197,7 +196,7 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase):
 
         outgoing_message_mock.assert_called_once_with(
             self._pika_engine, 123456789, failure_info=None, reply='all_fine',
-            content_encoding='utf-8', content_type='application/json'
+            content_type='application/json'
         )
         outgoing_message_mock().send.assert_called_once_with(
             reply_q='reply_queue', stopwatch=mock.ANY, retrier=mock.ANY
@@ -236,7 +235,6 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase):
             self._pika_engine, 123456789,
             failure_info=failure_info,
             reply=None,
-            content_encoding='utf-8',
             content_type='application/json'
         )
         outgoing_message_mock().send.assert_called_once_with(
@@ -263,7 +261,6 @@ class RpcReplyPikaIncomingMessageTestCase(unittest.TestCase):
 
         self._properties = pika.BasicProperties(
             content_type="application/json",
-            content_encoding="utf-8",
             headers={"version": "1.0"},
             correlation_id=123456789
         )
@@ -311,6 +308,7 @@ class RpcReplyPikaIncomingMessageTestCase(unittest.TestCase):
 class PikaOutgoingMessageTestCase(unittest.TestCase):
     def setUp(self):
         self._pika_engine = mock.MagicMock()
+        self._pika_engine.default_content_type = "application/json"
         self._exchange = "it is exchange"
         self._routing_key = "it is routing key"
         self._expiration = 1
@@ -322,8 +320,8 @@ class PikaOutgoingMessageTestCase(unittest.TestCase):
         self._message = {"msg_type": 1, "msg_str": "hello"}
         self._context = {"request_id": 555, "token": "it is a token"}
 
-    @patch("oslo_serialization.jsonutils.dumps",
-           new=functools.partial(jsonutils.dumps, sort_keys=True))
+    @patch("oslo_serialization.jsonutils.dump_as_bytes",
+           new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
     def test_send_with_confirmation(self):
         message = pika_drv_msg.PikaOutgoingMessage(
             self._pika_engine, self._message, self._context
@@ -359,7 +357,6 @@ class PikaOutgoingMessageTestCase(unittest.TestCase):
         props = self._pika_engine.connection_with_confirmation_pool.acquire(
         ).__enter__().channel.publish.call_args[1]["properties"]
 
-        self.assertEqual('utf-8', props.content_encoding)
         self.assertEqual('application/json', props.content_type)
         self.assertEqual(2, props.delivery_mode)
         self.assertTrue(self._expiration * 1000 - float(props.expiration) <
@@ -367,8 +364,8 @@ class PikaOutgoingMessageTestCase(unittest.TestCase):
         self.assertEqual({'version': '1.0'}, props.headers)
         self.assertTrue(props.message_id)
 
-    @patch("oslo_serialization.jsonutils.dumps",
-           new=functools.partial(jsonutils.dumps, sort_keys=True))
+    @patch("oslo_serialization.jsonutils.dump_as_bytes",
+           new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
     def test_send_without_confirmation(self):
         message = pika_drv_msg.PikaOutgoingMessage(
             self._pika_engine, self._message, self._context
@@ -404,7 +401,6 @@ class PikaOutgoingMessageTestCase(unittest.TestCase):
         props = self._pika_engine.connection_without_confirmation_pool.acquire(
         ).__enter__().channel.publish.call_args[1]["properties"]
 
-        self.assertEqual('utf-8', props.content_encoding)
         self.assertEqual('application/json', props.content_type)
         self.assertEqual(1, props.delivery_mode)
         self.assertTrue(self._expiration * 1000 - float(props.expiration)
@@ -421,12 +417,13 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase):
         self._pika_engine = mock.MagicMock()
         self._pika_engine.get_rpc_exchange_name.return_value = self._exchange
         self._pika_engine.get_rpc_queue_name.return_value = self._routing_key
+        self._pika_engine.default_content_type = "application/json"
 
         self._message = {"msg_type": 1, "msg_str": "hello"}
         self._context = {"request_id": 555, "token": "it is a token"}
 
-    @patch("oslo_serialization.jsonutils.dumps",
-           new=functools.partial(jsonutils.dumps, sort_keys=True))
+    @patch("oslo_serialization.jsonutils.dump_as_bytes",
+           new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
     def test_send_cast_message(self):
         message = pika_drv_msg.RpcPikaOutgoingMessage(
             self._pika_engine, self._message, self._context
@@ -463,7 +460,6 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase):
         props = self._pika_engine.connection_with_confirmation_pool.acquire(
         ).__enter__().channel.publish.call_args[1]["properties"]
 
-        self.assertEqual('utf-8', props.content_encoding)
         self.assertEqual('application/json', props.content_type)
         self.assertEqual(1, props.delivery_mode)
         self.assertTrue(expiration * 1000 - float(props.expiration) < 100)
@@ -472,8 +468,8 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase):
         self.assertIsNone(props.reply_to)
         self.assertTrue(props.message_id)
 
-    @patch("oslo_serialization.jsonutils.dumps",
-           new=functools.partial(jsonutils.dumps, sort_keys=True))
+    @patch("oslo_serialization.jsonutils.dump_as_bytes",
+           new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
     def test_send_call_message(self):
         message = pika_drv_msg.RpcPikaOutgoingMessage(
             self._pika_engine, self._message, self._context
@@ -521,7 +517,6 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase):
         props = self._pika_engine.connection_with_confirmation_pool.acquire(
         ).__enter__().channel.publish.call_args[1]["properties"]
 
-        self.assertEqual('utf-8', props.content_encoding)
         self.assertEqual('application/json', props.content_type)
         self.assertEqual(1, props.delivery_mode)
         self.assertTrue(expiration * 1000 - float(props.expiration) < 100)
@@ -544,11 +539,12 @@ class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase):
 
         self._rpc_reply_exchange = "rpc_reply_exchange"
         self._pika_engine.rpc_reply_exchange = self._rpc_reply_exchange
+        self._pika_engine.default_content_type = "application/json"
 
         self._msg_id = 12345567
 
-    @patch("oslo_serialization.jsonutils.dumps",
-           new=functools.partial(jsonutils.dumps, sort_keys=True))
+    @patch("oslo_serialization.jsonutils.dump_as_bytes",
+           new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
     def test_success_message_send(self):
         message = pika_drv_msg.RpcReplyPikaOutgoingMessage(
             self._pika_engine, self._msg_id, reply="all_fine"
@@ -567,7 +563,6 @@ class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase):
         props = self._pika_engine.connection_with_confirmation_pool.acquire(
         ).__enter__().channel.publish.call_args[1]["properties"]
 
-        self.assertEqual('utf-8', props.content_encoding)
         self.assertEqual('application/json', props.content_type)
         self.assertEqual(1, props.delivery_mode)
         self.assertTrue(self._expiration * 1000 - float(props.expiration) <
@@ -578,8 +573,8 @@ class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase):
         self.assertTrue(props.message_id)
 
     @patch("traceback.format_exception", new=lambda x, y, z: z)
-    @patch("oslo_serialization.jsonutils.dumps",
-           new=functools.partial(jsonutils.dumps, sort_keys=True))
+    @patch("oslo_serialization.jsonutils.dump_as_bytes",
+           new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
     def test_failure_message_send(self):
         failure_info = (oslo_messaging.MessagingException,
                         oslo_messaging.MessagingException("Error message"),
@@ -612,7 +607,6 @@ class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase):
         props = self._pika_engine.connection_with_confirmation_pool.acquire(
         ).__enter__().channel.publish.call_args[1]["properties"]
 
-        self.assertEqual('utf-8', props.content_encoding)
         self.assertEqual('application/json', props.content_type)
         self.assertEqual(1, props.delivery_mode)
         self.assertTrue(self._expiration * 1000 - float(props.expiration) <