From bad281cf161d4ffa2c87f0155c5cd572289c0251 Mon Sep 17 00:00:00 2001
From: Dmitry Tantsur <dtantsur@protonmail.com>
Date: Fri, 30 Apr 2021 18:19:14 +0200
Subject: [PATCH] Clean up deprecated features of the agent deploy

Removes support for non-decomposable deploy steps.
Removes support for pre-Wallaby agents.

Change-Id: I7bb1e58ecbcb1130e64150f26aa5bd347a24f7d5
---
 ironic/drivers/modules/agent.py               |  99 +----
 ironic/drivers/modules/agent_base.py          |  82 +---
 ironic/drivers/modules/agent_client.py        |  32 --
 ironic/drivers/modules/ansible/deploy.py      |  15 +-
 .../tests/unit/drivers/modules/test_agent.py  | 120 +-----
 .../unit/drivers/modules/test_agent_base.py   | 377 ++----------------
 .../unit/drivers/modules/test_agent_client.py |  45 +--
 .../decomposed-steps-9644d3b5ccbad1ea.yaml    |   7 +
 8 files changed, 109 insertions(+), 668 deletions(-)
 create mode 100644 releasenotes/notes/decomposed-steps-9644d3b5ccbad1ea.yaml

diff --git a/ironic/drivers/modules/agent.py b/ironic/drivers/modules/agent.py
index 1dc3881e24..3296fb9347 100644
--- a/ironic/drivers/modules/agent.py
+++ b/ironic/drivers/modules/agent.py
@@ -208,8 +208,6 @@ class CustomAgentDeploy(agent_base.AgentDeployMixin, agent_base.AgentBaseMixin,
     the ramdisk and prepare the instance boot.
     """
 
-    has_decomposed_deploy_steps = True
-
     def get_properties(self):
         """Return the properties of the interface.
 
@@ -563,63 +561,15 @@ class AgentDeploy(CustomAgentDeploy):
             if disk_label is not None:
                 image_info['disk_label'] = disk_label
 
-        has_write_image = agent_base.find_step(
-            task, 'deploy', 'deploy', 'write_image') is not None
-        if not has_write_image:
-            LOG.warning('The agent on node %s does not have the deploy '
-                        'step deploy.write_image, using the deprecated '
-                        'synchronous fall-back', task.node.uuid)
-
-        if self.has_decomposed_deploy_steps and has_write_image:
-            configdrive = node.instance_info.get('configdrive')
-            # Now switch into the corresponding in-band deploy step and let the
-            # result be polled normally.
-            new_step = {'interface': 'deploy',
-                        'step': 'write_image',
-                        'args': {'image_info': image_info,
-                                 'configdrive': configdrive}}
-            return agent_base.execute_step(task, new_step, 'deploy',
-                                           client=self._client)
-        else:
-            # TODO(dtantsur): remove in W
-            command = self._client.prepare_image(node, image_info, wait=True)
-            if command['command_status'] == 'FAILED':
-                # TODO(jimrollenhagen) power off if using neutron dhcp to
-                #                      align with pxe driver?
-                msg = (_('node %(node)s command status errored: %(error)s') %
-                       {'node': node.uuid, 'error': command['command_error']})
-                LOG.error(msg)
-                deploy_utils.set_failed_state(task, msg)
-
-    # TODO(dtantsur): remove in W
-    def _get_uuid_from_result(self, task, type_uuid):
-        command = self._client.get_last_command_status(task.node,
-                                                       'prepare_image')
-        if (not command
-                or not command.get('command_result', {}).get('result')):
-            msg = _('Unexpected response from the agent for node %s: the '
-                    'running command list does not include prepare_image '
-                    'or its result is malformed') % task.node.uuid
-            LOG.error(msg)
-            deploy_utils.set_failed_state(task, msg)
-            return
-
-        words = command['command_result']['result'].split()
-        for word in words:
-            if type_uuid in word:
-                result = word.split('=')[1]
-                if not result:
-                    msg = (_('Command result did not return %(type_uuid)s '
-                             'for node %(node)s. The version of the IPA '
-                             'ramdisk used in the deployment might not '
-                             'have support for provisioning of '
-                             'partition images.') %
-                           {'type_uuid': type_uuid,
-                            'node': task.node.uuid})
-                    LOG.error(msg)
-                    deploy_utils.set_failed_state(task, msg)
-                    return
-                return result
+        configdrive = node.instance_info.get('configdrive')
+        # Now switch into the corresponding in-band deploy step and let the
+        # result be polled normally.
+        new_step = {'interface': 'deploy',
+                    'step': 'write_image',
+                    'args': {'image_info': image_info,
+                             'configdrive': configdrive}}
+        return agent_base.execute_step(task, new_step, 'deploy',
+                                       client=self._client)
 
     @METRICS.timer('AgentDeployMixin.prepare_instance_boot')
     @base.deploy_step(priority=60)
@@ -648,16 +598,9 @@ class AgentDeploy(CustomAgentDeploy):
         # ppc64* hardware we need to provide the 'PReP_Boot_partition_uuid' to
         # direct where the bootloader should be installed.
         driver_internal_info = task.node.driver_internal_info
-        try:
-            partition_uuids = self._client.get_partition_uuids(node).get(
-                'command_result') or {}
-            root_uuid = partition_uuids.get('root uuid')
-        except exception.AgentAPIError:
-            # TODO(dtantsur): remove in W
-            LOG.warning('Old ironic-python-agent detected, please update '
-                        'to Victoria or newer')
-            partition_uuids = None
-            root_uuid = self._get_uuid_from_result(task, 'root_uuid')
+        partition_uuids = self._client.get_partition_uuids(node).get(
+            'command_result') or {}
+        root_uuid = partition_uuids.get('root uuid')
 
         if root_uuid:
             driver_internal_info['root_uuid_or_disk_id'] = root_uuid
@@ -672,23 +615,13 @@ class AgentDeploy(CustomAgentDeploy):
         efi_sys_uuid = None
         if not iwdi:
             if boot_mode_utils.get_boot_mode(node) == 'uefi':
-                # TODO(dtantsur): remove in W
-                if partition_uuids is None:
-                    efi_sys_uuid = (self._get_uuid_from_result(task,
-                                    'efi_system_partition_uuid'))
-                else:
-                    efi_sys_uuid = partition_uuids.get(
-                        'efi system partition uuid')
+                efi_sys_uuid = partition_uuids.get(
+                    'efi system partition uuid')
 
         prep_boot_part_uuid = None
         if cpu_arch is not None and cpu_arch.startswith('ppc64'):
-            # TODO(dtantsur): remove in W
-            if partition_uuids is None:
-                prep_boot_part_uuid = (self._get_uuid_from_result(task,
-                                       'PReP_Boot_partition_uuid'))
-            else:
-                prep_boot_part_uuid = partition_uuids.get(
-                    'PReP Boot partition uuid')
+            prep_boot_part_uuid = partition_uuids.get(
+                'PReP Boot partition uuid')
 
         LOG.info('Image successfully written to node %s', node.uuid)
 
diff --git a/ironic/drivers/modules/agent_base.py b/ironic/drivers/modules/agent_base.py
index 2bc0598997..2db1ae82c1 100644
--- a/ironic/drivers/modules/agent_base.py
+++ b/ironic/drivers/modules/agent_base.py
@@ -401,55 +401,8 @@ def _step_failure_handler(task, msg, step_type, traceback=False):
 class HeartbeatMixin(object):
     """Mixin class implementing heartbeat processing."""
 
-    has_decomposed_deploy_steps = False
-    """Whether the driver supports decomposed deploy steps.
-
-    Previously (since Rocky), drivers used a single 'deploy' deploy step on
-    the deploy interface. Some additional steps were added for the 'direct'
-    and 'iscsi' deploy interfaces in the Ussuri cycle, which means that
-    more of the deployment flow is driven by deploy steps.
-    """
-
     def __init__(self):
         self._client = _get_client()
-        if not self.has_decomposed_deploy_steps:
-            LOG.warning('%s does not support decomposed deploy steps. This '
-                        'is deprecated and will stop working in a future '
-                        'release', self.__class__.__name__)
-
-    def continue_deploy(self, task):
-        """Continues the deployment of baremetal node.
-
-        This method continues the deployment of the baremetal node after
-        the ramdisk have been booted.
-
-        :param task: a TaskManager instance
-        """
-
-    def deploy_has_started(self, task):
-        """Check if the deployment has started already.
-
-        :returns: True if the deploy has started, False otherwise.
-        """
-
-    def deploy_is_done(self, task):
-        """Check if the deployment is already completed.
-
-        :returns: True if the deployment is completed. False otherwise
-        """
-
-    def in_core_deploy_step(self, task):
-        """Check if we are in the deploy.deploy deploy step.
-
-        Assumes that we are in the DEPLOYWAIT state.
-
-        :param task: a TaskManager instance
-        :returns: True if the current deploy step is deploy.deploy.
-        """
-        step = task.node.deploy_step
-        return (step
-                and step['interface'] == 'deploy'
-                and step['step'] == 'deploy')
 
     def reboot_to_instance(self, task):
         """Method invoked after the deployment is completed.
@@ -530,33 +483,14 @@ class HeartbeatMixin(object):
                 # booted and we need to collect in-band steps.
                 self.refresh_steps(task, 'deploy')
 
-            # NOTE(mgoddard): Only handle heartbeats during DEPLOYWAIT if we
-            # are currently in the core deploy.deploy step. Other deploy steps
-            # may cause the agent to boot, but we should not trigger deployment
-            # at that point if the driver is polling for completion of a step.
-            if (not self.has_decomposed_deploy_steps
-                    and self.in_core_deploy_step(task)):
-                msg = _('Failed checking if deploy is done')
-                # NOTE(mgoddard): support backwards compatibility for
-                # drivers which do not implement continue_deploy and
-                # reboot_to_instance as deploy steps.
-                if not self.deploy_has_started(task):
-                    msg = _('Node failed to deploy')
-                    self.continue_deploy(task)
-                elif self.deploy_is_done(task):
-                    msg = _('Node failed to move to active state')
-                    self.reboot_to_instance(task)
-                else:
-                    node.touch_provisioning()
-            else:
-                node.touch_provisioning()
-                # Check if the driver is polling for completion of
-                # a step, via the 'deployment_polling' flag.
-                polling = node.driver_internal_info.get(
-                    'deployment_polling', False)
-                if not polling:
-                    msg = _('Failed to process the next deploy step')
-                    self.process_next_step(task, 'deploy')
+            node.touch_provisioning()
+            # Check if the driver is polling for completion of
+            # a step, via the 'deployment_polling' flag.
+            polling = node.driver_internal_info.get(
+                'deployment_polling', False)
+            if not polling:
+                msg = _('Failed to process the next deploy step')
+                self.process_next_step(task, 'deploy')
         except Exception as e:
             last_error = _('%(msg)s. Error: %(exc)s') % {'msg': msg, 'exc': e}
             LOG.exception('Asynchronous exception for node %(node)s: %(err)s',
diff --git a/ironic/drivers/modules/agent_client.py b/ironic/drivers/modules/agent_client.py
index b5b6c3c1a5..5ca93a3b1d 100644
--- a/ironic/drivers/modules/agent_client.py
+++ b/ironic/drivers/modules/agent_client.py
@@ -342,38 +342,6 @@ class AgentClient(object):
                       {'cmd': method, 'node': node.uuid})
             return None
 
-    @METRICS.timer('AgentClient.prepare_image')
-    def prepare_image(self, node, image_info, wait=False):
-        """Call the `prepare_image` method on the node.
-
-        :param node: A Node object.
-        :param image_info: A dictionary containing various image related
-                           information.
-        :param wait: True to wait for the command to finish executing, False
-                     otherwise.
-        :raises: IronicException when failed to issue the request or there was
-                 a malformed response from the agent.
-        :raises: AgentAPIError when agent failed to execute specified command.
-        :raises: AgentInProgress when the command fails to execute as the agent
-                 is presently executing the prior command.
-        :returns: A dict containing command status from agent.
-                  See :func:`get_commands_status` for a command result sample.
-        """
-        LOG.debug('Preparing image %(image)s on node %(node)s.',
-                  {'image': image_info.get('id'),
-                   'node': node.uuid})
-        params = {'image_info': image_info}
-
-        # this should be an http(s) URL
-        configdrive = node.instance_info.get('configdrive')
-        if configdrive is not None:
-            params['configdrive'] = configdrive
-
-        return self._command(node=node,
-                             method='standby.prepare_image',
-                             params=params,
-                             poll=wait)
-
     @METRICS.timer('AgentClient.start_iscsi_target')
     def start_iscsi_target(self, node, iqn,
                            portal_port=DEFAULT_IPA_PORTAL_PORT,
diff --git a/ironic/drivers/modules/ansible/deploy.py b/ironic/drivers/modules/ansible/deploy.py
index 03d0f961f9..2c17bccdb3 100644
--- a/ironic/drivers/modules/ansible/deploy.py
+++ b/ironic/drivers/modules/ansible/deploy.py
@@ -381,8 +381,6 @@ class AnsibleDeploy(agent_base.HeartbeatMixin,
                     base.DeployInterface):
     """Interface for deploy-related actions."""
 
-    has_decomposed_deploy_steps = True
-
     def __init__(self):
         super(AnsibleDeploy, self).__init__()
         # NOTE(pas-ha) overriding agent creation as we won't be
@@ -447,6 +445,19 @@ class AnsibleDeploy(agent_base.HeartbeatMixin,
         manager_utils.node_power_action(task, states.REBOOT)
         return states.DEPLOYWAIT
 
+    def in_core_deploy_step(self, task):
+        """Check if we are in the deploy.deploy deploy step.
+
+        Assumes that we are in the DEPLOYWAIT state.
+
+        :param task: a TaskManager instance
+        :returns: True if the current deploy step is deploy.deploy.
+        """
+        step = task.node.deploy_step
+        return (step
+                and step['interface'] == 'deploy'
+                and step['step'] == 'deploy')
+
     def process_next_step(self, task, step_type):
         """Start the next clean/deploy step if the previous one is complete.
 
diff --git a/ironic/tests/unit/drivers/modules/test_agent.py b/ironic/tests/unit/drivers/modules/test_agent.py
index 0494c30d03..6ab5d71d06 100644
--- a/ironic/tests/unit/drivers/modules/test_agent.py
+++ b/ironic/tests/unit/drivers/modules/test_agent.py
@@ -289,14 +289,11 @@ class CommonTestsMixin:
 
     @mock.patch.object(agent.CustomAgentDeploy, 'refresh_steps',
                        autospec=True)
-    @mock.patch.object(agent_client.AgentClient, 'prepare_image',
-                       autospec=True)
     @mock.patch('ironic.conductor.utils.is_fast_track', autospec=True)
     @mock.patch.object(pxe.PXEBoot, 'prepare_instance', autospec=True)
     @mock.patch('ironic.conductor.utils.node_power_action', autospec=True)
     def test_deploy_fast_track(self, power_mock, mock_pxe_instance,
-                               mock_is_fast_track, prepare_image_mock,
-                               refresh_mock):
+                               mock_is_fast_track, refresh_mock):
         mock_is_fast_track.return_value = True
         self.node.target_provision_state = states.ACTIVE
         self.node.provision_state = states.DEPLOYING
