Port the AMQP1 driver to new Pyngus SASL API

Pyngus 2.0 includes a new API for configuring SASL credentials.
Previous versions of Pyngus did not provide this API - the driver had
to invoke Proton APIs in order to configure user credentials.  Moving
to the Pyngus API will preserve compatibility with older versions of
Proton, since the next release of Proton wil be changing its SASL API.

Pyngus 2.0 also adds strict enforcement of callback re-entrancy
constrants.  This patch fixes some bad driver reentrancy violations.

Closes-bug: #1473515
Change-Id: Iddccefd3ee3c9092c086fc54e3810f78d5df9338
This commit is contained in:
Kenneth Giusti 2015-07-21 15:31:24 -04:00
parent a3d3a12aab
commit 992c7ec5b0
5 changed files with 228 additions and 71 deletions

View File

@ -575,15 +575,16 @@ class Controller(pyngus.ConnectionEventHandler):
self._socket_connection.connection.close()
def sasl_done(self, connection, pn_sasl, outcome):
"""This is a Pyngus callback invoked by Pyngus when the SASL handshake
has completed. The outcome of the handshake will be OK on success or
AUTH on failure.
"""This is a Pyngus callback invoked when the SASL handshake
has completed. The outcome of the handshake is passed in the outcome
argument.
"""
if outcome == proton.SASL.AUTH:
LOG.error("Unable to connect to %s:%s, authentication failure.",
self.hosts.current.hostname, self.hosts.current.port)
# requires user intervention, treat it like a connection failure:
self._handle_connection_loss()
if outcome == proton.SASL.OK:
return
LOG.error("AUTHENTICATION FAILURE: Cannot connect to %s:%s as user %s",
self.hosts.current.hostname, self.hosts.current.port,
self.hosts.current.username)
# connection failure will be handled later
def _complete_shutdown(self):
"""The AMQP Connection has closed, and the driver shutdown is complete.
@ -607,19 +608,18 @@ class Controller(pyngus.ConnectionEventHandler):
if not self._reconnecting:
self._reconnecting = True
self._replies = None
if self._delay == 0:
self._delay = 1
self._do_reconnect()
else:
d = self._delay
LOG.info("delaying reconnect attempt for %d seconds", d)
self.processor.schedule(lambda: self._do_reconnect(), d)
self._delay = min(d * 2, 60)
d = self._delay
LOG.info("delaying reconnect attempt for %d seconds", d)
self.processor.schedule(lambda: self._do_reconnect(), d)
self._delay = 1 if self._delay == 0 else min(d * 2, 60)
def _do_reconnect(self):
"""Invoked on connection/socket failure, failover and re-connect to the
messaging service.
"""
# note well: since this method destroys the connection, it cannot be
# invoked directly from a pyngus callback. Use processor.schedule() to
# run this method on the main loop instead.
if not self._closing:
self._reconnecting = False
self._senders = {}

View File

