diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py
index 7c03815ab8..d77ba7a566 100644
--- a/ironic/conductor/manager.py
+++ b/ironic/conductor/manager.py
@@ -649,43 +649,67 @@ class ConductorManager(base_manager.BaseConductorManager):
     def _get_node_next_clean_steps(self, task, skip_current_step=True):
         """Get the task's node's next clean steps.
 
-        This returns the list of remaining (ordered) clean steps to be done
-        on the node. If no clean steps have been started yet, all the clean
-        steps are returned. Otherwise, the clean steps after the current
-        clean step are returned. The current clean step is also returned if
-        'skip_current_step' is False.
+        This determines what the next (remaining) clean steps are, and
+        returns the index into the clean steps list that corresponds to the
+        next clean step. The remaining clean steps are determined as follows:
+
+        * If no clean steps have been started yet, all the clean steps
+          must be executed
+        * If skip_current_step is False, the remaining clean steps start
+          with the current clean step. Otherwise, the remaining clean steps
+          start with the clean step after the current one.
+
+        All the clean steps for an automated or manual cleaning are in
+        node.driver_internal_info['clean_steps']. node.clean_step is the
+        current clean step that was just executed (or None, {} if no steps
+        have been executed yet). node.driver_internal_info['clean_step_index']
+        is the index into the clean steps list (or None, doesn't exist if no
+        steps have been executed yet) and corresponds to node.clean_step.
 
         :param task: A TaskManager object
         :param skip_current_step: True to skip the current clean step; False to
                                   include it.
         :raises: NodeCleaningFailure if an internal error occurred when
                  getting the next clean steps
-        :returns: ordered list of clean step dictionaries
+        :returns: index of the next clean step; None if there are no clean
+                  steps to execute.
 
         """
         node = task.node
-        next_steps = node.driver_internal_info.get('clean_steps', [])
         if not node.clean_step:
-            # first time through, return all steps
-            return next_steps
+            # first time through, all steps need to be done. Return the
+            # index of the first step in the list.
+            return 0
 
-        try:
-            # Trim off all previous steps up to (and maybe including) the
-            # current clean step.
-            ind = next_steps.index(node.clean_step)
-            if skip_current_step:
-                ind += 1
-            next_steps = next_steps[ind:]
-        except ValueError:
-            msg = (_('Node %(node)s got an invalid last step for '
-                     '%(state)s: %(step)s.') %
-                   {'node': node.uuid, 'step': node.clean_step,
-                    'state': node.provision_state})
-            LOG.exception(msg)
-            utils.cleaning_error_handler(task, msg)
-            raise exception.NodeCleaningFailure(node=node.uuid,
-                                                reason=msg)
-        return next_steps
+        ind = None
+        if 'clean_step_index' in node.driver_internal_info:
+            ind = node.driver_internal_info['clean_step_index']
+        else:
+            # TODO(rloo). driver_internal_info['clean_step_index'] was
+            # added in Mitaka. We need to maintain backwards compatibility
+            # so this uses the original code to get the index of the current
+            # step. This will be deleted in the Newton cycle.
+            try:
+                next_steps = node.driver_internal_info['clean_steps']
+                ind = next_steps.index(node.clean_step)
+            except (KeyError, ValueError):
+                msg = (_('Node %(node)s got an invalid last step for '
+                         '%(state)s: %(step)s.') %
+                       {'node': node.uuid, 'step': node.clean_step,
+                        'state': node.provision_state})
+                LOG.exception(msg)
+                utils.cleaning_error_handler(task, msg)
+                raise exception.NodeCleaningFailure(node=node.uuid,
+                                                    reason=msg)
+        if ind is None:
+            return None
+
+        if skip_current_step:
+            ind += 1
+        if ind >= len(node.driver_internal_info['clean_steps']):
+            # no steps left to do
+            ind = None
+        return ind
 
     @messaging.expected_exceptions(exception.InvalidParameterValue,
                                    exception.InvalidStateRequested,
@@ -773,7 +797,7 @@ class ConductorManager(base_manager.BaseConductorManager):
         LOG.debug("RPC continue_node_clean called for node %s.", node_id)
 
         with task_manager.acquire(context, node_id, shared=False,
-                                  purpose='node cleaning') as task:
+                                  purpose='continue node cleaning') as task:
             node = task.node
             if node.target_provision_state == states.MANAGEABLE:
                 target_state = states.MANAGEABLE