@@ -307,7 +304,6 @@ class CommonTestsMixin:
             self.assertIsNone(result)
             self.assertFalse(power_mock.called)
             self.assertFalse(mock_pxe_instance.called)
-            self.assertFalse(prepare_image_mock.called)
             self.assertEqual(states.DEPLOYING, task.node.provision_state)
             self.assertEqual(states.ACTIVE,
                              task.node.target_provision_state)
@@ -1353,21 +1349,21 @@ class TestAgentDeploy(CommonTestsMixin, db_base.DbTestCase):
                 task, manage_boot=False)
 
     def _test_write_image(self, additional_driver_info=None,
-                          additional_expected_image_info=None,
-                          compat=False):
+                          additional_expected_image_info=None):
         self.node.provision_state = states.DEPLOYWAIT
         self.node.target_provision_state = states.ACTIVE
         driver_info = self.node.driver_info
         driver_info.update(additional_driver_info or {})
         self.node.driver_info = driver_info
-        if not compat:
-            step = {'step': 'write_image', 'interface': 'deploy'}
-            dii = self.node.driver_internal_info
-            dii['agent_cached_deploy_steps'] = {
-                'deploy': [step],
-            }
-            self.node.driver_internal_info = dii
+
+        step = {'step': 'write_image', 'interface': 'deploy'}
+        dii = self.node.driver_internal_info
+        dii['agent_cached_deploy_steps'] = {
+            'deploy': [step],
+        }
+        self.node.driver_internal_info = dii
         self.node.save()
