Merge "Fix concurrent usage of update_access method for share instances"
This commit is contained in:
commit
b280413d2c
manila
@ -38,6 +38,7 @@ from oslo_log import log
|
||||
from oslo_utils import timeutils
|
||||
from oslo_utils import uuidutils
|
||||
import six
|
||||
from sqlalchemy import and_
|
||||
from sqlalchemy import or_
|
||||
from sqlalchemy.orm import joinedload
|
||||
from sqlalchemy.sql.expression import true
|
||||
@ -1666,7 +1667,7 @@ def _share_access_get_query(context, session, values, read_deleted='no'):
|
||||
|
||||
def _share_instance_access_query(context, session, access_id=None,
|
||||
instance_id=None):
|
||||
filters = {}
|
||||
filters = {'deleted': 'False'}
|
||||
|
||||
if access_id is not None:
|
||||
filters.update({'access_id': access_id})
|
||||
@ -1765,9 +1766,10 @@ def share_access_get_all_for_instance(context, instance_id, session=None):
|
||||
return _share_access_get_query(context, session, {}).join(
|
||||
models.ShareInstanceAccessMapping,
|
||||
models.ShareInstanceAccessMapping.access_id ==
|
||||
models.ShareAccessMapping.id).filter(
|
||||
models.ShareInstanceAccessMapping.share_instance_id ==
|
||||
instance_id).all()
|
||||
models.ShareAccessMapping.id).filter(and_(
|
||||
models.ShareInstanceAccessMapping.share_instance_id ==
|
||||
instance_id, models.ShareInstanceAccessMapping.deleted ==
|
||||
"False")).all()
|
||||
|
||||
|
||||
@require_context
|
||||
|
@ -18,6 +18,7 @@ import six
|
||||
|
||||
from manila.common import constants
|
||||
from manila.i18n import _LI
|
||||
from manila import utils
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
@ -41,6 +42,26 @@ class ShareInstanceAccess(object):
|
||||
be deleted.
|
||||
:param share_server: Share server model or None
|
||||
"""
|
||||
share_instance = self.db.share_instance_get(
|
||||
context, share_instance_id, with_share_data=True)
|
||||
share_id = share_instance["share_id"]
|
||||
|
||||
@utils.synchronized(
|
||||
"update_access_rules_for_share_%s" % share_id, external=True)
|
||||
def _update_access_rules_locked(*args, **kwargs):
|
||||
return self._update_access_rules(*args, **kwargs)
|
||||
|
||||
_update_access_rules_locked(
|
||||
context=context,
|
||||
share_instance_id=share_instance_id,
|
||||
add_rules=add_rules,
|
||||
delete_rules=delete_rules,
|
||||
share_server=share_server,
|
||||
)
|
||||
|
||||
def _update_access_rules(self, context, share_instance_id, add_rules=None,
|
||||
delete_rules=None, share_server=None):
|
||||
# Reget share instance
|
||||
share_instance = self.db.share_instance_get(
|
||||
context, share_instance_id, with_share_data=True)
|
||||
|
||||
@ -63,8 +84,9 @@ class ShareInstanceAccess(object):
|
||||
context, share_instance['id'])
|
||||
rules = []
|
||||
else:
|
||||
rules = self.db.share_access_get_all_for_instance(
|
||||
_rules = self.db.share_access_get_all_for_instance(
|
||||
context, share_instance['id'])
|
||||
rules = _rules
|
||||
if delete_rules:
|
||||
delete_ids = [rule['id'] for rule in delete_rules]
|
||||
rules = list(filter(lambda r: r['id'] not in delete_ids,
|
||||
@ -72,7 +94,9 @@ class ShareInstanceAccess(object):
|
||||
# NOTE(ganso): trigger maintenance mode
|
||||
if share_instance['access_rules_status'] == (
|
||||
constants.STATUS_ERROR):
|
||||
remove_rules = delete_rules
|
||||
remove_rules = [
|
||||
rule for rule in _rules
|
||||
if rule["id"] in delete_ids]
|
||||
delete_rules = []
|
||||
|
||||
try:
|
||||
@ -109,8 +133,8 @@ class ShareInstanceAccess(object):
|
||||
with_share_data=True)
|
||||
|
||||
if self._check_needs_refresh(context, rules, share_instance):
|
||||
self.update_access_rules(context, share_instance_id,
|
||||
share_server=share_server)
|
||||
self._update_access_rules(context, share_instance_id,
|
||||
share_server=share_server)
|
||||
else:
|
||||
self.db.share_instance_update_access_status(
|
||||
context,
|
||||
|
@ -136,7 +136,9 @@ def locked_share_replica_operation(operation):
|
||||
def wrapped(*args, **kwargs):
|
||||
share_id = kwargs.get('share_id')
|
||||
|
||||
@utils.synchronized("%s" % share_id, external=True)
|
||||
@utils.synchronized(
|
||||
"locked_share_replica_operation_by_share_%s" % share_id,
|
||||
external=True)
|
||||
def locked_operation(*_args, **_kwargs):
|
||||
return operation(*_args, **_kwargs)
|
||||
return locked_operation(*args, **kwargs)
|
||||
|
@ -116,10 +116,7 @@ class ShareInstanceAccessTestCase(test.TestCase):
|
||||
self.share_instance,
|
||||
add_rules[0],
|
||||
share_server=None)
|
||||
self.driver.deny_access.assert_called_with(self.context,
|
||||
self.share_instance,
|
||||
delete_rules[0],
|
||||
share_server=None)
|
||||
self.assertFalse(self.driver.deny_access.called)
|
||||
db.share_instance_update_access_status.assert_called_with(
|
||||
self.context, self.share_instance['id'], constants.STATUS_ACTIVE)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user