From 53ead5c11d8b570a9d6e025e13db72526f966a87 Mon Sep 17 00:00:00 2001
From: Kenneth Giusti <kgiusti@gmail.com>
Date: Thu, 5 Jan 2017 15:11:35 -0500
Subject: [PATCH] [AMQP 1.0] Fix SSL client authentication

The driver incorrectly set up SSL client authentication.  This patch
fixes the configuration. Unit tests added.

Change-Id: I64b1736f5a708c70013d9fedba73da4fa8d9ccfb
Closes-Bug: #1606985
---
 .../_drivers/amqp1_driver/controller.py       |   5 -
 oslo_messaging/_drivers/amqp1_driver/opts.py  |  12 +-
 .../tests/drivers/test_amqp_driver.py         | 165 +++++++++++++++++-
 3 files changed, 172 insertions(+), 10 deletions(-)

diff --git a/oslo_messaging/_drivers/amqp1_driver/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py
index 56fd97708..6bfe92004 100644
--- a/oslo_messaging/_drivers/amqp1_driver/controller.py
+++ b/oslo_messaging/_drivers/amqp1_driver/controller.py
@@ -978,14 +978,9 @@ class Controller(pyngus.ConnectionEventHandler):
         if self.ssl_ca_file:
             conn_props["x-ssl-ca-file"] = self.ssl_ca_file
         if self.ssl_cert_file:
-            # assume this connection is for a server.  If client authentication
-            # support is developed, we'll need an explicit flag (server or
-            # client)
-            conn_props["x-ssl-server"] = True
             conn_props["x-ssl-identity"] = (self.ssl_cert_file,
                                             self.ssl_key_file,
                                             self.ssl_key_password)
-            conn_props["x-ssl-allow-cleartext"] = self.ssl_allow_insecure
         # SASL configuration:
         if self.sasl_mechanisms:
             conn_props["x-sasl-mechs"] = self.sasl_mechanisms