+
         test_temp_url = 'http://image'
         expected_image_info = {
             'urls': [test_temp_url],
@@ -1380,22 +1376,17 @@ class TestAgentDeploy(CommonTestsMixin, db_base.DbTestCase):
         }
         expected_image_info.update(additional_expected_image_info or {})
 
-        client_mock = mock.MagicMock(spec_set=['prepare_image',
-                                               'execute_deploy_step'])
+        client_mock = mock.MagicMock(spec_set=['execute_deploy_step'])
 
         with task_manager.acquire(self.context, self.node.uuid,
                                   shared=False) as task:
             task.driver.deploy._client = client_mock
             task.driver.deploy.write_image(task)
 
-            if compat:
-                client_mock.prepare_image.assert_called_with(
-                    task.node, expected_image_info, wait=True)
-            else:
-                step['args'] = {'image_info': expected_image_info,
-                                'configdrive': None}
-                client_mock.execute_deploy_step.assert_called_once_with(
-                    step, task.node, mock.ANY)
+            step['args'] = {'image_info': expected_image_info,
+                            'configdrive': None}
+            client_mock.execute_deploy_step.assert_called_once_with(
+                step, task.node, mock.ANY)
             self.assertEqual(states.DEPLOYWAIT, task.node.provision_state)
             self.assertEqual(states.ACTIVE,
                              task.node.target_provision_state)
@@ -1403,9 +1394,6 @@ class TestAgentDeploy(CommonTestsMixin, db_base.DbTestCase):
     def test_write_image(self):
         self._test_write_image()
 
