make it possible to import amqp driver without dependencies

All plugins are supposed to be importable without their dependencies so
we can discover options and documentation. Restructure the parts of the
AMQP1 driver that depend on having proton and pyngus installed so the
driver can load without them.

Change-Id: Id0c8c2a6ae44d13f061e651c33efc9e38750a049
This commit is contained in:
Doug Hellmann 2015-05-16 14:21:32 -04:00
parent 7ded4a972f
commit 1da2231116
2 changed files with 102 additions and 74 deletions

View File

@ -26,86 +26,24 @@ import threading
import time
from oslo_serialization import jsonutils
import proton
from oslo_utils import importutils
from six import moves
import oslo_messaging
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common
from oslo_messaging._drivers.protocols.amqp import controller
from oslo_messaging import target as messaging_target
proton = importutils.try_import('proton')
controller = importutils.try_import(
'oslo_messaging._drivers.protocols.amqp.controller'
)
drivertasks = importutils.try_import(
'oslo_messaging._drivers.protocols.amqp.drivertasks'
)
LOG = logging.getLogger(__name__)
class SendTask(controller.Task):
"""A task that sends a message to a target, and optionally allows for the
calling thread to wait for a reply.
"""
def __init__(self, target, request, reply_expected, deadline):
super(SendTask, self).__init__()
self._target = target
self._request = request
self._deadline = deadline
if reply_expected:
self._reply_queue = moves.queue.Queue()
else:
self._reply_queue = None
def execute(self, controller):
"""Runs on eventloop thread - sends request."""
if not self._deadline or self._deadline > time.time():
controller.request(self._target, self._request, self._reply_queue)
else:
LOG.warn("Send request to %s aborted: TTL expired.", self._target)
def get_reply(self, timeout):
"""Retrieve the reply."""
if not self._reply_queue:
return None
try:
return self._reply_queue.get(timeout=timeout)
except moves.queue.Empty:
raise oslo_messaging.MessagingTimeout(
'Timed out waiting for a reply')
class ListenTask(controller.Task):
"""A task that creates a subscription to the given target. Messages
arriving from the target are given to the listener.
"""
def __init__(self, target, listener, notifications=False):
"""Create a subscription to the target."""
super(ListenTask, self).__init__()
self._target = target
self._listener = listener
self._notifications = notifications
def execute(self, controller):
"""Run on the eventloop thread - subscribes to target. Inbound messages
are queued to the listener's incoming queue.
"""
if self._notifications:
controller.subscribe_notifications(self._target,
self._listener.incoming)
else:
controller.subscribe(self._target, self._listener.incoming)
class ReplyTask(controller.Task):
"""A task that sends 'response' message to address."""
def __init__(self, address, response, log_failure):
super(ReplyTask, self).__init__()
self._address = address
self._response = response
self._log_failure = log_failure
def execute(self, controller):
"""Run on the eventloop thread - send the response message."""
controller.response(self._address, self._response)
def marshal_response(reply=None, failure=None):
# TODO(grs): do replies have a context?
msg = proton.Message()
@ -158,7 +96,7 @@ class ProtonIncomingMessage(base.IncomingMessage):
response = marshal_response(reply=reply, failure=failure)
response.correlation_id = self._correlation_id
LOG.debug("Replying to %s", self._correlation_id)
task = ReplyTask(self._reply_to, response, log_failure)
task = drivertasks.ReplyTask(self._reply_to, response, log_failure)
self.listener.driver._ctrl.add_task(task)
else:
LOG.debug("Ignoring reply as no reply address available")
@ -245,7 +183,7 @@ class ProtonDriver(base.BaseDriver):
request.ttl = int(timeout * 1000)
request.expiry_time = int(expire * 1000)
LOG.debug("Send to %s", target)
task = SendTask(target, request, wait_for_reply, expire)
task = drivertasks.SendTask(target, request, wait_for_reply, expire)
self._ctrl.add_task(task)
result = None
if wait_for_reply:
@ -273,7 +211,7 @@ class ProtonDriver(base.BaseDriver):
"""Construct a Listener for the given target."""
LOG.debug("Listen to %s", target)
listener = ProtonListener(self)
self._ctrl.add_task(ListenTask(target, listener))
self._ctrl.add_task(drivertasks.ListenTask(target, listener))
return listener
@_ensure_connect_called
@ -286,7 +224,7 @@ class ProtonDriver(base.BaseDriver):
for target, priority in targets_and_priorities:
topic = '%s.%s' % (target.topic, priority)
t = messaging_target.Target(topic=topic)
self._ctrl.add_task(ListenTask(t, listener, True))
self._ctrl.add_task(drivertasks.ListenTask(t, listener, True))
return listener
def cleanup(self):

View File

@ -0,0 +1,90 @@
# Copyright 2014, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import time
import oslo_messaging
from oslo_messaging._drivers.protocols.amqp import controller
from six import moves
LOG = logging.getLogger(__name__)
class SendTask(controller.Task):
"""A task that sends a message to a target, and optionally allows for the
calling thread to wait for a reply.
"""
def __init__(self, target, request, reply_expected, deadline):
super(SendTask, self).__init__()
self._target = target
self._request = request
self._deadline = deadline
if reply_expected:
self._reply_queue = moves.queue.Queue()
else:
self._reply_queue = None
def execute(self, controller):
"""Runs on eventloop thread - sends request."""
if not self._deadline or self._deadline > time.time():
controller.request(self._target, self._request, self._reply_queue)
else:
LOG.warn("Send request to %s aborted: TTL expired.", self._target)
def get_reply(self, timeout):
"""Retrieve the reply."""
if not self._reply_queue:
return None
try:
return self._reply_queue.get(timeout=timeout)
except moves.queue.Empty:
raise oslo_messaging.MessagingTimeout(
'Timed out waiting for a reply')
class ListenTask(controller.Task):
"""A task that creates a subscription to the given target. Messages
arriving from the target are given to the listener.
"""
def __init__(self, target, listener, notifications=False):
"""Create a subscription to the target."""
super(ListenTask, self).__init__()
self._target = target
self._listener = listener
self._notifications = notifications
def execute(self, controller):
"""Run on the eventloop thread - subscribes to target. Inbound messages
are queued to the listener's incoming queue.
"""
if self._notifications:
controller.subscribe_notifications(self._target,
self._listener.incoming)
else:
controller.subscribe(self._target, self._listener.incoming)
class ReplyTask(controller.Task):
"""A task that sends 'response' message to address."""
def __init__(self, address, response, log_failure):
super(ReplyTask, self).__init__()
self._address = address
self._response = response
self._log_failure = log_failure
def execute(self, controller):
"""Run on the eventloop thread - send the response message."""
controller.response(self._address, self._response)