From 56acabfab4c8f24065aa7e109a39b9cca2f7f2a3 Mon Sep 17 00:00:00 2001
From: Mark Goddard <mark@stackhpc.com>
Date: Fri, 8 Mar 2019 13:43:39 +0000
Subject: [PATCH] Deploy Templates: factor out ironic.conductor.steps

There is enough steps code in conductor.utils to warrant a separate
module.

Change-Id: I0126e860210bbc56991876f26e64d81d3d7d5c08
Story: 1722275
Task: 29902
---
 ironic/api/controllers/v1/deploy_template.py  |   4 +-
 ironic/api/controllers/v1/node.py             |   4 +-
 ironic/conductor/manager.py                   |   9 +-
 ironic/conductor/steps.py                     | 601 +++++++++++++++
 ironic/conductor/utils.py                     | 578 --------------
 ironic/drivers/base.py                        |   4 +-
 ironic/drivers/modules/agent_base_vendor.py   |   5 +-
 ironic/drivers/modules/ansible/deploy.py      |   3 +-
 ironic/tests/unit/conductor/test_manager.py   |  17 +-
 ironic/tests/unit/conductor/test_steps.py     | 724 ++++++++++++++++++
 ironic/tests/unit/conductor/test_utils.py     | 698 -----------------
 .../drivers/modules/ansible/test_deploy.py    |   7 +-
 .../drivers/modules/test_agent_base_vendor.py |  13 +-
 13 files changed, 1363 insertions(+), 1304 deletions(-)
 create mode 100644 ironic/conductor/steps.py
 create mode 100644 ironic/tests/unit/conductor/test_steps.py

diff --git a/ironic/api/controllers/v1/deploy_template.py b/ironic/api/controllers/v1/deploy_template.py
index 4ac6866ac8..29c1e279b5 100644
--- a/ironic/api/controllers/v1/deploy_template.py
+++ b/ironic/api/controllers/v1/deploy_template.py
@@ -33,7 +33,7 @@ from ironic.api.controllers.v1 import utils as api_utils
 from ironic.api import expose
 from ironic.common import exception
 from ironic.common.i18n import _
-from ironic.conductor import utils as conductor_utils
+from ironic.conductor import steps as conductor_steps
 import ironic.conf
 from ironic import objects
 
@@ -44,7 +44,7 @@ METRICS = metrics_utils.get_metrics_logger(__name__)
 _DEFAULT_RETURN_FIELDS = ('uuid', 'name')
 
 _DEPLOY_INTERFACE_TYPE = wtypes.Enum(
-    wtypes.text, *conductor_utils.DEPLOYING_INTERFACE_PRIORITY)
+    wtypes.text, *conductor_steps.DEPLOYING_INTERFACE_PRIORITY)
 
 
 class DeployStepType(wtypes.Base, base.AsDictMixin):
diff --git a/ironic/api/controllers/v1/node.py b/ironic/api/controllers/v1/node.py
index 1dd9e2e36c..68efff8319 100644
--- a/ironic/api/controllers/v1/node.py
+++ b/ironic/api/controllers/v1/node.py
@@ -43,7 +43,7 @@ from ironic.common import exception
 from ironic.common.i18n import _
 from ironic.common import policy
 from ironic.common import states as ir_states
-from ironic.conductor import utils as conductor_utils
+from ironic.conductor import steps as conductor_steps
 import ironic.conf
 from ironic import objects
 