-    def test_write_image_compat(self):
-        self._test_write_image(compat=True)
-
     def test_write_image_with_proxies(self):
         self._test_write_image(
             additional_driver_info={'image_https_proxy': 'https://spam.ni',
@@ -1478,16 +1466,18 @@ class TestAgentDeploy(CommonTestsMixin, db_base.DbTestCase):
             'disk_label': 'msdos'
         }
 
-        client_mock = mock.MagicMock(spec_set=['prepare_image'])
+        client_mock = mock.MagicMock(spec_set=['execute_deploy_step'])
 
         with task_manager.acquire(self.context, self.node.uuid,
                                   shared=False) as task:
             task.driver.deploy._client = client_mock
             task.driver.deploy.write_image(task)
 
-            client_mock.prepare_image.assert_called_with(task.node,
-                                                         expected_image_info,
-                                                         wait=True)
+            step = {'step': 'write_image', 'interface': 'deploy',
+                    'args': {'image_info': expected_image_info,
+                             'configdrive': 'configdrive'}}
+            client_mock.execute_deploy_step.assert_called_once_with(
+                step, task.node, mock.ANY)
             self.assertEqual(states.DEPLOYWAIT, task.node.provision_state)
             self.assertEqual(states.ACTIVE,
                              task.node.target_provision_state)
@@ -1586,46 +1576,6 @@ class TestAgentDeploy(CommonTestsMixin, db_base.DbTestCase):
             self.assertEqual(states.DEPLOYING, task.node.provision_state)
             self.assertEqual(states.ACTIVE, task.node.target_provision_state)
 
-    @mock.patch.object(agent.LOG, 'warning', spec_set=True, autospec=True)
-    @mock.patch.object(boot_mode_utils, 'get_boot_mode_for_deploy',
-                       autospec=True)
-    @mock.patch.object(agent.AgentDeploy, '_get_uuid_from_result',
-                       autospec=True)
-    @mock.patch.object(agent_client.AgentClient, 'get_partition_uuids',
-                       autospec=True)
-    @mock.patch.object(agent.AgentDeploy, 'prepare_instance_to_boot',
-                       autospec=True)
-    def test_prepare_instance_boot_partition_image_compat(
-            self, prepare_instance_mock, uuid_mock,
-            old_uuid_mock, boot_mode_mock, log_mock):
-        self.node.instance_info = {
-            'capabilities': {'boot_option': 'netboot'}}
-        uuid_mock.side_effect = exception.AgentAPIError
-        old_uuid_mock.return_value = 'root_uuid'
-        self.node.provision_state = states.DEPLOYING
-        self.node.target_provision_state = states.ACTIVE
-        self.node.save()
-        boot_mode_mock.return_value = 'bios'
-        with task_manager.acquire(self.context, self.node.uuid,
-                                  shared=False) as task:
-            driver_internal_info = task.node.driver_internal_info
-            driver_internal_info['is_whole_disk_image'] = False
-            task.node.driver_internal_info = driver_internal_info
-            task.driver.deploy.prepare_instance_boot(task)
-            uuid_mock.assert_called_once_with(mock.ANY, task.node)
-            old_uuid_mock.assert_called_once_with(mock.ANY, task, 'root_uuid')
-            driver_int_info = task.node.driver_internal_info
-            self.assertEqual('root_uuid',
-                             driver_int_info['root_uuid_or_disk_id']),
-            boot_mode_mock.assert_called_once_with(task.node)
-            self.assertTrue(log_mock.called)
-            prepare_instance_mock.assert_called_once_with(mock.ANY,
-                                                          task,
-                                                          'root_uuid',
-                                                          None, None)
-            self.assertEqual(states.DEPLOYING, task.node.provision_state)
-            self.assertEqual(states.ACTIVE, task.node.target_provision_state)
-
     @mock.patch.object(agent.LOG, 'warning', spec_set=True, autospec=True)
     @mock.patch.object(boot_mode_utils, 'get_boot_mode_for_deploy',
                        autospec=True)
@@ -1781,34 +1731,6 @@ class TestAgentDeploy(CommonTestsMixin, db_base.DbTestCase):
         self.node.refresh()
         self.assertEqual('bar', self.node.instance_info['foo'])
 
-    @mock.patch.object(agent_client.AgentClient, 'get_commands_status',
-                       autospec=True)
-    def test_get_uuid_from_result(self, mock_statuses):
-        mock_statuses.return_value = [
-            {'command_name': 'banana', 'command_result': None},
-            {'command_name': 'prepare_image',
-             'command_result': {'result': 'okay root_uuid=abcd'}},
-            {'command_name': 'get_deploy_steps',
-             'command_result': {'deploy_steps': []}}
-        ]
-        with task_manager.acquire(
-                self.context, self.node['uuid'], shared=False) as task:
-            result = self.driver._get_uuid_from_result(task, 'root_uuid')
-            self.assertEqual('abcd', result)
-
-    @mock.patch.object(agent_client.AgentClient, 'get_commands_status',
-                       autospec=True)
-    def test_get_uuid_from_result_fails(self, mock_statuses):
-        mock_statuses.return_value = [
-            {'command_name': 'banana', 'command_result': None},
-            {'command_name': 'get_deploy_steps',
-             'command_result': {'deploy_steps': []}}
-        ]
-        with task_manager.acquire(
-                self.context, self.node['uuid'], shared=False) as task:
-            result = self.driver._get_uuid_from_result(task, 'root_uuid')
-            self.assertIsNone(result)
-
     @mock.patch.object(manager_utils, 'restore_power_state_if_needed',
                        autospec=True)
     @mock.patch.object(manager_utils, 'power_on_node_if_needed',
diff --git a/ironic/tests/unit/drivers/modules/test_agent_base.py b/ironic/tests/unit/drivers/modules/test_agent_base.py
index 91daec5f1a..937312b874 100644
--- a/ironic/tests/unit/drivers/modules/test_agent_base.py
+++ b/ironic/tests/unit/drivers/modules/test_agent_base.py
@@ -86,19 +86,9 @@ class HeartbeatMixinTest(AgentDeployMixinBaseTest):
     @mock.patch.object(agent_base.HeartbeatMixin,
                        'refresh_steps', autospec=True)
     @mock.patch.object(agent_base.HeartbeatMixin,
-                       'in_core_deploy_step', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'deploy_has_started', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin, 'continue_deploy',
-                       autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'reboot_to_instance', autospec=True)
-    def test_heartbeat_continue_deploy(self, rti_mock, cd_mock,
-                                       deploy_started_mock,
-                                       in_deploy_mock,
-                                       refresh_steps_mock):
-        in_deploy_mock.return_value = True
-        deploy_started_mock.return_value = False
+                       'process_next_step', autospec=True)
+    def test_heartbeat_continue_deploy_first_run(self, next_step_mock,
+                                                 refresh_steps_mock):
         self.node.provision_state = states.DEPLOYWAIT
         self.node.save()
         with task_manager.acquire(self.context, self.node.uuid,
@@ -110,27 +100,17 @@ class HeartbeatMixinTest(AgentDeployMixinBaseTest):
             self.assertEqual(
                 '3.2.0',
                 task.node.driver_internal_info['agent_version'])
-            cd_mock.assert_called_once_with(self.deploy, task)
-            self.assertFalse(rti_mock.called)
             refresh_steps_mock.assert_called_once_with(self.deploy,
                                                        task, 'deploy')
+            next_step_mock.assert_called_once_with(self.deploy,
+                                                   task, 'deploy')
 
     @mock.patch.object(agent_base.HeartbeatMixin,
                        'refresh_steps', autospec=True)
     @mock.patch.object(agent_base.HeartbeatMixin,
-                       'in_core_deploy_step', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'deploy_has_started', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin, 'continue_deploy',
-                       autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'reboot_to_instance', autospec=True)
-    def test_heartbeat_continue_deploy_second_run(self, rti_mock, cd_mock,
-                                                  deploy_started_mock,
-                                                  in_deploy_mock,
+                       'process_next_step', autospec=True)
+    def test_heartbeat_continue_deploy_second_run(self, next_step_mock,
                                                   refresh_steps_mock):
-        in_deploy_mock.return_value = True
-        deploy_started_mock.return_value = False
         dii = self.node.driver_internal_info
         dii['agent_cached_deploy_steps'] = ['step']
         self.node.driver_internal_info = dii
@@ -145,145 +125,13 @@ class HeartbeatMixinTest(AgentDeployMixinBaseTest):
             self.assertEqual(
                 '3.2.0',
                 task.node.driver_internal_info['agent_version'])
-            cd_mock.assert_called_once_with(self.deploy, task)
-            self.assertFalse(rti_mock.called)
             self.assertFalse(refresh_steps_mock.called)
-
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'in_core_deploy_step', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'deploy_has_started', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'deploy_is_done', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin, 'continue_deploy',
-                       autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'reboot_to_instance', autospec=True)
-    def test_heartbeat_reboot_to_instance(self, rti_mock, cd_mock,
-                                          deploy_is_done_mock,
-                                          deploy_started_mock,
-                                          in_deploy_mock):
-        in_deploy_mock.return_value = True
-        deploy_started_mock.return_value = True
-        deploy_is_done_mock.return_value = True
-        self.node.provision_state = states.DEPLOYWAIT
-        self.node.save()
-        with task_manager.acquire(self.context, self.node.uuid,
-                                  shared=True) as task:
-            self.deploy.heartbeat(task, 'url', '3.2.0')
-            self.assertIsNone(task.node.last_error)
-            self.assertFalse(task.shared)
-            self.assertEqual(
-                'url', task.node.driver_internal_info['agent_url'])
-            self.assertEqual(
-                '3.2.0',
-                task.node.driver_internal_info['agent_version'])
-            self.assertFalse(cd_mock.called)
-            rti_mock.assert_called_once_with(self.deploy, task)
+            next_step_mock.assert_called_once_with(self.deploy,
+                                                   task, 'deploy')
 
     @mock.patch.object(agent_base.HeartbeatMixin,
                        'process_next_step', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'in_core_deploy_step', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'deploy_has_started', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'deploy_is_done', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin, 'continue_deploy',
-                       autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'reboot_to_instance', autospec=True)
-    def test_heartbeat_not_in_core_deploy_step(self, rti_mock, cd_mock,
-                                               deploy_is_done_mock,
-                                               deploy_started_mock,
-                                               in_deploy_mock,
-                                               process_next_mock):
-        # Check that heartbeats do not trigger deployment actions when not in
-        # the deploy.deploy step.
-        in_deploy_mock.return_value = False
-        self.node.provision_state = states.DEPLOYWAIT
-        self.node.save()
-        with task_manager.acquire(self.context, self.node.uuid,
-                                  shared=True) as task:
-            self.deploy.heartbeat(task, 'url', '3.2.0')
-            self.assertFalse(task.shared)
-            self.assertEqual(
-                'url', task.node.driver_internal_info['agent_url'])
-            self.assertEqual(
-                '3.2.0',
-                task.node.driver_internal_info['agent_version'])
-            self.assertFalse(deploy_started_mock.called)
-            self.assertFalse(deploy_is_done_mock.called)
-            self.assertFalse(cd_mock.called)
-            self.assertFalse(rti_mock.called)
-            process_next_mock.assert_called_once_with(self.deploy,
-                                                      task, 'deploy')
-
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'refresh_steps', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'process_next_step', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'in_core_deploy_step', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'deploy_has_started', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'deploy_is_done', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin, 'continue_deploy',
-                       autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'reboot_to_instance', autospec=True)
-    def test_heartbeat_not_in_core_deploy_step_refresh(self, rti_mock, cd_mock,
-                                                       deploy_is_done_mock,
-                                                       deploy_started_mock,
-                                                       in_deploy_mock,
-                                                       process_next_mock,
-                                                       refresh_steps_mock):
-        # Check loading in-band deploy steps.
-        in_deploy_mock.return_value = False
-        self.node.provision_state = states.DEPLOYWAIT
-        info = self.node.driver_internal_info
-        info.pop('agent_cached_deploy_steps', None)
-        self.node.driver_internal_info = info
-        self.node.save()
-        with task_manager.acquire(self.context, self.node.uuid,
-                                  shared=True) as task:
-            self.deploy.heartbeat(task, 'url', '3.2.0')
-            self.assertFalse(task.shared)
-            self.assertEqual(
-                'url', task.node.driver_internal_info['agent_url'])
-            self.assertEqual(
-                '3.2.0',
-                task.node.driver_internal_info['agent_version'])
-            self.assertFalse(deploy_started_mock.called)
-            self.assertFalse(deploy_is_done_mock.called)
-            self.assertFalse(cd_mock.called)
-            self.assertFalse(rti_mock.called)
-            refresh_steps_mock.assert_called_once_with(self.deploy,
-                                                       task, 'deploy')
-            process_next_mock.assert_called_once_with(self.deploy,
-                                                      task, 'deploy')
-
-    @mock.patch.object(manager_utils,
-                       'notify_conductor_resume_deploy', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'in_core_deploy_step', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'deploy_has_started', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'deploy_is_done', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin, 'continue_deploy',
-                       autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'reboot_to_instance', autospec=True)
-    def test_heartbeat_not_in_core_deploy_step_polling(self, rti_mock, cd_mock,
-                                                       deploy_is_done_mock,
-                                                       deploy_started_mock,
-                                                       in_deploy_mock,
-                                                       in_resume_deploy_mock):
-        # Check that heartbeats do not trigger deployment actions when not in
-        # the deploy.deploy step.
-        in_deploy_mock.return_value = False
+    def test_heartbeat_polling(self, next_step_mock):
         self.node.provision_state = states.DEPLOYWAIT
         info = self.node.driver_internal_info
         info['agent_cached_deploy_steps'] = ['step1']
