From f9de265f392dca5386491120593d57b124342c5f Mon Sep 17 00:00:00 2001 From: John Eckersberg Date: Wed, 4 Aug 2021 14:57:30 -0400 Subject: [PATCH] amqp1: Do not reuse _socket_connection on reconnect Each _SocketConnection object is unique per-peer. For example, the properties attribute may contain keys such as 'x-ssl-peer-name'. Reusing the existing _socket_connection during failover will cause the TLS handshake to fail since the peer name will not match. There is potential for other similar-yet-unexplored bad things to happen as well. Instead, reconnect by waking up the eventloop via the _do_reconnect method, which reconstructs the connection properties to reflect the new (failed-over-to) host and ultimately crates a new _SocketConnection (or re-uses a *valid* old one) in eventloop.Thread.connect(). Closes-Bug: #1938945 Change-Id: I0c8dc447f4dc8d0d08c312a1f3e6fa1745fb69fd --- .../_drivers/amqp1_driver/controller.py | 2 +- .../tests/drivers/test_amqp_driver.py | 43 ++++++++++++++++--- 2 files changed, 37 insertions(+), 8 deletions(-) diff --git a/oslo_messaging/_drivers/amqp1_driver/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py index 5451cabf0..bba7228c9 100644 --- a/oslo_messaging/_drivers/amqp1_driver/controller.py +++ b/oslo_messaging/_drivers/amqp1_driver/controller.py @@ -1265,7 +1265,7 @@ class Controller(pyngus.ConnectionEventHandler): host = self.hosts.next() LOG.info("Reconnecting to: %(hostname)s:%(port)s", {'hostname': host.hostname, 'port': host.port}) - self._socket_connection.connect(host) + self.processor.wakeup(lambda: self._do_connect()) def _hard_reset(self, reason): """Reset the controller to its pre-connection state""" diff --git a/oslo_messaging/tests/drivers/test_amqp_driver.py b/oslo_messaging/tests/drivers/test_amqp_driver.py index a167a6478..f58a40d79 100644 --- a/oslo_messaging/tests/drivers/test_amqp_driver.py +++ b/oslo_messaging/tests/drivers/test_amqp_driver.py @@ -1553,33 +1553,42 @@ class TestMessageRetransmit(_AmqpBrokerTestCase): @testtools.skipUnless(SSL_ENABLED, "OpenSSL not supported") -class TestSSL(test_utils.BaseTestCase): +class TestSSL(TestFailover): """Test the driver's OpenSSL integration""" def setUp(self): - super(TestSSL, self).setUp() + self._broker = None # Create the CA, server, and client SSL certificates: self._tmpdir = tempfile.mkdtemp(prefix='amqp1') - files = ['ca_key', 'ca_cert', 's_key', 's_req', 's_cert', 'c_key', - 'c_req', 'c_cert', 'bad_cert', 'bad_req', 'bad_key'] + files = ['ca_key', 'ca_cert', 's_key', 's_req', 's_cert', 's2_key', + 's2_req', 's2_cert', 'c_key', 'c_req', 'c_cert', 'bad_cert', + 'bad_req', 'bad_key'] conf = dict(zip(files, [os.path.join(self._tmpdir, "%s.pem" % f) for f in files])) conf['pw'] = 'password' conf['s_name'] = '127.0.0.1' + conf['s2_name'] = '127.0.0.2' conf['c_name'] = 'client.com' + self._ssl_config = conf ssl_setup = [ # create self-signed CA certificate: Template('openssl req -x509 -nodes -newkey rsa:2048' ' -subj "/CN=Trusted.CA.com" -keyout ${ca_key}' ' -out ${ca_cert}').substitute(conf), - # create Server key and certificate: + # create Server keys and certificates: Template('openssl genrsa -out ${s_key} 2048').substitute(conf), Template('openssl req -new -key ${s_key} -subj /CN=${s_name}' ' -passin pass:${pw} -out ${s_req}').substitute(conf), Template('openssl x509 -req -in ${s_req} -CA ${ca_cert}' ' -CAkey ${ca_key} -CAcreateserial -out' ' ${s_cert}').substitute(conf), + Template('openssl genrsa -out ${s2_key} 2048').substitute(conf), + Template('openssl req -new -key ${s2_key} -subj /CN=${s2_name}' + ' -passin pass:${pw} -out ${s2_req}').substitute(conf), + Template('openssl x509 -req -in ${s2_req} -CA ${ca_cert}' + ' -CAkey ${ca_key} -CAcreateserial -out' + ' ${s2_cert}').substitute(conf), # create a "bad" Server cert for testing CN validation: Template('openssl genrsa -out ${bad_key} 2048').substitute(conf), Template('openssl req -new -key ${bad_key} -subj /CN=Invalid' @@ -1604,10 +1613,30 @@ class TestSSL(test_utils.BaseTestCase): self._tmpdir = None self.skipTest("OpenSSL tools not installed - skipping") - def _ssl_server_ok(self, url): - self._broker.start() + super(TestSSL, self).setUp() + self.config(ssl_ca_file=self._ssl_config['ca_cert'], group='oslo_messaging_amqp') + + def _gen_brokers(self): + s2_conf = self._ssl_config.copy() + for item in ['name', 'key', 'req', 'cert']: + s2_conf["s_%s" % item] = s2_conf["s2_%s" % item] + + return [FakeBroker(self.conf.oslo_messaging_amqp, + sock_addr=self._ssl_config['s_name'], + ssl_config=self._ssl_config), + FakeBroker(self.conf.oslo_messaging_amqp, + sock_addr=s2_conf['s_name'], + ssl_config=s2_conf)] + + def _gen_transport_url(self, hosts): + url = "amqp://%s" % (",".join(map(lambda x: "%s:%d" % + (x.hostname, x.port), hosts))) + return oslo_messaging.TransportURL.parse(self.conf, url) + + def _ssl_server_ok(self, url): + self._broker.start() tport_url = oslo_messaging.TransportURL.parse(self.conf, url) driver = amqp_driver.ProtonDriver(self.conf, tport_url) target = oslo_messaging.Target(topic="test-topic")