diff --git a/doc/messaging/changes.txt b/doc/messaging/changes.txt deleted file mode 100644 index 606897720..000000000 --- a/doc/messaging/changes.txt +++ /dev/null @@ -1,105 +0,0 @@ - - -Projects will need to do e.g. - - -- nova.config: - - from oslo import messaging - - TRANSPORT_DRIVER = None - - - def parse_args(argv, ...): - messaging.set_transport_defaults(control_exchange='nova') - cfg.CONF(...) - TRANSPORT_DRIVER = transport.get_transport(cfg.CONF) - - -- nova.scheduler.rpcapi: - - from oslo.config import cfg - from oslo import messaging - - from nova import config - - CONF = cfg.CONF - - - class SchedulerAPI(messaging.RPCClient): - - def __init__(self): - target = messaging.Target(topic=CONF.scheduler_topic, version='2.0') - super(SchedulerAPI, self).__init__(config.TRANSPORT_DRIVER, target) - - .... - - def select_hosts(self, ctxt, request_spec, filter_properties): - # FIXME(markmc): ctxt - cctxt = self.prepare(version='2.6') - return ctxt.call('select_hosts', - request_spec=request_spec, - filter_properties=filter_properties) - - -- nova.service: - - from oslo import messaging - - from nova import baserpc - from nova import config - - def start(self): - ... - target = messaging.Target(topic=self.topic, self.server) - - base_rpc = baserpc.BaseRPCAPI(self.service_name, backdoor_port) - - self.rpcserver = messaging.get_rpc_server(config.TRANSPORT_DRIVER, - target, - [self.manager, base_rpc], - executor='eventlet') - - LOG.debug(_("Starting RPC server for %(topic)s on %(server)s") % - dict(topic=self.topic, host=self.server)) - - self.rpcserver.start() - - ... - self.rpcserver.stop() - self.rpcserver.wait() - - -== notifier == - -Will need e.g. - - from oslo import messaging - - from nova import config - - def get_notifier(host=None): - global _NOTIFIER - if _NOTIFIER is None: - _NOTIFIER = messaging.Notifier(cfg.CONF, - 'compute.%s' % (host or cfg.host, - transport=config.TRANSPORT_DRIVER) - return _NOTIFIER - - - def notify_about_instance_usage(context, instance, event, ..., host=None): - usage_info = notifications.info_from_instance(context, instance, ...) - - notifier = get_notifier(host) - notify = notifier.error if event.endswith("error") else notifier.info - notify(context, 'compute.instance.%s' % event, usage_info) - -Misc changes vs openstack.common.notifier.api: - - - jsonutil.to_primitive(payload, convert_instances=True) is no longer called, - I'm figuring that you should supply a serializer instead - - need to construct a Notifier object, there is no global notifier state - - you'll need a Notifier object per-service, since publisher_id is an object - attribute - - publisher_id() has been removed, so you do 'service.host' manually - - the log levels aren't exposed by the API, instead use the info() etc. - methods on the Notifier class - - notifiy_decorator has been removed, see: - https://github.com/markmc/oslo-incubator/tree/oslo-messaging-notify-decorator diff --git a/doc/messaging/notes.txt b/doc/messaging/notes.txt deleted file mode 100644 index cc2dea936..000000000 --- a/doc/messaging/notes.txt +++ /dev/null @@ -1,98 +0,0 @@ - -TODO: - - - apps need to be able to register backwards compat aliases for - entry point names - - get_transport(conf, driver_aliases={ - 'nova.openstack.common.rpc.impl_kombu': 'rabbit', - 'nova.openstack.common.rpc.impl_qpid: 'qpid', - 'nova.openstack.common.rpc.impl_zmq: 'zmq'}) - - - we need some way for the dispatcher to take the incoming - context dict and instantiate a user-supplied request context - object with it - - - @expose decorator - - - when shutting down a dispatcher, do we need to invoke - a cleanup method on the listener? - - - ClientException - e.g. the executor should handle this being - raised by the dispatcher - - - the InvalidTarget checks seem like they're generic preconditions - that all drivers would want enforced - - - _safe_log() logs sanitized message data - - - unique_id used to reject duplicate messages - - - local.store.context used by common logging - basically, how to make - sure the context of the currently dispatching rpc is available to - logging. Need to abstract out the dependency on eventlet for this. - - - I'm not sure listener.done() is really needed - can't we ack the - message before returning it from poll() ? - - - envelope=True/False really sucks - it's a transport driver specific - flag and we're only using it to communicate whether to use the new - or older on-the-wire notification message format. Maybe we should - have a high-level "notification message format version" which each - transport driver can map to an on-the-wire format. Meh. - -Things I don't like: - - - CallContext - we already abuse the term "context" enough - - - There's something about using a context manager for prepare() that - I like: - - with client.prepare(version='2.6') as cctxt: - cctxt.call('select_host', - request_spec=request_spec, - filter_properties=filter_properties) - - but it seems a bit nonsensical - - - "endpoints" - better than api_objs, callbacks, proxyobj, etc. - - - we probably won't use BlockingRPCExecutor anywhere, but I think - it does a good job of showing the basic job of a dispatcher - implementation - - -There's a bunch of places where what fields are used in a target -is unclear: - - - in driver.listen() and server.start(): - - required: topic and server - - optional: exchange - - ignored: namespace, version, fanout - - - in dispatcher: - - required: none - - optional: namespace, version - - ignored: exchange, topic, server, fanout - - - in driver.send(): - - required: topic - - optional: server, fanout and exchange - - ignored: namespace, version - - - in client.call() and client.cast(): - - required: (topic is required by send()) - - optional: namespace, version (server, fanout, exchange optional to send()) - - ignored: none - -driver porting guide: - - - implement a BaseDriver subclass: - - - send() should be similar to call()/cast() - - listen() should be similar to create_consumer() - - - implement a base.Listener subclass - - - poll() should pull a message off the queue - - done() should ack it diff --git a/doc/messaging/test-blocking.py b/doc/messaging/test-blocking.py deleted file mode 100644 index d1aa344bd..000000000 --- a/doc/messaging/test-blocking.py +++ /dev/null @@ -1,57 +0,0 @@ - -import socket -import threading - -from oslo.config import cfg - -from oslo import messaging - -_opts = [ - cfg.StrOpt('host', default=socket.gethostname()), -] - -CONF = cfg.CONF -CONF.register_opts(_opts) - -class Server(object): - - def __init__(self, transport): - self.target = messaging.Target(topic='testtopic', - server=transport.conf.host, - version='2.5') - self._server = messaging.get_rpc_server(transport, - self.target, - [self]) - super(Server, self).__init__() - - def start(self): - self._server.start() - - def test(self, ctxt, arg): - self._server.stop() - return arg - -transport = messaging.get_transport(CONF, 'fake:///testexchange') - -server = Server(transport) -thread = threading.Thread(target=server.start) -thread.daemon = True -thread.start() - -class Client(object): - - def __init__(self, transport): - target = messaging.Target(topic='testtopic', version='2.0') - self._client = messaging.RPCClient(transport, target) - super(Client, self).__init__() - - def test(self, ctxt, arg): - cctxt = self._client.prepare(version='2.5') - return cctxt.call(ctxt, 'test', arg=arg) - - -client = Client(transport) -print client.test({'c': 'b'}, 'foo') - -while thread.isAlive(): - thread.join(.05) diff --git a/doc/messaging/test-eventlet.py b/doc/messaging/test-eventlet.py deleted file mode 100644 index fb8cace93..000000000 --- a/doc/messaging/test-eventlet.py +++ /dev/null @@ -1,57 +0,0 @@ - - -import eventlet - -eventlet.monkey_patch(os=False) - -import socket - -from oslo.config import cfg - -from oslo import messaging - -_opts = [ - cfg.StrOpt('host', default=socket.gethostname()), -] - -CONF = cfg.CONF -CONF.register_opts(_opts) - -class Server(object): - - def __init__(self, transport): - self.target = messaging.Target(topic='testtopic', - server=transport.conf.host, - version='2.5') - self._server = messaging.get_rpc_server(transport, - self.target, - [self], - executor='eventlet') - super(Server, self).__init__() - - def start(self): - self._server.start() - - def test(self, ctxt, arg): - return arg - -transport = messaging.get_transport(CONF, 'fake:///testexchange') - -server = Server(transport) -server.start() - - -class Client(object): - - def __init__(self, transport): - target = messaging.Target(topic='testtopic', version='2.0') - self._client = messaging.RPCClient(transport, target) - super(Client, self).__init__() - - def test(self, ctxt, arg): - cctxt = self._client.prepare(version='2.5') - return cctxt.call(ctxt, 'test', arg=arg) - - -client = Client(transport) -print client.test({'c': 'b'}, 'foo') diff --git a/doc/messaging/test-rabbit-client.py b/doc/messaging/test-rabbit-client.py deleted file mode 100644 index 4fe9eb22a..000000000 --- a/doc/messaging/test-rabbit-client.py +++ /dev/null @@ -1,39 +0,0 @@ -import eventlet - -eventlet.monkey_patch(os=False) - -import logging -import socket - -from oslo.config import cfg - -from oslo import messaging - -_opts = [ - cfg.StrOpt('host', default=socket.gethostname()), -] - -CONF = cfg.CONF -CONF.register_opts(_opts) - -LOG = logging.getLogger('client') - -logging.basicConfig(level=logging.DEBUG) - -CONF() -CONF.log_opt_values(LOG, logging.DEBUG) - -class Client(object): - - def __init__(self, transport): - target = messaging.Target(topic='topic') - self._client = messaging.RPCClient(transport, target) - super(Client, self).__init__() - - def ping(self, ctxt): - return self._client.call(ctxt, 'ping') - -transport = messaging.get_transport(CONF, 'rabbit:///test') - -client = Client(transport) -print client.ping({}) diff --git a/doc/messaging/test-rabbit-server.py b/doc/messaging/test-rabbit-server.py deleted file mode 100644 index ff67c00ab..000000000 --- a/doc/messaging/test-rabbit-server.py +++ /dev/null @@ -1,52 +0,0 @@ - -import eventlet - -eventlet.monkey_patch(os=False) - -import logging -import socket - -from oslo.config import cfg - -from oslo import messaging - -_opts = [ - cfg.StrOpt('host', default=socket.gethostname()), -] - -CONF = cfg.CONF -CONF.register_opts(_opts) - -LOG = logging.getLogger('server') - -CONF() -CONF.log_opt_values(LOG, logging.DEBUG) - -logging.basicConfig(level=logging.DEBUG) - -class Server(object): - - def __init__(self, transport): - self.target = messaging.Target(topic='topic', - server=transport.conf.host) - self._server = messaging.get_rpc_server(transport, - self.target, - [self], - executor='eventlet') - super(Server, self).__init__() - - def start(self): - self._server.start() - - def wait(self): - self._server.wait() - - def ping(self, ctxt): - LOG.info("PING") - return 'ping' - -transport = messaging.get_transport(CONF, 'rabbit:///test') - -server = Server(transport) -server.start() -server.wait()