Added the taskmanager class with some testing rpc code

* Fixed a bug in rpc kombu w/ a bad durable declaration
* Fixed the name of the queue exchange
* Added a bit of rpc code to the taskmanager service for consuming
* * This is mostly experimental at this point!!!
* * This should be refactored into something common!!!
This commit is contained in:
Michael Basnight 2012-03-05 07:59:34 -06:00
parent c6a27dc540
commit f2d09827cd
6 changed files with 264 additions and 4 deletions

72
bin/reddwarf-taskmanager Executable file
View File

@ -0,0 +1,72 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import gettext
import optparse
import os
import sys
gettext.install('reddwarf', unicode=1)
# If ../reddwarf/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(possible_topdir, 'reddwarf', '__init__.py')):
sys.path.insert(0, possible_topdir)
from reddwarf import version
from reddwarf.common import config
from reddwarf.common import wsgi
#from reddwarf.db import db_api
def create_options(parser):
"""Sets up the CLI and config-file options
:param parser: The option parser
:returns: None
"""
parser.add_option('-p', '--port', dest="port", metavar="PORT",
type=int, default=8778,
help="Port the Reddwarf Work Manager listens on. "
"Default: %default")
config.add_common_options(parser)
config.add_log_options(parser)
if __name__ == '__main__':
oparser = optparse.OptionParser(version="%%prog %s"
% version.version_string())
create_options(oparser)
(options, args) = config.parse_options(oparser)
try:
conf, app = config.Config.load_paste_app('reddwarf-taskmanager', options, args)
#db_api.configure_db(conf)
server = wsgi.Server()
server.start(app, options.get('port', conf['bind_port']),
conf['bind_host'])
server.wait()
except RuntimeError as error:
import traceback
print traceback.format_exc()
sys.exit("ERROR: %s" % error)

View File

@ -0,0 +1,92 @@
[DEFAULT]
# Show more verbose log output (sets INFO log level output)
verbose = True
# Show debugging output in logs (sets DEBUG log level output)
debug = True
# Address to bind the API server
bind_host = 0.0.0.0
# Port the bind the API server to
bind_port = 8778
# AMQP Connection info
rabbit_password=f7999d1955c5014aa32c
# SQLAlchemy connection string for the reference implementation
# registry server. Any valid SQLAlchemy connection string is fine.
# See: http://www.sqlalchemy.org/docs/05/reference/sqlalchemy/connections.html#sqlalchemy.create_engine
sql_connection = sqlite:///reddwarf_test.sqlite
# sql_connection = mysql://root:root@localhost/reddwarf
#sql_connection = postgresql://reddwarf:reddwarf@localhost/reddwarf
# Period in seconds after which SQLAlchemy should reestablish its connection
# to the database.
#
# MySQL uses a default `wait_timeout` of 8 hours, after which it will drop
# idle connections. This can result in 'MySQL Gone Away' exceptions. If you
# notice this, you can lower this value to ensure that SQLAlchemy reconnects
# before MySQL can drop the connection.
sql_idle_timeout = 3600
#DB Api Implementation
db_api_implementation = "reddwarf.db.sqlalchemy.api"
# Path to the extensions
api_extensions_path = reddwarf/extensions
# Configuration options for talking to nova via the novaclient.
# These options are for an admin user in your keystone config.
# It proxy's the token received from the user to send to nova via this admin users creds,
# basically acting like the client via that proxy token.
reddwarf_proxy_admin_user = admin
reddwarf_proxy_admin_pass = 3de4922d8b6ac5a1aad9
reddwarf_proxy_admin_tenant_name = admin
reddwarf_auth_url = http://0.0.0.0:5000/v2.0
# ============ notifer queue kombu connection options ========================
notifier_queue_hostname = localhost
notifier_queue_userid = guest
notifier_queue_password = guest
notifier_queue_ssl = False
notifier_queue_port = 5672
notifier_queue_virtual_host = /
notifier_queue_transport = memory
[composite:reddwarf-taskmanager]
use = call:reddwarf.common.wsgi:versioned_urlmap
/: versions
/v0.1: reddwarf-taskmanagerapi
[app:versions]
paste.app_factory = reddwarf.versions:app_factory
[pipeline:reddwarf-taskmanagerapi]
pipeline = taskmanagerapp
#pipeline = debug extensions reddwarfapp
[filter:extensions]
paste.filter_factory = reddwarf.common.extensions:factory
[filter:tokenauth]
paste.filter_factory = keystone.middleware.auth_token:filter_factory
service_protocol = http
service_host = 127.0.0.1
service_port = 5000
auth_host = 127.0.0.1
auth_port = 35357
auth_protocol = http
auth_uri = http://127.0.0.1:5000/
admin_token = be19c524ddc92109a224
[filter:authorization]
paste.filter_factory = reddwarf.common.auth:AuthorizationMiddleware.factory
[app:taskmanagerapp]
paste.app_factory = reddwarf.taskmanager.service:app_factory
#Add this filter to log request and response for debugging
[filter:debug]
paste.filter_factory = reddwarf.common.wsgi:Debug

