diff --git a/ironic/conductor/deployments.py b/ironic/conductor/deployments.py
index 20c3c15b66..c04b578d62 100644
--- a/ironic/conductor/deployments.py
+++ b/ironic/conductor/deployments.py
@@ -18,13 +18,11 @@ from ironic_lib import metrics_utils
 from oslo_db import exception as db_exception
 from oslo_log import log
 from oslo_utils import excutils
-from oslo_utils import versionutils
 
 from ironic.common import exception
 from ironic.common.glance_service import service_utils as glance_utils
 from ironic.common.i18n import _
 from ironic.common import images
-from ironic.common import release_mappings as versions
 from ironic.common import states
 from ironic.common import swift
 from ironic.conductor import notification_utils as notify_utils
@@ -38,10 +36,6 @@ LOG = log.getLogger(__name__)
 
 METRICS = metrics_utils.get_metrics_logger(__name__)
 
-# NOTE(rloo) This is used to keep track of deprecation warnings that have
-# already been issued for deploy drivers that do not use deploy steps.
-_SEEN_NO_DEPLOY_STEP_DEPRECATIONS = set()
-
 
 def validate_node(task, event='deploy'):
     """Validate that a node is suitable for deployment/rebuilding.
@@ -202,97 +196,7 @@ def do_node_deploy(task, conductor_id=None, configdrive=None):
                 '%(node)s. Error: %(err)s' % {'node': node.uuid, 'err': e},
                 _("Cannot get deploy steps; failed to deploy: %s") % e)
 
-    steps = node.driver_internal_info.get('deploy_steps', [])
-
-    new_rpc_version = True
-    release_ver = versions.RELEASE_MAPPING.get(CONF.pin_release_version)
-    if release_ver:
-        new_rpc_version = versionutils.is_compatible('1.45',
-                                                     release_ver['rpc'])
-
-    if not steps or not new_rpc_version:
-        # TODO(rloo): This if.. (and the above code wrt rpc version)
-        # can be deleted after the deprecation period when we no
-        # longer support drivers with no deploy steps.
-        # Note that after the deprecation period, there needs to be at least
-        # one deploy step. If none, the deployment fails.
-
-        if steps:
-            info = node.driver_internal_info
-            info.pop('deploy_steps')
-            node.driver_internal_info = info
-            node.save()
-
-        # We go back to using the old way, if:
-        # - out-of-tree driver hasn't yet converted to using deploy steps, or
-        # - we're in the middle of a rolling upgrade. This is to prevent the
-        #   corner case of having new conductors with old conductors, and
-        #   a node is deployed with a new conductor (via deploy steps), but
-        #   after the deploy_wait, the node gets handled by an old conductor.
-        #   To avoid this, we need to wait until all the conductors are new,
-        # signalled by the RPC API version being '1.45'.
-        _old_rest_of_do_node_deploy(task, conductor_id, not steps)
-    else:
-        do_next_deploy_step(task, 0, conductor_id)
-
-
-def _old_rest_of_do_node_deploy(task, conductor_id, no_deploy_steps):
-    """The rest of the do_node_deploy() if not using deploy steps.
-
-    To support out-of-tree drivers that have not yet migrated to using
-    deploy steps.
-
-    :param no_deploy_steps: Boolean; True if there are no deploy steps.
-    """
-    # TODO(rloo): This method can be deleted after the deprecation period
-    #             for supporting drivers with no deploy steps.
-
-    if no_deploy_steps:
-        deploy_driver_name = task.driver.deploy.__class__.__name__
-        if deploy_driver_name not in _SEEN_NO_DEPLOY_STEP_DEPRECATIONS:
-            LOG.warning('Deploy driver %s does not support deploy steps; this '
-                        'will be required starting with the Stein release.',
-                        deploy_driver_name)
-            _SEEN_NO_DEPLOY_STEP_DEPRECATIONS.add(deploy_driver_name)
-
-    node = task.node
-    try:
-        new_state = task.driver.deploy.deploy(task)
-    except exception.IronicException as e:
-        with excutils.save_and_reraise_exception():
-            utils.deploying_error_handler(
-                task,
-                ('Error in deploy of node %(node)s: %(err)s' %
-                 {'node': node.uuid, 'err': e}),
-                _("Failed to deploy: %s") % e)
-    except Exception as e:
-        with excutils.save_and_reraise_exception():
-            utils.deploying_error_handler(
-                task,
-                ('Unexpected error while deploying node %(node)s' %
-                 {'node': node.uuid}),
-                _("Failed to deploy. Exception: %s") % e,
-                traceback=True)
-
-    # Update conductor_affinity to reference this conductor's ID
-    # since there may be local persistent state
-    node.conductor_affinity = conductor_id
-
-    # NOTE(deva): Some drivers may return states.DEPLOYWAIT
-    #             eg. if they are waiting for a callback
-    if new_state == states.DEPLOYDONE:
-        _start_console_in_deploy(task)
-        task.process_event('done')
-        LOG.info('Successfully deployed node %(node)s with '
-                 'instance %(instance)s.',
-                 {'node': node.uuid, 'instance': node.instance_uuid})
-    elif new_state == states.DEPLOYWAIT:
-        task.process_event('wait')
-    else:
-        LOG.error('Unexpected state %(state)s returned while '
-                  'deploying node %(node)s.',
-                  {'state': new_state, 'node': node.uuid})
-    node.save()
+    do_next_deploy_step(task, 0, conductor_id)
 
 
 @task_manager.require_exclusive_lock
diff --git a/ironic/drivers/modules/agent_base.py b/ironic/drivers/modules/agent_base.py
index f7975c25ce..5c4f94f046 100644
--- a/ironic/drivers/modules/agent_base.py
+++ b/ironic/drivers/modules/agent_base.py
@@ -334,11 +334,6 @@ class HeartbeatMixin(object):
         :param task: a TaskManager instance
         :returns: True if the current deploy step is deploy.deploy.
         """
