Merge "Create replacement resource atomically"
This commit is contained in:
commit
365678131a
heat
db/sqlalchemy
engine
objects
tests
@ -18,6 +18,7 @@ import random
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_db import api as oslo_db_api
|
||||
from oslo_db import exception as db_exception
|
||||
from oslo_db import options
|
||||
from oslo_db.sqlalchemy import enginefacade
|
||||
from oslo_db.sqlalchemy import utils
|
||||
@ -443,6 +444,32 @@ def resource_create(context, values):
|
||||
return resource_ref
|
||||
|
||||
|
||||
def resource_create_replacement(context,
|
||||
existing_res_id, existing_res_values,
|
||||
new_res_values,
|
||||
atomic_key, expected_engine_id=None):
|
||||
session = context.session
|
||||
try:
|
||||
with session.begin(subtransactions=True):
|
||||
new_res = resource_create(context, new_res_values)
|
||||
update_data = {'replaced_by': new_res.id}
|
||||
update_data.update(existing_res_values)
|
||||
if not resource_update(context,
|
||||
existing_res_id, update_data,
|
||||
atomic_key,
|
||||
expected_engine_id=expected_engine_id):
|
||||
data = {}
|
||||
if 'name' in new_res_values:
|
||||
data['resource_name'] = new_res_values['name']
|
||||
raise exception.UpdateInProgress(**data)
|
||||
except db_exception.DBReferenceError as exc:
|
||||
# New template_id no longer exists
|
||||
LOG.debug('Not creating replacement resource: %s', exc)
|
||||
return None
|
||||
else:
|
||||
return new_res
|
||||
|
||||
|
||||
def resource_get_all_by_stack(context, stack_id, filters=None):
|
||||
query = context.session.query(
|
||||
models.Resource
|
||||
|
@ -93,10 +93,15 @@ class CheckResource(object):
|
||||
# Another concurrent update has taken over. But there is a
|
||||
# possibility for that update to be waiting for this rsrc to
|
||||
# complete, hence retrigger current rsrc for latest traversal.
|
||||
traversal = stack.current_traversal
|
||||
latest_stack = parser.Stack.load(cnxt, stack_id=stack.id,
|
||||
self._retrigger_new_traversal(cnxt, stack.current_traversal,
|
||||
is_update,
|
||||
stack.id, rsrc_id)
|
||||
|
||||
def _retrigger_new_traversal(self, cnxt, current_traversal, is_update,
|
||||
stack_id, rsrc_id):
|
||||
latest_stack = parser.Stack.load(cnxt, stack_id=stack_id,
|
||||
force_reload=True)
|
||||
if traversal != latest_stack.current_traversal:
|
||||
if current_traversal != latest_stack.current_traversal:
|
||||
self.retrigger_check_resource(cnxt, is_update, rsrc_id,
|
||||
latest_stack)
|
||||
|
||||
@ -104,6 +109,30 @@ class CheckResource(object):
|
||||
failure_reason = u'Timed out'
|
||||
self._handle_failure(cnxt, stack, failure_reason)
|
||||
|
||||
def _handle_resource_replacement(self, cnxt,
|
||||
current_traversal, new_tmpl_id,
|
||||
rsrc, stack, adopt_stack_data):
|
||||
"""Create a replacement resource and trigger a check on it."""
|
||||
try:
|
||||
new_res_id = rsrc.make_replacement(new_tmpl_id)
|
||||
except exception.UpdateInProgress:
|
||||
LOG.info("No replacement created - "
|
||||
"resource already locked by new traversal")
|
||||
return
|
||||
if new_res_id is None:
|
||||
LOG.info("No replacement created - "
|
||||
"new traversal already in progress")
|
||||
self._retrigger_new_traversal(cnxt, current_traversal, True,
|
||||
stack.id, rsrc.id)
|
||||
return
|
||||
LOG.info("Replacing resource with new id %s", new_res_id)
|
||||
rpc_data = sync_point.serialize_input_data(self.input_data)
|
||||
self._rpc_client.check_resource(cnxt,
|
||||
new_res_id,
|
||||
current_traversal,
|
||||
rpc_data, True,
|
||||
adopt_stack_data)
|
||||
|
||||
def _do_check_resource(self, cnxt, current_traversal, tmpl, resource_data,
|
||||
is_update, rsrc, stack, adopt_stack_data):
|
||||
try:
|
||||
@ -113,15 +142,10 @@ class CheckResource(object):
|
||||
self.engine_id,
|
||||
stack, self.msg_queue)
|
||||
except resource.UpdateReplace:
|
||||
new_res_id = rsrc.make_replacement(tmpl.id)
|
||||
LOG.info("Replacing resource with new id %s",
|
||||
new_res_id)
|
||||
rpc_data = sync_point.serialize_input_data(self.input_data)
|
||||
self._rpc_client.check_resource(cnxt,
|
||||
new_res_id,
|
||||
current_traversal,
|
||||
rpc_data, is_update,
|
||||
adopt_stack_data)
|
||||
self._handle_resource_replacement(cnxt, current_traversal,
|
||||
tmpl.id,
|
||||
rsrc, stack,
|
||||
adopt_stack_data)
|
||||
return False
|
||||
|
||||
else:
|
||||
|
@ -361,11 +361,18 @@ class Resource(status.ResourceStatus):
|
||||
return resource, initial_stk_defn, curr_stack
|
||||
|
||||
def make_replacement(self, new_tmpl_id):
|
||||
"""Create a replacement resource in the database.
|
||||
|
||||
Returns the DB ID of the new resource, or None if the new resource
|
||||
cannot be created (generally because the template ID does not exist).
|
||||
Raises UpdateInProgress if another traversal has already locked the
|
||||
current resource.
|
||||
"""
|
||||
# 1. create the replacement with "replaces" = self.id
|
||||
# Don't set physical_resource_id so that a create is triggered.
|
||||
rs = {'stack_id': self.stack.id,
|
||||
'name': self.name,
|
||||
'rsrc_prop_data_id': self._create_or_replace_rsrc_prop_data(),
|
||||
'rsrc_prop_data_id': None,
|
||||
'needed_by': self.needed_by,
|
||||
'requires': self.requires,
|
||||
'replaces': self.id,
|
||||
@ -374,13 +381,39 @@ class Resource(status.ResourceStatus):
|
||||
'current_template_id': new_tmpl_id,
|
||||
'stack_name': self.stack.name,
|
||||
'root_stack_id': self.root_stack_id}
|
||||
new_rs = resource_objects.Resource.create(self.context, rs)
|
||||
update_data = {'status': self.COMPLETE}
|
||||
|
||||
# 2. update the current resource to be replaced_by the one above.
|
||||
# Retry in case a signal has updated the atomic_key
|
||||
attempts = max(cfg.CONF.client_retry_limit, 0) + 1
|
||||
|
||||
def prepare_attempt(fn, attempt):
|
||||
if attempt > 1:
|
||||
res_obj = resource_objects.Resource.get_obj(
|
||||
self.context, self.id)
|
||||
if (res_obj.engine_id is not None or
|
||||
res_obj.updated_at != self.updated_time):
|
||||
raise exception.UpdateInProgress(resource_name=self.name)
|
||||
self._atomic_key = res_obj.atomic_key
|
||||
|
||||
@tenacity.retry(
|
||||
stop=tenacity.stop_after_attempt(attempts),
|
||||
retry=tenacity.retry_if_exception_type(
|
||||
exception.UpdateInProgress),
|
||||
before=prepare_attempt,
|
||||
wait=tenacity.wait_random(max=2),
|
||||
reraise=True)
|
||||
def create_replacement():
|
||||
return resource_objects.Resource.replacement(self.context,
|
||||
self.id,
|
||||
update_data,
|
||||
rs,
|
||||
self._atomic_key)
|
||||
|
||||
new_rs = create_replacement()
|
||||
if new_rs is None:
|
||||
return None
|
||||
self._incr_atomic_key(self._atomic_key)
|
||||
self.replaced_by = new_rs.id
|
||||
resource_objects.Resource.update_by_id(
|
||||
self.context, self.id,
|
||||
{'status': self.COMPLETE, 'replaced_by': self.replaced_by})
|
||||
return new_rs.id
|
||||
|
||||
def reparse(self, client_resolve=True):
|
||||
|
@ -199,6 +199,21 @@ class Resource(
|
||||
return cls._from_db_object(cls(context), context,
|
||||
db_api.resource_create(context, values))
|
||||
|
||||
@classmethod
|
||||
def replacement(cls, context,
|
||||
existing_res_id, existing_res_values,
|
||||
new_res_values,
|
||||
atomic_key=0, expected_engine_id=None):
|
||||
replacement = db_api.resource_create_replacement(context,
|
||||
existing_res_id,
|
||||
existing_res_values,
|
||||
new_res_values,
|
||||
atomic_key,
|
||||
expected_engine_id)
|
||||
if replacement is None:
|
||||
return None
|
||||
return cls._from_db_object(cls(context), context, replacement)
|
||||
|
||||
@classmethod
|
||||
def delete(cls, context, resource_id):
|
||||
db_api.resource_delete(context, resource_id)
|
||||
|
@ -1387,6 +1387,7 @@ def create_resource(ctx, stack, legacy_prop_data=False, **kwargs):
|
||||
'status_reason': 'create_complete',
|
||||
'rsrc_metadata': json.loads('{"foo": "123"}'),
|
||||
'stack_id': stack.id,
|
||||
'atomic_key': 1,
|
||||
}
|
||||
if not legacy_prop_data:
|
||||
values['rsrc_prop_data'] = rpd
|
||||
@ -2543,6 +2544,135 @@ class DBAPIResourceTest(common.HeatTestCase):
|
||||
self.assertEqual({'engine-001', 'engine-002'}, engines)
|
||||
|
||||
|
||||
class DBAPIResourceReplacementTest(common.HeatTestCase):
|
||||
def setUp(self):
|
||||
self.useFixture(utils.ForeignKeyConstraintFixture())
|
||||
super(DBAPIResourceReplacementTest, self).setUp()
|
||||
self.ctx = utils.dummy_context()
|
||||
self.template = create_raw_template(self.ctx)
|
||||
self.user_creds = create_user_creds(self.ctx)
|
||||
self.stack = create_stack(self.ctx, self.template, self.user_creds)
|
||||
|
||||
def test_resource_create_replacement(self):
|
||||
orig = create_resource(self.ctx, self.stack)
|
||||
|
||||
tmpl_id = create_raw_template(self.ctx).id
|
||||
|
||||
repl = db_api.resource_create_replacement(
|
||||
self.ctx,
|
||||
orig.id,
|
||||
{'status_reason': 'test replacement'},
|
||||
{'name': orig.name, 'replaces': orig.id,
|
||||
'stack_id': orig.stack_id, 'current_template_id': tmpl_id},
|
||||
1, None)
|
||||
|
||||
self.assertIsNotNone(repl)
|
||||
self.assertEqual(orig.name, repl.name)
|
||||
self.assertNotEqual(orig.id, repl.id)
|
||||
self.assertEqual(orig.id, repl.replaces)
|
||||
|
||||
def test_resource_create_replacement_template_gone(self):
|
||||
orig = create_resource(self.ctx, self.stack)
|
||||
|
||||
other_ctx = utils.dummy_context()
|
||||
tmpl_id = create_raw_template(self.ctx).id
|
||||
db_api.raw_template_delete(other_ctx, tmpl_id)
|
||||
|
||||
repl = db_api.resource_create_replacement(
|
||||
self.ctx,
|
||||
orig.id,
|
||||
{'status_reason': 'test replacement'},
|
||||
{'name': orig.name, 'replaces': orig.id,
|
||||
'stack_id': orig.stack_id, 'current_template_id': tmpl_id},
|
||||
1, None)
|
||||
|
||||
self.assertIsNone(repl)
|
||||
|
||||
def test_resource_create_replacement_updated(self):
|
||||
orig = create_resource(self.ctx, self.stack)
|
||||
|
||||
other_ctx = utils.dummy_context()
|
||||
tmpl_id = create_raw_template(self.ctx).id
|
||||
db_api.resource_update_and_save(other_ctx, orig.id, {'atomic_key': 2})
|
||||
|
||||
self.assertRaises(exception.UpdateInProgress,
|
||||
db_api.resource_create_replacement,
|
||||
self.ctx,
|
||||
orig.id,
|
||||
{'status_reason': 'test replacement'},
|
||||
{'name': orig.name, 'replaces': orig.id,
|
||||
'stack_id': orig.stack_id,
|
||||
'current_template_id': tmpl_id},
|
||||
1, None)
|
||||
|
||||
def test_resource_create_replacement_updated_concurrent(self):
|
||||
orig = create_resource(self.ctx, self.stack)
|
||||
|
||||
other_ctx = utils.dummy_context()
|
||||
tmpl_id = create_raw_template(self.ctx).id
|
||||
|
||||
def update_atomic_key(*args, **kwargs):
|
||||
db_api.resource_update_and_save(other_ctx, orig.id,
|
||||
{'atomic_key': 2})
|
||||
|
||||
self.patchobject(db_api, 'resource_update',
|
||||
new=mock.Mock(wraps=db_api.resource_update,
|
||||
side_effect=update_atomic_key))
|
||||
|
||||
self.assertRaises(exception.UpdateInProgress,
|
||||
db_api.resource_create_replacement,
|
||||
self.ctx,
|
||||
orig.id,
|
||||
{'status_reason': 'test replacement'},
|
||||
{'name': orig.name, 'replaces': orig.id,
|
||||
'stack_id': orig.stack_id,
|
||||
'current_template_id': tmpl_id},
|
||||
1, None)
|
||||
|
||||
def test_resource_create_replacement_locked(self):
|
||||
orig = create_resource(self.ctx, self.stack)
|
||||
|
||||
other_ctx = utils.dummy_context()
|
||||
tmpl_id = create_raw_template(self.ctx).id
|
||||
db_api.resource_update_and_save(other_ctx, orig.id, {'engine_id': 'a',
|
||||
'atomic_key': 2})
|
||||
|
||||
self.assertRaises(exception.UpdateInProgress,
|
||||
db_api.resource_create_replacement,
|
||||
self.ctx,
|
||||
orig.id,
|
||||
{'status_reason': 'test replacement'},
|
||||
{'name': orig.name, 'replaces': orig.id,
|
||||
'stack_id': orig.stack_id,
|
||||
'current_template_id': tmpl_id},
|
||||
1, None)
|
||||
|
||||
def test_resource_create_replacement_locked_concurrent(self):
|
||||
orig = create_resource(self.ctx, self.stack)
|
||||
|
||||
other_ctx = utils.dummy_context()
|
||||
tmpl_id = create_raw_template(self.ctx).id
|
||||
|
||||
def lock_resource(*args, **kwargs):
|
||||
db_api.resource_update_and_save(other_ctx, orig.id,
|
||||
{'engine_id': 'a',
|
||||
'atomic_key': 2})
|
||||
|
||||
self.patchobject(db_api, 'resource_update',
|
||||
new=mock.Mock(wraps=db_api.resource_update,
|
||||
side_effect=lock_resource))
|
||||
|
||||
self.assertRaises(exception.UpdateInProgress,
|
||||
db_api.resource_create_replacement,
|
||||
self.ctx,
|
||||
orig.id,
|
||||
{'status_reason': 'test replacement'},
|
||||
{'name': orig.name, 'replaces': orig.id,
|
||||
'stack_id': orig.stack_id,
|
||||
'current_template_id': tmpl_id},
|
||||
1, None)
|
||||
|
||||
|
||||
class DBAPIStackLockTest(common.HeatTestCase):
|
||||
def setUp(self):
|
||||
super(DBAPIStackLockTest, self).setUp()
|
||||
|
@ -15,6 +15,7 @@ import random
|
||||
import string
|
||||
import uuid
|
||||
|
||||
import fixtures
|
||||
import mox
|
||||
from oslo_config import cfg
|
||||
from oslo_db import options
|
||||
@ -187,3 +188,15 @@ class JsonEquals(mox.Comparator):
|
||||
|
||||
def __repr__(self):
|
||||
return "<equals to json '%s'>" % self.other_json
|
||||
|
||||
|
||||
class ForeignKeyConstraintFixture(fixtures.Fixture):
|
||||
def __init__(self, sqlite_fk=True):
|
||||
self.enable_fkc = sqlite_fk
|
||||
|
||||
def _setUp(self):
|
||||
new_context = db_api.db_context.make_new_manager()
|
||||
new_context.configure(sqlite_fk=self.enable_fkc)
|
||||
|
||||
self.useFixture(fixtures.MockPatchObject(db_api, '_facade', None))
|
||||
self.addCleanup(db_api.db_context.patch_factory(new_context._factory))
|
||||
|
Loading…
x
Reference in New Issue
Block a user