Merge "Port the AMQP1 driver to new Pyngus SASL API"
This commit is contained in:
commit
155e867611
@ -575,15 +575,16 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
self._socket_connection.connection.close()
|
||||
|
||||
def sasl_done(self, connection, pn_sasl, outcome):
|
||||
"""This is a Pyngus callback invoked by Pyngus when the SASL handshake
|
||||
has completed. The outcome of the handshake will be OK on success or
|
||||
AUTH on failure.
|
||||
"""This is a Pyngus callback invoked when the SASL handshake
|
||||
has completed. The outcome of the handshake is passed in the outcome
|
||||
argument.
|
||||
"""
|
||||
if outcome == proton.SASL.AUTH:
|
||||
LOG.error("Unable to connect to %s:%s, authentication failure.",
|
||||
self.hosts.current.hostname, self.hosts.current.port)
|
||||
# requires user intervention, treat it like a connection failure:
|
||||
self._handle_connection_loss()
|
||||
if outcome == proton.SASL.OK:
|
||||
return
|
||||
LOG.error("AUTHENTICATION FAILURE: Cannot connect to %s:%s as user %s",
|
||||
self.hosts.current.hostname, self.hosts.current.port,
|
||||
self.hosts.current.username)
|
||||
# connection failure will be handled later
|
||||
|
||||
def _complete_shutdown(self):
|
||||
"""The AMQP Connection has closed, and the driver shutdown is complete.
|
||||
@ -607,19 +608,18 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
if not self._reconnecting:
|
||||
self._reconnecting = True
|
||||
self._replies = None
|
||||
if self._delay == 0:
|
||||
self._delay = 1
|
||||
self._do_reconnect()
|
||||
else:
|
||||
d = self._delay
|
||||
LOG.info("delaying reconnect attempt for %d seconds", d)
|
||||
self.processor.schedule(lambda: self._do_reconnect(), d)
|
||||
self._delay = min(d * 2, 60)
|
||||
self._delay = 1 if self._delay == 0 else min(d * 2, 60)
|
||||
|
||||
def _do_reconnect(self):
|
||||
"""Invoked on connection/socket failure, failover and re-connect to the
|
||||
messaging service.
|
||||
"""
|
||||
# note well: since this method destroys the connection, it cannot be
|
||||
# invoked directly from a pyngus callback. Use processor.schedule() to
|
||||
# run this method on the main loop instead.
|
||||
if not self._closing:
|
||||
self._reconnecting = False
|
||||
self._senders = {}
|
||||
|
@ -54,9 +54,7 @@ class _SocketConnection(object):
|
||||
# Currently it is the Controller object.
|
||||
self._handler = handler
|
||||
self._container = container
|
||||
c = container.create_connection(name, handler, self._properties)
|
||||
c.user_context = self
|
||||
self.connection = c
|
||||
self.connection = None
|
||||
|
||||
def _get_name_and_pid(self):
|
||||
# helps identify the process that is using the connection
|
||||
@ -72,15 +70,12 @@ class _SocketConnection(object):
|
||||
while True:
|
||||
try:
|
||||
rc = pyngus.read_socket_input(self.connection, self.socket)
|
||||
if rc > 0:
|
||||
self.connection.process(time.time())
|
||||
return rc
|
||||
except socket.error as e:
|
||||
if e.errno == errno.EAGAIN or e.errno == errno.EINTR:
|
||||
continue
|
||||
elif e.errno == errno.EWOULDBLOCK:
|
||||
return 0
|
||||
else:
|
||||
except (socket.timeout, socket.error) as e:
|
||||
# pyngus handles EAGAIN/EWOULDBLOCK and EINTER
|
||||
self.connection.close_input()
|
||||
self.connection.close()
|
||||
self._handler.socket_error(str(e))
|
||||
return pyngus.Connection.EOS
|
||||
|
||||
@ -89,20 +84,17 @@ class _SocketConnection(object):
|
||||
while True:
|
||||
try:
|
||||
rc = pyngus.write_socket_output(self.connection, self.socket)
|
||||
if rc > 0:
|
||||
self.connection.process(time.time())
|
||||
return rc
|
||||
except socket.error as e:
|
||||
if e.errno == errno.EAGAIN or e.errno == errno.EINTR:
|
||||
continue
|
||||
elif e.errno == errno.EWOULDBLOCK:
|
||||
return 0
|
||||
else:
|
||||
except (socket.timeout, socket.error) as e:
|
||||
# pyngus handles EAGAIN/EWOULDBLOCK and EINTER
|
||||
self.connection.close_output()
|
||||
self.connection.close()
|
||||
self._handler.socket_error(str(e))
|
||||
return pyngus.Connection.EOS
|
||||
|
||||
def connect(self, host):
|
||||
"""Connect to host:port and start the AMQP protocol."""
|
||||
"""Connect to host and start the AMQP protocol."""
|
||||
addr = socket.getaddrinfo(host.hostname, host.port,
|
||||
socket.AF_INET, socket.SOCK_STREAM)
|
||||
if not addr:
|
||||
@ -124,8 +116,23 @@ class _SocketConnection(object):
|
||||
return
|
||||
self.socket = my_socket
|
||||
|
||||
# determine the proper SASL mechanism: PLAIN if a username/password is
|
||||
# present, else ANONYMOUS
|
||||
props = self._properties.copy()
|
||||
if pyngus.VERSION >= (2, 0, 0):
|
||||
# configure client authentication
|
||||
#
|
||||
props['x-server'] = False
|
||||
if host.username:
|
||||
props['x-username'] = host.username
|
||||
props['x-password'] = host.password or ""
|
||||
|
||||
c = self._container.create_connection(self.name, self._handler, props)
|
||||
c.user_context = self
|
||||
self.connection = c
|
||||
|
||||
if pyngus.VERSION < (2, 0, 0):
|
||||
# older versions of pyngus requires manual SASL configuration:
|
||||
# determine the proper SASL mechanism: PLAIN if a username/password
|
||||
# is present, else ANONYMOUS
|
||||
pn_sasl = self.connection.pn_sasl
|
||||
if host.username:
|
||||
password = host.password if host.password else ""
|
||||
@ -134,21 +141,21 @@ class _SocketConnection(object):
|
||||
pn_sasl.mechanisms("ANONYMOUS")
|
||||
# TODO(kgiusti): server if accepting inbound connections
|
||||
pn_sasl.client()
|
||||
|
||||
self.connection.open()
|
||||
|
||||
def reset(self, name=None):
|
||||
"""Clean up the current state, expect 'connect()' to be recalled
|
||||
later.
|
||||
"""
|
||||
# note well: since destroy() is called on the connection, do not invoke
|
||||
# this method from a pyngus callback!
|
||||
if self.connection:
|
||||
self.connection.destroy()
|
||||
self.connection = None
|
||||
self.close()
|
||||
if name:
|
||||
self.name = name
|
||||
c = self._container.create_connection(self.name, self._handler,
|
||||
self._properties)
|
||||
c.user_context = self
|
||||
self.connection = c
|
||||
|
||||
def close(self):
|
||||
if self.socket:
|
||||
@ -325,7 +332,6 @@ class Thread(threading.Thread):
|
||||
for r in readable:
|
||||
r.read()
|
||||
|
||||
self._schedule.process() # run any deferred requests
|
||||
for t in timers:
|
||||
if t.deadline > time.time():
|
||||
break
|
||||
@ -334,6 +340,8 @@ class Thread(threading.Thread):
|
||||
for w in writable:
|
||||
w.write()
|
||||
|
||||
self._schedule.process() # run any deferred requests
|
||||
|
||||
LOG.info("eventloop thread exiting, container=%s",
|
||||
self._container.name)
|
||||
self._container.destroy()
|
||||
|
@ -15,13 +15,17 @@
|
||||
import logging
|
||||
import os
|
||||
import select
|
||||
import shutil
|
||||
import socket
|
||||
import subprocess
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from oslo_utils import importutils
|
||||
from six import moves
|
||||
from string import Template
|
||||
import testtools
|
||||
|
||||
import oslo_messaging
|
||||
@ -295,9 +299,10 @@ class TestAmqpNotification(_AmqpBrokerTestCase):
|
||||
driver.cleanup()
|
||||
|
||||
|
||||
@testtools.skipUnless(pyngus, "proton modules not present")
|
||||
@testtools.skipUnless(pyngus and pyngus.VERSION < (2, 0, 0),
|
||||
"pyngus module not present")
|
||||
class TestAuthentication(test_utils.BaseTestCase):
|
||||
|
||||
"""Test user authentication using the old pyngus API"""
|
||||
def setUp(self):
|
||||
super(TestAuthentication, self).setUp()
|
||||
# for simplicity, encode the credentials as they would appear 'on the
|
||||
@ -349,6 +354,89 @@ class TestAuthentication(test_utils.BaseTestCase):
|
||||
driver.cleanup()
|
||||
|
||||
|
||||
@testtools.skipUnless(pyngus and pyngus.VERSION >= (2, 0, 0),
|
||||
"pyngus module not present")
|
||||
class TestCyrusAuthentication(test_utils.BaseTestCase):
|
||||
"""Test the driver's Cyrus SASL integration"""
|
||||
|
||||
def setUp(self):
|
||||
"""Create a simple SASL configuration. This assumes saslpasswd2 is in
|
||||
the OS path, otherwise the test will be skipped.
|
||||
"""
|
||||
super(TestCyrusAuthentication, self).setUp()
|
||||
# Create a SASL configuration and user database,
|
||||
# add a user 'joe' with password 'secret':
|
||||
self._conf_dir = tempfile.mkdtemp()
|
||||
db = os.path.join(self._conf_dir, 'openstack.sasldb')
|
||||
_t = "echo secret | saslpasswd2 -c -p -f ${db} joe"
|
||||
cmd = Template(_t).substitute(db=db)
|
||||
try:
|
||||
subprocess.call(args=cmd, shell=True)
|
||||
except Exception:
|
||||
shutil.rmtree(self._conf_dir, ignore_errors=True)
|
||||
self._conf_dir = None
|
||||
raise self.SkipTest("Cyrus tool saslpasswd2 not installed")
|
||||
|
||||
# configure the SASL broker:
|
||||
conf = os.path.join(self._conf_dir, 'openstack.conf')
|
||||
mechs = "DIGEST-MD5 SCRAM-SHA-1 CRAM-MD5 PLAIN"
|
||||
t = Template("""sasldb_path: ${db}
|
||||
mech_list: ${mechs}
|
||||
""")
|
||||
with open(conf, 'w') as f:
|
||||
f.write(t.substitute(db=db, mechs=mechs))
|
||||
|
||||
self._broker = FakeBroker(sasl_mechanisms=mechs,
|
||||
user_credentials=["\0joe\0secret"],
|
||||
sasl_config_dir=self._conf_dir,
|
||||
sasl_config_name="openstack")
|
||||
self._broker.start()
|
||||
|
||||
def tearDown(self):
|
||||
super(TestCyrusAuthentication, self).tearDown()
|
||||
if self._broker:
|
||||
self._broker.stop()
|
||||
if self._conf_dir:
|
||||
shutil.rmtree(self._conf_dir, ignore_errors=True)
|
||||
|
||||
def test_authentication_ok(self):
|
||||
"""Verify that username and password given in TransportHost are
|
||||
accepted by the broker.
|
||||
"""
|
||||
|
||||
addr = "amqp://joe:secret@%s:%d" % (self._broker.host,
|
||||
self._broker.port)
|
||||
url = oslo_messaging.TransportURL.parse(self.conf, addr)
|
||||
driver = amqp_driver.ProtonDriver(self.conf, url)
|
||||
target = oslo_messaging.Target(topic="test-topic")
|
||||
listener = _ListenerThread(driver.listen(target), 1)
|
||||
rc = driver.send(target, {"context": True},
|
||||
{"method": "echo"}, wait_for_reply=True)
|
||||
self.assertIsNotNone(rc)
|
||||
listener.join(timeout=30)
|
||||
self.assertFalse(listener.isAlive())
|
||||
driver.cleanup()
|
||||
|
||||
def test_authentication_failure(self):
|
||||
"""Verify that a bad password given in TransportHost is
|
||||
rejected by the broker.
|
||||
"""
|
||||
|
||||
addr = "amqp://joe:badpass@%s:%d" % (self._broker.host,
|
||||
self._broker.port)
|
||||
url = oslo_messaging.TransportURL.parse(self.conf, addr)
|
||||
driver = amqp_driver.ProtonDriver(self.conf, url)
|
||||
target = oslo_messaging.Target(topic="test-topic")
|
||||
_ListenerThread(driver.listen(target), 1)
|
||||
self.assertRaises(oslo_messaging.MessagingTimeout,
|
||||
driver.send,
|
||||
target, {"context": True},
|
||||
{"method": "echo"},
|
||||
wait_for_reply=True,
|
||||
timeout=2.0)
|
||||
driver.cleanup()
|
||||
|
||||
|
||||
@testtools.skipUnless(pyngus, "proton modules not present")
|
||||
class TestFailover(test_utils.BaseTestCase):
|
||||
|
||||
@ -429,16 +517,30 @@ class FakeBroker(threading.Thread):
|
||||
"""A single AMQP connection."""
|
||||
|
||||
def __init__(self, server, socket_, name,
|
||||
sasl_mechanisms, user_credentials):
|
||||
sasl_mechanisms, user_credentials,
|
||||
sasl_config_dir, sasl_config_name):
|
||||
"""Create a Connection using socket_."""
|
||||
self.socket = socket_
|
||||
self.name = name
|
||||
self.server = server
|
||||
self.connection = server.container.create_connection(name,
|
||||
self)
|
||||
self.connection.user_context = self
|
||||
self.sasl_mechanisms = sasl_mechanisms
|
||||
self.user_credentials = user_credentials
|
||||
properties = {'x-server': True}
|
||||
if self.sasl_mechanisms:
|
||||
properties['x-sasl-mechs'] = self.sasl_mechanisms
|
||||
if "ANONYMOUS" not in self.sasl_mechanisms:
|
||||
properties['x-require-auth'] = True
|
||||
if sasl_config_dir:
|
||||
properties['x-sasl-config-dir'] = sasl_config_dir
|
||||
if sasl_config_name:
|
||||
properties['x-sasl-config-name'] = sasl_config_name
|
||||
|
||||
self.connection = server.container.create_connection(
|
||||
name, self, properties)
|
||||
self.connection.user_context = self
|
||||
if pyngus.VERSION < (2, 0, 0):
|
||||
# older versions of pyngus don't recognize the sasl
|
||||
# connection properties, so configure them manually:
|
||||
if sasl_mechanisms:
|
||||
self.connection.pn_sasl.mechanisms(sasl_mechanisms)
|
||||
self.connection.pn_sasl.server()
|
||||
@ -506,7 +608,8 @@ class FakeBroker(threading.Thread):
|
||||
link_handle, addr)
|
||||
|
||||
def sasl_step(self, connection, pn_sasl):
|
||||
if self.sasl_mechanisms == 'PLAIN':
|
||||
# only called if not using Cyrus SASL
|
||||
if 'PLAIN' in self.sasl_mechanisms:
|
||||
credentials = pn_sasl.recv()
|
||||
if not credentials:
|
||||
return # wait until some arrives
|
||||
@ -592,7 +695,9 @@ class FakeBroker(threading.Thread):
|
||||
address_separator=".",
|
||||
sock_addr="", sock_port=0,
|
||||
sasl_mechanisms="ANONYMOUS",
|
||||
user_credentials=None):
|
||||
user_credentials=None,
|
||||
sasl_config_dir=None,
|
||||
sasl_config_name=None):
|
||||
"""Create a fake broker listening on sock_addr:sock_port."""
|
||||
if not pyngus:
|
||||
raise AssertionError("pyngus module not present")
|
||||
@ -602,6 +707,8 @@ class FakeBroker(threading.Thread):
|
||||
self._group_prefix = group_prefix + address_separator
|
||||
self._address_separator = address_separator
|
||||
self._sasl_mechanisms = sasl_mechanisms
|
||||
self._sasl_config_dir = sasl_config_dir
|
||||
self._sasl_config_name = sasl_config_name
|
||||
self._user_credentials = user_credentials
|
||||
self._wakeup_pipe = os.pipe()
|
||||
self._my_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
@ -664,7 +771,9 @@ class FakeBroker(threading.Thread):
|
||||
name = str(client_address)
|
||||
conn = FakeBroker.Connection(self, client_socket, name,
|
||||
self._sasl_mechanisms,
|
||||
self._user_credentials)
|
||||
self._user_credentials,
|
||||
self._sasl_config_dir,
|
||||
self._sasl_config_name)
|
||||
self._connections[conn.name] = conn
|
||||
elif r is self._wakeup_pipe[0]:
|
||||
os.read(self._wakeup_pipe[0], 512)
|
||||
|
@ -1,4 +1,8 @@
|
||||
#!/bin/bash
|
||||
#
|
||||
# Usage: setup-test-env-qpid.sh PROTOCOL <command to run>
|
||||
# where PROTOCOL is the version of the AMQP protocol to use with
|
||||
# qpidd. Valid values for PROTOCOL are "1", "1.0", "0-10", "0.10"
|
||||
set -e
|
||||
|
||||
# require qpidd, qpid-tools sasl2-bin/cyrus-sasl-plain+cyrus-sasl-lib
|
||||
@ -8,6 +12,34 @@ set -e
|
||||
DATADIR=$(mktemp -d /tmp/OSLOMSG-QPID.XXXXX)
|
||||
trap "clean_exit $DATADIR" EXIT
|
||||
|
||||
QPIDD=$(which qpidd 2>/dev/null)
|
||||
|
||||
# which protocol should be used with qpidd?
|
||||
# 1 for AMQP 1.0, 0.10 for AMQP 0.10
|
||||
#
|
||||
PROTOCOL=$1
|
||||
case $PROTOCOL in
|
||||
"1" | "1.0")
|
||||
PROTOCOL="1"
|
||||
shift
|
||||
;;
|
||||
"0.10" | "0-10")
|
||||
PROTOCOL="0-10"
|
||||
shift
|
||||
;;
|
||||
*)
|
||||
# assume the old protocol
|
||||
echo "No protocol specified, assuming 0.10"
|
||||
PROTOCOL="0-10"
|
||||
;;
|
||||
esac
|
||||
|
||||
# ensure that the version of qpidd does support AMQP 1.0
|
||||
if [ $PROTOCOL == "1" ] && ! `$QPIDD --help | grep -q "queue-patterns"`; then
|
||||
echo "This version of $QPIDD does not support AMQP 1.0"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
[ -f "/usr/lib/qpid/daemon/acl.so" ] && LIBACL="load-module=/usr/lib/qpid/daemon/acl.so"
|
||||
|
||||
cat > ${DATADIR}/qpidd.conf <<EOF
|
||||
@ -18,12 +50,22 @@ ${LIBACL}
|
||||
mgmt-enable=yes
|
||||
auth=yes
|
||||
log-to-stderr=no
|
||||
EOF
|
||||
|
||||
if [ $PROTOCOL == "1" ]; then
|
||||
cat >> ${DATADIR}/qpidd.conf <<EOF
|
||||
# Used by AMQP1.0 only
|
||||
queue-patterns=exclusive
|
||||
queue-patterns=unicast
|
||||
topic-patterns=broadcast
|
||||
EOF
|
||||
# some versions of qpidd require this for AMQP 1 and SASL:
|
||||
if `$QPIDD --help | grep -q "sasl-service-name"`; then
|
||||
cat >> ${DATADIR}/qpidd.conf <<EOF
|
||||
sasl-service-name=amqp
|
||||
EOF
|
||||
fi
|
||||
fi
|
||||
|
||||
cat > ${DATADIR}/qpidd.acl <<EOF
|
||||
group admin stackqpid@QPID
|
||||
@ -41,8 +83,6 @@ EOF
|
||||
|
||||
echo secretqpid | saslpasswd2 -c -p -f ${DATADIR}/qpidd.sasldb -u QPID stackqpid
|
||||
|
||||
QPIDD=$(which qpidd 2>/dev/null)
|
||||
|
||||
mkfifo ${DATADIR}/out
|
||||
$QPIDD --log-enable info+ --log-to-file ${DATADIR}/out --config ${DATADIR}/qpidd.conf &
|
||||
wait_for_line "Broker .*running" "error" ${DATADIR}/out
|
||||
|
4
tox.ini
4
tox.ini
@ -26,7 +26,7 @@ commands = python setup.py build_sphinx
|
||||
|
||||
[testenv:py27-func-qpid]
|
||||
setenv = TRANSPORT_URL=qpid://stackqpid:secretqpid@127.0.0.1:65123//
|
||||
commands = {toxinidir}/setup-test-env-qpid.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
|
||||
commands = {toxinidir}/setup-test-env-qpid.sh 0-10 python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
|
||||
|
||||
[testenv:py27-func-rabbit]
|
||||
commands = {toxinidir}/setup-test-env-rabbit.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
|
||||
@ -34,7 +34,7 @@ commands = {toxinidir}/setup-test-env-rabbit.sh python setup.py testr --slowest
|
||||
[testenv:py27-func-amqp1]
|
||||
setenv = TRANSPORT_URL=amqp://stackqpid:secretqpid@127.0.0.1:65123//
|
||||
# NOTE(flaper87): This gate job run on fedora21 for now.
|
||||
commands = {toxinidir}/setup-test-env-qpid.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
|
||||
commands = {toxinidir}/setup-test-env-qpid.sh 1.0 python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
|
||||
|
||||
[testenv:py27-func-zeromq]
|
||||
commands = {toxinidir}/setup-test-env-zmq.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
|
||||
|
Loading…
x
Reference in New Issue
Block a user