-        # TODO(mgoddard): Remove this 'if' in the Train release, after the
-        # deprecation period for supporting drivers with no deploy steps.
-        if not task.node.driver_internal_info.get('deploy_steps'):
-            return True
-
         step = task.node.deploy_step
         return (step
                 and step['interface'] == 'deploy'
@@ -851,15 +846,7 @@ class AgentDeployMixin(HeartbeatMixin):
             # powered off.
             log_and_raise_deployment_error(task, msg, collect_logs=False,
                                            exc=e)
-
-        if not node.deploy_step:
-            # TODO(rloo): delete this 'if' part after deprecation period, when
-            # we expect all (out-of-tree) drivers to support deploy steps.
-            # After which we will always notify_conductor_resume_deploy().
-            task.process_event('done')
-            LOG.info('Deployment to node %s done', task.node.uuid)
-        else:
-            manager_utils.notify_conductor_resume_deploy(task)
+        manager_utils.notify_conductor_resume_deploy(task)
 
     @METRICS.timer('AgentDeployMixin.prepare_instance_to_boot')
     def prepare_instance_to_boot(self, task, root_uuid, efi_sys_uuid,
diff --git a/ironic/drivers/modules/ansible/deploy.py b/ironic/drivers/modules/ansible/deploy.py
index db2a1e4183..4a1a478d8f 100644
--- a/ironic/drivers/modules/ansible/deploy.py
+++ b/ironic/drivers/modules/ansible/deploy.py
@@ -608,14 +608,7 @@ class AnsibleDeploy(agent_base.HeartbeatMixin, base.DeployInterface):
         self.reboot_and_finish_deploy(task)
         task.driver.boot.clean_up_ramdisk(task)
 
-        if not node.deploy_step:
-            # TODO(rloo): delete this 'if' part after deprecation period, when
-            # we expect all (out-of-tree) drivers to support deploy steps.
-            # After which we will always notify_conductor_resume_deploy().
-            task.process_event('done')
-            LOG.info('Deployment to node %s done', task.node.uuid)
-        else:
-            manager_utils.notify_conductor_resume_deploy(task)
+        manager_utils.notify_conductor_resume_deploy(task)
 
     @METRICS.timer('AnsibleDeploy.reboot_and_finish_deploy')
     def reboot_and_finish_deploy(self, task):
diff --git a/ironic/tests/unit/conductor/test_deployments.py b/ironic/tests/unit/conductor/test_deployments.py
index 6d71adfe6b..b81b529a3c 100644
--- a/ironic/tests/unit/conductor/test_deployments.py
+++ b/ironic/tests/unit/conductor/test_deployments.py
@@ -86,59 +86,6 @@ class DoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
         self.assertTrue(mock_prepare.called)
         self.assertFalse(mock_deploy.called)
 
-    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
-    def test__do_node_deploy_driver_raises_error_old(self, mock_deploy):
-        # TODO(rloo): delete this after the deprecation period for supporting
-        # non deploy_steps.
-        # Mocking FakeDeploy.deploy before starting the service, causes
-        # it not to be a deploy_step.
-        self._start_service()
-        # test when driver.deploy.deploy raises an ironic error
-        mock_deploy.side_effect = exception.InstanceDeployFailure('test')
-        node = obj_utils.create_test_node(self.context, driver='fake-hardware',
-                                          provision_state=states.DEPLOYING,
-                                          target_provision_state=states.ACTIVE)
-        task = task_manager.TaskManager(self.context, node.uuid)
-
-        self.assertRaises(exception.InstanceDeployFailure,
-                          deployments.do_node_deploy, task,
-                          self.service.conductor.id)
-        node.refresh()
-        self.assertEqual(states.DEPLOYFAIL, node.provision_state)
-        # NOTE(deva): failing a deploy does not clear the target state
-        #             any longer. Instead, it is cleared when the instance
-        #             is deleted.
-        self.assertEqual(states.ACTIVE, node.target_provision_state)
-        self.assertIsNotNone(node.last_error)
-        mock_deploy.assert_called_once_with(mock.ANY)
-
-    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
-    def test__do_node_deploy_driver_unexpected_exception_old(self,
-                                                             mock_deploy):
-        # TODO(rloo): delete this after the deprecation period for supporting
-        # non deploy_steps.
-        # Mocking FakeDeploy.deploy before starting the service, causes
-        # it not to be a deploy_step.
-        self._start_service()
-        # test when driver.deploy.deploy raises an exception
-        mock_deploy.side_effect = RuntimeError('test')
-        node = obj_utils.create_test_node(self.context, driver='fake-hardware',
-                                          provision_state=states.DEPLOYING,
-                                          target_provision_state=states.ACTIVE)
-        task = task_manager.TaskManager(self.context, node.uuid)
-
-        self.assertRaises(RuntimeError,
-                          deployments.do_node_deploy, task,
-                          self.service.conductor.id)
-        node.refresh()
-        self.assertEqual(states.DEPLOYFAIL, node.provision_state)
-        # NOTE(deva): failing a deploy does not clear the target state
-        #             any longer. Instead, it is cleared when the instance
-        #             is deleted.
-        self.assertEqual(states.ACTIVE, node.target_provision_state)
-        self.assertIsNotNone(node.last_error)
-        mock_deploy.assert_called_once_with(mock.ANY)
-
     def _test__do_node_deploy_driver_exception(self, exc, unexpected=False):
         self._start_service()
         with mock.patch.object(fake.FakeDeploy,
@@ -174,55 +121,6 @@ class DoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
         self._test__do_node_deploy_driver_exception(RuntimeError('test'),
                                                     unexpected=True)
 
-    @mock.patch.object(deployments, '_store_configdrive', autospec=True)
-    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
-    def test__do_node_deploy_ok_old(self, mock_deploy, mock_store):
-        # TODO(rloo): delete this after the deprecation period for supporting
-        # non deploy_steps.
-        # Mocking FakeDeploy.deploy before starting the service, causes
-        # it not to be a deploy_step.
-        self._start_service()
-        # test when driver.deploy.deploy returns DEPLOYDONE
-        mock_deploy.return_value = states.DEPLOYDONE
-        node = obj_utils.create_test_node(self.context, driver='fake-hardware',
-                                          provision_state=states.DEPLOYING,
-                                          target_provision_state=states.ACTIVE)
-        task = task_manager.TaskManager(self.context, node.uuid)
-
-        deployments.do_node_deploy(task, self.service.conductor.id)
-        node.refresh()
-        self.assertEqual(states.ACTIVE, node.provision_state)
-        self.assertEqual(states.NOSTATE, node.target_provision_state)
-        self.assertIsNone(node.last_error)
-        mock_deploy.assert_called_once_with(mock.ANY)
-        # assert _store_configdrive wasn't invoked
-        self.assertFalse(mock_store.called)
-
-    @mock.patch.object(deployments, '_store_configdrive', autospec=True)
-    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
-    def test__do_node_deploy_ok_configdrive_old(self, mock_deploy, mock_store):
-        # TODO(rloo): delete this after the deprecation period for supporting
-        # non deploy_steps.
-        # Mocking FakeDeploy.deploy before starting the service, causes
-        # it not to be a deploy_step.
-        self._start_service()
-        # test when driver.deploy.deploy returns DEPLOYDONE
-        mock_deploy.return_value = states.DEPLOYDONE
-        node = obj_utils.create_test_node(self.context, driver='fake-hardware',
-                                          provision_state=states.DEPLOYING,
-                                          target_provision_state=states.ACTIVE)
-        task = task_manager.TaskManager(self.context, node.uuid)
-        configdrive = 'foo'
-
-        deployments.do_node_deploy(task, self.service.conductor.id,
-                                   configdrive=configdrive)
-        node.refresh()
-        self.assertEqual(states.ACTIVE, node.provision_state)
-        self.assertEqual(states.NOSTATE, node.target_provision_state)
-        self.assertIsNone(node.last_error)
-        mock_deploy.assert_called_once_with(mock.ANY)
-        mock_store.assert_called_once_with(task.node, configdrive)
-
     @mock.patch.object(deployments, '_store_configdrive', autospec=True)
     def _test__do_node_deploy_ok(self, mock_store, configdrive=None,
                                  expected_configdrive=None):
@@ -396,28 +294,6 @@ class DoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
         self.assertIsNotNone(node.last_error)
         self.assertFalse(mock_prepare.called)
 
-    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
-    def test__do_node_deploy_ok_2_old(self, mock_deploy):
-        # TODO(rloo): delete this after the deprecation period for supporting
-        # non deploy_steps.
-        # Mocking FakeDeploy.deploy before starting the service, causes
-        # it not to be a deploy_step.
-        # NOTE(rloo): a different way of testing for the same thing as in
-        # test__do_node_deploy_ok()
-        self._start_service()
-        # test when driver.deploy.deploy returns DEPLOYDONE
-        mock_deploy.return_value = states.DEPLOYDONE
-        node = obj_utils.create_test_node(self.context, driver='fake-hardware')
-        task = task_manager.TaskManager(self.context, node.uuid)
-        task.process_event('deploy')
-
-        deployments.do_node_deploy(task, self.service.conductor.id)
-        node.refresh()
-        self.assertEqual(states.ACTIVE, node.provision_state)
-        self.assertEqual(states.NOSTATE, node.target_provision_state)
-        self.assertIsNone(node.last_error)
-        mock_deploy.assert_called_once_with(mock.ANY)
-
     def test__do_node_deploy_ok_2(self):
         # NOTE(rloo): a different way of testing for the same thing as in
         # test__do_node_deploy_ok(). Instead of specifying the provision &
@@ -441,33 +317,9 @@ class DoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
             mock_deploy.assert_called_once_with(mock.ANY, mock.ANY)
 
     @mock.patch.object(deployments, 'do_next_deploy_step', autospec=True)
-    @mock.patch.object(deployments, '_old_rest_of_do_node_deploy',
-                       autospec=True)
     @mock.patch.object(conductor_steps, 'set_node_deployment_steps',
                        autospec=True)
-    def test_do_node_deploy_deprecated(self, mock_set_steps, mock_old_way,
-                                       mock_deploy_step):
-        # TODO(rloo): no deploy steps; delete this when we remove support
-        # for handling no deploy steps.
-        self._start_service()
-        node = obj_utils.create_test_node(self.context, driver='fake-hardware')
-        task = task_manager.TaskManager(self.context, node.uuid)
-        task.process_event('deploy')
-
-        deployments.do_node_deploy(task, self.service.conductor.id)
-        mock_set_steps.assert_called_once_with(task)
-        mock_old_way.assert_called_once_with(task, self.service.conductor.id,
-                                             True)
-        self.assertFalse(mock_deploy_step.called)
-        self.assertNotIn('deploy_steps', task.node.driver_internal_info)
-
-    @mock.patch.object(deployments, 'do_next_deploy_step', autospec=True)
-    @mock.patch.object(deployments, '_old_rest_of_do_node_deploy',
-                       autospec=True)
-    @mock.patch.object(conductor_steps, 'set_node_deployment_steps',
-                       autospec=True)
-    def test_do_node_deploy_steps(self, mock_set_steps, mock_old_way,
-                                  mock_deploy_step):
+    def test_do_node_deploy_steps(self, mock_set_steps, mock_deploy_step):
         # these are not real steps...
         fake_deploy_steps = ['step1', 'step2']
 
@@ -485,113 +337,10 @@ class DoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
 
         deployments.do_node_deploy(task, self.service.conductor.id)
         mock_set_steps.assert_called_once_with(task)
-        self.assertFalse(mock_old_way.called)
         mock_set_steps.assert_called_once_with(task)
         self.assertEqual(fake_deploy_steps,
                          task.node.driver_internal_info['deploy_steps'])
 
-    @mock.patch.object(deployments, 'do_next_deploy_step', autospec=True)
-    @mock.patch.object(deployments, '_old_rest_of_do_node_deploy',
-                       autospec=True)
-    @mock.patch.object(conductor_steps, 'set_node_deployment_steps',
-                       autospec=True)
-    def test_do_node_deploy_steps_old_rpc(self, mock_set_steps, mock_old_way,
-                                          mock_deploy_step):
-        # TODO(rloo): old RPC; delete this when we remove support for drivers
-        # with no deploy steps.
-        CONF.set_override('pin_release_version', '11.0')
-        # these are not real steps...
-        fake_deploy_steps = ['step1', 'step2']
-
-        def add_steps(task):
-            info = task.node.driver_internal_info
-            info['deploy_steps'] = fake_deploy_steps
-            task.node.driver_internal_info = info
-            task.node.save()
-
-        mock_set_steps.side_effect = add_steps
-        self._start_service()
-        node = obj_utils.create_test_node(self.context, driver='fake-hardware')
-        task = task_manager.TaskManager(self.context, node.uuid)
-        task.process_event('deploy')
-
-        deployments.do_node_deploy(task, self.service.conductor.id)
-        mock_set_steps.assert_called_once_with(task)
-        mock_old_way.assert_called_once_with(task, self.service.conductor.id,
-                                             False)
-        self.assertFalse(mock_deploy_step.called)
-        self.assertNotIn('deploy_steps', task.node.driver_internal_info)
-
-    @mock.patch.object(deployments, '_SEEN_NO_DEPLOY_STEP_DEPRECATIONS',
-                       autospec=True)
-    @mock.patch.object(deployments, 'LOG', autospec=True)
-    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy', autospec=True)
-    def test__old_rest_of_do_node_deploy_no_steps(self, mock_deploy, mock_log,
-                                                  mock_deprecate):
-        # TODO(rloo): no deploy steps; delete this when we remove support
-        # for handling no deploy steps.
-        mock_deprecate.__contains__.side_effect = [False, True]
-        self._start_service()
-        node = obj_utils.create_test_node(self.context, driver='fake-hardware')
-        task = task_manager.TaskManager(self.context, node.uuid)
-        task.process_event('deploy')
-
-        deployments._old_rest_of_do_node_deploy(
-            task, self.service.conductor.id, True)
-        mock_deploy.assert_called_once_with(mock.ANY, task)
-        self.assertTrue(mock_log.warning.called)
-        self.assertEqual(self.service.conductor.id,
-                         task.node.conductor_affinity)
-        mock_deprecate.__contains__.assert_called_once_with('FakeDeploy')
-        mock_deprecate.add.assert_called_once_with('FakeDeploy')
-
-        # Make sure the deprecation warning isn't logged again
-        mock_log.reset_mock()
-        mock_deprecate.add.reset_mock()
-        deployments._old_rest_of_do_node_deploy(
-            task, self.service.conductor.id, True)
-        self.assertFalse(mock_log.warning.called)
-        mock_deprecate.__contains__.assert_has_calls(
-            [mock.call('FakeDeploy'), mock.call('FakeDeploy')])
-        self.assertFalse(mock_deprecate.add.called)
-
-    @mock.patch.object(deployments, 'LOG', autospec=True)
-    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy', autospec=True)
-    def test__old_rest_of_do_node_deploy_has_steps(self, mock_deploy,
-                                                   mock_log):
-        # TODO(rloo): has steps but old RPC; delete this when we remove support
-        # for handling no deploy steps.
-        deployments._SEEN_NO_DEPLOY_STEP_DEPRECATIONS = set()
-        self._start_service()
-        node = obj_utils.create_test_node(self.context, driver='fake-hardware')
-        task = task_manager.TaskManager(self.context, node.uuid)
-        task.process_event('deploy')
-
-        deployments._old_rest_of_do_node_deploy(
-            task, self.service.conductor.id, False)
-        mock_deploy.assert_called_once_with(mock.ANY, task)
-        self.assertFalse(mock_log.warning.called)
-        self.assertEqual(self.service.conductor.id,
-                         task.node.conductor_affinity)
-
-    @mock.patch('ironic.conductor.deployments._start_console_in_deploy',
-                autospec=True)
-    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy', autospec=True)
-    def test__old_rest_of_do_node_deploy_console(self, mock_deploy,
-                                                 mock_console):
-        self._start_service()
-        node = obj_utils.create_test_node(self.context, driver='fake-hardware')
-        task = task_manager.TaskManager(self.context, node.uuid)
-        task.process_event('deploy')
-        mock_deploy.return_value = states.DEPLOYDONE
-
-        deployments._old_rest_of_do_node_deploy(
-            task, self.service.conductor.id, True)
-        mock_deploy.assert_called_once_with(mock.ANY, task)
-        mock_console.assert_called_once_with(task)
-        self.assertEqual(self.service.conductor.id,
-                         task.node.conductor_affinity)
-
 
 @mgr_utils.mock_record_keepalive
 class DoNextDeployStepTestCase(mgr_utils.ServiceSetUpMixin,
diff --git a/ironic/tests/unit/conductor/test_manager.py b/ironic/tests/unit/conductor/test_manager.py
index eba6766ec2..285317f31b 100644
--- a/ironic/tests/unit/conductor/test_manager.py
+++ b/ironic/tests/unit/conductor/test_manager.py
@@ -1431,157 +1431,6 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
             self.assertFalse(node.driver_internal_info['is_whole_disk_image'])
             self.assertNotIn('agent_url', node.driver_internal_info)
 
-    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
-    def test_do_node_deploy_rebuild_active_state_old(self, mock_deploy,
-                                                     mock_iwdi):
-        # TODO(rloo): delete this after the deprecation period for supporting
-        # non deploy_steps.
-        # Mocking FakeDeploy.deploy before starting the service, causes
-        # it not to be a deploy_step.
-        # This tests manager._old_rest_of_do_node_deploy(), the 'else' path of
-        # 'if new_state == states.DEPLOYDONE'. The node's states
-        # aren't changed in this case.
-        mock_iwdi.return_value = True
-        self._start_service()
-        mock_deploy.return_value = states.DEPLOYING
-        node = obj_utils.create_test_node(
-            self.context, driver='fake-hardware',
-            provision_state=states.ACTIVE,
-            target_provision_state=states.NOSTATE,
-            instance_info={'image_source': uuidutils.generate_uuid(),
-                           'kernel': 'aaaa', 'ramdisk': 'bbbb'},
-            driver_internal_info={'is_whole_disk_image': False})
-
-        self.service.do_node_deploy(self.context, node.uuid, rebuild=True)
-        self._stop_service()
-        node.refresh()
-        self.assertEqual(states.DEPLOYING, node.provision_state)
-        self.assertEqual(states.ACTIVE, node.target_provision_state)
-        # last_error should be None.
-        self.assertIsNone(node.last_error)
-        # Verify reservation has been cleared.
-        self.assertIsNone(node.reservation)
-        mock_deploy.assert_called_once_with(mock.ANY)
-        # Verify instance_info values has been cleared.
-        self.assertNotIn('kernel', node.instance_info)
-        self.assertNotIn('ramdisk', node.instance_info)
-        mock_iwdi.assert_called_once_with(self.context, node.instance_info)
-        # Verify is_whole_disk_image reflects correct value on rebuild.
-        self.assertTrue(node.driver_internal_info['is_whole_disk_image'])
-
-    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
-    def test_do_node_deploy_rebuild_active_state_waiting_old(self, mock_deploy,
-                                                             mock_iwdi):
-        # TODO(rloo): delete this after the deprecation period for supporting
-        # non deploy_steps.
-        # Mocking FakeDeploy.deploy before starting the service, causes
-        # it not to be a deploy_step.
-        mock_iwdi.return_value = False
-        self._start_service()
-        mock_deploy.return_value = states.DEPLOYWAIT
-        node = obj_utils.create_test_node(
-            self.context, driver='fake-hardware',
-            provision_state=states.ACTIVE,
-            target_provision_state=states.NOSTATE,
-            instance_info={'image_source': uuidutils.generate_uuid()})
-
-        self.service.do_node_deploy(self.context, node.uuid, rebuild=True)
-        self._stop_service()
-        node.refresh()
-        self.assertEqual(states.DEPLOYWAIT, node.provision_state)
-        self.assertEqual(states.ACTIVE, node.target_provision_state)
-        # last_error should be None.
-        self.assertIsNone(node.last_error)
-        # Verify reservation has been cleared.
-        self.assertIsNone(node.reservation)
-        mock_deploy.assert_called_once_with(mock.ANY)
-        mock_iwdi.assert_called_once_with(self.context, node.instance_info)
-        self.assertFalse(node.driver_internal_info['is_whole_disk_image'])
-
-    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
-    def test_do_node_deploy_rebuild_active_state_done_old(self, mock_deploy,
-                                                          mock_iwdi):
-        # TODO(rloo): delete this after the deprecation period for supporting
-        # non deploy_steps.
-        # Mocking FakeDeploy.deploy before starting the service, causes
-        # it not to be a deploy_step.
-        mock_iwdi.return_value = False
-        self._start_service()
-        mock_deploy.return_value = states.DEPLOYDONE
-        node = obj_utils.create_test_node(
-            self.context, driver='fake-hardware',
-            provision_state=states.ACTIVE,
-            target_provision_state=states.NOSTATE)
-
-        self.service.do_node_deploy(self.context, node.uuid, rebuild=True)
-        self._stop_service()
-        node.refresh()
-        self.assertEqual(states.ACTIVE, node.provision_state)
-        self.assertEqual(states.NOSTATE, node.target_provision_state)
-        # last_error should be None.
-        self.assertIsNone(node.last_error)
-        # Verify reservation has been cleared.
-        self.assertIsNone(node.reservation)
-        mock_deploy.assert_called_once_with(mock.ANY)
-        mock_iwdi.assert_called_once_with(self.context, node.instance_info)
-        self.assertFalse(node.driver_internal_info['is_whole_disk_image'])
-
-    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
-    def test_do_node_deploy_rebuild_deployfail_state_old(self, mock_deploy,
-                                                         mock_iwdi):
-        # TODO(rloo): delete this after the deprecation period for supporting
-        # non deploy_steps.
-        # Mocking FakeDeploy.deploy before starting the service, causes
-        # it not to be a deploy_step.
-        mock_iwdi.return_value = False
-        self._start_service()
-        mock_deploy.return_value = states.DEPLOYDONE
-        node = obj_utils.create_test_node(
-            self.context, driver='fake-hardware',
-            provision_state=states.DEPLOYFAIL,
-            target_provision_state=states.NOSTATE)
-
-        self.service.do_node_deploy(self.context, node.uuid, rebuild=True)
-        self._stop_service()
-        node.refresh()
-        self.assertEqual(states.ACTIVE, node.provision_state)
-        self.assertEqual(states.NOSTATE, node.target_provision_state)
-        # last_error should be None.
-        self.assertIsNone(node.last_error)
-        # Verify reservation has been cleared.
-        self.assertIsNone(node.reservation)
-        mock_deploy.assert_called_once_with(mock.ANY)
-        mock_iwdi.assert_called_once_with(self.context, node.instance_info)
-        self.assertFalse(node.driver_internal_info['is_whole_disk_image'])
-
-    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
-    def test_do_node_deploy_rebuild_error_state_old(self, mock_deploy,
-                                                    mock_iwdi):
-        # TODO(rloo): delete this after the deprecation period for supporting
-        # non deploy_steps.
-        # Mocking FakeDeploy.deploy before starting the service, causes
-        # it not to be a deploy_step.
-        mock_iwdi.return_value = False
-        self._start_service()
-        mock_deploy.return_value = states.DEPLOYDONE
-        node = obj_utils.create_test_node(
-            self.context, driver='fake-hardware',
-            provision_state=states.ERROR,
-            target_provision_state=states.NOSTATE)
-
-        self.service.do_node_deploy(self.context, node.uuid, rebuild=True)
-        self._stop_service()
-        node.refresh()
-        self.assertEqual(states.ACTIVE, node.provision_state)
-        self.assertEqual(states.NOSTATE, node.target_provision_state)
-        # last_error should be None.
-        self.assertIsNone(node.last_error)
-        # Verify reservation has been cleared.
-        self.assertIsNone(node.reservation)
-        mock_deploy.assert_called_once_with(mock.ANY)
-        mock_iwdi.assert_called_once_with(self.context, node.instance_info)
-        self.assertFalse(node.driver_internal_info['is_whole_disk_image'])
-
     def test_do_node_deploy_rebuild_active_state_error(self, mock_iwdi):
         # Tests manager.do_node_deploy() & deployments.do_next_deploy_step(),
         # when getting an unexpected state returned from a deploy_step.
@@ -1589,10 +1438,8 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
         self._start_service()
         # NOTE(rloo): We have to mock this here as opposed to using a
         # decorator. With a decorator, when initialization is done, the
-        # mocked deploy() method isn't considered a deploy step, causing
-        # manager._old_rest_of_do_node_deploy() to be run instead of
-        # deployments.do_next_deploy_step(). So we defer mock'ing until after
-        # the init is done.
+        # mocked deploy() method isn't considered a deploy step. So we defer
+        # mock'ing until after the init is done.
         with mock.patch.object(fake.FakeDeploy,
                                'deploy', autospec=True) as mock_deploy:
             mock_deploy.return_value = states.DEPLOYING
@@ -1626,10 +1473,8 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
         self._start_service()
         # NOTE(rloo): We have to mock this here as opposed to using a
         # decorator. With a decorator, when initialization is done, the
-        # mocked deploy() method isn't considered a deploy step, causing
-        # manager._old_rest_of_do_node_deploy() to be run instead of
-        # deployments.do_next_deploy_step(). So we defer mock'ing until after
-        # the init is done.
+        # mocked deploy() method isn't considered a deploy step. So we defer
+        # mock'ing until after the init is done.
         with mock.patch.object(fake.FakeDeploy,
                                'deploy', autospec=True) as mock_deploy:
             mock_deploy.return_value = states.DEPLOYWAIT
@@ -1658,10 +1503,8 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
         self._start_service()
         # NOTE(rloo): We have to mock this here as opposed to using a
         # decorator. With a decorator, when initialization is done, the
-        # mocked deploy() method isn't considered a deploy step, causing
-        # manager._old_rest_of_do_node_deploy() to be run instead of
-        # deployments.do_next_deploy_step(). So we defer mock'ing until after
-        # the init is done.
+        # mocked deploy() method isn't considered a deploy step. So we defer
+        # mock'ing until after the init is done.
         with mock.patch.object(fake.FakeDeploy,
                                'deploy', autospec=True) as mock_deploy:
             mock_deploy.return_value = None
@@ -1689,10 +1532,8 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
         self._start_service()
         # NOTE(rloo): We have to mock this here as opposed to using a
         # decorator. With a decorator, when initialization is done, the
-        # mocked deploy() method isn't considered a deploy step, causing
-        # manager._old_rest_of_do_node_deploy() to be run instead of
-        # deployments.do_next_deploy_step(). So we defer mock'ing until after
-        # the init is done.
+        # mocked deploy() method isn't considered a deploy step. So we defer
+        # mock'ing until after the init is done.
         with mock.patch.object(fake.FakeDeploy,
                                'deploy', autospec=True) as mock_deploy:
             mock_deploy.return_value = None
@@ -1720,10 +1561,8 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
         self._start_service()
         # NOTE(rloo): We have to mock this here as opposed to using a
         # decorator. With a decorator, when initialization is done, the
-        # mocked deploy() method isn't considered a deploy step, causing
-        # manager._old_rest_of_do_node_deploy() to be run instead of
-        # deployments.do_next_deploy_step(). So we defer mock'ing until after
-        # the init is done.
+        # mocked deploy() method isn't considered a deploy step. So we defer
+        # mock'ing until after the init is done.
         with mock.patch.object(fake.FakeDeploy,
                                'deploy', autospec=True) as mock_deploy:
             mock_deploy.return_value = None
@@ -1812,41 +1651,6 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
             mock_iwdi.assert_called_once_with(self.context, node.instance_info)
             self.assertFalse(node.driver_internal_info['is_whole_disk_image'])
 
-    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
-    @mock.patch('ironic.conductor.utils.remove_agent_url')
-    @mock.patch('ironic.conductor.utils.is_fast_track')
-    def test_do_node_deploy_fast_track(self, mock_is_fast_track,
-                                       mock_remove_agent_url,
-                                       mock_deploy, mock_iwdi):
-        # TODO(rloo): delete this after the deprecation period for supporting
-        # non deploy_steps.
-        # Mocking FakeDeploy.deploy before starting the service, causes
-        # it not to be a deploy_step.
-        mock_iwdi.return_value = False
-        mock_is_fast_track.return_value = True
-        self._start_service()
-        mock_deploy.return_value = states.DEPLOYDONE
-        node = obj_utils.create_test_node(
-            self.context, driver='fake-hardware',
-            driver_internal_info={'agent_url': 'meow'},
-            provision_state=states.AVAILABLE,
-            target_provision_state=states.NOSTATE)
-
-        self.service.do_node_deploy(self.context, node.uuid)
-        self._stop_service()
-        node.refresh()
-        self.assertEqual(states.ACTIVE, node.provision_state)
-        self.assertEqual(states.NOSTATE, node.target_provision_state)
-        # last_error should be None.
-        self.assertIsNone(node.last_error)
-        # Verify reservation has been cleared.
-        self.assertIsNone(node.reservation)
-        mock_deploy.assert_called_once_with(mock.ANY)
-        mock_iwdi.assert_called_once_with(self.context, node.instance_info)
-        self.assertFalse(node.driver_internal_info['is_whole_disk_image'])
-        mock_is_fast_track.assert_called_once_with(mock.ANY)
-        mock_remove_agent_url.assert_not_called()
-
 
 @mgr_utils.mock_record_keepalive
 class ContinueNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
diff --git a/ironic/tests/unit/drivers/modules/ansible/test_deploy.py b/ironic/tests/unit/drivers/modules/ansible/test_deploy.py
index 8787848ffd..95741d02e4 100644
--- a/ironic/tests/unit/drivers/modules/ansible/test_deploy.py
+++ b/ironic/tests/unit/drivers/modules/ansible/test_deploy.py
@@ -997,29 +997,6 @@ class TestAnsibleDeploy(AnsibleDeployTestCaseBase):
                 task.driver.boot.clean_up_ramdisk.assert_called_once_with(
                     task)
 
-    @mock.patch.object(utils, 'notify_conductor_resume_deploy', autospec=True)
-    @mock.patch.object(utils, 'node_set_boot_device', autospec=True)
-    def test_reboot_to_instance_deprecated(self, bootdev_mock, resume_mock):
-        # TODO(rloo): no deploy steps; delete this when we remove support
-        # for handling no deploy steps.
-        self.node.provision_state = states.DEPLOYING
-        self.node.deploy_step = None
-        self.node.save()
-        with task_manager.acquire(self.context, self.node.uuid) as task:
-            with mock.patch.object(self.driver, 'reboot_and_finish_deploy',
-                                   autospec=True):
-                task.driver.boot = mock.Mock()
-                task.process_event = mock.Mock()
-                self.driver.reboot_to_instance(task)
-                bootdev_mock.assert_called_once_with(task, 'disk',
-                                                     persistent=True)
-                self.assertFalse(resume_mock.called)
-                task.process_event.assert_called_once_with('done')
-                self.driver.reboot_and_finish_deploy.assert_called_once_with(
-                    task)
-                task.driver.boot.clean_up_ramdisk.assert_called_once_with(
-                    task)
-
     @mock.patch.object(utils, 'restore_power_state_if_needed', autospec=True)
     @mock.patch.object(utils, 'power_on_node_if_needed')
     @mock.patch.object(utils, 'node_power_action', autospec=True)
diff --git a/ironic/tests/unit/drivers/modules/test_agent.py b/ironic/tests/unit/drivers/modules/test_agent.py
index 7a8575e74b..81790a28ae 100644
--- a/ironic/tests/unit/drivers/modules/test_agent.py
+++ b/ironic/tests/unit/drivers/modules/test_agent.py
@@ -1231,6 +1231,8 @@ class TestAgentDeploy(db_base.DbTestCase):
             self.assertEqual(states.ACTIVE,
                              task.node.target_provision_state)
 
+    @mock.patch.object(manager_utils, 'notify_conductor_resume_deploy',
+                       autospec=True)
     @mock.patch.object(manager_utils, 'power_on_node_if_needed',
                        autospec=True)
     @mock.patch.object(deploy_utils, 'remove_http_instance_symlink',
@@ -1251,7 +1253,7 @@ class TestAgentDeploy(db_base.DbTestCase):
                                 prepare_instance_mock, power_off_mock,
                                 get_power_state_mock, node_power_action_mock,
                                 uuid_mock, log_mock, remove_symlink_mock,
-                                power_on_node_if_needed_mock):
+                                power_on_node_if_needed_mock, resume_mock):
         self.config(manage_agent_boot=True, group='agent')
         self.config(image_download_source='http', group='agent')
         check_deploy_mock.return_value = None
@@ -1278,10 +1280,13 @@ class TestAgentDeploy(db_base.DbTestCase):
             get_power_state_mock.assert_called_once_with(task)
             node_power_action_mock.assert_called_once_with(
                 task, states.POWER_ON)
-            self.assertEqual(states.ACTIVE, task.node.provision_state)
-            self.assertEqual(states.NOSTATE, task.node.target_provision_state)
+            self.assertEqual(states.DEPLOYING, task.node.provision_state)
+            self.assertEqual(states.ACTIVE, task.node.target_provision_state)
             self.assertTrue(remove_symlink_mock.called)
+            resume_mock.assert_called_once_with(task)
 
+    @mock.patch.object(manager_utils, 'notify_conductor_resume_deploy',
+                       autospec=True)
     @mock.patch.object(manager_utils, 'power_on_node_if_needed',
                        autospec=True)
     @mock.patch.object(agent.LOG, 'warning', spec_set=True, autospec=True)
@@ -1300,7 +1305,8 @@ class TestAgentDeploy(db_base.DbTestCase):
     def test_reboot_to_instance_no_manage_agent_boot(
             self, check_deploy_mock, prepare_instance_mock, power_off_mock,
             get_power_state_mock, node_power_action_mock, uuid_mock,
-            bootdev_mock, log_mock, power_on_node_if_needed_mock):
+            bootdev_mock, log_mock, power_on_node_if_needed_mock,
+            resume_mock):
         self.config(manage_agent_boot=False, group='agent')
         check_deploy_mock.return_value = None
         uuid_mock.return_value = None
@@ -1324,9 +1330,12 @@ class TestAgentDeploy(db_base.DbTestCase):
             get_power_state_mock.assert_called_once_with(task)
             node_power_action_mock.assert_called_once_with(
                 task, states.POWER_ON)
-            self.assertEqual(states.ACTIVE, task.node.provision_state)
-            self.assertEqual(states.NOSTATE, task.node.target_provision_state)
+            self.assertEqual(states.DEPLOYING, task.node.provision_state)
+            self.assertEqual(states.ACTIVE, task.node.target_provision_state)
+            resume_mock.assert_called_once_with(task)
 
+    @mock.patch.object(manager_utils, 'notify_conductor_resume_deploy',
+                       autospec=True)
     @mock.patch.object(manager_utils, 'power_on_node_if_needed',
                        autospec=True)
     @mock.patch.object(agent.LOG, 'warning', spec_set=True, autospec=True)
@@ -1350,7 +1359,8 @@ class TestAgentDeploy(db_base.DbTestCase):
                                                 node_power_action_mock,
                                                 uuid_mock, boot_mode_mock,
                                                 log_mock,
-                                                power_on_node_if_needed_mock):
+                                                power_on_node_if_needed_mock,
+                                                resume_mock):
         check_deploy_mock.return_value = None
         uuid_mock.return_value = 'root_uuid'
         self.node.provision_state = states.DEPLOYWAIT