@ -54,9 +54,7 @@ class _SocketConnection(object):
# Currently it is the Controller object.
self._handler = handler
self._container = container
c = container.create_connection(name, handler, self._properties)
c.user_context = self
self.connection = c
self.connection = None
def _get_name_and_pid(self):
# helps identify the process that is using the connection
@ -72,37 +70,31 @@ class _SocketConnection(object):
while True:
try:
rc = pyngus.read_socket_input(self.connection, self.socket)
if rc > 0:
self.connection.process(time.time())
self.connection.process(time.time())
return rc
except socket.error as e:
if e.errno == errno.EAGAIN or e.errno == errno.EINTR:
continue
elif e.errno == errno.EWOULDBLOCK:
return 0
else:
self._handler.socket_error(str(e))
return pyngus.Connection.EOS
except (socket.timeout, socket.error) as e:
# pyngus handles EAGAIN/EWOULDBLOCK and EINTER
self.connection.close_input()
self.connection.close()
self._handler.socket_error(str(e))
return pyngus.Connection.EOS
def write(self):
"""Called when socket is write-ready."""
while True:
try:
rc = pyngus.write_socket_output(self.connection, self.socket)
if rc > 0:
self.connection.process(time.time())
self.connection.process(time.time())
return rc
except socket.error as e:
if e.errno == errno.EAGAIN or e.errno == errno.EINTR:
continue
elif e.errno == errno.EWOULDBLOCK:
return 0
else:
self._handler.socket_error(str(e))
return pyngus.Connection.EOS
except (socket.timeout, socket.error) as e:
# pyngus handles EAGAIN/EWOULDBLOCK and EINTER
self.connection.close_output()
self.connection.close()
self._handler.socket_error(str(e))
return pyngus.Connection.EOS
def connect(self, host):
"""Connect to host:port and start the AMQP protocol."""
"""Connect to host and start the AMQP protocol."""
addr = socket.getaddrinfo(host.hostname, host.port,
socket.AF_INET, socket.SOCK_STREAM)
if not addr:
@ -124,31 +116,46 @@ class _SocketConnection(object):
return
self.socket = my_socket
# determine the proper SASL mechanism: PLAIN if a username/password is
# present, else ANONYMOUS
pn_sasl = self.connection.pn_sasl
if host.username:
password = host.password if host.password else ""
pn_sasl.plain(host.username, password)
else:
pn_sasl.mechanisms("ANONYMOUS")
# TODO(kgiusti): server if accepting inbound connections
pn_sasl.client()
props = self._properties.copy()
if pyngus.VERSION >= (2, 0, 0):
# configure client authentication
#
props['x-server'] = False
if host.username:
props['x-username'] = host.username
props['x-password'] = host.password or ""
c = self._container.create_connection(self.name, self._handler, props)
c.user_context = self
self.connection = c
if pyngus.VERSION < (2, 0, 0):
# older versions of pyngus requires manual SASL configuration:
# determine the proper SASL mechanism: PLAIN if a username/password
# is present, else ANONYMOUS
pn_sasl = self.connection.pn_sasl
if host.username:
password = host.password if host.password else ""
pn_sasl.plain(host.username, password)
else:
pn_sasl.mechanisms("ANONYMOUS")
# TODO(kgiusti): server if accepting inbound connections
pn_sasl.client()
self.connection.open()
def reset(self, name=None):
"""Clean up the current state, expect 'connect()' to be recalled
later.
"""
# note well: since destroy() is called on the connection, do not invoke
# this method from a pyngus callback!
if self.connection:
self.connection.destroy()
self.connection = None
self.close()
if name:
self.name = name
c = self._container.create_connection(self.name, self._handler,
self._properties)
c.user_context = self
self.connection = c
def close(self):
if self.socket:
@ -325,7 +332,6 @@ class Thread(threading.Thread):
for r in readable:
r.read()
self._schedule.process() # run any deferred requests
for t in timers:
if t.deadline > time.time():
break
@ -334,6 +340,8 @@ class Thread(threading.Thread):
for w in writable:
w.write()
self._schedule.process() # run any deferred requests
LOG.info("eventloop thread exiting, container=%s",
self._container.name)
self._container.destroy()

View File