@@ -299,61 +147,14 @@ class HeartbeatMixinTest(AgentDeployMixinBaseTest):
             self.assertEqual(
                 '3.2.0',
                 task.node.driver_internal_info['agent_version'])
-            self.assertFalse(deploy_started_mock.called)
-            self.assertFalse(deploy_is_done_mock.called)
-            self.assertFalse(cd_mock.called)
-            self.assertFalse(rti_mock.called)
-            self.assertFalse(in_resume_deploy_mock.called)
+            self.assertFalse(next_step_mock.called)
 
     @mock.patch.object(agent_base.HeartbeatMixin, 'process_next_step',
                        autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'in_core_deploy_step', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'deploy_has_started', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'deploy_is_done', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin, 'continue_deploy',
-                       autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'reboot_to_instance', autospec=True)
-    def test_heartbeat_decomposed_steps(self, rti_mock, cd_mock,
-                                        deploy_is_done_mock,
-                                        deploy_started_mock,
-                                        in_deploy_mock,
-                                        next_step_mock):
-        self.deploy.has_decomposed_deploy_steps = True
-        # Check that heartbeats do not trigger deployment actions when the
-        # driver has decomposed deploy steps.
-        self.node.provision_state = states.DEPLOYWAIT
-        self.node.save()
-        with task_manager.acquire(self.context, self.node.uuid,
-                                  shared=True) as task:
-            self.deploy.heartbeat(task, 'url', '3.2.0')
-            self.assertFalse(task.shared)
-            self.assertEqual(
-                'url', task.node.driver_internal_info['agent_url'])
-            self.assertEqual(
-                '3.2.0',
-                task.node.driver_internal_info['agent_version'])
-            self.assertFalse(in_deploy_mock.called)
-            self.assertFalse(deploy_started_mock.called)
-            self.assertFalse(deploy_is_done_mock.called)
-            self.assertFalse(cd_mock.called)
-            self.assertFalse(rti_mock.called)
-            self.assertTrue(next_step_mock.called)
-
-    @mock.patch.object(agent_base.HeartbeatMixin, 'continue_deploy',
-                       autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'reboot_to_instance', autospec=True)
-    @mock.patch.object(manager_utils, 'notify_conductor_resume_operation',
-                       autospec=True)
-    def test_heartbeat_in_maintenance(self, ncrc_mock, rti_mock, cd_mock):
+    def test_heartbeat_in_maintenance(self, next_step_mock):
         # NOTE(pas-ha) checking only for states that are not noop
         for state in (states.DEPLOYWAIT, states.CLEANWAIT):