@@ -1381,9 +1391,12 @@ class TestAgentDeploy(db_base.DbTestCase):
             get_power_state_mock.assert_called_once_with(task)
             node_power_action_mock.assert_called_once_with(
                 task, states.POWER_ON)
-            self.assertEqual(states.ACTIVE, task.node.provision_state)
-            self.assertEqual(states.NOSTATE, task.node.target_provision_state)
+            self.assertEqual(states.DEPLOYING, task.node.provision_state)
+            self.assertEqual(states.ACTIVE, task.node.target_provision_state)
+            resume_mock.assert_called_once_with(task)
 
+    @mock.patch.object(manager_utils, 'notify_conductor_resume_deploy',
+                       autospec=True)
     @mock.patch.object(manager_utils, 'power_on_node_if_needed',
                        autospec=True)
     @mock.patch.object(agent.LOG, 'warning', spec_set=True, autospec=True)
@@ -1404,7 +1417,7 @@ class TestAgentDeploy(db_base.DbTestCase):
             self, check_deploy_mock, prepare_instance_mock,
             power_off_mock, get_power_state_mock,
             node_power_action_mock, uuid_mock, boot_mode_mock, log_mock,
-            power_on_node_if_needed_mock):
+            power_on_node_if_needed_mock, resume_mock):
         check_deploy_mock.return_value = None
         uuid_mock.side_effect = ['root_uuid', 'prep_boot_part_uuid']
         self.node.provision_state = states.DEPLOYWAIT
