Move networking from share manager to driver interface

Several things are implemented:
- Allocation/deallocation now handled by drivers instead of share manager.
It provides flexibility for drivers.
- Network plugin interface was updated to support new approach for
configuration options setting.

Config opts for network plugin can be defined via three sources:
a) using separate config group
b) using config group of back end
c) using DEFAULT config group
Variants (a) and (b) are mutually exclusive, there are switched by
opt 'network_config_group' that belongs to share driver interface.

Implements bp network-helper

Change-Id: I3b05369f01777675c1b834af5ee076d8b7219a0f
This commit is contained in:
Valeriy Ponomaryov 2014-12-12 22:25:42 +02:00
parent 1a2ec3da9b
commit 36db3f3917
13 changed files with 613 additions and 541 deletions

View File

@ -1,4 +1,5 @@
# Copyright 2013 Openstack Foundation
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -15,26 +16,42 @@
import abc
from oslo.config import cfg
import oslo.utils.importutils
from oslo.utils import importutils
from manila.db import base as db_base
network_opts = [
cfg.StrOpt('network_api_class',
cfg.StrOpt(
'network_api_class',
default='manila.network.neutron.'
'neutron_network_plugin.NeutronNetworkPlugin',
deprecated_group='DEFAULT',
help='The full class name of the Networking API class to use.'),
]
cfg.CONF.register_opts(network_opts)
CONF = cfg.CONF
def API():
importutils = oslo.utils.importutils
network_api_class = cfg.CONF.network_api_class
def API(config_group_name=None):
"""Selects class and config group of network plugin.
:param config_group_name: name of config group to be used for
registration of networking opts.
:returns: instance of networking plugin class
"""
CONF.register_opts(network_opts, group=config_group_name)
if config_group_name:
network_api_class = getattr(CONF, config_group_name).network_api_class
else:
network_api_class = CONF.network_api_class
cls = importutils.import_class(network_api_class)
return cls()
return cls(config_group_name=config_group_name)
class NetworkBaseAPI(object):
class NetworkBaseAPI(db_base.Base):
def __init__(self, db_driver=None):
super(NetworkBaseAPI, self).__init__(db_driver=db_driver)
@abc.abstractmethod
def allocate_network(self, context, network_id, subnet_id, **kwargs):

View File

@ -1,51 +0,0 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutronclient.common import exceptions
from neutronclient.v2_0 import client as clientv20
from oslo.config import cfg
CONF = cfg.CONF
def _get_client(token=None):
params = {
'endpoint_url': CONF.neutron_url,
'timeout': CONF.neutron_url_timeout,
'insecure': CONF.neutron_api_insecure,
'ca_cert': CONF.neutron_ca_certificates_file,
}
if token:
params['token'] = token
params['auth_strategy'] = None
else:
params['username'] = CONF.neutron_admin_username
params['tenant_name'] = CONF.neutron_admin_tenant_name
params['password'] = CONF.neutron_admin_password
params['auth_url'] = CONF.neutron_admin_auth_url
params['auth_strategy'] = CONF.neutron_auth_strategy
return clientv20.Client(**params)
def get_client(context):
if context.is_admin:
token = None
elif not context.auth_token:
raise exceptions.Unauthorized()
else:
token = context.auth_token
return _get_client(token=token)

View File

@ -1,4 +1,5 @@
# Copyright 2013 OpenStack Foundation
# Copyright 2014 Mirantis Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -14,77 +15,111 @@
# under the License.
from neutronclient.common import exceptions as neutron_client_exc
from neutronclient.v2_0 import client as clientv20
from oslo.config import cfg
from manila import context
from manila.db import base
from manila import exception
from manila.i18n import _LE
from manila.network import neutron
from manila.network.neutron import constants as neutron_constants
from manila.openstack.common import log as logging
neutron_opts = [
cfg.StrOpt('neutron_url',
cfg.StrOpt(
'neutron_url',
default='http://127.0.0.1:9696',
deprecated_name='quantum_url',
deprecated_group='DEFAULT',
help='URL for connecting to neutron.'),
cfg.IntOpt('neutron_url_timeout',
cfg.IntOpt(
'neutron_url_timeout',
default=30,
deprecated_name='quantum_url_timeout',
deprecated_group='DEFAULT',
help='Timeout value for connecting to neutron in seconds.'),
cfg.StrOpt('neutron_admin_username',
cfg.StrOpt(
'neutron_admin_username',
default='neutron',
deprecated_name='quantum_admin_username',
deprecated_group='DEFAULT',
help='Username for connecting to neutron in admin context.'),
cfg.StrOpt('neutron_admin_password',
deprecated_name='quantum_admin_password',
cfg.StrOpt(
'neutron_admin_password',
help='Password for connecting to neutron in admin context.',
deprecated_group='DEFAULT',
secret=True),
cfg.StrOpt('neutron_admin_tenant_name',
cfg.StrOpt(
'neutron_admin_tenant_name',
default='service',
deprecated_name='quantum_admin_tenant_name',
deprecated_group='DEFAULT',
help='Tenant name for connecting to neutron in admin context.'),
cfg.StrOpt('neutron_region_name',
deprecated_name='quantum_region_name',
help='Region name for connecting to neutron in admin context.'),
cfg.StrOpt('neutron_admin_auth_url',
deprecated_name='quantum_admin_auth_url',
cfg.StrOpt(
'neutron_admin_auth_url',
default='http://localhost:5000/v2.0',
deprecated_group='DEFAULT',
help='Auth URL for connecting to neutron in admin context.'),
cfg.BoolOpt('neutron_api_insecure',
cfg.BoolOpt(
'neutron_api_insecure',
default=False,
deprecated_name='quantum_api_insecure',
deprecated_group='DEFAULT',
help='If set, ignore any SSL validation issues.'),
cfg.StrOpt('neutron_auth_strategy',
cfg.StrOpt(
'neutron_auth_strategy',
default='keystone',
deprecated_name='quantum_auth_strategy',
help='Auth strategy for connecting to '
'neutron in admin context.'),
# TODO(berrange) temporary hack until Neutron can pass over the
# name of the OVS bridge it is configured with
cfg.StrOpt('neutron_ovs_bridge',
default='br-int',
deprecated_name='quantum_ovs_bridge',
help='Name of integration bridge used by Open vSwitch.'),
cfg.StrOpt('neutron_ca_certificates_file',
deprecated_group='DEFAULT',
help='Auth strategy for connecting to neutron in admin context.'),
cfg.StrOpt(
'neutron_ca_certificates_file',
deprecated_group='DEFAULT',
help='Location of CA certificates file to use for '
'neutron client requests.'),
]
CONF = cfg.CONF
CONF.register_opts(neutron_opts)
LOG = logging.getLogger(__name__)
class API(base.Base):
"""API for interacting with the neutron 2.x API."""
"""API for interacting with the neutron 2.x API.
def __init__(self):
:param configuration: instance of config or config group.
"""
def __init__(self, config_group_name=None):
super(API, self).__init__()
if config_group_name is None:
config_group_name = 'DEFAULT'
CONF.register_opts(neutron_opts, group=config_group_name)
self.configuration = getattr(CONF, config_group_name, CONF)
self.last_neutron_extension_sync = None
self.extensions = {}
self.client = neutron.get_client(context.get_admin_context())
self.client = self.get_client(context.get_admin_context())
def _get_client(self, token=None):
params = {
'endpoint_url': self.configuration.neutron_url,
'timeout': self.configuration.neutron_url_timeout,
'insecure': self.configuration.neutron_api_insecure,
'ca_cert': self.configuration.neutron_ca_certificates_file,
}
if token:
params['token'] = token
params['auth_strategy'] = None
else:
params['username'] = self.configuration.neutron_admin_username
params['tenant_name'] = (
self.configuration.neutron_admin_tenant_name)
params['password'] = self.configuration.neutron_admin_password
params['auth_url'] = self.configuration.neutron_admin_auth_url
params['auth_strategy'] = self.configuration.neutron_auth_strategy
return clientv20.Client(**params)
def get_client(self, context):
if context.is_admin:
token = None
elif not context.auth_token:
raise neutron_client_exc.Unauthorized()
else:
token = context.auth_token
return self._get_client(token=token)
@property
def admin_tenant_id(self):

View File

@ -13,25 +13,22 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from manila.common import constants
from manila.db import base as db_base
from manila import exception
from manila import network as manila_network
from manila import network
from manila.network.neutron import api as neutron_api
from manila.network.neutron import constants as neutron_constants
from manila.openstack.common import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class NeutronNetworkPlugin(manila_network.NetworkBaseAPI, db_base.Base):
class NeutronNetworkPlugin(network.NetworkBaseAPI):
def __init__(self):
super(NeutronNetworkPlugin, self).__init__()
self.neutron_api = neutron_api.API()
def __init__(self, *args, **kwargs):
db_driver = kwargs.pop('db_driver', None)
super(NeutronNetworkPlugin, self).__init__(db_driver=db_driver)
self.neutron_api = neutron_api.API(*args, **kwargs)
def allocate_network(self, context, share_server, share_network, **kwargs):
"""Allocate network resources using given network information.

View File

@ -19,38 +19,51 @@ Drivers for shares.
import time
from manila import exception
from manila.i18n import _LE
from manila.openstack.common import log as logging
from manila import utils
from oslo.config import cfg
from manila import exception
from manila.i18n import _LE
from manila import network
from manila.openstack.common import log as logging
from manila import utils
LOG = logging.getLogger(__name__)
share_opts = [
# NOTE(rushiagr): Reasonable to define this option at only one place.
cfg.IntOpt('num_shell_tries',
cfg.IntOpt(
'num_shell_tries',
default=3,
help='Number of times to attempt to run flakey shell '
'commands.'),
cfg.IntOpt('reserved_share_percentage',
help='Number of times to attempt to run flakey shell commands.'),
cfg.IntOpt(
'reserved_share_percentage',
default=0,
help='The percentage of backend capacity reserved.'),
cfg.StrOpt('share_backend_name',
cfg.StrOpt(
'share_backend_name',
default=None,
help='The backend name for a given driver implementation.'),
cfg.StrOpt(
'network_config_group',
default=None,
help="Name of the configuration group in the Manila conf file "
"to look for network config options."
"If not set, the share backend's config group will be used."
"If an option is not found within provided group, then"
"'DEFAULT' group will be used for search of option."),
]
ssh_opts = [
cfg.IntOpt('ssh_conn_timeout',
cfg.IntOpt(
'ssh_conn_timeout',
default=60,
help='Backend server SSH connection timeout.'),
cfg.IntOpt('ssh_min_pool_conn',
cfg.IntOpt(
'ssh_min_pool_conn',
default=1,
help='Minimum number of connections in the SSH pool.'),
cfg.IntOpt('ssh_max_pool_conn',
cfg.IntOpt(
'ssh_max_pool_conn',
default=10,
help='Maximum number of connections in the SSH pool.'),
]
@ -100,6 +113,11 @@ class ShareDriver(object):
self.configuration = kwargs.get('configuration', None)
if self.configuration:
self.configuration.append_config_values(share_opts)
network_config_group = (self.configuration.network_config_group or
self.configuration.config_group)
else:
network_config_group = None
self.network_api = network.API(config_group_name=network_config_group)
def create_share(self, context, share, share_server=None):
"""Is called to create share."""
@ -153,8 +171,30 @@ class ShareDriver(object):
return self._stats
def get_network_allocations_number(self):
"""Returns number of network allocations for creating VIFs."""
pass
"""Returns number of network allocations for creating VIFs.
Drivers that use Nova for share servers should return zero (0) here
same as Generic driver does.
Because Nova will handle network resources allocation.
Drivers that handle networking itself should calculate it according
to their own requirements. It can have 1+ network interfaces.
"""
raise NotImplementedError()
def allocate_network(self, context, share_server, share_network,
count=None, **kwargs):
"""Allocate network resources using given network information."""
if count is None:
count = self.get_network_allocations_number()
if count:
kwargs.update(count=count)
self.network_api.allocate_network(
context, share_server, share_network, **kwargs)
def deallocate_network(self, context, share_server_id):
"""Deallocate network resources for the given share server."""
if self.get_network_allocations_number():
self.network_api.deallocate_network(context, share_server_id)
def setup_server(self, network_info, metadata=None):
"""Set up and configures share server with given network parameters."""

View File

@ -63,7 +63,7 @@ CONF.register_opts(EMC_NAS_OPTS)
class EMCShareDriver(driver.ShareDriver):
"""EMC specific NAS driver. Allows for NFS and CIFS NAS storage usage."""
def __init__(self, *args, **kwargs):
super(EMCShareDriver, self).__init__()
super(EMCShareDriver, self).__init__(*args, **kwargs)
self.configuration = kwargs.get('configuration', None)
if self.configuration:
self.configuration.append_config_values(EMC_NAS_OPTS)

View File

@ -265,6 +265,9 @@ class GlusterfsShareDriver(driver.ExecuteMixin, driver.ShareDriver):
data['free_capacity_gb'] = (smpv.f_bavail * smpv.f_frsize) >> 30
self._stats = data
def get_network_allocations_number(self):
return 0
def create_share(self, ctx, share, share_server=None):
"""Create a sub-directory/share in the GlusterFS volume."""
local_share_path = self._get_local_share_path(share)

View File

@ -373,6 +373,9 @@ class GlusterfsNativeShareDriver(driver.ExecuteMixin, driver.ShareDriver):
self._restart_gluster_vol(gluster_addr)
def get_network_allocations_number(self):
return 0
def create_share(self, context, share, share_server=None):
"""Create a share using GlusterFS volume.

View File

@ -418,6 +418,9 @@ class GPFSShareDriver(driver.ExecuteMixin, driver.ShareDriver):
LOG.error(msg)
raise exception.GPFSException(msg)
def get_network_allocations_number(self):
return 0
def create_share(self, ctx, share, share_server=None):
"""Create GPFS directory that will be represented as share."""
self._create_share(share)

View File

@ -32,7 +32,6 @@ from manila.i18n import _LE
from manila.i18n import _LI
from manila.i18n import _LW
from manila import manager
from manila import network
from manila.openstack.common import log as logging
from manila import quota
import manila.share.configuration
@ -71,7 +70,6 @@ class ShareManager(manager.SchedulerDependentManager):
share_driver = self.configuration.share_driver
self.driver = importutils.import_object(
share_driver, self.db, configuration=self.configuration)
self.network_api = network.API()
def init_host(self):
"""Initialization for a standalone service."""
@ -436,20 +434,12 @@ class ShareManager(manager.SchedulerDependentManager):
return network_info
def _setup_server(self, context, share_server, metadata=None):
# NOTE(vponomaryov): set network_allocations to 0 before 'try' block
# for case we get exception calling appropriate method. This value will
# be used in exception handling and for case 'setup_server' method was
# not called we won't make redundant actions.
allocation_number = 0
try:
share_network = self.db.share_network_get(
context, share_server['share_network_id'])
allocation_number = self.driver.get_network_allocations_number()
if allocation_number:
self.network_api.allocate_network(
context, share_server, share_network,
count=allocation_number)
# If we reach here, then share_network was updated
self.driver.allocate_network(context, share_server, share_network)
# Get share_network again in case it was updated.
share_network = self.db.share_network_get(
context, share_server['share_network_id'])
@ -483,9 +473,7 @@ class ShareManager(manager.SchedulerDependentManager):
self.db.share_server_update(context, share_server['id'],
{'status': constants.STATUS_ERROR})
if allocation_number:
self.network_api.deallocate_network(
context, share_server['id'])
self.driver.deallocate_network(context, share_server['id'])
def delete_share_server(self, context, share_server):
@ -526,9 +514,4 @@ class ShareManager(manager.SchedulerDependentManager):
_teardown_server()
LOG.info(_LI("Share server deleted successfully."))
# NOTE(vponomaryov): share servers created by Nova do not need
# explicit network allocations release. It is done by Nova itself.
# So, all drivers that use 'service_instance' module do not need
# following operation.
if self.driver.get_network_allocations_number():
self.network_api.deallocate_network(context, share_server['id'])
self.driver.deallocate_network(context, share_server['id'])

View File

@ -1,4 +1,5 @@
# Copyright 2013 OpenStack Foundation
# Copyright 2014 Mirantis Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -21,7 +22,6 @@ from oslo.config import cfg
from manila import context
from manila.db import base
from manila import exception
from manila.network import neutron
from manila.network.neutron import api as neutron_api
from manila.network.neutron import constants as neutron_constants
from manila import test
@ -85,370 +85,485 @@ class NeutronApiTest(test.TestCase):
def setUp(self):
super(NeutronApiTest, self).setUp()
self._create_neutron_api()
@mock.patch.object(base, 'Base', fakes.FakeModel)
@mock.patch.object(context, 'get_admin_context',
mock.Mock(return_value='context'))
@mock.patch.object(neutron, 'get_client',
mock.Mock(return_value=FakeNeutronClient()))
def _create_neutron_api(self):
self.context = context.get_admin_context()
self.stubs.Set(base, 'Base', fakes.FakeModel)
self.stubs.Set(
clientv20, 'Client', mock.Mock(return_value=FakeNeutronClient()))
self.neutron_api = neutron_api.API()
@mock.patch.object(base.Base, '__init__', mock.Mock())
@mock.patch.object(context, 'get_admin_context',
mock.Mock(return_value='context'))
@mock.patch.object(neutron, 'get_client', mock.Mock())
def test_create_api_object(self):
neutron_api.API()
# instantiate Neutron API object
neutron_api_instance = neutron_api.API()
context.get_admin_context.assert_called_once_with()
neutron.get_client.assert_called_once_with('context')
base.Base.__init__.assert_called_once_with()
# Verify results
self.assertTrue(clientv20.Client.called)
self.assertTrue(hasattr(neutron_api_instance, 'client'))
self.assertTrue(hasattr(neutron_api_instance, 'configuration'))
def test_create_api_object_custom_config_group(self):
# Set up test data
fake_config_group_name = 'fake_config_group_name'
# instantiate Neutron API object
obj = neutron_api.API(fake_config_group_name)
# Verify results
self.assertTrue(clientv20.Client.called)
self.assertTrue(hasattr(obj, 'client'))
self.assertTrue(hasattr(obj, 'configuration'))
self.assertEqual(
fake_config_group_name, obj.configuration._group.name)
def test_create_port_with_all_args(self):
port_args = {'tenant_id': 'test tenant', 'network_id': 'test net',
# Set up test data
self.stubs.Set(self.neutron_api, '_has_port_binding_extension',
mock.Mock(return_value=True))
port_args = {
'tenant_id': 'test tenant', 'network_id': 'test net',
'host_id': 'test host', 'subnet_id': 'test subnet',
'fixed_ip': 'test ip', 'device_owner': 'test owner',
'device_id': 'test device', 'mac_address': 'test mac',
'security_group_ids': 'test group',
'dhcp_opts': 'test dhcp'}
'dhcp_opts': 'test dhcp',
}
with mock.patch.object(self.neutron_api, '_has_port_binding_extension',
mock.Mock(return_value=True)):
# Execute method 'create_port'
port = self.neutron_api.create_port(**port_args)
# Verify results
self.assertEqual(port['tenant_id'], port_args['tenant_id'])
self.assertEqual(port['network_id'],
port_args['network_id'])
self.assertEqual(port['binding:host_id'],
port_args['host_id'])
self.assertEqual(port['network_id'], port_args['network_id'])
self.assertEqual(port['binding:host_id'], port_args['host_id'])
self.assertEqual(port['fixed_ips'][0]['subnet_id'],
port_args['subnet_id'])
self.assertEqual(port['fixed_ips'][0]['ip_address'],
port_args['fixed_ip'])
self.assertEqual(port['device_owner'],
port_args['device_owner'])
self.assertEqual(port['device_owner'], port_args['device_owner'])
self.assertEqual(port['device_id'], port_args['device_id'])
self.assertEqual(port['mac_address'],
port_args['mac_address'])
self.assertEqual(port['mac_address'], port_args['mac_address'])
self.assertEqual(port['security_groups'],
port_args['security_group_ids'])
self.assertEqual(port['extra_dhcp_opts'],
port_args['dhcp_opts'])
self.assertEqual(port['extra_dhcp_opts'], port_args['dhcp_opts'])
self.neutron_api._has_port_binding_extension.assert_called_once_with()
self.assertTrue(clientv20.Client.called)
def test_create_port_with_required_args(self):
# Set up test data
self.stubs.Set(self.neutron_api, '_has_port_binding_extension',
mock.Mock(return_value=True))
port_args = {'tenant_id': 'test tenant', 'network_id': 'test net'}
with mock.patch.object(self.neutron_api, '_has_port_binding_extension',
mock.Mock(return_value=True)):
# Execute method 'create_port'
port = self.neutron_api.create_port(**port_args)
# Verify results
self.assertEqual(port['tenant_id'], port_args['tenant_id'])
self.assertEqual(port['network_id'],
port_args['network_id'])
self.neutron_api._has_port_binding_extension.assert_called_once_with()
self.assertTrue(clientv20.Client.called)
@mock.patch.object(neutron_api.LOG, 'exception', mock.Mock())
def test_create_port_exception(self):
# Set up test data
self.stubs.Set(
self.neutron_api, '_has_port_binding_extension',
mock.Mock(return_value=True))
self.stubs.Set(
self.neutron_api.client, 'create_port',
mock.Mock(side_effect=neutron_client_exc.NeutronClientException))
port_args = {'tenant_id': 'test tenant', 'network_id': 'test net'}
client_create_port_mock = mock.Mock(
side_effect=neutron_client_exc.NeutronClientException)
with mock.patch.object(self.neutron_api, '_has_port_binding_extension',
mock.Mock(return_value=True)):
with mock.patch.object(self.neutron_api.client, 'create_port',
client_create_port_mock):
# Execute method 'create_port'
self.assertRaises(exception.NetworkException,
self.neutron_api.create_port,
**port_args)
# Verify results
self.neutron_api._has_port_binding_extension.assert_called_once_with()
self.assertTrue(neutron_api.LOG.exception.called)
self.assertTrue(clientv20.Client.called)
self.assertTrue(self.neutron_api.client.create_port.called)
@mock.patch.object(neutron_api.LOG, 'exception', mock.Mock())
def test_create_port_exception_status_409(self):
# Set up test data
self.stubs.Set(
self.neutron_api, '_has_port_binding_extension',
mock.Mock(return_value=True))
self.stubs.Set(
self.neutron_api.client, 'create_port',
mock.Mock(side_effect=neutron_client_exc.NeutronClientException(
status_code=409)))
port_args = {'tenant_id': 'test tenant', 'network_id': 'test net'}
client_create_port_mock = mock.Mock(
side_effect=neutron_client_exc.NeutronClientException(
status_code=409))
with mock.patch.object(self.neutron_api, '_has_port_binding_extension',
mock.Mock(return_value=True)):
with mock.patch.object(self.neutron_api.client, 'create_port',
client_create_port_mock):
# Execute method 'create_port'
self.assertRaises(exception.PortLimitExceeded,
self.neutron_api.create_port,
**port_args)
# Verify results
self.neutron_api._has_port_binding_extension.assert_called_once_with()
self.assertTrue(neutron_api.LOG.exception.called)
self.assertTrue(clientv20.Client.called)
self.assertTrue(self.neutron_api.client.create_port.called)
def test_delete_port(self):
# Set up test data
self.stubs.Set(self.neutron_api.client, 'delete_port', mock.Mock())
port_id = 'test port id'
with mock.patch.object(self.neutron_api.client, 'delete_port',
mock.Mock()) as client_delete_port_mock:
# Execute method 'delete_port'
self.neutron_api.delete_port(port_id)
client_delete_port_mock.assert_called_once_with(port_id)
# Verify results
self.neutron_api.client.delete_port.assert_called_once_with(port_id)
self.assertTrue(clientv20.Client.called)
def test_list_ports(self):
# Set up test data
search_opts = {'test_option': 'test_value'}
fake_ports = [{'fake port': 'fake port info'}]
client_list_ports_mock = mock.Mock(return_value={'ports': fake_ports})
with mock.patch.object(self.neutron_api.client, 'list_ports',
client_list_ports_mock):
self.stubs.Set(
self.neutron_api.client, 'list_ports',
mock.Mock(return_value={'ports': fake_ports}))
# Execute method 'list_ports'
ports = self.neutron_api.list_ports(**search_opts)
client_list_ports_mock.assert_called_once_with(**search_opts)
self.assertEqual(ports, fake_ports)
# Verify results
self.assertEqual(fake_ports, ports)
self.assertTrue(clientv20.Client.called)
self.neutron_api.client.list_ports.assert_called_once_with(
**search_opts)
def test_show_port(self):
# Set up test data
port_id = 'test port id'
fake_port = {'fake port': 'fake port info'}
client_show_port_mock = mock.Mock(return_value={'port': fake_port})
with mock.patch.object(self.neutron_api.client, 'show_port',
client_show_port_mock):
self.stubs.Set(
self.neutron_api.client, 'show_port',
mock.Mock(return_value={'port': fake_port}))
# Execute method 'show_port'
port = self.neutron_api.show_port(port_id)
client_show_port_mock.assert_called_once_with(port_id)
self.assertEqual(port, fake_port)
# Verify results
self.assertEqual(fake_port, port)
self.assertTrue(clientv20.Client.called)
self.neutron_api.client.show_port.assert_called_once_with(port_id)
def test_get_network(self):
# Set up test data
network_id = 'test network id'
fake_network = {'fake network': 'fake network info'}
client_show_network_mock = mock.Mock(
return_value={'network': fake_network})
with mock.patch.object(self.neutron_api.client, 'show_network',
client_show_network_mock):
self.stubs.Set(
self.neutron_api.client, 'show_network',
mock.Mock(return_value={'network': fake_network}))
# Execute method 'get_network'
network = self.neutron_api.get_network(network_id)
client_show_network_mock.assert_called_once_with(network_id)
self.assertEqual(network, fake_network)
# Verify results
self.assertEqual(fake_network, network)
self.assertTrue(clientv20.Client.called)
self.neutron_api.client.show_network.assert_called_once_with(
network_id)
def test_get_subnet(self):
# Set up test data
subnet_id = 'fake subnet id'
self.stubs.Set(
self.neutron_api.client, 'show_subnet',
mock.Mock(return_value={'subnet': {}}))
with mock.patch.object(self.neutron_api.client, 'show_subnet',
mock.Mock(return_value={'subnet': {}})):
# Execute method 'get_subnet'
subnet = self.neutron_api.get_subnet(subnet_id)
# Verify results
self.assertEqual({}, subnet)
self.assertTrue(clientv20.Client.called)
self.neutron_api.client.show_subnet.assert_called_once_with(
subnet_id)
self.assertEqual(subnet, {})
def test_get_all_network(self):
# Set up test data
fake_networks = [{'fake network': 'fake network info'}]
client_list_networks_mock = mock.Mock(
return_value={'networks': fake_networks})
with mock.patch.object(self.neutron_api.client, 'list_networks',
client_list_networks_mock):
self.stubs.Set(
self.neutron_api.client, 'list_networks',
mock.Mock(return_value={'networks': fake_networks}))
# Execute method 'get_all_networks'
networks = self.neutron_api.get_all_networks()
client_list_networks_mock.assert_any_call()
self.assertEqual(networks, fake_networks)
# Verify results
self.assertEqual(fake_networks, networks)
self.assertTrue(clientv20.Client.called)
self.neutron_api.client.list_networks.assert_called_once_with()
def test_list_extensions(self):
extensions = [{'name': neutron_constants.PORTBINDING_EXT},
{'name': neutron_constants.PROVIDER_NW_EXT}]
with mock.patch.object(
self.neutron_api.client,
'list_extensions',
mock.Mock(return_value={'extensions': extensions})):
# Set up test data
extensions = [
{'name': neutron_constants.PORTBINDING_EXT},
{'name': neutron_constants.PROVIDER_NW_EXT},
]
self.stubs.Set(
self.neutron_api.client, 'list_extensions',
mock.Mock(return_value={'extensions': extensions}))
# Execute method 'list_extensions'
result = self.neutron_api.list_extensions()
self.neutron_api.client.list_extensions.assert_any_call()
self.assertTrue(neutron_constants.PORTBINDING_EXT in result)
self.assertTrue(neutron_constants.PROVIDER_NW_EXT in result)
self.assertEqual(result[neutron_constants.PORTBINDING_EXT],
extensions[0])
self.assertEqual(result[neutron_constants.PROVIDER_NW_EXT],
extensions[1])
# Verify results
self.assertTrue(clientv20.Client.called)
self.neutron_api.client.list_extensions.assert_called_once_with()
self.assertIn(neutron_constants.PORTBINDING_EXT, result)
self.assertIn(neutron_constants.PROVIDER_NW_EXT, result)
self.assertEqual(
extensions[0], result[neutron_constants.PORTBINDING_EXT])
self.assertEqual(
extensions[1], result[neutron_constants.PROVIDER_NW_EXT])
def test_create_network(self):
# Set up test data
net_args = {'tenant_id': 'test tenant', 'name': 'test name'}
# Execute method 'network_create'
network = self.neutron_api.network_create(**net_args)
self.assertEqual(network['tenant_id'], net_args['tenant_id'])
self.assertEqual(network['name'], net_args['name'])
# Verify results
self.assertEqual(net_args['tenant_id'], network['tenant_id'])
self.assertEqual(net_args['name'], network['name'])
self.assertTrue(clientv20.Client.called)
def test_create_subnet(self):
subnet_args = {'tenant_id': 'test tenant', 'name': 'test name',
'net_id': 'test net id', 'cidr': '10.0.0.0/24'}
# Set up test data
subnet_args = {
'tenant_id': 'test tenant',
'name': 'test name',
'net_id': 'test net id',
'cidr': '10.0.0.0/24',
}
# Execute method 'subnet_create'
subnet = self.neutron_api.subnet_create(**subnet_args)
self.assertEqual(subnet['tenant_id'], subnet_args['tenant_id'])
self.assertEqual(subnet['name'], subnet_args['name'])
# Verify results
self.assertEqual(subnet_args['tenant_id'], subnet['tenant_id'])
self.assertEqual(subnet_args['name'], subnet['name'])
self.assertTrue(clientv20.Client.called)
def test_create_router(self):
# Set up test data
router_args = {'tenant_id': 'test tenant', 'name': 'test name'}
# Execute method 'router_create'
router = self.neutron_api.router_create(**router_args)
self.assertEqual(router['tenant_id'], router_args['tenant_id'])
self.assertEqual(router['name'], router_args['name'])
# Verify results
self.assertEqual(router_args['tenant_id'], router['tenant_id'])
self.assertEqual(router_args['name'], router['name'])
self.assertTrue(clientv20.Client.called)
def test_list_routers(self):
# Set up test data
fake_routers = [{'fake router': 'fake router info'}]
client_list_routers_mock = mock.Mock(
return_value={'routers': fake_routers})
with mock.patch.object(self.neutron_api.client, 'list_routers',
client_list_routers_mock):
self.stubs.Set(
self.neutron_api.client, 'list_routers',
mock.Mock(return_value={'routers': fake_routers}))
# Execute method 'router_list'
networks = self.neutron_api.router_list()
client_list_routers_mock.assert_any_call()
self.assertEqual(networks, fake_routers)
# Verify results
self.assertEqual(fake_routers, networks)
self.assertTrue(clientv20.Client.called)
self.neutron_api.client.list_routers.assert_called_once_with()
def test_create_network_exception(self):
# Set up test data
net_args = {'tenant_id': 'test tenant', 'name': 'test name'}
self.stubs.Set(
self.neutron_api.client, 'create_network',
mock.Mock(side_effect=neutron_client_exc.NeutronClientException))
# Execute method 'network_create'
self.assertRaises(
exception.NetworkException,
self.neutron_api.network_create,
**net_args)
# Verify results
self.neutron_api.client.create_network.assert_called_once_with(
{'network': net_args})
self.assertTrue(clientv20.Client.called)
def test_create_subnet_exception(self):
subnet_args = {'tenant_id': 'test tenant', 'name': 'test name',
'net_id': 'test net id', 'cidr': '10.0.0.0/24'}
# Set up test data
subnet_args = {
'tenant_id': 'test tenant',
'name': 'test name',
'net_id': 'test net id',
'cidr': '10.0.0.0/24',
}
self.stubs.Set(
self.neutron_api.client, 'create_subnet',
mock.Mock(side_effect=neutron_client_exc.NeutronClientException))
# Execute method 'subnet_create'
self.assertRaises(
exception.NetworkException,
self.neutron_api.subnet_create,
**subnet_args)
# Verify results
expected_data = {
'network_id': subnet_args['net_id'],
'tenant_id': subnet_args['tenant_id'],
'cidr': subnet_args['cidr'],
'name': subnet_args['name'],
'ip_version': 4,
}
self.neutron_api.client.create_subnet.assert_called_once_with(
{'subnet': expected_data})
self.assertTrue(clientv20.Client.called)
def test_create_router_exception(self):
# Set up test data
router_args = {'tenant_id': 'test tenant', 'name': 'test name'}
self.stubs.Set(
self.neutron_api.client, 'create_router',
mock.Mock(side_effect=neutron_client_exc.NeutronClientException))
# Execute method 'router_create'
self.assertRaises(
exception.NetworkException,
self.neutron_api.router_create,
**router_args)
# Verify results
self.neutron_api.client.create_router.assert_called_once_with(
{'router': router_args})
self.assertTrue(clientv20.Client.called)
def test_update_port_fixed_ips(self):
# Set up test data
port_id = 'test_port'
fixed_ips = {'fixed_ips': [{'subnet_id': 'test subnet'}]}
# Execute method 'update_port_fixed_ips'
port = self.neutron_api.update_port_fixed_ips(port_id, fixed_ips)
self.assertEqual(port, fixed_ips)
# Verify results
self.assertEqual(fixed_ips, port)
self.assertTrue(clientv20.Client.called)
def test_update_port_fixed_ips_exception(self):
# Set up test data
port_id = 'test_port'
fixed_ips = {'fixed_ips': [{'subnet_id': 'test subnet'}]}
self.stubs.Set(
self.neutron_api.client, 'update_port',
mock.Mock(side_effect=neutron_client_exc.NeutronClientException))
# Execute method 'update_port_fixed_ips'
self.assertRaises(
exception.NetworkException,
self.neutron_api.update_port_fixed_ips,
port_id, fixed_ips)
# Verify results
self.neutron_api.client.update_port.assert_called_once_with(
port_id, {'port': fixed_ips})
self.assertTrue(clientv20.Client.called)
def test_router_update_routes(self):
# Set up test data
router_id = 'test_router'
routes = {'routes': [{'destination': '0.0.0.0/0',
'nexthop': '8.8.8.8'}]}
routes = {
'routes': [
{'destination': '0.0.0.0/0', 'nexthop': '8.8.8.8', },
],
}
# Execute method 'router_update_routes'
router = self.neutron_api.router_update_routes(router_id, routes)
self.assertEqual(router, routes)
# Verify results
self.assertEqual(routes, router)
self.assertTrue(clientv20.Client.called)
def test_router_update_routes_exception(self):
# Set up test data
router_id = 'test_router'
routes = {'routes': [{'destination': '0.0.0.0/0',
'nexthop': '8.8.8.8'}]}
routes = {
'routes': [
{'destination': '0.0.0.0/0', 'nexthop': '8.8.8.8', },
],
}
self.stubs.Set(
self.neutron_api.client, 'update_router',
mock.Mock(side_effect=neutron_client_exc.NeutronClientException))
# Execute method 'router_update_routes'
self.assertRaises(
exception.NetworkException,
self.neutron_api.router_update_routes,
router_id, routes)
# Verify results
self.neutron_api.client.update_router.assert_called_once_with(
router_id, {'router': routes})
self.assertTrue(clientv20.Client.called)
def test_show_router(self):
# Set up test data
router_id = 'test router id'
fake_router = {'fake router': 'fake router info'}
client_show_router_mock = mock.Mock(return_value={'router':
fake_router})
with mock.patch.object(self.neutron_api.client, 'show_router',
client_show_router_mock):
self.stubs.Set(
self.neutron_api.client, 'show_router',
mock.Mock(return_value={'router': fake_router}))
# Execute method 'show_router'
port = self.neutron_api.show_router(router_id)
client_show_router_mock.assert_called_once_with(router_id)
self.assertEqual(port, fake_router)
# Verify results
self.assertEqual(fake_router, port)
self.assertTrue(clientv20.Client.called)
self.neutron_api.client.show_router.assert_called_once_with(router_id)
def test_router_add_interface(self):
# Set up test data
router_id = 'test port id'
subnet_id = 'test subnet id'
port_id = 'test port id'
with mock.patch.object(
self.neutron_api.client, 'add_interface_router',
mock.Mock()) as client_add_interface_router_mock:
self.stubs.Set(
self.neutron_api.client, 'add_interface_router', mock.Mock())
self.neutron_api.router_add_interface(router_id,
subnet_id,
port_id)
client_add_interface_router_mock.assert_called_once_with(
# Execute method 'router_add_interface'
self.neutron_api.router_add_interface(router_id, subnet_id, port_id)
# Verify results
self.neutron_api.client.add_interface_router.assert_called_once_with(
port_id, {'subnet_id': subnet_id, 'port_id': port_id})
self.assertTrue(clientv20.Client.called)
def test_router_add_interface_exception(self):
# Set up test data
router_id = 'test port id'
subnet_id = 'test subnet id'
port_id = 'test port id'
self.stubs.Set(
self.neutron_api.client, 'add_interface_router',
mock.Mock(side_effect=neutron_client_exc.NeutronClientException))
# Execute method 'router_add_interface'
self.assertRaises(
exception.NetworkException,
self.neutron_api.router_add_interface,
router_id, subnet_id, port_id)
class TestNeutronClient(test.TestCase):
@mock.patch.object(clientv20.Client, '__init__',
mock.Mock(return_value=None))
def test_get_client_with_token(self):
client_args = {'endpoint_url': CONF.neutron_url,
'timeout': CONF.neutron_url_timeout,
'insecure': CONF.neutron_api_insecure,
'ca_cert': CONF.neutron_ca_certificates_file,
'token': 'test_token',
'auth_strategy': None}
my_context = context.RequestContext('test_user', 'test_tenant',
auth_token='test_token',
is_admin=False)
neutron.get_client(my_context)
clientv20.Client.__init__.assert_called_once_with(**client_args)
@mock.patch.object(clientv20.Client, '__init__',
mock.Mock(return_value=None))
def test_get_client_no_token(self):
my_context = context.RequestContext('test_user', 'test_tenant',
is_admin=False)
self.assertRaises(neutron_client_exc.Unauthorized,
neutron.get_client,
my_context)
@mock.patch.object(clientv20.Client, '__init__',
mock.Mock(return_value=None))
def test_get_client_admin_context(self):
client_args = {'endpoint_url': CONF.neutron_url,
'timeout': CONF.neutron_url_timeout,
'insecure': CONF.neutron_api_insecure,
'ca_cert': CONF.neutron_ca_certificates_file,
'username': CONF.neutron_admin_username,
'tenant_name': CONF.neutron_admin_tenant_name,
'password': CONF.neutron_admin_password,
'auth_url': CONF.neutron_admin_auth_url,
'auth_strategy': CONF.neutron_auth_strategy}
my_context = context.RequestContext('test_user', 'test_tenant',
is_admin=True)
neutron.get_client(my_context)
clientv20.Client.__init__.assert_called_once_with(**client_args)
# Verify results
self.neutron_api.client.add_interface_router.assert_called_once_with(
router_id, {'subnet_id': subnet_id, 'port_id': port_id})
self.assertTrue(clientv20.Client.called)

View File

@ -1,4 +1,5 @@
# Copyright 2012 NetApp
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -16,8 +17,11 @@
import time
import mock
from manila import exception
import manila.share.configuration
from manila import network
from manila.share import configuration
from manila.share import driver
from manila import test
from manila import utils
@ -32,6 +36,7 @@ def fake_sleep(duration):
class ShareDriverTestCase(test.TestCase):
def setUp(self):
super(ShareDriverTestCase, self).setUp()
self.utils = utils
@ -39,11 +44,40 @@ class ShareDriverTestCase(test.TestCase):
self.time = time
self.stubs.Set(self.time, 'sleep', fake_sleep)
def tearDown(self):
super(ShareDriverTestCase, self).tearDown()
def test__try_execute(self):
execute_mixin = driver.ExecuteMixin(
configuration=manila.share.configuration.Configuration(None))
configuration=configuration.Configuration(None))
self.assertRaises(exception.ProcessExecutionError,
execute_mixin._try_execute)
def _instantiate_share_driver(self, network_config_group):
self.stubs.Set(network, 'API', mock.Mock())
config = mock.Mock()
config.append_config_values = mock.Mock()
config.config_group = 'fake_config_group'
config.network_config_group = network_config_group
share_driver = driver.ShareDriver(configuration=config)
self.assertTrue(hasattr(share_driver, 'configuration'))
config.append_config_values.assert_called_once_with(driver.share_opts)
if network_config_group:
network.API.assert_called_once_with(
config_group_name=config.network_config_group)
else:
network.API.assert_called_once_with(
config_group_name=config.config_group)
def test_instantiate_share_driver(self):
self._instantiate_share_driver(None)
def test_instantiate_share_driver_another_config_group(self):
self._instantiate_share_driver("fake_network_config_group")
def test_instantiate_share_driver_no_configuration(self):
self.stubs.Set(network, 'API', mock.Mock())
share_driver = driver.ShareDriver(configuration=None)
self.assertEqual(None, share_driver.configuration)
network.API.assert_called_once_with(config_group_name=None)

View File

@ -789,10 +789,8 @@ class ShareManagerTestCase(test.TestCase):
acs = db.share_access_get(self.context, access_id)
self.assertEqual(acs['state'], 'error')
def test_setup_server_2_net_allocations(self):
def test_setup_server(self):
# Setup required test data
allocation_number = 2
context = "fake_context"
share_server = {
'id': 'fake_id',
'share_network_id': 'fake_sn_id',
@ -805,10 +803,7 @@ class ShareManagerTestCase(test.TestCase):
# mock required stuff
self.stubs.Set(self.share_manager.db, 'share_network_get',
mock.Mock(return_value=share_network))
self.stubs.Set(self.share_manager.driver,
'get_network_allocations_number',
mock.Mock(return_value=allocation_number))
self.stubs.Set(self.share_manager.network_api, 'allocate_network',
self.stubs.Set(self.share_manager.driver, 'allocate_network',
mock.Mock())
self.stubs.Set(self.share_manager, '_form_server_setup_info',
mock.Mock(return_value=network_info))
@ -822,80 +817,29 @@ class ShareManagerTestCase(test.TestCase):
# execute method _setup_server
result = self.share_manager._setup_server(
context, share_server, metadata=metadata)
self.context, share_server, metadata=metadata)
# verify results
self.assertEqual(share_server, result)
self.share_manager.db.share_network_get.assert_has_calls([
mock.call(context, share_server['share_network_id']),
mock.call(context, share_server['share_network_id']),
mock.call(self.context, share_server['share_network_id']),
mock.call(self.context, share_server['share_network_id']),
])
self.share_manager.driver.get_network_allocations_number.\
assert_called_once_with()
self.share_manager.network_api.allocate_network.\
assert_called_once_with(context, share_server, share_network,
count=allocation_number)
self.share_manager.driver.allocate_network.assert_called_once_with(
self.context, share_server, share_network)
self.share_manager._form_server_setup_info.assert_called_once_with(
context, share_server, share_network)
self.context, share_server, share_network)
self.share_manager.driver.setup_server.assert_called_once_with(
network_info, metadata=metadata)
self.share_manager.db.share_server_backend_details_set.\
assert_called_once_with(context, share_server['id'], server_info)
assert_called_once_with(
self.context, share_server['id'], server_info)
self.share_manager.db.share_server_update.assert_called_once_with(
context, share_server['id'], {'status': constants.STATUS_ACTIVE})
self.context, share_server['id'],
{'status': constants.STATUS_ACTIVE})
def test_setup_server_no_net_allocations(self):
def test_setup_server_server_info_not_present(self):
# Setup required test data
allocation_number = 0
context = "fake_context"
share_server = {
'id': 'fake_id',
'share_network_id': 'fake_sn_id',
}
metadata = {'fake_metadata_key': 'fake_metadata_value'}
share_network = {'id': 'fake_sn_id'}
network_info = {'fake_network_info_key': 'fake_network_info_value'}
server_info = {'fake_server_info_key': 'fake_server_info_value'}
# mock required stuff
self.stubs.Set(self.share_manager.db, 'share_network_get',
mock.Mock(return_value=share_network))
self.stubs.Set(self.share_manager.driver,
'get_network_allocations_number',
mock.Mock(return_value=allocation_number))
self.stubs.Set(self.share_manager, '_form_server_setup_info',
mock.Mock(return_value=network_info))
self.stubs.Set(self.share_manager.driver, 'setup_server',
mock.Mock(return_value=server_info))
self.stubs.Set(self.share_manager.db,
'share_server_backend_details_set',
mock.Mock())
self.stubs.Set(self.share_manager.db, 'share_server_update',
mock.Mock(return_value=share_server))
# execute method _setup_server
result = self.share_manager._setup_server(
context, share_server, metadata=metadata)
# verify results
self.assertEqual(share_server, result)
self.share_manager.db.share_network_get.assert_called_once_with(
context, share_server['share_network_id'])
self.share_manager.driver.get_network_allocations_number.\
assert_called_once_with()
self.share_manager._form_server_setup_info.assert_called_once_with(
context, share_server, share_network)
self.share_manager.driver.setup_server.assert_called_once_with(
network_info, metadata=metadata)
self.share_manager.db.share_server_backend_details_set.\
assert_called_once_with(context, share_server['id'], server_info)
self.share_manager.db.share_server_update.assert_called_once_with(
context, share_server['id'], {'status': constants.STATUS_ACTIVE})
def test_setup_server_server_info_not_present_no_net_allocations(self):
# Setup required test data
allocation_number = 0
context = "fake_context"
share_server = {
'id': 'fake_id',
'share_network_id': 'fake_sn_id',
@ -908,69 +852,36 @@ class ShareManagerTestCase(test.TestCase):
# mock required stuff
self.stubs.Set(self.share_manager.db, 'share_network_get',
mock.Mock(return_value=share_network))
self.stubs.Set(self.share_manager.driver,
'get_network_allocations_number',
mock.Mock(return_value=allocation_number))
self.stubs.Set(self.share_manager, '_form_server_setup_info',
mock.Mock(return_value=network_info))
self.stubs.Set(self.share_manager.driver, 'setup_server',
mock.Mock(return_value=server_info))
self.stubs.Set(self.share_manager.db, 'share_server_update',
mock.Mock(return_value=share_server))
# execute method _setup_server
result = self.share_manager._setup_server(
context, share_server, metadata=metadata)
# verify results
self.assertEqual(share_server, result)
self.share_manager.db.share_network_get.assert_called_once_with(
context, share_server['share_network_id'])
self.share_manager.driver.get_network_allocations_number.\
assert_called_once_with()
self.share_manager._form_server_setup_info.assert_called_once_with(
context, share_server, share_network)
self.share_manager.driver.setup_server.assert_called_once_with(
network_info, metadata=metadata)
self.share_manager.db.share_server_update.assert_called_once_with(
context, share_server['id'], {'status': constants.STATUS_ACTIVE})
def test_setup_server_exception_raised(self):
# Setup required test data
context = "fake_context"
share_server = {
'id': 'fake_id',
'share_network_id': 'fake_sn_id',
}
share_network = {'id': 'fake_sn_id'}
# mock required stuff
self.stubs.Set(self.share_manager.db, 'share_network_get',
mock.Mock(return_value=share_network))
self.stubs.Set(self.share_manager.driver,
'get_network_allocations_number',
mock.Mock(side_effect=exception.ManilaException()))
self.stubs.Set(self.share_manager.db, 'share_server_update',
self.stubs.Set(self.share_manager.driver, 'allocate_network',
mock.Mock())
# execute method _setup_server
self.assertRaises(
exception.ManilaException,
self.share_manager._setup_server,
context,
share_server,
)
self.share_manager.db.share_network_get.assert_called_once_with(
context, share_server['share_network_id'])
self.share_manager.driver.get_network_allocations_number.\
assert_called_once_with()
self.share_manager.db.share_server_update.assert_called_once_with(
context, share_server['id'], {'status': constants.STATUS_ERROR})
result = self.share_manager._setup_server(
self.context, share_server, metadata=metadata)
def setup_server_raise_exception(self, allocation_number,
detail_data_proper):
# verify results
self.assertEqual(share_server, result)
self.share_manager.db.share_network_get.assert_has_calls([
mock.call(self.context, share_server['share_network_id']),
mock.call(self.context, share_server['share_network_id'])])
self.share_manager._form_server_setup_info.assert_called_once_with(
self.context, share_server, share_network)
self.share_manager.driver.setup_server.assert_called_once_with(
network_info, metadata=metadata)
self.share_manager.db.share_server_update.assert_called_once_with(
self.context, share_server['id'],
{'status': constants.STATUS_ACTIVE})
self.share_manager.driver.allocate_network.assert_called_once_with(
self.context, share_server, share_network)
def setup_server_raise_exception(self, detail_data_proper):
# Setup required test data
context = "fake_context"
share_server = {
'id': 'fake_id',
'share_network_id': 'fake_sn_id',
@ -989,14 +900,10 @@ class ShareManagerTestCase(test.TestCase):
# Mock required parameters
self.stubs.Set(self.share_manager.db, 'share_network_get',
mock.Mock(return_value=share_network))
self.stubs.Set(self.share_manager.driver,
'get_network_allocations_number',
mock.Mock(return_value=allocation_number))
self.stubs.Set(self.share_manager.db, 'share_server_update',
mock.Mock())
if allocation_number:
for m in ['deallocate_network', 'allocate_network']:
self.stubs.Set(self.share_manager.network_api, m, mock.Mock())
self.stubs.Set(self.share_manager.driver, m, mock.Mock())
self.stubs.Set(self.share_manager, '_form_server_setup_info',
mock.Mock(return_value=network_info))
self.stubs.Set(self.share_manager.db,
@ -1010,44 +917,30 @@ class ShareManagerTestCase(test.TestCase):
self.assertRaises(
exception.ManilaException,
self.share_manager._setup_server,
context,
self.context,
share_server,
)
# verify results
if detail_data_proper:
self.share_manager.db.share_server_backend_details_set.\
assert_called_once_with(
context, share_server['id'], server_info)
self.share_manager.driver.get_network_allocations_number.\
assert_called_once_with()
self.context, share_server['id'], server_info)
self.share_manager._form_server_setup_info.assert_called_once_with(
context, share_server, share_network)
self.context, share_server, share_network)
self.share_manager.db.share_server_update.assert_called_once_with(
context, share_server['id'], {'status': constants.STATUS_ERROR})
if allocation_number:
self.context, share_server['id'],
{'status': constants.STATUS_ERROR})
self.share_manager.db.share_network_get.assert_has_calls([
mock.call(context, share_server['share_network_id']),
mock.call(context, share_server['share_network_id'])])
self.share_manager.network_api.allocate_network.assert_has_calls([
mock.call(context, share_server, share_network,
count=allocation_number)])
self.share_manager.network_api.deallocate_network.assert_has_calls(
mock.call(context, share_server['id']))
else:
self.share_manager.db.share_network_get.assert_called_once_with(
context, share_server['share_network_id'])
mock.call(self.context, share_server['share_network_id']),
mock.call(self.context, share_server['share_network_id'])])
self.share_manager.driver.allocate_network.assert_has_calls([
mock.call(self.context, share_server, share_network)])
self.share_manager.driver.deallocate_network.assert_has_calls(
mock.call(self.context, share_server['id']))
def test_setup_server_incorrect_detail_data_an2(self):
# an2 - allocation number has value -> 2
self.setup_server_raise_exception(2, detail_data_proper=False)
def test_setup_server_incorrect_detail_data(self):
self.setup_server_raise_exception(detail_data_proper=False)
def test_setup_server_incorrect_detail_data_an0(self):
# an0 - allocation number has value -> 0
self.setup_server_raise_exception(0, detail_data_proper=False)
def test_setup_server_exception_in_driver_an2(self):
# an2 - allocation number has value -> 2
self.setup_server_raise_exception(2, detail_data_proper=True)
def test_setup_server_exception_in_driver_an0(self):
# an0 - allocation number has value -> 0
self.setup_server_raise_exception(2, detail_data_proper=True)
def test_setup_server_exception_in_driver(self):
self.setup_server_raise_exception(detail_data_proper=True)