@@ -802,7 +826,7 @@ class ConductorManager(base_manager.BaseConductorManager):
                 node.driver_internal_info = info
                 node.save()
 
-            next_steps = self._get_node_next_clean_steps(
+            next_step_index = self._get_node_next_clean_steps(
                 task, skip_current_step=skip_current_step)
 
             # If this isn't the final clean step in the cleaning operation
@@ -810,7 +834,7 @@ class ConductorManager(base_manager.BaseConductorManager):
             # finished, we abort the cleaning operation.
             if node.clean_step.get('abort_after'):
                 step_name = node.clean_step['step']
-                if next_steps:
+                if next_step_index is not None:
                     LOG.debug('The cleaning operation for node %(node)s was '
                               'marked to be aborted after step "%(step)s '
                               'completed. Aborting now that it has completed.',
@@ -842,7 +866,7 @@ class ConductorManager(base_manager.BaseConductorManager):
             task.spawn_after(
                 self._spawn_worker,
                 self._do_next_clean_step,
-                task, next_steps)
+                task, next_step_index)
 
     def _do_node_clean(self, task, clean_steps=None):
         """Internal RPC method to perform cleaning of a node.
@@ -881,7 +905,6 @@ class ConductorManager(base_manager.BaseConductorManager):
             return utils.cleaning_error_handler(task, msg)
 
         if manual_clean:
-            node.clean_step = {}
             info = node.driver_internal_info
             info['clean_steps'] = clean_steps
             node.driver_internal_info = info
@@ -924,32 +947,41 @@ class ConductorManager(base_manager.BaseConductorManager):
                    % {'node': node.uuid, 'msg': e})
             return utils.cleaning_error_handler(task, msg)
 
-        self._do_next_clean_step(
-            task,
-            node.driver_internal_info.get('clean_steps', []))
+        steps = node.driver_internal_info.get('clean_steps', [])
+        step_index = 0 if steps else None
+        self._do_next_clean_step(task, step_index)
 
-    def _do_next_clean_step(self, task, steps):
-        """Start executing cleaning steps.
+    def _do_next_clean_step(self, task, step_index):
+        """Do cleaning, starting from the specified clean step.
 
         :param task: a TaskManager instance with an exclusive lock
-        :param steps: The ordered list of remaining steps that need to be
-                      executed on the node. A step is a dictionary with
-                      required keys 'interface' and 'step'. 'args' is an
-                      optional key.
+        :param step_index: The first clean step in the list to execute. This
+            is the index (from 0) into the list of clean steps in the node's
+            driver_internal_info['clean_steps']. Is None if there are no steps
+            to execute.
         """
         node = task.node
         # For manual cleaning, the target provision state is MANAGEABLE,
         # whereas for automated cleaning, it is AVAILABLE.
         manual_clean = node.target_provision_state == states.MANAGEABLE
 
+        driver_internal_info = node.driver_internal_info
+        if step_index is None:
+            steps = []
+        else:
+            steps = driver_internal_info['clean_steps'][step_index:]
+
         LOG.info(_LI('Executing %(state)s on node %(node)s, remaining steps: '
                      '%(steps)s'), {'node': node.uuid, 'steps': steps,
                                     'state': node.provision_state})
+
         # Execute each step until we hit an async step or run out of steps
-        for step in steps:
+        for ind, step in enumerate(steps):
             # Save which step we're about to start so we can restart
             # if necessary
             node.clean_step = step
+            driver_internal_info['clean_step_index'] = step_index + ind
+            node.driver_internal_info = driver_internal_info
             node.save()
             interface = getattr(task.driver, step.get('interface'))
             LOG.info(_LI('Executing %(step)s on node %(node)s'),
@@ -996,8 +1028,8 @@ class ConductorManager(base_manager.BaseConductorManager):
 
         # Clear clean_step
         node.clean_step = None
-        driver_internal_info = node.driver_internal_info
         driver_internal_info['clean_steps'] = None
+        driver_internal_info.pop('clean_step_index', None)
         node.driver_internal_info = driver_internal_info
         node.save()
         try:
diff --git a/ironic/conductor/utils.py b/ironic/conductor/utils.py
index 99b0236d22..1ee2f8cd7c 100644
--- a/ironic/conductor/utils.py
+++ b/ironic/conductor/utils.py
@@ -204,22 +204,26 @@ def provisioning_error_handler(e, node, provision_state,
 def cleaning_error_handler(task, msg, tear_down_cleaning=True,
                            set_fail_state=True):
     """Put a failed node in CLEANFAIL and maintenance."""
-    # Reset clean step, msg should include current step
-    if task.node.provision_state in (states.CLEANING, states.CLEANWAIT):
-        task.node.clean_step = {}
+    node = task.node
+    if node.provision_state in (states.CLEANING, states.CLEANWAIT):
+        # Clear clean step, msg should already include current step
+        node.clean_step = {}
+        info = node.driver_internal_info
+        info.pop('clean_step_index', None)
+        node.driver_internal_info = info
     # For manual cleaning, the target provision state is MANAGEABLE, whereas
     # for automated cleaning, it is AVAILABLE.
-    manual_clean = task.node.target_provision_state == states.MANAGEABLE
-    task.node.last_error = msg
-    task.node.maintenance = True
-    task.node.maintenance_reason = msg
-    task.node.save()
+    manual_clean = node.target_provision_state == states.MANAGEABLE
+    node.last_error = msg
+    node.maintenance = True
+    node.maintenance_reason = msg
+    node.save()
     if tear_down_cleaning:
         try:
             task.driver.deploy.tear_down_cleaning(task)
         except Exception as e:
             msg = (_LE('Failed to tear down cleaning on node %(uuid)s, '
-                       'reason: %(err)s'), {'err': e, 'uuid': task.node.uuid})
+                       'reason: %(err)s'), {'err': e, 'uuid': node.uuid})
             LOG.exception(msg)
 
     if set_fail_state:
@@ -307,16 +311,16 @@ def set_node_cleaning_steps(task):
              clean steps.
     """
     node = task.node
+    driver_internal_info = node.driver_internal_info
+
     # For manual cleaning, the target provision state is MANAGEABLE, whereas
     # for automated cleaning, it is AVAILABLE.
     manual_clean = node.target_provision_state == states.MANAGEABLE
 
     if not manual_clean:
         # Get the prioritized steps for automated cleaning
-        driver_internal_info = node.driver_internal_info
         driver_internal_info['clean_steps'] = _get_cleaning_steps(task,
                                                                   enabled=True)
-        node.driver_internal_info = driver_internal_info
     else:
         # For manual cleaning, the list of cleaning steps was specified by the
         # user and already saved in node.driver_internal_info['clean_steps'].
@@ -326,6 +330,8 @@ def set_node_cleaning_steps(task):
         _validate_user_clean_steps(task, steps)
 
     node.clean_step = {}
+    driver_internal_info['clean_step_index'] = None
+    node.driver_internal_info = driver_internal_info
     node.save()
 
 
diff --git a/ironic/tests/unit/conductor/test_manager.py b/ironic/tests/unit/conductor/test_manager.py
index 0ed5048994..713de154c8 100644
--- a/ironic/tests/unit/conductor/test_manager.py
+++ b/ironic/tests/unit/conductor/test_manager.py
@@ -1370,7 +1370,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
         # Automated cleaning should be executed in this order
         self.clean_steps = [self.deploy_erase, self.power_update,
                             self.deploy_update]
-        self.next_clean_steps = self.clean_steps[1:]
+        self.next_clean_step_index = 1
         # Manual clean step
         self.deploy_raid = {
             'step': 'build_raid', 'priority': 0, 'interface': 'deploy'}
@@ -1532,7 +1532,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
         self.assertEqual(states.CLEANING, node.provision_state)
         self.assertEqual(tgt_prv_state, node.target_provision_state)
         mock_spawn.assert_called_with(self.service._do_next_clean_step,
-                                      mock.ANY, self.next_clean_steps)
+                                      mock.ANY, self.next_clean_step_index)
 
     def test_continue_node_clean_automated(self):
         self._continue_node_clean(states.CLEANWAIT)
@@ -1558,13 +1558,13 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
         self.service._worker_pool.waitall()
         node.refresh()
         if skip:
-            expected_steps = self.next_clean_steps
+            expected_step_index = 1
         else:
             self.assertFalse(
                 'skip_current_clean_step' in node.driver_internal_info)
-            expected_steps = self.clean_steps
+            expected_step_index = 0
         mock_spawn.assert_called_with(self.service._do_next_clean_step,
-                                      mock.ANY, expected_steps)
+                                      mock.ANY, expected_step_index)
 
     def test_continue_node_clean_skip_step(self):
         self._continue_node_clean_skip_step()
@@ -1603,7 +1603,8 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
         last_clean_step = self.clean_steps[0]
         last_clean_step['abortable'] = False
         last_clean_step['abort_after'] = True
-        driver_info = {'clean_steps': [self.clean_steps[0]]}
+        driver_info = {'clean_steps': [self.clean_steps[0]],
+                       'clean_step_index': 0}
         tgt_prov_state = states.MANAGEABLE if manual else states.AVAILABLE
         node = obj_utils.create_test_node(
             self.context, driver='fake', provision_state=states.CLEANWAIT,
@@ -1670,6 +1671,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
         self.assertEqual(states.NOSTATE, node.target_provision_state)
         self.assertEqual({}, node.clean_step)
         self.assertIsNone(node.driver_internal_info.get('clean_steps'))
+        self.assertIsNone(node.driver_internal_info.get('clean_step_index'))
 
     @mock.patch('ironic.drivers.modules.fake.FakeDeploy.prepare_cleaning')
     def __do_node_clean_prepare_clean_fail(self, mock_prep, clean_steps=None):
@@ -1756,18 +1758,19 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
     @mock.patch('ironic.drivers.modules.fake.FakePower.validate')
     def __do_node_clean(self, mock_validate, mock_next_step, mock_steps,
                         clean_steps=None):
-        if clean_steps is None:
-            clean_steps = []
-        tgt_prov_state = states.MANAGEABLE if clean_steps else states.AVAILABLE
+        if clean_steps:
+            tgt_prov_state = states.MANAGEABLE
+            driver_info = {}
+        else:
+            tgt_prov_state = states.AVAILABLE
+            driver_info = {'clean_steps': self.clean_steps}
         node = obj_utils.create_test_node(
             self.context, driver='fake',
             provision_state=states.CLEANING,
             target_provision_state=tgt_prov_state,
             last_error=None,
             power_state=states.POWER_OFF,
-            driver_internal_info={'clean_steps': clean_steps})
-
-        mock_steps.return_value = self.clean_steps
+            driver_internal_info=driver_info)
 
         self._start_service()
         with task_manager.acquire(
@@ -1778,8 +1781,11 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
         node.refresh()
 
         mock_validate.assert_called_once_with(task)
-        mock_next_step.assert_called_once_with(mock.ANY, clean_steps)
+        mock_next_step.assert_called_once_with(mock.ANY, 0)
         mock_steps.assert_called_once_with(task)
+        if clean_steps:
+            self.assertEqual(clean_steps,
+                             node.driver_internal_info['clean_steps'])
 
         # Check that state didn't change
         self.assertEqual(states.CLEANING, node.provision_state)
@@ -1795,28 +1801,38 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
     def _do_next_clean_step_first_step_async(self, return_state, mock_execute,
                                              clean_steps=None):
         # Execute the first async clean step on a node
-        tgt_prov_state = states.MANAGEABLE if clean_steps else states.AVAILABLE
+        driver_internal_info = {'clean_step_index': None}
+        if clean_steps:
+            tgt_prov_state = states.MANAGEABLE
+            driver_internal_info['clean_steps'] = clean_steps
+        else:
+            tgt_prov_state = states.AVAILABLE
+            driver_internal_info['clean_steps'] = self.clean_steps
+
         node = obj_utils.create_test_node(
             self.context, driver='fake',
             provision_state=states.CLEANING,
             target_provision_state=tgt_prov_state,
             last_error=None,
+            driver_internal_info=driver_internal_info,
             clean_step={})
         mock_execute.return_value = return_state
+        expected_first_step = node.driver_internal_info['clean_steps'][0]
 
         self._start_service()
 
         with task_manager.acquire(
                 self.context, node.uuid, shared=False) as task:
-            self.service._do_next_clean_step(task, self.clean_steps)
+            self.service._do_next_clean_step(task, 0)
 
         self.service._worker_pool.waitall()
         node.refresh()
 
         self.assertEqual(states.CLEANWAIT, node.provision_state)
         self.assertEqual(tgt_prov_state, node.target_provision_state)
-        self.assertEqual(self.clean_steps[0], node.clean_step)
-        mock_execute.assert_called_once_with(mock.ANY, self.clean_steps[0])
+        self.assertEqual(expected_first_step, node.clean_step)
+        self.assertEqual(0, node.driver_internal_info['clean_step_index'])
+        mock_execute.assert_called_once_with(mock.ANY, expected_first_step)
 
     def test_do_next_clean_step_automated_first_step_async(self):
         self._do_next_clean_step_first_step_async(states.CLEANWAIT)
@@ -1839,6 +1855,8 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
             provision_state=states.CLEANING,
             target_provision_state=tgt_prov_state,
             last_error=None,
+            driver_internal_info={'clean_steps': self.clean_steps,
+                                  'clean_step_index': 0},
             clean_step=self.clean_steps[0])
         mock_execute.return_value = return_state
 
@@ -1846,7 +1864,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
 
         with task_manager.acquire(
                 self.context, node.uuid, shared=False) as task:
-            self.service._do_next_clean_step(task, self.next_clean_steps)
+            self.service._do_next_clean_step(task, self.next_clean_step_index)
 
         self.service._worker_pool.waitall()
         node.refresh()
@@ -1854,6 +1872,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
         self.assertEqual(states.CLEANWAIT, node.provision_state)
         self.assertEqual(tgt_prov_state, node.target_provision_state)
         self.assertEqual(self.clean_steps[1], node.clean_step)
+        self.assertEqual(1, node.driver_internal_info['clean_step_index'])
         mock_execute.assert_called_once_with(mock.ANY, self.clean_steps[1])
 
     def test_do_next_clean_step_continue_from_last_cleaning(self):
@@ -1870,18 +1889,21 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
     def _do_next_clean_step_last_step_noop(self, mock_execute, manual=False):
         # Resume where last_step is the last cleaning step, should be noop
         tgt_prov_state = states.MANAGEABLE if manual else states.AVAILABLE
+        info = {'clean_steps': self.clean_steps,
+                'clean_step_index': len(self.clean_steps) - 1}
         node = obj_utils.create_test_node(
             self.context, driver='fake',
             provision_state=states.CLEANING,
             target_provision_state=tgt_prov_state,
             last_error=None,
+            driver_internal_info=info,
             clean_step=self.clean_steps[-1])
 
         self._start_service()
 
         with task_manager.acquire(
                 self.context, node.uuid, shared=False) as task:
-            self.service._do_next_clean_step(task, [])
+            self.service._do_next_clean_step(task, None)
 
         self.service._worker_pool.waitall()
         node.refresh()
@@ -1890,6 +1912,8 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
         self.assertEqual(tgt_prov_state, node.provision_state)
         self.assertEqual(states.NOSTATE, node.target_provision_state)
         self.assertEqual({}, node.clean_step)
+        self.assertFalse('clean_step_index' in node.driver_internal_info)
+        self.assertEqual(None, node.driver_internal_info['clean_steps'])
         self.assertFalse(mock_execute.called)
 
     def test__do_next_clean_step_automated_last_step_noop(self):
@@ -1909,6 +1933,8 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
             provision_state=states.CLEANING,
             target_provision_state=tgt_prov_state,
             last_error=None,
+            driver_internal_info={'clean_steps': self.clean_steps,
+                                  'clean_step_index': None},
             clean_step={})
         mock_deploy_execute.return_value = None
         mock_power_execute.return_value = None
@@ -1917,7 +1943,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
 
         with task_manager.acquire(
                 self.context, node.uuid, shared=False) as task:
-            self.service._do_next_clean_step(task, self.clean_steps)
+            self.service._do_next_clean_step(task, 0)
 
         self.service._worker_pool.waitall()
         node.refresh()
@@ -1926,6 +1952,8 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
         self.assertEqual(tgt_prov_state, node.provision_state)
         self.assertEqual(states.NOSTATE, node.target_provision_state)
         self.assertEqual({}, node.clean_step)
+        self.assertFalse('clean_step_index' in node.driver_internal_info)
+        self.assertEqual(None, node.driver_internal_info['clean_steps'])
         mock_power_execute.assert_called_once_with(mock.ANY,
                                                    self.clean_steps[1])
         mock_deploy_execute.assert_has_calls = [
@@ -1950,6 +1978,8 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
             provision_state=states.CLEANING,
             target_provision_state=tgt_prov_state,
             last_error=None,
+            driver_internal_info={'clean_steps': self.clean_steps,
+                                  'clean_step_index': None},
             clean_step={})
         mock_execute.side_effect = Exception()
 
@@ -1957,7 +1987,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
 
         with task_manager.acquire(
                 self.context, node.uuid, shared=False) as task:
-            self.service._do_next_clean_step(task, self.clean_steps)
+            self.service._do_next_clean_step(task, 0)
             tear_mock.assert_called_once_with(task.driver.deploy, task)
 
         self.service._worker_pool.waitall()
@@ -1967,6 +1997,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
         self.assertEqual(states.CLEANFAIL, node.provision_state)
         self.assertEqual(tgt_prov_state, node.target_provision_state)
         self.assertEqual({}, node.clean_step)
+        self.assertFalse('clean_step_index' in node.driver_internal_info)
         self.assertIsNotNone(node.last_error)
         self.assertTrue(node.maintenance)
         mock_execute.assert_called_once_with(mock.ANY, self.clean_steps[0])
@@ -1988,6 +2019,8 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
             provision_state=states.CLEANING,
             target_provision_state=tgt_prov_state,
             last_error=None,
+            driver_internal_info={'clean_steps': self.clean_steps,
+                                  'clean_step_index': None},
             clean_step={})
 
         mock_execute.return_value = None
@@ -1997,7 +2030,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
 
         with task_manager.acquire(
                 self.context, node.uuid, shared=False) as task:
-            self.service._do_next_clean_step(task, self.clean_steps)
+            self.service._do_next_clean_step(task, 0)
 
         self.service._worker_pool.waitall()
         node.refresh()
@@ -2006,6 +2039,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
         self.assertEqual(states.CLEANFAIL, node.provision_state)
         self.assertEqual(tgt_prov_state, node.target_provision_state)
         self.assertEqual({}, node.clean_step)
+        self.assertFalse('clean_step_index' in node.driver_internal_info)
         self.assertIsNotNone(node.last_error)
         self.assertEqual(1, tear_mock.call_count)
         self.assertTrue(node.maintenance)
@@ -2019,30 +2053,36 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
 
     @mock.patch('ironic.drivers.modules.fake.FakeDeploy.execute_clean_step')
     def _do_next_clean_step_no_steps(self, mock_execute, manual=False):
-        # Resume where there are no steps, should be a noop
-        tgt_prov_state = states.MANAGEABLE if manual else states.AVAILABLE
-        node = obj_utils.create_test_node(
-            self.context, driver='fake',
-            provision_state=states.CLEANING,
-            target_provision_state=tgt_prov_state,
-            last_error=None,
-            clean_step={})
 
-        self._start_service()
+        for info in ({'clean_steps': None, 'clean_step_index': None},
+                     {'clean_steps': None}):
+            # Resume where there are no steps, should be a noop
+            tgt_prov_state = states.MANAGEABLE if manual else states.AVAILABLE
+            node = obj_utils.create_test_node(
+                self.context, driver='fake',
+                uuid=uuidutils.generate_uuid(),
+                provision_state=states.CLEANING,
+                target_provision_state=tgt_prov_state,
+                last_error=None,
+                driver_internal_info=info,
+                clean_step={})
 
-        with task_manager.acquire(
-                self.context, node.uuid, shared=False) as task:
-            self.service._do_next_clean_step(
-                task, [])
+            self._start_service()
 
-        self.service._worker_pool.waitall()
-        node.refresh()
+            with task_manager.acquire(
+                    self.context, node.uuid, shared=False) as task:
+                self.service._do_next_clean_step(task, None)
 
-        # Cleaning should be complete without calling additional steps
-        self.assertEqual(tgt_prov_state, node.provision_state)
-        self.assertEqual(states.NOSTATE, node.target_provision_state)
-        self.assertEqual({}, node.clean_step)
-        self.assertFalse(mock_execute.called)
+            self.service._worker_pool.waitall()
+            node.refresh()
+
+            # Cleaning should be complete without calling additional steps
+            self.assertEqual(tgt_prov_state, node.provision_state)
+            self.assertEqual(states.NOSTATE, node.target_provision_state)
+            self.assertEqual({}, node.clean_step)
+            self.assertFalse('clean_step_index' in node.driver_internal_info)
+            self.assertFalse(mock_execute.called)
+            mock_execute.reset_mock()
 
     def test__do_next_clean_step_automated_no_steps(self):
         self._do_next_clean_step_no_steps()
@@ -2061,6 +2101,8 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
             provision_state=states.CLEANING,
             target_provision_state=tgt_prov_state,
             last_error=None,
+            driver_internal_info={'clean_steps': self.clean_steps,
+                                  'clean_step_index': None},
             clean_step={})
         deploy_exec_mock.return_value = "foo"
 
@@ -2068,7 +2110,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
 
         with task_manager.acquire(
                 self.context, node.uuid, shared=False) as task:
-            self.service._do_next_clean_step(task, self.clean_steps)
+            self.service._do_next_clean_step(task, 0)
 
         self.service._worker_pool.waitall()
         node.refresh()
@@ -2077,6 +2119,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
         self.assertEqual(states.CLEANFAIL, node.provision_state)
         self.assertEqual(tgt_prov_state, node.target_provision_state)
         self.assertEqual({}, node.clean_step)
+        self.assertFalse('clean_step_index' in node.driver_internal_info)
         self.assertIsNotNone(node.last_error)
         self.assertTrue(node.maintenance)
         deploy_exec_mock.assert_called_once_with(mock.ANY,
@@ -2091,6 +2134,44 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
         self._do_next_clean_step_bad_step_return_value(manual=True)
 
     def __get_node_next_clean_steps(self, skip=True):
+        driver_internal_info = {'clean_steps': self.clean_steps,
+                                'clean_step_index': 0}
+        node = obj_utils.create_test_node(
+            self.context, driver='fake',
+            provision_state=states.CLEANWAIT,
+            target_provision_state=states.AVAILABLE,
+            driver_internal_info=driver_internal_info,
+            last_error=None,
+            clean_step=self.clean_steps[0])
+
+        with task_manager.acquire(self.context, node.uuid) as task:
+            step_index = self.service._get_node_next_clean_steps(
+                task, skip_current_step=skip)
+            expected_index = 1 if skip else 0
+            self.assertEqual(expected_index, step_index)
+
+    def test__get_node_next_clean_steps(self):
+        self.__get_node_next_clean_steps()
+
+    def test__get_node_next_clean_steps_no_skip(self):
+        self.__get_node_next_clean_steps(skip=False)
+
+    def test__get_node_next_clean_steps_unset_clean_step(self):
+        driver_internal_info = {'clean_steps': self.clean_steps,
+                                'clean_step_index': None}
+        node = obj_utils.create_test_node(
+            self.context, driver='fake',
+            provision_state=states.CLEANWAIT,
+            target_provision_state=states.AVAILABLE,
+            driver_internal_info=driver_internal_info,
+            last_error=None,
+            clean_step=None)
+
+        with task_manager.acquire(self.context, node.uuid) as task:
+            step_index = self.service._get_node_next_clean_steps(task)
+            self.assertEqual(0, step_index)
+
+    def __get_node_next_clean_steps_backwards_compat(self, skip=True):
         driver_internal_info = {'clean_steps': self.clean_steps}
         node = obj_utils.create_test_node(
             self.context, driver='fake',
@@ -2101,32 +2182,19 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
             clean_step=self.clean_steps[0])
 
         with task_manager.acquire(self.context, node.uuid) as task:
-            steps = self.service._get_node_next_clean_steps(
+            step_index = self.service._get_node_next_clean_steps(
                 task, skip_current_step=skip)
-            expected = self.next_clean_steps if skip else self.clean_steps
-            self.assertEqual(expected, steps)
+            expected_index = 1 if skip else 0
+            self.assertEqual(expected_index, step_index)
 
-    def test__get_node_next_clean_steps(self):
-        self.__get_node_next_clean_steps()
+    def test__get_node_next_clean_steps_backwards_compat(self):
+        self.__get_node_next_clean_steps_backwards_compat()
 
-    def test__get_node_next_clean_steps_no_skip(self):
-        self.__get_node_next_clean_steps(skip=False)
-
-    def test__get_node_next_clean_steps_unset_clean_step(self):
-        driver_internal_info = {'clean_steps': self.clean_steps}
-        node = obj_utils.create_test_node(
-            self.context, driver='fake',
-            provision_state=states.CLEANWAIT,
-            target_provision_state=states.AVAILABLE,
-            driver_internal_info=driver_internal_info,
-            last_error=None,
-            clean_step=None)
-
-        with task_manager.acquire(self.context, node.uuid) as task:
-            steps = self.service._get_node_next_clean_steps(task)
-            self.assertEqual(self.clean_steps, steps)
+    def test__get_node_next_clean_steps_no_skip_backwards_compat(self):
+        self.__get_node_next_clean_steps_backwards_compat(skip=False)
 
     def test__get_node_next_clean_steps_bad_clean_step(self):
+        # NOTE(rloo) for backwards compatibility
         driver_internal_info = {'clean_steps': self.clean_steps}
         node = obj_utils.create_test_node(
             self.context, driver='fake',
diff --git a/ironic/tests/unit/conductor/test_utils.py b/ironic/tests/unit/conductor/test_utils.py
index a029a0dfff..81f4a5aa9c 100644
--- a/ironic/tests/unit/conductor/test_utils.py
+++ b/ironic/tests/unit/conductor/test_utils.py
@@ -434,7 +434,7 @@ class NodeCleaningStepsTestCase(base.DbTestCase):
             conductor_utils.set_node_cleaning_steps(task)
             node.refresh()
             self.assertEqual(self.clean_steps,
-                             task.node.driver_internal_info['clean_steps'])
+                             node.driver_internal_info['clean_steps'])
             self.assertEqual({}, node.clean_step)
             mock_steps.assert_called_once_with(task, enabled=True)
             self.assertFalse(mock_validate_user_steps.called)
@@ -459,7 +459,7 @@ class NodeCleaningStepsTestCase(base.DbTestCase):
             conductor_utils.set_node_cleaning_steps(task)
             node.refresh()
             self.assertEqual(clean_steps,
-                             task.node.driver_internal_info['clean_steps'])
+                             node.driver_internal_info['clean_steps'])
             self.assertEqual({}, node.clean_step)
             self.assertFalse(mock_steps.called)
             mock_validate_user_steps.assert_called_once_with(task, clean_steps)
@@ -571,10 +571,12 @@ class ErrorHandlersTestCase(tests_base.TestCase):
         self.node.provision_state = states.CLEANING
         target = 'baz'
         self.node.target_provision_state = target
+        self.node.driver_internal_info = {}
         msg = 'error bar'
         conductor_utils.cleaning_error_handler(self.task, msg)
         self.node.save.assert_called_once_with()
         self.assertEqual({}, self.node.clean_step)
+        self.assertFalse('clean_step_index' in self.node.driver_internal_info)
         self.assertEqual(msg, self.node.last_error)
         self.assertTrue(self.node.maintenance)
         self.assertEqual(msg, self.node.maintenance_reason)