Merge "make it possible to import amqp driver without dependencies"

This commit is contained in:
Jenkins 2015-05-23 01:08:42 +00:00 committed by Gerrit Code Review
commit 29c573b3f2
2 changed files with 102 additions and 74 deletions

View File

@ -26,86 +26,24 @@ import threading
import time import time
from oslo_serialization import jsonutils from oslo_serialization import jsonutils
import proton from oslo_utils import importutils
from six import moves from six import moves
import oslo_messaging
from oslo_messaging._drivers import base from oslo_messaging._drivers import base
from oslo_messaging._drivers import common from oslo_messaging._drivers import common
from oslo_messaging._drivers.protocols.amqp import controller
from oslo_messaging import target as messaging_target from oslo_messaging import target as messaging_target
proton = importutils.try_import('proton')
controller = importutils.try_import(
'oslo_messaging._drivers.protocols.amqp.controller'
)
drivertasks = importutils.try_import(
'oslo_messaging._drivers.protocols.amqp.drivertasks'
)
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class SendTask(controller.Task):
"""A task that sends a message to a target, and optionally allows for the
calling thread to wait for a reply.
"""
def __init__(self, target, request, reply_expected, deadline):
super(SendTask, self).__init__()
self._target = target
self._request = request
self._deadline = deadline
if reply_expected:
self._reply_queue = moves.queue.Queue()
else:
self._reply_queue = None
def execute(self, controller):
"""Runs on eventloop thread - sends request."""
if not self._deadline or self._deadline > time.time():
controller.request(self._target, self._request, self._reply_queue)
else:
LOG.warn("Send request to %s aborted: TTL expired.", self._target)
def get_reply(self, timeout):
"""Retrieve the reply."""
if not self._reply_queue:
return None
try:
return self._reply_queue.get(timeout=timeout)
except moves.queue.Empty:
raise oslo_messaging.MessagingTimeout(
'Timed out waiting for a reply')
class ListenTask(controller.Task):
"""A task that creates a subscription to the given target. Messages
arriving from the target are given to the listener.
"""
def __init__(self, target, listener, notifications=False):
"""Create a subscription to the target."""
super(ListenTask, self).__init__()
self._target = target
self._listener = listener
self._notifications = notifications
def execute(self, controller):
"""Run on the eventloop thread - subscribes to target. Inbound messages
are queued to the listener's incoming queue.
"""
if self._notifications:
controller.subscribe_notifications(self._target,
self._listener.incoming)
else:
controller.subscribe(self._target, self._listener.incoming)
class ReplyTask(controller.Task):
"""A task that sends 'response' message to address."""
def __init__(self, address, response, log_failure):
super(ReplyTask, self).__init__()
self._address = address
self._response = response
self._log_failure = log_failure
def execute(self, controller):
"""Run on the eventloop thread - send the response message."""
controller.response(self._address, self._response)
def marshal_response(reply=None, failure=None): def marshal_response(reply=None, failure=None):
# TODO(grs): do replies have a context? # TODO(grs): do replies have a context?
msg = proton.Message() msg = proton.Message()
@ -158,7 +96,7 @@ class ProtonIncomingMessage(base.IncomingMessage):
response = marshal_response(reply=reply, failure=failure) response = marshal_response(reply=reply, failure=failure)
response.correlation_id = self._correlation_id response.correlation_id = self._correlation_id
LOG.debug("Replying to %s", self._correlation_id) LOG.debug("Replying to %s", self._correlation_id)
task = ReplyTask(self._reply_to, response, log_failure) task = drivertasks.ReplyTask(self._reply_to, response, log_failure)
self.listener.driver._ctrl.add_task(task) self.listener.driver._ctrl.add_task(task)
else: else:
LOG.debug("Ignoring reply as no reply address available") LOG.debug("Ignoring reply as no reply address available")
@ -245,7 +183,7 @@ class ProtonDriver(base.BaseDriver):
request.ttl = int(timeout * 1000) request.ttl = int(timeout * 1000)
request.expiry_time = int(expire * 1000) request.expiry_time = int(expire * 1000)
LOG.debug("Send to %s", target) LOG.debug("Send to %s", target)
task = SendTask(target, request, wait_for_reply, expire) task = drivertasks.SendTask(target, request, wait_for_reply, expire)
self._ctrl.add_task(task) self._ctrl.add_task(task)
result = None result = None
if wait_for_reply: if wait_for_reply:
@ -273,7 +211,7 @@ class ProtonDriver(base.BaseDriver):
"""Construct a Listener for the given target.""" """Construct a Listener for the given target."""
LOG.debug("Listen to %s", target) LOG.debug("Listen to %s", target)
listener = ProtonListener(self) listener = ProtonListener(self)
self._ctrl.add_task(ListenTask(target, listener)) self._ctrl.add_task(drivertasks.ListenTask(target, listener))
return listener return listener
@_ensure_connect_called @_ensure_connect_called
@ -286,7 +224,7 @@ class ProtonDriver(base.BaseDriver):
for target, priority in targets_and_priorities: for target, priority in targets_and_priorities:
topic = '%s.%s' % (target.topic, priority) topic = '%s.%s' % (target.topic, priority)
t = messaging_target.Target(topic=topic) t = messaging_target.Target(topic=topic)
self._ctrl.add_task(ListenTask(t, listener, True)) self._ctrl.add_task(drivertasks.ListenTask(t, listener, True))
return listener return listener
def cleanup(self): def cleanup(self):

View File

@ -0,0 +1,90 @@
# Copyright 2014, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import time
import oslo_messaging
from oslo_messaging._drivers.protocols.amqp import controller
from six import moves
LOG = logging.getLogger(__name__)
class SendTask(controller.Task):
"""A task that sends a message to a target, and optionally allows for the
calling thread to wait for a reply.
"""
def __init__(self, target, request, reply_expected, deadline):
super(SendTask, self).__init__()
self._target = target
self._request = request
self._deadline = deadline
if reply_expected:
self._reply_queue = moves.queue.Queue()
else:
self._reply_queue = None
def execute(self, controller):
"""Runs on eventloop thread - sends request."""
if not self._deadline or self._deadline > time.time():
controller.request(self._target, self._request, self._reply_queue)
else:
LOG.warn("Send request to %s aborted: TTL expired.", self._target)
def get_reply(self, timeout):
"""Retrieve the reply."""
if not self._reply_queue:
return None
try:
return self._reply_queue.get(timeout=timeout)
except moves.queue.Empty:
raise oslo_messaging.MessagingTimeout(
'Timed out waiting for a reply')
class ListenTask(controller.Task):
"""A task that creates a subscription to the given target. Messages
arriving from the target are given to the listener.
"""
def __init__(self, target, listener, notifications=False):
"""Create a subscription to the target."""
super(ListenTask, self).__init__()
self._target = target
self._listener = listener
self._notifications = notifications
def execute(self, controller):
"""Run on the eventloop thread - subscribes to target. Inbound messages
are queued to the listener's incoming queue.
"""
if self._notifications:
controller.subscribe_notifications(self._target,
self._listener.incoming)
else:
controller.subscribe(self._target, self._listener.incoming)
class ReplyTask(controller.Task):
"""A task that sends 'response' message to address."""
def __init__(self, address, response, log_failure):
super(ReplyTask, self).__init__()
self._address = address
self._response = response
self._log_failure = log_failure
def execute(self, controller):
"""Run on the eventloop thread - send the response message."""
controller.response(self._address, self._response)