@@ -1442,8 +1455,8 @@ class TestAgentDeploy(db_base.DbTestCase):
             get_power_state_mock.assert_called_once_with(task)
             node_power_action_mock.assert_called_once_with(
                 task, states.POWER_ON)
-            self.assertEqual(states.ACTIVE, task.node.provision_state)
-            self.assertEqual(states.NOSTATE, task.node.target_provision_state)
+            self.assertEqual(states.DEPLOYING, task.node.provision_state)
+            self.assertEqual(states.ACTIVE, task.node.target_provision_state)
 
     @mock.patch.object(agent.LOG, 'warning', spec_set=True, autospec=True)
     @mock.patch.object(driver_utils, 'collect_ramdisk_logs', autospec=True)
@@ -1480,6 +1493,8 @@ class TestAgentDeploy(db_base.DbTestCase):
             self.assertEqual(states.DEPLOYFAIL, task.node.provision_state)
             self.assertEqual(states.ACTIVE, task.node.target_provision_state)
 
+    @mock.patch.object(manager_utils, 'notify_conductor_resume_deploy',
+                       autospec=True)
     @mock.patch.object(manager_utils, 'power_on_node_if_needed',
                        autospec=True)
     @mock.patch.object(agent.LOG, 'warning', spec_set=True, autospec=True)
