diff --git a/amqp1-requirements.txt b/amqp1-requirements.txt new file mode 100644 index 000000000..bf8a37ee1 --- /dev/null +++ b/amqp1-requirements.txt @@ -0,0 +1,9 @@ +# TODO(kgiusti) AMQP 1.0 support depends on the Qpid Proton AMQP 1.0 +# development libraries. As these are not yet available from the +# Ubuntu repositories, do not require these packages by default. If +# you have installed the Proton development libraries, you can run the +# amqp1 driver tox tests this way: +# tox -e amqp1 +pyngus>=1.0.0,<2.0.0 # Apache-2.0 +python-qpid-proton>=0.7,<0.8 # Apache-2.0 + diff --git a/oslo/messaging/_drivers/protocols/__init__.py b/oslo/messaging/_drivers/protocols/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/oslo/messaging/_drivers/protocols/amqp/__init__.py b/oslo/messaging/_drivers/protocols/amqp/__init__.py new file mode 100644 index 000000000..bdd170f57 --- /dev/null +++ b/oslo/messaging/_drivers/protocols/amqp/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2014, Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from .driver import ProtonDriver diff --git a/oslo/messaging/_drivers/protocols/amqp/controller.py b/oslo/messaging/_drivers/protocols/amqp/controller.py new file mode 100644 index 000000000..d3d14213b --- /dev/null +++ b/oslo/messaging/_drivers/protocols/amqp/controller.py @@ -0,0 +1,621 @@ +# Copyright 2014, Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Controller that manages the interface between the driver and the messaging +service. + +This module defines a Controller class that is responsible for performing +messaging-related operations (Tasks) requested by the driver, and for managing +the connection to the messaging service. The Controller creates a background +thread which performs all messaging operations and socket I/O. The +Controller's messaging logic is executed in the background thread via lambda +functions scheduled by the Controller. +""" + +import abc +import collections +import logging +import threading +import uuid + +import pyngus +from six import moves + +from oslo.config import cfg +from oslo.messaging._drivers.protocols.amqp import eventloop + +LOG = logging.getLogger(__name__) + +_amqp1_opts = [ + cfg.StrOpt('server_request_prefix', + default='exclusive', + help="address prefix used when sending to a specific server"), + + cfg.StrOpt('broadcast_prefix', + default='broadcast', + help="address prefix used when broadcasting to all servers"), + + cfg.StrOpt('group_request_prefix', + default='unicast', + help="address prefix when sending to any server in group"), + + cfg.StrOpt('container_name', + default=None, + help='Name for the AMQP container'), + + cfg.IntOpt('idle_timeout', + default=0, # disabled + help='Timeout for inactive connections (in seconds)'), + + cfg.BoolOpt('trace', + default=False, + help='Debug: dump AMQP frames to stdout'), + + cfg.StrOpt('ssl_ca_file', + default='', + help="CA certificate PEM file for verifing server certificate"), + + cfg.StrOpt('ssl_cert_file', + default='', + help='Identifying certificate PEM file to present to clients'), + + cfg.StrOpt('ssl_key_file', + default='', + help='Private key PEM file used to sign cert_file certificate'), + + cfg.StrOpt('ssl_key_password', + default=None, + help='Password for decrypting ssl_key_file (if encrypted)'), + + cfg.BoolOpt('allow_insecure_clients', + default=False, + help='Accept clients using either SSL or plain TCP') +] + + +class Task(object): + """Perform a messaging operation via the Controller.""" + @abc.abstractmethod + def execute(self, controller): + """This method will be run on the eventloop thread.""" + + +class Replies(pyngus.ReceiverEventHandler): + """This is the receiving link for all reply messages. Messages are routed + to the proper Listener's incoming queue using the correlation-id header in + the message. + """ + def __init__(self, connection, on_ready): + self._correlation = {} # map of correlation-id to response queue + self._ready = False + self._on_ready = on_ready + self._receiver = connection.create_receiver("replies", + event_handler=self) + # capacity determines the maximum number of reply messages this link + # can receive. As messages are received and credit is consumed, this + # driver will 'top up' the credit back to max capacity. This number + # should be large enough to avoid needlessly flow-controlling the + # replies. + self.capacity = 100 # TODO(kgiusti) guesstimate - make configurable + self._credit = 0 + self._receiver.open() + + def ready(self): + return self._ready + + def prepare_for_response(self, request, reply_queue): + """Apply a unique message identifier to this request message. This will + be used to identify messages sent in reply. The identifier is placed + in the 'id' field of the request message. It is expected that the + identifier will appear in the 'correlation-id' field of the + corresponding response message. + """ + request.id = uuid.uuid4().hex + # reply is placed on reply_queue + self._correlation[request.id] = reply_queue + request.reply_to = self._receiver.source_address + LOG.debug("Reply for msg id=%s expected on link %s", + request.id, request.reply_to) + + # Pyngus ReceiverLink event callbacks: + + def receiver_active(self, receiver_link): + """This is a Pyngus callback, invoked by Pyngus when the receiver_link + has transitioned to the open state and is able to receive incoming + messages. + """ + self._ready = True + self._update_credit() + self._on_ready() + LOG.debug("Replies expected on link %s", + self._receiver.source_address) + + def receiver_remote_closed(self, receiver, pn_condition): + """This is a Pyngus callback, invoked by Pyngus when the peer of this + receiver link has initiated closing the connection. + """ + # TODO(kgiusti) Unclear if this error will ever occur (as opposed to + # the Connection failing instead). Log for now, possibly implement a + # recovery strategy if necessary. + LOG.error("Reply subscription closed by peer: %s", + (pn_condition or "no error given")) + + def message_received(self, receiver, message, handle): + """This is a Pyngus callback, invoked by Pyngus when a new message + arrives on this receiver link from the peer. + """ + self._credit = self._credit - 1 + self._update_credit() + + key = message.correlation_id + if key in self._correlation: + LOG.debug("Received response for msg id=%s", key) + self._correlation[key].put(message) + # cleanup (only need one response per request) + del self._correlation[key] + else: + LOG.warn("Can't find receiver for response msg id=%s, dropping!", + key) + receiver.message_accepted(handle) + + def _update_credit(self): + if self.capacity > self._credit: + self._receiver.add_capacity(self.capacity - self._credit) + self._credit = self.capacity + + +class Server(pyngus.ReceiverEventHandler): + """A group of links that receive messages from a set of addresses derived + from a given target. Messages arriving on the links are placed on the + 'incoming' queue. + """ + def __init__(self, addresses, incoming): + self._incoming = incoming + self._addresses = addresses + + def attach(self, connection): + """Create receiver links over the given connection for all the + configured addresses. + """ + self._receivers = [] + for a in self._addresses: + props = {"snd-settle-mode": "settled"} + rname = "Consumer-%s:src=%s:tgt=%s" % (uuid.uuid4().hex, a, a) + r = connection.create_receiver(source_address=a, + target_address=a, + event_handler=self, + name=rname, + properties=props) + + # TODO(kgiusti) Hardcoding credit here is sub-optimal. A better + # approach would monitor for a back-up of inbound messages to be + # processed by the consuming application and backpressure the + # sender based on configured thresholds. + r.add_capacity(500) + r.open() + self._receivers.append(r) + + # Pyngus ReceiverLink event callbacks: + + def receiver_remote_closed(self, receiver, pn_condition): + """This is a Pyngus callback, invoked by Pyngus when the peer of this + receiver link has initiated closing the connection. + """ + text = "Server subscription %(addr)s closed by peer: %(err_msg)s" + vals = { + "addr": receiver.source_address or receiver.target_address, + "err_msg": pn_condition or "no error given" + } + LOG.error(text % vals) + + def message_received(self, receiver, message, handle): + """This is a Pyngus callback, invoked by Pyngus when a new message + arrives on this receiver link from the peer. + """ + # TODO(kgiusti) Sub-optimal to grant one credit each time a message + # arrives. A better approach would grant batches of credit on demand. + receiver.add_capacity(1) + self._incoming.put(message) + LOG.debug("message received: %s", message) + receiver.message_accepted(handle) + + +class Hosts(object): + """An order list of peer addresses. Connection failover progresses from + one host to the next. + """ + HostnamePort = collections.namedtuple('HostnamePort', + ['hostname', 'port']) + + def __init__(self, entries=None): + self._entries = [self.HostnamePort(h, p) for h, p in entries or []] + self._current = 0 + + def add(self, hostname, port=5672): + self._entries.append(self.HostnamePort(hostname, port)) + + @property + def current(self): + if len(self._entries): + return self._entries[self._current] + else: + return self.HostnamePort("localhost", 5672) + + def next(self): + if len(self._entries) > 1: + self._current = (self._current + 1) % len(self._entries) + return self.current + + def __repr__(self): + return '<Hosts ' + str(self) + '>' + + def __str__(self): + return ", ".join(["%s:%i" % e for e in self._entries]) + + +class Controller(pyngus.ConnectionEventHandler): + """Controls the connection to the AMQP messaging service. This object is + the 'brains' of the driver. It maintains the logic for addressing, sending + and receiving messages, and managing the connection. All messaging and I/O + work is done on the Eventloop thread, allowing the driver to run + asynchronously from the messaging clients. + """ + def __init__(self, hosts, default_exchange, config): + self.processor = None + # queue of Task() objects to execute on the eventloop once the + # connection is ready: + self._tasks = moves.queue.Queue(maxsize=500) + # limit the number of Task()'s to execute per call to _process_tasks(). + # This allows the eventloop main thread to return to servicing socket + # I/O in a timely manner + self._max_task_batch = 50 + # cache of sending links indexed by address: + self._senders = {} + # Servers (set of receiving links), indexed by target: + self._servers = {} + self.hosts = Hosts(hosts) + + opt_group = cfg.OptGroup(name='amqp1', + title='AMQP 1.0 options') + config.register_group(opt_group) + config.register_opts(_amqp1_opts, group=opt_group) + + self.server_request_prefix = config.amqp1.server_request_prefix + self.broadcast_prefix = config.amqp1.broadcast_prefix + self.group_request_prefix = config.amqp1.group_request_prefix + self._container_name = config.amqp1.container_name + if not self._container_name: + self._container_name = "container-%s" % uuid.uuid4().hex + self.idle_timeout = config.amqp1.idle_timeout + self.trace_protocol = config.amqp1.trace + self.ssl_ca_file = config.amqp1.ssl_ca_file + self.ssl_cert_file = config.amqp1.ssl_cert_file + self.ssl_key_file = config.amqp1.ssl_key_file + self.ssl_key_password = config.amqp1.ssl_key_password + self.ssl_allow_insecure = config.amqp1.allow_insecure_clients + self.separator = "." + self.fanout_qualifier = "all" + self.default_exchange = default_exchange + + # can't handle a request until the replies link is active, as + # we need the peer assigned address, so need to delay any + # processing of task queue until this is done + self._replies = None + # Set True when the driver is shutting down + self._closing = False + # only schedule one outstanding reconnect attempt at a time + self._reconnecting = False + self._delay = 0 # seconds between retries + # prevent queuing up multiple requests to run _process_tasks() + self._process_tasks_scheduled = False + self._process_tasks_lock = threading.Lock() + + def connect(self): + """Connect to the messaging service.""" + self.processor = eventloop.Thread(self._container_name) + self.processor.wakeup(lambda: self._do_connect()) + + def add_task(self, task): + """Add a Task for execution on processor thread.""" + self._tasks.put(task) + self._schedule_task_processing() + + def destroy(self): + """Shutdown the messaging service.""" + if self.processor: + self.processor.wakeup(lambda: self._start_shutdown()) + LOG.info("Waiting for eventloop to exit") + self.processor.join() + self.processor = None + LOG.info("Eventloop exited, driver shut down") + + # The remaining methods are reserved to run from the eventloop thread only! + # They must not be invoked directly! + + # methods executed by Tasks created by the driver: + + def request(self, target, request, reply_queue=None): + """Send a request message to the given target, and arrange for a + response to be put on the optional reply_queue if specified + """ + address = self._resolve(target) + LOG.debug("Sending request for %s to %s", target, address) + if reply_queue is not None: + self._replies.prepare_for_response(request, reply_queue) + self._send(address, request) + + def response(self, address, response): + LOG.debug("Sending response to %s", address) + self._send(address, response) + + def subscribe(self, target, in_queue): + """Subscribe to messages sent to 'target', place received messages on + 'in_queue'. + """ + addresses = [ + self._server_address(target), + self._broadcast_address(target), + self._group_request_address(target) + ] + self._subscribe(target, addresses, in_queue) + + def subscribe_notifications(self, target, in_queue): + """Subscribe for notifications on 'target', place received messages on + 'in_queue'. + """ + addresses = [self._group_request_address(target)] + self._subscribe(target, addresses, in_queue) + + def _subscribe(self, target, addresses, in_queue): + LOG.debug("Subscribing to %s (%s)", target, addresses) + self._servers[target] = Server(addresses, in_queue) + self._servers[target].attach(self._socket_connection.connection) + + def _resolve(self, target): + """Return a link address for a given target.""" + if target.server: + return self._server_address(target) + elif target.fanout: + return self._broadcast_address(target) + else: + return self._group_request_address(target) + + def _sender(self, address): + # if we already have a sender for that address, use it + # else establish the sender and cache it + if address in self._senders: + sender = self._senders[address] + else: + sname = "Producer-%s:src=%s:tgt=%s" % (uuid.uuid4().hex, + address, address) + conn = self._socket_connection.connection + sender = conn.create_sender(source_address=address, + target_address=address, + name=sname) + sender.open() + self._senders[address] = sender + return sender + + def _send(self, addr, message): + """Send the message out the link addressed by 'addr'.""" + address = str(addr) + message.address = address + self._sender(address).send(message) + + def _server_address(self, target): + return self._concatenate([self.server_request_prefix, + target.exchange or self.default_exchange, + target.topic, target.server]) + + def _broadcast_address(self, target): + return self._concatenate([self.broadcast_prefix, + target.exchange or self.default_exchange, + target.topic, self.fanout_qualifier]) + + def _group_request_address(self, target): + return self._concatenate([self.group_request_prefix, + target.exchange or self.default_exchange, + target.topic]) + + def _concatenate(self, items): + return self.separator.join(filter(bool, items)) + + # commands executed on the processor (eventloop) via 'wakeup()': + + def _do_connect(self): + """Establish connection and reply subscription on processor thread.""" + hostname = self.hosts.current.hostname + port = self.hosts.current.port + conn_props = {} + if self.idle_timeout: + conn_props["idle-time-out"] = float(self.idle_timeout) + if self.trace_protocol: + conn_props["x-trace-protocol"] = self.trace_protocol + if self.ssl_ca_file: + conn_props["x-ssl-ca-file"] = self.ssl_ca_file + if self.ssl_cert_file: + # assume this connection is for a server. If client authentication + # support is developed, we'll need an explict flag (server or + # client) + conn_props["x-ssl-server"] = True + conn_props["x-ssl-identity"] = (self.ssl_cert_file, + self.ssl_key_file, + self.ssl_key_password) + conn_props["x-ssl-allow-cleartext"] = self.ssl_allow_insecure + self._socket_connection = self.processor.connect(hostname, port, + handler=self, + properties=conn_props) + LOG.debug("Connection initiated") + + def _process_tasks(self): + """Execute Task objects in the context of the processor thread.""" + with self._process_tasks_lock: + self._process_tasks_scheduled = False + count = 0 + while (not self._tasks.empty() and + count < self._max_task_batch and + self._can_process_tasks): + try: + self._tasks.get(False).execute(self) + except Exception as e: + LOG.exception("Error processing task: %s", e) + count += 1 + + # if we hit _max_task_batch, resume task processing later: + if not self._tasks.empty() and self._can_process_tasks: + self._schedule_task_processing() + + def _schedule_task_processing(self): + """_process_tasks() helper: prevent queuing up multiple requests for + task processing. This method is called both by the application thread + and the processing thread. + """ + if self.processor: + with self._process_tasks_lock: + already_scheduled = self._process_tasks_scheduled + self._process_tasks_scheduled = True + if not already_scheduled: + self.processor.wakeup(lambda: self._process_tasks()) + + @property + def _can_process_tasks(self): + """_process_tasks helper(): indicates that the driver is ready to + process Tasks. In order to process messaging-related tasks, the reply + queue link must be active. + """ + return (not self._closing and + self._replies and self._replies.ready()) + + def _start_shutdown(self): + """Called when the driver destroys the controller, this method attempts + to cleanly close the AMQP connection to the peer. + """ + LOG.info("Shutting down AMQP connection") + self._closing = True + if self._socket_connection.connection.active: + # try a clean shutdown + self._socket_connection.connection.close() + else: + # don't wait for a close from the remote, may never happen + self._complete_shutdown() + + # reply link active callback: + + def _reply_link_ready(self): + """Invoked when the Replies reply link has become active. At this + point, we are ready to send/receive messages (via Task processing). + """ + LOG.info("Messaging is active (%s:%i)", self.hosts.current.hostname, + self.hosts.current.port) + self._schedule_task_processing() + + # callback from eventloop on socket error + + def socket_error(self, error): + """Called by eventloop when a socket error occurs.""" + LOG.debug("Socket failure: %s", error) + self._handle_connection_loss() + + # Pyngus connection event callbacks (and their helpers), all invoked from + # the eventloop thread: + + def connection_failed(self, connection, error): + """This is a Pyngus callback, invoked by Pyngus when a non-recoverable + error occurs on the connection. + """ + if connection is not self._socket_connection.connection: + # pyngus bug: ignore failure callback on destroyed connections + return + LOG.debug("AMQP Connection failure: %s", error) + self._handle_connection_loss() + + def connection_active(self, connection): + """This is a Pyngus callback, invoked by Pyngus when the connection to + the peer is up. At this point, the driver will activate all subscriber + links (server) and the reply link. + """ + LOG.debug("Connection active (%s:%i), subscribing...", + self.hosts.current.hostname, self.hosts.current.port) + for s in self._servers.itervalues(): + s.attach(self._socket_connection.connection) + self._replies = Replies(self._socket_connection.connection, + lambda: self._reply_link_ready()) + self._delay = 0 + + def connection_closed(self, connection): + """This is a Pyngus callback, invoked by Pyngus when the connection has + cleanly closed. This occurs after the driver closes the connection + locally, and the peer has acknowledged the close. At this point, the + shutdown of the driver's connection is complete. + """ + LOG.debug("AMQP connection closed.") + # if the driver isn't being shutdown, failover and reconnect + self._handle_connection_loss() + + def connection_remote_closed(self, connection, reason): + """This is a Pyngus callback, invoked by Pyngus when the peer has + requested that the connection be closed. + """ + if not self._closing: + # The messaging service/broker is trying to shut down the + # connection. Acknowledge the close, and try to reconnect/failover + # later once the connection has closed (connection_closed is + # called). + LOG.info("Connection closed by peer: %s", + reason or "no reason given") + self._socket_connection.connection.close() + + def _complete_shutdown(self): + """The AMQP Connection has closed, and the driver shutdown is complete. + Clean up controller resources and exit. + """ + self._socket_connection.close() + self.processor.shutdown() + LOG.info("Messaging has shutdown") + + def _handle_connection_loss(self): + """The connection to the messaging service has been lost. Try to + reestablish the connection/failover. + """ + if self._closing: + # we're in the middle of shutting down the driver anyways, + # just consider it done: + self._complete_shutdown() + else: + # for some reason, we've lost the connection to the messaging + # service. Try to re-establish the connection: + if not self._reconnecting: + self._reconnecting = True + self._replies = None + if self._delay == 0: + self._delay = 1 + self._do_reconnect() + else: + d = self._delay + LOG.info("delaying reconnect attempt for %d seconds", d) + self.processor.schedule(lambda: self._do_reconnect(), d) + self._delay = min(d * 2, 60) + + def _do_reconnect(self): + """Invoked on connection/socket failure, failover and re-connect to the + messaging service. + """ + if not self._closing: + self._reconnecting = False + self._senders = {} + self._socket_connection.reset() + hostname, port = self.hosts.next() + LOG.info("Reconnecting to: %s:%i", hostname, port) + self._socket_connection.connect(hostname, port) diff --git a/oslo/messaging/_drivers/protocols/amqp/driver.py b/oslo/messaging/_drivers/protocols/amqp/driver.py new file mode 100644 index 000000000..b15c97298 --- /dev/null +++ b/oslo/messaging/_drivers/protocols/amqp/driver.py @@ -0,0 +1,322 @@ +# Copyright 2014, Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Driver for the 'amqp' transport. + +This module provides a transport driver that speaks version 1.0 of the AMQP +messaging protocol. The driver sends messages and creates subscriptions via +'tasks' that are performed on its behalf via the controller module. +""" + +import logging +import threading +import time + +from six import moves + +from oslo import messaging +from oslo.messaging._drivers import base +from oslo.messaging._drivers import common +from oslo.messaging.openstack.common import importutils +from oslo.messaging.openstack.common import jsonutils +from oslo.messaging import target as messaging_target + +# TODO(kgiusti): this module depends on platform specific libraries (proton) +# which are not available on all systems (yet). The unittest loader will +# attempt to directly import this driver even if the dependent libraries are +# not installed. Since the default set of unit tests do not exercise this +# driver, we shouldn't cause them to fail due to the missing +# dependencies. These hacks allow the import to succeed without raising an +# import error and causing all the tests to fail. [Note: to run the set of test +# for this driver, use the 'amqp1' test environment - e.g. 'tox -eamqp1'] +# +# Remove these hacks once the qpid-proton C libraries are available via Ubuntu +# base repos and can be added to the base test-requirements.txt [they are +# already available via EPEL]: + + +class _FakeController(object): + """A mocked Controller to use if the controller module fails to import + due to missing dependencies. Stubs out the _amqp1_opts option list and + provides a fake 'Task' superclass so the sub-classes SendTask, ListenTask, + and ReplyTask defined by this module will parse correctly on import. + + This allows the tests to import the driver.py module without failing even + if the proton libraries are not installed. Be aware that attempting to use + (instantiate) the PythonDriver will raise a NotImplementedError if the fake + controller is in use. This is by design since the driver really cannot + work without the real controller and its dependencies. + """ + fake_controller = True + Task = type('Task', (object,), {}) + _amqp1_opts = list() + + +proton = importutils.try_import("proton") +try: + from oslo.messaging._drivers.protocols.amqp import controller +except ImportError: + controller = _FakeController() + + +def get_opts(): + """Provide access to the controller's configuration options.""" + return controller._amqp1_opts +# TODO(kgiusti) End of hack + +LOG = logging.getLogger(__name__) + + +class SendTask(controller.Task): + """A task that sends a message to a target, and optionally allows for the + calling thread to wait for a reply. + """ + def __init__(self, target, request, reply_expected, deadline): + super(SendTask, self).__init__() + self._target = target + self._request = request + self._deadline = deadline + if reply_expected: + self._reply_queue = moves.queue.Queue() + else: + self._reply_queue = None + + def execute(self, controller): + """Runs on eventloop thread - sends request.""" + if not self._deadline or self._deadline > time.time(): + controller.request(self._target, self._request, self._reply_queue) + else: + LOG.warn("Send request to %s aborted: TTL expired.", self._target) + + def get_reply(self, timeout): + """Retrieve the reply.""" + if not self._reply_queue: + return None + try: + return self._reply_queue.get(timeout=timeout) + except moves.queue.Empty: + raise messaging.MessagingTimeout('Timed out waiting for a reply') + + +class ListenTask(controller.Task): + """A task that creates a subscription to the given target. Messages + arriving from the target are given to the listener. + """ + def __init__(self, target, listener, notifications=False): + """Create a subscription to the target.""" + super(ListenTask, self).__init__() + self._target = target + self._listener = listener + self._notifications = notifications + + def execute(self, controller): + """Run on the eventloop thread - subscribes to target. Inbound messages + are queued to the listener's incoming queue. + """ + if self._notifications: + controller.subscribe_notifications(self._target, + self._listener.incoming) + else: + controller.subscribe(self._target, self._listener.incoming) + + +class ReplyTask(controller.Task): + """A task that sends 'response' message to address.""" + def __init__(self, address, response, log_failure): + super(ReplyTask, self).__init__() + self._address = address + self._response = response + self._log_failure = log_failure + + def execute(self, controller): + """Run on the eventloop thread - send the response message.""" + controller.response(self._address, self._response) + + +def marshal_response(reply=None, failure=None): + # TODO(grs): do replies have a context? + msg = proton.Message() + if failure: + failure = common.serialize_remote_exception(failure) + data = {"failure": failure} + else: + data = {"response": reply} + msg.body = jsonutils.dumps(data) + return msg + + +def unmarshal_response(message, allowed): + # TODO(kgiusti) This may fail to unpack and raise an exception. Need to + # communicate this to the caller! + data = jsonutils.loads(message.body) + failure = data.get('failure') + if failure is not None: + raise common.deserialize_remote_exception(failure, allowed) + return data.get("response") + + +def marshal_request(request, context, envelope): + msg = proton.Message() + if envelope: + request = common.serialize_msg(request) + data = { + "request": request, + "context": context + } + msg.body = jsonutils.dumps(data) + return msg + + +def unmarshal_request(message): + data = jsonutils.loads(message.body) + return (data.get("request"), data.get("context")) + + +class ProtonIncomingMessage(base.IncomingMessage): + def __init__(self, listener, ctxt, request, message): + super(ProtonIncomingMessage, self).__init__(listener, ctxt, request) + self._reply_to = message.reply_to + self._correlation_id = message.id + + def reply(self, reply=None, failure=None, log_failure=True): + """Schedule a ReplyTask to send the reply.""" + if self._reply_to: + response = marshal_response(reply=reply, failure=failure) + response.correlation_id = self._correlation_id + LOG.debug("Replying to %s", self._correlation_id) + task = ReplyTask(self._reply_to, response, log_failure) + self.listener.driver._ctrl.add_task(task) + else: + LOG.debug("Ignoring reply as no reply address available") + + def acknowledge(self): + pass + + def requeue(self): + pass + + +class ProtonListener(base.Listener): + def __init__(self, driver): + super(ProtonListener, self).__init__(driver) + self.incoming = moves.queue.Queue() + + def poll(self): + message = self.incoming.get() + request, ctxt = unmarshal_request(message) + LOG.debug("Returning incoming message") + return ProtonIncomingMessage(self, ctxt, request, message) + + +class ProtonDriver(base.BaseDriver): + + def __init__(self, conf, url, + default_exchange=None, allowed_remote_exmods=[]): + # TODO(kgiusti) Remove once driver fully stabilizes: + LOG.warning("Support for the 'amqp' transport is EXPERIMENTAL.") + if proton is None or hasattr(controller, "fake_controller"): + raise NotImplementedError("Proton AMQP C libraries not installed") + + super(ProtonDriver, self).__init__(conf, url, default_exchange, + allowed_remote_exmods) + # TODO(grs): handle authentication etc + hosts = [(h.hostname, h.port or 5672) for h in url.hosts] + + # Create a Controller that connects to the messaging service: + self._ctrl = controller.Controller(hosts, default_exchange, conf) + + # lazy connection setup - don't cause the controller to connect until + # after the first messaging request: + self._connect_called = False + self._lock = threading.Lock() + + def _ensure_connect_called(func): + """Causes the controller to connect to the messaging service when it is + first used. It is safe to push tasks to it whether connected or not, + but those tasks won't be processed until connection completes. + """ + def wrap(self, *args, **kws): + with self._lock: + connect_called = self._connect_called + self._connect_called = True + if not connect_called: + self._ctrl.connect() + return func(self, *args, **kws) + return wrap + + @_ensure_connect_called + def send(self, target, ctxt, message, + wait_for_reply=None, timeout=None, envelope=False, + retry=None): + """Send a message to the given target.""" + # TODO(kgiusti) need to add support for retry + if retry is not None: + raise NotImplementedError('"retry" not implemented by' + 'this transport driver') + + request = marshal_request(message, ctxt, envelope) + expire = 0 + if timeout: + expire = time.time() + timeout # when the caller times out + # amqp uses millisecond time values, timeout is seconds + request.ttl = int(timeout * 1000) + request.expiry_time = int(expire * 1000) + LOG.debug("Send to %s", target) + task = SendTask(target, request, wait_for_reply, expire) + self._ctrl.add_task(task) + result = None + if wait_for_reply: + # the following can raise MessagingTimeout if no reply received: + reply = task.get_reply(timeout) + # TODO(kgiusti) how to handle failure to un-marshal? Must log, and + # determine best way to communicate this failure back up to the + # caller + result = unmarshal_response(reply, self._allowed_remote_exmods) + LOG.debug("Send to %s returning", target) + return result + + @_ensure_connect_called + def send_notification(self, target, ctxt, message, version, + retry=None): + """Send a notification message to the given target.""" + # TODO(kgiusti) need to add support for retry + if retry is not None: + raise NotImplementedError('"retry" not implemented by' + 'this transport driver') + return self.send(target, ctxt, message, envelope=(version == 2.0)) + + @_ensure_connect_called + def listen(self, target): + """Construct a Listener for the given target.""" + LOG.debug("Listen to %s", target) + listener = ProtonListener(self) + self._ctrl.add_task(ListenTask(target, listener)) + return listener + + @_ensure_connect_called + def listen_for_notifications(self, targets_and_priorities): + LOG.debug("Listen for notifications %s", targets_and_priorities) + listener = ProtonListener(self) + for target, priority in targets_and_priorities: + topic = '%s.%s' % (target.topic, priority) + t = messaging_target.Target(topic=topic) + self._ctrl.add_task(ListenTask(t, listener, True)) + return listener + + def cleanup(self): + """Release all resources.""" + LOG.debug("Cleaning up ProtonDriver") + self._ctrl.destroy() + self._ctrl = None diff --git a/oslo/messaging/_drivers/protocols/amqp/eventloop.py b/oslo/messaging/_drivers/protocols/amqp/eventloop.py new file mode 100644 index 000000000..806f27100 --- /dev/null +++ b/oslo/messaging/_drivers/protocols/amqp/eventloop.py @@ -0,0 +1,326 @@ +# Copyright 2014, Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +A thread that performs all messaging I/O and protocol event handling. + +This module provides a background thread that handles messaging operations +scheduled via the Controller, and performs blocking socket I/O and timer +processing. This thread is designed to be as simple as possible - all the +protocol specific intelligence is provided by the Controller and executed on +the background thread via callables. +""" + +import errno +import heapq +import logging +import os +import select +import socket +import sys +import threading +import time +import uuid + +import pyngus +from six import moves + +LOG = logging.getLogger(__name__) + + +class _SocketConnection(): + """Associates a pyngus Connection with a python network socket, + and handles all connection-related I/O and timer events. + """ + + def __init__(self, name, container, properties, handler): + self.name = name + self.socket = None + self._properties = properties or {} + self._properties["properties"] = self._get_name_and_pid() + # The handler is a pyngus ConnectionEventHandler, which is invoked by + # pyngus on connection-related events (active, closed, error, etc). + # Currently it is the Controller object. + self._handler = handler + self._container = container + c = container.create_connection(name, handler, self._properties) + c.user_context = self + self.connection = c + + def _get_name_and_pid(self): + # helps identify the process that is using the connection + return {u'process': os.path.basename(sys.argv[0]), u'pid': os.getpid()} + + def fileno(self): + """Allows use of a _SocketConnection in a select() call. + """ + return self.socket.fileno() + + def read(self): + """Called when socket is read-ready.""" + while True: + try: + rc = pyngus.read_socket_input(self.connection, self.socket) + if rc > 0: + self.connection.process(time.time()) + return rc + except socket.error as e: + if e.errno == errno.EAGAIN or e.errno == errno.EINTR: + continue + elif e.errno == errno.EWOULDBLOCK: + return 0 + else: + self._handler.socket_error(str(e)) + return pyngus.Connection.EOS + + def write(self): + """Called when socket is write-ready.""" + while True: + try: + rc = pyngus.write_socket_output(self.connection, self.socket) + if rc > 0: + self.connection.process(time.time()) + return rc + except socket.error as e: + if e.errno == errno.EAGAIN or e.errno == errno.EINTR: + continue + elif e.errno == errno.EWOULDBLOCK: + return 0 + else: + self._handler.socket_error(str(e)) + return pyngus.Connection.EOS + + def connect(self, hostname, port, sasl_mechanisms="ANONYMOUS"): + """Connect to host:port and start the AMQP protocol.""" + addr = socket.getaddrinfo(hostname, port, + socket.AF_INET, socket.SOCK_STREAM) + if not addr: + key = "%s:%i" % (hostname, port) + error = "Invalid peer address '%s'" % key + LOG.error(error) + self._handler.socket_error(error) + return + my_socket = socket.socket(addr[0][0], addr[0][1], addr[0][2]) + my_socket.setblocking(0) # 0=non-blocking + my_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + try: + my_socket.connect(addr[0][4]) + except socket.error as e: + if e.errno != errno.EINPROGRESS: + error = "Socket connect failure '%s'" % str(e) + LOG.error(error) + self._handler.socket_error(error) + return + self.socket = my_socket + + if sasl_mechanisms: + pn_sasl = self.connection.pn_sasl + pn_sasl.mechanisms(sasl_mechanisms) + # TODO(kgiusti): server if accepting inbound connections + pn_sasl.client() + self.connection.open() + + def reset(self, name=None): + """Clean up the current state, expect 'connect()' to be recalled + later. + """ + if self.connection: + self.connection.destroy() + self.close() + if name: + self.name = name + c = self._container.create_connection(self.name, self._handler, + self._properties) + c.user_context = self + self.connection = c + + def close(self): + if self.socket: + self.socket.close() + + +class Schedule(object): + """A list of callables (requests). Each callable may have a delay (in + milliseconds) which causes the callable to be scheduled to run after the + delay passes. + """ + def __init__(self): + self._entries = [] + + def schedule(self, request, delay): + """Request a callable be executed after delay.""" + entry = (time.time() + delay, request) + heapq.heappush(self._entries, entry) + + def get_delay(self, max_delay=None): + """Get the delay in milliseconds until the next callable needs to be + run, or 'max_delay' if no outstanding callables or the delay to the + next callable is > 'max_delay'. + """ + due = self._entries[0][0] if self._entries else None + if due is None: + return max_delay + now = time.time() + if due < now: + return 0 + else: + return min(due - now, max_delay) if max_delay else due - now + + def process(self): + """Invoke all expired callables.""" + while self._entries and self._entries[0][0] < time.time(): + heapq.heappop(self._entries)[1]() + + +class Requests(object): + """A queue of callables to execute from the eventloop thread's main + loop. + """ + def __init__(self): + self._requests = moves.queue.Queue(maxsize=10) + self._wakeup_pipe = os.pipe() + + def wakeup(self, request=None): + """Enqueue a callable to be executed by the eventloop, and force the + eventloop thread to wake up from select(). + """ + if request: + self._requests.put(request) + os.write(self._wakeup_pipe[1], "!") + + def fileno(self): + """Allows this request queue to be used by select().""" + return self._wakeup_pipe[0] + + def read(self): + """Invoked by the eventloop thread, execute each queued callable.""" + os.read(self._wakeup_pipe[0], 512) + # first pop of all current tasks + requests = [] + while not self._requests.empty(): + requests.append(self._requests.get()) + # then process them, this allows callables to re-register themselves to + # be run on the next iteration of the I/O loop + for r in requests: + r() + + +class Thread(threading.Thread): + """Manages socket I/O and executes callables queued up by external + threads. + """ + def __init__(self, container_name=None): + super(Thread, self).__init__() + + # callables from other threads: + self._requests = Requests() + # delayed callables (only used on this thread for now): + self._schedule = Schedule() + + # Configure a container + if container_name is None: + container_name = uuid.uuid4().hex + self._container = pyngus.Container(container_name) + + self.name = "Thread for Proton container: %s" % self._container.name + self._shutdown = False + self.daemon = True + self.start() + + def wakeup(self, request=None): + """Wake up the eventloop thread, Optionally providing a callable to run + when the eventloop wakes up. + """ + self._requests.wakeup(request) + + def schedule(self, request, delay): + """Invoke request after delay seconds.""" + self._schedule.schedule(request, delay) + + def destroy(self): + """Stop the processing thread, releasing all resources. + """ + LOG.debug("Stopping Proton container %s", self._container.name) + self.wakeup(lambda: self.shutdown()) + self.join() + + def shutdown(self): + LOG.info("eventloop shutdown requested") + self._shutdown = True + + def connect(self, hostname, port, handler, properties=None, name=None, + sasl_mechanisms="ANONYMOUS"): + """Get a _SocketConnection to a peer represented by url.""" + key = name or "%s:%i" % (hostname, port) + # return pre-existing + conn = self._container.get_connection(key) + if conn: + return conn.user_context + + # create a new connection - this will be stored in the + # container, using the specified name as the lookup key, or if + # no name was provided, the host:port combination + sc = _SocketConnection(key, self._container, + properties, handler=handler) + sc.connect(hostname, port, sasl_mechanisms) + return sc + + def run(self): + """Run the proton event/timer loop.""" + LOG.debug("Starting Proton thread, container=%s", + self._container.name) + + while not self._shutdown: + readers, writers, timers = self._container.need_processing() + + readfds = [c.user_context for c in readers] + # additionally, always check for readability of pipe we + # are using to wakeup processing thread by other threads + readfds.append(self._requests) + writefds = [c.user_context for c in writers] + + timeout = None + if timers: + deadline = timers[0].deadline # 0 == next expiring timer + now = time.time() + timeout = 0 if deadline <= now else deadline - now + + # adjust timeout for any deferred requests + timeout = self._schedule.get_delay(timeout) + + try: + results = select.select(readfds, writefds, [], timeout) + except select.error as serror: + if serror[0] == errno.EINTR: + LOG.warning("ignoring interrupt from select(): %s", + str(serror)) + continue + raise # assuming fatal... + readable, writable, ignore = results + + for r in readable: + r.read() + + self._schedule.process() # run any deferred requests + for t in timers: + if t.deadline > time.time(): + break + t.process(time.time()) + + for w in writable: + w.write() + + LOG.info("eventloop thread exiting, container=%s", + self._container.name) + self._container.destroy() diff --git a/oslo/messaging/opts.py b/oslo/messaging/opts.py index 603156e04..5ca0ab4aa 100644 --- a/oslo/messaging/opts.py +++ b/oslo/messaging/opts.py @@ -27,6 +27,7 @@ from oslo.messaging._drivers import impl_zmq from oslo.messaging._drivers import matchmaker from oslo.messaging._drivers import matchmaker_redis from oslo.messaging._drivers import matchmaker_ring +from oslo.messaging._drivers.protocols.amqp import driver as amqp1_driver from oslo.messaging._executors import impl_eventlet from oslo.messaging.notify import notifier from oslo.messaging.rpc import client @@ -41,7 +42,8 @@ _global_opt_lists = [ impl_eventlet._eventlet_opts, notifier._notifier_opts, client._client_opts, - transport._transport_opts + transport._transport_opts, + amqp1_driver.get_opts() ] _opts = [ diff --git a/setup.cfg b/setup.cfg index 2e650fb0d..994d4ccbd 100644 --- a/setup.cfg +++ b/setup.cfg @@ -31,6 +31,7 @@ oslo.messaging.drivers = rabbit = oslo.messaging._drivers.impl_rabbit:RabbitDriver qpid = oslo.messaging._drivers.impl_qpid:QpidDriver zmq = oslo.messaging._drivers.impl_zmq:ZmqDriver + amqp = oslo.messaging._drivers.protocols.amqp:ProtonDriver # To avoid confusion kombu = oslo.messaging._drivers.impl_rabbit:RabbitDriver diff --git a/tests/test_amqp_driver.py b/tests/test_amqp_driver.py new file mode 100644 index 000000000..7df6eb11e --- /dev/null +++ b/tests/test_amqp_driver.py @@ -0,0 +1,654 @@ +# Copyright (C) 2014 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import os +import select +import socket +import threading +import time +import uuid + +from six import moves +import testtools + +from oslo import messaging +from oslo.messaging._drivers.protocols.amqp import driver as amqp_driver +from oslo.messaging.openstack.common import importutils +from tests import utils as test_utils + +# TODO(kgiusti) Conditionally run these tests only if the necessary +# dependencies are installed. This should be removed once the proton libraries +# are available in the base repos for all supported platforms. +pyngus = importutils.try_import("pyngus") + +LOG = logging.getLogger(__name__) + + +class _ListenerThread(threading.Thread): + """Run a blocking listener in a thread.""" + def __init__(self, listener, msg_count): + super(_ListenerThread, self).__init__() + self.listener = listener + self.msg_count = msg_count + self.messages = moves.queue.Queue() + self.daemon = True + self.start() + + def run(self): + LOG.info("Listener started") + while self.msg_count > 0: + in_msg = self.listener.poll() + self.messages.put(in_msg) + self.msg_count -= 1 + if in_msg.message.get('method') == 'echo': + in_msg.reply(reply={'correlation-id': + in_msg.message.get('id')}) + LOG.info("Listener stopped") + + def get_messages(self): + """Returns a list of all received messages.""" + msgs = [] + try: + while True: + m = self.messages.get(False) + msgs.append(m) + except moves.queue.Empty: + pass + return msgs + + +@testtools.skipUnless(pyngus, "proton modules not present") +class TestProtonDriverLoad(test_utils.BaseTestCase): + + def setUp(self): + super(TestProtonDriverLoad, self).setUp() + self.messaging_conf.transport_driver = 'amqp' + + def test_driver_load(self): + transport = messaging.get_transport(self.conf) + self.assertIsInstance(transport._driver, + amqp_driver.ProtonDriver) + + +class _AmqpBrokerTestCase(test_utils.BaseTestCase): + + @testtools.skipUnless(pyngus, "proton modules not present") + def setUp(self): + LOG.info("Starting Broker Test") + super(_AmqpBrokerTestCase, self).setUp() + self._broker = FakeBroker() + self._broker_addr = "amqp://%s:%d" % (self._broker.host, + self._broker.port) + self._broker_url = messaging.TransportURL.parse(self.conf, + self._broker_addr) + self._broker.start() + + def tearDown(self): + super(_AmqpBrokerTestCase, self).tearDown() + self._broker.stop() + LOG.info("Broker Test Ended") + + +class TestAmqpSend(_AmqpBrokerTestCase): + """Test sending and receiving messages.""" + + def test_driver_unconnected_cleanup(self): + """Verify the driver can cleanly shutdown even if never connected.""" + driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) + driver.cleanup() + + def test_listener_cleanup(self): + """Verify unused listener can cleanly shutdown.""" + driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) + target = messaging.Target(topic="test-topic") + listener = driver.listen(target) + self.assertIsInstance(listener, amqp_driver.ProtonListener) + driver.cleanup() + + def test_send_no_reply(self): + driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) + target = messaging.Target(topic="test-topic") + listener = _ListenerThread(driver.listen(target), 1) + rc = driver.send(target, {"context": True}, + {"msg": "value"}, wait_for_reply=False) + self.assertIsNone(rc) + listener.join(timeout=30) + self.assertFalse(listener.isAlive()) + self.assertEqual(listener.messages.get().message, {"msg": "value"}) + driver.cleanup() + + def test_send_exchange_with_reply(self): + driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) + target1 = messaging.Target(topic="test-topic", exchange="e1") + listener1 = _ListenerThread(driver.listen(target1), 1) + target2 = messaging.Target(topic="test-topic", exchange="e2") + listener2 = _ListenerThread(driver.listen(target2), 1) + + rc = driver.send(target1, {"context": "whatever"}, + {"method": "echo", "id": "e1"}, + wait_for_reply=True, + timeout=30) + self.assertIsNotNone(rc) + self.assertEqual(rc.get('correlation-id'), 'e1') + + rc = driver.send(target2, {"context": "whatever"}, + {"method": "echo", "id": "e2"}, + wait_for_reply=True, + timeout=30) + self.assertIsNotNone(rc) + self.assertEqual(rc.get('correlation-id'), 'e2') + + listener1.join(timeout=30) + self.assertFalse(listener1.isAlive()) + listener2.join(timeout=30) + self.assertFalse(listener2.isAlive()) + driver.cleanup() + + def test_messaging_patterns(self): + """Verify the direct, shared, and fanout message patterns work.""" + driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) + target1 = messaging.Target(topic="test-topic", server="server1") + listener1 = _ListenerThread(driver.listen(target1), 4) + target2 = messaging.Target(topic="test-topic", server="server2") + listener2 = _ListenerThread(driver.listen(target2), 3) + + shared_target = messaging.Target(topic="test-topic") + fanout_target = messaging.Target(topic="test-topic", + fanout=True) + # this should go to only one server: + driver.send(shared_target, {"context": "whatever"}, + {"method": "echo", "id": "either-1"}, + wait_for_reply=True) + self.assertEqual(self._broker.topic_count, 1) + self.assertEqual(self._broker.direct_count, 1) # reply + + # this should go to the other server: + driver.send(shared_target, {"context": "whatever"}, + {"method": "echo", "id": "either-2"}, + wait_for_reply=True) + self.assertEqual(self._broker.topic_count, 2) + self.assertEqual(self._broker.direct_count, 2) # reply + + # these should only go to listener1: + driver.send(target1, {"context": "whatever"}, + {"method": "echo", "id": "server1-1"}, + wait_for_reply=True) + + driver.send(target1, {"context": "whatever"}, + {"method": "echo", "id": "server1-2"}, + wait_for_reply=True) + self.assertEqual(self._broker.direct_count, 6) # 2X(send+reply) + + # this should only go to listener2: + driver.send(target2, {"context": "whatever"}, + {"method": "echo", "id": "server2"}, + wait_for_reply=True) + self.assertEqual(self._broker.direct_count, 8) + + # both listeners should get a copy: + driver.send(fanout_target, {"context": "whatever"}, + {"method": "echo", "id": "fanout"}) + + listener1.join(timeout=30) + self.assertFalse(listener1.isAlive()) + listener2.join(timeout=30) + self.assertFalse(listener2.isAlive()) + self.assertEqual(self._broker.fanout_count, 1) + + listener1_ids = [x.message.get('id') for x in listener1.get_messages()] + listener2_ids = [x.message.get('id') for x in listener2.get_messages()] + + self.assertTrue('fanout' in listener1_ids and + 'fanout' in listener2_ids) + self.assertTrue('server1-1' in listener1_ids and + 'server1-1' not in listener2_ids) + self.assertTrue('server1-2' in listener1_ids and + 'server1-2' not in listener2_ids) + self.assertTrue('server2' in listener2_ids and + 'server2' not in listener1_ids) + if 'either-1' in listener1_ids: + self.assertTrue('either-2' in listener2_ids and + 'either-2' not in listener1_ids and + 'either-1' not in listener2_ids) + else: + self.assertTrue('either-2' in listener1_ids and + 'either-2' not in listener2_ids and + 'either-1' in listener2_ids) + driver.cleanup() + + def test_send_timeout(self): + """Verify send timeout.""" + driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) + target = messaging.Target(topic="test-topic") + listener = _ListenerThread(driver.listen(target), 1) + + # the listener will drop this message: + try: + driver.send(target, + {"context": "whatever"}, + {"method": "drop"}, + wait_for_reply=True, + timeout=1.0) + except Exception as ex: + self.assertIsInstance(ex, messaging.MessagingTimeout, ex) + else: + self.assertTrue(False, "No Exception raised!") + listener.join(timeout=30) + self.assertFalse(listener.isAlive()) + driver.cleanup() + + +class TestAmqpNotification(_AmqpBrokerTestCase): + """Test sending and receiving notifications.""" + + def test_notification(self): + driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) + notifications = [(messaging.Target(topic="topic-1"), 'info'), + (messaging.Target(topic="topic-1"), 'error'), + (messaging.Target(topic="topic-2"), 'debug')] + nl = driver.listen_for_notifications(notifications) + + listener = _ListenerThread(nl, 3) + targets = ['topic-1.info', + 'topic-1.bad', # should be dropped + 'bad-topic.debug', # should be dropped + 'topic-1.error', 'topic-2.debug'] + + for t in targets: + driver.send_notification(messaging.Target(topic=t), + "context", {'target': t}, + 1.0) + listener.join(timeout=30) + self.assertFalse(listener.isAlive()) + topics = [x.message.get('target') for x in listener.get_messages()] + self.assertTrue('topic-1.info' in topics) + self.assertTrue('topic-1.error' in topics) + self.assertTrue('topic-2.debug' in topics) + self.assertEqual(self._broker.dropped_count, 2) + driver.cleanup() + + +@testtools.skipUnless(pyngus, "proton modules not present") +class TestFailover(test_utils.BaseTestCase): + + def setUp(self): + super(TestFailover, self).setUp() + LOG.info("Starting Failover Test") + self._brokers = [FakeBroker(), FakeBroker()] + hosts = [] + for broker in self._brokers: + hosts.append(messaging.TransportHost(hostname=broker.host, + port=broker.port)) + self._broker_url = messaging.TransportURL(self.conf, + transport="amqp", + hosts=hosts) + + def tearDown(self): + super(TestFailover, self).tearDown() + for broker in self._brokers: + if broker.isAlive(): + broker.stop() + + def test_broker_failover(self): + """Simulate failover of one broker to another.""" + self._brokers[0].start() + driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) + + target = messaging.Target(topic="my-topic") + listener = _ListenerThread(driver.listen(target), 2) + + rc = driver.send(target, {"context": "whatever"}, + {"method": "echo", "id": "echo-1"}, + wait_for_reply=True, + timeout=30) + self.assertIsNotNone(rc) + self.assertEqual(rc.get('correlation-id'), 'echo-1') + # 1 request msg, 1 response: + self.assertEqual(self._brokers[0].topic_count, 1) + self.assertEqual(self._brokers[0].direct_count, 1) + + # fail broker 0 and start broker 1: + self._brokers[0].stop() + self._brokers[1].start() + deadline = time.time() + 30 + responded = False + sequence = 2 + while deadline > time.time() and not responded: + if not listener.isAlive(): + # listener may have exited after replying to an old correlation + # id: restart new listener + listener = _ListenerThread(driver.listen(target), 1) + try: + rc = driver.send(target, {"context": "whatever"}, + {"method": "echo", + "id": "echo-%d" % sequence}, + wait_for_reply=True, + timeout=2) + self.assertIsNotNone(rc) + self.assertEqual(rc.get('correlation-id'), + 'echo-%d' % sequence) + responded = True + except messaging.MessagingTimeout: + sequence += 1 + + self.assertTrue(responded) + listener.join(timeout=30) + self.assertFalse(listener.isAlive()) + + # note: stopping the broker first tests cleaning up driver without a + # connection active + self._brokers[1].stop() + driver.cleanup() + + +class FakeBroker(threading.Thread): + """A test AMQP message 'broker'.""" + + if pyngus: + class Connection(pyngus.ConnectionEventHandler): + """A single AMQP connection.""" + + def __init__(self, server, socket_, name): + """Create a Connection using socket_.""" + self.socket = socket_ + self.name = name + self.server = server + self.connection = server.container.create_connection(name, + self) + self.connection.user_context = self + self.connection.pn_sasl.mechanisms("ANONYMOUS") + self.connection.pn_sasl.server() + self.connection.open() + self.sender_links = set() + self.closed = False + + def destroy(self): + """Destroy the test connection.""" + while self.sender_links: + link = self.sender_links.pop() + link.destroy() + self.connection.destroy() + self.connection = None + self.socket.close() + + def fileno(self): + """Allows use of this in a select() call.""" + return self.socket.fileno() + + def process_input(self): + """Called when socket is read-ready.""" + try: + pyngus.read_socket_input(self.connection, self.socket) + except socket.error: + pass + self.connection.process(time.time()) + + def send_output(self): + """Called when socket is write-ready.""" + try: + pyngus.write_socket_output(self.connection, + self.socket) + except socket.error: + pass + self.connection.process(time.time()) + + # Pyngus ConnectionEventHandler callbacks: + + def connection_remote_closed(self, connection, reason): + """Peer has closed the connection.""" + self.connection.close() + + def connection_closed(self, connection): + """Connection close completed.""" + self.closed = True # main loop will destroy + + def connection_failed(self, connection, error): + """Connection failure detected.""" + self.connection_closed(connection) + + def sender_requested(self, connection, link_handle, + name, requested_source, properties): + """Create a new message source.""" + addr = requested_source or "source-" + uuid.uuid4().hex + link = FakeBroker.SenderLink(self.server, self, + link_handle, addr) + self.sender_links.add(link) + + def receiver_requested(self, connection, link_handle, + name, requested_target, properties): + """Create a new message consumer.""" + addr = requested_target or "target-" + uuid.uuid4().hex + FakeBroker.ReceiverLink(self.server, self, + link_handle, addr) + + def sasl_step(self, connection, pn_sasl): + pn_sasl.done(pn_sasl.OK) # always permit + + class SenderLink(pyngus.SenderEventHandler): + """An AMQP sending link.""" + def __init__(self, server, conn, handle, src_addr=None): + self.server = server + cnn = conn.connection + self.link = cnn.accept_sender(handle, + source_override=src_addr, + event_handler=self) + self.link.open() + self.routed = False + + def destroy(self): + """Destroy the link.""" + self._cleanup() + if self.link: + self.link.destroy() + self.link = None + + def send_message(self, message): + """Send a message over this link.""" + self.link.send(message) + + def _cleanup(self): + if self.routed: + self.server.remove_route(self.link.source_address, + self) + self.routed = False + + # Pyngus SenderEventHandler callbacks: + + def sender_active(self, sender_link): + self.server.add_route(self.link.source_address, self) + self.routed = True + + def sender_remote_closed(self, sender_link, error): + self._cleanup() + self.link.close() + + def sender_closed(self, sender_link): + self.destroy() + + class ReceiverLink(pyngus.ReceiverEventHandler): + """An AMQP Receiving link.""" + def __init__(self, server, conn, handle, addr=None): + self.server = server + cnn = conn.connection + self.link = cnn.accept_receiver(handle, + target_override=addr, + event_handler=self) + self.link.open() + self.link.add_capacity(10) + + # ReceiverEventHandler callbacks: + + def receiver_remote_closed(self, receiver_link, error): + self.link.close() + + def receiver_closed(self, receiver_link): + self.link.destroy() + self.link = None + + def message_received(self, receiver_link, message, handle): + """Forward this message out the proper sending link.""" + if self.server.forward_message(message): + self.link.message_accepted(handle) + else: + self.link.message_rejected(handle) + + if self.link.capacity < 1: + self.link.add_capacity(10) + + def __init__(self, server_prefix="exclusive", + broadcast_prefix="broadcast", + group_prefix="unicast", + address_separator=".", + sock_addr="", sock_port=0): + """Create a fake broker listening on sock_addr:sock_port.""" + if not pyngus: + raise AssertionError("pyngus module not present") + threading.Thread.__init__(self) + self._server_prefix = server_prefix + address_separator + self._broadcast_prefix = broadcast_prefix + address_separator + self._group_prefix = group_prefix + address_separator + self._address_separator = address_separator + self._wakeup_pipe = os.pipe() + self._my_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._my_socket.bind((sock_addr, sock_port)) + self.host, self.port = self._my_socket.getsockname() + self.container = pyngus.Container("test_server_%s:%d" + % (self.host, self.port)) + self._connections = {} + self._sources = {} + # count of messages forwarded, by messaging pattern + self.direct_count = 0 + self.topic_count = 0 + self.fanout_count = 0 + self.dropped_count = 0 + + def start(self): + """Start the server.""" + LOG.info("Starting Test Broker on %s:%d", self.host, self.port) + self._shutdown = False + self.daemon = True + self._my_socket.listen(10) + super(FakeBroker, self).start() + + def stop(self): + """Shutdown the server.""" + LOG.info("Stopping test Broker %s:%d", self.host, self.port) + self._shutdown = True + os.write(self._wakeup_pipe[1], "!") + self.join() + LOG.info("Test Broker %s:%d stopped", self.host, self.port) + + def run(self): + """Process I/O and timer events until the broker is stopped.""" + LOG.info("Test Broker on %s:%d started", self.host, self.port) + while not self._shutdown: + readers, writers, timers = self.container.need_processing() + + # map pyngus Connections back to _TestConnections: + readfd = [c.user_context for c in readers] + readfd.extend([self._my_socket, self._wakeup_pipe[0]]) + writefd = [c.user_context for c in writers] + + timeout = None + if timers: + # [0] == next expiring timer + deadline = timers[0].next_tick + now = time.time() + timeout = 0 if deadline <= now else deadline - now + + readable, writable, ignore = select.select(readfd, + writefd, + [], + timeout) + worked = set() + for r in readable: + if r is self._my_socket: + # new inbound connection request received, + # create a new Connection for it: + client_socket, client_address = self._my_socket.accept() + name = str(client_address) + conn = FakeBroker.Connection(self, client_socket, name) + self._connections[conn.name] = conn + elif r is self._wakeup_pipe[0]: + os.read(self._wakeup_pipe[0], 512) + else: + r.process_input() + worked.add(r) + + for t in timers: + now = time.time() + if t.next_tick > now: + break + t.process(now) + conn = t.user_context + worked.add(conn) + + for w in writable: + w.send_output() + worked.add(w) + + # clean up any closed connections: + while worked: + conn = worked.pop() + if conn.closed: + del self._connections[conn.name] + conn.destroy() + + # Shutting down + self._my_socket.close() + for conn in self._connections.itervalues(): + conn.destroy() + return 0 + + def add_route(self, address, link): + # route from address -> link[, link ...] + if address not in self._sources: + self._sources[address] = [link] + elif link not in self._sources[address]: + self._sources[address].append(link) + + def remove_route(self, address, link): + if address in self._sources: + if link in self._sources[address]: + self._sources[address].remove(link) + if not self._sources[address]: + del self._sources[address] + + def forward_message(self, message): + # returns True if message was routed + dest = message.address + if dest not in self._sources: + self.dropped_count += 1 + return False + LOG.debug("Forwarding [%s]", dest) + # route "behavior" determined by prefix: + if dest.startswith(self._broadcast_prefix): + self.fanout_count += 1 + for link in self._sources[dest]: + LOG.debug("Broadcast to %s", dest) + link.send_message(message) + elif dest.startswith(self._group_prefix): + # round-robin: + self.topic_count += 1 + link = self._sources[dest].pop(0) + link.send_message(message) + LOG.debug("Send to %s", dest) + self._sources[dest].append(link) + else: + # unicast: + self.direct_count += 1 + LOG.debug("Unicast to %s", dest) + self._sources[dest][0].send_message(message) + return True diff --git a/tox.ini b/tox.ini index 4146e7df8..f53d7ba35 100644 --- a/tox.ini +++ b/tox.ini @@ -27,6 +27,11 @@ commands = python setup.py build_sphinx deps = -r{toxinidir}/requirements-py3.txt -r{toxinidir}/test-requirements-py3.txt +[testenv:amqp1] +# test AMQP 1.0 driver, requires QPID Proton developer packages +deps = -r{toxinidir}/amqp1-requirements.txt + {[testenv]deps} + [flake8] show-source = True ignore = H237,H402,H405,H904