-            for m in (ncrc_mock, rti_mock, cd_mock):
-                m.reset_mock()
+            next_step_mock.reset_mock()
             self.node.provision_state = state
             self.node.maintenance = True
             self.node.save()
@@ -370,25 +171,17 @@ class HeartbeatMixinTest(AgentDeployMixinBaseTest):
                     task.node.driver_internal_info['agent_version'])
                 self.assertEqual(state, task.node.provision_state)
                 self.assertIsNone(task.node.last_error)
-            self.assertEqual(0, ncrc_mock.call_count)
-            self.assertEqual(0, rti_mock.call_count)
-            self.assertEqual(0, cd_mock.call_count)
+            next_step_mock.assert_not_called()
 
-    @mock.patch.object(agent_base.HeartbeatMixin, 'continue_deploy',
+    @mock.patch.object(agent_base.HeartbeatMixin, 'process_next_step',
                        autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'reboot_to_instance', autospec=True)
-    @mock.patch.object(manager_utils, 'notify_conductor_resume_operation',
-                       autospec=True)
-    def test_heartbeat_in_maintenance_abort(self, ncrc_mock, rti_mock,
-                                            cd_mock):
+    def test_heartbeat_in_maintenance_abort(self, next_step_mock):
         CONF.set_override('allow_provisioning_in_maintenance', False,
                           group='conductor')
         for state, expected in [(states.DEPLOYWAIT, states.DEPLOYFAIL),
                                 (states.CLEANWAIT, states.CLEANFAIL),
                                 (states.RESCUEWAIT, states.RESCUEFAIL)]:
-            for m in (ncrc_mock, rti_mock, cd_mock):
-                m.reset_mock()
+            next_step_mock.reset_mock()
             self.node.provision_state = state
             self.node.maintenance = True
             self.node.save()
@@ -405,22 +198,15 @@ class HeartbeatMixinTest(AgentDeployMixinBaseTest):
             self.node.refresh()
             self.assertEqual(expected, self.node.provision_state)
             self.assertIn('aborted', self.node.last_error)
-            self.assertEqual(0, ncrc_mock.call_count)
-            self.assertEqual(0, rti_mock.call_count)
-            self.assertEqual(0, cd_mock.call_count)
+            next_step_mock.assert_not_called()
 
     @mock.patch('time.sleep', lambda _t: None)
-    @mock.patch.object(agent_base.HeartbeatMixin, 'continue_deploy',
+    @mock.patch.object(agent_base.HeartbeatMixin, 'process_next_step',
                        autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'reboot_to_instance', autospec=True)
-    @mock.patch.object(manager_utils, 'notify_conductor_resume_operation',
-                       autospec=True)
-    def test_heartbeat_with_reservation(self, ncrc_mock, rti_mock, cd_mock):
+    def test_heartbeat_with_reservation(self, next_step_mock):
         # NOTE(pas-ha) checking only for states that are not noop
         for state in (states.DEPLOYWAIT, states.CLEANWAIT):
-            for m in (ncrc_mock, rti_mock, cd_mock):
-                m.reset_mock()
+            next_step_mock.reset_mock()
             self.node.provision_state = state
             self.node.reservation = 'localhost'
             self.node.save()
@@ -432,23 +218,16 @@ class HeartbeatMixinTest(AgentDeployMixinBaseTest):
                 self.assertTrue(task.shared)
                 self.assertEqual(old_drv_info, task.node.driver_internal_info)
                 self.assertIsNone(task.node.last_error)
-            self.assertEqual(0, ncrc_mock.call_count)
-            self.assertEqual(0, rti_mock.call_count)
-            self.assertEqual(0, cd_mock.call_count)
+            next_step_mock.assert_not_called()
 
     @mock.patch.object(agent_base.LOG, 'error', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin, 'continue_deploy',
+    @mock.patch.object(agent_base.HeartbeatMixin, 'process_next_step',
                        autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'reboot_to_instance', autospec=True)
-    @mock.patch.object(manager_utils, 'notify_conductor_resume_operation',
-                       autospec=True)
-    def test_heartbeat_noops_in_wrong_state(self, ncrc_mock, rti_mock,
-                                            cd_mock, log_mock):
+    def test_heartbeat_noops_in_wrong_state(self, next_step_mock, log_mock):
         allowed = {states.DEPLOYWAIT, states.CLEANWAIT, states.RESCUEWAIT,
                    states.DEPLOYING, states.CLEANING, states.RESCUING}
         for state in set(states.machine.states) - allowed:
-            for m in (ncrc_mock, rti_mock, cd_mock, log_mock):
+            for m in (next_step_mock, log_mock):
                 m.reset_mock()
             with task_manager.acquire(self.context, self.node.uuid,
                                       shared=True) as task:
@@ -457,50 +236,33 @@ class HeartbeatMixinTest(AgentDeployMixinBaseTest):
                 self.assertTrue(task.shared)
                 self.assertNotIn('agent_last_heartbeat',
                                  task.node.driver_internal_info)
-            self.assertEqual(0, ncrc_mock.call_count)
-            self.assertEqual(0, rti_mock.call_count)
-            self.assertEqual(0, cd_mock.call_count)
+            next_step_mock.assert_not_called()
             log_mock.assert_called_once_with(mock.ANY,
                                              {'node': self.node.uuid,
                                               'state': state})
 
-    @mock.patch.object(agent_base.HeartbeatMixin, 'continue_deploy',
+    @mock.patch.object(agent_base.HeartbeatMixin, 'process_next_step',
                        autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'reboot_to_instance', autospec=True)
-    @mock.patch.object(manager_utils, 'notify_conductor_resume_operation',
-                       autospec=True)
-    def test_heartbeat_noops_in_wrong_state2(self, ncrc_mock, rti_mock,
-                                             cd_mock):
+    def test_heartbeat_noops_in_wrong_state2(self, next_step_mock):
         CONF.set_override('allow_provisioning_in_maintenance', False,
                           group='conductor')
         allowed = {states.DEPLOYWAIT, states.CLEANWAIT}
         for state in set(states.machine.states) - allowed:
-            for m in (ncrc_mock, rti_mock, cd_mock):
-                m.reset_mock()
+            next_step_mock.reset_mock()
             with task_manager.acquire(self.context, self.node.uuid,
                                       shared=True) as task:
                 self.node.provision_state = state
                 self.deploy.heartbeat(task, 'url', '1.0.0')
                 self.assertTrue(task.shared)
-            self.assertEqual(0, ncrc_mock.call_count)
-            self.assertEqual(0, rti_mock.call_count)
-            self.assertEqual(0, cd_mock.call_count)
+            next_step_mock.assert_not_called()
 
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'in_core_deploy_step', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'deploy_has_started', autospec=True)
     @mock.patch.object(deploy_utils, 'set_failed_state', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin, 'deploy_is_done',
-                       autospec=True)
     @mock.patch.object(agent_base.LOG, 'exception', autospec=True)
-    def test_heartbeat_deploy_done_fails(self, log_mock, done_mock,
-                                         failed_mock, deploy_started_mock,
-                                         in_deploy_mock):
-        in_deploy_mock.return_value = True
-        deploy_started_mock.return_value = True
-        done_mock.side_effect = Exception('LlamaException')
+    @mock.patch.object(agent_base.HeartbeatMixin, 'process_next_step',
+                       autospec=True)
+    def test_heartbeat_deploy_fails(self, next_step_mock, log_mock,
+                                    failed_mock):
+        next_step_mock.side_effect = Exception('LlamaException')
         with task_manager.acquire(
                 self.context, self.node['uuid'], shared=False) as task:
             task.node.provision_state = states.DEPLOYWAIT
@@ -510,24 +272,16 @@ class HeartbeatMixinTest(AgentDeployMixinBaseTest):
                 task, mock.ANY, collect_logs=True)
             log_mock.assert_called_once_with(
                 'Asynchronous exception for node %(node)s: %(err)s',
-                {'err': 'Failed checking if deploy is done. '
+                {'err': 'Failed to process the next deploy step. '
                  'Error: LlamaException',
                  'node': task.node.uuid})
 
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'in_core_deploy_step', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'deploy_has_started', autospec=True)
     @mock.patch.object(deploy_utils, 'set_failed_state', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin, 'deploy_is_done',
-                       autospec=True)
     @mock.patch.object(agent_base.LOG, 'exception', autospec=True)
-    def test_heartbeat_deploy_done_raises_with_event(self, log_mock, done_mock,
-                                                     failed_mock,
-                                                     deploy_started_mock,
-                                                     in_deploy_mock):
-        in_deploy_mock.return_value = True
-        deploy_started_mock.return_value = True
+    @mock.patch.object(agent_base.HeartbeatMixin, 'process_next_step',
+                       autospec=True)
+    def test_heartbeat_deploy_done_raises_with_event(self, next_step_mock,
+                                                     log_mock, failed_mock):
         with task_manager.acquire(
                 self.context, self.node['uuid'], shared=False) as task:
 
@@ -539,7 +293,7 @@ class HeartbeatMixinTest(AgentDeployMixinBaseTest):
 
             task.node.provision_state = states.DEPLOYWAIT
             task.node.target_provision_state = states.ACTIVE
-            done_mock.side_effect = driver_failure
+            next_step_mock.side_effect = driver_failure
             self.deploy.heartbeat(task, 'http://127.0.0.1:8080', '1.0.0')
             # task.node.provision_state being set to DEPLOYFAIL
             # within the driver_failue, hearbeat should not call
@@ -547,7 +301,7 @@ class HeartbeatMixinTest(AgentDeployMixinBaseTest):
             self.assertFalse(failed_mock.called)
             log_mock.assert_called_once_with(
                 'Asynchronous exception for node %(node)s: %(err)s',
-                {'err': 'Failed checking if deploy is done. '
+                {'err': 'Failed to process the next deploy step. '
                  'Error: LlamaException',
                  'node': task.node.uuid})
 
@@ -704,31 +458,6 @@ class HeartbeatMixinTest(AgentDeployMixinBaseTest):
             task, 'Node failed to perform '
             'rescue operation. Error: some failure')
 
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'in_core_deploy_step', autospec=True)
-    @mock.patch.object(objects.node.Node, 'touch_provisioning', autospec=True)
-    @mock.patch.object(agent_base.HeartbeatMixin,
-                       'deploy_has_started', autospec=True)
-    def test_heartbeat_touch_provisioning_and_url_save(self,
-                                                       mock_deploy_started,
-                                                       mock_touch,
-                                                       mock_in_deploy):
-        mock_in_deploy.return_value = True
-        mock_deploy_started.return_value = True
-
-        self.node.provision_state = states.DEPLOYWAIT
-        self.node.save()
-        with task_manager.acquire(
-                self.context, self.node.uuid, shared=False) as task:
-            self.deploy.heartbeat(task, 'http://127.0.0.1:8080', '3.2.0')
-            self.assertEqual('http://127.0.0.1:8080',
-                             task.node.driver_internal_info['agent_url'])
-            self.assertEqual('3.2.0',
-                             task.node.driver_internal_info['agent_version'])
-            self.assertIsNotNone(
-                task.node.driver_internal_info['agent_last_heartbeat'])
-        mock_touch.assert_called_once_with(mock.ANY)
-
     @mock.patch.object(agent_base.LOG, 'error', autospec=True)
     def test_heartbeat_records_cleaning_deploying(self, log_mock):
         for provision_state in (states.CLEANING, states.DEPLOYING):
@@ -767,28 +496,6 @@ class HeartbeatMixinTest(AgentDeployMixinBaseTest):
                     task.node.driver_internal_info['agent_last_heartbeat'])
                 self.assertEqual(provision_state, task.node.provision_state)
 