@@ -1503,7 +1518,8 @@ class TestAgentDeploy(db_base.DbTestCase):
                                           node_power_action_mock,
                                           uuid_mock, boot_mode_mock,
                                           log_mock,
-                                          power_on_node_if_needed_mock):
+                                          power_on_node_if_needed_mock,
+                                          resume_mock):
         check_deploy_mock.return_value = None
         uuid_mock.side_effect = ['root_uuid', 'efi_uuid']
         self.node.provision_state = states.DEPLOYWAIT
@@ -1538,8 +1554,9 @@ class TestAgentDeploy(db_base.DbTestCase):
             get_power_state_mock.assert_called_once_with(task)
             node_power_action_mock.assert_called_once_with(
                 task, states.POWER_ON)
-            self.assertEqual(states.ACTIVE, task.node.provision_state)
-            self.assertEqual(states.NOSTATE, task.node.target_provision_state)
+            self.assertEqual(states.DEPLOYING, task.node.provision_state)
+            self.assertEqual(states.ACTIVE, task.node.target_provision_state)
+            resume_mock.assert_called_once_with(task)
 
     @mock.patch.object(agent_client.AgentClient, 'get_commands_status',
                        autospec=True)
diff --git a/ironic/tests/unit/drivers/modules/test_agent_base.py b/ironic/tests/unit/drivers/modules/test_agent_base.py
index 6927391f22..de8c671521 100644
--- a/ironic/tests/unit/drivers/modules/test_agent_base.py
+++ b/ironic/tests/unit/drivers/modules/test_agent_base.py
@@ -651,13 +651,6 @@ class HeartbeatMixinTest(AgentDeployMixinBaseTest):
                 self.context, self.node.uuid, shared=False) as task:
             self.assertTrue(self.deploy.in_core_deploy_step(task))
 
