Remove my notes and test scripts
These aren't really appropriate for dumping in the repo like this so I've moved them here: https://github.com/markmc/oslo.messaging-notes Change-Id: Iba4c19cefd8a02831543d5c2c3b1886d5d31fa63
This commit is contained in:
parent
e987525dc0
commit
03fcbe6ca6
@ -1,105 +0,0 @@
|
|||||||
|
|
||||||
|
|
||||||
Projects will need to do e.g.
|
|
||||||
|
|
||||||
-- nova.config:
|
|
||||||
|
|
||||||
from oslo import messaging
|
|
||||||
|
|
||||||
TRANSPORT_DRIVER = None
|
|
||||||
|
|
||||||
|
|
||||||
def parse_args(argv, ...):
|
|
||||||
messaging.set_transport_defaults(control_exchange='nova')
|
|
||||||
cfg.CONF(...)
|
|
||||||
TRANSPORT_DRIVER = transport.get_transport(cfg.CONF)
|
|
||||||
|
|
||||||
-- nova.scheduler.rpcapi:
|
|
||||||
|
|
||||||
from oslo.config import cfg
|
|
||||||
from oslo import messaging
|
|
||||||
|
|
||||||
from nova import config
|
|
||||||
|
|
||||||
CONF = cfg.CONF
|
|
||||||
|
|
||||||
|
|
||||||
class SchedulerAPI(messaging.RPCClient):
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
target = messaging.Target(topic=CONF.scheduler_topic, version='2.0')
|
|
||||||
super(SchedulerAPI, self).__init__(config.TRANSPORT_DRIVER, target)
|
|
||||||
|
|
||||||
....
|
|
||||||
|
|
||||||
def select_hosts(self, ctxt, request_spec, filter_properties):
|
|
||||||
# FIXME(markmc): ctxt
|
|
||||||
cctxt = self.prepare(version='2.6')
|
|
||||||
return ctxt.call('select_hosts',
|
|
||||||
request_spec=request_spec,
|
|
||||||
filter_properties=filter_properties)
|
|
||||||
|
|
||||||
-- nova.service:
|
|
||||||
|
|
||||||
from oslo import messaging
|
|
||||||
|
|
||||||
from nova import baserpc
|
|
||||||
from nova import config
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
...
|
|
||||||
target = messaging.Target(topic=self.topic, self.server)
|
|
||||||
|
|
||||||
base_rpc = baserpc.BaseRPCAPI(self.service_name, backdoor_port)
|
|
||||||
|
|
||||||
self.rpcserver = messaging.get_rpc_server(config.TRANSPORT_DRIVER,
|
|
||||||
target,
|
|
||||||
[self.manager, base_rpc],
|
|
||||||
executor='eventlet')
|
|
||||||
|
|
||||||
LOG.debug(_("Starting RPC server for %(topic)s on %(server)s") %
|
|
||||||
dict(topic=self.topic, host=self.server))
|
|
||||||
|
|
||||||
self.rpcserver.start()
|
|
||||||
|
|
||||||
...
|
|
||||||
self.rpcserver.stop()
|
|
||||||
self.rpcserver.wait()
|
|
||||||
|
|
||||||
|
|
||||||
== notifier ==
|
|
||||||
|
|
||||||
Will need e.g.
|
|
||||||
|
|
||||||
from oslo import messaging
|
|
||||||
|
|
||||||
from nova import config
|
|
||||||
|
|
||||||
def get_notifier(host=None):
|
|
||||||
global _NOTIFIER
|
|
||||||
if _NOTIFIER is None:
|
|
||||||
_NOTIFIER = messaging.Notifier(cfg.CONF,
|
|
||||||
'compute.%s' % (host or cfg.host,
|
|
||||||
transport=config.TRANSPORT_DRIVER)
|
|
||||||
return _NOTIFIER
|
|
||||||
|
|
||||||
|
|
||||||
def notify_about_instance_usage(context, instance, event, ..., host=None):
|
|
||||||
usage_info = notifications.info_from_instance(context, instance, ...)
|
|
||||||
|
|
||||||
notifier = get_notifier(host)
|
|
||||||
notify = notifier.error if event.endswith("error") else notifier.info
|
|
||||||
notify(context, 'compute.instance.%s' % event, usage_info)
|
|
||||||
|
|
||||||
Misc changes vs openstack.common.notifier.api:
|
|
||||||
|
|
||||||
- jsonutil.to_primitive(payload, convert_instances=True) is no longer called,
|
|
||||||
I'm figuring that you should supply a serializer instead
|
|
||||||
- need to construct a Notifier object, there is no global notifier state
|
|
||||||
- you'll need a Notifier object per-service, since publisher_id is an object
|
|
||||||
attribute
|
|
||||||
- publisher_id() has been removed, so you do 'service.host' manually
|
|
||||||
- the log levels aren't exposed by the API, instead use the info() etc.
|
|
||||||
methods on the Notifier class
|
|
||||||
- notifiy_decorator has been removed, see:
|
|
||||||
https://github.com/markmc/oslo-incubator/tree/oslo-messaging-notify-decorator
|
|
@ -1,98 +0,0 @@
|
|||||||
|
|
||||||
TODO:
|
|
||||||
|
|
||||||
- apps need to be able to register backwards compat aliases for
|
|
||||||
entry point names
|
|
||||||
|
|
||||||
get_transport(conf, driver_aliases={
|
|
||||||
'nova.openstack.common.rpc.impl_kombu': 'rabbit',
|
|
||||||
'nova.openstack.common.rpc.impl_qpid: 'qpid',
|
|
||||||
'nova.openstack.common.rpc.impl_zmq: 'zmq'})
|
|
||||||
|
|
||||||
- we need some way for the dispatcher to take the incoming
|
|
||||||
context dict and instantiate a user-supplied request context
|
|
||||||
object with it
|
|
||||||
|
|
||||||
- @expose decorator
|
|
||||||
|
|
||||||
- when shutting down a dispatcher, do we need to invoke
|
|
||||||
a cleanup method on the listener?
|
|
||||||
|
|
||||||
- ClientException - e.g. the executor should handle this being
|
|
||||||
raised by the dispatcher
|
|
||||||
|
|
||||||
- the InvalidTarget checks seem like they're generic preconditions
|
|
||||||
that all drivers would want enforced
|
|
||||||
|
|
||||||
- _safe_log() logs sanitized message data
|
|
||||||
|
|
||||||
- unique_id used to reject duplicate messages
|
|
||||||
|
|
||||||
- local.store.context used by common logging - basically, how to make
|
|
||||||
sure the context of the currently dispatching rpc is available to
|
|
||||||
logging. Need to abstract out the dependency on eventlet for this.
|
|
||||||
|
|
||||||
- I'm not sure listener.done() is really needed - can't we ack the
|
|
||||||
message before returning it from poll() ?
|
|
||||||
|
|
||||||
- envelope=True/False really sucks - it's a transport driver specific
|
|
||||||
flag and we're only using it to communicate whether to use the new
|
|
||||||
or older on-the-wire notification message format. Maybe we should
|
|
||||||
have a high-level "notification message format version" which each
|
|
||||||
transport driver can map to an on-the-wire format. Meh.
|
|
||||||
|
|
||||||
Things I don't like:
|
|
||||||
|
|
||||||
- CallContext - we already abuse the term "context" enough
|
|
||||||
|
|
||||||
- There's something about using a context manager for prepare() that
|
|
||||||
I like:
|
|
||||||
|
|
||||||
with client.prepare(version='2.6') as cctxt:
|
|
||||||
cctxt.call('select_host',
|
|
||||||
request_spec=request_spec,
|
|
||||||
filter_properties=filter_properties)
|
|
||||||
|
|
||||||
but it seems a bit nonsensical
|
|
||||||
|
|
||||||
- "endpoints" - better than api_objs, callbacks, proxyobj, etc.
|
|
||||||
|
|
||||||
- we probably won't use BlockingRPCExecutor anywhere, but I think
|
|
||||||
it does a good job of showing the basic job of a dispatcher
|
|
||||||
implementation
|
|
||||||
|
|
||||||
|
|
||||||
There's a bunch of places where what fields are used in a target
|
|
||||||
is unclear:
|
|
||||||
|
|
||||||
- in driver.listen() and server.start():
|
|
||||||
- required: topic and server
|
|
||||||
- optional: exchange
|
|
||||||
- ignored: namespace, version, fanout
|
|
||||||
|
|
||||||
- in dispatcher:
|
|
||||||
- required: none
|
|
||||||
- optional: namespace, version
|
|
||||||
- ignored: exchange, topic, server, fanout
|
|
||||||
|
|
||||||
- in driver.send():
|
|
||||||
- required: topic
|
|
||||||
- optional: server, fanout and exchange
|
|
||||||
- ignored: namespace, version
|
|
||||||
|
|
||||||
- in client.call() and client.cast():
|
|
||||||
- required: (topic is required by send())
|
|
||||||
- optional: namespace, version (server, fanout, exchange optional to send())
|
|
||||||
- ignored: none
|
|
||||||
|
|
||||||
driver porting guide:
|
|
||||||
|
|
||||||
- implement a BaseDriver subclass:
|
|
||||||
|
|
||||||
- send() should be similar to call()/cast()
|
|
||||||
- listen() should be similar to create_consumer()
|
|
||||||
|
|
||||||
- implement a base.Listener subclass
|
|
||||||
|
|
||||||
- poll() should pull a message off the queue
|
|
||||||
- done() should ack it
|
|
@ -1,57 +0,0 @@
|
|||||||
|
|
||||||
import socket
|
|
||||||
import threading
|
|
||||||
|
|
||||||
from oslo.config import cfg
|
|
||||||
|
|
||||||
from oslo import messaging
|
|
||||||
|
|
||||||
_opts = [
|
|
||||||
cfg.StrOpt('host', default=socket.gethostname()),
|
|
||||||
]
|
|
||||||
|
|
||||||
CONF = cfg.CONF
|
|
||||||
CONF.register_opts(_opts)
|
|
||||||
|
|
||||||
class Server(object):
|
|
||||||
|
|
||||||
def __init__(self, transport):
|
|
||||||
self.target = messaging.Target(topic='testtopic',
|
|
||||||
server=transport.conf.host,
|
|
||||||
version='2.5')
|
|
||||||
self._server = messaging.get_rpc_server(transport,
|
|
||||||
self.target,
|
|
||||||
[self])
|
|
||||||
super(Server, self).__init__()
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
self._server.start()
|
|
||||||
|
|
||||||
def test(self, ctxt, arg):
|
|
||||||
self._server.stop()
|
|
||||||
return arg
|
|
||||||
|
|
||||||
transport = messaging.get_transport(CONF, 'fake:///testexchange')
|
|
||||||
|
|
||||||
server = Server(transport)
|
|
||||||
thread = threading.Thread(target=server.start)
|
|
||||||
thread.daemon = True
|
|
||||||
thread.start()
|
|
||||||
|
|
||||||
class Client(object):
|
|
||||||
|
|
||||||
def __init__(self, transport):
|
|
||||||
target = messaging.Target(topic='testtopic', version='2.0')
|
|
||||||
self._client = messaging.RPCClient(transport, target)
|
|
||||||
super(Client, self).__init__()
|
|
||||||
|
|
||||||
def test(self, ctxt, arg):
|
|
||||||
cctxt = self._client.prepare(version='2.5')
|
|
||||||
return cctxt.call(ctxt, 'test', arg=arg)
|
|
||||||
|
|
||||||
|
|
||||||
client = Client(transport)
|
|
||||||
print client.test({'c': 'b'}, 'foo')
|
|
||||||
|
|
||||||
while thread.isAlive():
|
|
||||||
thread.join(.05)
|
|
@ -1,57 +0,0 @@
|
|||||||
|
|
||||||
|
|
||||||
import eventlet
|
|
||||||
|
|
||||||
eventlet.monkey_patch(os=False)
|
|
||||||
|
|
||||||
import socket
|
|
||||||
|
|
||||||
from oslo.config import cfg
|
|
||||||
|
|
||||||
from oslo import messaging
|
|
||||||
|
|
||||||
_opts = [
|
|
||||||
cfg.StrOpt('host', default=socket.gethostname()),
|
|
||||||
]
|
|
||||||
|
|
||||||
CONF = cfg.CONF
|
|
||||||
CONF.register_opts(_opts)
|
|
||||||
|
|
||||||
class Server(object):
|
|
||||||
|
|
||||||
def __init__(self, transport):
|
|
||||||
self.target = messaging.Target(topic='testtopic',
|
|
||||||
server=transport.conf.host,
|
|
||||||
version='2.5')
|
|
||||||
self._server = messaging.get_rpc_server(transport,
|
|
||||||
self.target,
|
|
||||||
[self],
|
|
||||||
executor='eventlet')
|
|
||||||
super(Server, self).__init__()
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
self._server.start()
|
|
||||||
|
|
||||||
def test(self, ctxt, arg):
|
|
||||||
return arg
|
|
||||||
|
|
||||||
transport = messaging.get_transport(CONF, 'fake:///testexchange')
|
|
||||||
|
|
||||||
server = Server(transport)
|
|
||||||
server.start()
|
|
||||||
|
|
||||||
|
|
||||||
class Client(object):
|
|
||||||
|
|
||||||
def __init__(self, transport):
|
|
||||||
target = messaging.Target(topic='testtopic', version='2.0')
|
|
||||||
self._client = messaging.RPCClient(transport, target)
|
|
||||||
super(Client, self).__init__()
|
|
||||||
|
|
||||||
def test(self, ctxt, arg):
|
|
||||||
cctxt = self._client.prepare(version='2.5')
|
|
||||||
return cctxt.call(ctxt, 'test', arg=arg)
|
|
||||||
|
|
||||||
|
|
||||||
client = Client(transport)
|
|
||||||
print client.test({'c': 'b'}, 'foo')
|
|
@ -1,39 +0,0 @@
|
|||||||
import eventlet
|
|
||||||
|
|
||||||
eventlet.monkey_patch(os=False)
|
|
||||||
|
|
||||||
import logging
|
|
||||||
import socket
|
|
||||||
|
|
||||||
from oslo.config import cfg
|
|
||||||
|
|
||||||
from oslo import messaging
|
|
||||||
|
|
||||||
_opts = [
|
|
||||||
cfg.StrOpt('host', default=socket.gethostname()),
|
|
||||||
]
|
|
||||||
|
|
||||||
CONF = cfg.CONF
|
|
||||||
CONF.register_opts(_opts)
|
|
||||||
|
|
||||||
LOG = logging.getLogger('client')
|
|
||||||
|
|
||||||
logging.basicConfig(level=logging.DEBUG)
|
|
||||||
|
|
||||||
CONF()
|
|
||||||
CONF.log_opt_values(LOG, logging.DEBUG)
|
|
||||||
|
|
||||||
class Client(object):
|
|
||||||
|
|
||||||
def __init__(self, transport):
|
|
||||||
target = messaging.Target(topic='topic')
|
|
||||||
self._client = messaging.RPCClient(transport, target)
|
|
||||||
super(Client, self).__init__()
|
|
||||||
|
|
||||||
def ping(self, ctxt):
|
|
||||||
return self._client.call(ctxt, 'ping')
|
|
||||||
|
|
||||||
transport = messaging.get_transport(CONF, 'rabbit:///test')
|
|
||||||
|
|
||||||
client = Client(transport)
|
|
||||||
print client.ping({})
|
|
@ -1,52 +0,0 @@
|
|||||||
|
|
||||||
import eventlet
|
|
||||||
|
|
||||||
eventlet.monkey_patch(os=False)
|
|
||||||
|
|
||||||
import logging
|
|
||||||
import socket
|
|
||||||
|
|
||||||
from oslo.config import cfg
|
|
||||||
|
|
||||||
from oslo import messaging
|
|
||||||
|
|
||||||
_opts = [
|
|
||||||
cfg.StrOpt('host', default=socket.gethostname()),
|
|
||||||
]
|
|
||||||
|
|
||||||
CONF = cfg.CONF
|
|
||||||
CONF.register_opts(_opts)
|
|
||||||
|
|
||||||
LOG = logging.getLogger('server')
|
|
||||||
|
|
||||||
CONF()
|
|
||||||
CONF.log_opt_values(LOG, logging.DEBUG)
|
|
||||||
|
|
||||||
logging.basicConfig(level=logging.DEBUG)
|
|
||||||
|
|
||||||
class Server(object):
|
|
||||||
|
|
||||||
def __init__(self, transport):
|
|
||||||
self.target = messaging.Target(topic='topic',
|
|
||||||
server=transport.conf.host)
|
|
||||||
self._server = messaging.get_rpc_server(transport,
|
|
||||||
self.target,
|
|
||||||
[self],
|
|
||||||
executor='eventlet')
|
|
||||||
super(Server, self).__init__()
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
self._server.start()
|
|
||||||
|
|
||||||
def wait(self):
|
|
||||||
self._server.wait()
|
|
||||||
|
|
||||||
def ping(self, ctxt):
|
|
||||||
LOG.info("PING")
|
|
||||||
return 'ping'
|
|
||||||
|
|
||||||
transport = messaging.get_transport(CONF, 'rabbit:///test')
|
|
||||||
|
|
||||||
server = Server(transport)
|
|
||||||
server.start()
|
|
||||||
server.wait()
|
|
Loading…
x
Reference in New Issue
Block a user