Removed custom synchronized in service_instance
Replaced using custom synchronized with lockutils.synchronized Removed redundant synchronized from set_up_service_instance Partially-implements bp setup-teardown-server-enhancements Change-Id: I3fc6fe2eb9b7062223f4a0e49ee9bfd00ad24cd4
This commit is contained in:
parent
e41faa875b
commit
33b5aacda3
1
.gitignore
vendored
1
.gitignore
vendored
@ -38,3 +38,4 @@ doc/build
|
||||
|
||||
# Lock dirs and files
|
||||
service_instance_locks
|
||||
attach_detach_locks
|
||||
|
@ -92,9 +92,6 @@ def ensure_server(f):
|
||||
return wrap
|
||||
|
||||
|
||||
synchronized = service_instance.synchronized
|
||||
|
||||
|
||||
class GenericShareDriver(driver.ExecuteMixin, driver.ShareDriver):
|
||||
"""Executes commands relating to Shares."""
|
||||
|
||||
@ -143,8 +140,6 @@ class GenericShareDriver(driver.ExecuteMixin, driver.ShareDriver):
|
||||
self.service_instance_manager = service_instance.\
|
||||
ServiceInstanceManager(self.db, self._helpers,
|
||||
driver_config=self.configuration)
|
||||
self.share_networks_locks = self.service_instance_manager.\
|
||||
share_networks_locks
|
||||
self.share_networks_servers = self.service_instance_manager.\
|
||||
share_networks_servers
|
||||
self._setup_helpers()
|
||||
@ -157,8 +152,7 @@ class GenericShareDriver(driver.ExecuteMixin, driver.ShareDriver):
|
||||
self._helpers[share_proto.upper()] = helper(
|
||||
self._execute,
|
||||
self._ssh_exec,
|
||||
self.configuration,
|
||||
self.share_networks_locks)
|
||||
self.configuration)
|
||||
|
||||
@ensure_server
|
||||
def create_share(self, context, share, share_server=None):
|
||||
@ -213,38 +207,40 @@ class GenericShareDriver(driver.ExecuteMixin, driver.ShareDriver):
|
||||
"""
|
||||
return os.path.join(self.configuration.share_mount_path, share['name'])
|
||||
|
||||
@synchronized
|
||||
def _attach_volume(self, context, share, instance_id, volume):
|
||||
"""Attaches cinder volume to service vm."""
|
||||
if volume['status'] == 'in-use':
|
||||
attached_volumes = [vol.id for vol in
|
||||
self.compute_api.instance_volumes_list(self.admin_context,
|
||||
instance_id)]
|
||||
if volume['id'] in attached_volumes:
|
||||
return volume
|
||||
else:
|
||||
raise exception.ManilaException(_('Volume %s is already '
|
||||
'attached to another instance') % volume['id'])
|
||||
self.compute_api.instance_volume_attach(self.admin_context,
|
||||
instance_id,
|
||||
volume['id'],
|
||||
)
|
||||
|
||||
t = time.time()
|
||||
while time.time() - t < self.configuration.max_time_to_attach:
|
||||
volume = self.volume_api.get(context, volume['id'])
|
||||
@lockutils.synchronized(instance_id, external=True,
|
||||
lock_path="attach_detach_locks")
|
||||
def do_attach(volume):
|
||||
if volume['status'] == 'in-use':
|
||||
break
|
||||
elif volume['status'] != 'attaching':
|
||||
raise exception.ManilaException(_('Failed to attach volume %s')
|
||||
% volume['id'])
|
||||
time.sleep(1)
|
||||
else:
|
||||
raise exception.ManilaException(_('Volume have not been attached '
|
||||
'in %ss. Giving up') %
|
||||
self.configuration.max_time_to_attach)
|
||||
attached_volumes = [vol.id for vol in
|
||||
self.compute_api.instance_volumes_list(
|
||||
self.admin_context, instance_id)]
|
||||
if volume['id'] in attached_volumes:
|
||||
return volume
|
||||
else:
|
||||
raise exception.ManilaException(
|
||||
_('Volume %s is already attached to another instance')
|
||||
% volume['id'])
|
||||
self.compute_api.instance_volume_attach(self.admin_context,
|
||||
instance_id,
|
||||
volume['id'],
|
||||
)
|
||||
|
||||
return volume
|
||||
t = time.time()
|
||||
while time.time() - t < self.configuration.max_time_to_attach:
|
||||
volume = self.volume_api.get(context, volume['id'])
|
||||
if volume['status'] == 'in-use':
|
||||
return volume
|
||||
elif volume['status'] != 'attaching':
|
||||
raise exception.ManilaException(
|
||||
_('Failed to attach volume %s') % volume['id'])
|
||||
time.sleep(1)
|
||||
else:
|
||||
raise exception.ManilaException(
|
||||
_('Volume have not been attached in %ss. Giving up') %
|
||||
self.configuration.max_time_to_attach)
|
||||
return do_attach(volume)
|
||||
|
||||
def _get_volume(self, context, share_id):
|
||||
"""Finds volume, associated to the specific share."""
|
||||
@ -274,30 +270,34 @@ class GenericShareDriver(driver.ExecuteMixin, driver.ShareDriver):
|
||||
_('Error. Ambiguous volume snaphots'))
|
||||
return volume_snapshot
|
||||
|
||||
@synchronized
|
||||
def _detach_volume(self, context, share, server_details):
|
||||
"""Detaches cinder volume from service vm."""
|
||||
attached_volumes = [vol.id for vol in
|
||||
self.compute_api.instance_volumes_list(
|
||||
instance_id = server_details['instance_id']
|
||||
|
||||
@lockutils.synchronized(instance_id, external=True,
|
||||
lock_path="attach_detach_locks")
|
||||
def do_detach():
|
||||
attached_volumes = [vol.id for vol in
|
||||
self.compute_api.instance_volumes_list(
|
||||
self.admin_context, instance_id)]
|
||||
volume = self._get_volume(context, share['id'])
|
||||
if volume and volume['id'] in attached_volumes:
|
||||
self.compute_api.instance_volume_detach(
|
||||
self.admin_context,
|
||||
server_details['instance_id'])]
|
||||
volume = self._get_volume(context, share['id'])
|
||||
if volume and volume['id'] in attached_volumes:
|
||||
self.compute_api.instance_volume_detach(
|
||||
self.admin_context,
|
||||
server_details['instance_id'],
|
||||
volume['id']
|
||||
)
|
||||
t = time.time()
|
||||
while time.time() - t < self.configuration.max_time_to_attach:
|
||||
volume = self.volume_api.get(context, volume['id'])
|
||||
if volume['status'] in ('available', 'error'):
|
||||
break
|
||||
time.sleep(1)
|
||||
else:
|
||||
raise exception.ManilaException(_('Volume have not been '
|
||||
'detached in %ss. Giving up')
|
||||
% self.configuration.max_time_to_attach)
|
||||
instance_id,
|
||||
volume['id']
|
||||
)
|
||||
t = time.time()
|
||||
while time.time() - t < self.configuration.max_time_to_attach:
|
||||
volume = self.volume_api.get(context, volume['id'])
|
||||
if volume['status'] in ('available', 'error'):
|
||||
break
|
||||
time.sleep(1)
|
||||
else:
|
||||
raise exception.ManilaException(
|
||||
_('Volume have not been detached in %ss. Giving up')
|
||||
% self.configuration.max_time_to_attach)
|
||||
do_detach()
|
||||
|
||||
def _allocate_container(self, context, share, snapshot=None):
|
||||
"""Creates cinder volume, associated to share by name."""
|
||||
@ -498,8 +498,8 @@ class GenericShareDriver(driver.ExecuteMixin, driver.ShareDriver):
|
||||
server = self.service_instance_manager.set_up_service_instance(
|
||||
context=self.admin_context,
|
||||
share_server_id=srv_id,
|
||||
share_network_id=sn_id,
|
||||
create=True)
|
||||
share_network_id=sn_id
|
||||
)
|
||||
for helper in self._helpers.values():
|
||||
helper.init_helper(server)
|
||||
return server
|
||||
@ -524,11 +524,10 @@ class GenericShareDriver(driver.ExecuteMixin, driver.ShareDriver):
|
||||
class NASHelperBase(object):
|
||||
"""Interface to work with share."""
|
||||
|
||||
def __init__(self, execute, ssh_execute, config_object, locks):
|
||||
def __init__(self, execute, ssh_execute, config_object):
|
||||
self.configuration = config_object
|
||||
self._execute = execute
|
||||
self._ssh_exec = ssh_execute
|
||||
self.share_networks_locks = locks
|
||||
|
||||
def init_helper(self, server):
|
||||
pass
|
||||
|
@ -88,31 +88,6 @@ CONF.register_opts(server_opts)
|
||||
lock = threading.Lock()
|
||||
|
||||
|
||||
def synchronized(f):
|
||||
"""Decorates function with unique locks for each share network.
|
||||
|
||||
Share network id must be provided either as value/attribute
|
||||
of one of args or as named argument.
|
||||
"""
|
||||
|
||||
def wrapped_func(self, *args, **kwargs):
|
||||
share_network_id = kwargs.get('share_network_id', None)
|
||||
if not share_network_id:
|
||||
for arg in args:
|
||||
share_network_id = getattr(arg, 'share_network_id', None)
|
||||
if isinstance(arg, dict):
|
||||
share_network_id = arg.get('share_network_id', None)
|
||||
if share_network_id:
|
||||
break
|
||||
else:
|
||||
msg = _("Could not get share network id.")
|
||||
raise exception.ServiceInstanceException(msg)
|
||||
with self.share_networks_locks.setdefault(share_network_id,
|
||||
threading.Lock()):
|
||||
return f(self, *args, **kwargs)
|
||||
return wrapped_func
|
||||
|
||||
|
||||
class ServiceInstanceManager(object):
|
||||
"""Manages nova instances for various share drivers.
|
||||
|
||||
@ -166,7 +141,6 @@ class ServiceInstanceManager(object):
|
||||
else:
|
||||
raise exception.ServiceInstanceException(_('Can not receive '
|
||||
'service tenant id.'))
|
||||
self.share_networks_locks = {}
|
||||
self.share_networks_servers = {}
|
||||
self.service_network_id = self._get_service_network()
|
||||
self.vif_driver = importutils.import_class(
|
||||
@ -291,18 +265,13 @@ class ServiceInstanceManager(object):
|
||||
'been deleted in %ss. Giving up.') %
|
||||
self.max_time_to_build_instance)
|
||||
|
||||
@synchronized
|
||||
def set_up_service_instance(self, context, share_server_id,
|
||||
share_network_id, create=False,
|
||||
return_inactive=False):
|
||||
share_network_id):
|
||||
"""Finds or creates and sets up service vm.
|
||||
|
||||
:param context: defines context, that should be used
|
||||
:param share_network_id: it provides network data for service VM
|
||||
:param share_server_id: provides server id for service VM
|
||||
:param create: allow create service VM or not
|
||||
:param return_inactive: allows to return not active VM, without
|
||||
raise of exception
|
||||
:returns: dict with data for service VM
|
||||
:raises: exception.ServiceInstanceException
|
||||
"""
|
||||
|
@ -21,6 +21,7 @@ import os
|
||||
|
||||
from manila import context
|
||||
from manila import exception
|
||||
from manila.openstack.common import lockutils
|
||||
from manila.share.drivers import service_instance
|
||||
from manila import test
|
||||
from manila.tests.db import fakes as db_fakes
|
||||
@ -67,8 +68,8 @@ class ServiceInstanceManagerTestCase(test.TestCase):
|
||||
self._manager.admin_context = self._context
|
||||
self._manager._execute = mock.Mock(return_value=('', ''))
|
||||
self._manager.vif_driver = mock.Mock()
|
||||
self.stubs.Set(service_instance, 'synchronized', mock.Mock(side_effect=
|
||||
lambda f: f))
|
||||
self.stubs.Set(lockutils, 'synchronized',
|
||||
mock.Mock(return_value=lambda f: f))
|
||||
self.stubs.Set(service_instance.os.path, 'exists',
|
||||
mock.Mock(return_value=True))
|
||||
self._manager._helpers = {
|
||||
@ -221,12 +222,12 @@ class ServiceInstanceManagerTestCase(test.TestCase):
|
||||
mock.Mock(return_value='fake_ip'))
|
||||
self.stubs.Set(self._manager.compute_api, 'server_list',
|
||||
mock.Mock(return_value=[]))
|
||||
create = mock.Mock(return_value=fake_server)
|
||||
self.stubs.Set(self._manager, '_create_service_instance',
|
||||
mock.Mock(return_value=fake_server))
|
||||
|
||||
create)
|
||||
result = self._manager.set_up_service_instance(
|
||||
self._context, share_server_id='fake_share_srv_id',
|
||||
share_network_id='fake_share_network_id', create=True)
|
||||
share_network_id='fake_share_network_id')
|
||||
|
||||
self._manager.compute_api.server_list.assert_called_once()
|
||||
self._manager._get_server_ip.assert_called_once()
|
||||
|
@ -23,6 +23,7 @@ from oslo.config import cfg
|
||||
from manila import compute
|
||||
from manila import context
|
||||
from manila import exception
|
||||
from manila.openstack.common import lockutils
|
||||
from manila.share.configuration import Configuration
|
||||
from manila.share.drivers import generic
|
||||
from manila import test
|
||||
@ -114,8 +115,8 @@ class GenericShareDriverTestCase(test.TestCase):
|
||||
share_network_id=self.fake_sn["id"], old_server_ip="fake")
|
||||
|
||||
self._driver._ssh_exec = mock.Mock(return_value=('', ''))
|
||||
self.stubs.Set(generic, 'synchronized', mock.Mock(side_effect=
|
||||
lambda f: f))
|
||||
self.stubs.Set(lockutils, 'synchronized',
|
||||
mock.Mock(return_value=lambda f: f))
|
||||
self.stubs.Set(generic.os.path, 'exists', mock.Mock(return_value=True))
|
||||
self._driver._helpers = {
|
||||
'CIFS': self._helper_cifs,
|
||||
@ -153,8 +154,8 @@ class GenericShareDriverTestCase(test.TestCase):
|
||||
self._helper_nfs.assert_called_once_with(
|
||||
self._execute,
|
||||
self._driver._ssh_exec,
|
||||
self.fake_conf,
|
||||
self._driver.share_networks_locks)
|
||||
self.fake_conf
|
||||
)
|
||||
self.assertEqual(len(self._driver._helpers), 1)
|
||||
|
||||
def test_create_share(self):
|
||||
@ -588,7 +589,7 @@ class NFSHelperTestCase(test.TestCase):
|
||||
self._ssh_exec = mock.Mock(return_value=('', ''))
|
||||
self._execute = mock.Mock(return_value=('', ''))
|
||||
self._helper = generic.NFSHelper(self._execute, self._ssh_exec,
|
||||
self.fake_conf, {})
|
||||
self.fake_conf)
|
||||
|
||||
def test_create_export(self):
|
||||
fake_server = fake_compute.FakeServer(ip='10.254.0.3')
|
||||
@ -632,7 +633,7 @@ class CIFSHelperTestCase(test.TestCase):
|
||||
self._ssh_exec = mock.Mock(return_value=('', ''))
|
||||
self._execute = mock.Mock(return_value=('', ''))
|
||||
self._helper = generic.CIFSHelper(self._execute, self._ssh_exec,
|
||||
self.fake_conf, {})
|
||||
self.fake_conf)
|
||||
|
||||
def test_create_export(self):
|
||||
#fake_server = fake_compute.FakeServer(ip='10.254.0.3',
|
||||
|
Loading…
x
Reference in New Issue
Block a user