-    def test_in_core_deploy_step_no_steps_list(self):
-        # Need to handle drivers without deploy step support, remove in the
-        # Train release.
-        with task_manager.acquire(
-                self.context, self.node.uuid, shared=False) as task:
-            self.assertTrue(self.deploy.in_core_deploy_step(task))
-
     def test_in_core_deploy_step_in_other_step(self):
         self.node.deploy_step = {
             'interface': 'deploy', 'step': 'other-step', 'priority': 100}
@@ -784,8 +777,6 @@ class AgentDeployMixinTest(AgentDeployMixinBaseTest):
         cfg.CONF.set_override('deploy_logs_collect', 'always', 'agent')
         self.node.provision_state = states.DEPLOYING
         self.node.target_provision_state = states.ACTIVE
-        self.node.deploy_step = {
-            'step': 'deploy', 'priority': 50, 'interface': 'deploy'}
         self.node.save()
         with task_manager.acquire(self.context, self.node.uuid,
                                   shared=True) as task:
@@ -810,41 +801,6 @@ class AgentDeployMixinTest(AgentDeployMixinBaseTest):
     @mock.patch.object(driver_utils, 'collect_ramdisk_logs', autospec=True)
     @mock.patch.object(time, 'sleep', lambda seconds: None)
     @mock.patch.object(manager_utils, 'node_power_action', autospec=True)
