API extension and DB support for service types

Blueprint quantum-service-type

This patch allows for managing service types through the API.
The default service type is specified in the configuration file.
The patch also provides a 'dummy' API extension, which uses the
'dummy' service plugin, as a PoC for usage of service type.
The dummy API extension is used in unit tests only.

Change-Id: I97d400b941fa7925b0efa0fd0d35c07419ff6bfa
This commit is contained in:
Salvatore Orlando 2012-12-07 06:33:48 -08:00
parent 41b4490a41
commit 98fcdc0d6f
18 changed files with 1276 additions and 61 deletions

View File

@ -41,5 +41,11 @@
"get_port": "rule:admin_or_owner", "get_port": "rule:admin_or_owner",
"update_port": "rule:admin_or_owner", "update_port": "rule:admin_or_owner",
"update_port:fixed_ips": "rule:admin_or_network_owner", "update_port:fixed_ips": "rule:admin_or_network_owner",
"delete_port": "rule:admin_or_owner" "delete_port": "rule:admin_or_owner",
"extension:service_type:view_extended": "rule:admin_only",
"create_service_type": "rule:admin_only",
"update_service_type": "rule:admin_only",
"delete_service_type": "rule:admin_only",
"get_service_type": "rule:regular_user"
} }

View File

@ -189,3 +189,11 @@ notification_topics = notifications
# default driver to use for quota checks # default driver to use for quota checks
# quota_driver = quantum.quota.ConfDriver # quota_driver = quantum.quota.ConfDriver
[DEFAULT_SERVICETYPE]
# Description of the default service type (optional)
# description = "default service type"
# Enter a service definition line for each advanced service provided
# by the default service type.
# Each service definition should be in the following format:
# <service>:<plugin>[:driver]

View File

@ -99,7 +99,7 @@ class Controller(object):
member_actions = [] member_actions = []
self._plugin = plugin self._plugin = plugin
self._collection = collection.replace('-', '_') self._collection = collection.replace('-', '_')
self._resource = resource self._resource = resource.replace('-', '_')
self._attr_info = attr_info self._attr_info = attr_info
self._allow_bulk = allow_bulk self._allow_bulk = allow_bulk
self._native_bulk = self._is_native_bulk_supported() self._native_bulk = self._is_native_bulk_supported()

View File

@ -247,3 +247,8 @@ class InvalidExtenstionEnv(BadRequest):
class TooManyExternalNetworks(QuantumException): class TooManyExternalNetworks(QuantumException):
message = _("More than one external network exists") message = _("More than one external network exists")
class InvalidConfigurationOption(QuantumException):
message = _("An invalid value was provided for %(opt_name)s: "
"%(opt_value)s")

View File

