This patch implements the spec of reverting volume to latest snapshot. Related tempest and client patches: [1] https://review.openstack.org/#/c/463906/ [2] https://review.openstack.org/#/c/464903/ APIImpact DocImpact Partial-Implements: blueprint revert-volume-to-snapshot Change-Id: Ib20d749c2118c350b5fa0361ed1811296d518a17
913 lines
38 KiB
913 lines
38 KiB
# Copyright 2015 Red Hat, Inc.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import ddt
import uuid
from iso8601 import iso8601
import mock
from oslo_versionedobjects import fields
from sqlalchemy import sql
from cinder import context
from cinder import db
from cinder.db.sqlalchemy import models
from cinder import exception
from cinder import objects
from cinder.objects import fields as c_fields
from cinder import test
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_objects
from cinder.tests.unit import objects as test_objects
class TestCinderObjectVersionHistory(test_objects.BaseObjectsTestCase):
def test_add(self):
history = test_objects.obj_base.CinderObjectVersionsHistory()
v10 = {'Backup': '2.0'}
v11 = {'Backup': '2.1'}
history.add('1.0', v10)
history.add('1.1', v11)
# We have 3 elements because we have the liberty version by default
self.assertEqual(2 + 1, len(history))
expected_v10 = history['liberty'].copy()
expected_v11 = history['liberty'].copy()
self.assertEqual('1.1', history.get_current())
self.assertEqual(expected_v11, history.get_current_versions())
self.assertEqual(expected_v10, history['1.0'])
def test_add_existing(self):
history = test_objects.obj_base.CinderObjectVersionsHistory()
history.add('1.0', {'Backup': '1.0'})
history.add, '1.0', {'Backup': '1.0'})
class TestCinderObject(test_objects.BaseObjectsTestCase):
"""Tests methods from CinderObject."""
def setUp(self):
super(TestCinderObject, self).setUp()
self.obj = fake_objects.ChildObject(
def test_cinder_obj_get_changes_no_changes(self):
self.assertDictEqual({}, self.obj.cinder_obj_get_changes())
def test_cinder_obj_get_changes_other_changes(self):
self.obj.text = 'text2'
self.assertDictEqual({'text': 'text2'},
def test_cinder_obj_get_changes_datetime_no_tz(self):
now = datetime.datetime.utcnow()
self.obj.scheduled_at = now
self.assertDictEqual({'scheduled_at': now},
def test_cinder_obj_get_changes_datetime_tz_utc(self):
now_tz = iso8601.parse_date('2015-06-26T22:00:01Z')
now = now_tz.replace(tzinfo=None)
self.obj.scheduled_at = now_tz
self.assertDictEqual({'scheduled_at': now},
def test_cinder_obj_get_changes_datetime_tz_non_utc_positive(self):
now_tz = iso8601.parse_date('2015-06-26T22:00:01+01')
now = now_tz.replace(tzinfo=None) - datetime.timedelta(hours=1)
self.obj.scheduled_at = now_tz
self.assertDictEqual({'scheduled_at': now},
def test_cinder_obj_get_changes_datetime_tz_non_utc_negative(self):
now_tz = iso8601.parse_date('2015-06-26T10:00:01-05')
now = now_tz.replace(tzinfo=None) + datetime.timedelta(hours=5)
self.obj.scheduled_at = now_tz
self.assertDictEqual({'scheduled_at': now},
def test_refresh(self, get_by_id):
class MyTestObject(objects.base.CinderObject,
fields = {'id': fields.UUIDField(),
'name': fields.StringField()}
test_obj = MyTestObject(id=fake.OBJECT_ID, name='foo')
refresh_obj = MyTestObject(id=fake.OBJECT_ID, name='bar')
get_by_id.return_value = refresh_obj
self._compare(self, refresh_obj, test_obj)
def test_refresh_readonly(self, get_by_id_mock):
class MyTestObject(objects.base.CinderObject,
fields = {'id': fields.UUIDField(),
'name': fields.StringField(read_only=True)}
test_obj = MyTestObject(id=fake.OBJECT_ID, name='foo')
refresh_obj = MyTestObject(id=fake.OBJECT_ID, name='bar')
get_by_id_mock.return_value = refresh_obj
self._compare(self, refresh_obj, test_obj)
def test_refresh_no_id_field(self):
class MyTestObjectNoId(objects.base.CinderObject,
fields = {'uuid': fields.UUIDField()}
test_obj = MyTestObjectNoId(uuid=fake.OBJECT_ID, name='foo')
self.assertRaises(NotImplementedError, test_obj.refresh)
@mock.patch('cinder.objects.base.objects', mock.Mock())
def test_cls_init(self):
"""Test that class init method gets called on registration."""
class MyTestObject(objects.base.CinderObject,
cinder_ovo_cls_init = mock.Mock()
class TestCinderComparableObject(test_objects.BaseObjectsTestCase):
def test_comparable_objects(self):
class MyComparableObj(objects.base.CinderObject,
fields = {'foo': fields.Field(fields.Integer())}
class NonVersionedObject(object):
obj1 = MyComparableObj(foo=1)
obj2 = MyComparableObj(foo=1)
obj3 = MyComparableObj(foo=2)
obj4 = NonVersionedObject()
self.assertTrue(obj1 == obj2)
self.assertFalse(obj1 == obj3)
self.assertFalse(obj1 == obj4)
self.assertNotEqual(obj1, None)
class TestCinderObjectConditionalUpdate(test.TestCase):
def setUp(self):
super(TestCinderObjectConditionalUpdate, self).setUp()
self.context = context.get_admin_context()
def _create_volume(self):
vol = {
'display_description': 'Test Desc',
'size': 1,
'status': 'available',
'availability_zone': 'az',
'host': 'dummy',
'attach_status': c_fields.VolumeAttachStatus.DETACHED,
volume = objects.Volume(context=self.context, **vol)
return volume
def _create_snapshot(self, volume):
snapshot = objects.Snapshot(context=self.context, volume_id=volume.id)
return snapshot
def _check_volume(self, volume, status, size, reload=False, dirty_keys=(),
if reload:
volume = objects.Volume.get_by_id(self.context, volume.id)
self.assertEqual(status, volume.status)
self.assertEqual(size, volume.size)
dirty = volume.cinder_obj_get_changes()
self.assertEqual(list(dirty_keys), list(dirty.keys()))
for key, value in kwargs.items():
self.assertEqual(value, getattr(volume, key))
def test_conditional_update_non_iterable_expected(self):
volume = self._create_volume()
# We also check that we can check for None values
{'status': 'deleting', 'size': 2},
{'status': 'available', 'migration_status': None}))
# Check that the object in memory has been updated
self._check_volume(volume, 'deleting', 2)
# Check that the volume in the DB also has been updated
self._check_volume(volume, 'deleting', 2, True)
def test_conditional_update_non_iterable_expected_model_field(self):
volume = self._create_volume()
# We also check that we can check for None values
{'status': 'deleting', 'size': 2,
'previous_status': volume.model.status},
{'status': 'available', 'migration_status': None}))
# Check that the object in memory has been updated
self._check_volume(volume, 'deleting', 2, previous_status='available')
# Check that the volume in the DB also has been updated
self._check_volume(volume, 'deleting', 2, True,
def test_conditional_update_non_iterable_expected_save_all(self):
volume = self._create_volume()
volume.size += 1
# We also check that we can check for not None values
{'status': 'deleting'},
{'status': 'available', 'availability_zone': volume.Not(None)},
# Check that the object in memory has been updated and that the size
# is not a dirty key
self._check_volume(volume, 'deleting', 2)
# Check that the volume in the DB also has been updated
self._check_volume(volume, 'deleting', 2, True)
def test_conditional_update_non_iterable_expected_dont_save_all(self):
volume = self._create_volume()
volume.size += 1
{'status': 'deleting'},
{'status': 'available'}, save_all=False))
# Check that the object in memory has been updated with the new status
# but that size has not been saved and is a dirty key
self._check_volume(volume, 'deleting', 2, False, ['size'])
# Check that the volume in the DB also has been updated but not the
# size
self._check_volume(volume, 'deleting', 1, True)
def test_conditional_update_fail_non_iterable_expected_save_all(self):
volume = self._create_volume()
volume.size += 1
{'status': 'available'},
{'status': 'deleting'}, save_all=True))
# Check that the object in memory has not been updated and that the
# size is still a dirty key
self._check_volume(volume, 'available', 2, False, ['size'])
# Check that the volume in the DB hasn't been updated
self._check_volume(volume, 'available', 1, True)
def test_default_conditional_update_non_iterable_expected(self):
volume = self._create_volume()
self.assertTrue(volume.conditional_update({'status': 'deleting'}))
# Check that the object in memory has been updated
self._check_volume(volume, 'deleting', 1)
# Check that the volume in the DB also has been updated
self._check_volume(volume, 'deleting', 1, True)
def test_default_conditional_fail_update_non_iterable_expected(self):
volume_in_db = self._create_volume()
volume = objects.Volume.get_by_id(self.context, volume_in_db.id)
volume_in_db.size += 1
# This will fail because size in DB is different
self.assertFalse(volume.conditional_update({'status': 'deleting'}))
# Check that the object in memory has not been updated
self._check_volume(volume, 'available', 1)
# Check that the volume in the DB hasn't changed the status but has
# the size we changed before the conditional update
self._check_volume(volume_in_db, 'available', 2, True)
def test_default_conditional_update_non_iterable_expected_with_dirty(self):
volume_in_db = self._create_volume()
volume = objects.Volume.get_by_id(self.context, volume_in_db.id)
volume_in_db.size += 1
volume.size = 33
# This will fail because even though we have excluded the size from
# the default condition when we dirtied it in the volume object, we
# still have the last update timestamp that will be included in the
# condition
self.assertFalse(volume.conditional_update({'status': 'deleting'}))
# Check that the object in memory has not been updated
self._check_volume(volume, 'available', 33, False, ['size'])
# Check that the volume in the DB hasn't changed the status but has
# the size we changed before the conditional update
self._check_volume(volume_in_db, 'available', 2, True)
def test_conditional_update_negated_non_iterable_expected(self):
volume = self._create_volume()
{'status': 'deleting', 'size': 2},
{'status': db.Not('in-use'), 'size': db.Not(2)}))
# Check that the object in memory has been updated
self._check_volume(volume, 'deleting', 2)
# Check that the volume in the DB also has been updated
self._check_volume(volume, 'deleting', 2, True)
def test_conditional_update_non_iterable_expected_filter(self):
# Volume we want to change
volume = self._create_volume()
# Another volume that has no snapshots
volume2 = self._create_volume()
# A volume with snapshots
volume3 = self._create_volume()
# Update only it it has no snapshot
filters = (~sql.exists().where(
models.Snapshot.volume_id == models.Volume.id),)
{'status': 'deleting', 'size': 2},
{'status': 'available'},
# Check that the object in memory has been updated
self._check_volume(volume, 'deleting', 2)
# Check that the volume in the DB also has been updated
self._check_volume(volume, 'deleting', 2, True)
# Check that the other volumes in the DB haven't changed
self._check_volume(volume2, 'available', 1, True)
self._check_volume(volume3, 'available', 1, True)
def test_conditional_update_iterable_expected(self):
volume = self._create_volume()
{'status': 'deleting', 'size': 20},
{'status': ('error', 'available'), 'size': range(10)}))
# Check that the object in memory has been updated
self._check_volume(volume, 'deleting', 20)
# Check that the volume in the DB also has been updated
self._check_volume(volume, 'deleting', 20, True)
def test_conditional_update_negated_iterable_expected(self):
volume = self._create_volume()
{'status': 'deleting', 'size': 20},
{'status': db.Not(('creating', 'in-use')), 'size': range(10)}))
# Check that the object in memory has been updated
self._check_volume(volume, 'deleting', 20)
# Check that the volume in the DB also has been updated
self._check_volume(volume, 'deleting', 20, True)
def test_conditional_update_fail_non_iterable_expected(self):
volume = self._create_volume()
{'status': 'deleting'},
{'status': 'available', 'size': 2}))
# Check that the object in memory hasn't changed
self._check_volume(volume, 'available', 1)
# Check that the volume in the DB hasn't changed either
self._check_volume(volume, 'available', 1, True)
def test_conditional_update_fail_negated_non_iterable_expected(self):
volume = self._create_volume()
result = volume.conditional_update({'status': 'deleting'},
{'status': db.Not('in-use'),
'size': 2})
# Check that the object in memory hasn't changed
self._check_volume(volume, 'available', 1)
# Check that the volume in the DB hasn't changed either
self._check_volume(volume, 'available', 1, True)
def test_conditional_update_fail_iterable_expected(self):
volume = self._create_volume()
{'status': 'available'},
{'status': ('error', 'creating'), 'size': range(2, 10)}))
# Check that the object in memory hasn't changed
self._check_volume(volume, 'available', 1)
# Check that the volume in the DB hasn't changed either
self._check_volume(volume, 'available', 1, True)
def test_conditional_update_fail_negated_iterable_expected(self):
volume = self._create_volume()
{'status': 'error'},
{'status': db.Not(('available', 'in-use')), 'size': range(2, 10)}))
# Check that the object in memory hasn't changed
self._check_volume(volume, 'available', 1)
# Check that the volume in the DB hasn't changed either
self._check_volume(volume, 'available', 1, True)
def test_conditional_update_fail_non_iterable_expected_filter(self):
# Volume we want to change
volume = self._create_volume()
# A volume that has no snapshots
volume2 = self._create_volume()
# Another volume with snapshots
volume3 = self._create_volume()
# Update only it it has no snapshot
filters = (~sql.exists().where(
models.Snapshot.volume_id == models.Volume.id),)
{'status': 'deleting', 'size': 2},
{'status': 'available'},
# Check that the object in memory hasn't been updated
self._check_volume(volume, 'available', 1)
# Check that no volume in the DB also has been updated
self._check_volume(volume, 'available', 1, True)
self._check_volume(volume2, 'available', 1, True)
self._check_volume(volume3, 'available', 1, True)
def test_conditional_update_non_iterable_case_value(self):
# Volume we want to change and has snapshots
volume = self._create_volume()
# Filter that checks if a volume has snapshots
has_snapshot_filter = sql.exists().where(
models.Snapshot.volume_id == models.Volume.id)
# We want the updated value to depend on whether it has snapshots or
# not
case_values = volume.Case([(has_snapshot_filter, 'has-snapshot')],
self.assertTrue(volume.conditional_update({'status': case_values},
{'status': 'available'}))
# Check that the object in memory has been updated
self._check_volume(volume, 'has-snapshot', 1)
# Check that the volume in the DB also has been updated
self._check_volume(volume, 'has-snapshot', 1, True)
def test_conditional_update_non_iterable_case_value_else(self):
# Volume we want to change
volume = self._create_volume()
# Filter that checks if a volume has snapshots
has_snapshot_filter = sql.exists().where(
models.Snapshot.volume_id == models.Volume.id)
# We want the updated value to depend on whether it has snapshots or
# not
case_values = volume.Case([(has_snapshot_filter, 'has-snapshot')],
self.assertTrue(volume.conditional_update({'status': case_values},
{'status': 'available'}))
# Check that the object in memory has been updated
self._check_volume(volume, 'no-snapshot', 1)
# Check that the volume in the DB also has been updated
self._check_volume(volume, 'no-snapshot', 1, True)
def test_conditional_update_non_iterable_case_value_fail(self):
# Volume we want to change doesn't have snapshots
volume = self._create_volume()
# Filter that checks if a volume has snapshots
has_snapshot_filter = sql.exists().where(
models.Snapshot.volume_id == models.Volume.id)
# We want the updated value to depend on whether it has snapshots or
# not
case_values = volume.Case([(has_snapshot_filter, 'has-snapshot')],
# We won't update because volume status is available
self.assertFalse(volume.conditional_update({'status': case_values},
{'status': 'deleting'}))
# Check that the object in memory has not been updated
self._check_volume(volume, 'available', 1)
# Check that the volume in the DB also hasn't been updated either
self._check_volume(volume, 'available', 1, True)
def test_conditional_update_iterable_with_none_expected(self):
volume = self._create_volume()
# We also check that we can check for None values in an iterable
{'status': 'deleting'},
{'status': (None, 'available'),
'migration_status': (None, 'finished')}))
# Check that the object in memory has been updated
self._check_volume(volume, 'deleting', 1)
# Check that the volume in the DB also has been updated
self._check_volume(volume, 'deleting', 1, True)
def test_conditional_update_iterable_with_not_none_expected(self):
volume = self._create_volume()
# We also check that we can check for None values in a negated iterable
{'status': 'deleting'},
{'status': volume.Not((None, 'in-use'))}))
# Check that the object in memory has been updated
self._check_volume(volume, 'deleting', 1)
# Check that the volume in the DB also has been updated
self._check_volume(volume, 'deleting', 1, True)
def test_conditional_update_iterable_with_not_includes_null(self):
volume = self._create_volume()
# We also check that negation includes None values by default like we
# do in Python and not like MySQL does
{'status': 'deleting'},
{'status': 'available',
'migration_status': volume.Not(('migrating', 'error'))}))
# Check that the object in memory has been updated
self._check_volume(volume, 'deleting', 1)
# Check that the volume in the DB also has been updated
self._check_volume(volume, 'deleting', 1, True)
def test_conditional_update_iterable_with_not_includes_null_fails(self):
volume = self._create_volume()
# We also check that negation excludes None values if we ask it to
{'status': 'deleting'},
{'status': 'available',
'migration_status': volume.Not(('migrating', 'error'),
# Check that the object in memory has not been updated
self._check_volume(volume, 'available', 1, False)
# Check that the volume in the DB hasn't been updated
self._check_volume(volume, 'available', 1, True)
def test_conditional_update_use_operation_in_value(self):
volume = self._create_volume()
expected_size = volume.size + 1
# We also check that using fields in requested changes will work as
# expected
{'status': 'deleting',
'size': volume.model.size + 1},
{'status': 'available'}))
# Check that the object in memory has been updated
self._check_volume(volume, 'deleting', expected_size, False)
# Check that the volume in the DB has also been updated
self._check_volume(volume, 'deleting', expected_size, True)
def test_conditional_update_auto_order(self):
volume = self._create_volume()
has_snapshot_filter = sql.exists().where(
models.Snapshot.volume_id == models.Volume.id)
case_values = volume.Case([(has_snapshot_filter, 'has-snapshot')],
values = {'status': 'deleting',
'previous_status': volume.model.status,
'migration_status': case_values}
with mock.patch('cinder.db.sqlalchemy.api.model_query') as model_query:
update = model_query.return_value.filter.return_value.update
update.return_value = 0
values, {'status': 'available'}))
# We check that we are passing values to update to SQLAlchemy in the
# right order
self.assertEqual(1, update.call_count)
[('previous_status', volume.model.status),
('migration_status', mock.ANY),
('status', 'deleting')],
{'synchronize_session': False,
'update_args': {'preserve_parameter_order': True}},
def test_conditional_update_force_order(self):
volume = self._create_volume()
has_snapshot_filter = sql.exists().where(
models.Snapshot.volume_id == models.Volume.id)
case_values = volume.Case([(has_snapshot_filter, 'has-snapshot')],
values = {'status': 'deleting',
'previous_status': volume.model.status,
'migration_status': case_values}
order = ['status']
with mock.patch('cinder.db.sqlalchemy.api.model_query') as model_query:
update = model_query.return_value.filter.return_value.update
update.return_value = 0
values, {'status': 'available'}, order=order))
# We check that we are passing values to update to SQLAlchemy in the
# right order
self.assertEqual(1, update.call_count)
[('status', 'deleting'),
('previous_status', volume.model.status),
('migration_status', mock.ANY)],
{'synchronize_session': False,
'update_args': {'preserve_parameter_order': True}},
def test_conditional_update_no_order(self):
volume = self._create_volume()
values = {'status': 'deleting',
'previous_status': 'available',
'migration_status': None}
with mock.patch('cinder.db.sqlalchemy.api.model_query') as model_query:
update = model_query.return_value.filter.return_value.update
update.return_value = 0
values, {'status': 'available'}))
# Check that arguments passed to SQLAlchemy's update are correct (order
# is not relevant).
self.assertEqual(1, update.call_count)
arg = update.call_args[0][0]
self.assertIsInstance(arg, dict)
self.assertEqual(set(values.keys()), set(arg.keys()))
def test_conditional_update_multitable_fail(self):
volume = self._create_volume()
{'status': 'deleting',
objects.Snapshot.model.status: 'available'},
{'status': 'available'})
def test_conditional_update_multitable_fail_fields_different_models(self):
volume = self._create_volume()
{objects.Backup.model.status: 'available',
objects.Snapshot.model.status: 'available'})
def test_conditional_update_not_multitable(self):
volume = self._create_volume()
with mock.patch('cinder.db.sqlalchemy.api._create_facade_lazily') as m:
res = volume.conditional_update(
{objects.Volume.model.status: 'deleting',
objects.Volume.model.size: 12}, reflect_changes=False)
@ddt.data(('available', 'error', None),
('error', 'rolling_back', [{'fake_filter': 'faked'}]))
def test_update_status_where(self, value, expected, filters, mock_update):
volume = self._create_volume()
if filters:
volume.update_single_status_where(value, expected, filters)
mock_update.assert_called_with({'status': value},
{'status': expected},
volume.update_single_status_where(value, expected)
mock_update.assert_called_with({'status': value},
{'status': expected},
class TestCinderDictObject(test_objects.BaseObjectsTestCase):
class TestDictObject(objects.base.CinderObjectDictCompat,
obj_extra_fields = ['foo']
fields = {
'abc': fields.StringField(nullable=True),
'def': fields.IntegerField(nullable=True),
def foo(self):
return 42
def test_dict_objects(self):
obj = self.TestDictObject()
self.assertNotIn('non_existing', obj)
self.assertEqual('val', obj.get('abc', 'val'))
self.assertNotIn('abc', obj)
obj.abc = 'val2'
self.assertEqual('val2', obj.get('abc', 'val'))
self.assertEqual(42, obj.get('foo'))
self.assertEqual(42, obj.get('foo', None))
self.assertIn('foo', obj)
self.assertIn('abc', obj)
self.assertNotIn('def', obj)
@mock.patch('cinder.objects.base.OBJ_VERSIONS', fake_objects.MyHistory())
class TestCinderObjectSerializer(test_objects.BaseObjectsTestCase):
BACKPORT_MSG = ('Backporting %(obj_name)s from version %(src_vers)s to '
'version %(dst_vers)s')
def setUp(self):
super(TestCinderObjectSerializer, self).setUp()
self.obj = fake_objects.ChildObject(scheduled_at=None,
self.parent = fake_objects.ParentObject(uuid=uuid.uuid4(),
self.parent_list = fake_objects.ParentObjectList(objects=[self.parent])
def test_serialize_init_current_has_no_manifest(self):
"""Test that pinned to current version we have no manifest."""
serializer = objects.base.CinderObjectSerializer('1.6')
# Serializer should not have a manifest
def test_serialize_init_no_cap_has_no_manifest(self):
"""Test that without cap we have no manifest."""
serializer = objects.base.CinderObjectSerializer()
# Serializer should not have a manifest
def test_serialize_init_pinned_has_manifest(self):
"""Test that pinned to older version we have manifest."""
objs_version = '1.5'
serializer = objects.base.CinderObjectSerializer(objs_version)
# Serializer should have the right manifest
def test_serialize_entity_unknown_version(self):
"""Test that bad cap version will prevent serializer creation."""
objects.base.CinderObjectSerializer, '0.9')
def test_serialize_entity_basic_no_backport(self, log_debug_mock):
"""Test single element serializer with no backport."""
serializer = objects.base.CinderObjectSerializer('1.6')
primitive = serializer.serialize_entity(self.context, self.obj)
self.assertEqual('1.2', primitive['versioned_object.version'])
data = primitive['versioned_object.data']
self.assertEqual(1, data['integer'])
self.assertEqual('text', data['text'])
def test_serialize_entity_basic_backport(self, log_debug_mock):
"""Test single element serializer with backport."""
serializer = objects.base.CinderObjectSerializer('1.5')
primitive = serializer.serialize_entity(self.context, self.obj)
self.assertEqual('1.1', primitive['versioned_object.version'])
data = primitive['versioned_object.data']
self.assertNotIn('integer', data)
self.assertEqual('text', data['text'])
{'obj_name': 'ChildObject',
'src_vers': '1.2',
'dst_vers': '1.1'})
def test_serialize_entity_full_no_backport(self, log_debug_mock):
"""Test related elements serialization with no backport."""
serializer = objects.base.CinderObjectSerializer('1.6')
primitive = serializer.serialize_entity(self.context, self.parent_list)
self.assertEqual('1.1', primitive['versioned_object.version'])
parent = primitive['versioned_object.data']['objects'][0]
self.assertEqual('1.1', parent['versioned_object.version'])
child = parent['versioned_object.data']['child']
self.assertEqual('1.2', child['versioned_object.version'])
def test_serialize_entity_full_backport_last_children(self,
"""Test related elements serialization with backport of the last child.
Test that using the manifest we properly backport a child object even
when all its parents have not changed their version.
serializer = objects.base.CinderObjectSerializer('1.5')
primitive = serializer.serialize_entity(self.context, self.parent_list)
self.assertEqual('1.1', primitive['versioned_object.version'])
parent = primitive['versioned_object.data']['objects'][0]
self.assertEqual('1.1', parent['versioned_object.version'])
# Only the child has been backported
child = parent['versioned_object.data']['child']
self.assertEqual('1.1', child['versioned_object.version'])
# Check that the backport has been properly done
data = child['versioned_object.data']
self.assertNotIn('integer', data)
self.assertEqual('text', data['text'])
{'obj_name': 'ChildObject',
'src_vers': '1.2',
'dst_vers': '1.1'})
def test_serialize_entity_full_backport(self, log_debug_mock):
"""Test backport of the whole tree of related elements."""
serializer = objects.base.CinderObjectSerializer('1.3')
primitive = serializer.serialize_entity(self.context, self.parent_list)
# List has been backported
self.assertEqual('1.0', primitive['versioned_object.version'])
parent = primitive['versioned_object.data']['objects'][0]
# Parent has been backported as well
self.assertEqual('1.0', parent['versioned_object.version'])
# And the backport has been properly done
data = parent['versioned_object.data']
self.assertNotIn('scheduled_at', data)
# And child as well
child = parent['versioned_object.data']['child']
self.assertEqual('1.1', child['versioned_object.version'])
# Check that the backport has been properly done
data = child['versioned_object.data']
self.assertNotIn('integer', data)
self.assertEqual('text', data['text'])
mock.call(self.BACKPORT_MSG, {'obj_name': 'ParentObjectList',
'src_vers': '1.1',
'dst_vers': '1.0'}),
mock.call(self.BACKPORT_MSG, {'obj_name': 'ParentObject',
'src_vers': '1.1',
'dst_vers': '1.0'}),
mock.call(self.BACKPORT_MSG, {'obj_name': 'ChildObject',
'src_vers': '1.2',
'dst_vers': '1.1'})])