-    @mock.patch.object(fake.FakePower, 'get_power_state',
-                       spec=types.FunctionType)
-    @mock.patch.object(agent_client.AgentClient, 'power_off',
-                       spec=types.FunctionType)
-    def test_reboot_and_finish_deploy_deprecated(
-            self, power_off_mock, get_power_state_mock,
-            node_power_action_mock, collect_mock, resume_mock,
-            power_on_node_if_needed_mock):
-        # TODO(rloo): no deploy steps; delete this when we remove support
-        # for handling no deploy steps.
-        cfg.CONF.set_override('deploy_logs_collect', 'always', 'agent')
-        self.node.provision_state = states.DEPLOYING
-        self.node.target_provision_state = states.ACTIVE
-        self.node.deploy_step = None
-        self.node.save()
-        with task_manager.acquire(self.context, self.node.uuid,
-                                  shared=True) as task:
-            get_power_state_mock.side_effect = [states.POWER_ON,
-                                                states.POWER_OFF]
-            power_on_node_if_needed_mock.return_value = None
-            self.deploy.reboot_and_finish_deploy(task)
-            power_off_mock.assert_called_once_with(task.node)
-            self.assertEqual(2, get_power_state_mock.call_count)
-            node_power_action_mock.assert_called_once_with(
-                task, states.POWER_ON)
-            self.assertEqual(states.ACTIVE, task.node.provision_state)
-            self.assertEqual(states.NOSTATE, task.node.target_provision_state)
-            collect_mock.assert_called_once_with(task.node)
-            self.assertFalse(resume_mock.called)
-
-    @mock.patch.object(manager_utils, 'power_on_node_if_needed',
-                       autospec=True)
-    @mock.patch.object(driver_utils, 'collect_ramdisk_logs', autospec=True)
-    @mock.patch.object(time, 'sleep', lambda seconds: None)
-    @mock.patch.object(manager_utils, 'node_power_action', autospec=True)
     @mock.patch.object(fake.FakePower, 'get_power_state',
                        spec=types.FunctionType)
     @mock.patch.object(agent_client.AgentClient, 'power_off',
@@ -856,7 +812,7 @@ class AgentDeployMixinTest(AgentDeployMixinBaseTest):
     def test_reboot_and_finish_deploy_soft_poweroff_doesnt_complete(
             self, configure_tenant_net_mock, remove_provisioning_net_mock,
             power_off_mock, get_power_state_mock,
-            node_power_action_mock, mock_collect,
+            node_power_action_mock, mock_collect, resume_mock,
             power_on_node_if_needed_mock):
         self.node.provision_state = states.DEPLOYING
         self.node.target_provision_state = states.ACTIVE
@@ -874,10 +830,13 @@ class AgentDeployMixinTest(AgentDeployMixinBaseTest):
             remove_provisioning_net_mock.assert_called_once_with(mock.ANY,
                                                                  task)
             configure_tenant_net_mock.assert_called_once_with(mock.ANY, task)
-            self.assertEqual(states.ACTIVE, task.node.provision_state)
-            self.assertEqual(states.NOSTATE, task.node.target_provision_state)
+            self.assertEqual(states.DEPLOYING, task.node.provision_state)
+            self.assertEqual(states.ACTIVE, task.node.target_provision_state)
             self.assertFalse(mock_collect.called)
+            resume_mock.assert_called_once_with(task)
 
+    @mock.patch.object(manager_utils, 'notify_conductor_resume_deploy',
+                       autospec=True)
     @mock.patch.object(driver_utils, 'collect_ramdisk_logs', autospec=True)
     @mock.patch.object(manager_utils, 'node_power_action', autospec=True)
     @mock.patch.object(agent_client.AgentClient, 'power_off',
@@ -888,7 +847,7 @@ class AgentDeployMixinTest(AgentDeployMixinBaseTest):
                 'configure_tenant_networks', spec_set=True, autospec=True)
     def test_reboot_and_finish_deploy_soft_poweroff_fails(
             self, configure_tenant_net_mock, remove_provisioning_net_mock,
-            power_off_mock, node_power_action_mock, mock_collect):
+            power_off_mock, node_power_action_mock, mock_collect, resume_mock):
         power_off_mock.side_effect = RuntimeError("boom")
         self.node.provision_state = states.DEPLOYING
         self.node.target_provision_state = states.ACTIVE