diff --git a/oslo_messaging/_drivers/amqp1_driver/opts.py b/oslo_messaging/_drivers/amqp1_driver/opts.py
index 127dd0d6c..e0943bf7d 100644
--- a/oslo_messaging/_drivers/amqp1_driver/opts.py
+++ b/oslo_messaging/_drivers/amqp1_driver/opts.py
@@ -34,17 +34,20 @@ amqp1_opts = [
     cfg.StrOpt('ssl_ca_file',
                default='',
                deprecated_group='amqp1',
-               help="CA certificate PEM file to verify server certificate"),
+               help="CA certificate PEM file used to verify the server's"
+               ' certificate'),
 
     cfg.StrOpt('ssl_cert_file',
                default='',
                deprecated_group='amqp1',
-               help='Identifying certificate PEM file to present to clients'),
+               help='Self-identifying certificate PEM file'
+               ' for client authentication'),
 
     cfg.StrOpt('ssl_key_file',
                default='',
                deprecated_group='amqp1',
-               help='Private key PEM file used to sign cert_file certificate'),
+               help='Private key PEM file used to sign ssl_cert_file'
+               ' certificate (optional)'),
 
     cfg.StrOpt('ssl_key_password',
                deprecated_group='amqp1',
@@ -54,6 +57,9 @@ amqp1_opts = [
     cfg.BoolOpt('allow_insecure_clients',
                 default=False,
                 deprecated_group='amqp1',
+                # marked as deprecated in Ocata
+                deprecated_for_removal=True,
+                deprecated_reason="Not applicable - not a SSL server",
                 help='Accept clients using either SSL or plain TCP'),
 
     cfg.StrOpt('sasl_mechanisms',
diff --git a/oslo_messaging/tests/drivers/test_amqp_driver.py b/oslo_messaging/tests/drivers/test_amqp_driver.py
index 30b796b32..b6d00d8e9 100644
--- a/oslo_messaging/tests/drivers/test_amqp_driver.py
+++ b/oslo_messaging/tests/drivers/test_amqp_driver.py
@@ -15,10 +15,12 @@
 import logging
 import os
 import select
+import shlex
 import shutil
 import socket
 import subprocess
 import sys
+import tempfile
 import threading
 import time
 import uuid
@@ -49,6 +51,8 @@ if pyngus:
 _proton = importutils.try_import("proton")
 CYRUS_ENABLED = (pyngus and pyngus.VERSION >= (2, 0, 0) and _proton
                  and getattr(_proton.SASL, "extended", lambda: False)())
+# same with SSL
+SSL_ENABLED = (_proton and getattr(_proton.SSL, "present", lambda: False)())
 
 LOG = logging.getLogger(__name__)
 
@@ -1449,6 +1453,148 @@ class TestMessageRetransmit(_AmqpBrokerTestCase):
                           lambda l, h: l.message_rejected(h, {}))
 
 
+@testtools.skipUnless(SSL_ENABLED, "OpenSSL not supported")
+class TestSSL(test_utils.BaseTestCase):
+    """Test the driver's OpenSSL integration"""
+
+    def setUp(self):
+        super(TestSSL, self).setUp()
+        # Create the CA, server, and client SSL certificates:
+        self._tmpdir = tempfile.mkdtemp(prefix='amqp1')
+        files = ['ca_key', 'ca_cert', 's_key', 's_req', 's_cert', 'c_key',
+                 'c_req', 'c_cert', 'bad_cert', 'bad_req', 'bad_key']
+        conf = dict(zip(files, [os.path.join(self._tmpdir, "%s.pem" % f)
+                                for f in files]))
+        conf['pw'] = 'password'
+        conf['s_name'] = '127.0.0.1'
+        conf['c_name'] = 'client.com'
+        self._ssl_config = conf
+        ssl_setup = [
+            # create self-signed CA key and certificate:
+            Template('openssl genrsa -out ${ca_key} 2048').substitute(conf),
+            Template('openssl req -x509 -key ${ca_key} -subj'
+                     ' "/CN=Trusted.CA.com" -out'
+                     ' ${ca_cert}').substitute(conf),
+            # create Server key and certificate:
+            Template('openssl genrsa -out ${s_key} 2048').substitute(conf),
+            Template('openssl req -new -key ${s_key} -subj /CN=${s_name}'
+                     ' -passin pass:${pw} -out ${s_req}').substitute(conf),
+            Template('openssl x509 -req -in ${s_req} -CA ${ca_cert}'
+                     ' -CAkey ${ca_key} -CAcreateserial -out'
+                     ' ${s_cert}').substitute(conf),
+            # create a "bad" Server cert for testing CN validation:
+            Template('openssl genrsa -out ${bad_key} 2048').substitute(conf),
+            Template('openssl req -new -key ${bad_key} -subj /CN=Invalid'
+                     ' -passin pass:${pw} -out ${bad_req}').substitute(conf),
+            Template('openssl x509 -req -in ${bad_req} -CA ${ca_cert}'
+                     ' -CAkey ${ca_key} -CAcreateserial -out'
+                     ' ${bad_cert}').substitute(conf),
+            # create Client key and certificate for client authentication:
+            Template('openssl genrsa -out ${c_key} 2048').substitute(conf),
+            Template('openssl req -new -key ${c_key} -subj /CN=${c_name}'
+                     ' -passin pass:${pw} -out'
+                     ' ${c_req}').substitute(conf),
+            Template('openssl x509 -req -in ${c_req} -CA ${ca_cert}'
+                     ' -CAkey ${ca_key} -CAcreateserial -out'
+                     ' ${c_cert}').substitute(conf)
+        ]
+        for cmd in ssl_setup:
+            try:
+                subprocess.check_call(args=shlex.split(cmd))
+            except Exception:
+                shutil.rmtree(self._tmpdir, ignore_errors=True)
+                self._tmpdir = None
+                self.skipTest("OpenSSL tools not installed - skipping")
+
+    def test_server_ok(self):
+        # test client authenticates server
+        self._broker = FakeBroker(self.conf.oslo_messaging_amqp,
+                                  sock_addr=self._ssl_config['s_name'],
+                                  ssl_config=self._ssl_config)
+        url = oslo_messaging.TransportURL.parse(self.conf, "amqp://%s:%d" %
+                                                (self._broker.host,
+                                                 self._broker.port))
+        self._broker.start()
+
+        self.config(ssl_ca_file=self._ssl_config['ca_cert'],
+                    group='oslo_messaging_amqp')
+        driver = amqp_driver.ProtonDriver(self.conf, url)
+        target = oslo_messaging.Target(topic="test-topic")
+        listener = _ListenerThread(
+            driver.listen(target, None, None)._poll_style_listener, 1)
+
+        driver.send(target,
+                    {"context": "whatever"},
+                    {"method": "echo", "a": "b"},
+                    wait_for_reply=True,
+                    timeout=30)
+        listener.join(timeout=30)
+        self.assertFalse(listener.isAlive())
+        driver.cleanup()
+
+    def test_bad_server_fail(self):
+        # test client does not connect to invalid server
+        self._ssl_config['s_cert'] = self._ssl_config['bad_cert']
+        self._ssl_config['s_key'] = self._ssl_config['bad_key']
+        self._broker = FakeBroker(self.conf.oslo_messaging_amqp,
+                                  sock_addr=self._ssl_config['s_name'],
+                                  ssl_config=self._ssl_config)
+        url = oslo_messaging.TransportURL.parse(self.conf, "amqp://%s:%d" %
+                                                (self._broker.host,
+                                                 self._broker.port))
+        self._broker.start()
+
+        self.config(ssl_ca_file=self._ssl_config['ca_cert'],
+                    group='oslo_messaging_amqp')
+        driver = amqp_driver.ProtonDriver(self.conf, url)
+        target = oslo_messaging.Target(topic="test-topic")
+        self.assertRaises(oslo_messaging.MessageDeliveryFailure,
+                          driver.send, target,
+                          {"context": "whatever"},
+                          {"method": "echo", "a": "b"},
+                          wait_for_reply=False,
+                          retry=1)
+        driver.cleanup()
+
+    def test_client_auth_ok(self):
+        # test server authenticates client
+        self._ssl_config['authenticate_client'] = True
+        self._broker = FakeBroker(self.conf.oslo_messaging_amqp,
+                                  sock_addr=self._ssl_config['s_name'],
+                                  ssl_config=self._ssl_config)
+        url = oslo_messaging.TransportURL.parse(self.conf, "amqp://%s:%d" %
+                                                (self._broker.host,
+                                                 self._broker.port))
+        self._broker.start()
+
+        self.config(ssl_ca_file=self._ssl_config['ca_cert'],
+                    ssl_cert_file=self._ssl_config['c_cert'],
+                    ssl_key_file=self._ssl_config['c_key'],
+                    ssl_key_password=self._ssl_config['pw'],
+                    group='oslo_messaging_amqp')
+        driver = amqp_driver.ProtonDriver(self.conf, url)
+        target = oslo_messaging.Target(topic="test-topic")
+        listener = _ListenerThread(
+            driver.listen(target, None, None)._poll_style_listener, 1)
+
+        driver.send(target,
+                    {"context": "whatever"},
+                    {"method": "echo", "a": "b"},
+                    wait_for_reply=True,
+                    timeout=30)
+        listener.join(timeout=30)
+        self.assertFalse(listener.isAlive())
+        driver.cleanup()
+
+    def tearDown(self):
+        if self._broker:
+            self._broker.stop()
+            self._broker = None
+        if self._tmpdir:
+            shutil.rmtree(self._tmpdir, ignore_errors=True)
+        super(TestSSL, self).tearDown()
+
+
 class FakeBroker(threading.Thread):
     """A test AMQP message 'broker'."""
 
@@ -1466,6 +1612,7 @@ class FakeBroker(threading.Thread):
                 self.sasl_mechanisms = sasl_mechanisms
                 self.user_credentials = user_credentials
                 properties = {'x-server': True}
+                # setup SASL:
                 if self.sasl_mechanisms:
                     properties['x-sasl-mechs'] = self.sasl_mechanisms
                     if "ANONYMOUS" not in self.sasl_mechanisms:
@@ -1474,6 +1621,19 @@ class FakeBroker(threading.Thread):
                     properties['x-sasl-config-dir'] = sasl_config_dir
                 if sasl_config_name:
                     properties['x-sasl-config-name'] = sasl_config_name
+                # setup SSL
+                if self.server._ssl_config:
+                    ssl = self.server._ssl_config
+                    properties['x-ssl-server'] = True
+                    properties['x-ssl-identity'] = (ssl['s_cert'],
+                                                    ssl['s_key'],
+                                                    ssl['pw'])
+                    # check for client authentication
+                    if ssl.get('authenticate_client'):
+                        properties['x-ssl-ca-file'] = ssl['ca_cert']
+                        properties['x-ssl-verify-mode'] = 'verify-peer'
+                        properties['x-ssl-peer-name'] = ssl['c_name']
+                # misc connection properties
                 if product:
                     properties['properties'] = {'product': product}
 
@@ -1681,17 +1841,18 @@ class FakeBroker(threading.Thread):
                  sasl_mechanisms="ANONYMOUS",
                  user_credentials=None,
                  sasl_config_dir=None,
-                 sasl_config_name=None):
+                 sasl_config_name=None,
+                 ssl_config=None):
         """Create a fake broker listening on sock_addr:sock_port."""
         if not pyngus:
             raise AssertionError("pyngus module not present")
         threading.Thread.__init__(self)
-        self._config = cfg
         self._product = product
         self._sasl_mechanisms = sasl_mechanisms
         self._sasl_config_dir = sasl_config_dir
         self._sasl_config_name = sasl_config_name
         self._user_credentials = user_credentials
+        self._ssl_config = ssl_config
         self._wakeup_pipe = os.pipe()
         self._my_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self._my_socket.bind((sock_addr, sock_port))