diff --git a/oslo_messaging/_drivers/protocols/amqp/driver.py b/oslo_messaging/_drivers/protocols/amqp/driver.py index 03dacc35a..cc2e1be8e 100644 --- a/oslo_messaging/_drivers/protocols/amqp/driver.py +++ b/oslo_messaging/_drivers/protocols/amqp/driver.py @@ -26,86 +26,24 @@ import threading import time from oslo_serialization import jsonutils -import proton +from oslo_utils import importutils from six import moves -import oslo_messaging from oslo_messaging._drivers import base from oslo_messaging._drivers import common -from oslo_messaging._drivers.protocols.amqp import controller from oslo_messaging import target as messaging_target +proton = importutils.try_import('proton') +controller = importutils.try_import( + 'oslo_messaging._drivers.protocols.amqp.controller' +) +drivertasks = importutils.try_import( + 'oslo_messaging._drivers.protocols.amqp.drivertasks' +) LOG = logging.getLogger(__name__) -class SendTask(controller.Task): - """A task that sends a message to a target, and optionally allows for the - calling thread to wait for a reply. - """ - def __init__(self, target, request, reply_expected, deadline): - super(SendTask, self).__init__() - self._target = target - self._request = request - self._deadline = deadline - if reply_expected: - self._reply_queue = moves.queue.Queue() - else: - self._reply_queue = None - - def execute(self, controller): - """Runs on eventloop thread - sends request.""" - if not self._deadline or self._deadline > time.time(): - controller.request(self._target, self._request, self._reply_queue) - else: - LOG.warn("Send request to %s aborted: TTL expired.", self._target) - - def get_reply(self, timeout): - """Retrieve the reply.""" - if not self._reply_queue: - return None - try: - return self._reply_queue.get(timeout=timeout) - except moves.queue.Empty: - raise oslo_messaging.MessagingTimeout( - 'Timed out waiting for a reply') - - -class ListenTask(controller.Task): - """A task that creates a subscription to the given target. Messages - arriving from the target are given to the listener. - """ - def __init__(self, target, listener, notifications=False): - """Create a subscription to the target.""" - super(ListenTask, self).__init__() - self._target = target - self._listener = listener - self._notifications = notifications - - def execute(self, controller): - """Run on the eventloop thread - subscribes to target. Inbound messages - are queued to the listener's incoming queue. - """ - if self._notifications: - controller.subscribe_notifications(self._target, - self._listener.incoming) - else: - controller.subscribe(self._target, self._listener.incoming) - - -class ReplyTask(controller.Task): - """A task that sends 'response' message to address.""" - def __init__(self, address, response, log_failure): - super(ReplyTask, self).__init__() - self._address = address - self._response = response - self._log_failure = log_failure - - def execute(self, controller): - """Run on the eventloop thread - send the response message.""" - controller.response(self._address, self._response) - - def marshal_response(reply=None, failure=None): # TODO(grs): do replies have a context? msg = proton.Message() @@ -158,7 +96,7 @@ class ProtonIncomingMessage(base.IncomingMessage): response = marshal_response(reply=reply, failure=failure) response.correlation_id = self._correlation_id LOG.debug("Replying to %s", self._correlation_id) - task = ReplyTask(self._reply_to, response, log_failure) + task = drivertasks.ReplyTask(self._reply_to, response, log_failure) self.listener.driver._ctrl.add_task(task) else: LOG.debug("Ignoring reply as no reply address available") @@ -245,7 +183,7 @@ class ProtonDriver(base.BaseDriver): request.ttl = int(timeout * 1000) request.expiry_time = int(expire * 1000) LOG.debug("Send to %s", target) - task = SendTask(target, request, wait_for_reply, expire) + task = drivertasks.SendTask(target, request, wait_for_reply, expire) self._ctrl.add_task(task) result = None if wait_for_reply: @@ -273,7 +211,7 @@ class ProtonDriver(base.BaseDriver): """Construct a Listener for the given target.""" LOG.debug("Listen to %s", target) listener = ProtonListener(self) - self._ctrl.add_task(ListenTask(target, listener)) + self._ctrl.add_task(drivertasks.ListenTask(target, listener)) return listener @_ensure_connect_called @@ -286,7 +224,7 @@ class ProtonDriver(base.BaseDriver): for target, priority in targets_and_priorities: topic = '%s.%s' % (target.topic, priority) t = messaging_target.Target(topic=topic) - self._ctrl.add_task(ListenTask(t, listener, True)) + self._ctrl.add_task(drivertasks.ListenTask(t, listener, True)) return listener def cleanup(self): diff --git a/oslo_messaging/_drivers/protocols/amqp/drivertasks.py b/oslo_messaging/_drivers/protocols/amqp/drivertasks.py new file mode 100644 index 000000000..63a32292e --- /dev/null +++ b/oslo_messaging/_drivers/protocols/amqp/drivertasks.py @@ -0,0 +1,90 @@ +# Copyright 2014, Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import time + +import oslo_messaging +from oslo_messaging._drivers.protocols.amqp import controller + +from six import moves + +LOG = logging.getLogger(__name__) + + +class SendTask(controller.Task): + """A task that sends a message to a target, and optionally allows for the + calling thread to wait for a reply. + """ + def __init__(self, target, request, reply_expected, deadline): + super(SendTask, self).__init__() + self._target = target + self._request = request + self._deadline = deadline + if reply_expected: + self._reply_queue = moves.queue.Queue() + else: + self._reply_queue = None + + def execute(self, controller): + """Runs on eventloop thread - sends request.""" + if not self._deadline or self._deadline > time.time(): + controller.request(self._target, self._request, self._reply_queue) + else: + LOG.warn("Send request to %s aborted: TTL expired.", self._target) + + def get_reply(self, timeout): + """Retrieve the reply.""" + if not self._reply_queue: + return None + try: + return self._reply_queue.get(timeout=timeout) + except moves.queue.Empty: + raise oslo_messaging.MessagingTimeout( + 'Timed out waiting for a reply') + + +class ListenTask(controller.Task): + """A task that creates a subscription to the given target. Messages + arriving from the target are given to the listener. + """ + def __init__(self, target, listener, notifications=False): + """Create a subscription to the target.""" + super(ListenTask, self).__init__() + self._target = target + self._listener = listener + self._notifications = notifications + + def execute(self, controller): + """Run on the eventloop thread - subscribes to target. Inbound messages + are queued to the listener's incoming queue. + """ + if self._notifications: + controller.subscribe_notifications(self._target, + self._listener.incoming) + else: + controller.subscribe(self._target, self._listener.incoming) + + +class ReplyTask(controller.Task): + """A task that sends 'response' message to address.""" + def __init__(self, address, response, log_failure): + super(ReplyTask, self).__init__() + self._address = address + self._response = response + self._log_failure = log_failure + + def execute(self, controller): + """Run on the eventloop thread - send the response message.""" + controller.response(self._address, self._response)