View File

@ -52,6 +52,9 @@ class InstanceController(BaseController):
def index(self, req, tenant_id):
"""Return all instances."""
servers = models.Instances(req.headers["X-Auth-Token"]).data()
#TODO(hub-cap): Remove this, this is only for testing communication between services
rpc.cast(context.ReddwarfContext(), "foo.ubuntu", {"method":"ZOMG", "BARRRR":"ARGGGGG"})
return wsgi.Result(views.InstancesView(servers).data(), 201)
def show(self, req, tenant_id, id):

View File

@ -149,12 +149,12 @@ class TopicConsumer(ConsumerBase):
Other kombu options may be passed
"""
# Default options
options = {'durable': config.Config.get('rabbit_durable_queues', True),
options = {'durable': config.Config.get('rabbit_durable_queues', False),
'auto_delete': False,
'exclusive': False}
options.update(kwargs)
exchange = kombu.entity.Exchange(
name=config.Config.get('control_exchange', 'nova'),
name=config.Config.get('control_exchange', 'reddwarf'),
type='topic',
durable=options['durable'],
auto_delete=options['auto_delete'])
@ -262,7 +262,7 @@ class TopicPublisher(Publisher):
'exclusive': False}
options.update(kwargs)
super(TopicPublisher, self).__init__(channel,
config.Config.get('control_exchange', 'nova'),
config.Config.get('control_exchange', 'reddwarf'),
topic,
type='topic',
**options)
@ -334,7 +334,7 @@ class Connection(object):
p_key = server_params_to_kombu_params.get(sp_key, sp_key)
params[p_key] = value
params.setdefault('hostname', config.Config.get('rabbit_host','localhost'))
params.setdefault('hostname', config.Config.get('rabbit_host','127.0.0.1'))
params.setdefault('port', config.Config.get('rabbit_port',5672))
params.setdefault('userid', config.Config.get('rabbit_userid','guest'))
params.setdefault('password', config.Config.get('rabbit_password','f7999d1955c5014aa32c'))

View File

@ -0,0 +1,16 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,77 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import routes
import webob.exc
from reddwarf.common import wsgi
from reddwarf import rpc
LOG = logging.getLogger('reddwarf.taskmanager.service')
class Controller(wsgi.Controller):
"""Base controller class."""
connected = False
#TODO(hub-cap):Make this not so nasty, this should not be here
def _create_connection(self, topic, host):
# Create a connection for rpc usage
if (self.connected):
return
self.conn = rpc.create_connection(new=True)
LOG.debug(_("Creating Consumer connection for Service %s") %
topic)
# Share this same connection for these Consumers
self.conn.create_consumer(topic, self, fanout=False)
node_topic = '%s.%s' % (topic, host)
self.conn.create_consumer(node_topic, self, fanout=False)
self.conn.create_consumer(topic, self, fanout=True)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
def index(self, req):
"""Gets a list of all tasks available"""
self._create_connection("foo", "ubuntu")
return "All Tasks -- Impl TBD"
def show(self, req, id):
"""Gets detailed information about an individual task"""
return "Single Task -- Impl TBD"
class API(wsgi.Router):
"""API"""
def __init__(self):
mapper = routes.Mapper()
super(API, self).__init__(mapper)
self._instance_router(mapper)
def _instance_router(self, mapper):
resource = Controller().create_resource()
path = "/tasks"
mapper.resource("task", path, controller=resource)
def app_factory(global_conf, **local_conf):
return API()