@@ -903,11 +862,13 @@ class AgentDeployMixinTest(AgentDeployMixinBaseTest):
             remove_provisioning_net_mock.assert_called_once_with(mock.ANY,
                                                                  task)
             configure_tenant_net_mock.assert_called_once_with(mock.ANY, task)
-            self.assertEqual(states.ACTIVE, task.node.provision_state)
-            self.assertEqual(states.NOSTATE, task.node.target_provision_state)
+            self.assertEqual(states.DEPLOYING, task.node.provision_state)
+            self.assertEqual(states.ACTIVE, task.node.target_provision_state)
             self.assertFalse(mock_collect.called)
 
     @mock.patch.object(manager_utils, 'power_on_node_if_needed')
+    @mock.patch.object(manager_utils, 'notify_conductor_resume_deploy',
+                       autospec=True)
     @mock.patch.object(driver_utils, 'collect_ramdisk_logs', autospec=True)
     @mock.patch.object(time, 'sleep', lambda seconds: None)
     @mock.patch.object(manager_utils, 'node_power_action', autospec=True)
@@ -922,7 +883,7 @@ class AgentDeployMixinTest(AgentDeployMixinBaseTest):
     def test_reboot_and_finish_deploy_get_power_state_fails(
             self, configure_tenant_net_mock, remove_provisioning_net_mock,
             power_off_mock, get_power_state_mock, node_power_action_mock,
-            mock_collect, power_on_node_if_needed_mock):
+            mock_collect, resume_mock, power_on_node_if_needed_mock):
         self.node.provision_state = states.DEPLOYING
         self.node.target_provision_state = states.ACTIVE
         self.node.save()
@@ -939,8 +900,8 @@ class AgentDeployMixinTest(AgentDeployMixinBaseTest):
             remove_provisioning_net_mock.assert_called_once_with(mock.ANY,
                                                                  task)
             configure_tenant_net_mock.assert_called_once_with(mock.ANY, task)
-            self.assertEqual(states.ACTIVE, task.node.provision_state)
-            self.assertEqual(states.NOSTATE, task.node.target_provision_state)
+            self.assertEqual(states.DEPLOYING, task.node.provision_state)
+            self.assertEqual(states.ACTIVE, task.node.target_provision_state)
             self.assertFalse(mock_collect.called)
 
     @mock.patch.object(manager_utils, 'power_on_node_if_needed',
@@ -1049,12 +1010,15 @@ class AgentDeployMixinTest(AgentDeployMixinBaseTest):
             self.assertEqual(states.ACTIVE, task.node.target_provision_state)
             self.assertFalse(mock_collect.called)
 
+    @mock.patch.object(manager_utils, 'notify_conductor_resume_deploy',
+                       autospec=True)
     @mock.patch.object(driver_utils, 'collect_ramdisk_logs', autospec=True)
     @mock.patch.object(manager_utils, 'node_power_action', autospec=True)
     @mock.patch.object(agent_client.AgentClient, 'sync',
                        spec=types.FunctionType)
     def test_reboot_and_finish_deploy_power_action_oob_power_off(
-            self, sync_mock, node_power_action_mock, mock_collect):
+            self, sync_mock, node_power_action_mock, mock_collect,
+            resume_mock):
         # Enable force power off
         driver_info = self.node.driver_info
         driver_info['deploy_forces_oob_reboot'] = True
@@ -1072,17 +1036,21 @@ class AgentDeployMixinTest(AgentDeployMixinBaseTest):
                 mock.call(task, states.POWER_OFF),
                 mock.call(task, states.POWER_ON),
             ])
-            self.assertEqual(states.ACTIVE, task.node.provision_state)
-            self.assertEqual(states.NOSTATE, task.node.target_provision_state)
+            self.assertEqual(states.DEPLOYING, task.node.provision_state)
+            self.assertEqual(states.ACTIVE, task.node.target_provision_state)
             self.assertFalse(mock_collect.called)
+            resume_mock.assert_called_once_with(task)
 
+    @mock.patch.object(manager_utils, 'notify_conductor_resume_deploy',
+                       autospec=True)
     @mock.patch.object(driver_utils, 'collect_ramdisk_logs', autospec=True)
     @mock.patch.object(agent_base.LOG, 'warning', autospec=True)
     @mock.patch.object(manager_utils, 'node_power_action', autospec=True)
     @mock.patch.object(agent_client.AgentClient, 'sync',
                        spec=types.FunctionType)
     def test_reboot_and_finish_deploy_power_action_oob_power_off_failed(
-            self, sync_mock, node_power_action_mock, log_mock, mock_collect):
+            self, sync_mock, node_power_action_mock, log_mock, mock_collect,
+            resume_mock):
         # Enable force power off
         driver_info = self.node.driver_info
         driver_info['deploy_forces_oob_reboot'] = True
@@ -1101,8 +1069,8 @@ class AgentDeployMixinTest(AgentDeployMixinBaseTest):
                 mock.call(task, states.POWER_OFF),
                 mock.call(task, states.POWER_ON),
             ])
-            self.assertEqual(states.ACTIVE, task.node.provision_state)
-            self.assertEqual(states.NOSTATE, task.node.target_provision_state)
+            self.assertEqual(states.DEPLOYING, task.node.provision_state)
+            self.assertEqual(states.ACTIVE, task.node.target_provision_state)
             log_error = ('The version of the IPA ramdisk used in the '
                          'deployment do not support the command "sync"')
             log_mock.assert_called_once_with(
diff --git a/releasenotes/notes/deploy-steps-required-aa72cdf1c0ec0e84.yaml b/releasenotes/notes/deploy-steps-required-aa72cdf1c0ec0e84.yaml
new file mode 100644
index 0000000000..a9a120d8da
--- /dev/null
+++ b/releasenotes/notes/deploy-steps-required-aa72cdf1c0ec0e84.yaml
@@ -0,0 +1,4 @@
+---
+upgrade:
+  - |
+    Removes compatibility with deploy interfaces that do not use deploy steps.