@ -15,13 +15,17 @@
import logging
import os
import select
import shutil
import socket
import subprocess
import tempfile
import threading
import time
import uuid
from oslo_utils import importutils
from six import moves
from string import Template
import testtools
import oslo_messaging
@ -295,9 +299,10 @@ class TestAmqpNotification(_AmqpBrokerTestCase):
driver.cleanup()
@testtools.skipUnless(pyngus, "proton modules not present")
@testtools.skipUnless(pyngus and pyngus.VERSION < (2, 0, 0),
"pyngus module not present")
class TestAuthentication(test_utils.BaseTestCase):
"""Test user authentication using the old pyngus API"""
def setUp(self):
super(TestAuthentication, self).setUp()
# for simplicity, encode the credentials as they would appear 'on the
@ -349,6 +354,89 @@ class TestAuthentication(test_utils.BaseTestCase):
driver.cleanup()
@testtools.skipUnless(pyngus and pyngus.VERSION >= (2, 0, 0),
"pyngus module not present")
class TestCyrusAuthentication(test_utils.BaseTestCase):
"""Test the driver's Cyrus SASL integration"""
def setUp(self):
"""Create a simple SASL configuration. This assumes saslpasswd2 is in
the OS path, otherwise the test will be skipped.
"""
super(TestCyrusAuthentication, self).setUp()
# Create a SASL configuration and user database,
# add a user 'joe' with password 'secret':
self._conf_dir = tempfile.mkdtemp()
db = os.path.join(self._conf_dir, 'openstack.sasldb')
_t = "echo secret | saslpasswd2 -c -p -f ${db} joe"
cmd = Template(_t).substitute(db=db)
try:
subprocess.call(args=cmd, shell=True)
except Exception:
shutil.rmtree(self._conf_dir, ignore_errors=True)
self._conf_dir = None
raise self.SkipTest("Cyrus tool saslpasswd2 not installed")
# configure the SASL broker:
conf = os.path.join(self._conf_dir, 'openstack.conf')
mechs = "DIGEST-MD5 SCRAM-SHA-1 CRAM-MD5 PLAIN"
t = Template("""sasldb_path: ${db}
mech_list: ${mechs}
""")
with open(conf, 'w') as f:
f.write(t.substitute(db=db, mechs=mechs))
self._broker = FakeBroker(sasl_mechanisms=mechs,
user_credentials=["\0joe\0secret"],
sasl_config_dir=self._conf_dir,
sasl_config_name="openstack")
self._broker.start()
def tearDown(self):
super(TestCyrusAuthentication, self).tearDown()
if self._broker:
self._broker.stop()
if self._conf_dir:
shutil.rmtree(self._conf_dir, ignore_errors=True)
def test_authentication_ok(self):
"""Verify that username and password given in TransportHost are
accepted by the broker.
"""
addr = "amqp://joe:secret@%s:%d" % (self._broker.host,
self._broker.port)
url = oslo_messaging.TransportURL.parse(self.conf, addr)
driver = amqp_driver.ProtonDriver(self.conf, url)
target = oslo_messaging.Target(topic="test-topic")
listener = _ListenerThread(driver.listen(target), 1)
rc = driver.send(target, {"context": True},
{"method": "echo"}, wait_for_reply=True)
self.assertIsNotNone(rc)
listener.join(timeout=30)
self.assertFalse(listener.isAlive())
driver.cleanup()
def test_authentication_failure(self):
"""Verify that a bad password given in TransportHost is
rejected by the broker.
"""
addr = "amqp://joe:badpass@%s:%d" % (self._broker.host,
self._broker.port)
url = oslo_messaging.TransportURL.parse(self.conf, addr)
driver = amqp_driver.ProtonDriver(self.conf, url)
target = oslo_messaging.Target(topic="test-topic")
_ListenerThread(driver.listen(target), 1)
self.assertRaises(oslo_messaging.MessagingTimeout,
driver.send,
target, {"context": True},
{"method": "echo"},
wait_for_reply=True,
timeout=2.0)
driver.cleanup()
@testtools.skipUnless(pyngus, "proton modules not present")
class TestFailover(test_utils.BaseTestCase):
@ -429,19 +517,33 @@ class FakeBroker(threading.Thread):
"""A single AMQP connection."""
def __init__(self, server, socket_, name,
sasl_mechanisms, user_credentials):
sasl_mechanisms, user_credentials,
sasl_config_dir, sasl_config_name):
"""Create a Connection using socket_."""
self.socket = socket_
self.name = name
self.server = server
self.connection = server.container.create_connection(name,
self)
self.connection.user_context = self
self.sasl_mechanisms = sasl_mechanisms
self.user_credentials = user_credentials
if sasl_mechanisms:
self.connection.pn_sasl.mechanisms(sasl_mechanisms)
self.connection.pn_sasl.server()
properties = {'x-server': True}
if self.sasl_mechanisms:
properties['x-sasl-mechs'] = self.sasl_mechanisms
if "ANONYMOUS" not in self.sasl_mechanisms:
properties['x-require-auth'] = True
if sasl_config_dir:
properties['x-sasl-config-dir'] = sasl_config_dir
if sasl_config_name:
properties['x-sasl-config-name'] = sasl_config_name
self.connection = server.container.create_connection(
name, self, properties)
self.connection.user_context = self
if pyngus.VERSION < (2, 0, 0):
# older versions of pyngus don't recognize the sasl
# connection properties, so configure them manually:
if sasl_mechanisms:
self.connection.pn_sasl.mechanisms(sasl_mechanisms)
self.connection.pn_sasl.server()
self.connection.open()
self.sender_links = set()
self.closed = False
@ -506,7 +608,8 @@ class FakeBroker(threading.Thread):
link_handle, addr)
def sasl_step(self, connection, pn_sasl):
if self.sasl_mechanisms == 'PLAIN':
# only called if not using Cyrus SASL
if 'PLAIN' in self.sasl_mechanisms:
credentials = pn_sasl.recv()
if not credentials:
return # wait until some arrives
@ -592,7 +695,9 @@ class FakeBroker(threading.Thread):
address_separator=".",
sock_addr="", sock_port=0,
sasl_mechanisms="ANONYMOUS",
user_credentials=None):
user_credentials=None,
sasl_config_dir=None,
sasl_config_name=None):
"""Create a fake broker listening on sock_addr:sock_port."""
if not pyngus:
raise AssertionError("pyngus module not present")
@ -602,6 +707,8 @@ class FakeBroker(threading.Thread):
self._group_prefix = group_prefix + address_separator
self._address_separator = address_separator
self._sasl_mechanisms = sasl_mechanisms
self._sasl_config_dir = sasl_config_dir
self._sasl_config_name = sasl_config_name
self._user_credentials = user_credentials
self._wakeup_pipe = os.pipe()
self._my_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@ -664,7 +771,9 @@ class FakeBroker(threading.Thread):
name = str(client_address)
conn = FakeBroker.Connection(self, client_socket, name,
self._sasl_mechanisms,
self._user_credentials)
self._user_credentials,
self._sasl_config_dir,
self._sasl_config_name)
self._connections[conn.name] = conn
elif r is self._wakeup_pipe[0]:
os.read(self._wakeup_pipe[0], 512)