-    def test_in_core_deploy_step(self):
-        self.node.deploy_step = {
-            'interface': 'deploy', 'step': 'deploy', 'priority': 100}
-        info = self.node.driver_internal_info
-        info['deploy_steps'] = [self.node.deploy_step]
-        self.node.driver_internal_info = info
-        self.node.save()
-        with task_manager.acquire(
-                self.context, self.node.uuid, shared=False) as task:
-            self.assertTrue(self.deploy.in_core_deploy_step(task))
-
-    def test_in_core_deploy_step_in_other_step(self):
-        self.node.deploy_step = {
-            'interface': 'deploy', 'step': 'other-step', 'priority': 100}
-        info = self.node.driver_internal_info
-        info['deploy_steps'] = [self.node.deploy_step]
-        self.node.driver_internal_info = info
-        self.node.save()
-        with task_manager.acquire(
-                self.context, self.node.uuid, shared=False) as task:
-            self.assertFalse(self.deploy.in_core_deploy_step(task))
-
 
 class AgentRescueTests(AgentDeployMixinBaseTest):
 
diff --git a/ironic/tests/unit/drivers/modules/test_agent_client.py b/ironic/tests/unit/drivers/modules/test_agent_client.py
index fcff80577d..76db6d1bda 100644
--- a/ironic/tests/unit/drivers/modules/test_agent_client.py
+++ b/ironic/tests/unit/drivers/modules/test_agent_client.py
@@ -108,9 +108,9 @@ class TestAgentClient(base.TestCase):
                           self.node)
 
     def test__get_command_body(self):