@@ -63,7 +63,7 @@ _CLEAN_STEPS_SCHEMA = {
         "properties": {
             "interface": {
                 "description": "driver interface",
-                "enum": list(conductor_utils.CLEANING_INTERFACE_PRIORITY)
+                "enum": list(conductor_steps.CLEANING_INTERFACE_PRIORITY)
                 # interface value must be one of the valid interfaces
             },
             "step": {
diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py
index 0d32eadaf9..ba4c971069 100644
--- a/ironic/conductor/manager.py
+++ b/ironic/conductor/manager.py
@@ -69,6 +69,7 @@ from ironic.common import swift
 from ironic.conductor import allocations
 from ironic.conductor import base_manager
 from ironic.conductor import notification_utils as notify_utils
+from ironic.conductor import steps as conductor_steps
 from ironic.conductor import task_manager
 from ironic.conductor import utils
 from ironic.conf import CONF
@@ -853,7 +854,7 @@ class ConductorManager(base_manager.BaseConductorManager):
                 task.driver.power.validate(task)
                 task.driver.deploy.validate(task)
                 utils.validate_instance_info_traits(task.node)
-                utils.validate_deploy_templates(task)
+                conductor_steps.validate_deploy_templates(task)
             except exception.InvalidParameterValue as e:
                 raise exception.InstanceDeployFailure(
                     _("Failed to validate deploy or power info for node "
@@ -1338,7 +1339,7 @@ class ConductorManager(base_manager.BaseConductorManager):
             return
 
         try:
-            utils.set_node_cleaning_steps(task)
+            conductor_steps.set_node_cleaning_steps(task)
         except (exception.InvalidParameterValue,
                 exception.NodeCleaningFailure) as e:
             msg = (_('Cannot clean node %(node)s. Error: %(msg)s')
@@ -2205,7 +2206,7 @@ class ConductorManager(base_manager.BaseConductorManager):
                     iface.validate(task)
                     if iface_name == 'deploy':
                         utils.validate_instance_info_traits(task.node)
-                        utils.validate_deploy_templates(task)
+                        conductor_steps.validate_deploy_templates(task)
                     result = True
                 except (exception.InvalidParameterValue,
                         exception.UnsupportedDriverExtension) as e:
@@ -3692,7 +3693,7 @@ def do_node_deploy(task, conductor_id=None, configdrive=None):
     try:
         # This gets the deploy steps (if any) and puts them in the node's
         # driver_internal_info['deploy_steps'].
-        utils.set_node_deployment_steps(task)
+        conductor_steps.set_node_deployment_steps(task)
     except exception.InstanceDeployFailure as e:
         with excutils.save_and_reraise_exception():
             utils.deploying_error_handler(
diff --git a/ironic/conductor/steps.py b/ironic/conductor/steps.py
new file mode 100644
index 0000000000..490daaf984
--- /dev/null
+++ b/ironic/conductor/steps.py
@@ -0,0 +1,601 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import collections
+
+from oslo_config import cfg
+from oslo_log import log
+
+from ironic.common import exception
+from ironic.common.i18n import _
+from ironic.common import states
+from ironic.objects import deploy_template
+
+LOG = log.getLogger(__name__)
+CONF = cfg.CONF
+
+
+CLEANING_INTERFACE_PRIORITY = {
+    # When two clean steps have the same priority, their order is determined
+    # by which interface is implementing the clean step. The clean step of the
+    # interface with the highest value here, will be executed first in that
+    # case.
+    'power': 5,
+    'management': 4,
+    'deploy': 3,
+    'bios': 2,
+    'raid': 1,
+}
+
+DEPLOYING_INTERFACE_PRIORITY = {
+    # When two deploy steps have the same priority, their order is determined
+    # by which interface is implementing the step. The step of the interface
+    # with the highest value here, will be executed first in that case.
+    # TODO(rloo): If we think it makes sense to have the interface priorities
+    # the same for cleaning & deploying, replace the two with one e.g.
+    # 'INTERFACE_PRIORITIES'.
+    'power': 5,
+    'management': 4,
+    'deploy': 3,
+    'bios': 2,
+    'raid': 1,
+}
+
+
+def _clean_step_key(step):
+    """Sort by priority, then interface priority in event of tie.
+
+    :param step: cleaning step dict to get priority for.
+    """
+    return (step.get('priority'),
+            CLEANING_INTERFACE_PRIORITY[step.get('interface')])
+
+
+def _deploy_step_key(step):
+    """Sort by priority, then interface priority in event of tie.
+
+    :param step: deploy step dict to get priority for.
+    """
+    return (step.get('priority'),
+            DEPLOYING_INTERFACE_PRIORITY[step.get('interface')])
+
+
+def _sorted_steps(steps, sort_step_key):
+    """Return a sorted list of steps.
+
+    :param sort_step_key: If set, this is a method (key) used to sort the steps
+        from highest priority to lowest priority. For steps having the same
+        priority, they are sorted from highest interface priority to lowest.
+    :returns: A list of sorted step dictionaries.
+    """
+    # Sort the steps from higher priority to lower priority
+    return sorted(steps, key=sort_step_key, reverse=True)
+
+
+def _get_steps(task, interfaces, get_method, enabled=False,
+               sort_step_key=None):
+    """Get steps for task.node.
+
+    :param task: A TaskManager object
+    :param interfaces: A dictionary of (key) interfaces and their
+        (value) priorities. These are the interfaces that will have steps of
+        interest. The priorities are used for deciding the priorities of steps
+        having the same priority.
+    :param get_method: The method used to get the steps from the node's
+        interface; a string.
+    :param enabled: If True, returns only enabled (priority > 0) steps. If
+        False, returns all steps.
+    :param sort_step_key: If set, this is a method (key) used to sort the steps
+        from highest priority to lowest priority. For steps having the same
+        priority, they are sorted from highest interface priority to lowest.
+    :raises: NodeCleaningFailure or InstanceDeployFailure if there was a
+        problem getting the steps.
+    :returns: A list of step dictionaries
+    """
+    # Get steps from each interface
+    steps = list()
+    for interface in interfaces:
+        interface = getattr(task.driver, interface)
+        if interface:
+            interface_steps = [x for x in getattr(interface, get_method)(task)
+                               if not enabled or x['priority'] > 0]
+            steps.extend(interface_steps)
+    if sort_step_key:
+        steps = _sorted_steps(steps, sort_step_key)
+    return steps
+
+
+def _get_cleaning_steps(task, enabled=False, sort=True):
+    """Get cleaning steps for task.node.
+
+    :param task: A TaskManager object
+    :param enabled: If True, returns only enabled (priority > 0) steps. If
+        False, returns all clean steps.
+    :param sort: If True, the steps are sorted from highest priority to lowest
+        priority. For steps having the same priority, they are sorted from
+        highest interface priority to lowest.
+    :raises: NodeCleaningFailure if there was a problem getting the
+        clean steps.
+    :returns: A list of clean step dictionaries
+    """
+    sort_key = _clean_step_key if sort else None
+    return _get_steps(task, CLEANING_INTERFACE_PRIORITY, 'get_clean_steps',
+                      enabled=enabled, sort_step_key=sort_key)
+
+
+def _get_deployment_steps(task, enabled=False, sort=True):
+    """Get deployment steps for task.node.
+
+    :param task: A TaskManager object
+    :param enabled: If True, returns only enabled (priority > 0) steps. If
+        False, returns all deploy steps.
+    :param sort: If True, the steps are sorted from highest priority to lowest
+        priority. For steps having the same priority, they are sorted from
+        highest interface priority to lowest.
+    :raises: InstanceDeployFailure if there was a problem getting the
+        deploy steps.
+    :returns: A list of deploy step dictionaries
+    """
+    sort_key = _deploy_step_key if sort else None
+    return _get_steps(task, DEPLOYING_INTERFACE_PRIORITY, 'get_deploy_steps',
+                      enabled=enabled, sort_step_key=sort_key)
+
+
+def set_node_cleaning_steps(task):
+    """Set up the node with clean step information for cleaning.
+
+    For automated cleaning, get the clean steps from the driver.
+    For manual cleaning, the user's clean steps are known but need to be
+    validated against the driver's clean steps.
+
+    :raises: InvalidParameterValue if there is a problem with the user's
+             clean steps.
+    :raises: NodeCleaningFailure if there was a problem getting the
+             clean steps.
+    """
+    node = task.node
+    driver_internal_info = node.driver_internal_info
+
+    # For manual cleaning, the target provision state is MANAGEABLE, whereas
+    # for automated cleaning, it is AVAILABLE.
+    manual_clean = node.target_provision_state == states.MANAGEABLE
+
+    if not manual_clean:
+        # Get the prioritized steps for automated cleaning
+        driver_internal_info['clean_steps'] = _get_cleaning_steps(task,
+                                                                  enabled=True)
+    else:
+        # For manual cleaning, the list of cleaning steps was specified by the
+        # user and already saved in node.driver_internal_info['clean_steps'].
+        # Now that we know what the driver's available clean steps are, we can
+        # do further checks to validate the user's clean steps.
+        steps = node.driver_internal_info['clean_steps']
+        driver_internal_info['clean_steps'] = (
+            _validate_user_clean_steps(task, steps))
+
+    node.clean_step = {}
+    driver_internal_info['clean_step_index'] = None
+    node.driver_internal_info = driver_internal_info
+    node.save()
+
+
+def _get_deployment_templates(task):
+    """Get deployment templates for task.node.
+
+    Return deployment templates where the name of the deployment template
+    matches one of the node's instance traits (the subset of the node's traits
+    requested by the user via a flavor or image).
+
+    :param task: A TaskManager object
+    :returns: a list of DeployTemplate objects.
+    """
+    node = task.node
+    if not node.instance_info.get('traits'):
+        return []
+    instance_traits = node.instance_info['traits']
+    return deploy_template.DeployTemplate.list_by_names(task.context,
+                                                        instance_traits)
+
+
+def _get_steps_from_deployment_templates(task, templates):
+    """Get deployment template steps for task.node.
+
+    Given a list of deploy template objects, return a list of all deploy steps
+    combined.
+
+    :param task: A TaskManager object
+    :param templates: a list of deploy templates
+    :returns: A list of deploy step dictionaries
+    """
+    steps = []
+    # NOTE(mgoddard): The steps from the object include id, created_at, etc.,
+    # which we don't want to include when we assign them to
+    # node.driver_internal_info. Include only the relevant fields.
+    step_fields = ('interface', 'step', 'args', 'priority')
+    for template in templates:
+        steps.extend([{key: step[key] for key in step_fields}
+                      for step in template.steps])
+    return steps
+
+
+def _get_validated_steps_from_templates(task):
+    """Return a list of validated deploy steps from deploy templates.
+
+    Deployment template steps are those steps defined in deployment templates
+    where the name of the deployment template matches one of the node's
+    instance traits (the subset of the node's traits requested by the user via
+    a flavor or image). There may be many such matching templates, each with a
+    list of steps to execute.
+
+    This method gathers the steps from all matching deploy templates for a
+    node, and validates those steps against the node's driver interfaces,
+    raising an error if validation fails.
+
+    :param task: A TaskManager object
+    :raises: InvalidParameterValue if validation of steps fails.
+    :raises: InstanceDeployFailure if there was a problem getting the
+        deploy steps.
+    :returns: A list of validated deploy step dictionaries
+    """
+    # Gather deploy templates matching the node's instance traits.
+    templates = _get_deployment_templates(task)
+
+    # Gather deploy steps from deploy templates.
+    user_steps = _get_steps_from_deployment_templates(task, templates)
+
+    # Validate the steps.
+    error_prefix = (_('Validation of deploy steps from deploy templates '
+                      'matching this node\'s instance traits failed. Matching '
+                      'deploy templates: %(templates)s. Errors: ') %
+                    {'templates': ','.join(t.name for t in templates)})
+    return _validate_user_deploy_steps(task, user_steps,
+                                       error_prefix=error_prefix)
+
+
+def _get_all_deployment_steps(task):
+    """Get deployment steps for task.node.
+
+    Deployment steps from matching deployment templates are combined with those
+    from driver interfaces and all enabled steps returned in priority order.
+
+    :param task: A TaskManager object
+    :raises: InstanceDeployFailure if there was a problem getting the
+        deploy steps.
+    :returns: A list of deploy step dictionaries
+    """
+    # Gather deploy steps from deploy templates and validate.
+    # NOTE(mgoddard): although we've probably just validated the templates in
+    # do_node_deploy, they may have changed in the DB since we last checked, so
+    # validate again.
+    user_steps = _get_validated_steps_from_templates(task)
+
+    # Gather enabled deploy steps from drivers.
+    driver_steps = _get_deployment_steps(task, enabled=True, sort=False)
+
+    # Remove driver steps that have been disabled or overridden by user steps.
+    user_step_keys = {(s['interface'], s['step']) for s in user_steps}
+    steps = [s for s in driver_steps
+             if (s['interface'], s['step']) not in user_step_keys]
+
+    # Add enabled user steps.
+    enabled_user_steps = [s for s in user_steps if s['priority'] > 0]
+    steps.extend(enabled_user_steps)
+
+    return _sorted_steps(steps, _deploy_step_key)
+
+
+def set_node_deployment_steps(task):
+    """Set up the node with deployment step information for deploying.
+
+    Get the deploy steps from the driver.
+
+    :raises: InstanceDeployFailure if there was a problem getting the
+             deployment steps.
+    """
+    node = task.node
+    driver_internal_info = node.driver_internal_info
+    driver_internal_info['deploy_steps'] = _get_all_deployment_steps(task)
+    node.deploy_step = {}
+    driver_internal_info['deploy_step_index'] = None
+    node.driver_internal_info = driver_internal_info
+    node.save()
+
+
+def _step_id(step):
+    """Return the 'ID' of a deploy step.
+
+    The ID is a string, <interface>.<step>.
+
+    :param step: the step dictionary.
+    :return: the step's ID string.
+    """
+    return '.'.join([step['interface'], step['step']])
+
+
+def _validate_deploy_steps_unique(user_steps):
+    """Validate that deploy steps from deploy templates are unique.
+
+    :param user_steps: a list of user steps. A user step is a dictionary
+        with required keys 'interface', 'step', 'args', and 'priority'::
+
+              { 'interface': <driver_interface>,
+                'step': <name_of_step>,
+                'args': {<arg1>: <value1>, ..., <argn>: <valuen>},
+                'priority': <priority_of_step> }
+
+        For example::
+
+              { 'interface': deploy',
+                'step': 'upgrade_firmware',
+                'args': {'force': True},
+                'priority': 10 }
+
+    :return: a list of validation error strings for the steps.
+    """
+    # Check for duplicate steps. Each interface/step combination can be
+    # specified at most once.
+    errors = []
+    counter = collections.Counter(_step_id(step) for step in user_steps)
+    duplicates = {step_id for step_id, count in counter.items() if count > 1}
+    if duplicates:
+        err = (_('deploy steps from all deploy templates matching this '
+                 'node\'s instance traits cannot have the same interface '
+                 'and step. Duplicate deploy steps for %(duplicates)s') %
+               {'duplicates': ', '.join(duplicates)})
+        errors.append(err)
+    return errors
+
+
+def _validate_user_step(task, user_step, driver_step, step_type):
+    """Validate a user-specified step.
+
+    :param task: A TaskManager object
+    :param user_step: a user step dictionary with required keys 'interface'
+        and 'step', and optional keys 'args' and 'priority'::
+
+              { 'interface': <driver_interface>,
+                'step': <name_of_step>,
+                'args': {<arg1>: <value1>, ..., <argn>: <valuen>},
+                'priority': <optional_priority> }
+
+        For example::
+
+              { 'interface': deploy',
+                'step': 'upgrade_firmware',
+                'args': {'force': True} }
+
+    :param driver_step: a driver step dictionary::
+
+              { 'interface': <driver_interface>,
+                'step': <name_of_step>,
+                'priority': <integer>
+                'abortable': Optional for clean steps, absent for deploy steps.
+                             <Boolean>.
+                'argsinfo': Optional. A dictionary of
+                            {<arg_name>:<arg_info_dict>} entries.
+                            <arg_info_dict> is a dictionary with
+                            { 'description': <description>,
+                              'required': <Boolean> } }
+
+        For example::
+
+              { 'interface': deploy',
+                'step': 'upgrade_firmware',
+                'priority': 10,
+                'abortable': True,
+                'argsinfo': {
+                    'force': { 'description': 'Whether to force the upgrade',
+                               'required': False } } }
+
+    :param step_type: either 'clean' or 'deploy'.
+    :return: a list of validation error strings for the step.
+    """
+    errors = []
+    # Check that the user-specified arguments are valid
+    argsinfo = driver_step.get('argsinfo') or {}
+    user_args = user_step.get('args') or {}
+    unexpected = set(user_args) - set(argsinfo)
+    if unexpected:
+        error = (_('%(type)s step %(step)s has these unexpected arguments: '
+                   '%(unexpected)s') %
+                 {'type': step_type, 'step': user_step,
+                  'unexpected': ', '.join(unexpected)})
+        errors.append(error)
+
+    if step_type == 'clean' or user_step['priority'] > 0:
+        # Check that all required arguments were specified by the user
+        missing = []
+        for (arg_name, arg_info) in argsinfo.items():
+            if arg_info.get('required', False) and arg_name not in user_args:
+                msg = arg_name
+                if arg_info.get('description'):
+                    msg += ' (%(desc)s)' % {'desc': arg_info['description']}
+                missing.append(msg)
+        if missing:
+            error = (_('%(type)s step %(step)s is missing these required '
+                       'arguments: %(miss)s') %
+                     {'type': step_type, 'step': user_step,
+                      'miss': ', '.join(missing)})
+            errors.append(error)
+
+    if step_type == 'clean':
+        # Copy fields that should not be provided by a user
+        user_step['abortable'] = driver_step.get('abortable', False)
+        user_step['priority'] = driver_step.get('priority', 0)
+    elif user_step['priority'] > 0:
+        # 'core' deploy steps can only be disabled.
+
+        # NOTE(mgoddard): we'll need something a little more sophisticated to
+        # track core steps once we split out the single core step.
+        is_core = (driver_step['interface'] == 'deploy' and
+                   driver_step['step'] == 'deploy')
+        if is_core:
+            error = (_('deploy step %(step)s on interface %(interface)s is a '
+                       'core step and cannot be overridden by user steps. It '
+                       'may be disabled by setting the priority to 0') %
+                     {'step': user_step['step'],
+                      'interface': user_step['interface']})
+            errors.append(error)
+
+    return errors
+
+
+def _validate_user_steps(task, user_steps, driver_steps, step_type,
+                         error_prefix=None):
+    """Validate the user-specified steps.
+
+    :param task: A TaskManager object
+    :param user_steps: a list of user steps. A user step is a dictionary
+        with required keys 'interface' and 'step', and optional keys 'args'
+        and 'priority'::
+
+              { 'interface': <driver_interface>,
+                'step': <name_of_step>,
+                'args': {<arg1>: <value1>, ..., <argn>: <valuen>},
+                'priority': <optional_priority> }
+
+        For example::
+
+              { 'interface': deploy',
+                'step': 'upgrade_firmware',
+                'args': {'force': True} }
+
+    :param driver_steps: a list of driver steps::
+
+              { 'interface': <driver_interface>,
+                'step': <name_of_step>,
+                'priority': <integer>
+                'abortable': Optional for clean steps, absent for deploy steps.
+                             <Boolean>.
+                'argsinfo': Optional. A dictionary of
+                            {<arg_name>:<arg_info_dict>} entries.
+                            <arg_info_dict> is a dictionary with
+                            { 'description': <description>,
+                              'required': <Boolean> } }
+
+        For example::
+
+              { 'interface': deploy',
+                'step': 'upgrade_firmware',
+                'priority': 10,
+                'abortable': True,
+                'argsinfo': {
+                    'force': { 'description': 'Whether to force the upgrade',
+                               'required': False } } }
+
+    :param step_type: either 'clean' or 'deploy'.
+    :param error_prefix: String to use as a prefix for exception messages, or
+        None.
+    :raises: InvalidParameterValue if validation of steps fails.
+    :raises: NodeCleaningFailure or InstanceDeployFailure if
+        there was a problem getting the steps from the driver.
+    :return: validated steps updated with information from the driver
+    """
+
+    errors = []
+
+    # Convert driver steps to a dict.
+    driver_steps = {_step_id(s): s for s in driver_steps}
+
+    for user_step in user_steps:
+        # Check if this user-specified step isn't supported by the driver
+        try:
+            driver_step = driver_steps[_step_id(user_step)]
+        except KeyError:
+            error = (_('node does not support this %(type)s step: %(step)s')
+                     % {'type': step_type, 'step': user_step})
+            errors.append(error)
+            continue
+
+        step_errors = _validate_user_step(task, user_step, driver_step,
+                                          step_type)
+        errors.extend(step_errors)
+
+    if step_type == 'deploy':
+        # Deploy steps should be unique across all combined templates.
+        dup_errors = _validate_deploy_steps_unique(user_steps)
+        errors.extend(dup_errors)
+
+    if errors:
+        err = error_prefix or ''
+        err += '; '.join(errors)
+        raise exception.InvalidParameterValue(err=err)
+
+    return user_steps
+
+
+def _validate_user_clean_steps(task, user_steps):
+    """Validate the user-specified clean steps.
+
+    :param task: A TaskManager object
+    :param user_steps: a list of clean steps. A clean step is a dictionary
+        with required keys 'interface' and 'step', and optional key 'args'::
+
+              { 'interface': <driver_interface>,
+                'step': <name_of_clean_step>,
+                'args': {<arg1>: <value1>, ..., <argn>: <valuen>} }
+
+            For example::
+
+              { 'interface': 'deploy',
+                'step': 'upgrade_firmware',
+                'args': {'force': True} }
+    :raises: InvalidParameterValue if validation of clean steps fails.
+    :raises: NodeCleaningFailure if there was a problem getting the
+        clean steps from the driver.
+    :return: validated clean steps update with information from the driver
+    """
+    driver_steps = _get_cleaning_steps(task, enabled=False, sort=False)
+    return _validate_user_steps(task, user_steps, driver_steps, 'clean')
+
+
+def _validate_user_deploy_steps(task, user_steps, error_prefix=None):
+    """Validate the user-specified deploy steps.
+
+    :param task: A TaskManager object
+    :param user_steps: a list of deploy steps. A deploy step is a dictionary
+        with required keys 'interface', 'step', 'args', and 'priority'::
+
+              { 'interface': <driver_interface>,
+                'step': <name_of_deploy_step>,
+                'args': {<arg1>: <value1>, ..., <argn>: <valuen>},
+                'priority': <priority_of_deploy_step> }
+
+            For example::
+
+              { 'interface': 'bios',
+                'step': 'apply_configuration',
+                'args': { 'settings': [ { 'foo': 'bar' } ] },
+                'priority': 150 }
+    :param error_prefix: String to use as a prefix for exception messages, or
+        None.
+    :raises: InvalidParameterValue if validation of deploy steps fails.
+    :raises: InstanceDeployFailure if there was a problem getting the deploy
+        steps from the driver.
+    :return: validated deploy steps update with information from the driver
+    """
+    driver_steps = _get_deployment_steps(task, enabled=False, sort=False)
+    return _validate_user_steps(task, user_steps, driver_steps, 'deploy',
+                                error_prefix=error_prefix)
+
+
+def validate_deploy_templates(task):
+    """Validate the deploy templates for a node.
+
+    :param task: A TaskManager object
+    :raises: InvalidParameterValue if the instance has traits that map to
+        deploy steps that are unsupported by the node's driver interfaces.
+    :raises: InstanceDeployFailure if there was a problem getting the deploy
+        steps from the driver.
+    """
+    # Gather deploy steps from matching deploy templates and validate them.
+    _get_validated_steps_from_templates(task)
diff --git a/ironic/conductor/utils.py b/ironic/conductor/utils.py
index cd16449f60..88d9b3814c 100644
--- a/ironic/conductor/utils.py
+++ b/ironic/conductor/utils.py
@@ -12,7 +12,6 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import collections
 import datetime
 import time
 
@@ -33,38 +32,11 @@ from ironic.common import network
 from ironic.common import states
 from ironic.conductor import notification_utils as notify_utils
 from ironic.conductor import task_manager
-from ironic.objects import deploy_template
 from ironic.objects import fields
 
 LOG = log.getLogger(__name__)
 CONF = cfg.CONF
 
-CLEANING_INTERFACE_PRIORITY = {
-    # When two clean steps have the same priority, their order is determined
-    # by which interface is implementing the clean step. The clean step of the
-    # interface with the highest value here, will be executed first in that
-    # case.
-    'power': 5,
-    'management': 4,
-    'deploy': 3,
-    'bios': 2,
-    'raid': 1,
-}
-
-DEPLOYING_INTERFACE_PRIORITY = {
-    # When two deploy steps have the same priority, their order is determined
-    # by which interface is implementing the step. The step of the interface
-    # with the highest value here, will be executed first in that case.
-    # TODO(rloo): If we think it makes sense to have the interface priorities
-    # the same for cleaning & deploying, replace the two with one e.g.
-    # 'INTERFACE_PRIORITIES'.
-    'power': 5,
-    'management': 4,
-    'deploy': 3,
-    'bios': 2,
-    'raid': 1,
-}
-
 
 @task_manager.require_exclusive_lock
 def node_set_boot_device(task, device, persistent=False):
@@ -632,543 +604,6 @@ def power_state_error_handler(e, node, power_state):
                     {'node': node.uuid, 'power_state': power_state})
 
 
-def _clean_step_key(step):
-    """Sort by priority, then interface priority in event of tie.
-
-    :param step: cleaning step dict to get priority for.
-    """
-    return (step.get('priority'),
-            CLEANING_INTERFACE_PRIORITY[step.get('interface')])
-
-
-def _deploy_step_key(step):
-    """Sort by priority, then interface priority in event of tie.
-
-    :param step: deploy step dict to get priority for.
-    """
-    return (step.get('priority'),
-            DEPLOYING_INTERFACE_PRIORITY[step.get('interface')])
-
-
-def _sorted_steps(steps, sort_step_key):
-    """Return a sorted list of steps.
-
-    :param sort_step_key: If set, this is a method (key) used to sort the steps
-        from highest priority to lowest priority. For steps having the same
-        priority, they are sorted from highest interface priority to lowest.
-    :returns: A list of sorted step dictionaries.
-    """
-    # Sort the steps from higher priority to lower priority
-    return sorted(steps, key=sort_step_key, reverse=True)
-
-
-def _get_steps(task, interfaces, get_method, enabled=False,
-               sort_step_key=None):
-    """Get steps for task.node.
-
-    :param task: A TaskManager object
-    :param interfaces: A dictionary of (key) interfaces and their
-        (value) priorities. These are the interfaces that will have steps of
-        interest. The priorities are used for deciding the priorities of steps
-        having the same priority.
-    :param get_method: The method used to get the steps from the node's
-        interface; a string.
-    :param enabled: If True, returns only enabled (priority > 0) steps. If
-        False, returns all steps.
-    :param sort_step_key: If set, this is a method (key) used to sort the steps
-        from highest priority to lowest priority. For steps having the same
-        priority, they are sorted from highest interface priority to lowest.
-    :raises: NodeCleaningFailure or InstanceDeployFailure if there was a
-        problem getting the steps.
-    :returns: A list of step dictionaries
-    """
-    # Get steps from each interface
-    steps = list()
-    for interface in interfaces:
-        interface = getattr(task.driver, interface)
-        if interface:
-            interface_steps = [x for x in getattr(interface, get_method)(task)
-                               if not enabled or x['priority'] > 0]
-            steps.extend(interface_steps)
-    if sort_step_key:
-        steps = _sorted_steps(steps, sort_step_key)
-    return steps
-
-
-def _get_cleaning_steps(task, enabled=False, sort=True):
-    """Get cleaning steps for task.node.
-
-    :param task: A TaskManager object
-    :param enabled: If True, returns only enabled (priority > 0) steps. If
-        False, returns all clean steps.
-    :param sort: If True, the steps are sorted from highest priority to lowest
-        priority. For steps having the same priority, they are sorted from
-        highest interface priority to lowest.
-    :raises: NodeCleaningFailure if there was a problem getting the
-        clean steps.
-    :returns: A list of clean step dictionaries
-    """
-    sort_key = _clean_step_key if sort else None
-    return _get_steps(task, CLEANING_INTERFACE_PRIORITY, 'get_clean_steps',
-                      enabled=enabled, sort_step_key=sort_key)
-
-
-def _get_deployment_steps(task, enabled=False, sort=True):
-    """Get deployment steps for task.node.
-
-    :param task: A TaskManager object
-    :param enabled: If True, returns only enabled (priority > 0) steps. If
-        False, returns all deploy steps.
-    :param sort: If True, the steps are sorted from highest priority to lowest
-        priority. For steps having the same priority, they are sorted from
-        highest interface priority to lowest.
-    :raises: InstanceDeployFailure if there was a problem getting the
-        deploy steps.
-    :returns: A list of deploy step dictionaries
-    """
-    sort_key = _deploy_step_key if sort else None
-    return _get_steps(task, DEPLOYING_INTERFACE_PRIORITY, 'get_deploy_steps',
-                      enabled=enabled, sort_step_key=sort_key)
-
-
-def set_node_cleaning_steps(task):
-    """Set up the node with clean step information for cleaning.
-
-    For automated cleaning, get the clean steps from the driver.
-    For manual cleaning, the user's clean steps are known but need to be
-    validated against the driver's clean steps.
-
-    :raises: InvalidParameterValue if there is a problem with the user's
-             clean steps.
-    :raises: NodeCleaningFailure if there was a problem getting the
-             clean steps.
-    """
-    node = task.node
-    driver_internal_info = node.driver_internal_info
-
-    # For manual cleaning, the target provision state is MANAGEABLE, whereas
-    # for automated cleaning, it is AVAILABLE.
-    manual_clean = node.target_provision_state == states.MANAGEABLE
-
-    if not manual_clean:
-        # Get the prioritized steps for automated cleaning
-        driver_internal_info['clean_steps'] = _get_cleaning_steps(task,
-                                                                  enabled=True)
-    else:
-        # For manual cleaning, the list of cleaning steps was specified by the
-        # user and already saved in node.driver_internal_info['clean_steps'].
-        # Now that we know what the driver's available clean steps are, we can
-        # do further checks to validate the user's clean steps.
-        steps = node.driver_internal_info['clean_steps']
-        driver_internal_info['clean_steps'] = (
-            _validate_user_clean_steps(task, steps))
-
-    node.clean_step = {}
-    driver_internal_info['clean_step_index'] = None
-    node.driver_internal_info = driver_internal_info
-    node.save()
-
-
-def _get_deployment_templates(task):
-    """Get deployment templates for task.node.
-
-    Return deployment templates where the name of the deployment template
-    matches one of the node's instance traits (the subset of the node's traits
-    requested by the user via a flavor or image).
-
-    :param task: A TaskManager object
-    :returns: a list of DeployTemplate objects.
-    """
-    node = task.node
-    if not node.instance_info.get('traits'):
-        return []
-    instance_traits = node.instance_info['traits']
-    return deploy_template.DeployTemplate.list_by_names(task.context,
-                                                        instance_traits)
-
-
-def _get_steps_from_deployment_templates(task, templates):
-    """Get deployment template steps for task.node.
-
-    Given a list of deploy template objects, return a list of all deploy steps
-    combined.
-
-    :param task: A TaskManager object
-    :param templates: a list of deploy templates
-    :returns: A list of deploy step dictionaries
-    """
-    steps = []
-    # NOTE(mgoddard): The steps from the object include id, created_at, etc.,
-    # which we don't want to include when we assign them to
-    # node.driver_internal_info. Include only the relevant fields.
-    step_fields = ('interface', 'step', 'args', 'priority')
-    for template in templates:
-        steps.extend([{key: step[key] for key in step_fields}
-                      for step in template.steps])
-    return steps
-
-
-def _get_validated_steps_from_templates(task):
-    """Return a list of validated deploy steps from deploy templates.
-
-    Deployment template steps are those steps defined in deployment templates
-    where the name of the deployment template matches one of the node's
-    instance traits (the subset of the node's traits requested by the user via
-    a flavor or image). There may be many such matching templates, each with a
-    list of steps to execute.
-
-    This method gathers the steps from all matching deploy templates for a
-    node, and validates those steps against the node's driver interfaces,
-    raising an error if validation fails.
-
-    :param task: A TaskManager object
-    :raises: InvalidParameterValue if validation of steps fails.
-    :raises: InstanceDeployFailure if there was a problem getting the
-        deploy steps.
-    :returns: A list of validated deploy step dictionaries
-    """
-    # Gather deploy templates matching the node's instance traits.
-    templates = _get_deployment_templates(task)
-
-    # Gather deploy steps from deploy templates.
-    user_steps = _get_steps_from_deployment_templates(task, templates)
-
-    # Validate the steps.
-    error_prefix = (_('Validation of deploy steps from deploy templates '
-                      'matching this node\'s instance traits failed. Matching '
-                      'deploy templates: %(templates)s. Errors: ') %
-                    {'templates': ','.join(t.name for t in templates)})
-    return _validate_user_deploy_steps(task, user_steps,
-                                       error_prefix=error_prefix)
-
-
-def _get_all_deployment_steps(task):
-    """Get deployment steps for task.node.
-
-    Deployment steps from matching deployment templates are combined with those
-    from driver interfaces and all enabled steps returned in priority order.
-
-    :param task: A TaskManager object
-    :raises: InstanceDeployFailure if there was a problem getting the
-        deploy steps.
-    :returns: A list of deploy step dictionaries
-    """
-    # Gather deploy steps from deploy templates and validate.
-    # NOTE(mgoddard): although we've probably just validated the templates in
-    # do_node_deploy, they may have changed in the DB since we last checked, so
-    # validate again.
-    user_steps = _get_validated_steps_from_templates(task)
-
-    # Gather enabled deploy steps from drivers.
-    driver_steps = _get_deployment_steps(task, enabled=True, sort=False)
-
-    # Remove driver steps that have been disabled or overridden by user steps.
-    user_step_keys = {(s['interface'], s['step']) for s in user_steps}
-    steps = [s for s in driver_steps
-             if (s['interface'], s['step']) not in user_step_keys]
-
-    # Add enabled user steps.
-    enabled_user_steps = [s for s in user_steps if s['priority'] > 0]
-    steps.extend(enabled_user_steps)
-
-    return _sorted_steps(steps, _deploy_step_key)
-
-
-def set_node_deployment_steps(task):
-    """Set up the node with deployment step information for deploying.
-
-    Get the deploy steps from the driver.
-
-    :raises: InstanceDeployFailure if there was a problem getting the
-             deployment steps.
-    """
-    node = task.node
-    driver_internal_info = node.driver_internal_info
-    driver_internal_info['deploy_steps'] = _get_all_deployment_steps(task)
-    node.deploy_step = {}
-    driver_internal_info['deploy_step_index'] = None
-    node.driver_internal_info = driver_internal_info
-    node.save()
-
-
-def _step_id(step):
-    """Return the 'ID' of a deploy step.
-
-    The ID is a string, <interface>.<step>.
-
-    :param step: the step dictionary.
-    :return: the step's ID string.
-    """
-    return '.'.join([step['interface'], step['step']])
-
-
-def _validate_deploy_steps_unique(user_steps):
-    """Validate that deploy steps from deploy templates are unique.
-
-    :param user_steps: a list of user steps. A user step is a dictionary
-        with required keys 'interface', 'step', 'args', and 'priority'::
-
-              { 'interface': <driver_interface>,
-                'step': <name_of_step>,
-                'args': {<arg1>: <value1>, ..., <argn>: <valuen>},
-                'priority': <priority_of_step> }
-
-        For example::
-
-              { 'interface': deploy',
-                'step': 'upgrade_firmware',
-                'args': {'force': True},
-                'priority': 10 }
-
-    :return: a list of validation error strings for the steps.
-    """
-    # Check for duplicate steps. Each interface/step combination can be
-    # specified at most once.
-    errors = []
-    counter = collections.Counter(_step_id(step) for step in user_steps)
-    duplicates = {step_id for step_id, count in counter.items() if count > 1}
-    if duplicates:
-        err = (_('deploy steps from all deploy templates matching this '
-                 'node\'s instance traits cannot have the same interface '
-                 'and step. Duplicate deploy steps for %(duplicates)s') %
-               {'duplicates': ', '.join(duplicates)})
-        errors.append(err)
-    return errors
-
-
-def _validate_user_step(task, user_step, driver_step, step_type):
-    """Validate a user-specified step.
-
-    :param task: A TaskManager object
-    :param user_step: a user step dictionary with required keys 'interface'
-        and 'step', and optional keys 'args' and 'priority'::
-
-              { 'interface': <driver_interface>,
-                'step': <name_of_step>,
-                'args': {<arg1>: <value1>, ..., <argn>: <valuen>},
-                'priority': <optional_priority> }
-
-        For example::
-
-              { 'interface': deploy',
-                'step': 'upgrade_firmware',
-                'args': {'force': True} }
-
-    :param driver_step: a driver step dictionary::
-
-              { 'interface': <driver_interface>,
-                'step': <name_of_step>,
-                'priority': <integer>
-                'abortable': Optional for clean steps, absent for deploy steps.
-                             <Boolean>.
-                'argsinfo': Optional. A dictionary of
-                            {<arg_name>:<arg_info_dict>} entries.
-                            <arg_info_dict> is a dictionary with
-                            { 'description': <description>,
-                              'required': <Boolean> } }
-
-        For example::
-
-              { 'interface': deploy',
-                'step': 'upgrade_firmware',
-                'priority': 10,
-                'abortable': True,
-                'argsinfo': {
-                    'force': { 'description': 'Whether to force the upgrade',
-                               'required': False } } }
-
-    :param step_type: either 'clean' or 'deploy'.
-    :return: a list of validation error strings for the step.
-    """
-    errors = []
-    # Check that the user-specified arguments are valid
-    argsinfo = driver_step.get('argsinfo') or {}
-    user_args = user_step.get('args') or {}
-    unexpected = set(user_args) - set(argsinfo)
-    if unexpected:
-        error = (_('%(type)s step %(step)s has these unexpected arguments: '
-                   '%(unexpected)s') %
-                 {'type': step_type, 'step': user_step,
-                  'unexpected': ', '.join(unexpected)})
-        errors.append(error)
-
-    if step_type == 'clean' or user_step['priority'] > 0:
-        # Check that all required arguments were specified by the user
-        missing = []
-        for (arg_name, arg_info) in argsinfo.items():
-            if arg_info.get('required', False) and arg_name not in user_args:
-                msg = arg_name
-                if arg_info.get('description'):
-                    msg += ' (%(desc)s)' % {'desc': arg_info['description']}
-                missing.append(msg)
-        if missing:
-            error = (_('%(type)s step %(step)s is missing these required '
-                       'arguments: %(miss)s') %
-                     {'type': step_type, 'step': user_step,
-                      'miss': ', '.join(missing)})
-            errors.append(error)
-
-    if step_type == 'clean':
-        # Copy fields that should not be provided by a user
-        user_step['abortable'] = driver_step.get('abortable', False)
-        user_step['priority'] = driver_step.get('priority', 0)
-    elif user_step['priority'] > 0:
-        # 'core' deploy steps can only be disabled.
-
-        # NOTE(mgoddard): we'll need something a little more sophisticated to
-        # track core steps once we split out the single core step.
-        is_core = (driver_step['interface'] == 'deploy' and
-                   driver_step['step'] == 'deploy')
-        if is_core:
-            error = (_('deploy step %(step)s on interface %(interface)s is a '
-                       'core step and cannot be overridden by user steps. It '
-                       'may be disabled by setting the priority to 0') %
-                     {'step': user_step['step'],
-                      'interface': user_step['interface']})
-            errors.append(error)
-
-    return errors
-
-
-def _validate_user_steps(task, user_steps, driver_steps, step_type,
-                         error_prefix=None):
-    """Validate the user-specified steps.
-
-    :param task: A TaskManager object
-    :param user_steps: a list of user steps. A user step is a dictionary
-        with required keys 'interface' and 'step', and optional keys 'args'
-        and 'priority'::
-
-              { 'interface': <driver_interface>,
-                'step': <name_of_step>,
-                'args': {<arg1>: <value1>, ..., <argn>: <valuen>},
-                'priority': <optional_priority> }
-
-        For example::
-
-              { 'interface': deploy',
-                'step': 'upgrade_firmware',
-                'args': {'force': True} }
-
-    :param driver_steps: a list of driver steps::
-
-              { 'interface': <driver_interface>,
-                'step': <name_of_step>,
-                'priority': <integer>
-                'abortable': Optional for clean steps, absent for deploy steps.
-                             <Boolean>.
-                'argsinfo': Optional. A dictionary of
-                            {<arg_name>:<arg_info_dict>} entries.
-                            <arg_info_dict> is a dictionary with
-                            { 'description': <description>,
-                              'required': <Boolean> } }
-
-        For example::
-
-              { 'interface': deploy',
-                'step': 'upgrade_firmware',
-                'priority': 10,
-                'abortable': True,
-                'argsinfo': {
-                    'force': { 'description': 'Whether to force the upgrade',
-                               'required': False } } }
-
-    :param step_type: either 'clean' or 'deploy'.
-    :param error_prefix: String to use as a prefix for exception messages, or
-        None.
-    :raises: InvalidParameterValue if validation of steps fails.
-    :raises: NodeCleaningFailure or InstanceDeployFailure if
-        there was a problem getting the steps from the driver.
-    :return: validated steps updated with information from the driver
-    """
-
-    errors = []
-
-    # Convert driver steps to a dict.
-    driver_steps = {_step_id(s): s for s in driver_steps}
-
-    for user_step in user_steps:
-        # Check if this user-specified step isn't supported by the driver
-        try:
-            driver_step = driver_steps[_step_id(user_step)]
-        except KeyError:
-            error = (_('node does not support this %(type)s step: %(step)s')
-                     % {'type': step_type, 'step': user_step})
-            errors.append(error)
-            continue
-
-        step_errors = _validate_user_step(task, user_step, driver_step,
-                                          step_type)
-        errors.extend(step_errors)
-
-    if step_type == 'deploy':
-        # Deploy steps should be unique across all combined templates.
-        dup_errors = _validate_deploy_steps_unique(user_steps)
-        errors.extend(dup_errors)
-
-    if errors:
-        err = error_prefix or ''
-        err += '; '.join(errors)
-        raise exception.InvalidParameterValue(err=err)
-
-    return user_steps
-
-
-def _validate_user_clean_steps(task, user_steps):
-    """Validate the user-specified clean steps.
-
-    :param task: A TaskManager object
-    :param user_steps: a list of clean steps. A clean step is a dictionary
-        with required keys 'interface' and 'step', and optional key 'args'::
-
-              { 'interface': <driver_interface>,
-                'step': <name_of_clean_step>,
-                'args': {<arg1>: <value1>, ..., <argn>: <valuen>} }
-
-            For example::
-
-              { 'interface': 'deploy',
-                'step': 'upgrade_firmware',
-                'args': {'force': True} }
-    :raises: InvalidParameterValue if validation of clean steps fails.
-    :raises: NodeCleaningFailure if there was a problem getting the
-        clean steps from the driver.
-    :return: validated clean steps update with information from the driver
-    """
-    driver_steps = _get_cleaning_steps(task, enabled=False, sort=False)
-    return _validate_user_steps(task, user_steps, driver_steps, 'clean')
-
-
-def _validate_user_deploy_steps(task, user_steps, error_prefix=None):
-    """Validate the user-specified deploy steps.
-
-    :param task: A TaskManager object
-    :param user_steps: a list of deploy steps. A deploy step is a dictionary
-        with required keys 'interface', 'step', 'args', and 'priority'::
-
-              { 'interface': <driver_interface>,
-                'step': <name_of_deploy_step>,
-                'args': {<arg1>: <value1>, ..., <argn>: <valuen>},
-                'priority': <priority_of_deploy_step> }
-
-            For example::
-
-              { 'interface': 'bios',
-                'step': 'apply_configuration',
-                'args': { 'settings': [ { 'foo': 'bar' } ] },
-                'priority': 150 }
-    :param error_prefix: String to use as a prefix for exception messages, or
-        None.
-    :raises: InvalidParameterValue if validation of deploy steps fails.
-    :raises: InstanceDeployFailure if there was a problem getting the deploy
-        steps from the driver.
-    :return: validated deploy steps update with information from the driver
-    """
-    driver_steps = _get_deployment_steps(task, enabled=False, sort=False)
-    return _validate_user_steps(task, user_steps, driver_steps, 'deploy',
-                                error_prefix=error_prefix)
-
-
 @task_manager.require_exclusive_lock
 def validate_port_physnet(task, port_obj):
     """Validate the consistency of physical networks of ports in a portgroup.
@@ -1372,19 +807,6 @@ def restore_power_state_if_needed(task, power_state_to_restore):
         node_power_action(task, power_state_to_restore)
 
 
-def validate_deploy_templates(task):
-    """Validate the deploy templates for a node.
-
-    :param task: A TaskManager object
-    :raises: InvalidParameterValue if the instance has traits that map to
-        deploy steps that are unsupported by the node's driver interfaces.
-    :raises: InstanceDeployFailure if there was a problem getting the deploy
-        steps from the driver.
-    """
-    # Gather deploy steps from matching deploy templates and validate them.
-    _get_validated_steps_from_templates(task)
-
-
 def build_configdrive(node, configdrive):
     """Build a configdrive from provided meta_data, network_data and user_data.
 
diff --git a/ironic/drivers/base.py b/ironic/drivers/base.py
index 254486fcdf..513ba71131 100644
--- a/ironic/drivers/base.py
+++ b/ironic/drivers/base.py
@@ -1438,7 +1438,7 @@ def clean_step(priority, abortable=False, argsinfo=None):
     For automated cleaning, only steps with priorities greater than 0 are
     used. These steps are ordered by priority from highest value to lowest
     value. For steps with the same priority, they are ordered by driver
-    interface priority (see conductor.manager.CLEANING_INTERFACE_PRIORITY).
+    interface priority (see conductor.steps.CLEANING_INTERFACE_PRIORITY).
     execute_clean_step() will be called on each step.
 
     For manual cleaning, the clean steps will be executed in a similar fashion
@@ -1514,7 +1514,7 @@ def deploy_step(priority, argsinfo=None):
     Only steps with priorities greater than 0 are used.
     These steps are ordered by priority from highest value to lowest
     value. For steps with the same priority, they are ordered by driver
-    interface priority (see conductor.manager.DEPLOYING_INTERFACE_PRIORITY).
+    interface priority (see conductor.steps.DEPLOYING_INTERFACE_PRIORITY).
     execute_deploy_step() will be called on each step.
 
     Decorated deploy steps must take as the only positional argument, a
diff --git a/ironic/drivers/modules/agent_base_vendor.py b/ironic/drivers/modules/agent_base_vendor.py
index 6ccf386b5e..5ba8ff910d 100644
--- a/ironic/drivers/modules/agent_base_vendor.py
+++ b/ironic/drivers/modules/agent_base_vendor.py
@@ -28,6 +28,7 @@ from ironic.common import boot_devices
 from ironic.common import exception
 from ironic.common.i18n import _
 from ironic.common import states
+from ironic.conductor import steps as conductor_steps
 from ironic.conductor import utils as manager_utils
 from ironic.conf import CONF
 from ironic.drivers.modules import agent_client
@@ -352,7 +353,7 @@ class HeartbeatMixin(object):
                     # First, cache the clean steps
                     self.refresh_clean_steps(task)
                     # Then set/verify node clean steps and start cleaning
-                    manager_utils.set_node_cleaning_steps(task)
+                    conductor_steps.set_node_cleaning_steps(task)
                     # The exceptions from RPC are not possible as we using cast
                     # here
                     manager_utils.notify_conductor_resume_clean(task)
@@ -553,7 +554,7 @@ class AgentDeployMixin(HeartbeatMixin):
                          'clean version mismatch. Resetting clean steps '
                          'and rebooting the node.', node.uuid)
                 try:
-                    manager_utils.set_node_cleaning_steps(task)
+                    conductor_steps.set_node_cleaning_steps(task)
                 except exception.NodeCleaningFailure:
                     msg = (_('Could not restart automated cleaning on node '
                              '%(node)s: %(err)s.') %
diff --git a/ironic/drivers/modules/ansible/deploy.py b/ironic/drivers/modules/ansible/deploy.py
index 3732695be2..3e424206bb 100644
--- a/ironic/drivers/modules/ansible/deploy.py
+++ b/ironic/drivers/modules/ansible/deploy.py
@@ -36,6 +36,7 @@ from ironic.common.i18n import _
 from ironic.common import images
 from ironic.common import states
 from ironic.common import utils
+from ironic.conductor import steps as conductor_steps
 from ironic.conductor import task_manager
 from ironic.conductor import utils as manager_utils
 from ironic.conf import CONF
@@ -547,7 +548,7 @@ class AnsibleDeploy(agent_base.HeartbeatMixin, base.DeployInterface):
         :returns: None or states.CLEANWAIT for async prepare.
         """
         node = task.node
-        manager_utils.set_node_cleaning_steps(task)
+        conductor_steps.set_node_cleaning_steps(task)
         if not node.driver_internal_info['clean_steps']:
             # no clean steps configured, nothing to do.
             return
diff --git a/ironic/tests/unit/conductor/test_manager.py b/ironic/tests/unit/conductor/test_manager.py
index 79fdca0614..de282fb328 100644
--- a/ironic/tests/unit/conductor/test_manager.py
+++ b/ironic/tests/unit/conductor/test_manager.py
@@ -41,6 +41,7 @@ from ironic.common import states
 from ironic.common import swift
 from ironic.conductor import manager
 from ironic.conductor import notification_utils
+from ironic.conductor import steps as conductor_steps
 from ironic.conductor import task_manager
 from ironic.conductor import utils as conductor_utils
 from ironic.db import api as dbapi
@@ -1325,7 +1326,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
                                                  mock_iwdi):
         self._test_do_node_deploy_validate_fail(mock_validate, mock_iwdi)
 
-    @mock.patch.object(conductor_utils, 'validate_deploy_templates')
+    @mock.patch.object(conductor_steps, 'validate_deploy_templates')
     def test_do_node_deploy_validate_template_fail(self, mock_validate,
                                                    mock_iwdi):
         self._test_do_node_deploy_validate_fail(mock_validate, mock_iwdi)
@@ -2238,7 +2239,7 @@ class DoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
     @mock.patch.object(manager, '_do_next_deploy_step', autospec=True)
     @mock.patch.object(manager, '_old_rest_of_do_node_deploy',
                        autospec=True)
-    @mock.patch.object(conductor_utils, 'set_node_deployment_steps',
+    @mock.patch.object(conductor_steps, 'set_node_deployment_steps',
                        autospec=True)
     def test_do_node_deploy_deprecated(self, mock_set_steps, mock_old_way,
                                        mock_deploy_step):
@@ -2259,7 +2260,7 @@ class DoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
     @mock.patch.object(manager, '_do_next_deploy_step', autospec=True)
     @mock.patch.object(manager, '_old_rest_of_do_node_deploy',
                        autospec=True)
-    @mock.patch.object(conductor_utils, 'set_node_deployment_steps',
+    @mock.patch.object(conductor_steps, 'set_node_deployment_steps',
                        autospec=True)
     def test_do_node_deploy_steps(self, mock_set_steps, mock_old_way,
                                   mock_deploy_step):
@@ -2288,7 +2289,7 @@ class DoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
     @mock.patch.object(manager, '_do_next_deploy_step', autospec=True)
     @mock.patch.object(manager, '_old_rest_of_do_node_deploy',
                        autospec=True)
-    @mock.patch.object(conductor_utils, 'set_node_deployment_steps',
+    @mock.patch.object(conductor_steps, 'set_node_deployment_steps',
                        autospec=True)
     def test_do_node_deploy_steps_old_rpc(self, mock_set_steps, mock_old_way,
                                           mock_deploy_step):
@@ -3499,7 +3500,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
         self.__do_node_clean_validate_fail(mock_validate, clean_steps=[])
 
     @mock.patch.object(manager, 'LOG', autospec=True)
-    @mock.patch.object(conductor_utils, 'set_node_cleaning_steps',
+    @mock.patch.object(conductor_steps, 'set_node_cleaning_steps',
                        autospec=True)
     @mock.patch('ironic.conductor.manager.ConductorManager.'
                 '_do_next_clean_step', autospec=True)
@@ -3756,7 +3757,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
         self.__do_node_clean_prepare_clean_wait(clean_steps=[self.deploy_raid])
 
     @mock.patch.object(n_flat.FlatNetwork, 'validate', autospec=True)
-    @mock.patch.object(conductor_utils, 'set_node_cleaning_steps',
+    @mock.patch.object(conductor_steps, 'set_node_cleaning_steps',
                        autospec=True)
     def __do_node_clean_steps_fail(self, mock_steps, mock_validate,
                                    clean_steps=None, invalid_exc=True):
@@ -3788,7 +3789,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
             self.__do_node_clean_steps_fail(clean_steps=[self.deploy_raid],
                                             invalid_exc=invalid)
 
-    @mock.patch.object(conductor_utils, 'set_node_cleaning_steps',
+    @mock.patch.object(conductor_steps, 'set_node_cleaning_steps',
                        autospec=True)
     @mock.patch('ironic.conductor.manager.ConductorManager.'
                 '_do_next_clean_step', autospec=True)
@@ -4852,7 +4853,7 @@ class MiscTestCase(mgr_utils.ServiceSetUpMixin, mgr_utils.CommonMixIn,
         node = obj_utils.create_test_node(self.context, driver='fake-hardware',
                                           network_interface='noop')
         with mock.patch(
-            'ironic.conductor.utils.validate_deploy_templates'
+            'ironic.conductor.steps.validate_deploy_templates'
         ) as mock_validate:
             reason = 'fake reason'
             mock_validate.side_effect = exception.InvalidParameterValue(reason)
diff --git a/ironic/tests/unit/conductor/test_steps.py b/ironic/tests/unit/conductor/test_steps.py
new file mode 100644
index 0000000000..f1abb0c970
--- /dev/null
+++ b/ironic/tests/unit/conductor/test_steps.py
@@ -0,0 +1,724 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import mock
+from oslo_config import cfg
+from oslo_utils import uuidutils
+
+from ironic.common import exception
+from ironic.common import states
+from ironic.conductor import steps as conductor_steps
+from ironic.conductor import task_manager
+from ironic import objects
+from ironic.tests.unit.db import base as db_base
+from ironic.tests.unit.db import utils as db_utils
+from ironic.tests.unit.objects import utils as obj_utils
+
+CONF = cfg.CONF
+
+
+class NodeDeployStepsTestCase(db_base.DbTestCase):
+    def setUp(self):
+        super(NodeDeployStepsTestCase, self).setUp()
+
+        self.deploy_start = {
+            'step': 'deploy_start', 'priority': 50, 'interface': 'deploy'}
+        self.power_one = {
+            'step': 'power_one', 'priority': 40, 'interface': 'power'}
+        self.deploy_middle = {
+            'step': 'deploy_middle', 'priority': 40, 'interface': 'deploy'}
+        self.deploy_end = {
+            'step': 'deploy_end', 'priority': 20, 'interface': 'deploy'}
+        self.power_disable = {
+            'step': 'power_disable', 'priority': 0, 'interface': 'power'}
+        self.deploy_core = {
+            'step': 'deploy', 'priority': 100, 'interface': 'deploy'}
+        # enabled steps
+        self.deploy_steps = [self.deploy_start, self.power_one,
+                             self.deploy_middle, self.deploy_end]
+        # Deploy step with argsinfo.
+        self.deploy_raid = {
+            'step': 'build_raid', 'priority': 0, 'interface': 'deploy',
+            'argsinfo': {'arg1': {'description': 'desc1', 'required': True},
+                         'arg2': {'description': 'desc2'}}}
+        self.node = obj_utils.create_test_node(
+            self.context, driver='fake-hardware')
+
+    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.get_deploy_steps',
+                autospec=True)
+    @mock.patch('ironic.drivers.modules.fake.FakePower.get_deploy_steps',
+                autospec=True)
+    @mock.patch('ironic.drivers.modules.fake.FakeManagement.get_deploy_steps',
+                autospec=True)
+    def test__get_deployment_steps(self, mock_mgt_steps, mock_power_steps,
+                                   mock_deploy_steps):
+        # Test getting deploy steps, with one driver returning None, two
+        # conflicting priorities, and asserting they are ordered properly.
+
+        mock_power_steps.return_value = [self.power_disable, self.power_one]
+        mock_deploy_steps.return_value = [
+            self.deploy_start, self.deploy_middle, self.deploy_end]
+
+        expected = self.deploy_steps + [self.power_disable]
+        with task_manager.acquire(
+                self.context, self.node.uuid, shared=False) as task:
+            steps = conductor_steps._get_deployment_steps(task, enabled=False)
+
+            self.assertEqual(expected, steps)
+            mock_mgt_steps.assert_called_once_with(mock.ANY, task)
+            mock_power_steps.assert_called_once_with(mock.ANY, task)
+            mock_deploy_steps.assert_called_once_with(mock.ANY, task)
+
+    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.get_deploy_steps',
+                autospec=True)
+    @mock.patch('ironic.drivers.modules.fake.FakePower.get_deploy_steps',
+                autospec=True)
+    @mock.patch('ironic.drivers.modules.fake.FakeManagement.get_deploy_steps',
+                autospec=True)
+    def test__get_deploy_steps_unsorted(self, mock_mgt_steps, mock_power_steps,
+                                        mock_deploy_steps):
+
+        mock_deploy_steps.return_value = [self.deploy_end,
+                                          self.deploy_start,
+                                          self.deploy_middle]
+        with task_manager.acquire(
+                self.context, self.node.uuid, shared=False) as task:
+            steps = conductor_steps._get_deployment_steps(task, enabled=False,
+                                                          sort=False)
+            self.assertEqual(mock_deploy_steps.return_value, steps)
+            mock_mgt_steps.assert_called_once_with(mock.ANY, task)
+            mock_power_steps.assert_called_once_with(mock.ANY, task)
+            mock_deploy_steps.assert_called_once_with(mock.ANY, task)
+
+    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.get_deploy_steps',
+                autospec=True)
+    @mock.patch('ironic.drivers.modules.fake.FakePower.get_deploy_steps',
+                autospec=True)
+    @mock.patch('ironic.drivers.modules.fake.FakeManagement.get_deploy_steps',
+                autospec=True)
+    def test__get_deployment_steps_only_enabled(
+            self, mock_mgt_steps, mock_power_steps, mock_deploy_steps):
+        # Test getting only deploy steps, with one driver returning None, two
+        # conflicting priorities, and asserting they are ordered properly.
+        # Should discard zero-priority deploy step.
+
+        mock_power_steps.return_value = [self.power_one, self.power_disable]
+        mock_deploy_steps.return_value = [self.deploy_end,
+                                          self.deploy_middle,
+                                          self.deploy_start]
+
+        with task_manager.acquire(
+                self.context, self.node.uuid, shared=True) as task:
+            steps = conductor_steps._get_deployment_steps(task, enabled=True)
+
+            self.assertEqual(self.deploy_steps, steps)
+            mock_mgt_steps.assert_called_once_with(mock.ANY, task)
+            mock_power_steps.assert_called_once_with(mock.ANY, task)
+            mock_deploy_steps.assert_called_once_with(mock.ANY, task)
+
+    @mock.patch.object(objects.DeployTemplate, 'list_by_names')
+    def test__get_deployment_templates_no_traits(self, mock_list):
+        with task_manager.acquire(
+                self.context, self.node.uuid, shared=False) as task:
+            templates = conductor_steps._get_deployment_templates(task)
+            self.assertEqual([], templates)
+            self.assertFalse(mock_list.called)
+
+    @mock.patch.object(objects.DeployTemplate, 'list_by_names')
+    def test__get_deployment_templates(self, mock_list):
+        traits = ['CUSTOM_DT1', 'CUSTOM_DT2']
+        node = obj_utils.create_test_node(
+            self.context, uuid=uuidutils.generate_uuid(),
+            instance_info={'traits': traits})
+        template1 = obj_utils.get_test_deploy_template(self.context)
+        template2 = obj_utils.get_test_deploy_template(
+            self.context, name='CUSTOM_DT2', uuid=uuidutils.generate_uuid(),
+            steps=[{'interface': 'bios', 'step': 'apply_configuration',
+                    'args': {}, 'priority': 1}])
+        mock_list.return_value = [template1, template2]
+        expected = [template1, template2]
+        with task_manager.acquire(
+                self.context, node.uuid, shared=False) as task:
+            templates = conductor_steps._get_deployment_templates(task)
+            self.assertEqual(expected, templates)
+            mock_list.assert_called_once_with(task.context, traits)
+
+    def test__get_steps_from_deployment_templates(self):
+        template1 = obj_utils.get_test_deploy_template(self.context)
+        template2 = obj_utils.get_test_deploy_template(
+            self.context, name='CUSTOM_DT2', uuid=uuidutils.generate_uuid(),
+            steps=[{'interface': 'bios', 'step': 'apply_configuration',
+                    'args': {}, 'priority': 1}])
+        step1 = template1.steps[0]
+        step2 = template2.steps[0]
+        expected = [
+            {
+                'interface': step1['interface'],
+                'step': step1['step'],
+                'args': step1['args'],
+                'priority': step1['priority'],
+            },
+            {
+                'interface': step2['interface'],
+                'step': step2['step'],
+                'args': step2['args'],
+                'priority': step2['priority'],
+            }
+        ]
+        with task_manager.acquire(
+                self.context, self.node.uuid, shared=False) as task:
+            steps = conductor_steps._get_steps_from_deployment_templates(
+                task, [template1, template2])
+            self.assertEqual(expected, steps)
+
+    @mock.patch.object(conductor_steps, '_get_validated_steps_from_templates',
+                       autospec=True)
+    @mock.patch.object(conductor_steps, '_get_deployment_steps', autospec=True)
+    def _test__get_all_deployment_steps(self, user_steps, driver_steps,
+                                        expected_steps, mock_steps,
+                                        mock_validated):
+        mock_validated.return_value = user_steps
+        mock_steps.return_value = driver_steps
+
+        with task_manager.acquire(
+                self.context, self.node.uuid, shared=False) as task:
+            steps = conductor_steps._get_all_deployment_steps(task)
+            self.assertEqual(expected_steps, steps)
+            mock_validated.assert_called_once_with(task)
+            mock_steps.assert_called_once_with(task, enabled=True, sort=False)
+
+    def test__get_all_deployment_steps_no_steps(self):
+        # Nothing in -> nothing out.
+        user_steps = []
+        driver_steps = []
+        expected_steps = []
+        self._test__get_all_deployment_steps(user_steps, driver_steps,
+                                             expected_steps)
+
+    def test__get_all_deployment_steps_no_user_steps(self):
+        # Only driver steps in -> only driver steps out.
+        user_steps = []
+        driver_steps = self.deploy_steps
+        expected_steps = self.deploy_steps
+        self._test__get_all_deployment_steps(user_steps, driver_steps,
+                                             expected_steps)
+
+    def test__get_all_deployment_steps_no_driver_steps(self):
+        # Only user steps in -> only user steps out.
+        user_steps = self.deploy_steps
+        driver_steps = []
+        expected_steps = self.deploy_steps
+        self._test__get_all_deployment_steps(user_steps, driver_steps,
+                                             expected_steps)
+
+    def test__get_all_deployment_steps_user_and_driver_steps(self):
+        # Driver and user steps in -> driver and user steps out.
+        user_steps = self.deploy_steps[:2]
+        driver_steps = self.deploy_steps[2:]
+        expected_steps = self.deploy_steps
+        self._test__get_all_deployment_steps(user_steps, driver_steps,
+                                             expected_steps)
+
+    def test__get_all_deployment_steps_disable_core_steps(self):
+        # User steps can disable core driver steps.
+        user_steps = [self.deploy_core.copy()]
+        user_steps[0].update({'priority': 0})
+        driver_steps = [self.deploy_core]
+        expected_steps = []
+        self._test__get_all_deployment_steps(user_steps, driver_steps,
+                                             expected_steps)
+
+    def test__get_all_deployment_steps_override_driver_steps(self):
+        # User steps override non-core driver steps.
+        user_steps = [step.copy() for step in self.deploy_steps[:2]]
+        user_steps[0].update({'priority': 200})
+        user_steps[1].update({'priority': 100})
+        driver_steps = self.deploy_steps
+        expected_steps = user_steps + self.deploy_steps[2:]
+        self._test__get_all_deployment_steps(user_steps, driver_steps,
+                                             expected_steps)
+
+    def test__get_all_deployment_steps_duplicate_user_steps(self):
+        # Duplicate user steps override non-core driver steps.
+
+        # NOTE(mgoddard): This case is currently prevented by the API and
+        # conductor - the interface/step must be unique across all enabled
+        # steps. This test ensures that we can support this case, in case we
+        # choose to allow it in future.
+        user_steps = [self.deploy_start.copy(), self.deploy_start.copy()]
+        user_steps[0].update({'priority': 200})
+        user_steps[1].update({'priority': 100})
+        driver_steps = self.deploy_steps
+        # Each user invocation of the deploy_start step should be included, but
+        # not the default deploy_start from the driver.
+        expected_steps = user_steps + self.deploy_steps[1:]
+        self._test__get_all_deployment_steps(user_steps, driver_steps,
+                                             expected_steps)
+
+    @mock.patch.object(conductor_steps, '_get_validated_steps_from_templates',
+                       autospec=True)
+    @mock.patch.object(conductor_steps, '_get_deployment_steps', autospec=True)
+    def test__get_all_deployment_steps_error(self, mock_steps, mock_validated):
+        mock_validated.side_effect = exception.InvalidParameterValue('foo')
+
+        with task_manager.acquire(
+                self.context, self.node.uuid, shared=False) as task:
+            self.assertRaises(exception.InvalidParameterValue,
+                              conductor_steps._get_all_deployment_steps, task)
+            mock_validated.assert_called_once_with(task)
+            self.assertFalse(mock_steps.called)
+
+    @mock.patch.object(conductor_steps, '_get_all_deployment_steps',
+                       autospec=True)
+    def test_set_node_deployment_steps(self, mock_steps):
+        mock_steps.return_value = self.deploy_steps
+
+        with task_manager.acquire(
+                self.context, self.node.uuid, shared=False) as task:
+            conductor_steps.set_node_deployment_steps(task)
+            self.node.refresh()
+            self.assertEqual(self.deploy_steps,
+                             self.node.driver_internal_info['deploy_steps'])
+            self.assertEqual({}, self.node.deploy_step)
+            self.assertIsNone(
+                self.node.driver_internal_info['deploy_step_index'])
+            mock_steps.assert_called_once_with(task)
+
+    @mock.patch.object(conductor_steps, '_get_deployment_steps', autospec=True)
+    def test__validate_user_deploy_steps(self, mock_steps):
+        mock_steps.return_value = self.deploy_steps
+
+        user_steps = [{'step': 'deploy_start', 'interface': 'deploy',
+                       'priority': 100},
+                      {'step': 'power_one', 'interface': 'power',
+                       'priority': 200}]
+
+        with task_manager.acquire(self.context, self.node.uuid) as task:
+            result = conductor_steps._validate_user_deploy_steps(task,
+                                                                 user_steps)
+            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
+
+        self.assertEqual(user_steps, result)
+
+    @mock.patch.object(conductor_steps, '_get_deployment_steps', autospec=True)
+    def test__validate_user_deploy_steps_no_steps(self, mock_steps):
+        mock_steps.return_value = self.deploy_steps
+
+        with task_manager.acquire(self.context, self.node.uuid) as task:
+            conductor_steps._validate_user_deploy_steps(task, [])
+            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
+
+    @mock.patch.object(conductor_steps, '_get_deployment_steps', autospec=True)
+    def test__validate_user_deploy_steps_get_steps_exception(self, mock_steps):
+        mock_steps.side_effect = exception.InstanceDeployFailure('bad')
+
+        with task_manager.acquire(self.context, self.node.uuid) as task:
+            self.assertRaises(exception.InstanceDeployFailure,
+                              conductor_steps._validate_user_deploy_steps,
+                              task, [])
+            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
+
+    @mock.patch.object(conductor_steps, '_get_deployment_steps', autospec=True)
+    def test__validate_user_deploy_steps_not_supported(self, mock_steps):
+        mock_steps.return_value = self.deploy_steps
+        user_steps = [{'step': 'power_one', 'interface': 'power',
+                       'priority': 200},
+                      {'step': 'bad_step', 'interface': 'deploy',
+                       'priority': 100}]
+
+        with task_manager.acquire(self.context, self.node.uuid) as task:
+            self.assertRaisesRegex(exception.InvalidParameterValue,
+                                   "does not support.*bad_step",
+                                   conductor_steps._validate_user_deploy_steps,
+                                   task, user_steps)
+            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
+
+    @mock.patch.object(conductor_steps, '_get_deployment_steps', autospec=True)
+    def test__validate_user_deploy_steps_invalid_arg(self, mock_steps):
+        mock_steps.return_value = self.deploy_steps
+        user_steps = [{'step': 'power_one', 'interface': 'power',
+                       'args': {'arg1': 'val1', 'arg2': 'val2'},
+                       'priority': 200}]
+
+        with task_manager.acquire(self.context, self.node.uuid) as task:
+            self.assertRaisesRegex(exception.InvalidParameterValue,
+                                   "power_one.*unexpected.*arg1",
+                                   conductor_steps._validate_user_deploy_steps,
+                                   task, user_steps)
+            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
+
+    @mock.patch.object(conductor_steps, '_get_deployment_steps', autospec=True)
+    def test__validate_user_deploy_steps_missing_required_arg(self,
+                                                              mock_steps):
+        mock_steps.return_value = [self.power_one, self.deploy_raid]
+        user_steps = [{'step': 'power_one', 'interface': 'power',
+                       'priority': 200},
+                      {'step': 'build_raid', 'interface': 'deploy',
+                       'priority': 100}]
+
+        with task_manager.acquire(self.context, self.node.uuid) as task:
+            self.assertRaisesRegex(exception.InvalidParameterValue,
+                                   "build_raid.*missing.*arg1",
+                                   conductor_steps._validate_user_deploy_steps,
+                                   task, user_steps)
+            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
+
+    @mock.patch.object(conductor_steps, '_get_deployment_steps', autospec=True)
+    def test__validate_user_deploy_steps_disable_non_core(self, mock_steps):
+        # Required arguments don't apply to disabled steps.
+        mock_steps.return_value = [self.power_one, self.deploy_raid]
+        user_steps = [{'step': 'power_one', 'interface': 'power',
+                       'priority': 200},
+                      {'step': 'build_raid', 'interface': 'deploy',
+                       'priority': 0}]
+
+        with task_manager.acquire(self.context, self.node.uuid) as task:
+            result = conductor_steps._validate_user_deploy_steps(task,
+                                                                 user_steps)
+            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
+
+        self.assertEqual(user_steps, result)
+
+    @mock.patch.object(conductor_steps, '_get_deployment_steps', autospec=True)
+    def test__validate_user_deploy_steps_disable_core(self, mock_steps):
+        mock_steps.return_value = [self.power_one, self.deploy_core]
+        user_steps = [{'step': 'power_one', 'interface': 'power',
+                       'priority': 200},
+                      {'step': 'deploy', 'interface': 'deploy', 'priority': 0}]
+
+        with task_manager.acquire(self.context, self.node.uuid) as task:
+            result = conductor_steps._validate_user_deploy_steps(task,
+                                                                 user_steps)
+            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
+
+        self.assertEqual(user_steps, result)
+
+    @mock.patch.object(conductor_steps, '_get_deployment_steps', autospec=True)
+    def test__validate_user_deploy_steps_override_core(self, mock_steps):
+        mock_steps.return_value = [self.power_one, self.deploy_core]
+        user_steps = [{'step': 'power_one', 'interface': 'power',
+                       'priority': 200},
+                      {'step': 'deploy', 'interface': 'deploy',
+                       'priority': 200}]
+
+        with task_manager.acquire(self.context, self.node.uuid) as task:
+            self.assertRaisesRegex(exception.InvalidParameterValue,
+                                   "deploy.*is a core step",
+                                   conductor_steps._validate_user_deploy_steps,
+                                   task, user_steps)
+            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
+
+    @mock.patch.object(conductor_steps, '_get_deployment_steps', autospec=True)
+    def test__validate_user_deploy_steps_duplicates(self, mock_steps):
+        mock_steps.return_value = [self.power_one, self.deploy_core]
+        user_steps = [{'step': 'power_one', 'interface': 'power',
+                       'priority': 200},
+                      {'step': 'power_one', 'interface': 'power',
+                       'priority': 100}]
+
+        with task_manager.acquire(self.context, self.node.uuid) as task:
+            self.assertRaisesRegex(exception.InvalidParameterValue,
+                                   "Duplicate deploy steps for "
+                                   "power.power_one",
+                                   conductor_steps._validate_user_deploy_steps,
+                                   task, user_steps)
+            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
+
+
+class NodeCleaningStepsTestCase(db_base.DbTestCase):
+    def setUp(self):
+        super(NodeCleaningStepsTestCase, self).setUp()
+
+        self.power_update = {
+            'step': 'update_firmware', 'priority': 10, 'interface': 'power'}
+        self.deploy_update = {
+            'step': 'update_firmware', 'priority': 10, 'interface': 'deploy'}
+        self.deploy_erase = {
+            'step': 'erase_disks', 'priority': 20, 'interface': 'deploy',
+            'abortable': True}
+        # Automated cleaning should be executed in this order
+        self.clean_steps = [self.deploy_erase, self.power_update,
+                            self.deploy_update]
+        # Manual clean step
+        self.deploy_raid = {
+            'step': 'build_raid', 'priority': 0, 'interface': 'deploy',
+            'argsinfo': {'arg1': {'description': 'desc1', 'required': True},
+                         'arg2': {'description': 'desc2'}}}
+
+    @mock.patch('ironic.drivers.modules.fake.FakeBIOS.get_clean_steps',
+                lambda self, task: [])
+    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.get_clean_steps')
+    @mock.patch('ironic.drivers.modules.fake.FakePower.get_clean_steps')
+    def test__get_cleaning_steps(self, mock_power_steps, mock_deploy_steps):
+        # Test getting cleaning steps, with one driver returning None, two
+        # conflicting priorities, and asserting they are ordered properly.
+        node = obj_utils.create_test_node(
+            self.context, driver='fake-hardware',
+            provision_state=states.CLEANING,
+            target_provision_state=states.AVAILABLE)
+
+        mock_power_steps.return_value = [self.power_update]
+        mock_deploy_steps.return_value = [self.deploy_erase,
+                                          self.deploy_update]
+
+        with task_manager.acquire(
+                self.context, node.uuid, shared=False) as task:
+            steps = conductor_steps._get_cleaning_steps(task, enabled=False)
+
+        self.assertEqual(self.clean_steps, steps)
+
+    @mock.patch('ironic.drivers.modules.fake.FakeBIOS.get_clean_steps',
+                lambda self, task: [])
+    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.get_clean_steps')
+    @mock.patch('ironic.drivers.modules.fake.FakePower.get_clean_steps')
+    def test__get_cleaning_steps_unsorted(self, mock_power_steps,
+                                          mock_deploy_steps):
+        node = obj_utils.create_test_node(
+            self.context, driver='fake-hardware',
+            provision_state=states.CLEANING,
+            target_provision_state=states.MANAGEABLE)
+
+        mock_deploy_steps.return_value = [self.deploy_raid,
+                                          self.deploy_update,
+                                          self.deploy_erase]
+        with task_manager.acquire(
+                self.context, node.uuid, shared=False) as task:
+            steps = conductor_steps._get_cleaning_steps(task, enabled=False,
+                                                        sort=False)
+        self.assertEqual(mock_deploy_steps.return_value, steps)
+
+    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.get_clean_steps')
+    @mock.patch('ironic.drivers.modules.fake.FakePower.get_clean_steps')
+    def test__get_cleaning_steps_only_enabled(self, mock_power_steps,
+                                              mock_deploy_steps):
+        # Test getting only cleaning steps, with one driver returning None, two
+        # conflicting priorities, and asserting they are ordered properly.
+        # Should discard zero-priority (manual) clean step
+        node = obj_utils.create_test_node(
+            self.context, driver='fake-hardware',
+            provision_state=states.CLEANING,
+            target_provision_state=states.AVAILABLE)
+
+        mock_power_steps.return_value = [self.power_update]
+        mock_deploy_steps.return_value = [self.deploy_erase,
+                                          self.deploy_update,
+                                          self.deploy_raid]
+
+        with task_manager.acquire(
+                self.context, node.uuid, shared=True) as task:
+            steps = conductor_steps._get_cleaning_steps(task, enabled=True)
+
+        self.assertEqual(self.clean_steps, steps)
+
+    @mock.patch.object(conductor_steps, '_validate_user_clean_steps')
+    @mock.patch.object(conductor_steps, '_get_cleaning_steps')
+    def test_set_node_cleaning_steps_automated(self, mock_steps,
+                                               mock_validate_user_steps):
+        mock_steps.return_value = self.clean_steps
+
+        node = obj_utils.create_test_node(
+            self.context, driver='fake-hardware',
+            provision_state=states.CLEANING,
+            target_provision_state=states.AVAILABLE,
+            last_error=None,
+            clean_step=None)
+
+        with task_manager.acquire(
+                self.context, node.uuid, shared=False) as task:
+            conductor_steps.set_node_cleaning_steps(task)
+            node.refresh()
+            self.assertEqual(self.clean_steps,
+                             node.driver_internal_info['clean_steps'])
+            self.assertEqual({}, node.clean_step)
+            mock_steps.assert_called_once_with(task, enabled=True)
+            self.assertFalse(mock_validate_user_steps.called)
+
+    @mock.patch.object(conductor_steps, '_validate_user_clean_steps')
+    @mock.patch.object(conductor_steps, '_get_cleaning_steps')
+    def test_set_node_cleaning_steps_manual(self, mock_steps,
+                                            mock_validate_user_steps):
+        clean_steps = [self.deploy_raid]
+        mock_steps.return_value = self.clean_steps
+        mock_validate_user_steps.return_value = clean_steps
+
+        node = obj_utils.create_test_node(
+            self.context, driver='fake-hardware',
+            provision_state=states.CLEANING,
+            target_provision_state=states.MANAGEABLE,
+            last_error=None,
+            clean_step=None,
+            driver_internal_info={'clean_steps': clean_steps})
+
+        with task_manager.acquire(
+                self.context, node.uuid, shared=False) as task:
+            conductor_steps.set_node_cleaning_steps(task)
+            node.refresh()
+            self.assertEqual(clean_steps,
+                             node.driver_internal_info['clean_steps'])
+            self.assertEqual({}, node.clean_step)
+            self.assertFalse(mock_steps.called)
+            mock_validate_user_steps.assert_called_once_with(task, clean_steps)
+
+    @mock.patch.object(conductor_steps, '_get_cleaning_steps')
+    def test__validate_user_clean_steps(self, mock_steps):
+        node = obj_utils.create_test_node(self.context)
+        mock_steps.return_value = self.clean_steps
+
+        user_steps = [{'step': 'update_firmware', 'interface': 'power'},
+                      {'step': 'erase_disks', 'interface': 'deploy'}]
+
+        with task_manager.acquire(self.context, node.uuid) as task:
+            result = conductor_steps._validate_user_clean_steps(task,
+                                                                user_steps)
+            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
+
+        expected = [{'step': 'update_firmware', 'interface': 'power',
+                     'priority': 10, 'abortable': False},
+                    {'step': 'erase_disks', 'interface': 'deploy',
+                     'priority': 20, 'abortable': True}]
+        self.assertEqual(expected, result)
+
+    @mock.patch.object(conductor_steps, '_get_cleaning_steps')
+    def test__validate_user_clean_steps_no_steps(self, mock_steps):
+        node = obj_utils.create_test_node(self.context)
+        mock_steps.return_value = self.clean_steps
+
+        with task_manager.acquire(self.context, node.uuid) as task:
+            conductor_steps._validate_user_clean_steps(task, [])
+            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
+
+    @mock.patch.object(conductor_steps, '_get_cleaning_steps')
+    def test__validate_user_clean_steps_get_steps_exception(self, mock_steps):
+        node = obj_utils.create_test_node(self.context)
+        mock_steps.side_effect = exception.NodeCleaningFailure('bad')
+
+        with task_manager.acquire(self.context, node.uuid) as task:
+            self.assertRaises(exception.NodeCleaningFailure,
+                              conductor_steps._validate_user_clean_steps,
+                              task, [])
+            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
+
+    @mock.patch.object(conductor_steps, '_get_cleaning_steps')
+    def test__validate_user_clean_steps_not_supported(self, mock_steps):
+        node = obj_utils.create_test_node(self.context)
+        mock_steps.return_value = [self.power_update, self.deploy_raid]
+        user_steps = [{'step': 'update_firmware', 'interface': 'power'},
+                      {'step': 'bad_step', 'interface': 'deploy'}]
+
+        with task_manager.acquire(self.context, node.uuid) as task:
+            self.assertRaisesRegex(exception.InvalidParameterValue,
+                                   "does not support.*bad_step",
+                                   conductor_steps._validate_user_clean_steps,
+                                   task, user_steps)
+            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
+
+    @mock.patch.object(conductor_steps, '_get_cleaning_steps')
+    def test__validate_user_clean_steps_invalid_arg(self, mock_steps):
+        node = obj_utils.create_test_node(self.context)
+        mock_steps.return_value = self.clean_steps
+        user_steps = [{'step': 'update_firmware', 'interface': 'power',
+                       'args': {'arg1': 'val1', 'arg2': 'val2'}},
+                      {'step': 'erase_disks', 'interface': 'deploy'}]
+
+        with task_manager.acquire(self.context, node.uuid) as task:
+            self.assertRaisesRegex(exception.InvalidParameterValue,
+                                   "update_firmware.*unexpected.*arg1",
+                                   conductor_steps._validate_user_clean_steps,
+                                   task, user_steps)
+            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
+
+    @mock.patch.object(conductor_steps, '_get_cleaning_steps')
+    def test__validate_user_clean_steps_missing_required_arg(self, mock_steps):
+        node = obj_utils.create_test_node(self.context)
+        mock_steps.return_value = [self.power_update, self.deploy_raid]
+        user_steps = [{'step': 'update_firmware', 'interface': 'power'},
+                      {'step': 'build_raid', 'interface': 'deploy'}]
+
+        with task_manager.acquire(self.context, node.uuid) as task:
+            self.assertRaisesRegex(exception.InvalidParameterValue,
+                                   "build_raid.*missing.*arg1",
+                                   conductor_steps._validate_user_clean_steps,
+                                   task, user_steps)
+            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
+
+
+@mock.patch.object(conductor_steps, '_get_deployment_templates',
+                   autospec=True)
+@mock.patch.object(conductor_steps, '_get_steps_from_deployment_templates',
+                   autospec=True)
+@mock.patch.object(conductor_steps, '_validate_user_deploy_steps',
+                   autospec=True)
+class GetValidatedStepsFromTemplatesTestCase(db_base.DbTestCase):
+
+    def setUp(self):
+        super(GetValidatedStepsFromTemplatesTestCase, self).setUp()
+        self.node = obj_utils.create_test_node(self.context,
+                                               driver='fake-hardware')
+        self.template = obj_utils.get_test_deploy_template(self.context)
+
+    def test_ok(self, mock_validate, mock_steps, mock_templates):
+        mock_templates.return_value = [self.template]
+        steps = [db_utils.get_test_deploy_template_step()]
+        mock_steps.return_value = steps
+        mock_validate.return_value = steps
+        with task_manager.acquire(
+                self.context, self.node.uuid, shared=False) as task:
+            result = conductor_steps._get_validated_steps_from_templates(task)
+            self.assertEqual(steps, result)
+            mock_templates.assert_called_once_with(task)
+            mock_steps.assert_called_once_with(task, [self.template])
+            mock_validate.assert_called_once_with(task, steps, mock.ANY)
+
+    def test_invalid_parameter_value(self, mock_validate, mock_steps,
+                                     mock_templates):
+        mock_templates.return_value = [self.template]
+        mock_validate.side_effect = exception.InvalidParameterValue('fake')
+        with task_manager.acquire(
+                self.context, self.node.uuid, shared=False) as task:
+            self.assertRaises(
+                exception.InvalidParameterValue,
+                conductor_steps._get_validated_steps_from_templates, task)
+
+    def test_instance_deploy_failure(self, mock_validate, mock_steps,
+                                     mock_templates):
+        mock_templates.return_value = [self.template]
+        mock_validate.side_effect = exception.InstanceDeployFailure('foo')
+        with task_manager.acquire(
+                self.context, self.node.uuid, shared=False) as task:
+            self.assertRaises(
+                exception.InstanceDeployFailure,
+                conductor_steps._get_validated_steps_from_templates, task)
+
+
+@mock.patch.object(conductor_steps, '_get_validated_steps_from_templates',
+                   autospec=True)
+class ValidateDeployTemplatesTestCase(db_base.DbTestCase):
+
+    def setUp(self):
+        super(ValidateDeployTemplatesTestCase, self).setUp()
+        self.node = obj_utils.create_test_node(self.context,
+                                               driver='fake-hardware')
+
+    def test_ok(self, mock_validated):
+        with task_manager.acquire(
+                self.context, self.node.uuid, shared=False) as task:
+            result = conductor_steps.validate_deploy_templates(task)
+            self.assertIsNone(result)
+            mock_validated.assert_called_once_with(task)
+
+    def test_error(self, mock_validated):
+        with task_manager.acquire(
+                self.context, self.node.uuid, shared=False) as task:
+            mock_validated.side_effect = exception.InvalidParameterValue('foo')
+            self.assertRaises(exception.InvalidParameterValue,
+                              conductor_steps.validate_deploy_templates, task)
+            mock_validated.assert_called_once_with(task)
diff --git a/ironic/tests/unit/conductor/test_utils.py b/ironic/tests/unit/conductor/test_utils.py
index af06a0b3a0..dc2e4b3d0a 100644
--- a/ironic/tests/unit/conductor/test_utils.py
+++ b/ironic/tests/unit/conductor/test_utils.py
@@ -967,631 +967,6 @@ class DeployingErrorHandlerTestCase(tests_base.TestCase):
         self.task.process_event.assert_called_once_with('fail')
 
 
-class NodeDeployStepsTestCase(db_base.DbTestCase):
-    def setUp(self):
-        super(NodeDeployStepsTestCase, self).setUp()
-
-        self.deploy_start = {
-            'step': 'deploy_start', 'priority': 50, 'interface': 'deploy'}
-        self.power_one = {
-            'step': 'power_one', 'priority': 40, 'interface': 'power'}
-        self.deploy_middle = {
-            'step': 'deploy_middle', 'priority': 40, 'interface': 'deploy'}
-        self.deploy_end = {
-            'step': 'deploy_end', 'priority': 20, 'interface': 'deploy'}
-        self.power_disable = {
-            'step': 'power_disable', 'priority': 0, 'interface': 'power'}
-        self.deploy_core = {
-            'step': 'deploy', 'priority': 100, 'interface': 'deploy'}
-        # enabled steps
-        self.deploy_steps = [self.deploy_start, self.power_one,
-                             self.deploy_middle, self.deploy_end]
-        # Deploy step with argsinfo.
-        self.deploy_raid = {
-            'step': 'build_raid', 'priority': 0, 'interface': 'deploy',
-            'argsinfo': {'arg1': {'description': 'desc1', 'required': True},
-                         'arg2': {'description': 'desc2'}}}
-        self.node = obj_utils.create_test_node(
-            self.context, driver='fake-hardware')
-
-    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.get_deploy_steps',
-                autospec=True)
-    @mock.patch('ironic.drivers.modules.fake.FakePower.get_deploy_steps',
-                autospec=True)
-    @mock.patch('ironic.drivers.modules.fake.FakeManagement.get_deploy_steps',
-                autospec=True)
-    def test__get_deployment_steps(self, mock_mgt_steps, mock_power_steps,
-                                   mock_deploy_steps):
-        # Test getting deploy steps, with one driver returning None, two
-        # conflicting priorities, and asserting they are ordered properly.
-
-        mock_power_steps.return_value = [self.power_disable, self.power_one]
-        mock_deploy_steps.return_value = [
-            self.deploy_start, self.deploy_middle, self.deploy_end]
-
-        expected = self.deploy_steps + [self.power_disable]
-        with task_manager.acquire(
-                self.context, self.node.uuid, shared=False) as task:
-            steps = conductor_utils._get_deployment_steps(task, enabled=False)
-
-            self.assertEqual(expected, steps)
-            mock_mgt_steps.assert_called_once_with(mock.ANY, task)
-            mock_power_steps.assert_called_once_with(mock.ANY, task)
-            mock_deploy_steps.assert_called_once_with(mock.ANY, task)
-
-    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.get_deploy_steps',
-                autospec=True)
-    @mock.patch('ironic.drivers.modules.fake.FakePower.get_deploy_steps',
-                autospec=True)
-    @mock.patch('ironic.drivers.modules.fake.FakeManagement.get_deploy_steps',
-                autospec=True)
-    def test__get_deploy_steps_unsorted(self, mock_mgt_steps, mock_power_steps,
-                                        mock_deploy_steps):
-
-        mock_deploy_steps.return_value = [self.deploy_end,
-                                          self.deploy_start,
-                                          self.deploy_middle]
-        with task_manager.acquire(
-                self.context, self.node.uuid, shared=False) as task:
-            steps = conductor_utils._get_deployment_steps(task, enabled=False,
-                                                          sort=False)
-            self.assertEqual(mock_deploy_steps.return_value, steps)
-            mock_mgt_steps.assert_called_once_with(mock.ANY, task)
-            mock_power_steps.assert_called_once_with(mock.ANY, task)
-            mock_deploy_steps.assert_called_once_with(mock.ANY, task)
-
-    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.get_deploy_steps',
-                autospec=True)
-    @mock.patch('ironic.drivers.modules.fake.FakePower.get_deploy_steps',
-                autospec=True)
-    @mock.patch('ironic.drivers.modules.fake.FakeManagement.get_deploy_steps',
-                autospec=True)
-    def test__get_deployment_steps_only_enabled(
-            self, mock_mgt_steps, mock_power_steps, mock_deploy_steps):
-        # Test getting only deploy steps, with one driver returning None, two
-        # conflicting priorities, and asserting they are ordered properly.
-        # Should discard zero-priority deploy step.
-
-        mock_power_steps.return_value = [self.power_one, self.power_disable]
-        mock_deploy_steps.return_value = [self.deploy_end,
-                                          self.deploy_middle,
-                                          self.deploy_start]
-
-        with task_manager.acquire(
-                self.context, self.node.uuid, shared=True) as task:
-            steps = conductor_utils._get_deployment_steps(task, enabled=True)
-
-            self.assertEqual(self.deploy_steps, steps)
-            mock_mgt_steps.assert_called_once_with(mock.ANY, task)
-            mock_power_steps.assert_called_once_with(mock.ANY, task)
-            mock_deploy_steps.assert_called_once_with(mock.ANY, task)
-
-    @mock.patch.object(objects.DeployTemplate, 'list_by_names')
-    def test__get_deployment_templates_no_traits(self, mock_list):
-        with task_manager.acquire(
-                self.context, self.node.uuid, shared=False) as task:
-            templates = conductor_utils._get_deployment_templates(task)
-            self.assertEqual([], templates)
-            self.assertFalse(mock_list.called)
-
-    @mock.patch.object(objects.DeployTemplate, 'list_by_names')
-    def test__get_deployment_templates(self, mock_list):
-        traits = ['CUSTOM_DT1', 'CUSTOM_DT2']
-        node = obj_utils.create_test_node(
-            self.context, uuid=uuidutils.generate_uuid(),
-            instance_info={'traits': traits})
-        template1 = obj_utils.get_test_deploy_template(self.context)
-        template2 = obj_utils.get_test_deploy_template(
-            self.context, name='CUSTOM_DT2', uuid=uuidutils.generate_uuid(),
-            steps=[{'interface': 'bios', 'step': 'apply_configuration',
-                    'args': {}, 'priority': 1}])
-        mock_list.return_value = [template1, template2]
-        expected = [template1, template2]
-        with task_manager.acquire(
-                self.context, node.uuid, shared=False) as task:
-            templates = conductor_utils._get_deployment_templates(task)
-            self.assertEqual(expected, templates)
-            mock_list.assert_called_once_with(task.context, traits)
-
-    def test__get_steps_from_deployment_templates(self):
-        template1 = obj_utils.get_test_deploy_template(self.context)
-        template2 = obj_utils.get_test_deploy_template(
-            self.context, name='CUSTOM_DT2', uuid=uuidutils.generate_uuid(),
-            steps=[{'interface': 'bios', 'step': 'apply_configuration',
-                    'args': {}, 'priority': 1}])
-        step1 = template1.steps[0]
-        step2 = template2.steps[0]
-        expected = [
-            {
-                'interface': step1['interface'],
-                'step': step1['step'],
-                'args': step1['args'],
-                'priority': step1['priority'],
-            },
-            {
-                'interface': step2['interface'],
-                'step': step2['step'],
-                'args': step2['args'],
-                'priority': step2['priority'],
-            }
-        ]
-        with task_manager.acquire(
-                self.context, self.node.uuid, shared=False) as task:
-            steps = conductor_utils._get_steps_from_deployment_templates(
-                task, [template1, template2])
-            self.assertEqual(expected, steps)
-
-    @mock.patch.object(conductor_utils, '_get_validated_steps_from_templates',
-                       autospec=True)
-    @mock.patch.object(conductor_utils, '_get_deployment_steps', autospec=True)
-    def _test__get_all_deployment_steps(self, user_steps, driver_steps,
-                                        expected_steps, mock_steps,
-                                        mock_validated):
-        mock_validated.return_value = user_steps
-        mock_steps.return_value = driver_steps
-
-        with task_manager.acquire(
-                self.context, self.node.uuid, shared=False) as task:
-            steps = conductor_utils._get_all_deployment_steps(task)
-            self.assertEqual(expected_steps, steps)
-            mock_validated.assert_called_once_with(task)
-            mock_steps.assert_called_once_with(task, enabled=True, sort=False)
-
-    def test__get_all_deployment_steps_no_steps(self):
-        # Nothing in -> nothing out.
-        user_steps = []
-        driver_steps = []
-        expected_steps = []
-        self._test__get_all_deployment_steps(user_steps, driver_steps,
-                                             expected_steps)
-
-    def test__get_all_deployment_steps_no_user_steps(self):
-        # Only driver steps in -> only driver steps out.
-        user_steps = []
-        driver_steps = self.deploy_steps
-        expected_steps = self.deploy_steps
-        self._test__get_all_deployment_steps(user_steps, driver_steps,
-                                             expected_steps)
-
-    def test__get_all_deployment_steps_no_driver_steps(self):
-        # Only user steps in -> only user steps out.
-        user_steps = self.deploy_steps
-        driver_steps = []
-        expected_steps = self.deploy_steps
-        self._test__get_all_deployment_steps(user_steps, driver_steps,
-                                             expected_steps)
-
-    def test__get_all_deployment_steps_user_and_driver_steps(self):
-        # Driver and user steps in -> driver and user steps out.
-        user_steps = self.deploy_steps[:2]
-        driver_steps = self.deploy_steps[2:]
-        expected_steps = self.deploy_steps
-        self._test__get_all_deployment_steps(user_steps, driver_steps,
-                                             expected_steps)
-
-    def test__get_all_deployment_steps_disable_core_steps(self):
-        # User steps can disable core driver steps.
-        user_steps = [self.deploy_core.copy()]
-        user_steps[0].update({'priority': 0})
-        driver_steps = [self.deploy_core]
-        expected_steps = []
-        self._test__get_all_deployment_steps(user_steps, driver_steps,
-                                             expected_steps)
-
-    def test__get_all_deployment_steps_override_driver_steps(self):
-        # User steps override non-core driver steps.
-        user_steps = [step.copy() for step in self.deploy_steps[:2]]
-        user_steps[0].update({'priority': 200})
-        user_steps[1].update({'priority': 100})
-        driver_steps = self.deploy_steps
-        expected_steps = user_steps + self.deploy_steps[2:]
-        self._test__get_all_deployment_steps(user_steps, driver_steps,
-                                             expected_steps)
-
-    def test__get_all_deployment_steps_duplicate_user_steps(self):
-        # Duplicate user steps override non-core driver steps.
-
-        # NOTE(mgoddard): This case is currently prevented by the API and
-        # conductor - the interface/step must be unique across all enabled
-        # steps. This test ensures that we can support this case, in case we
-        # choose to allow it in future.
-        user_steps = [self.deploy_start.copy(), self.deploy_start.copy()]
-        user_steps[0].update({'priority': 200})
-        user_steps[1].update({'priority': 100})
-        driver_steps = self.deploy_steps
-        # Each user invocation of the deploy_start step should be included, but
-        # not the default deploy_start from the driver.
-        expected_steps = user_steps + self.deploy_steps[1:]
-        self._test__get_all_deployment_steps(user_steps, driver_steps,
-                                             expected_steps)
-
-    @mock.patch.object(conductor_utils, '_get_validated_steps_from_templates',
-                       autospec=True)
-    @mock.patch.object(conductor_utils, '_get_deployment_steps', autospec=True)
-    def test__get_all_deployment_steps_error(self, mock_steps, mock_validated):
-        mock_validated.side_effect = exception.InvalidParameterValue('foo')
-
-        with task_manager.acquire(
-                self.context, self.node.uuid, shared=False) as task:
-            self.assertRaises(exception.InvalidParameterValue,
-                              conductor_utils._get_all_deployment_steps, task)
-            mock_validated.assert_called_once_with(task)
-            self.assertFalse(mock_steps.called)
-
-    @mock.patch.object(conductor_utils, '_get_all_deployment_steps',
-                       autospec=True)
-    def test_set_node_deployment_steps(self, mock_steps):
-        mock_steps.return_value = self.deploy_steps
-
-        with task_manager.acquire(
-                self.context, self.node.uuid, shared=False) as task:
-            conductor_utils.set_node_deployment_steps(task)
-            self.node.refresh()
-            self.assertEqual(self.deploy_steps,
-                             self.node.driver_internal_info['deploy_steps'])
-            self.assertEqual({}, self.node.deploy_step)
-            self.assertIsNone(
-                self.node.driver_internal_info['deploy_step_index'])
-            mock_steps.assert_called_once_with(task)
-
-    @mock.patch.object(conductor_utils, '_get_deployment_steps', autospec=True)
-    def test__validate_user_deploy_steps(self, mock_steps):
-        mock_steps.return_value = self.deploy_steps
-
-        user_steps = [{'step': 'deploy_start', 'interface': 'deploy',
-                       'priority': 100},
-                      {'step': 'power_one', 'interface': 'power',
-                       'priority': 200}]
-
-        with task_manager.acquire(self.context, self.node.uuid) as task:
-            result = conductor_utils._validate_user_deploy_steps(task,
-                                                                 user_steps)
-            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
-
-        self.assertEqual(user_steps, result)
-
-    @mock.patch.object(conductor_utils, '_get_deployment_steps', autospec=True)
-    def test__validate_user_deploy_steps_no_steps(self, mock_steps):
-        mock_steps.return_value = self.deploy_steps
-
-        with task_manager.acquire(self.context, self.node.uuid) as task:
-            conductor_utils._validate_user_deploy_steps(task, [])
-            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
-
-    @mock.patch.object(conductor_utils, '_get_deployment_steps', autospec=True)
-    def test__validate_user_deploy_steps_get_steps_exception(self, mock_steps):
-        mock_steps.side_effect = exception.InstanceDeployFailure('bad')
-
-        with task_manager.acquire(self.context, self.node.uuid) as task:
-            self.assertRaises(exception.InstanceDeployFailure,
-                              conductor_utils._validate_user_deploy_steps,
-                              task, [])
-            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
-
-    @mock.patch.object(conductor_utils, '_get_deployment_steps', autospec=True)
-    def test__validate_user_deploy_steps_not_supported(self, mock_steps):
-        mock_steps.return_value = self.deploy_steps
-        user_steps = [{'step': 'power_one', 'interface': 'power',
-                       'priority': 200},
-                      {'step': 'bad_step', 'interface': 'deploy',
-                       'priority': 100}]
-
-        with task_manager.acquire(self.context, self.node.uuid) as task:
-            self.assertRaisesRegex(exception.InvalidParameterValue,
-                                   "does not support.*bad_step",
-                                   conductor_utils._validate_user_deploy_steps,
-                                   task, user_steps)
-            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
-
-    @mock.patch.object(conductor_utils, '_get_deployment_steps', autospec=True)
-    def test__validate_user_deploy_steps_invalid_arg(self, mock_steps):
-        mock_steps.return_value = self.deploy_steps
-        user_steps = [{'step': 'power_one', 'interface': 'power',
-                       'args': {'arg1': 'val1', 'arg2': 'val2'},
-                       'priority': 200}]
-
-        with task_manager.acquire(self.context, self.node.uuid) as task:
-            self.assertRaisesRegex(exception.InvalidParameterValue,
-                                   "power_one.*unexpected.*arg1",
-                                   conductor_utils._validate_user_deploy_steps,
-                                   task, user_steps)
-            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
-
-    @mock.patch.object(conductor_utils, '_get_deployment_steps', autospec=True)
-    def test__validate_user_deploy_steps_missing_required_arg(self,
-                                                              mock_steps):
-        mock_steps.return_value = [self.power_one, self.deploy_raid]
-        user_steps = [{'step': 'power_one', 'interface': 'power',
-                       'priority': 200},
-                      {'step': 'build_raid', 'interface': 'deploy',
-                       'priority': 100}]
-
-        with task_manager.acquire(self.context, self.node.uuid) as task:
-            self.assertRaisesRegex(exception.InvalidParameterValue,
-                                   "build_raid.*missing.*arg1",
-                                   conductor_utils._validate_user_deploy_steps,
-                                   task, user_steps)
-            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
-
-    @mock.patch.object(conductor_utils, '_get_deployment_steps', autospec=True)
-    def test__validate_user_deploy_steps_disable_non_core(self, mock_steps):
-        # Required arguments don't apply to disabled steps.
-        mock_steps.return_value = [self.power_one, self.deploy_raid]
-        user_steps = [{'step': 'power_one', 'interface': 'power',
-                       'priority': 200},
-                      {'step': 'build_raid', 'interface': 'deploy',
-                       'priority': 0}]
-
-        with task_manager.acquire(self.context, self.node.uuid) as task:
-            result = conductor_utils._validate_user_deploy_steps(task,
-                                                                 user_steps)
-            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
-
-        self.assertEqual(user_steps, result)
-
-    @mock.patch.object(conductor_utils, '_get_deployment_steps', autospec=True)
-    def test__validate_user_deploy_steps_disable_core(self, mock_steps):
-        mock_steps.return_value = [self.power_one, self.deploy_core]
-        user_steps = [{'step': 'power_one', 'interface': 'power',
-                       'priority': 200},
-                      {'step': 'deploy', 'interface': 'deploy', 'priority': 0}]
-
-        with task_manager.acquire(self.context, self.node.uuid) as task:
-            result = conductor_utils._validate_user_deploy_steps(task,
-                                                                 user_steps)
-            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
-
-        self.assertEqual(user_steps, result)
-
-    @mock.patch.object(conductor_utils, '_get_deployment_steps', autospec=True)
-    def test__validate_user_deploy_steps_override_core(self, mock_steps):
-        mock_steps.return_value = [self.power_one, self.deploy_core]
-        user_steps = [{'step': 'power_one', 'interface': 'power',
-                       'priority': 200},
-                      {'step': 'deploy', 'interface': 'deploy',
-                       'priority': 200}]
-
-        with task_manager.acquire(self.context, self.node.uuid) as task:
-            self.assertRaisesRegex(exception.InvalidParameterValue,
-                                   "deploy.*is a core step",
-                                   conductor_utils._validate_user_deploy_steps,
-                                   task, user_steps)
-            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
-
-    @mock.patch.object(conductor_utils, '_get_deployment_steps', autospec=True)
-    def test__validate_user_deploy_steps_duplicates(self, mock_steps):
-        mock_steps.return_value = [self.power_one, self.deploy_core]
-        user_steps = [{'step': 'power_one', 'interface': 'power',
-                       'priority': 200},
-                      {'step': 'power_one', 'interface': 'power',
-                       'priority': 100}]
-
-        with task_manager.acquire(self.context, self.node.uuid) as task:
-            self.assertRaisesRegex(exception.InvalidParameterValue,
-                                   "Duplicate deploy steps for "
-                                   "power.power_one",
-                                   conductor_utils._validate_user_deploy_steps,
-                                   task, user_steps)
-            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
-
-
-class NodeCleaningStepsTestCase(db_base.DbTestCase):
-    def setUp(self):
-        super(NodeCleaningStepsTestCase, self).setUp()
-
-        self.power_update = {
-            'step': 'update_firmware', 'priority': 10, 'interface': 'power'}
-        self.deploy_update = {
-            'step': 'update_firmware', 'priority': 10, 'interface': 'deploy'}
-        self.deploy_erase = {
-            'step': 'erase_disks', 'priority': 20, 'interface': 'deploy',
-            'abortable': True}
-        # Automated cleaning should be executed in this order
-        self.clean_steps = [self.deploy_erase, self.power_update,
-                            self.deploy_update]
-        # Manual clean step
-        self.deploy_raid = {
-            'step': 'build_raid', 'priority': 0, 'interface': 'deploy',
-            'argsinfo': {'arg1': {'description': 'desc1', 'required': True},
-                         'arg2': {'description': 'desc2'}}}
-
-    @mock.patch('ironic.drivers.modules.fake.FakeBIOS.get_clean_steps',
-                lambda self, task: [])
-    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.get_clean_steps')
-    @mock.patch('ironic.drivers.modules.fake.FakePower.get_clean_steps')
-    def test__get_cleaning_steps(self, mock_power_steps, mock_deploy_steps):
-        # Test getting cleaning steps, with one driver returning None, two
-        # conflicting priorities, and asserting they are ordered properly.
-        node = obj_utils.create_test_node(
-            self.context, driver='fake-hardware',
-            provision_state=states.CLEANING,
-            target_provision_state=states.AVAILABLE)
-
-        mock_power_steps.return_value = [self.power_update]
-        mock_deploy_steps.return_value = [self.deploy_erase,
-                                          self.deploy_update]
-
-        with task_manager.acquire(
-                self.context, node.uuid, shared=False) as task:
-            steps = conductor_utils._get_cleaning_steps(task, enabled=False)
-
-        self.assertEqual(self.clean_steps, steps)
-
-    @mock.patch('ironic.drivers.modules.fake.FakeBIOS.get_clean_steps',
-                lambda self, task: [])
-    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.get_clean_steps')
-    @mock.patch('ironic.drivers.modules.fake.FakePower.get_clean_steps')
-    def test__get_cleaning_steps_unsorted(self, mock_power_steps,
-                                          mock_deploy_steps):
-        node = obj_utils.create_test_node(
-            self.context, driver='fake-hardware',
-            provision_state=states.CLEANING,
-            target_provision_state=states.MANAGEABLE)
-
-        mock_deploy_steps.return_value = [self.deploy_raid,
-                                          self.deploy_update,
-                                          self.deploy_erase]
-        with task_manager.acquire(
-                self.context, node.uuid, shared=False) as task:
-            steps = conductor_utils._get_cleaning_steps(task, enabled=False,
-                                                        sort=False)
-        self.assertEqual(mock_deploy_steps.return_value, steps)
-
-    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.get_clean_steps')
-    @mock.patch('ironic.drivers.modules.fake.FakePower.get_clean_steps')
-    def test__get_cleaning_steps_only_enabled(self, mock_power_steps,
-                                              mock_deploy_steps):
-        # Test getting only cleaning steps, with one driver returning None, two
-        # conflicting priorities, and asserting they are ordered properly.
-        # Should discard zero-priority (manual) clean step
-        node = obj_utils.create_test_node(
-            self.context, driver='fake-hardware',
-            provision_state=states.CLEANING,
-            target_provision_state=states.AVAILABLE)
-
-        mock_power_steps.return_value = [self.power_update]
-        mock_deploy_steps.return_value = [self.deploy_erase,
-                                          self.deploy_update,
-                                          self.deploy_raid]
-
-        with task_manager.acquire(
-                self.context, node.uuid, shared=True) as task:
-            steps = conductor_utils._get_cleaning_steps(task, enabled=True)
-
-        self.assertEqual(self.clean_steps, steps)
-
-    @mock.patch.object(conductor_utils, '_validate_user_clean_steps')
-    @mock.patch.object(conductor_utils, '_get_cleaning_steps')
-    def test_set_node_cleaning_steps_automated(self, mock_steps,
-                                               mock_validate_user_steps):
-        mock_steps.return_value = self.clean_steps
-
-        node = obj_utils.create_test_node(
-            self.context, driver='fake-hardware',
-            provision_state=states.CLEANING,
-            target_provision_state=states.AVAILABLE,
-            last_error=None,
-            clean_step=None)
-
-        with task_manager.acquire(
-                self.context, node.uuid, shared=False) as task:
-            conductor_utils.set_node_cleaning_steps(task)
-            node.refresh()
-            self.assertEqual(self.clean_steps,
-                             node.driver_internal_info['clean_steps'])
-            self.assertEqual({}, node.clean_step)
-            mock_steps.assert_called_once_with(task, enabled=True)
-            self.assertFalse(mock_validate_user_steps.called)
-
-    @mock.patch.object(conductor_utils, '_validate_user_clean_steps')
-    @mock.patch.object(conductor_utils, '_get_cleaning_steps')
-    def test_set_node_cleaning_steps_manual(self, mock_steps,
-                                            mock_validate_user_steps):
-        clean_steps = [self.deploy_raid]
-        mock_steps.return_value = self.clean_steps
-        mock_validate_user_steps.return_value = clean_steps
-
-        node = obj_utils.create_test_node(
-            self.context, driver='fake-hardware',
-            provision_state=states.CLEANING,
-            target_provision_state=states.MANAGEABLE,
-            last_error=None,
-            clean_step=None,
-            driver_internal_info={'clean_steps': clean_steps})
-
-        with task_manager.acquire(
-                self.context, node.uuid, shared=False) as task:
-            conductor_utils.set_node_cleaning_steps(task)
-            node.refresh()
-            self.assertEqual(clean_steps,
-                             node.driver_internal_info['clean_steps'])
-            self.assertEqual({}, node.clean_step)
-            self.assertFalse(mock_steps.called)
-            mock_validate_user_steps.assert_called_once_with(task, clean_steps)
-
-    @mock.patch.object(conductor_utils, '_get_cleaning_steps')
-    def test__validate_user_clean_steps(self, mock_steps):
-        node = obj_utils.create_test_node(self.context)
-        mock_steps.return_value = self.clean_steps
-
-        user_steps = [{'step': 'update_firmware', 'interface': 'power'},
-                      {'step': 'erase_disks', 'interface': 'deploy'}]
-
-        with task_manager.acquire(self.context, node.uuid) as task:
-            result = conductor_utils._validate_user_clean_steps(task,
-                                                                user_steps)
-            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
-
-        expected = [{'step': 'update_firmware', 'interface': 'power',
-                     'priority': 10, 'abortable': False},
-                    {'step': 'erase_disks', 'interface': 'deploy',
-                     'priority': 20, 'abortable': True}]
-        self.assertEqual(expected, result)
-
-    @mock.patch.object(conductor_utils, '_get_cleaning_steps')
-    def test__validate_user_clean_steps_no_steps(self, mock_steps):
-        node = obj_utils.create_test_node(self.context)
-        mock_steps.return_value = self.clean_steps
-
-        with task_manager.acquire(self.context, node.uuid) as task:
-            conductor_utils._validate_user_clean_steps(task, [])
-            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
-
-    @mock.patch.object(conductor_utils, '_get_cleaning_steps')
-    def test__validate_user_clean_steps_get_steps_exception(self, mock_steps):
-        node = obj_utils.create_test_node(self.context)
-        mock_steps.side_effect = exception.NodeCleaningFailure('bad')
-
-        with task_manager.acquire(self.context, node.uuid) as task:
-            self.assertRaises(exception.NodeCleaningFailure,
-                              conductor_utils._validate_user_clean_steps,
-                              task, [])
-            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
-
-    @mock.patch.object(conductor_utils, '_get_cleaning_steps')
-    def test__validate_user_clean_steps_not_supported(self, mock_steps):
-        node = obj_utils.create_test_node(self.context)
-        mock_steps.return_value = [self.power_update, self.deploy_raid]
-        user_steps = [{'step': 'update_firmware', 'interface': 'power'},
-                      {'step': 'bad_step', 'interface': 'deploy'}]
-
-        with task_manager.acquire(self.context, node.uuid) as task:
-            self.assertRaisesRegex(exception.InvalidParameterValue,
-                                   "does not support.*bad_step",
-                                   conductor_utils._validate_user_clean_steps,
-                                   task, user_steps)
-            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
-
-    @mock.patch.object(conductor_utils, '_get_cleaning_steps')
-    def test__validate_user_clean_steps_invalid_arg(self, mock_steps):
-        node = obj_utils.create_test_node(self.context)
-        mock_steps.return_value = self.clean_steps
-        user_steps = [{'step': 'update_firmware', 'interface': 'power',
-                       'args': {'arg1': 'val1', 'arg2': 'val2'}},
-                      {'step': 'erase_disks', 'interface': 'deploy'}]
-
-        with task_manager.acquire(self.context, node.uuid) as task:
-            self.assertRaisesRegex(exception.InvalidParameterValue,
-                                   "update_firmware.*unexpected.*arg1",
-                                   conductor_utils._validate_user_clean_steps,
-                                   task, user_steps)
-            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
-
-    @mock.patch.object(conductor_utils, '_get_cleaning_steps')
-    def test__validate_user_clean_steps_missing_required_arg(self, mock_steps):
-        node = obj_utils.create_test_node(self.context)
-        mock_steps.return_value = [self.power_update, self.deploy_raid]
-        user_steps = [{'step': 'update_firmware', 'interface': 'power'},
-                      {'step': 'build_raid', 'interface': 'deploy'}]
-
-        with task_manager.acquire(self.context, node.uuid) as task:
-            self.assertRaisesRegex(exception.InvalidParameterValue,
-                                   "build_raid.*missing.*arg1",
-                                   conductor_utils._validate_user_clean_steps,
-                                   task, user_steps)
-            mock_steps.assert_called_once_with(task, enabled=False, sort=False)
-
-
 class ErrorHandlersTestCase(tests_base.TestCase):
     def setUp(self):
         super(ErrorHandlersTestCase, self).setUp()
@@ -2467,79 +1842,6 @@ class ValidateInstanceInfoTraitsTestCase(tests_base.TestCase):
                                self.node)
 
 
-@mock.patch.object(conductor_utils, '_get_deployment_templates',
-                   autospec=True)
-@mock.patch.object(conductor_utils, '_get_steps_from_deployment_templates',
-                   autospec=True)
-@mock.patch.object(conductor_utils, '_validate_user_deploy_steps',
-                   autospec=True)
-class GetValidatedStepsFromTemplatesTestCase(db_base.DbTestCase):
-
-    def setUp(self):
-        super(GetValidatedStepsFromTemplatesTestCase, self).setUp()
-        self.node = obj_utils.create_test_node(self.context,
-                                               driver='fake-hardware')
-        self.template = obj_utils.get_test_deploy_template(self.context)
-
-    def test_ok(self, mock_validate, mock_steps, mock_templates):
-        mock_templates.return_value = [self.template]
-        steps = [db_utils.get_test_deploy_template_step()]
-        mock_steps.return_value = steps
-        mock_validate.return_value = steps
-        with task_manager.acquire(
-                self.context, self.node.uuid, shared=False) as task:
-            result = conductor_utils._get_validated_steps_from_templates(task)
-            self.assertEqual(steps, result)
-            mock_templates.assert_called_once_with(task)
-            mock_steps.assert_called_once_with(task, [self.template])
-            mock_validate.assert_called_once_with(task, steps, mock.ANY)
-
-    def test_invalid_parameter_value(self, mock_validate, mock_steps,
-                                     mock_templates):
-        mock_templates.return_value = [self.template]
-        mock_validate.side_effect = exception.InvalidParameterValue('fake')
-        with task_manager.acquire(
-                self.context, self.node.uuid, shared=False) as task:
-            self.assertRaises(
-                exception.InvalidParameterValue,
-                conductor_utils._get_validated_steps_from_templates, task)
-
-    def test_instance_deploy_failure(self, mock_validate, mock_steps,
-                                     mock_templates):
-        mock_templates.return_value = [self.template]
-        mock_validate.side_effect = exception.InstanceDeployFailure('foo')
-        with task_manager.acquire(
-                self.context, self.node.uuid, shared=False) as task:
-            self.assertRaises(
-                exception.InstanceDeployFailure,
-                conductor_utils._get_validated_steps_from_templates, task)
-
-
-@mock.patch.object(conductor_utils, '_get_validated_steps_from_templates',
-                   autospec=True)
-class ValidateDeployTemplatesTestCase(db_base.DbTestCase):
-
-    def setUp(self):
-        super(ValidateDeployTemplatesTestCase, self).setUp()
-        self.node = obj_utils.create_test_node(self.context,
-                                               driver='fake-hardware')
-
-    def test_ok(self, mock_validated):
-        with task_manager.acquire(
-                self.context, self.node.uuid, shared=False) as task:
-            result = conductor_utils.validate_deploy_templates(task)
-            self.assertIsNone(result)
-            mock_validated.assert_called_once_with(task)
-
-    def test_error(self, mock_validated):
-        with task_manager.acquire(
-                self.context, self.node.uuid, shared=False) as task:
-            mock_validated.side_effect = exception.InvalidParameterValue('foo')
-            self.assertRaises(exception.InvalidParameterValue,
-                              conductor_utils.validate_deploy_templates, task)
-            mock_validated.assert_called_once_with(task)
-
-
 @mock.patch.object(fake.FakePower, 'get_power_state', autospec=True)
 class FastTrackTestCase(db_base.DbTestCase):
 
diff --git a/ironic/tests/unit/drivers/modules/ansible/test_deploy.py b/ironic/tests/unit/drivers/modules/ansible/test_deploy.py
index d4a8da6179..49050a7944 100644
--- a/ironic/tests/unit/drivers/modules/ansible/test_deploy.py
+++ b/ironic/tests/unit/drivers/modules/ansible/test_deploy.py
@@ -20,6 +20,7 @@ import six
 from ironic.common import exception
 from ironic.common import states
 from ironic.common import utils as com_utils
+from ironic.conductor import steps
 from ironic.conductor import task_manager
 from ironic.conductor import utils
 from ironic.drivers.modules.ansible import deploy as ansible_deploy
@@ -734,7 +735,7 @@ class TestAnsibleDeploy(AnsibleDeployTestCaseBase):
             self.assertFalse(log_mock.info.called)
 
     @mock.patch.object(ansible_deploy, '_run_playbook', autospec=True)
-    @mock.patch.object(utils, 'set_node_cleaning_steps', autospec=True)
+    @mock.patch.object(steps, 'set_node_cleaning_steps', autospec=True)
     @mock.patch.object(utils, 'node_power_action', autospec=True)
     @mock.patch('ironic.drivers.modules.deploy_utils.build_agent_options',
                 return_value={'op1': 'test1'}, autospec=True)
@@ -764,7 +765,7 @@ class TestAnsibleDeploy(AnsibleDeployTestCaseBase):
             self.assertFalse(run_playbook_mock.called)
             self.assertEqual(states.CLEANWAIT, state)
 
-    @mock.patch.object(utils, 'set_node_cleaning_steps', autospec=True)
+    @mock.patch.object(steps, 'set_node_cleaning_steps', autospec=True)
     def test_prepare_cleaning_callback_no_steps(self,
                                                 set_node_cleaning_steps):
         with task_manager.acquire(self.context, self.node.uuid) as task:
@@ -1047,7 +1048,7 @@ class TestAnsibleDeploy(AnsibleDeployTestCaseBase):
     @mock.patch.object(utils, 'restore_power_state_if_needed', autospec=True)
     @mock.patch.object(utils, 'power_on_node_if_needed', autospec=True)
     @mock.patch.object(ansible_deploy, '_run_playbook', autospec=True)
-    @mock.patch.object(utils, 'set_node_cleaning_steps', autospec=True)
+    @mock.patch.object(steps, 'set_node_cleaning_steps', autospec=True)
     @mock.patch.object(utils, 'node_power_action', autospec=True)
     @mock.patch.object(deploy_utils, 'build_agent_options', autospec=True)
     @mock.patch.object(pxe.PXEBoot, 'prepare_ramdisk')
diff --git a/ironic/tests/unit/drivers/modules/test_agent_base_vendor.py b/ironic/tests/unit/drivers/modules/test_agent_base_vendor.py
index ff634a8f5c..c175032c81 100644
--- a/ironic/tests/unit/drivers/modules/test_agent_base_vendor.py
+++ b/ironic/tests/unit/drivers/modules/test_agent_base_vendor.py
@@ -22,6 +22,7 @@ from oslo_config import cfg
 from ironic.common import boot_devices
 from ironic.common import exception
 from ironic.common import states
+from ironic.conductor import steps as conductor_steps
 from ironic.conductor import task_manager
 from ironic.conductor import utils as manager_utils
 from ironic.drivers import base as drivers_base
@@ -213,7 +214,8 @@ class HeartbeatMixinTest(AgentDeployMixinBaseTest):
     @mock.patch.object(objects.node.Node, 'touch_provisioning', autospec=True)
     @mock.patch.object(agent_base_vendor.HeartbeatMixin,
                        'refresh_clean_steps', autospec=True)
-    @mock.patch.object(manager_utils, 'set_node_cleaning_steps', autospec=True)
+    @mock.patch.object(conductor_steps, 'set_node_cleaning_steps',
+                       autospec=True)
     @mock.patch.object(manager_utils, 'notify_conductor_resume_clean',
                        autospec=True)
     def test_heartbeat_resume_clean(self, mock_notify, mock_set_steps,
@@ -234,7 +236,8 @@ class HeartbeatMixinTest(AgentDeployMixinBaseTest):
     @mock.patch.object(objects.node.Node, 'touch_provisioning', autospec=True)
     @mock.patch.object(agent_base_vendor.HeartbeatMixin,
                        'refresh_clean_steps', autospec=True)
-    @mock.patch.object(manager_utils, 'set_node_cleaning_steps', autospec=True)
+    @mock.patch.object(conductor_steps, 'set_node_cleaning_steps',
+                       autospec=True)
     @mock.patch.object(manager_utils, 'notify_conductor_resume_clean',
                        autospec=True)
     def test_heartbeat_resume_clean_fails(self, mock_notify, mock_set_steps,
@@ -1351,7 +1354,8 @@ class AgentDeployMixinTest(AgentDeployMixinBaseTest):
             self.deploy.continue_cleaning(task)
             error_mock.assert_called_once_with(task, mock.ANY)
 
-    @mock.patch.object(manager_utils, 'set_node_cleaning_steps', autospec=True)
+    @mock.patch.object(conductor_steps, 'set_node_cleaning_steps',
+                       autospec=True)
     @mock.patch.object(manager_utils, 'notify_conductor_resume_clean',
                        autospec=True)
     @mock.patch.object(agent_base_vendor.AgentDeployMixin,
@@ -1390,7 +1394,8 @@ class AgentDeployMixinTest(AgentDeployMixinBaseTest):
         self._test_continue_cleaning_clean_version_mismatch(manual=True)
 
     @mock.patch.object(manager_utils, 'cleaning_error_handler', autospec=True)
-    @mock.patch.object(manager_utils, 'set_node_cleaning_steps', autospec=True)
+    @mock.patch.object(conductor_steps, 'set_node_cleaning_steps',
+                       autospec=True)
     @mock.patch.object(manager_utils, 'notify_conductor_resume_clean',
                        autospec=True)
     @mock.patch.object(agent_base_vendor.AgentDeployMixin,