@ -0,0 +1,77 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""DB support for service types
Revision ID: 48b6f43f7471
Revises: 5a875d0e5c
Create Date: 2013-01-07 13:47:29.093160
"""
# revision identifiers, used by Alembic.
revision = '48b6f43f7471'
down_revision = '5a875d0e5c'
# Change to ['*'] if this migration applies to all plugins
migration_for_plugins = [
'*'
]
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
from quantum.db import migration
def upgrade(active_plugin=None, options=None):
if not migration.should_run(active_plugin, migration_for_plugins):
return
op.create_table(
u'servicetypes',
sa.Column(u'tenant_id', mysql.VARCHAR(length=255), nullable=True),
sa.Column(u'id', mysql.VARCHAR(length=36), nullable=False),
sa.Column(u'name', mysql.VARCHAR(length=255), nullable=True),
sa.Column(u'description', mysql.VARCHAR(length=255), nullable=True),
sa.Column(u'default', mysql.TINYINT(display_width=1),
autoincrement=False, nullable=False),
sa.Column(u'num_instances', mysql.INTEGER(display_width=11),
autoincrement=False, nullable=True),
sa.PrimaryKeyConstraint(u'id'))
op.create_table(
u'servicedefinitions',
sa.Column(u'id', mysql.VARCHAR(length=36), nullable=False),
sa.Column(u'service_class', mysql.VARCHAR(length=255),
nullable=False),
sa.Column(u'plugin', mysql.VARCHAR(length=255), nullable=True),
sa.Column(u'driver', mysql.VARCHAR(length=255), nullable=True),
sa.Column(u'service_type_id', mysql.VARCHAR(length=36),
nullable=False),
sa.ForeignKeyConstraint(['service_type_id'], [u'servicetypes.id'],
name=u'servicedefinitions_ibfk_1'),
sa.PrimaryKeyConstraint(u'id', u'service_class', u'service_type_id'))
def downgrade(active_plugin=None, options=None):
if not migration.should_run(active_plugin, migration_for_plugins):
return
op.drop_table(u'servicedefinitions')
op.drop_table(u'servicetypes')

View File

@ -0,0 +1,328 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Salvatore Orlando, VMware
#
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.orm import exc as orm_exc
from sqlalchemy.sql import expression as expr
from quantum.common import exceptions as q_exc
from quantum import context
from quantum.db import api as db
from quantum.db import model_base
from quantum.db import models_v2
from quantum.openstack.common import cfg
from quantum.openstack.common import log as logging
from quantum import policy
LOG = logging.getLogger(__name__)
DEFAULT_SVCTYPE_NAME = 'default'
default_servicetype_opts = [
cfg.StrOpt('description',
default='',
help=_('Textual description for the default service type')),
cfg.MultiStrOpt('service_definition',
help=_('Defines a provider for an advanced service '
'using the format: <service>:<plugin>[:<driver>]'))
]
cfg.CONF.register_opts(default_servicetype_opts, 'DEFAULT_SERVICETYPE')
def parse_service_definition_opt():
""" parse service definition opts and returns result """
results = []
svc_def_opt = cfg.CONF.DEFAULT_SERVICETYPE.service_definition
try:
for svc_def_str in svc_def_opt:
split = svc_def_str.split(':')
svc_def = {'service_class': split[0],
'plugin': split[1]}
try:
svc_def['driver'] = split[2]
except IndexError:
# Never mind, driver is optional
LOG.debug(_("Default service type - no driver for service "
"%(service_class)s and plugin %(plugin)s"),
svc_def)
results.append(svc_def)
return results
except (TypeError, IndexError):
raise q_exc.InvalidConfigurationOption(opt_name='service_definition',
opt_value=svc_def_opt)
class NoDefaultServiceDefinition(q_exc.QuantumException):
message = _("No default service definition in configuration file. "
"Please add service definitions using the service_definition "
"variable in the [DEFAULT_SERVICETYPE] section")
class ServiceTypeNotFound(q_exc.NotFound):
message = _("Service type %(service_type_id)s could not be found ")
class ServiceTypeInUse(q_exc.InUse):
message = _("There are still active instances of service type "
"'%(service_type_id)s'. Therefore it cannot be removed.")
class ServiceDefinition(model_base.BASEV2, models_v2.HasId):
service_class = sa.Column(sa.String(255), primary_key=True)
plugin = sa.Column(sa.String(255))
driver = sa.Column(sa.String(255))
service_type_id = sa.Column(sa.String(36),
sa.ForeignKey('servicetypes.id',
ondelete='CASCADE'),
primary_key=True)
class ServiceType(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
""" Service Type Object Model """
name = sa.Column(sa.String(255))
description = sa.Column(sa.String(255))
default = sa.Column(sa.Boolean(), nullable=False, default=False)
service_definitions = orm.relationship(ServiceDefinition,
backref='servicetypes',
lazy='joined',
cascade='all')
# Keep track of number of instances for this service type
num_instances = sa.Column(sa.Integer(), default=0)
def as_dict(self):
""" Convert a row into a dict """
ret_dict = {}
for c in self.__table__.columns:
ret_dict[c.name] = getattr(self, c.name)
return ret_dict
class ServiceTypeManager(object):
""" Manage service type objects in Quantum database """
_instance = None
@classmethod
def get_instance(cls):
if cls._instance is None:
cls._instance = cls()
return cls._instance
def __init__(self):
self._initialize_db()
ctx = context.get_admin_context()
# Init default service type from configuration file
svc_defs = cfg.CONF.DEFAULT_SERVICETYPE.service_definition
if not svc_defs:
raise NoDefaultServiceDefinition()
def_service_type = {'name': DEFAULT_SVCTYPE_NAME,
'description':
cfg.CONF.DEFAULT_SERVICETYPE.description,
'service_definitions':
parse_service_definition_opt(),
'default': True}
# Create or update record in database
def_svc_type_db = self._get_default_service_type(ctx)
if not def_svc_type_db:
def_svc_type_db = self._create_service_type(ctx, def_service_type)
else:
self._update_service_type(ctx,
def_svc_type_db['id'],
def_service_type,
svc_type_db=def_svc_type_db)
LOG.debug(_("Default service type record updated in Quantum database. "
"identifier is '%s'"), def_svc_type_db['id'])
def _initialize_db(self):
db.configure_db()
# Register models for service type management
# Note this might have been already done if configure_db also
# created the engine
db.register_models(models_v2.model_base.BASEV2)
def _create_service_type(self, context, service_type):
svc_defs = service_type.pop('service_definitions')
with context.session.begin(subtransactions=True):
svc_type_db = ServiceType(**service_type)
# and now insert provided service type definitions
for svc_def in svc_defs:
svc_type_db.service_definitions.append(
ServiceDefinition(**svc_def))
# sqlalchemy save-update on relationship is on by
# default, the following will save both the service
# type and its service definitions
context.session.add(svc_type_db)
return svc_type_db
def _update_service_type(self, context, id, service_type,
svc_type_db=None):
with context.session.begin(subtransactions=True):
if not svc_type_db:
svc_type_db = self._get_service_type(context, id)
try:
svc_defs_map = dict([(svc_def['service'], svc_def)
for svc_def in
service_type.pop('service_definitions')])
except KeyError:
# No service defs in request
svc_defs_map = {}
svc_type_db.update(service_type)
for svc_def_db in svc_type_db.service_definitions:
try:
svc_def_db.update(svc_defs_map.pop(
svc_def_db['service_class']))
except KeyError:
# too bad, the service def was not there
# then we should delete it.
context.session.delete(svc_def_db)
# Add remaining service definitions
for svc_def in svc_defs_map:
context.session.add(ServiceDefinition(**svc_def))
return svc_type_db
def _check_service_type_view_auth(self, context, service_type):
# FIXME(salvatore-orlando): This should be achieved via policy
# engine without need for explicit checks in manager code.
# Also, the policy in this way does not make a lot of sense
return policy.check(context,
"extension:service_type:view_extended",
service_type)
def _get_service_type(self, context, svc_type_id):
try:
query = context.session.query(ServiceType)
return query.filter(ServiceType.id == svc_type_id).one()
# filter is on primary key, do not catch MultipleResultsFound
except orm_exc.NoResultFound:
raise ServiceTypeNotFound(service_type_id=svc_type_id)
def _get_default_service_type(self, context):
try:
query = context.session.query(ServiceType)
return query.filter(ServiceType.default == expr.true()).one()
except orm_exc.NoResultFound:
return
except orm_exc.MultipleResultsFound:
# This should never happen. If it does, take the first instance
query2 = context.session.query(ServiceType)
results = query2.filter(ServiceType.default == expr.true()).all()
LOG.warning(_("Multiple default service type instances found."
"Will use instance '%s'"), results[0]['id'])
return results[0]
def _make_svc_type_dict(self, context, svc_type, fields=None):
def _make_svc_def_dict(svc_def_db):
svc_def = {'service_class': svc_def_db['service_class']}
if self._check_service_type_view_auth(context,
svc_type.as_dict()):
svc_def.update({'plugin': svc_def_db['plugin'],
'driver': svc_def_db['driver']})
return svc_def
res = {'id': svc_type['id'],
'name': svc_type['name'],
'default': svc_type['default'],
'service_definitions':
[_make_svc_def_dict(svc_def) for svc_def
in svc_type['service_definitions']]}
if self._check_service_type_view_auth(context,
svc_type.as_dict()):
res['num_instances'] = svc_type['num_instances']
# Field selection
if fields:
return dict(((k, v) for k, v in res.iteritems()
if k in fields))
return res
def get_service_type(self, context, id, fields=None):
""" Retrieve a service type record """
return self._make_svc_type_dict(context,
self._get_service_type(context, id),
fields)
def get_service_types(self, context, fields=None, filters=None):
""" Retrieve a possibly filtered list of service types """
query = context.session.query(ServiceType)
if filters:
for key, value in filters.iteritems():
column = getattr(ServiceType, key, None)
if column:
query = query.filter(column.in_(value))
return [self._make_svc_type_dict(context, svc_type, fields)
for svc_type in query.all()]
def create_service_type(self, context, service_type):
""" Create a new service type """
svc_type_data = service_type['service_type']
svc_type_db = self._create_service_type(context, svc_type_data)
LOG.debug(_("Created service type object:%s"), svc_type_db['id'])
return self._make_svc_type_dict(context, svc_type_db)
def update_service_type(self, context, id, service_type):
""" Update a service type """
svc_type_data = service_type['service_type']
svc_type_db = self._update_service_type(context, id,
svc_type_data)
return self._make_svc_type_dict(context, svc_type_db)
def delete_service_type(self, context, id):
""" Delete a service type """
# Verify that the service type is not in use.
svc_type_db = self._get_service_type(context, id)
if svc_type_db['num_instances'] > 0:
raise ServiceTypeInUse(service_type_id=svc_type_db['id'])
with context.session.begin(subtransactions=True):
context.session.delete(svc_type_db)
def increase_service_type_refcount(self, context, id):
""" Increase references count for a service type object
This method should be invoked by plugins using the service
type concept everytime an instance of an object associated
with a given service type is created.
"""
#TODO(salvatore-orlando): Devise a better solution than this
#refcount mechanisms. Perhaps adding hooks into models which
#use service types in order to enforce ref. integrity and cascade
with context.session.begin(subtransactions=True):
svc_type_db = self._get_service_type(context, id)
svc_type_db['num_instances'] = svc_type_db['num_instances'] + 1
return svc_type_db['num_instances']
def decrease_service_type_refcount(self, context, id):
""" Decrease references count for a service type object
This method should be invoked by plugins using the service
type concept everytime an instance of an object associated
with a given service type is removed
"""
#TODO(salvatore-orlando): Devise a better solution than this
#refcount mechanisms. Perhaps adding hooks into models which
#use service types in order to enforce ref. integrity and cascade
with context.session.begin(subtransactions=True):
svc_type_db = self._get_service_type(context, id)
if svc_type_db['num_instances'] == 0:
LOG.warning(_("Number of instances for service type "
"'%s' is already 0."), svc_type_db['name'])
return
svc_type_db['num_instances'] = svc_type_db['num_instances'] - 1
return svc_type_db['num_instances']

View File

@ -0,0 +1,190 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Salvatore Orlando, VMware
#
from quantum.api import extensions
from quantum.api.v2 import attributes
from quantum.api.v2 import base
from quantum import context
from quantum.db import servicetype_db
from quantum import manager
from quantum.openstack.common import log as logging
from quantum.plugins.common import constants
LOG = logging.getLogger(__name__)
RESOURCE_NAME = "service-type"
COLLECTION_NAME = "%ss" % RESOURCE_NAME
SERVICE_ATTR = 'service_class'
PLUGIN_ATTR = 'plugin'
DRIVER_ATTR = 'driver'
EXT_ALIAS = RESOURCE_NAME
# Attribute Map for Service Type Resource
RESOURCE_ATTRIBUTE_MAP = {
COLLECTION_NAME: {
'id': {'allow_post': False, 'allow_put': False,
'is_visible': True},
'name': {'allow_post': True, 'allow_put': True,
'validate': {'type:string': None},
'is_visible': True, 'default': ''},
'default': {'allow_post': False, 'allow_put': False,
'is_visible': True},
#TODO(salvatore-orlando): Service types should not have ownership
'tenant_id': {'allow_post': True, 'allow_put': False,
'required_by_policy': True,
'is_visible': True},
'num_instances': {'allow_post': False, 'allow_put': False,
'is_visible': True},
'service_definitions': {'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': None,
'validate': {'type:service_definitions':
None}}
}
}
def set_default_svctype_id(original_id):
if not original_id:
svctype_mgr = servicetype_db.ServiceTypeManager.get_instance()
# Fetch default service type - it must exist
res = svctype_mgr.get_service_types(context.get_admin_context(),
filters={'default': [True]})
return res[0]['id']
return original_id
def _validate_servicetype_ref(data, valid_values=None):
""" Verify the service type id exists """
svc_type_id = data
svctype_mgr = servicetype_db.ServiceTypeManager.get_instance()
try:
svctype_mgr.get_service_type(context.get_admin_context(),
svc_type_id)
except servicetype_db.ServiceTypeNotFound:
return _("The service type '%s' does not exist") % svc_type_id
def _validate_service_defs(data, valid_values=None):
""" Validate the list of service definitions. """
try:
if len(data) == 0:
return _("No service type definition was provided. At least a "
"service type definition must be provided")
f_name = _validate_service_defs.__name__
for svc_def in data:
try:
# Do a copy of the original object so we can easily
# pop out stuff from it
svc_def_copy = svc_def.copy()
try:
svc_name = svc_def_copy.pop(SERVICE_ATTR)
plugin_name = svc_def_copy.pop(PLUGIN_ATTR)
except KeyError:
msg = (_("Required attributes missing in service "
"definition: %s") % svc_def)
LOG.error("%(f_name)s: %(msg)s", locals())
return msg
# Validate 'service' attribute
if not svc_name in constants.ALLOWED_SERVICES:
msg = (_("Service name '%s' unspecified "
"or invalid") % svc_name)
LOG.error("%(f_name)s: %(msg)s", locals())
return msg
# Validate 'plugin' attribute
if not plugin_name:
msg = (_("Plugin name not specified in "
"service definition %s") % svc_def)
LOG.error("%(f_name)s: %(msg)s", locals())
return msg
# TODO(salvatore-orlando): This code will need to change when
# multiple plugins for each adv service will be supported
svc_plugin = manager.QuantumManager.get_service_plugins().get(
svc_name)
if not svc_plugin:
msg = _("No plugin for service '%s'") % svc_name
LOG.error("%(f_name)s: %(msg)s", locals())
return msg
if svc_plugin.get_plugin_name() != plugin_name:
msg = _("Plugin name '%s' is not correct ") % plugin_name
LOG.error("%(f_name)s: %(msg)s", locals())
return msg
# Validate 'driver' attribute (just check it's a string)
# FIXME(salvatore-orlando): This should be a list
# Note: using get() instead of pop() as pop raises if the
# key is not found, which might happen for the driver
driver = svc_def_copy.get(DRIVER_ATTR)
if driver:
msg = attributes._validate_string(driver,)
if msg:
return msg
del svc_def_copy[DRIVER_ATTR]
# Anything left - it should be an error
if len(svc_def_copy):
msg = (_("Unparseable attributes found in "
"service definition %s") % svc_def)
LOG.error("%(f_name)s: %(msg)s", locals())
return msg
except TypeError:
LOG.exception(_("Exception while parsing service "
"definition:%s"), svc_def)
msg = (_("Was expecting a dict for service definition, found "
"the following: %s") % svc_def)
LOG.error("%(f_name)s: %(msg)s", locals())
return msg
except TypeError:
return (_("%s: provided data are not iterable") %
_validate_service_defs.__name__)
attributes.validators['type:service_definitions'] = _validate_service_defs
attributes.validators['type:servicetype_ref'] = _validate_servicetype_ref
class Servicetype(object):
@classmethod
def get_name(cls):
return _("Quantum Service Type Management")
@classmethod
def get_alias(cls):
return EXT_ALIAS
@classmethod
def get_description(cls):
return _("API for retrieving and managing service types for "
"Quantum advanced services")
@classmethod
def get_namespace(cls):
return "http://docs.openstack.org/ext/quantum/service-type/api/v1.0"
@classmethod
def get_updated(cls):
return "2013-01-20T00:00:00-00:00"
@classmethod
def get_resources(cls):
""" Returns Extended Resource for service type management """
controller = base.create_resource(
COLLECTION_NAME, RESOURCE_NAME,
servicetype_db.ServiceTypeManager.get_instance(),
RESOURCE_ATTRIBUTE_MAP[COLLECTION_NAME])
return [extensions.ResourceExtension(COLLECTION_NAME,
controller)]

View File

@ -20,6 +20,8 @@ CORE = "CORE"
DUMMY = "DUMMY" DUMMY = "DUMMY"
LOADBALANCER = "LOADBALANCER" LOADBALANCER = "LOADBALANCER"
# TODO(salvatore-orlando): Move these (or derive them) from conf file
ALLOWED_SERVICES = [CORE, DUMMY, LOADBALANCER]
COMMON_PREFIXES = { COMMON_PREFIXES = {
CORE: "", CORE: "",

View File

@ -1,16 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -1,32 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from quantum.plugins.common import constants
from quantum.plugins.services.service_base import ServicePluginBase
class QuantumDummyPlugin(ServicePluginBase):
supported_extension_aliases = []
def __init__(self):
pass
def get_plugin_type(self):
return constants.DUMMY
def get_plugin_description(self):
return "Quantum Dummy Plugin"

View File

@ -31,6 +31,15 @@ class ServicePluginBase(extensions.PluginInterface):
quantum/plugins/common/constants.py """ quantum/plugins/common/constants.py """
pass pass
@abc.abstractmethod
def get_plugin_name(self):
""" return a symbolic name for the plugin.
Each service plugin should have a symbolic name. This name
will be used, for instance, by service definitions in service types
"""
pass
@abc.abstractmethod @abc.abstractmethod
def get_plugin_description(self): def get_plugin_description(self):
""" returns string description of the plugin """ """ returns string description of the plugin """

View File

@ -22,3 +22,8 @@ rpc_backend = quantum.openstack.common.rpc.impl_fake
[DATABASE] [DATABASE]
sql_connection = 'sqlite:///:memory:' sql_connection = 'sqlite:///:memory:'
[DEFAULT_SERVICETYPE]
description = "default service type"
service_definition=dummy:quantum.tests.unit.dummy_plugin.QuantumDummyPlugin

View File

@ -0,0 +1,139 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from quantum.api import extensions
from quantum.api.v2 import base
from quantum.common import exceptions
from quantum.db import servicetype_db
from quantum.extensions import servicetype
from quantum import manager
from quantum.openstack.common import uuidutils
from quantum.plugins.common import constants
from quantum.plugins.services.service_base import ServicePluginBase
DUMMY_PLUGIN_NAME = "dummy_plugin"
RESOURCE_NAME = "dummy"
COLLECTION_NAME = "%ss" % RESOURCE_NAME
# Attribute Map for dummy resource
RESOURCE_ATTRIBUTE_MAP = {
COLLECTION_NAME: {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
'is_visible': True},
'name': {'allow_post': True, 'allow_put': True,
'validate': {'type:string': None},
'is_visible': True, 'default': ''},
'tenant_id': {'allow_post': True, 'allow_put': False,
'required_by_policy': True,
'is_visible': True},
'service_type': {'allow_post': True,
'allow_put': False,
'validate': {'type:servicetype_ref': None},
'convert_to': servicetype.set_default_svctype_id,
'is_visible': True,
'default': None}
}
}
class Dummy(object):
@classmethod
def get_name(cls):
return "dummy"
@classmethod
def get_alias(cls):
return "dummy"
@classmethod
def get_description(cls):
return "Dummy stuff"
@classmethod
def get_namespace(cls):
return "http://docs.openstack.org/ext/quantum/dummy/api/v1.0"
@classmethod
def get_updated(cls):
return "2012-11-20T10:00:00-00:00"
@classmethod
def get_resources(cls):
""" Returns Extended Resource for dummy management """
q_mgr = manager.QuantumManager.get_instance()
dummy_inst = q_mgr.get_service_plugins()['DUMMY']
controller = base.create_resource(
COLLECTION_NAME, RESOURCE_NAME, dummy_inst,
RESOURCE_ATTRIBUTE_MAP[COLLECTION_NAME])
return [extensions.ResourceExtension(COLLECTION_NAME,
controller)]
class DummyServicePlugin(ServicePluginBase):
""" This is a simple plugin for managing instantes of a fictional 'dummy'
service. This plugin is provided as a proof-of-concept of how
advanced service might leverage the service type extension.
Ideally, instances of real advanced services, such as load balancing
or VPN will adopt a similar solution.
"""
supported_extension_aliases = ['dummy', servicetype.EXT_ALIAS]
def __init__(self):
self.svctype_mgr = servicetype_db.ServiceTypeManager.get_instance()
self.dummys = {}
def get_plugin_type(self):
return constants.DUMMY
def get_plugin_name(self):
return DUMMY_PLUGIN_NAME
def get_plugin_description(self):
return "Quantum Dummy Service Plugin"
def get_dummys(self, context, filters, fields):
return self.dummys.values()
def get_dummy(self, context, id, fields):
try:
return self.dummys[id]
except KeyError:
raise exceptions.NotFound()
def create_dummy(self, context, dummy):
d = dummy['dummy']
d['id'] = uuidutils.generate_uuid()
self.dummys[d['id']] = d
self.svctype_mgr.increase_service_type_refcount(context,
d['service_type'])
return d
def update_dummy(self, context, id, dummy):
pass
def delete_dummy(self, context, id):
try:
svc_type_id = self.dummys[id]['service_type']
del self.dummys[id]
self.svctype_mgr.decrease_service_type_refcount(context,
svc_type_id)
except KeyError:
raise exceptions.NotFound()

View File

@ -67,7 +67,7 @@ def setup_metaplugin_conf():
cfg.CONF.set_override('default_l3_flavor', 'fake1', 'META') cfg.CONF.set_override('default_l3_flavor', 'fake1', 'META')
cfg.CONF.set_override('base_mac', "12:34:56:78:90:ab") cfg.CONF.set_override('base_mac', "12:34:56:78:90:ab")
#TODO(nati) remove this after subnet quota change is merged #TODO(nati) remove this after subnet quota change is merged
cfg.CONF.max_dns_nameservers = 10 cfg.CONF.set_override('max_dns_nameservers', 10)
class MetaQuantumPluginV2Test(unittest.TestCase): class MetaQuantumPluginV2Test(unittest.TestCase):

View File

@ -0,0 +1,43 @@
# Copyright (c) 2012 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import unittest
from quantum.common import config
from quantum.openstack.common import cfg
class ConfigurationTest(unittest.TestCase):
def test_defaults(self):
self.assertEqual('0.0.0.0', cfg.CONF.bind_host)
self.assertEqual(9696, cfg.CONF.bind_port)
self.assertEqual('api-paste.ini', cfg.CONF.api_paste_config)
self.assertEqual('', cfg.CONF.api_extensions_path)
self.assertEqual('policy.json', cfg.CONF.policy_file)
self.assertEqual('keystone', cfg.CONF.auth_strategy)
self.assertEqual(None, cfg.CONF.core_plugin)
self.assertEqual(0, len(cfg.CONF.service_plugins))
self.assertEqual('fa:16:3e:00:00:00', cfg.CONF.base_mac)
self.assertEqual(16, cfg.CONF.mac_generation_retries)
self.assertTrue(cfg.CONF.allow_bulk)
self.assertEqual(5, cfg.CONF.max_dns_nameservers)
self.assertEqual(20, cfg.CONF.max_subnet_host_routes)
self.assertEqual(os.path.abspath('../../..'),
cfg.CONF.state_path)
self.assertEqual(120, cfg.CONF.dhcp_lease_duration)
self.assertFalse(cfg.CONF.allow_overlapping_ips)
self.assertEqual('quantum', cfg.CONF.control_exchange)

View File

@ -100,8 +100,8 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
# Update the plugin # Update the plugin
cfg.CONF.set_override('core_plugin', plugin) cfg.CONF.set_override('core_plugin', plugin)
cfg.CONF.set_override('base_mac', "12:34:56:78:90:ab") cfg.CONF.set_override('base_mac', "12:34:56:78:90:ab")
cfg.CONF.max_dns_nameservers = 2 cfg.CONF.set_override('max_dns_nameservers', 2)
cfg.CONF.max_subnet_host_routes = 2 cfg.CONF.set_override('max_subnet_host_routes', 2)
self.api = APIRouter() self.api = APIRouter()
def _is_native_bulk_supported(): def _is_native_bulk_supported():

View File

@ -15,6 +15,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import os
import types import types
import unittest2 import unittest2
@ -24,16 +25,26 @@ from quantum.manager import QuantumManager
from quantum.openstack.common import cfg from quantum.openstack.common import cfg
from quantum.openstack.common import log as logging from quantum.openstack.common import log as logging
from quantum.plugins.common import constants from quantum.plugins.common import constants
from quantum.plugins.services.dummy.dummy_plugin import QuantumDummyPlugin from quantum.tests.unit import dummy_plugin
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
DB_PLUGIN_KLASS = 'quantum.db.db_base_plugin_v2.QuantumDbPluginV2' DB_PLUGIN_KLASS = 'quantum.db.db_base_plugin_v2.QuantumDbPluginV2'
ROOTDIR = os.path.dirname(os.path.dirname(__file__))
ETCDIR = os.path.join(ROOTDIR, 'etc')
def etcdir(*p):
return os.path.join(ETCDIR, *p)
class QuantumManagerTestCase(unittest2.TestCase): class QuantumManagerTestCase(unittest2.TestCase):
def setUp(self): def setUp(self):
super(QuantumManagerTestCase, self).setUp() super(QuantumManagerTestCase, self).setUp()
args = ['--config-file', etcdir('quantum.conf.test')]
# If test_config specifies some config-file, use it, as well
config.parse(args=args)
def tearDown(self): def tearDown(self):
unittest2.TestCase.tearDown(self) unittest2.TestCase.tearDown(self)
@ -45,23 +56,23 @@ class QuantumManagerTestCase(unittest2.TestCase):
test_config.get('plugin_name_v2', test_config.get('plugin_name_v2',
DB_PLUGIN_KLASS)) DB_PLUGIN_KLASS))
cfg.CONF.set_override("service_plugins", cfg.CONF.set_override("service_plugins",
["quantum.plugins.services." ["quantum.tests.unit.dummy_plugin."
"dummy.dummy_plugin.QuantumDummyPlugin"]) "DummyServicePlugin"])
QuantumManager._instance = None QuantumManager._instance = None
mgr = QuantumManager.get_instance() mgr = QuantumManager.get_instance()
plugin = mgr.get_service_plugins()[constants.DUMMY] plugin = mgr.get_service_plugins()[constants.DUMMY]
self.assertTrue( self.assertTrue(
isinstance(plugin, isinstance(plugin,
(QuantumDummyPlugin, types.ClassType)), (dummy_plugin.DummyServicePlugin, types.ClassType)),
"loaded plugin should be of type QuantumDummyPlugin") "loaded plugin should be of type QuantumDummyPlugin")
def test_multiple_plugins_specified_for_service_type(self): def test_multiple_plugins_specified_for_service_type(self):
cfg.CONF.set_override("service_plugins", cfg.CONF.set_override("service_plugins",
["quantum.plugins.services." ["quantum.tests.unit.dummy_plugin."
"dummy.dummy_plugin.QuantumDummyPlugin", "QuantumDummyPlugin",
"quantum.plugins.services." "quantum.tests.unit.dummy_plugin."
"dummy.dummy_plugin.QuantumDummyPlugin"]) "QuantumDummyPlugin"])
QuantumManager._instance = None QuantumManager._instance = None
try: try:

View File

@ -0,0 +1,440 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Salvatore Orlando, VMware
#
import contextlib
import logging
import unittest2 as unittest
import mock
import webob.exc as webexc
import webtest
from quantum.api import extensions
from quantum import context
from quantum.db import api as db_api
from quantum.db import models_v2
from quantum.db import servicetype_db
from quantum.extensions import servicetype
from quantum import manager
from quantum.openstack.common import cfg
from quantum.plugins.common import constants
from quantum.tests.unit import dummy_plugin as dp
from quantum.tests.unit import test_api_v2
from quantum.tests.unit import test_db_plugin
from quantum.tests.unit import test_extensions
LOG = logging.getLogger(__name__)
DEFAULT_SERVICE_DEFS = [{'service_class': constants.DUMMY,
'plugin': dp.DUMMY_PLUGIN_NAME}]
_uuid = test_api_v2._uuid
_get_path = test_api_v2._get_path
class TestServiceTypeExtensionManager(object):
""" Mock extensions manager """
def get_resources(self):
return (servicetype.Servicetype.get_resources() +
dp.Dummy.get_resources())
def get_actions(self):
return []
def get_request_extensions(self):
return []
class ServiceTypeTestCaseBase(unittest.TestCase):
def setUp(self):
# This is needed because otherwise a failure will occur due to
# nonexisting core_plugin
cfg.CONF.set_override('core_plugin', test_db_plugin.DB_PLUGIN_KLASS)
cfg.CONF.set_override('service_plugins',
["%s.%s" % (dp.__name__,
dp.DummyServicePlugin.__name__)])
# Make sure at each test a new instance of the plugin is returned
manager.QuantumManager._instance = None
# Ensure existing ExtensionManager is not used
extensions.PluginAwareExtensionManager._instance = None
ext_mgr = TestServiceTypeExtensionManager()
self.ext_mdw = test_extensions.setup_extensions_middleware(ext_mgr)
self.api = webtest.TestApp(self.ext_mdw)
self.resource_name = servicetype.RESOURCE_NAME.replace('-', '_')
def tearDown(self):
self.api = None
cfg.CONF.reset()
class ServiceTypeExtensionTestCase(ServiceTypeTestCaseBase):
def setUp(self):
self._patcher = mock.patch(
"%s.%s" % (servicetype_db.__name__,
servicetype_db.ServiceTypeManager.__name__),
autospec=True)
self.mock_mgr = self._patcher.start()
self.mock_mgr.get_instance.return_value = self.mock_mgr.return_value
super(ServiceTypeExtensionTestCase, self).setUp()
def tearDown(self):
self._patcher.stop()
super(ServiceTypeExtensionTestCase, self).tearDown()
def _test_service_type_create(self, env=None,
expected_status=webexc.HTTPCreated.code):
tenant_id = 'fake'
if env and 'quantum.context' in env:
tenant_id = env['quantum.context'].tenant_id
data = {self.resource_name:
{'name': 'test',
'tenant_id': tenant_id,
'service_definitions':
[{'service_class': constants.DUMMY,
'plugin': dp.DUMMY_PLUGIN_NAME}]}}
return_value = data[self.resource_name].copy()
svc_type_id = _uuid()
return_value['id'] = svc_type_id
instance = self.mock_mgr.return_value
instance.create_service_type.return_value = return_value
expect_errors = expected_status >= webexc.HTTPBadRequest.code
res = self.api.post_json(_get_path('service-types'), data,
extra_environ=env,
expect_errors=expect_errors)
self.assertEqual(res.status_int, expected_status)
if not expect_errors:
instance.create_service_type.assert_called_with(mock.ANY,
service_type=data)
self.assertTrue(self.resource_name in res.json)
svc_type = res.json[self.resource_name]
self.assertEqual(svc_type['id'], svc_type_id)
# NOTE(salvatore-orlando): The following two checks are
# probably not essential
self.assertEqual(svc_type['service_definitions'],
data[self.resource_name]['service_definitions'])
def _test_service_type_update(self, env=None,
expected_status=webexc.HTTPOk.code):
svc_type_name = 'updated'
tenant_id = 'fake'
if env and 'quantum.context' in env:
tenant_id = env['quantum.context'].tenant_id
data = {self.resource_name: {'name': svc_type_name,
'tenant-id': tenant_id}}
svc_type_id = _uuid()
return_value = {'id': svc_type_id,
'name': svc_type_name}
instance = self.mock_mgr.return_value
expect_errors = expected_status >= webexc.HTTPBadRequest.code
instance.update_service_type.return_value = return_value
res = self.api.put_json(_get_path('service-types/%s' % svc_type_id),
data)
if not expect_errors:
instance.update_service_type.assert_called_with(mock.ANY,
svc_type_id,
service_type=data)
self.assertEqual(res.status_int, webexc.HTTPOk.code)
self.assertTrue(self.resource_name in res.json)
svc_type = res.json[self.resource_name]
self.assertEqual(svc_type['id'], svc_type_id)
self.assertEqual(svc_type['name'],
data[self.resource_name]['name'])
def test_service_type_create(self):
self._test_service_type_create()
def test_service_type_update(self):
self._test_service_type_update()
def test_service_type_delete(self):
svctype_id = _uuid()
instance = self.mock_mgr.return_value
res = self.api.delete(_get_path('service-types/%s' % svctype_id))
instance.delete_service_type.assert_called_with(mock.ANY,
svctype_id)
self.assertEqual(res.status_int, webexc.HTTPNoContent.code)
def test_service_type_get(self):
svctype_id = _uuid()
return_value = {self.resource_name: {'name': 'test',
'service_definitions': [],
'id': svctype_id}}
instance = self.mock_mgr.return_value
instance.get_service_type.return_value = return_value
res = self.api.get(_get_path('service-types/%s' % svctype_id))
instance.get_service_type.assert_called_with(mock.ANY,
svctype_id,
fields=mock.ANY)
self.assertEqual(res.status_int, webexc.HTTPOk.code)
def test_service_type_list(self):
svctype_id = _uuid()
return_value = [{self.resource_name: {'name': 'test',
'service_definitions': [],
'id': svctype_id}}]
instance = self.mock_mgr.return_value
instance.get_service_types.return_value = return_value
res = self.api.get(_get_path('service-types'))
instance.get_service_types.assert_called_with(mock.ANY,
fields=mock.ANY,
filters=mock.ANY)
self.assertEqual(res.status_int, webexc.HTTPOk.code)
def test_create_service_type_nonadminctx_returns_403(self):
tenant_id = _uuid()
env = {'quantum.context': context.Context('', tenant_id,
is_admin=False)}
self._test_service_type_create(
env=env, expected_status=webexc.HTTPForbidden.code)
def test_create_service_type_adminctx_returns_200(self):
env = {'quantum.context': context.Context('', '', is_admin=True)}
self._test_service_type_create(env=env)
def test_update_service_type_nonadminctx_returns_403(self):
tenant_id = _uuid()
env = {'quantum.context': context.Context('', tenant_id,
is_admin=False)}
self._test_service_type_update(
env=env, expected_status=webexc.HTTPForbidden.code)
def test_update_service_type_adminctx_returns_200(self):
env = {'quantum.context': context.Context('', '', is_admin=True)}
self._test_service_type_update(env=env)
class ServiceTypeManagerTestCase(ServiceTypeTestCaseBase):
def setUp(self):
db_api._ENGINE = None
db_api._MAKER = None
# Blank out service type manager instance
servicetype_db.ServiceTypeManager._instance = None
plugin_name = "%s.%s" % (dp.__name__, dp.DummyServicePlugin.__name__)
cfg.CONF.set_override('service_definition', ['dummy:%s' % plugin_name],
group='DEFAULT_SERVICETYPE')
super(ServiceTypeManagerTestCase, self).setUp()
def tearDown(self):
super(ServiceTypeManagerTestCase, self).tearDown()
db_api.clear_db()
@contextlib.contextmanager
def service_type(self, name='svc_type',
default=True,
service_defs=None,
do_delete=True):
if not service_defs:
service_defs = [{'service_class': constants.DUMMY,
'plugin': dp.DUMMY_PLUGIN_NAME}]
res = self._create_service_type(name, service_defs)
svc_type = res.json
if res.status_int >= 400:
raise webexc.HTTPClientError(code=res.status_int)
yield svc_type
if do_delete:
# The do_delete parameter allows you to control whether the
# created network is immediately deleted again. Therefore, this
# function is also usable in tests, which require the creation
# of many networks.
self._delete_service_type(svc_type[self.resource_name]['id'])
def _list_service_types(self):
return self.api.get(_get_path('service-types'))
def _show_service_type(self, svctype_id, expect_errors=False):
return self.api.get(_get_path('service-types/%s' % str(svctype_id)),
expect_errors=expect_errors)
def _create_service_type(self, name, service_defs,
default=None, expect_errors=False):
data = {self.resource_name:
{'name': name,
'service_definitions': service_defs}
}
if default:
data[self.resource_name]['default'] = default
if not 'tenant_id' in data[self.resource_name]:
data[self.resource_name]['tenant_id'] = 'fake'
return self.api.post_json(_get_path('service-types'), data,
expect_errors=expect_errors)
def _create_dummy(self, dummyname='dummyobject'):
data = {'dummy': {'name': dummyname,
'tenant_id': 'fake'}}
dummy_res = self.api.post_json(_get_path('dummys'), data)
return dummy_res.json['dummy']
def _update_service_type(self, svc_type_id, name, service_defs,
default=None, expect_errors=False):
data = {self.resource_name:
{'name': name}}
if service_defs is not None:
data[self.resource_name]['service_definitions'] = service_defs
# set this attribute only if True
if default:
data[self.resource_name]['default'] = default
return self.api.put_json(
_get_path('service-types/%s' % str(svc_type_id)), data,
expect_errors=expect_errors)
def _delete_service_type(self, svctype_id, expect_errors=False):
return self.api.delete(_get_path('service-types/%s' % str(svctype_id)),
expect_errors=expect_errors)
def _validate_service_type(self, res, name, service_defs,
svc_type_id=None):
self.assertTrue(self.resource_name in res.json)
svc_type = res.json[self.resource_name]
if svc_type_id:
self.assertEqual(svc_type['id'], svc_type_id)
if name:
self.assertEqual(svc_type['name'], name)
if service_defs:
# unspecified drivers will value None in response
for svc_def in service_defs:
svc_def['driver'] = svc_def.get('driver')
self.assertEqual(svc_type['service_definitions'],
service_defs)
self.assertEqual(svc_type['default'], False)
def _test_service_type_create(self, name='test',
service_defs=DEFAULT_SERVICE_DEFS,
default=None,
expected_status=webexc.HTTPCreated.code):
expect_errors = expected_status >= webexc.HTTPBadRequest.code
res = self._create_service_type(name, service_defs,
default, expect_errors)
self.assertEqual(res.status_int, expected_status)
if not expect_errors:
self.assertEqual(res.status_int, webexc.HTTPCreated.code)
self._validate_service_type(res, name, service_defs)
def _test_service_type_update(self, svc_type_id, name='test-updated',
default=None, service_defs=None,
expected_status=webexc.HTTPOk.code):
expect_errors = expected_status >= webexc.HTTPBadRequest.code
res = self._update_service_type(svc_type_id, name, service_defs,
default, expect_errors)
if not expect_errors:
self.assertEqual(res.status_int, webexc.HTTPOk.code)
self._validate_service_type(res, name, service_defs, svc_type_id)
def test_service_type_create(self):
self._test_service_type_create()
def test_create_service_type_default_returns_400(self):
self._test_service_type_create(
default=True, expected_status=webexc.HTTPBadRequest.code)
def test_create_service_type_no_svcdef_returns_400(self):
self._test_service_type_create(
service_defs=None,
expected_status=webexc.HTTPBadRequest.code)
def test_service_type_update_name(self):
with self.service_type() as svc_type:
self._test_service_type_update(svc_type[self.resource_name]['id'])
def test_service_type_update_set_default_returns_400(self):
with self.service_type() as svc_type:
self._test_service_type_update(
svc_type[self.resource_name]['id'], default=True,
expected_status=webexc.HTTPBadRequest.code)
def test_service_type_update_clear_svc_defs_returns_400(self):
with self.service_type() as svc_type:
self._test_service_type_update(
svc_type[self.resource_name]['id'], service_defs=[],
expected_status=webexc.HTTPBadRequest.code)
def test_service_type_update_svc_defs(self):
with self.service_type() as svc_type:
svc_defs = [{'service': constants.DUMMY,
'plugin': 'foobar'}]
self._test_service_type_update(
svc_type[self.resource_name]['id'], service_defs=svc_defs,
expected_status=webexc.HTTPBadRequest.code)
def test_list_service_types(self):
with contextlib.nested(self.service_type('st1'),
self.service_type('st2')):
res = self._list_service_types()
self.assertEqual(res.status_int, webexc.HTTPOk.code)
data = res.json
self.assertTrue('service_types' in data)
# it must be 3 because we have the default service type too!
self.assertEquals(len(data['service_types']), 3)
def test_get_default_service_type(self):
res = self._list_service_types()
self.assertEqual(res.status_int, webexc.HTTPOk.code)
data = res.json
self.assertTrue('service_types' in data)
self.assertEquals(len(data['service_types']), 1)
def_svc_type = data['service_types'][0]
self.assertEqual(def_svc_type['default'], True)
def test_get_service_type(self):
with self.service_type() as svc_type:
svc_type_data = svc_type[self.resource_name]
res = self._show_service_type(svc_type_data['id'])
self.assertEqual(res.status_int, webexc.HTTPOk.code)
self._validate_service_type(res, svc_type_data['name'],
svc_type_data['service_definitions'],
svc_type_data['id'])
def test_delete_service_type_in_use_returns_409(self):
with self.service_type() as svc_type:
svc_type_data = svc_type[self.resource_name]
mgr = servicetype_db.ServiceTypeManager.get_instance()
ctx = context.Context('', '', is_admin=True)
mgr.increase_service_type_refcount(ctx, svc_type_data['id'])
res = self._delete_service_type(svc_type_data['id'], True)
self.assertEquals(res.status_int, webexc.HTTPConflict.code)
mgr.decrease_service_type_refcount(ctx, svc_type_data['id'])
def test_create_dummy_increases_service_type_refcount(self):
dummy = self._create_dummy()
svc_type_res = self._show_service_type(dummy['service_type'])
svc_type = svc_type_res.json[self.resource_name]
self.assertEquals(svc_type['num_instances'], 1)
def test_delete_dummy_decreases_service_type_refcount(self):
dummy = self._create_dummy()
svc_type_res = self._show_service_type(dummy['service_type'])
svc_type = svc_type_res.json[self.resource_name]
self.assertEquals(svc_type['num_instances'], 1)
self.api.delete(_get_path('dummys/%s' % str(dummy['id'])))
svc_type_res = self._show_service_type(dummy['service_type'])
svc_type = svc_type_res.json[self.resource_name]
self.assertEquals(svc_type['num_instances'], 0)