Remove eventlet related code in amqp driver
Because driver should rely on executor and not directly on eventlet, delete eventlet related code. This also drop the old driver API. This is the amqp part. Change-Id: Ic6060058dafa4dabbc5e8c68bf231c818a7fec25
This commit is contained in:
parent
33202134bd
commit
e57a15deb8
oslo/messaging/_drivers
@ -24,19 +24,14 @@ uses AMQP, but is deprecated and predates this code.
|
||||
"""
|
||||
|
||||
import collections
|
||||
import inspect
|
||||
import logging
|
||||
import sys
|
||||
import threading
|
||||
import uuid
|
||||
|
||||
from eventlet import greenpool
|
||||
from eventlet import queue
|
||||
from oslo.config import cfg
|
||||
|
||||
from oslo.messaging._drivers import common as rpc_common
|
||||
from oslo.messaging._drivers import pool
|
||||
from oslo.messaging.openstack.common import excutils
|
||||
|
||||
# FIXME(markmc): remove this
|
||||
_ = lambda s: s
|
||||
@ -338,275 +333,5 @@ def _add_unique_id(msg):
|
||||
LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
|
||||
|
||||
|
||||
class _ThreadPoolWithWait(object):
|
||||
"""Base class for a delayed invocation manager.
|
||||
|
||||
Used by the Connection class to start up green threads
|
||||
to handle incoming messages.
|
||||
"""
|
||||
|
||||
def __init__(self, conf, connection_pool):
|
||||
self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size)
|
||||
self.connection_pool = connection_pool
|
||||
self.conf = conf
|
||||
|
||||
def wait(self):
|
||||
"""Wait for all callback threads to exit."""
|
||||
self.pool.waitall()
|
||||
|
||||
|
||||
class CallbackWrapper(_ThreadPoolWithWait):
|
||||
"""Wraps a straight callback.
|
||||
|
||||
Allows it to be invoked in a green thread.
|
||||
"""
|
||||
|
||||
def __init__(self, conf, callback, connection_pool):
|
||||
"""Initiates CallbackWrapper object.
|
||||
|
||||
:param conf: cfg.CONF instance
|
||||
:param callback: a callable (probably a function)
|
||||
:param connection_pool: connection pool as returned by
|
||||
get_connection_pool()
|
||||
"""
|
||||
super(CallbackWrapper, self).__init__(
|
||||
conf=conf,
|
||||
connection_pool=connection_pool,
|
||||
)
|
||||
self.callback = callback
|
||||
|
||||
def __call__(self, message_data):
|
||||
self.pool.spawn_n(self.callback, message_data)
|
||||
|
||||
|
||||
class ProxyCallback(_ThreadPoolWithWait):
|
||||
"""Calls methods on a proxy object based on method and args."""
|
||||
|
||||
def __init__(self, conf, proxy, connection_pool):
|
||||
super(ProxyCallback, self).__init__(
|
||||
conf=conf,
|
||||
connection_pool=connection_pool,
|
||||
)
|
||||
self.proxy = proxy
|
||||
self.msg_id_cache = _MsgIdCache()
|
||||
|
||||
def __call__(self, message_data):
|
||||
"""Consumer callback to call a method on a proxy object.
|
||||
|
||||
Parses the message for validity and fires off a thread to call the
|
||||
proxy object method.
|
||||
|
||||
Message data should be a dictionary with two keys:
|
||||
method: string representing the method to call
|
||||
args: dictionary of arg: value
|
||||
|
||||
Example: {'method': 'echo', 'args': {'value': 42}}
|
||||
|
||||
"""
|
||||
# It is important to clear the context here, because at this point
|
||||
# the previous context is stored in local.store.context
|
||||
#if hasattr(local.store, 'context'):
|
||||
# del local.store.context
|
||||
rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
|
||||
self.msg_id_cache.check_duplicate_message(message_data)
|
||||
ctxt = unpack_context(self.conf, message_data)
|
||||
method = message_data.get('method')
|
||||
args = message_data.get('args', {})
|
||||
version = message_data.get('version')
|
||||
namespace = message_data.get('namespace')
|
||||
if not method:
|
||||
LOG.warn(_('no method for message: %s') % message_data)
|
||||
ctxt.reply(_('No method for message: %s') % message_data,
|
||||
connection_pool=self.connection_pool)
|
||||
return
|
||||
self.pool.spawn_n(self._process_data, ctxt, version, method,
|
||||
namespace, args)
|
||||
|
||||
def _process_data(self, ctxt, version, method, namespace, args):
|
||||
"""Process a message in a new thread.
|
||||
|
||||
If the proxy object we have has a dispatch method
|
||||
(see rpc.dispatcher.RpcDispatcher), pass it the version,
|
||||
method, and args and let it dispatch as appropriate. If not, use
|
||||
the old behavior of magically calling the specified method on the
|
||||
proxy we have here.
|
||||
"""
|
||||
ctxt.update_store()
|
||||
try:
|
||||
rval = self.proxy.dispatch(ctxt, version, method, namespace,
|
||||
**args)
|
||||
# Check if the result was a generator
|
||||
if inspect.isgenerator(rval):
|
||||
for x in rval:
|
||||
ctxt.reply(x, None, connection_pool=self.connection_pool)
|
||||
else:
|
||||
ctxt.reply(rval, None, connection_pool=self.connection_pool)
|
||||
# This final None tells multicall that it is done.
|
||||
ctxt.reply(ending=True, connection_pool=self.connection_pool)
|
||||
except rpc_common.ClientException as e:
|
||||
LOG.debug(_('Expected exception during message handling (%s)') %
|
||||
e._exc_info[1])
|
||||
ctxt.reply(None, e._exc_info,
|
||||
connection_pool=self.connection_pool,
|
||||
log_failure=False)
|
||||
except Exception:
|
||||
# sys.exc_info() is deleted by LOG.exception().
|
||||
exc_info = sys.exc_info()
|
||||
LOG.error(_('Exception during message handling'),
|
||||
exc_info=exc_info)
|
||||
ctxt.reply(None, exc_info, connection_pool=self.connection_pool)
|
||||
|
||||
|
||||
class MulticallProxyWaiter(object):
|
||||
def __init__(self, conf, msg_id, timeout, connection_pool):
|
||||
self._msg_id = msg_id
|
||||
self._timeout = timeout or conf.rpc_response_timeout
|
||||
self._reply_proxy = connection_pool.reply_proxy
|
||||
self._done = False
|
||||
self._got_ending = False
|
||||
self._conf = conf
|
||||
self._dataqueue = queue.LightQueue()
|
||||
# Add this caller to the reply proxy's call_waiters
|
||||
self._reply_proxy.add_call_waiter(self, self._msg_id)
|
||||
self.msg_id_cache = _MsgIdCache()
|
||||
|
||||
def put(self, data):
|
||||
self._dataqueue.put(data)
|
||||
|
||||
def done(self):
|
||||
if self._done:
|
||||
return
|
||||
self._done = True
|
||||
# Remove this caller from reply proxy's call_waiters
|
||||
self._reply_proxy.del_call_waiter(self._msg_id)
|
||||
|
||||
def _process_data(self, data):
|
||||
result = None
|
||||
self.msg_id_cache.check_duplicate_message(data)
|
||||
if data['failure']:
|
||||
failure = data['failure']
|
||||
result = rpc_common.deserialize_remote_exception(self._conf,
|
||||
failure)
|
||||
elif data.get('ending', False):
|
||||
self._got_ending = True
|
||||
else:
|
||||
result = data['result']
|
||||
return result
|
||||
|
||||
def __iter__(self):
|
||||
"""Return a result until we get a reply with an 'ending' flag."""
|
||||
if self._done:
|
||||
raise StopIteration
|
||||
while True:
|
||||
try:
|
||||
data = self._dataqueue.get(timeout=self._timeout)
|
||||
result = self._process_data(data)
|
||||
except queue.Empty:
|
||||
self.done()
|
||||
raise rpc_common.Timeout()
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
self.done()
|
||||
if self._got_ending:
|
||||
self.done()
|
||||
raise StopIteration
|
||||
if isinstance(result, Exception):
|
||||
self.done()
|
||||
raise result
|
||||
yield result
|
||||
|
||||
|
||||
def create_connection(conf, new, connection_pool):
|
||||
"""Create a connection."""
|
||||
return ConnectionContext(conf, connection_pool, pooled=not new)
|
||||
|
||||
|
||||
_reply_proxy_create_sem = threading.Lock()
|
||||
|
||||
|
||||
def multicall(conf, context, topic, msg, timeout, connection_pool):
|
||||
"""Make a call that returns multiple times."""
|
||||
LOG.debug(_('Making synchronous call on %s ...'), topic)
|
||||
msg_id = uuid.uuid4().hex
|
||||
msg.update({'_msg_id': msg_id})
|
||||
LOG.debug(_('MSG_ID is %s') % (msg_id))
|
||||
_add_unique_id(msg)
|
||||
pack_context(msg, context)
|
||||
|
||||
with _reply_proxy_create_sem:
|
||||
if not connection_pool.reply_proxy:
|
||||
connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
|
||||
msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
|
||||
wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
|
||||
with ConnectionContext(conf, connection_pool) as conn:
|
||||
conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
|
||||
return wait_msg
|
||||
|
||||
|
||||
def call(conf, context, topic, msg, timeout, connection_pool):
|
||||
"""Sends a message on a topic and wait for a response."""
|
||||
rv = multicall(conf, context, topic, msg, timeout, connection_pool)
|
||||
# NOTE(vish): return the last result from the multicall
|
||||
rv = list(rv)
|
||||
if not rv:
|
||||
return
|
||||
return rv[-1]
|
||||
|
||||
|
||||
def cast(conf, context, topic, msg, connection_pool):
|
||||
"""Sends a message on a topic without waiting for a response."""
|
||||
LOG.debug(_('Making asynchronous cast on %s...'), topic)
|
||||
_add_unique_id(msg)
|
||||
pack_context(msg, context)
|
||||
with ConnectionContext(conf, connection_pool) as conn:
|
||||
conn.topic_send(topic, rpc_common.serialize_msg(msg))
|
||||
|
||||
|
||||
def fanout_cast(conf, context, topic, msg, connection_pool):
|
||||
"""Sends a message on a fanout exchange without waiting for a response."""
|
||||
LOG.debug(_('Making asynchronous fanout cast...'))
|
||||
_add_unique_id(msg)
|
||||
pack_context(msg, context)
|
||||
with ConnectionContext(conf, connection_pool) as conn:
|
||||
conn.fanout_send(topic, rpc_common.serialize_msg(msg))
|
||||
|
||||
|
||||
def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
|
||||
"""Sends a message on a topic to a specific server."""
|
||||
_add_unique_id(msg)
|
||||
pack_context(msg, context)
|
||||
with ConnectionContext(conf, connection_pool, pooled=False,
|
||||
server_params=server_params) as conn:
|
||||
conn.topic_send(topic, rpc_common.serialize_msg(msg))
|
||||
|
||||
|
||||
def fanout_cast_to_server(conf, context, server_params, topic, msg,
|
||||
connection_pool):
|
||||
"""Sends a message on a fanout exchange to a specific server."""
|
||||
_add_unique_id(msg)
|
||||
pack_context(msg, context)
|
||||
with ConnectionContext(conf, connection_pool, pooled=False,
|
||||
server_params=server_params) as conn:
|
||||
conn.fanout_send(topic, rpc_common.serialize_msg(msg))
|
||||
|
||||
|
||||
def notify(conf, context, topic, msg, connection_pool, envelope):
|
||||
"""Sends a notification event on a topic."""
|
||||
LOG.debug(_('Sending %(event_type)s on %(topic)s'),
|
||||
dict(event_type=msg.get('event_type'),
|
||||
topic=topic))
|
||||
_add_unique_id(msg)
|
||||
pack_context(msg, context)
|
||||
with ConnectionContext(conf, connection_pool) as conn:
|
||||
if envelope:
|
||||
msg = rpc_common.serialize_msg(msg)
|
||||
conn.notify_send(topic, msg)
|
||||
|
||||
|
||||
def cleanup(connection_pool):
|
||||
if connection_pool:
|
||||
connection_pool.empty()
|
||||
|
||||
|
||||
def get_control_exchange(conf):
|
||||
return conf.control_exchange
|
||||
|
@ -698,124 +698,6 @@ class Connection(object):
|
||||
self.consumer_thread = eventlet.spawn(_consumer_thread)
|
||||
return self.consumer_thread
|
||||
|
||||
def create_consumer(self, topic, proxy, fanout=False):
|
||||
"""Create a consumer that calls a method in a proxy object."""
|
||||
proxy_cb = rpc_amqp.ProxyCallback(
|
||||
self.conf, proxy,
|
||||
rpc_amqp.get_connection_pool(self.conf, Connection))
|
||||
self.proxy_callbacks.append(proxy_cb)
|
||||
|
||||
if fanout:
|
||||
consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
|
||||
else:
|
||||
consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb)
|
||||
|
||||
self._register_consumer(consumer)
|
||||
|
||||
return consumer
|
||||
|
||||
def create_worker(self, topic, proxy, pool_name):
|
||||
"""Create a worker that calls a method in a proxy object."""
|
||||
proxy_cb = rpc_amqp.ProxyCallback(
|
||||
self.conf, proxy,
|
||||
rpc_amqp.get_connection_pool(self.conf, Connection))
|
||||
self.proxy_callbacks.append(proxy_cb)
|
||||
|
||||
consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
|
||||
name=pool_name)
|
||||
|
||||
self._register_consumer(consumer)
|
||||
|
||||
return consumer
|
||||
|
||||
def join_consumer_pool(self, callback, pool_name, topic,
|
||||
exchange_name=None):
|
||||
"""Register as a member of a group of consumers for a given topic from
|
||||
the specified exchange.
|
||||
|
||||
Exactly one member of a given pool will receive each message.
|
||||
|
||||
A message will be delivered to multiple pools, if more than
|
||||
one is created.
|
||||
"""
|
||||
callback_wrapper = rpc_amqp.CallbackWrapper(
|
||||
conf=self.conf,
|
||||
callback=callback,
|
||||
connection_pool=rpc_amqp.get_connection_pool(self.conf,
|
||||
Connection),
|
||||
)
|
||||
self.proxy_callbacks.append(callback_wrapper)
|
||||
|
||||
consumer = TopicConsumer(conf=self.conf,
|
||||
session=self.session,
|
||||
topic=topic,
|
||||
callback=callback_wrapper,
|
||||
name=pool_name,
|
||||
exchange_name=exchange_name)
|
||||
|
||||
self._register_consumer(consumer)
|
||||
return consumer
|
||||
|
||||
|
||||
def create_connection(conf, new=True):
|
||||
"""Create a connection."""
|
||||
return rpc_amqp.create_connection(
|
||||
conf, new,
|
||||
rpc_amqp.get_connection_pool(conf, Connection))
|
||||
|
||||
|
||||
def multicall(conf, context, topic, msg, timeout=None):
|
||||
"""Make a call that returns multiple times."""
|
||||
return rpc_amqp.multicall(
|
||||
conf, context, topic, msg, timeout,
|
||||
rpc_amqp.get_connection_pool(conf, Connection))
|
||||
|
||||
|
||||
def call(conf, context, topic, msg, timeout=None):
|
||||
"""Sends a message on a topic and wait for a response."""
|
||||
return rpc_amqp.call(
|
||||
conf, context, topic, msg, timeout,
|
||||
rpc_amqp.get_connection_pool(conf, Connection))
|
||||
|
||||
|
||||
def cast(conf, context, topic, msg):
|
||||
"""Sends a message on a topic without waiting for a response."""
|
||||
return rpc_amqp.cast(
|
||||
conf, context, topic, msg,
|
||||
rpc_amqp.get_connection_pool(conf, Connection))
|
||||
|
||||
|
||||
def fanout_cast(conf, context, topic, msg):
|
||||
"""Sends a message on a fanout exchange without waiting for a response."""
|
||||
return rpc_amqp.fanout_cast(
|
||||
conf, context, topic, msg,
|
||||
rpc_amqp.get_connection_pool(conf, Connection))
|
||||
|
||||
|
||||
def cast_to_server(conf, context, server_params, topic, msg):
|
||||
"""Sends a message on a topic to a specific server."""
|
||||
return rpc_amqp.cast_to_server(
|
||||
conf, context, server_params, topic, msg,
|
||||
rpc_amqp.get_connection_pool(conf, Connection))
|
||||
|
||||
|
||||
def fanout_cast_to_server(conf, context, server_params, topic, msg):
|
||||
"""Sends a message on a fanout exchange to a specific server."""
|
||||
return rpc_amqp.fanout_cast_to_server(
|
||||
conf, context, server_params, topic, msg,
|
||||
rpc_amqp.get_connection_pool(conf, Connection))
|
||||
|
||||
|
||||
def notify(conf, context, topic, msg, envelope):
|
||||
"""Sends a notification event on a topic."""
|
||||
return rpc_amqp.notify(conf, context, topic, msg,
|
||||
rpc_amqp.get_connection_pool(conf, Connection),
|
||||
envelope)
|
||||
|
||||
|
||||
def cleanup():
|
||||
return rpc_amqp.cleanup(Connection.pool)
|
||||
|
||||
|
||||
class QpidDriver(amqpdriver.AMQPDriverBase):
|
||||
|
||||
|
@ -741,111 +741,6 @@ class Connection(object):
|
||||
self.consumer_thread = eventlet.spawn(_consumer_thread)
|
||||
return self.consumer_thread
|
||||
|
||||
def create_consumer(self, topic, proxy, fanout=False):
|
||||
"""Create a consumer that calls a method in a proxy object."""
|
||||
proxy_cb = rpc_amqp.ProxyCallback(
|
||||
self.conf, proxy,
|
||||
rpc_amqp.get_connection_pool(self.conf, Connection))
|
||||
self.proxy_callbacks.append(proxy_cb)
|
||||
|
||||
if fanout:
|
||||
self.declare_fanout_consumer(topic, proxy_cb)
|
||||
else:
|
||||
self.declare_topic_consumer(topic, proxy_cb)
|
||||
|
||||
def create_worker(self, topic, proxy, pool_name):
|
||||
"""Create a worker that calls a method in a proxy object."""
|
||||
proxy_cb = rpc_amqp.ProxyCallback(
|
||||
self.conf, proxy,
|
||||
rpc_amqp.get_connection_pool(self.conf, Connection))
|
||||
self.proxy_callbacks.append(proxy_cb)
|
||||
self.declare_topic_consumer(topic, proxy_cb, pool_name)
|
||||
|
||||
def join_consumer_pool(self, callback, pool_name, topic,
|
||||
exchange_name=None):
|
||||
"""Register as a member of a group of consumers for a given topic from
|
||||
the specified exchange.
|
||||
|
||||
Exactly one member of a given pool will receive each message.
|
||||
|
||||
A message will be delivered to multiple pools, if more than
|
||||
one is created.
|
||||
"""
|
||||
callback_wrapper = rpc_amqp.CallbackWrapper(
|
||||
conf=self.conf,
|
||||
callback=callback,
|
||||
connection_pool=rpc_amqp.get_connection_pool(self.conf,
|
||||
Connection),
|
||||
)
|
||||
self.proxy_callbacks.append(callback_wrapper)
|
||||
self.declare_topic_consumer(
|
||||
queue_name=pool_name,
|
||||
topic=topic,
|
||||
exchange_name=exchange_name,
|
||||
callback=callback_wrapper,
|
||||
)
|
||||
|
||||
|
||||
def create_connection(conf, new=True):
|
||||
"""Create a connection."""
|
||||
return rpc_amqp.create_connection(
|
||||
conf, new,
|
||||
rpc_amqp.get_connection_pool(conf, Connection))
|
||||
|
||||
|
||||
def multicall(conf, context, topic, msg, timeout=None):
|
||||
"""Make a call that returns multiple times."""
|
||||
return rpc_amqp.multicall(
|
||||
conf, context, topic, msg, timeout,
|
||||
rpc_amqp.get_connection_pool(conf, Connection))
|
||||
|
||||
|
||||
def call(conf, context, topic, msg, timeout=None):
|
||||
"""Sends a message on a topic and wait for a response."""
|
||||
return rpc_amqp.call(
|
||||
conf, context, topic, msg, timeout,
|
||||
rpc_amqp.get_connection_pool(conf, Connection))
|
||||
|
||||
|
||||
def cast(conf, context, topic, msg):
|
||||
"""Sends a message on a topic without waiting for a response."""
|
||||
return rpc_amqp.cast(
|
||||
conf, context, topic, msg,
|
||||
rpc_amqp.get_connection_pool(conf, Connection))
|
||||
|
||||
|
||||
def fanout_cast(conf, context, topic, msg):
|
||||
"""Sends a message on a fanout exchange without waiting for a response."""
|
||||
return rpc_amqp.fanout_cast(
|
||||
conf, context, topic, msg,
|
||||
rpc_amqp.get_connection_pool(conf, Connection))
|
||||
|
||||
|
||||
def cast_to_server(conf, context, server_params, topic, msg):
|
||||
"""Sends a message on a topic to a specific server."""
|
||||
return rpc_amqp.cast_to_server(
|
||||
conf, context, server_params, topic, msg,
|
||||
rpc_amqp.get_connection_pool(conf, Connection))
|
||||
|
||||
|
||||
def fanout_cast_to_server(conf, context, server_params, topic, msg):
|
||||
"""Sends a message on a fanout exchange to a specific server."""
|
||||
return rpc_amqp.fanout_cast_to_server(
|
||||
conf, context, server_params, topic, msg,
|
||||
rpc_amqp.get_connection_pool(conf, Connection))
|
||||
|
||||
|
||||
def notify(conf, context, topic, msg, envelope):
|
||||
"""Sends a notification event on a topic."""
|
||||
return rpc_amqp.notify(
|
||||
conf, context, topic, msg,
|
||||
rpc_amqp.get_connection_pool(conf, Connection),
|
||||
envelope)
|
||||
|
||||
|
||||
def cleanup():
|
||||
return rpc_amqp.cleanup(Connection.pool)
|
||||
|
||||
|
||||
class RabbitDriver(amqpdriver.AMQPDriverBase):
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user