From 992c7ec5b0d27e6a188d8b320e6d9ad46d92ebfa Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Tue, 21 Jul 2015 15:31:24 -0400 Subject: [PATCH] Port the AMQP1 driver to new Pyngus SASL API Pyngus 2.0 includes a new API for configuring SASL credentials. Previous versions of Pyngus did not provide this API - the driver had to invoke Proton APIs in order to configure user credentials. Moving to the Pyngus API will preserve compatibility with older versions of Proton, since the next release of Proton wil be changing its SASL API. Pyngus 2.0 also adds strict enforcement of callback re-entrancy constrants. This patch fixes some bad driver reentrancy violations. Closes-bug: #1473515 Change-Id: Iddccefd3ee3c9092c086fc54e3810f78d5df9338 --- .../_drivers/protocols/amqp/controller.py | 32 ++--- .../_drivers/protocols/amqp/eventloop.py | 86 ++++++----- oslo_messaging/tests/test_amqp_driver.py | 133 ++++++++++++++++-- setup-test-env-qpid.sh | 44 +++++- tox.ini | 4 +- 5 files changed, 228 insertions(+), 71 deletions(-) diff --git a/oslo_messaging/_drivers/protocols/amqp/controller.py b/oslo_messaging/_drivers/protocols/amqp/controller.py index 8d949ed2e..24b601c30 100644 --- a/oslo_messaging/_drivers/protocols/amqp/controller.py +++ b/oslo_messaging/_drivers/protocols/amqp/controller.py @@ -575,15 +575,16 @@ class Controller(pyngus.ConnectionEventHandler): self._socket_connection.connection.close() def sasl_done(self, connection, pn_sasl, outcome): - """This is a Pyngus callback invoked by Pyngus when the SASL handshake - has completed. The outcome of the handshake will be OK on success or - AUTH on failure. + """This is a Pyngus callback invoked when the SASL handshake + has completed. The outcome of the handshake is passed in the outcome + argument. """ - if outcome == proton.SASL.AUTH: - LOG.error("Unable to connect to %s:%s, authentication failure.", - self.hosts.current.hostname, self.hosts.current.port) - # requires user intervention, treat it like a connection failure: - self._handle_connection_loss() + if outcome == proton.SASL.OK: + return + LOG.error("AUTHENTICATION FAILURE: Cannot connect to %s:%s as user %s", + self.hosts.current.hostname, self.hosts.current.port, + self.hosts.current.username) + # connection failure will be handled later def _complete_shutdown(self): """The AMQP Connection has closed, and the driver shutdown is complete. @@ -607,19 +608,18 @@ class Controller(pyngus.ConnectionEventHandler): if not self._reconnecting: self._reconnecting = True self._replies = None - if self._delay == 0: - self._delay = 1 - self._do_reconnect() - else: - d = self._delay - LOG.info("delaying reconnect attempt for %d seconds", d) - self.processor.schedule(lambda: self._do_reconnect(), d) - self._delay = min(d * 2, 60) + d = self._delay + LOG.info("delaying reconnect attempt for %d seconds", d) + self.processor.schedule(lambda: self._do_reconnect(), d) + self._delay = 1 if self._delay == 0 else min(d * 2, 60) def _do_reconnect(self): """Invoked on connection/socket failure, failover and re-connect to the messaging service. """ + # note well: since this method destroys the connection, it cannot be + # invoked directly from a pyngus callback. Use processor.schedule() to + # run this method on the main loop instead. if not self._closing: self._reconnecting = False self._senders = {} diff --git a/oslo_messaging/_drivers/protocols/amqp/eventloop.py b/oslo_messaging/_drivers/protocols/amqp/eventloop.py index 193f26294..e8fee7d1f 100644 --- a/oslo_messaging/_drivers/protocols/amqp/eventloop.py +++ b/oslo_messaging/_drivers/protocols/amqp/eventloop.py @@ -54,9 +54,7 @@ class _SocketConnection(object): # Currently it is the Controller object. self._handler = handler self._container = container - c = container.create_connection(name, handler, self._properties) - c.user_context = self - self.connection = c + self.connection = None def _get_name_and_pid(self): # helps identify the process that is using the connection @@ -72,37 +70,31 @@ class _SocketConnection(object): while True: try: rc = pyngus.read_socket_input(self.connection, self.socket) - if rc > 0: - self.connection.process(time.time()) + self.connection.process(time.time()) return rc - except socket.error as e: - if e.errno == errno.EAGAIN or e.errno == errno.EINTR: - continue - elif e.errno == errno.EWOULDBLOCK: - return 0 - else: - self._handler.socket_error(str(e)) - return pyngus.Connection.EOS + except (socket.timeout, socket.error) as e: + # pyngus handles EAGAIN/EWOULDBLOCK and EINTER + self.connection.close_input() + self.connection.close() + self._handler.socket_error(str(e)) + return pyngus.Connection.EOS def write(self): """Called when socket is write-ready.""" while True: try: rc = pyngus.write_socket_output(self.connection, self.socket) - if rc > 0: - self.connection.process(time.time()) + self.connection.process(time.time()) return rc - except socket.error as e: - if e.errno == errno.EAGAIN or e.errno == errno.EINTR: - continue - elif e.errno == errno.EWOULDBLOCK: - return 0 - else: - self._handler.socket_error(str(e)) - return pyngus.Connection.EOS + except (socket.timeout, socket.error) as e: + # pyngus handles EAGAIN/EWOULDBLOCK and EINTER + self.connection.close_output() + self.connection.close() + self._handler.socket_error(str(e)) + return pyngus.Connection.EOS def connect(self, host): - """Connect to host:port and start the AMQP protocol.""" + """Connect to host and start the AMQP protocol.""" addr = socket.getaddrinfo(host.hostname, host.port, socket.AF_INET, socket.SOCK_STREAM) if not addr: @@ -124,31 +116,46 @@ class _SocketConnection(object): return self.socket = my_socket - # determine the proper SASL mechanism: PLAIN if a username/password is - # present, else ANONYMOUS - pn_sasl = self.connection.pn_sasl - if host.username: - password = host.password if host.password else "" - pn_sasl.plain(host.username, password) - else: - pn_sasl.mechanisms("ANONYMOUS") - # TODO(kgiusti): server if accepting inbound connections - pn_sasl.client() + props = self._properties.copy() + if pyngus.VERSION >= (2, 0, 0): + # configure client authentication + # + props['x-server'] = False + if host.username: + props['x-username'] = host.username + props['x-password'] = host.password or "" + + c = self._container.create_connection(self.name, self._handler, props) + c.user_context = self + self.connection = c + + if pyngus.VERSION < (2, 0, 0): + # older versions of pyngus requires manual SASL configuration: + # determine the proper SASL mechanism: PLAIN if a username/password + # is present, else ANONYMOUS + pn_sasl = self.connection.pn_sasl + if host.username: + password = host.password if host.password else "" + pn_sasl.plain(host.username, password) + else: + pn_sasl.mechanisms("ANONYMOUS") + # TODO(kgiusti): server if accepting inbound connections + pn_sasl.client() + self.connection.open() def reset(self, name=None): """Clean up the current state, expect 'connect()' to be recalled later. """ + # note well: since destroy() is called on the connection, do not invoke + # this method from a pyngus callback! if self.connection: self.connection.destroy() + self.connection = None self.close() if name: self.name = name - c = self._container.create_connection(self.name, self._handler, - self._properties) - c.user_context = self - self.connection = c def close(self): if self.socket: @@ -325,7 +332,6 @@ class Thread(threading.Thread): for r in readable: r.read() - self._schedule.process() # run any deferred requests for t in timers: if t.deadline > time.time(): break @@ -334,6 +340,8 @@ class Thread(threading.Thread): for w in writable: w.write() + self._schedule.process() # run any deferred requests + LOG.info("eventloop thread exiting, container=%s", self._container.name) self._container.destroy() diff --git a/oslo_messaging/tests/test_amqp_driver.py b/oslo_messaging/tests/test_amqp_driver.py index 379033d95..8e39a3ad6 100644 --- a/oslo_messaging/tests/test_amqp_driver.py +++ b/oslo_messaging/tests/test_amqp_driver.py @@ -15,13 +15,17 @@ import logging import os import select +import shutil import socket +import subprocess +import tempfile import threading import time import uuid from oslo_utils import importutils from six import moves +from string import Template import testtools import oslo_messaging @@ -295,9 +299,10 @@ class TestAmqpNotification(_AmqpBrokerTestCase): driver.cleanup() -@testtools.skipUnless(pyngus, "proton modules not present") +@testtools.skipUnless(pyngus and pyngus.VERSION < (2, 0, 0), + "pyngus module not present") class TestAuthentication(test_utils.BaseTestCase): - + """Test user authentication using the old pyngus API""" def setUp(self): super(TestAuthentication, self).setUp() # for simplicity, encode the credentials as they would appear 'on the @@ -349,6 +354,89 @@ class TestAuthentication(test_utils.BaseTestCase): driver.cleanup() +@testtools.skipUnless(pyngus and pyngus.VERSION >= (2, 0, 0), + "pyngus module not present") +class TestCyrusAuthentication(test_utils.BaseTestCase): + """Test the driver's Cyrus SASL integration""" + + def setUp(self): + """Create a simple SASL configuration. This assumes saslpasswd2 is in + the OS path, otherwise the test will be skipped. + """ + super(TestCyrusAuthentication, self).setUp() + # Create a SASL configuration and user database, + # add a user 'joe' with password 'secret': + self._conf_dir = tempfile.mkdtemp() + db = os.path.join(self._conf_dir, 'openstack.sasldb') + _t = "echo secret | saslpasswd2 -c -p -f ${db} joe" + cmd = Template(_t).substitute(db=db) + try: + subprocess.call(args=cmd, shell=True) + except Exception: + shutil.rmtree(self._conf_dir, ignore_errors=True) + self._conf_dir = None + raise self.SkipTest("Cyrus tool saslpasswd2 not installed") + + # configure the SASL broker: + conf = os.path.join(self._conf_dir, 'openstack.conf') + mechs = "DIGEST-MD5 SCRAM-SHA-1 CRAM-MD5 PLAIN" + t = Template("""sasldb_path: ${db} +mech_list: ${mechs} +""") + with open(conf, 'w') as f: + f.write(t.substitute(db=db, mechs=mechs)) + + self._broker = FakeBroker(sasl_mechanisms=mechs, + user_credentials=["\0joe\0secret"], + sasl_config_dir=self._conf_dir, + sasl_config_name="openstack") + self._broker.start() + + def tearDown(self): + super(TestCyrusAuthentication, self).tearDown() + if self._broker: + self._broker.stop() + if self._conf_dir: + shutil.rmtree(self._conf_dir, ignore_errors=True) + + def test_authentication_ok(self): + """Verify that username and password given in TransportHost are + accepted by the broker. + """ + + addr = "amqp://joe:secret@%s:%d" % (self._broker.host, + self._broker.port) + url = oslo_messaging.TransportURL.parse(self.conf, addr) + driver = amqp_driver.ProtonDriver(self.conf, url) + target = oslo_messaging.Target(topic="test-topic") + listener = _ListenerThread(driver.listen(target), 1) + rc = driver.send(target, {"context": True}, + {"method": "echo"}, wait_for_reply=True) + self.assertIsNotNone(rc) + listener.join(timeout=30) + self.assertFalse(listener.isAlive()) + driver.cleanup() + + def test_authentication_failure(self): + """Verify that a bad password given in TransportHost is + rejected by the broker. + """ + + addr = "amqp://joe:badpass@%s:%d" % (self._broker.host, + self._broker.port) + url = oslo_messaging.TransportURL.parse(self.conf, addr) + driver = amqp_driver.ProtonDriver(self.conf, url) + target = oslo_messaging.Target(topic="test-topic") + _ListenerThread(driver.listen(target), 1) + self.assertRaises(oslo_messaging.MessagingTimeout, + driver.send, + target, {"context": True}, + {"method": "echo"}, + wait_for_reply=True, + timeout=2.0) + driver.cleanup() + + @testtools.skipUnless(pyngus, "proton modules not present") class TestFailover(test_utils.BaseTestCase): @@ -429,19 +517,33 @@ class FakeBroker(threading.Thread): """A single AMQP connection.""" def __init__(self, server, socket_, name, - sasl_mechanisms, user_credentials): + sasl_mechanisms, user_credentials, + sasl_config_dir, sasl_config_name): """Create a Connection using socket_.""" self.socket = socket_ self.name = name self.server = server - self.connection = server.container.create_connection(name, - self) - self.connection.user_context = self self.sasl_mechanisms = sasl_mechanisms self.user_credentials = user_credentials - if sasl_mechanisms: - self.connection.pn_sasl.mechanisms(sasl_mechanisms) - self.connection.pn_sasl.server() + properties = {'x-server': True} + if self.sasl_mechanisms: + properties['x-sasl-mechs'] = self.sasl_mechanisms + if "ANONYMOUS" not in self.sasl_mechanisms: + properties['x-require-auth'] = True + if sasl_config_dir: + properties['x-sasl-config-dir'] = sasl_config_dir + if sasl_config_name: + properties['x-sasl-config-name'] = sasl_config_name + + self.connection = server.container.create_connection( + name, self, properties) + self.connection.user_context = self + if pyngus.VERSION < (2, 0, 0): + # older versions of pyngus don't recognize the sasl + # connection properties, so configure them manually: + if sasl_mechanisms: + self.connection.pn_sasl.mechanisms(sasl_mechanisms) + self.connection.pn_sasl.server() self.connection.open() self.sender_links = set() self.closed = False @@ -506,7 +608,8 @@ class FakeBroker(threading.Thread): link_handle, addr) def sasl_step(self, connection, pn_sasl): - if self.sasl_mechanisms == 'PLAIN': + # only called if not using Cyrus SASL + if 'PLAIN' in self.sasl_mechanisms: credentials = pn_sasl.recv() if not credentials: return # wait until some arrives @@ -592,7 +695,9 @@ class FakeBroker(threading.Thread): address_separator=".", sock_addr="", sock_port=0, sasl_mechanisms="ANONYMOUS", - user_credentials=None): + user_credentials=None, + sasl_config_dir=None, + sasl_config_name=None): """Create a fake broker listening on sock_addr:sock_port.""" if not pyngus: raise AssertionError("pyngus module not present") @@ -602,6 +707,8 @@ class FakeBroker(threading.Thread): self._group_prefix = group_prefix + address_separator self._address_separator = address_separator self._sasl_mechanisms = sasl_mechanisms + self._sasl_config_dir = sasl_config_dir + self._sasl_config_name = sasl_config_name self._user_credentials = user_credentials self._wakeup_pipe = os.pipe() self._my_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -664,7 +771,9 @@ class FakeBroker(threading.Thread): name = str(client_address) conn = FakeBroker.Connection(self, client_socket, name, self._sasl_mechanisms, - self._user_credentials) + self._user_credentials, + self._sasl_config_dir, + self._sasl_config_name) self._connections[conn.name] = conn elif r is self._wakeup_pipe[0]: os.read(self._wakeup_pipe[0], 512) diff --git a/setup-test-env-qpid.sh b/setup-test-env-qpid.sh index 6efe8f388..9c0e38d5e 100755 --- a/setup-test-env-qpid.sh +++ b/setup-test-env-qpid.sh @@ -1,4 +1,8 @@ #!/bin/bash +# +# Usage: setup-test-env-qpid.sh PROTOCOL +# where PROTOCOL is the version of the AMQP protocol to use with +# qpidd. Valid values for PROTOCOL are "1", "1.0", "0-10", "0.10" set -e # require qpidd, qpid-tools sasl2-bin/cyrus-sasl-plain+cyrus-sasl-lib @@ -8,6 +12,34 @@ set -e DATADIR=$(mktemp -d /tmp/OSLOMSG-QPID.XXXXX) trap "clean_exit $DATADIR" EXIT +QPIDD=$(which qpidd 2>/dev/null) + +# which protocol should be used with qpidd? +# 1 for AMQP 1.0, 0.10 for AMQP 0.10 +# +PROTOCOL=$1 +case $PROTOCOL in + "1" | "1.0") + PROTOCOL="1" + shift + ;; + "0.10" | "0-10") + PROTOCOL="0-10" + shift + ;; + *) + # assume the old protocol + echo "No protocol specified, assuming 0.10" + PROTOCOL="0-10" + ;; +esac + +# ensure that the version of qpidd does support AMQP 1.0 +if [ $PROTOCOL == "1" ] && ! `$QPIDD --help | grep -q "queue-patterns"`; then + echo "This version of $QPIDD does not support AMQP 1.0" + exit 1 +fi + [ -f "/usr/lib/qpid/daemon/acl.so" ] && LIBACL="load-module=/usr/lib/qpid/daemon/acl.so" cat > ${DATADIR}/qpidd.conf <> ${DATADIR}/qpidd.conf <> ${DATADIR}/qpidd.conf < ${DATADIR}/qpidd.acl </dev/null) - mkfifo ${DATADIR}/out $QPIDD --log-enable info+ --log-to-file ${DATADIR}/out --config ${DATADIR}/qpidd.conf & wait_for_line "Broker .*running" "error" ${DATADIR}/out diff --git a/tox.ini b/tox.ini index 92cdc4b93..7cf9b2bc5 100644 --- a/tox.ini +++ b/tox.ini @@ -26,7 +26,7 @@ commands = python setup.py build_sphinx [testenv:py27-func-qpid] setenv = TRANSPORT_URL=qpid://stackqpid:secretqpid@127.0.0.1:65123// -commands = {toxinidir}/setup-test-env-qpid.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' +commands = {toxinidir}/setup-test-env-qpid.sh 0-10 python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' [testenv:py27-func-rabbit] commands = {toxinidir}/setup-test-env-rabbit.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' @@ -34,7 +34,7 @@ commands = {toxinidir}/setup-test-env-rabbit.sh python setup.py testr --slowest [testenv:py27-func-amqp1] setenv = TRANSPORT_URL=amqp://stackqpid:secretqpid@127.0.0.1:65123// # NOTE(flaper87): This gate job run on fedora21 for now. -commands = {toxinidir}/setup-test-env-qpid.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' +commands = {toxinidir}/setup-test-env-qpid.sh 1.0 python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' [testenv:py27-func-zeromq] commands = {toxinidir}/setup-test-env-zmq.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'