Similar to what we do for listen() on the server side, if the driver raises
an exception during a cast() or call() we should translate it into a
transport-agnostic exception.
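Roughly something like this (a sketch only - ClientSendError is an assumed
name, mirroring the ServerListenError described below):

    # Sketch only -- ClientSendError is an assumed name.
    class ClientSendError(Exception):
        """Raised if the driver fails to send a cast() or call()."""
        def __init__(self, target, ex):
            super(ClientSendError, self).__init__(
                'Failed to send to target "%s": %s' % (target, ex))
            self.target = target
            self.ex = ex

    def _send(driver, target, ctxt, message, **kwargs):
        try:
            return driver.send(target, ctxt, message, **kwargs)
        except Exception as ex:  # really the driver's base exception class
            raise ClientSendError(target, ex)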
Currently, if there are no servers listening on a topic then a message
to that topic just gets dropped by the fake driver.
This makes the tests fail intermittently if the server is slow to start.
Turn things on their head: have the client always create the queues on the
exchange, so that messages can be queued up even if there is no server
listening yet.
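The idea is roughly this (a simplified sketch, not the actual fake driver
code):

    import collections
    import queue

    # Simplified sketch: messages are queued per-topic whether or not a
    # server has started polling yet.
    class FakeExchange(object):
        def __init__(self):
            self._queues = collections.defaultdict(queue.Queue)

        def send(self, topic, message):
            # The sending side creates the queue if it doesn't exist yet,
            # so the message is retained until a server polls for it.
            self._queues[topic].put(message)

        def poll(self, topic, timeout=None):
            return self._queues[topic].get(timeout=timeout)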
Now we also need to delete the "duplicate server on topic" test - it's
actually fine to have multiple servers listening on the same topic.
Transport drivers can raise transport-specific exceptions in listen().
Catch such exceptions and wrap them in a ServerListenError which callers
of start() can explicitly handle.
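In MessageHandlingServer.start(), that boils down to something like this (a
sketch - exactly where the wrapping happens is my guess; ServerListenError
would carry the target and the original exception, just like the
ClientSendError sketch above):

    class MessageHandlingServer(object):
        # Only the listen() wrapping is sketched here.
        def start(self):
            try:
                self._listener = self.transport._listen(self.target)
            except Exception as ex:  # really the driver's base exception class
                raise ServerListenError(self.target, ex)
            # ... hand the listener off to the executor as before ...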
This is something I think Doug has been trying to tell me to do from the
start :-)
The main idea is to remove all the MessageHandlingServer subclasses and,
instead, if you want a server which is hooked up with the RPC dispatcher
you just use this convenience function:
    server = rpc_server.get_rpc_server(transport, target, endpoints)
This means the dispatcher interface is now part of the public API, but
that should be fine since it's very simple - it's a callable that takes
a request context and message.
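That is, anything like the following would work as a dispatcher
(illustrative only - this is not the RPC dispatcher itself):

    # Illustrative only: any callable taking a request context and a message
    # satisfies the dispatcher interface.
    def logging_dispatcher(ctxt, message):
        print('ctxt=%r message=%r' % (ctxt, message))

The RPC dispatcher that get_rpc_server() hooks up is just such a callable,
one which knows how to map the incoming message onto a method of one of the
endpoints.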
However, we also need to be able to construct a MessageHandlingServer
with a specific executor. By having an executor_cls parameter to the
constructor as part of the public API, we'd be exposing the executor
interface which is quite likely to change. Instead - and this seems
obvious in retrospect - just use stevedore to load executors and allow
them to be requested by name:
    server = rpc_server.get_rpc_server(transport, target, endpoints,
                                       executor='eventlet')
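Under the hood, loading the executor then looks something like this (a
sketch - the entry point namespace shown is an assumption):

    # Sketch of loading an executor by name with stevedore; the entry point
    # namespace here is an assumption.
    from stevedore import driver

    def _load_executor_cls(executor):
        mgr = driver.DriverManager('openstack.common.messaging.executors',
                                   executor)
        return mgr.driver

The 'eventlet' and blocking executors would then presumably just be entry
points registered in setup.cfg.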
This also means we can get rid of openstack.common.messaging.eventlet.
We really don't want to depend on openstack.common.local since it
implies a dependency on eventlet.corolocal.
Instead, make the check_for_lock parameter a callable which is given the
ConfigOpts object and returns a list of held locks. To hook it up to
lockutils, just do:
    client = messaging.RPCClient(transport, target,
                                 check_for_lock=lockutils.check_for_lock)
Although you probably want to use lockutils.debug_check_for_lock() which
only does the check if debugging is enabled.
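On the client side, the check then boils down to something like this (a
sketch - the warning wording is made up):

    import logging

    LOG = logging.getLogger(__name__)

    # Sketch of how call() might use the callable when check_for_lock was
    # supplied; the warning wording is made up.
    def _warn_if_locks_held(conf, check_for_lock):
        locks_held = check_for_lock(conf)
        if locks_held:
            LOG.warning('A blocking call() was made with locks held: %s',
                        locks_held)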
There's really no reason not to - you could have a single client object
invoking methods on a variety of different targets so long as they're
all available via the same transport.
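For example, usage along these lines should be possible (a sketch - the
prepare() style of per-invocation target override shown here is an
assumption about the API shape):

    # Usage sketch; prepare() as the per-invocation override mechanism is
    # an assumption, not necessarily the current API.
    client = messaging.RPCClient(transport, target)

    client.call(ctxt, 'ping')
    client.prepare(topic='compute.host1').cast(ctxt, 'reboot')
    client.prepare(version='2.0').call(ctxt, 'schedule')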
If an endpoint has no target set, or if it is set to None, just act as
though the target is Target(namespace=None, version='1.0').
Also, refactor the dispatcher's inspection of endpoints a little.
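i.e. something equivalent to (a sketch of the defaulting logic; Target here
is the messaging Target class):

    # Sketch of the defaulting logic used when inspecting endpoints.
    _DEFAULT_TARGET = Target(namespace=None, version='1.0')

    def _endpoint_target(endpoint):
        return getattr(endpoint, 'target', None) or _DEFAULT_TARGET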
This mimics what we do with amqp.ProxyCallback.
It might be nice to have errors like "no such method" and "unsupported
version" raised before spawning a greenthread, but that would mean either
turning the dispatcher into a two-step lookup/invoke interface or having
I/O-framework-specific dispatchers.
This is just an implementation detail of the public EventletRPCServer
and BlockingRPCServer classes.
This is important because I'm not sure we've got the right separation
of concerns between executors and dispatchers yet:
http://lists.openstack.org/pipermail/openstack-dev/2013-June/009934.html
The discussion there implies that you'd need to pair a tulip-aware executor
with a tulip-aware dispatcher. We'll probably need to do something similar
for eventlet too.
I think we need the RPC dispatcher to just know about RPC-specific stuff,
and to have a single abstraction for I/O framework integration.