View File

@ -1,4 +1,8 @@
#!/bin/bash
#
# Usage: setup-test-env-qpid.sh PROTOCOL <command to run>
# where PROTOCOL is the version of the AMQP protocol to use with
# qpidd. Valid values for PROTOCOL are "1", "1.0", "0-10", "0.10"
set -e
# require qpidd, qpid-tools sasl2-bin/cyrus-sasl-plain+cyrus-sasl-lib
@ -8,6 +12,34 @@ set -e
DATADIR=$(mktemp -d /tmp/OSLOMSG-QPID.XXXXX)
trap "clean_exit $DATADIR" EXIT
QPIDD=$(which qpidd 2>/dev/null)
# which protocol should be used with qpidd?
# 1 for AMQP 1.0, 0.10 for AMQP 0.10
#
PROTOCOL=$1
case $PROTOCOL in
"1" | "1.0")
PROTOCOL="1"
shift
;;
"0.10" | "0-10")
PROTOCOL="0-10"
shift
;;
*)
# assume the old protocol
echo "No protocol specified, assuming 0.10"
PROTOCOL="0-10"
;;
esac
# ensure that the version of qpidd does support AMQP 1.0
if [ $PROTOCOL == "1" ] && ! `$QPIDD --help | grep -q "queue-patterns"`; then
echo "This version of $QPIDD does not support AMQP 1.0"
exit 1
fi
[ -f "/usr/lib/qpid/daemon/acl.so" ] && LIBACL="load-module=/usr/lib/qpid/daemon/acl.so"
cat > ${DATADIR}/qpidd.conf <<EOF
@ -18,12 +50,22 @@ ${LIBACL}
mgmt-enable=yes
auth=yes
log-to-stderr=no
EOF
if [ $PROTOCOL == "1" ]; then
cat >> ${DATADIR}/qpidd.conf <<EOF
# Used by AMQP1.0 only
queue-patterns=exclusive
queue-patterns=unicast
topic-patterns=broadcast
EOF
# some versions of qpidd require this for AMQP 1 and SASL:
if `$QPIDD --help | grep -q "sasl-service-name"`; then
cat >> ${DATADIR}/qpidd.conf <<EOF
sasl-service-name=amqp
EOF
fi
fi
cat > ${DATADIR}/qpidd.acl <<EOF
group admin stackqpid@QPID
@ -41,8 +83,6 @@ EOF
echo secretqpid | saslpasswd2 -c -p -f ${DATADIR}/qpidd.sasldb -u QPID stackqpid
QPIDD=$(which qpidd 2>/dev/null)
mkfifo ${DATADIR}/out
$QPIDD --log-enable info+ --log-to-file ${DATADIR}/out --config ${DATADIR}/qpidd.conf &
wait_for_line "Broker .*running" "error" ${DATADIR}/out

View File

@ -26,7 +26,7 @@ commands = python setup.py build_sphinx
[testenv:py27-func-qpid]
setenv = TRANSPORT_URL=qpid://stackqpid:secretqpid@127.0.0.1:65123//
commands = {toxinidir}/setup-test-env-qpid.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
commands = {toxinidir}/setup-test-env-qpid.sh 0-10 python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
[testenv:py27-func-rabbit]
commands = {toxinidir}/setup-test-env-rabbit.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
@ -34,7 +34,7 @@ commands = {toxinidir}/setup-test-env-rabbit.sh python setup.py testr --slowest
[testenv:py27-func-amqp1]
setenv = TRANSPORT_URL=amqp://stackqpid:secretqpid@127.0.0.1:65123//
# NOTE(flaper87): This gate job run on fedora21 for now.
commands = {toxinidir}/setup-test-env-qpid.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
commands = {toxinidir}/setup-test-env-qpid.sh 1.0 python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
[testenv:py27-func-zeromq]
commands = {toxinidir}/setup-test-env-zmq.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'