-        expected = json.dumps({'name': 'prepare_image', 'params': {}})
+        expected = json.dumps({'name': 'get_clean_steps', 'params': {}})
         self.assertEqual(expected,
-                         self.client._get_command_body('prepare_image', {}))
+                         self.client._get_command_body('get_clean_steps', {}))
 
     def test__command(self):
         response_data = {'status': 'ok'}
@@ -464,47 +464,6 @@ class TestAgentClient(base.TestCase):
                 timeout=CONF.agent.command_timeout,
                 verify='/path/to/agent.crt')
 
-    def test_prepare_image(self):
-        self.client._command = mock.MagicMock(spec_set=[])
-        image_info = {'image_id': 'image'}
-        params = {'image_info': image_info}
-
-        self.client.prepare_image(self.node,
-                                  image_info,
-                                  wait=False)
-        self.client._command.assert_called_once_with(
-            node=self.node, method='standby.prepare_image',
-            params=params, poll=False)
-
-    def test_prepare_image_with_configdrive(self):
-        self.client._command = mock.MagicMock(spec_set=[])
-        configdrive_url = 'http://swift/configdrive'
-        self.node.instance_info['configdrive'] = configdrive_url
-        image_info = {'image_id': 'image'}
-        params = {
-            'image_info': image_info,
-            'configdrive': configdrive_url,
-        }
-
-        self.client.prepare_image(self.node,
-                                  image_info,
-                                  wait=False)
-        self.client._command.assert_called_once_with(
-            node=self.node, method='standby.prepare_image',
-            params=params, poll=False)
-
-    def test_prepare_image_with_wait(self):
-        self.client._command = mock.MagicMock(spec_set=[])
-        image_info = {'image_id': 'image'}
-        params = {'image_info': image_info}
-
-        self.client.prepare_image(self.node,
-                                  image_info,
-                                  wait=True)
-        self.client._command.assert_called_once_with(
-            node=self.node, method='standby.prepare_image',
-            params=params, poll=True)
-
     def test_start_iscsi_target(self):
         self.client._command = mock.MagicMock(spec_set=[])
         iqn = 'fake-iqn'
diff --git a/releasenotes/notes/decomposed-steps-9644d3b5ccbad1ea.yaml b/releasenotes/notes/decomposed-steps-9644d3b5ccbad1ea.yaml
new file mode 100644
index 0000000000..647162b1c6
--- /dev/null
+++ b/releasenotes/notes/decomposed-steps-9644d3b5ccbad1ea.yaml
@@ -0,0 +1,7 @@
+---
+upgrade:
+  - |
+    Removes support for deploy interfaces that do not use deploy steps and
+    rely on the monolithic ``deploy`` call instead.
+  - |
+    Removes support for ironic-python-agent Victoria or older.