Revert "Rolling upgrades of ceph osd cluster"

This reverts commit 5b2cebfdc4.

Change-Id: Ic6f371fcc2879886b705fdce4d59bc99e41eea89
Chris Holcombe committed 2016-03-25 15:02:50 +00:00
parent 5b2cebfdc4
commit db09fdce93
7 changed files with 34 additions and 1713 deletions


@@ -5,7 +5,6 @@ include:
- cli
- fetch
- contrib.storage.linux:
- ceph
- utils
- contrib.openstack.alternatives
- contrib.network.ip


@@ -19,10 +19,11 @@ from charmhelpers.cli.host import mounts
from charmhelpers.core.host import (
mkdir,
chownr,
service_restart,
cmp_pkgrevno,
lsb_release,
service_stop,
service_restart)
service_stop
)
from charmhelpers.core.hookenv import (
log,
ERROR,
@@ -57,112 +58,6 @@ def ceph_user():
return "root"
class CrushLocation(object):
def __init__(self,
name,
identifier,
host,
rack,
row,
datacenter,
chassis,
root):
self.name = name
self.identifier = identifier
self.host = host
self.rack = rack
self.row = row
self.datacenter = datacenter
self.chassis = chassis
self.root = root
def __str__(self):
return "name: {} id: {} host: {} rack: {} row: {} datacenter: {} " \
"chassis :{} root: {}".format(self.name, self.identifier,
self.host, self.rack, self.row,
self.datacenter, self.chassis,
self.root)
def __eq__(self, other):
return not self.name < other.name and not other.name < self.name
def __ne__(self, other):
return self.name < other.name or other.name < self.name
def __gt__(self, other):
return self.name > other.name
def __ge__(self, other):
return not self.name < other.name
def __le__(self, other):
return self.name < other.name
def get_osd_tree(service):
"""
Returns the current osd map in JSON.
:return: List. :raise: ValueError if the monmap fails to parse.
Also raises CalledProcessError if our ceph command fails
"""
try:
tree = subprocess.check_output(
['ceph', '--id', service,
'osd', 'tree', '--format=json'])
try:
json_tree = json.loads(tree)
crush_list = []
# Make sure children are present in the json
if not json_tree['nodes']:
return None
child_ids = json_tree['nodes'][0]['children']
for child in json_tree['nodes']:
if child['id'] in child_ids:
crush_list.append(
CrushLocation(
name=child.get('name'),
identifier=child['id'],
host=child.get('host'),
rack=child.get('rack'),
row=child.get('row'),
datacenter=child.get('datacenter'),
chassis=child.get('chassis'),
root=child.get('root')
)
)
return crush_list
except ValueError as v:
log("Unable to parse ceph tree json: {}. Error: {}".format(
tree, v.message))
raise
except subprocess.CalledProcessError as e:
log("ceph osd tree command failed with message: {}".format(
e.message))
raise
def get_local_osd_ids():
"""
This will list the /var/lib/ceph/osd/* directories and try
to split the ID off of the directory name and return it in
a list
:return: list. A list of osd identifiers :raise: OSError if
something goes wrong with listing the directory.
"""
osd_ids = []
osd_path = os.path.join(os.sep, 'var', 'lib', 'ceph', 'osd')
if os.path.exists(osd_path):
try:
dirs = os.listdir(osd_path)
for osd_dir in dirs:
osd_id = osd_dir.split('-')[1]
osd_ids.append(osd_id)
except OSError:
raise
return osd_ids
def get_version():
'''Derive Ceph release from an installed package.'''
import apt_pkg as apt
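
The CrushLocation class earlier in this hunk orders instances purely by name, which is what the sorted() call in roll_osd_cluster (in the ceph_hooks.py diff below) relies on to decide the roll order. A quick illustration with made-up host names; it assumes ceph.py from the charm's hooks directory is importable, as the removed unit tests arrange via sys.path:

from ceph import CrushLocation

# Three fabricated hosts in one rack; only `name` matters for ordering.
hosts = [
    CrushLocation(name=n, identifier=i, host=n, rack='rack-a', row='row-a',
                  datacenter='dc-1', chassis='chassis-a', root='default')
    for i, n in enumerate(['ip-10-0-0-3', 'ip-10-0-0-1', 'ip-10-0-0-2'])
]
print([h.name for h in sorted(hosts)])
# ['ip-10-0-0-1', 'ip-10-0-0-2', 'ip-10-0-0-3'] -- the order units would roll
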
@@ -413,7 +308,6 @@ def rescan_osd_devices():
_bootstrap_keyring = "/var/lib/ceph/bootstrap-osd/ceph.keyring"
_upgrade_keyring = "/var/lib/ceph/osd/ceph.client.osd-upgrade.keyring"
def is_bootstrapped():
@@ -439,21 +333,6 @@ def import_osd_bootstrap_key(key):
]
subprocess.check_call(cmd)
def import_osd_upgrade_key(key):
if not os.path.exists(_upgrade_keyring):
cmd = [
"sudo",
"-u",
ceph_user(),
'ceph-authtool',
_upgrade_keyring,
'--create-keyring',
'--name=client.osd-upgrade',
'--add-key={}'.format(key)
]
subprocess.check_call(cmd)
# OSD caps taken from ceph-create-keys
_osd_bootstrap_caps = {
'mon': [
@@ -620,7 +499,7 @@ def update_monfs():
def maybe_zap_journal(journal_dev):
if is_osd_disk(journal_dev):
if (is_osd_disk(journal_dev)):
log('Looks like {} is already an OSD data'
' or journal, skipping.'.format(journal_dev))
return
@@ -664,7 +543,7 @@ def osdize_dev(dev, osd_format, osd_journal, reformat_osd=False,
log('Path {} is not a block device - bailing'.format(dev))
return
if is_osd_disk(dev) and not reformat_osd:
if (is_osd_disk(dev) and not reformat_osd):
log('Looks like {} is already an'
' OSD data or journal, skipping.'.format(dev))
return
@@ -738,7 +617,7 @@ def filesystem_mounted(fs):
def get_running_osds():
"""Returns a list of the pids of the current running OSD daemons"""
'''Returns a list of the pids of the current running OSD daemons'''
cmd = ['pgrep', 'ceph-osd']
try:
result = subprocess.check_output(cmd)


@@ -9,16 +9,12 @@
import glob
import os
import random
import shutil
import subprocess
import sys
import tempfile
import socket
import time
import ceph
from charmhelpers.core import hookenv
from charmhelpers.core.hookenv import (
log,
ERROR,
@@ -35,8 +31,8 @@ from charmhelpers.core.hookenv import (
from charmhelpers.core.host import (
umount,
mkdir,
cmp_pkgrevno,
service_stop, service_start)
cmp_pkgrevno
)
from charmhelpers.fetch import (
add_source,
apt_install,
@@ -44,216 +40,24 @@ from charmhelpers.fetch import (
filter_installed_packages,
)
from charmhelpers.core.sysctl import create as create_sysctl
from charmhelpers.core import host
from utils import (
get_host_ip,
get_networks,
assert_charm_supports_ipv6,
render_template)
render_template,
)
from charmhelpers.contrib.openstack.alternatives import install_alternative
from charmhelpers.contrib.network.ip import (
get_ipv6_addr,
format_ipv6_addr,
)
from charmhelpers.contrib.storage.linux.ceph import (
monitor_key_set,
monitor_key_exists,
monitor_key_get)
from charmhelpers.contrib.charmsupport import nrpe
hooks = Hooks()
# A dict of valid ceph upgrade paths. Mapping is old -> new
upgrade_paths = {
'cloud:trusty-juno': 'cloud:trusty-kilo',
'cloud:trusty-kilo': 'cloud:trusty-liberty',
'cloud:trusty-liberty': 'cloud:trusty-mitaka',
}
def pretty_print_upgrade_paths():
lines = []
for key, value in upgrade_paths.iteritems():
lines.append("{} -> {}".format(key, value))
return lines
def check_for_upgrade():
release_info = host.lsb_release()
if not release_info['DISTRIB_CODENAME'] == 'trusty':
log("Invalid upgrade path from {}. Only trusty is currently "
"supported".format(release_info['DISTRIB_CODENAME']))
return
c = hookenv.config()
old_version = c.previous('source')
log('old_version: {}'.format(old_version))
# Strip all whitespace
new_version = hookenv.config('source')
if new_version:
# replace all whitespace
new_version = new_version.replace(' ', '')
log('new_version: {}'.format(new_version))
if old_version in upgrade_paths:
if new_version == upgrade_paths[old_version]:
log("{} to {} is a valid upgrade path. Proceeding.".format(
old_version, new_version))
roll_osd_cluster(new_version)
else:
# Log a helpful error message
log("Invalid upgrade path from {} to {}. "
"Valid paths are: {}".format(old_version,
new_version,
pretty_print_upgrade_paths()))
def lock_and_roll(my_name):
start_timestamp = time.time()
log('monitor_key_set {}_start {}'.format(my_name, start_timestamp))
monitor_key_set('osd-upgrade', "{}_start".format(my_name), start_timestamp)
log("Rolling")
# This should be quick
upgrade_osd()
log("Done")
stop_timestamp = time.time()
# Set a key to inform others I am finished
log('monitor_key_set {}_done {}'.format(my_name, stop_timestamp))
monitor_key_set('osd-upgrade', "{}_done".format(my_name), stop_timestamp)
def wait_on_previous_node(previous_node):
log("Previous node is: {}".format(previous_node))
previous_node_finished = monitor_key_exists(
'osd-upgrade',
"{}_done".format(previous_node))
while previous_node_finished is False:
log("{} is not finished. Waiting".format(previous_node))
# Has this node been trying to upgrade for longer than
# 10 minutes?
# If so then move on and consider that node dead.
# NOTE: This assumes the cluster's clocks are somewhat accurate
# If the host's clock is really far off it may cause it to skip
# the previous node even though it shouldn't.
current_timestamp = time.time()
previous_node_start_time = monitor_key_get(
'osd-upgrade',
"{}_start".format(previous_node))
if (current_timestamp - (10 * 60)) > previous_node_start_time:
# Previous node is probably dead. Lets move on
if previous_node_start_time is not None:
log(
"Waited 10 mins on node {}. current time: {} > "
"previous node start time: {} Moving on".format(
previous_node,
(current_timestamp - (10 * 60)),
previous_node_start_time))
return
else:
# I have to wait. Sleep a random amount of time and then
# check if I can lock,upgrade and roll.
wait_time = random.randrange(5, 30)
log('waiting for {} seconds'.format(wait_time))
time.sleep(wait_time)
previous_node_finished = monitor_key_exists(
'osd-upgrade',
"{}_done".format(previous_node))
def get_upgrade_position(osd_sorted_list, match_name):
for index, item in enumerate(osd_sorted_list):
if item.name == match_name:
return index
return None
# Edge cases:
# 1. Previous node dies on upgrade, can we retry?
# 2. This assumes that the osd failure domain is not set to osd.
# It rolls an entire server at a time.
def roll_osd_cluster(new_version):
"""
This is tricky to get right so here's what we're going to do.
There's 2 possible cases: Either I'm first in line or not.
If I'm not first in line I'll wait a random time between 5-30 seconds
and test to see if the previous osd is upgraded yet.
TODO: If you're not in the same failure domain it's safe to upgrade
1. Examine all pools and adopt the most strict failure domain policy
Example: Pool 1: Failure domain = rack
Pool 2: Failure domain = host
Pool 3: Failure domain = row
outcome: Failure domain = host
"""
log('roll_osd_cluster called with {}'.format(new_version))
my_name = socket.gethostname()
osd_tree = ceph.get_osd_tree(service='osd-upgrade')
# A sorted list of osd unit names
osd_sorted_list = sorted(osd_tree)
log("osd_sorted_list: {}".format(osd_sorted_list))
try:
position = get_upgrade_position(osd_sorted_list, my_name)
log("upgrade position: {}".format(position))
if position == 0:
# I'm first! Roll
# First set a key to inform others I'm about to roll
lock_and_roll(my_name=my_name)
else:
# Check if the previous node has finished
status_set('blocked',
'Waiting on {} to finish upgrading'.format(
osd_sorted_list[position - 1].name))
wait_on_previous_node(
previous_node=osd_sorted_list[position - 1].name)
lock_and_roll(my_name=my_name)
except ValueError:
log("Failed to find name {} in list {}".format(
my_name, osd_sorted_list))
status_set('blocked', 'failed to upgrade osd')
def upgrade_osd():
current_version = ceph.get_version()
status_set("maintenance", "Upgrading osd")
log("Current ceph version is {}".format(current_version))
new_version = config('release-version')
log("Upgrading to: {}".format(new_version))
try:
add_source(config('source'), config('key'))
apt_update(fatal=True)
except subprocess.CalledProcessError as err:
log("Adding the ceph source failed with message: {}".format(
err.message))
status_set("blocked", "Upgrade to {} failed".format(new_version))
sys.exit(1)
try:
if ceph.systemd():
for osd_id in ceph.get_local_osd_ids():
service_stop('ceph-osd@{}'.format(osd_id))
else:
service_stop('ceph-osd-all')
apt_install(packages=ceph.PACKAGES, fatal=True)
if ceph.systemd():
for osd_id in ceph.get_local_osd_ids():
service_start('ceph-osd@{}'.format(osd_id))
else:
service_start('ceph-osd-all')
except subprocess.CalledProcessError as err:
log("Stopping ceph and upgrading packages failed "
"with message: {}".format(err.message))
status_set("blocked", "Upgrade to {} failed".format(new_version))
sys.exit(1)
def install_upstart_scripts():
# Only install upstart configurations for older versions
@@ -320,7 +124,6 @@ def emit_cephconf():
install_alternative('ceph.conf', '/etc/ceph/ceph.conf',
charm_ceph_conf, 90)
JOURNAL_ZAPPED = '/var/lib/ceph/journal_zapped'
@@ -355,9 +158,6 @@ def check_overlap(journaldevs, datadevs):
@hooks.hook('config-changed')
def config_changed():
# Check if an upgrade was requested
check_for_upgrade()
# Pre-flight checks
if config('osd-format') not in ceph.DISK_FORMATS:
log('Invalid OSD disk format configuration specified', level=ERROR)
@@ -371,7 +171,7 @@ def config_changed():
create_sysctl(sysctl_dict, '/etc/sysctl.d/50-ceph-osd-charm.conf')
e_mountpoint = config('ephemeral-unmount')
if e_mountpoint and ceph.filesystem_mounted(e_mountpoint):
if (e_mountpoint and ceph.filesystem_mounted(e_mountpoint)):
umount(e_mountpoint)
prepare_disks_and_activate()
@@ -401,14 +201,8 @@ def get_mon_hosts():
hosts = []
for relid in relation_ids('mon'):
for unit in related_units(relid):
addr = \
relation_get('ceph-public-address',
unit,
relid) or get_host_ip(
relation_get(
'private-address',
unit,
relid))
addr = relation_get('ceph-public-address', unit, relid) or \
get_host_ip(relation_get('private-address', unit, relid))
if addr:
hosts.append('{}:6789'.format(format_ipv6_addr(addr) or addr))
@@ -464,12 +258,10 @@ def get_journal_devices():
'mon-relation-departed')
def mon_relation():
bootstrap_key = relation_get('osd_bootstrap_key')
upgrade_key = relation_get('osd_upgrade_key')
if get_fsid() and get_auth() and bootstrap_key:
log('mon has provided conf- scanning disks')
emit_cephconf()
ceph.import_osd_bootstrap_key(bootstrap_key)
ceph.import_osd_upgrade_key(upgrade_key)
prepare_disks_and_activate()
else:
log('mon cluster has not yet provided conf')
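
The removed lock_and_roll() and wait_on_previous_node() hooks above coordinate the roll through keys in the 'osd-upgrade' namespace on the monitors (monitor_key_set, monitor_key_get and monitor_key_exists from charmhelpers). A minimal sketch of that handshake, using a plain dict in place of the monitor key store and hypothetical unit names; the 10-minute dead-node timeout mirrors the hook code, everything else is illustrative only:

import time

_KEYS = {}                    # stand-in for the monitors' 'osd-upgrade' key space
DEAD_NODE_TIMEOUT = 10 * 60   # seconds before a silent predecessor is skipped


def lock_and_roll(my_name, do_upgrade):
    """Record a start marker, upgrade this host, then record completion."""
    _KEYS['{}_start'.format(my_name)] = time.time()
    do_upgrade()
    _KEYS['{}_done'.format(my_name)] = time.time()


def wait_on_previous_node(previous_node):
    """Block until the predecessor reports done, or appears to be dead."""
    while '{}_done'.format(previous_node) not in _KEYS:
        started = _KEYS.get('{}_start'.format(previous_node))
        if started and time.time() - started > DEAD_NODE_TIMEOUT:
            return            # predecessor probably died mid-upgrade; move on
        time.sleep(5)         # the real hook sleeps a random 5-30 seconds


# The first unit in the sorted CRUSH tree rolls immediately; every other
# unit waits on the unit before it, then rolls itself.
lock_and_roll('osd-host-a', do_upgrade=lambda: None)
wait_on_previous_node('osd-host-a')
lock_and_roll('osd-host-b', do_upgrade=lambda: None)
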

File diff suppressed because it is too large.


@@ -33,8 +33,6 @@ cluster addr = {{ cluster_addr }}
osd crush location = {{crush_location}}
{% endif %}
[client.osd-upgrade]
keyring = /var/lib/ceph/osd/ceph.client.osd-upgrade.keyring
[mon]
keyring = /var/lib/ceph/mon/$cluster-$id/keyring


@@ -43,8 +43,8 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
and the rest of the service are from lp branches that are
compatible with the local charm (e.g. stable or next).
"""
this_service = {'name': 'ceph-osd', 'units': 3}
other_services = [{'name': 'ceph-mon', 'units': 3},
this_service = {'name': 'ceph-osd'}
other_services = [{'name': 'ceph', 'units': 3},
{'name': 'mysql'},
{'name': 'keystone'},
{'name': 'rabbitmq-server'},
@@ -60,18 +60,18 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
'nova-compute:shared-db': 'mysql:shared-db',
'nova-compute:amqp': 'rabbitmq-server:amqp',
'nova-compute:image-service': 'glance:image-service',
'nova-compute:ceph': 'ceph-mon:client',
'nova-compute:ceph': 'ceph:client',
'keystone:shared-db': 'mysql:shared-db',
'glance:shared-db': 'mysql:shared-db',
'glance:identity-service': 'keystone:identity-service',
'glance:amqp': 'rabbitmq-server:amqp',
'glance:ceph': 'ceph-mon:client',
'glance:ceph': 'ceph:client',
'cinder:shared-db': 'mysql:shared-db',
'cinder:identity-service': 'keystone:identity-service',
'cinder:amqp': 'rabbitmq-server:amqp',
'cinder:image-service': 'glance:image-service',
'cinder:ceph': 'ceph-mon:client',
'ceph-osd:mon': 'ceph-mon:osd'
'cinder:ceph': 'ceph:client',
'ceph-osd:mon': 'ceph:osd'
}
super(CephOsdBasicDeployment, self)._add_relations(relations)
@@ -86,6 +86,9 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
'auth-supported': 'none',
'fsid': '6547bd3e-1397-11e2-82e5-53567c8d32dc',
'monitor-secret': 'AQCXrnZQwI7KGBAAiPofmKEXKxu5bUzoYLVkbQ==',
'osd-reformat': 'yes',
'ephemeral-unmount': '/mnt',
'osd-devices': '/dev/vdb /srv/ceph'
}
# Include a non-existent device as osd-devices is a whitelist,
@@ -99,7 +102,7 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
configs = {'keystone': keystone_config,
'mysql': mysql_config,
'cinder': cinder_config,
'ceph-mon': ceph_config,
'ceph': ceph_config,
'ceph-osd': ceph_osd_config}
super(CephOsdBasicDeployment, self)._configure_services(configs)
@@ -112,12 +115,10 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
self.nova_sentry = self.d.sentry.unit['nova-compute/0']
self.glance_sentry = self.d.sentry.unit['glance/0']
self.cinder_sentry = self.d.sentry.unit['cinder/0']
self.ceph0_sentry = self.d.sentry.unit['ceph-mon/0']
self.ceph1_sentry = self.d.sentry.unit['ceph-mon/1']
self.ceph2_sentry = self.d.sentry.unit['ceph-mon/2']
self.ceph0_sentry = self.d.sentry.unit['ceph/0']
self.ceph1_sentry = self.d.sentry.unit['ceph/1']
self.ceph2_sentry = self.d.sentry.unit['ceph/2']
self.ceph_osd_sentry = self.d.sentry.unit['ceph-osd/0']
self.ceph_osd1_sentry = self.d.sentry.unit['ceph-osd/1']
self.ceph_osd2_sentry = self.d.sentry.unit['ceph-osd/2']
u.log.debug('openstack release val: {}'.format(
self._get_openstack_release()))
u.log.debug('openstack release str: {}'.format(
@@ -176,6 +177,7 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
# Process name and quantity of processes to expect on each unit
ceph_processes = {
'ceph-mon': 1,
'ceph-osd': 2
}
# Units with process names and PID quantities expected
@@ -212,6 +214,9 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
ceph_services = [
'ceph-mon-all',
'ceph-mon id=`hostname`',
'ceph-osd-all',
'ceph-osd id={}'.format(u.get_ceph_osd_id_cmd(0)),
'ceph-osd id={}'.format(u.get_ceph_osd_id_cmd(1))
]
services[self.ceph0_sentry] = ceph_services
services[self.ceph1_sentry] = ceph_services
@@ -228,16 +233,16 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
def test_200_ceph_osd_ceph_relation(self):
"""Verify the ceph-osd to ceph relation data."""
u.log.debug('Checking ceph-osd:ceph-mon relation data...')
u.log.debug('Checking ceph-osd:ceph mon relation data...')
unit = self.ceph_osd_sentry
relation = ['mon', 'ceph-mon:osd']
relation = ['mon', 'ceph:osd']
expected = {
'private-address': u.valid_ip
}
ret = u.validate_relation_data(unit, relation, expected)
if ret:
message = u.relation_error('ceph-osd to ceph-mon', ret)
message = u.relation_error('ceph-osd to ceph', ret)
amulet.raise_status(amulet.FAIL, msg=message)
def test_201_ceph0_to_ceph_osd_relation(self):


@@ -1,157 +0,0 @@
import time
__author__ = 'chris'
from mock import patch, call, MagicMock
import sys
sys.path.append('/home/chris/repos/ceph-osd/hooks')
from ceph import CrushLocation
import test_utils
import ceph_hooks
TO_PATCH = [
'apt_install',
'apt_update',
'add_source',
'config',
'ceph',
'get_conf',
'hookenv',
'host',
'log',
'service_start',
'service_stop',
'socket',
'status_set',
]
def config_side_effect(*args):
if args[0] == 'source':
return 'cloud:trusty-kilo'
elif args[0] == 'key':
return 'key'
elif args[0] == 'release-version':
return 'cloud:trusty-kilo'
previous_node_start_time = time.time() - (9 * 60)
def monitor_key_side_effect(*args):
if args[1] == \
'ip-192-168-1-2_done':
return False
elif args[1] == \
'ip-192-168-1-2_start':
# Return that the previous node started 9 minutes ago
return previous_node_start_time
class UpgradeRollingTestCase(test_utils.CharmTestCase):
def setUp(self):
super(UpgradeRollingTestCase, self).setUp(ceph_hooks, TO_PATCH)
@patch('ceph_hooks.roll_osd_cluster')
def test_check_for_upgrade(self, roll_osd_cluster):
self.host.lsb_release.return_value = {
'DISTRIB_CODENAME': 'trusty',
}
previous_mock = MagicMock().return_value
previous_mock.previous.return_value = "cloud:trusty-juno"
self.hookenv.config.side_effect = [previous_mock,
config_side_effect('source')]
ceph_hooks.check_for_upgrade()
roll_osd_cluster.assert_called_with('cloud:trusty-kilo')
@patch('ceph_hooks.upgrade_osd')
@patch('ceph_hooks.monitor_key_set')
def test_lock_and_roll(self, monitor_key_set, upgrade_osd):
monitor_key_set.monitor_key_set.return_value = None
ceph_hooks.lock_and_roll(my_name='ip-192-168-1-2')
upgrade_osd.assert_called_once_with()
def test_upgrade_osd(self):
self.config.side_effect = config_side_effect
self.ceph.get_version.return_value = "0.80"
self.ceph.systemd.return_value = False
ceph_hooks.upgrade_osd()
self.service_stop.assert_called_with('ceph-osd-all')
self.service_start.assert_called_with('ceph-osd-all')
self.status_set.assert_has_calls([
call('maintenance', 'Upgrading osd'),
])
@patch('ceph_hooks.lock_and_roll')
@patch('ceph_hooks.get_upgrade_position')
def test_roll_osd_cluster_first(self,
get_upgrade_position,
lock_and_roll):
self.socket.gethostname.return_value = "ip-192-168-1-2"
self.ceph.get_osd_tree.return_value = ""
get_upgrade_position.return_value = 0
ceph_hooks.roll_osd_cluster('0.94.1')
lock_and_roll.assert_called_with(my_name="ip-192-168-1-2")
@patch('ceph_hooks.lock_and_roll')
@patch('ceph_hooks.get_upgrade_position')
@patch('ceph_hooks.wait_on_previous_node')
def test_roll_osd_cluster_second(self,
wait_on_previous_node,
get_upgrade_position,
lock_and_roll):
wait_on_previous_node.return_value = None
self.socket.gethostname.return_value = "ip-192-168-1-3"
self.ceph.get_osd_tree.return_value = [
CrushLocation(
name="ip-192-168-1-2",
identifier='a',
host='host-a',
rack='rack-a',
row='row-a',
datacenter='dc-1',
chassis='chassis-a',
root='ceph'),
CrushLocation(
name="ip-192-168-1-3",
identifier='a',
host='host-b',
rack='rack-a',
row='row-a',
datacenter='dc-1',
chassis='chassis-a',
root='ceph')
]
get_upgrade_position.return_value = 1
ceph_hooks.roll_osd_cluster('0.94.1')
self.status_set.assert_called_with(
'blocked',
'Waiting on ip-192-168-1-2 to finish upgrading')
lock_and_roll.assert_called_with(my_name="ip-192-168-1-3")
@patch('ceph_hooks.monitor_key_get')
@patch('ceph_hooks.monitor_key_exists')
def test_wait_on_previous_node(self,
monitor_key_exists,
monitor_key_get):
monitor_key_get.side_effect = monitor_key_side_effect
monitor_key_exists.return_value = False
ceph_hooks.wait_on_previous_node("ip-192-168-1-2")
# Make sure we checked to see if the previous node started
monitor_key_get.assert_has_calls(
[call('osd-upgrade', 'ip-192-168-1-2_start')]
)
# Make sure we checked to see if the previous node was finished
monitor_key_exists.assert_has_calls(
[call('osd-upgrade', 'ip-192-168-1-2_done')]
)
# Make sure we waited at least once before proceeding
self.log.assert_has_calls(
[call('Previous node is: ip-192-168-1-2')],
[call('ip-192-168-1-2 is not finished. Waiting')],
)