diff --git a/oslo_messaging/_drivers/protocols/amqp/controller.py b/oslo_messaging/_drivers/protocols/amqp/controller.py index 6c61663af..d259eb968 100644 --- a/oslo_messaging/_drivers/protocols/amqp/controller.py +++ b/oslo_messaging/_drivers/protocols/amqp/controller.py @@ -575,15 +575,16 @@ class Controller(pyngus.ConnectionEventHandler): self._socket_connection.connection.close() def sasl_done(self, connection, pn_sasl, outcome): - """This is a Pyngus callback invoked by Pyngus when the SASL handshake - has completed. The outcome of the handshake will be OK on success or - AUTH on failure. + """This is a Pyngus callback invoked when the SASL handshake + has completed. The outcome of the handshake is passed in the outcome + argument. """ - if outcome == proton.SASL.AUTH: - LOG.error("Unable to connect to %s:%s, authentication failure.", - self.hosts.current.hostname, self.hosts.current.port) - # requires user intervention, treat it like a connection failure: - self._handle_connection_loss() + if outcome == proton.SASL.OK: + return + LOG.error("AUTHENTICATION FAILURE: Cannot connect to %s:%s as user %s", + self.hosts.current.hostname, self.hosts.current.port, + self.hosts.current.username) + # connection failure will be handled later def _complete_shutdown(self): """The AMQP Connection has closed, and the driver shutdown is complete. @@ -607,19 +608,18 @@ class Controller(pyngus.ConnectionEventHandler): if not self._reconnecting: self._reconnecting = True self._replies = None - if self._delay == 0: - self._delay = 1 - self._do_reconnect() - else: - d = self._delay - LOG.info("delaying reconnect attempt for %d seconds", d) - self.processor.schedule(lambda: self._do_reconnect(), d) - self._delay = min(d * 2, 60) + d = self._delay + LOG.info("delaying reconnect attempt for %d seconds", d) + self.processor.schedule(lambda: self._do_reconnect(), d) + self._delay = 1 if self._delay == 0 else min(d * 2, 60) def _do_reconnect(self): """Invoked on connection/socket failure, failover and re-connect to the messaging service. """ + # note well: since this method destroys the connection, it cannot be + # invoked directly from a pyngus callback. Use processor.schedule() to + # run this method on the main loop instead. if not self._closing: self._reconnecting = False self._senders = {} diff --git a/oslo_messaging/_drivers/protocols/amqp/eventloop.py b/oslo_messaging/_drivers/protocols/amqp/eventloop.py index 193f26294..e8fee7d1f 100644 --- a/oslo_messaging/_drivers/protocols/amqp/eventloop.py +++ b/oslo_messaging/_drivers/protocols/amqp/eventloop.py @@ -54,9 +54,7 @@ class _SocketConnection(object): # Currently it is the Controller object. self._handler = handler self._container = container - c = container.create_connection(name, handler, self._properties) - c.user_context = self - self.connection = c + self.connection = None def _get_name_and_pid(self): # helps identify the process that is using the connection @@ -72,37 +70,31 @@ class _SocketConnection(object): while True: try: rc = pyngus.read_socket_input(self.connection, self.socket) - if rc > 0: - self.connection.process(time.time()) + self.connection.process(time.time()) return rc - except socket.error as e: - if e.errno == errno.EAGAIN or e.errno == errno.EINTR: - continue - elif e.errno == errno.EWOULDBLOCK: - return 0 - else: - self._handler.socket_error(str(e)) - return pyngus.Connection.EOS + except (socket.timeout, socket.error) as e: + # pyngus handles EAGAIN/EWOULDBLOCK and EINTER + self.connection.close_input() + self.connection.close() + self._handler.socket_error(str(e)) + return pyngus.Connection.EOS def write(self): """Called when socket is write-ready.""" while True: try: rc = pyngus.write_socket_output(self.connection, self.socket) - if rc > 0: - self.connection.process(time.time()) + self.connection.process(time.time()) return rc - except socket.error as e: - if e.errno == errno.EAGAIN or e.errno == errno.EINTR: - continue - elif e.errno == errno.EWOULDBLOCK: - return 0 - else: - self._handler.socket_error(str(e)) - return pyngus.Connection.EOS + except (socket.timeout, socket.error) as e: + # pyngus handles EAGAIN/EWOULDBLOCK and EINTER + self.connection.close_output() + self.connection.close() + self._handler.socket_error(str(e)) + return pyngus.Connection.EOS def connect(self, host): - """Connect to host:port and start the AMQP protocol.""" + """Connect to host and start the AMQP protocol.""" addr = socket.getaddrinfo(host.hostname, host.port, socket.AF_INET, socket.SOCK_STREAM) if not addr: @@ -124,31 +116,46 @@ class _SocketConnection(object): return self.socket = my_socket - # determine the proper SASL mechanism: PLAIN if a username/password is - # present, else ANONYMOUS - pn_sasl = self.connection.pn_sasl - if host.username: - password = host.password if host.password else "" - pn_sasl.plain(host.username, password) - else: - pn_sasl.mechanisms("ANONYMOUS") - # TODO(kgiusti): server if accepting inbound connections - pn_sasl.client() + props = self._properties.copy() + if pyngus.VERSION >= (2, 0, 0): + # configure client authentication + # + props['x-server'] = False + if host.username: + props['x-username'] = host.username + props['x-password'] = host.password or "" + + c = self._container.create_connection(self.name, self._handler, props) + c.user_context = self + self.connection = c + + if pyngus.VERSION < (2, 0, 0): + # older versions of pyngus requires manual SASL configuration: + # determine the proper SASL mechanism: PLAIN if a username/password + # is present, else ANONYMOUS + pn_sasl = self.connection.pn_sasl + if host.username: + password = host.password if host.password else "" + pn_sasl.plain(host.username, password) + else: + pn_sasl.mechanisms("ANONYMOUS") + # TODO(kgiusti): server if accepting inbound connections + pn_sasl.client() + self.connection.open() def reset(self, name=None): """Clean up the current state, expect 'connect()' to be recalled later. """ + # note well: since destroy() is called on the connection, do not invoke + # this method from a pyngus callback! if self.connection: self.connection.destroy() + self.connection = None self.close() if name: self.name = name - c = self._container.create_connection(self.name, self._handler, - self._properties) - c.user_context = self - self.connection = c def close(self): if self.socket: @@ -325,7 +332,6 @@ class Thread(threading.Thread): for r in readable: r.read() - self._schedule.process() # run any deferred requests for t in timers: if t.deadline > time.time(): break @@ -334,6 +340,8 @@ class Thread(threading.Thread): for w in writable: w.write() + self._schedule.process() # run any deferred requests + LOG.info("eventloop thread exiting, container=%s", self._container.name) self._container.destroy() diff --git a/oslo_messaging/tests/test_amqp_driver.py b/oslo_messaging/tests/test_amqp_driver.py index 379033d95..8e39a3ad6 100644 --- a/oslo_messaging/tests/test_amqp_driver.py +++ b/oslo_messaging/tests/test_amqp_driver.py @@ -15,13 +15,17 @@ import logging import os import select +import shutil import socket +import subprocess +import tempfile import threading import time import uuid from oslo_utils import importutils from six import moves +from string import Template import testtools import oslo_messaging @@ -295,9 +299,10 @@ class TestAmqpNotification(_AmqpBrokerTestCase): driver.cleanup() -@testtools.skipUnless(pyngus, "proton modules not present") +@testtools.skipUnless(pyngus and pyngus.VERSION < (2, 0, 0), + "pyngus module not present") class TestAuthentication(test_utils.BaseTestCase): - + """Test user authentication using the old pyngus API""" def setUp(self): super(TestAuthentication, self).setUp() # for simplicity, encode the credentials as they would appear 'on the @@ -349,6 +354,89 @@ class TestAuthentication(test_utils.BaseTestCase): driver.cleanup() +@testtools.skipUnless(pyngus and pyngus.VERSION >= (2, 0, 0), + "pyngus module not present") +class TestCyrusAuthentication(test_utils.BaseTestCase): + """Test the driver's Cyrus SASL integration""" + + def setUp(self): + """Create a simple SASL configuration. This assumes saslpasswd2 is in + the OS path, otherwise the test will be skipped. + """ + super(TestCyrusAuthentication, self).setUp() + # Create a SASL configuration and user database, + # add a user 'joe' with password 'secret': + self._conf_dir = tempfile.mkdtemp() + db = os.path.join(self._conf_dir, 'openstack.sasldb') + _t = "echo secret | saslpasswd2 -c -p -f ${db} joe" + cmd = Template(_t).substitute(db=db) + try: + subprocess.call(args=cmd, shell=True) + except Exception: + shutil.rmtree(self._conf_dir, ignore_errors=True) + self._conf_dir = None + raise self.SkipTest("Cyrus tool saslpasswd2 not installed") + + # configure the SASL broker: + conf = os.path.join(self._conf_dir, 'openstack.conf') + mechs = "DIGEST-MD5 SCRAM-SHA-1 CRAM-MD5 PLAIN" + t = Template("""sasldb_path: ${db} +mech_list: ${mechs} +""") + with open(conf, 'w') as f: + f.write(t.substitute(db=db, mechs=mechs)) + + self._broker = FakeBroker(sasl_mechanisms=mechs, + user_credentials=["\0joe\0secret"], + sasl_config_dir=self._conf_dir, + sasl_config_name="openstack") + self._broker.start() + + def tearDown(self): + super(TestCyrusAuthentication, self).tearDown() + if self._broker: + self._broker.stop() + if self._conf_dir: + shutil.rmtree(self._conf_dir, ignore_errors=True) + + def test_authentication_ok(self): + """Verify that username and password given in TransportHost are + accepted by the broker. + """ + + addr = "amqp://joe:secret@%s:%d" % (self._broker.host, + self._broker.port) + url = oslo_messaging.TransportURL.parse(self.conf, addr) + driver = amqp_driver.ProtonDriver(self.conf, url) + target = oslo_messaging.Target(topic="test-topic") + listener = _ListenerThread(driver.listen(target), 1) + rc = driver.send(target, {"context": True}, + {"method": "echo"}, wait_for_reply=True) + self.assertIsNotNone(rc) + listener.join(timeout=30) + self.assertFalse(listener.isAlive()) + driver.cleanup() + + def test_authentication_failure(self): + """Verify that a bad password given in TransportHost is + rejected by the broker. + """ + + addr = "amqp://joe:badpass@%s:%d" % (self._broker.host, + self._broker.port) + url = oslo_messaging.TransportURL.parse(self.conf, addr) + driver = amqp_driver.ProtonDriver(self.conf, url) + target = oslo_messaging.Target(topic="test-topic") + _ListenerThread(driver.listen(target), 1) + self.assertRaises(oslo_messaging.MessagingTimeout, + driver.send, + target, {"context": True}, + {"method": "echo"}, + wait_for_reply=True, + timeout=2.0) + driver.cleanup() + + @testtools.skipUnless(pyngus, "proton modules not present") class TestFailover(test_utils.BaseTestCase): @@ -429,19 +517,33 @@ class FakeBroker(threading.Thread): """A single AMQP connection.""" def __init__(self, server, socket_, name, - sasl_mechanisms, user_credentials): + sasl_mechanisms, user_credentials, + sasl_config_dir, sasl_config_name): """Create a Connection using socket_.""" self.socket = socket_ self.name = name self.server = server - self.connection = server.container.create_connection(name, - self) - self.connection.user_context = self self.sasl_mechanisms = sasl_mechanisms self.user_credentials = user_credentials - if sasl_mechanisms: - self.connection.pn_sasl.mechanisms(sasl_mechanisms) - self.connection.pn_sasl.server() + properties = {'x-server': True} + if self.sasl_mechanisms: + properties['x-sasl-mechs'] = self.sasl_mechanisms + if "ANONYMOUS" not in self.sasl_mechanisms: + properties['x-require-auth'] = True + if sasl_config_dir: + properties['x-sasl-config-dir'] = sasl_config_dir + if sasl_config_name: + properties['x-sasl-config-name'] = sasl_config_name + + self.connection = server.container.create_connection( + name, self, properties) + self.connection.user_context = self + if pyngus.VERSION < (2, 0, 0): + # older versions of pyngus don't recognize the sasl + # connection properties, so configure them manually: + if sasl_mechanisms: + self.connection.pn_sasl.mechanisms(sasl_mechanisms) + self.connection.pn_sasl.server() self.connection.open() self.sender_links = set() self.closed = False @@ -506,7 +608,8 @@ class FakeBroker(threading.Thread): link_handle, addr) def sasl_step(self, connection, pn_sasl): - if self.sasl_mechanisms == 'PLAIN': + # only called if not using Cyrus SASL + if 'PLAIN' in self.sasl_mechanisms: credentials = pn_sasl.recv() if not credentials: return # wait until some arrives @@ -592,7 +695,9 @@ class FakeBroker(threading.Thread): address_separator=".", sock_addr="", sock_port=0, sasl_mechanisms="ANONYMOUS", - user_credentials=None): + user_credentials=None, + sasl_config_dir=None, + sasl_config_name=None): """Create a fake broker listening on sock_addr:sock_port.""" if not pyngus: raise AssertionError("pyngus module not present") @@ -602,6 +707,8 @@ class FakeBroker(threading.Thread): self._group_prefix = group_prefix + address_separator self._address_separator = address_separator self._sasl_mechanisms = sasl_mechanisms + self._sasl_config_dir = sasl_config_dir + self._sasl_config_name = sasl_config_name self._user_credentials = user_credentials self._wakeup_pipe = os.pipe() self._my_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -664,7 +771,9 @@ class FakeBroker(threading.Thread): name = str(client_address) conn = FakeBroker.Connection(self, client_socket, name, self._sasl_mechanisms, - self._user_credentials) + self._user_credentials, + self._sasl_config_dir, + self._sasl_config_name) self._connections[conn.name] = conn elif r is self._wakeup_pipe[0]: os.read(self._wakeup_pipe[0], 512) diff --git a/setup-test-env-qpid.sh b/setup-test-env-qpid.sh index 6efe8f388..9c0e38d5e 100755 --- a/setup-test-env-qpid.sh +++ b/setup-test-env-qpid.sh @@ -1,4 +1,8 @@ #!/bin/bash +# +# Usage: setup-test-env-qpid.sh PROTOCOL +# where PROTOCOL is the version of the AMQP protocol to use with +# qpidd. Valid values for PROTOCOL are "1", "1.0", "0-10", "0.10" set -e # require qpidd, qpid-tools sasl2-bin/cyrus-sasl-plain+cyrus-sasl-lib @@ -8,6 +12,34 @@ set -e DATADIR=$(mktemp -d /tmp/OSLOMSG-QPID.XXXXX) trap "clean_exit $DATADIR" EXIT +QPIDD=$(which qpidd 2>/dev/null) + +# which protocol should be used with qpidd? +# 1 for AMQP 1.0, 0.10 for AMQP 0.10 +# +PROTOCOL=$1 +case $PROTOCOL in + "1" | "1.0") + PROTOCOL="1" + shift + ;; + "0.10" | "0-10") + PROTOCOL="0-10" + shift + ;; + *) + # assume the old protocol + echo "No protocol specified, assuming 0.10" + PROTOCOL="0-10" + ;; +esac + +# ensure that the version of qpidd does support AMQP 1.0 +if [ $PROTOCOL == "1" ] && ! `$QPIDD --help | grep -q "queue-patterns"`; then + echo "This version of $QPIDD does not support AMQP 1.0" + exit 1 +fi + [ -f "/usr/lib/qpid/daemon/acl.so" ] && LIBACL="load-module=/usr/lib/qpid/daemon/acl.so" cat > ${DATADIR}/qpidd.conf <> ${DATADIR}/qpidd.conf <> ${DATADIR}/qpidd.conf < ${DATADIR}/qpidd.acl </dev/null) - mkfifo ${DATADIR}/out $QPIDD --log-enable info+ --log-to-file ${DATADIR}/out --config ${DATADIR}/qpidd.conf & wait_for_line "Broker .*running" "error" ${DATADIR}/out diff --git a/tox.ini b/tox.ini index 92cdc4b93..7cf9b2bc5 100644 --- a/tox.ini +++ b/tox.ini @@ -26,7 +26,7 @@ commands = python setup.py build_sphinx [testenv:py27-func-qpid] setenv = TRANSPORT_URL=qpid://stackqpid:secretqpid@127.0.0.1:65123// -commands = {toxinidir}/setup-test-env-qpid.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' +commands = {toxinidir}/setup-test-env-qpid.sh 0-10 python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' [testenv:py27-func-rabbit] commands = {toxinidir}/setup-test-env-rabbit.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' @@ -34,7 +34,7 @@ commands = {toxinidir}/setup-test-env-rabbit.sh python setup.py testr --slowest [testenv:py27-func-amqp1] setenv = TRANSPORT_URL=amqp://stackqpid:secretqpid@127.0.0.1:65123// # NOTE(flaper87): This gate job run on fedora21 for now. -commands = {toxinidir}/setup-test-env-qpid.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' +commands = {toxinidir}/setup-test-env-qpid.sh 1.0 python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' [testenv:py27-func-zeromq] commands = {toxinidir}/setup-test-env-zmq.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'