From 0e7bfe61bdbaa059b80e705e1411f7702be91a3d Mon Sep 17 00:00:00 2001 From: Jean-Emile DARTOIS Date: Wed, 6 Jan 2016 12:44:25 +0100 Subject: [PATCH] Use taskflow library for building and executing action plans The aim of this patchset is to integrate taskflow in the Watcher Applier. Taskflow will help us a lot to make Action Plan execution easy, consistent, scalable and reliable. DocImpact Partially implements: blueprint use-taskflow Change-Id: I903d6509d74a61ad64e1506b8a7156e6e91abcfb Closes-Bug: #1535326 Closes-Bug: #1531912 --- etc/watcher/watcher.conf.sample | 854 +++++++++--------- requirements.txt | 1 + setup.cfg | 10 +- watcher/applier/action_plan/default.py | 42 +- .../{execution => actions}/__init__.py | 0 .../applier/{primitives => actions}/base.py | 21 +- .../change_nova_service_state.py | 24 +- .../{primitives => actions}/factory.py | 2 +- .../loading}/__init__.py | 0 .../loading/default.py | 4 +- .../{primitives => actions}/migration.py | 48 +- .../applier/{primitives => actions}/nop.py | 25 +- watcher/applier/actions/sleep.py | 48 + watcher/applier/default.py | 38 +- watcher/applier/execution/default.py | 57 -- watcher/applier/execution/deploy_phase.py | 56 -- watcher/applier/manager.py | 25 +- .../messaging/{events.py => event_types.py} | 2 +- watcher/applier/messaging/trigger.py | 25 +- watcher/applier/promise.py | 50 - watcher/applier/rpcapi.py | 8 +- .../loading => workflow_engine}/__init__.py | 0 .../{execution => workflow_engine}/base.py | 28 +- watcher/applier/workflow_engine/default.py | 159 ++++ .../workflow_engine/loading}/__init__.py | 0 .../workflow_engine/loading/default.py | 30 + watcher/common/nova.py | 2 +- watcher/decision_engine/planner/default.py | 36 +- .../strategy/strategies/__init__.py | 27 + .../strategies/basic_consolidation.py | 8 +- .../strategy/strategies/dummy_strategy.py | 11 +- .../strategies/outlet_temp_control.py | 4 +- watcher/locale/fr/LC_MESSAGES/watcher.po | 30 +- watcher/locale/watcher.pot | 24 +- 
.../test_default_action_handler.py | 18 +- .../tests/applier/actions/loading/__init__.py | 0 .../loading/test_default_actions_loader.py | 32 + .../test_default_action_plan_executor.py | 56 -- ...y => test_trigger_action_plan_endpoint.py} | 9 +- .../tests/applier/workflow_engine/__init__.py | 0 .../workflow_engine/loading/__init__.py | 0 .../loading/test_default_engine_loader.py | 32 + .../test_default_workflow_engine.py | 164 ++++ .../planner/test_default_planner.py | 72 +- 44 files changed, 1234 insertions(+), 848 deletions(-) rename watcher/applier/{execution => actions}/__init__.py (100%) rename watcher/applier/{primitives => actions}/base.py (86%) rename watcher/applier/{primitives => actions}/change_nova_service_state.py (80%) rename watcher/applier/{primitives => actions}/factory.py (95%) rename watcher/applier/{primitives => actions/loading}/__init__.py (100%) rename watcher/applier/{primitives => actions}/loading/default.py (89%) rename watcher/applier/{primitives => actions}/migration.py (57%) rename watcher/applier/{primitives => actions}/nop.py (67%) create mode 100644 watcher/applier/actions/sleep.py delete mode 100644 watcher/applier/execution/default.py delete mode 100644 watcher/applier/execution/deploy_phase.py rename watcher/applier/messaging/{events.py => event_types.py} (96%) delete mode 100644 watcher/applier/promise.py rename watcher/applier/{primitives/loading => workflow_engine}/__init__.py (100%) rename watcher/applier/{execution => workflow_engine}/base.py (71%) create mode 100644 watcher/applier/workflow_engine/default.py rename watcher/{tests/applier/execution => applier/workflow_engine/loading}/__init__.py (100%) create mode 100644 watcher/applier/workflow_engine/loading/default.py create mode 100644 watcher/tests/applier/actions/loading/__init__.py create mode 100644 watcher/tests/applier/actions/loading/test_default_actions_loader.py delete mode 100644 watcher/tests/applier/execution/test_default_action_plan_executor.py rename 
watcher/tests/applier/messaging/{test_launch_action_plan_endpoint.py => test_trigger_action_plan_endpoint.py} (87%) create mode 100644 watcher/tests/applier/workflow_engine/__init__.py create mode 100644 watcher/tests/applier/workflow_engine/loading/__init__.py create mode 100644 watcher/tests/applier/workflow_engine/loading/test_default_engine_loader.py create mode 100644 watcher/tests/applier/workflow_engine/test_default_workflow_engine.py diff --git a/etc/watcher/watcher.conf.sample b/etc/watcher/watcher.conf.sample index 9c3f5be3b..795574a34 100644 --- a/etc/watcher/watcher.conf.sample +++ b/etc/watcher/watcher.conf.sample @@ -4,69 +4,25 @@ # From oslo.log # -# Print debugging output (set logging level to DEBUG instead of -# default INFO level). (boolean value) -#debug = false - -# If set to false, will disable INFO logging level, making WARNING the -# default. (boolean value) -# This option is deprecated for removal. -# Its value may be silently ignored in the future. -#verbose = true - -# The name of a logging configuration file. This file is appended to -# any existing logging configuration files. For details about logging -# configuration files, see the Python logging module documentation. -# Note that when logging configuration files are used then all logging -# configuration is set in the configuration file and other logging -# configuration options are ignored (for example, log_format). (string -# value) -# Deprecated group/name - [DEFAULT]/log_config -#log_config_append = - -# DEPRECATED. A logging.Formatter log message format string which may -# use any of the available logging.LogRecord attributes. This option -# is deprecated. Please use logging_context_format_string and -# logging_default_format_string instead. This option is ignored if -# log_config_append is set. (string value) -#log_format = - -# Format string for %%(asctime)s in log records. Default: %(default)s -# . This option is ignored if log_config_append is set. 
(string value) -#log_date_format = %Y-%m-%d %H:%M:%S - -# (Optional) Name of log file to output to. If no default is set, -# logging will go to stdout. This option is ignored if -# log_config_append is set. (string value) -# Deprecated group/name - [DEFAULT]/logfile -#log_file = - -# (Optional) The base directory used for relative --log-file paths. -# This option is ignored if log_config_append is set. (string value) -# Deprecated group/name - [DEFAULT]/logdir -#log_dir = - -# (Optional) Uses logging handler designed to watch file system. When -# log file is moved or removed this handler will open a new log file -# with specified path instantaneously. It makes sense only if log-file -# option is specified and Linux platform is used. This option is -# ignored if log_config_append is set. (boolean value) -#watch_log_file = false - # Use syslog for logging. Existing syslog format is DEPRECATED and # will be changed later to honor RFC5424. This option is ignored if # log_config_append is set. (boolean value) #use_syslog = false -# (Optional) Enables or disables syslog rfc5424 format for logging. If -# enabled, prefixes the MSG part of the syslog message with APP-NAME -# (RFC5424). The format without the APP-NAME is deprecated in Kilo, -# and will be removed in Mitaka, along with this option. This option -# is ignored if log_config_append is set. (boolean value) +# Enables or disables syslog rfc5424 format for logging. If enabled, +# prefixes the MSG part of the syslog message with APP-NAME (RFC5424). +# The format without the APP-NAME is deprecated in Kilo, and will be +# removed in Mitaka, along with this option. This option is ignored if +# log_config_append is set. (boolean value) # This option is deprecated for removal. # Its value may be silently ignored in the future. #use_syslog_rfc_format = true +# (Optional) The base directory used for relative log_file paths. +# This option is ignored if log_config_append is set. 
(string value) +# Deprecated group/name - [DEFAULT]/logdir +#log_dir = + # Syslog facility to receive log lines. This option is ignored if # log_config_append is set. (string value) #syslog_log_facility = LOG_USER @@ -78,47 +34,163 @@ # Format string to use for log messages with context. (string value) #logging_context_format_string = %(asctime)s.%(msecs)03d %(process)d %(levelname)s %(name)s [%(request_id)s %(user_identity)s] %(instance)s%(message)s -# Format string to use for log messages without context. (string -# value) +# Format string to use for log messages when context is undefined. +# (string value) #logging_default_format_string = %(asctime)s.%(msecs)03d %(process)d %(levelname)s %(name)s [-] %(instance)s%(message)s -# Data to append to log format when level is DEBUG. (string value) +# Additional data to append to log message when logging level for the +# message is DEBUG. (string value) #logging_debug_format_suffix = %(funcName)s %(pathname)s:%(lineno)d +# If set to true, the logging level will be set to DEBUG instead of +# the default INFO level. (boolean value) +#debug = false + # Prefix each line of exception output with this format. (string # value) #logging_exception_prefix = %(asctime)s.%(msecs)03d %(process)d ERROR %(name)s %(instance)s -# List of logger=LEVEL pairs. This option is ignored if -# log_config_append is set. (list value) +# If set to false, the logging level will be set to WARNING instead of +# the default INFO level. (boolean value) +# This option is deprecated for removal. +# Its value may be silently ignored in the future. +#verbose = true + +# Defines the format string for %(user_identity)s that is used in +# logging_context_format_string. (string value) +#logging_user_identity_format = %(user)s %(tenant)s %(domain)s %(user_domain)s %(project_domain)s + +# The name of a logging configuration file. This file is appended to +# any existing logging configuration files. 
For details about logging +# configuration files, see the Python logging module documentation. +# Note that when logging configuration files are used all logging +# configuration is defined in the configuration file and other logging +# configuration options are ignored (for example, log_format). (string +# value) +# Deprecated group/name - [DEFAULT]/log_config +#log_config_append = + +# List of package logging levels in logger=LEVEL pairs. This option is +# ignored if log_config_append is set. (list value) #default_log_levels = amqp=WARN,amqplib=WARN,boto=WARN,qpid=WARN,sqlalchemy=WARN,suds=INFO,oslo.messaging=INFO,iso8601=WARN,requests.packages.urllib3.connectionpool=WARN,urllib3.connectionpool=WARN,websocket=WARN,requests.packages.urllib3.util.retry=WARN,urllib3.util.retry=WARN,keystonemiddleware=WARN,routes.middleware=WARN,stevedore=WARN,taskflow=WARN,keystoneauth=WARN +# DEPRECATED. A logging.Formatter log message format string which may +# use any of the available logging.LogRecord attributes. This option +# is deprecated. Please use logging_context_format_string and +# logging_default_format_string instead. This option is ignored if +# log_config_append is set. (string value) +#log_format = + # Enables or disables publication of error events. (boolean value) #publish_errors = false +# Defines the format string for %%(asctime)s in log records. Default: +# %(default)s . This option is ignored if log_config_append is set. +# (string value) +#log_date_format = %Y-%m-%d %H:%M:%S + # The format for an instance that is passed with the log message. # (string value) #instance_format = "[instance: %(uuid)s] " +# (Optional) Name of log file to send logging output to. If no default +# is set, logging will go to stderr as defined by use_stderr. This +# option is ignored if log_config_append is set. (string value) +# Deprecated group/name - [DEFAULT]/logfile +#log_file = + # The format for an instance UUID that is passed with the log message. 
# (string value) #instance_uuid_format = "[instance: %(uuid)s] " -# Format string for user_identity field of the -# logging_context_format_string (string value) -#logging_user_identity_format = %(user)s %(tenant)s %(domain)s %(user_domain)s %(project_domain)s - # Enables or disables fatal status of deprecations. (boolean value) #fatal_deprecations = false +# Uses logging handler designed to watch file system. When log file is +# moved or removed this handler will open a new log file with +# specified path instantaneously. It makes sense only if log_file +# option is specified and Linux platform is used. This option is +# ignored if log_config_append is set. (boolean value) +#watch_log_file = false + # # From oslo.messaging # +# Directory for holding IPC sockets. (string value) +#rpc_zmq_ipc_dir = /var/run/openstack + +# Number of retries to find free port number before fail with +# ZMQBindError. (integer value) +#rpc_zmq_bind_port_retries = 100 + +# AMQP topic used for OpenStack notifications. (list value) +# Deprecated group/name - [rpc_notifier2]/topics +# Deprecated group/name - [DEFAULT]/notification_topics +#topics = notifications + +# Name of this node. Must be a valid hostname, FQDN, or IP address. +# Must match "host" option, if running Nova. (string value) +#rpc_zmq_host = localhost + +# The messaging driver to use, defaults to rabbit. Other drivers +# include amqp and zmq. (string value) +#rpc_backend = rabbit + +# Host to locate redis. (string value) +#host = 127.0.0.1 + +# Seconds to wait before a cast expires (TTL). Only supported by +# impl_zmq. (integer value) +#rpc_cast_timeout = 30 + +# Seconds to wait for a response from a call. (integer value) +#rpc_response_timeout = 60 + +# Use this port to connect to redis host. (port value) +# Minimum value: 1 +# Maximum value: 65535 +#port = 6379 + +# The default number of seconds that poll should wait. Poll raises +# timeout exception when timeout expired. 
(integer value) +#rpc_poll_timeout = 1 + +# A URL representing the messaging driver to use and its full +# configuration. If not set, we fall back to the rpc_backend option +# and driver specific configuration. (string value) +#transport_url = + +# Password for Redis server (optional). (string value) +#password = + +# Configures zmq-messaging to use proxy with non PUB/SUB patterns. +# (boolean value) +#direct_over_proxy = true + +# The Drivers(s) to handle sending notifications. Possible values are +# messaging, messagingv2, routing, log, test, noop (multi valued) +# Deprecated group/name - [DEFAULT]/notification_driver +#driver = + +# Size of executor thread pool. (integer value) +# Deprecated group/name - [DEFAULT]/rpc_thread_pool_size +#executor_thread_pool_size = 64 + # Size of RPC connection pool. (integer value) # Deprecated group/name - [DEFAULT]/rpc_conn_pool_size #rpc_conn_pool_size = 30 +# Use PUB/SUB pattern for fanout methods. PUB/SUB always uses proxy. +# (boolean value) +#use_pub_sub = true + +# A URL representing the messaging driver to use for notifications. If +# not set, we fall back to the same configuration used for RPC. +# (string value) +# Deprecated group/name - [DEFAULT]/notification_transport_url +#transport_url = + # ZeroMQ bind address. Should be a wildcard (*), an ethernet # interface, or IP. The "host" option should point or resolve to this # address. (string value) @@ -127,6 +199,11 @@ # MatchMaker driver. (string value) #rpc_zmq_matchmaker = redis +# Minimal port number for random ports range. (port value) +# Minimum value: 1 +# Maximum value: 65535 +#rpc_zmq_min_port = 49152 + # Type of concurrency used. Either "native" or "eventlet" (string # value) #rpc_zmq_concurrency = eventlet @@ -134,95 +211,20 @@ # Number of ZeroMQ contexts, defaults to 1. (integer value) #rpc_zmq_contexts = 1 -# Maximum number of ingress messages to locally buffer per topic. -# Default is unlimited. 
(integer value) -#rpc_zmq_topic_backlog = - -# Directory for holding IPC sockets. (string value) -#rpc_zmq_ipc_dir = /var/run/openstack - -# Name of this node. Must be a valid hostname, FQDN, or IP address. -# Must match "host" option, if running Nova. (string value) -#rpc_zmq_host = localhost - -# Seconds to wait before a cast expires (TTL). Only supported by -# impl_zmq. (integer value) -#rpc_cast_timeout = 30 - -# The default number of seconds that poll should wait. Poll raises -# timeout exception when timeout expired. (integer value) -#rpc_poll_timeout = 1 - -# Configures zmq-messaging to use proxy with non PUB/SUB patterns. -# (boolean value) -#direct_over_proxy = true - -# Use PUB/SUB pattern for fanout methods. PUB/SUB always uses proxy. -# (boolean value) -#use_pub_sub = true - -# Minimal port number for random ports range. (port value) -# Minimum value: 1 -# Maximum value: 65535 -#rpc_zmq_min_port = 49152 - # Maximal port number for random ports range. (integer value) # Minimum value: 1 # Maximum value: 65536 #rpc_zmq_max_port = 65536 -# Number of retries to find free port number before fail with -# ZMQBindError. (integer value) -#rpc_zmq_bind_port_retries = 100 - -# Host to locate redis. (string value) -#host = 127.0.0.1 - -# Use this port to connect to redis host. (port value) -# Minimum value: 1 -# Maximum value: 65535 -#port = 6379 - -# Password for Redis server (optional). (string value) -#password = - -# Size of executor thread pool. (integer value) -# Deprecated group/name - [DEFAULT]/rpc_thread_pool_size -#executor_thread_pool_size = 64 - -# The Drivers(s) to handle sending notifications. Possible values are -# messaging, messagingv2, routing, log, test, noop (multi valued) -# Deprecated group/name - [DEFAULT]/notification_driver -#driver = - -# A URL representing the messaging driver to use for notifications. If -# not set, we fall back to the same configuration used for RPC. 
-# (string value) -# Deprecated group/name - [DEFAULT]/notification_transport_url -#transport_url = - -# AMQP topic used for OpenStack notifications. (list value) -# Deprecated group/name - [rpc_notifier2]/topics -# Deprecated group/name - [DEFAULT]/notification_topics -#topics = notifications - -# Seconds to wait for a response from a call. (integer value) -#rpc_response_timeout = 60 - -# A URL representing the messaging driver to use and its full -# configuration. If not set, we fall back to the rpc_backend option -# and driver specific configuration. (string value) -#transport_url = - -# The messaging driver to use, defaults to rabbit. Other drivers -# include amqp and zmq. (string value) -#rpc_backend = rabbit - # The default exchange under which topics are scoped. May be # overridden by an exchange name specified in the transport_url # option. (string value) #control_exchange = openstack +# Maximum number of ingress messages to locally buffer per topic. +# Default is unlimited. (integer value) +#rpc_zmq_topic_backlog = + [api] @@ -247,18 +249,6 @@ # From oslo.db # -# The file name to use with SQLite. (string value) -# Deprecated group/name - [DEFAULT]/sqlite_db -#sqlite_db = oslo.sqlite - -# If True, SQLite uses synchronous mode. (boolean value) -# Deprecated group/name - [DEFAULT]/sqlite_synchronous -#sqlite_synchronous = true - -# The back end to use for the database. (string value) -# Deprecated group/name - [DEFAULT]/db_backend -#backend = sqlalchemy - # The SQLAlchemy connection string to use to connect to the database. # (string value) # Deprecated group/name - [DEFAULT]/sql_connection @@ -266,15 +256,12 @@ # Deprecated group/name - [sql]/connection #connection = -# The SQLAlchemy connection string to use to connect to the slave -# database. (string value) -#slave_connection = +# Add Python stack traces to SQL as comment strings. 
(boolean value) +# Deprecated group/name - [DEFAULT]/sql_connection_trace +#connection_trace = false -# The SQL mode to be used for MySQL sessions. This option, including -# the default, overrides any server-set SQL mode. To use whatever SQL -# mode is set by the server configuration, set this to no value. -# Example: mysql_sql_mode= (string value) -#mysql_sql_mode = TRADITIONAL +# Seconds between retries of a database transaction. (integer value) +#db_retry_interval = 1 # Timeout before idle SQL connections are reaped. (integer value) # Deprecated group/name - [DEFAULT]/sql_idle_timeout @@ -282,11 +269,45 @@ # Deprecated group/name - [sql]/idle_timeout #idle_timeout = 3600 -# Minimum number of SQL connections to keep open in a pool. (integer +# If set, use this value for pool_timeout with SQLAlchemy. (integer # value) -# Deprecated group/name - [DEFAULT]/sql_min_pool_size -# Deprecated group/name - [DATABASE]/sql_min_pool_size -#min_pool_size = 1 +# Deprecated group/name - [DATABASE]/sqlalchemy_pool_timeout +#pool_timeout = + +# If True, SQLite uses synchronous mode. (boolean value) +# Deprecated group/name - [DEFAULT]/sqlite_synchronous +#sqlite_synchronous = true + +# If db_inc_retry_interval is set, the maximum seconds between retries +# of a database operation. (integer value) +#db_max_retry_interval = 10 + +# Enable the experimental use of database reconnect on connection +# lost. (boolean value) +#use_db_reconnect = false + +# Interval between retries of opening a SQL connection. (integer +# value) +# Deprecated group/name - [DEFAULT]/sql_retry_interval +# Deprecated group/name - [DATABASE]/reconnect_interval +#retry_interval = 10 + +# The file name to use with SQLite. (string value) +# Deprecated group/name - [DEFAULT]/sqlite_db +#sqlite_db = oslo.sqlite + +# Maximum retries in case of connection error or deadlock error before +# error is raised. Set to -1 to specify an infinite retry count. 
+# (integer value) +#db_max_retries = 20 + +# If True, increases the interval between retries of a database +# operation up to db_max_retry_interval. (boolean value) +#db_inc_retry_interval = true + +# The SQLAlchemy connection string to use to connect to the slave +# database. (string value) +#slave_connection = # Maximum number of SQL connections to keep open in a pool. (integer # value) @@ -300,11 +321,16 @@ # Deprecated group/name - [DATABASE]/sql_max_retries #max_retries = 10 -# Interval between retries of opening a SQL connection. (integer +# Minimum number of SQL connections to keep open in a pool. (integer # value) -# Deprecated group/name - [DEFAULT]/sql_retry_interval -# Deprecated group/name - [DATABASE]/reconnect_interval -#retry_interval = 10 +# Deprecated group/name - [DEFAULT]/sql_min_pool_size +# Deprecated group/name - [DATABASE]/sql_min_pool_size +#min_pool_size = 1 + +# Verbosity of SQL debugging information: 0=None, 100=Everything. +# (integer value) +# Deprecated group/name - [DEFAULT]/sql_connection_debug +#connection_debug = 0 # If set, use this value for max_overflow with SQLAlchemy. (integer # value) @@ -312,39 +338,15 @@ # Deprecated group/name - [DATABASE]/sqlalchemy_max_overflow #max_overflow = -# Verbosity of SQL debugging information: 0=None, 100=Everything. -# (integer value) -# Deprecated group/name - [DEFAULT]/sql_connection_debug -#connection_debug = 0 +# The SQL mode to be used for MySQL sessions. This option, including +# the default, overrides any server-set SQL mode. To use whatever SQL +# mode is set by the server configuration, set this to no value. +# Example: mysql_sql_mode= (string value) +#mysql_sql_mode = TRADITIONAL -# Add Python stack traces to SQL as comment strings. (boolean value) -# Deprecated group/name - [DEFAULT]/sql_connection_trace -#connection_trace = false - -# If set, use this value for pool_timeout with SQLAlchemy. 
(integer -# value) -# Deprecated group/name - [DATABASE]/sqlalchemy_pool_timeout -#pool_timeout = - -# Enable the experimental use of database reconnect on connection -# lost. (boolean value) -#use_db_reconnect = false - -# Seconds between retries of a database transaction. (integer value) -#db_retry_interval = 1 - -# If True, increases the interval between retries of a database -# operation up to db_max_retry_interval. (boolean value) -#db_inc_retry_interval = true - -# If db_inc_retry_interval is set, the maximum seconds between retries -# of a database operation. (integer value) -#db_max_retry_interval = 10 - -# Maximum retries in case of connection error or deadlock error before -# error is raised. Set to -1 to specify an infinite retry count. -# (integer value) -#db_max_retries = 20 +# The back end to use for the database. (string value) +# Deprecated group/name - [DEFAULT]/db_backend +#backend = sqlalchemy [keystone_authtoken] @@ -353,109 +355,28 @@ # From keystonemiddleware.auth_token # -# Complete public Identity API endpoint. (string value) -#auth_uri = - -# API version of the admin Identity API endpoint. (string value) -#auth_version = - -# Do not handle authorization requests within the middleware, but -# delegate the authorization decision to downstream WSGI components. -# (boolean value) -#delay_auth_decision = false - -# Request timeout value for communicating with Identity API server. -# (integer value) -#http_connect_timeout = - -# How many times are we trying to reconnect when communicating with -# Identity API Server. (integer value) -#http_request_max_retries = 3 - -# Env key for the swift cache. (string value) -#cache = - -# Required if identity server requires client certificate (string -# value) -#certfile = - -# Required if identity server requires client certificate (string -# value) -#keyfile = - # A PEM encoded Certificate Authority to use when verifying HTTPs # connections. Defaults to system CAs. 
(string value) #cafile = -# Verify HTTPS connections. (boolean value) -#insecure = false - -# The region in which the identity server can be found. (string value) -#region_name = - -# Directory used to cache files related to PKI tokens. (string value) -#signing_dir = - -# Optionally specify a list of memcached server(s) to use for caching. -# If left undefined, tokens will instead be cached in-process. (list -# value) -# Deprecated group/name - [DEFAULT]/memcache_servers -#memcached_servers = - -# In order to prevent excessive effort spent validating tokens, the -# middleware caches previously-seen tokens for a configurable duration -# (in seconds). Set to -1 to disable caching completely. (integer -# value) -#token_cache_time = 300 - -# Determines the frequency at which the list of revoked tokens is -# retrieved from the Identity service (in seconds). A high number of -# revocation events combined with a low cache duration may -# significantly reduce performance. (integer value) -#revocation_cache_time = 10 - -# (Optional) If defined, indicate whether token data should be -# authenticated or authenticated and encrypted. Acceptable values are -# MAC or ENCRYPT. If MAC, token data is authenticated (with HMAC) in -# the cache. If ENCRYPT, token data is encrypted and authenticated in -# the cache. If the value is not one of these options or empty, -# auth_token will raise an exception on initialization. (string value) -#memcache_security_strategy = - -# (Optional, mandatory if memcache_security_strategy is defined) This -# string is used for key derivation. (string value) -#memcache_secret_key = - -# (Optional) Number of seconds memcached server is considered dead -# before it is tried again. (integer value) -#memcache_pool_dead_retry = 300 - -# (Optional) Maximum total number of open connections to every -# memcached server. (integer value) -#memcache_pool_maxsize = 10 - -# (Optional) Socket timeout in seconds for communicating with a -# memcached server. 
(integer value) -#memcache_pool_socket_timeout = 3 - -# (Optional) Number of seconds a connection to memcached is held -# unused in the pool before it is closed. (integer value) -#memcache_pool_unused_timeout = 60 - -# (Optional) Number of seconds that an operation will wait to get a -# memcached client connection from the pool. (integer value) -#memcache_pool_conn_get_timeout = 10 - -# (Optional) Use the advanced (eventlet safe) memcached client pool. -# The advanced pool will only work under python 2.x. (boolean value) -#memcache_use_advanced_pool = false - # (Optional) Indicate whether to set the X-Service-Catalog header. If # False, middleware will not ask for service catalog on token # validation and will not set the X-Service-Catalog header. (boolean # value) #include_service_catalog = true +# (Optional) If defined, indicate whether token data should be +# authenticated or authenticated and encrypted. If MAC, token data is +# authenticated (with HMAC) in the cache. If ENCRYPT, token data is +# encrypted and authenticated in the cache. If the value is not one of +# these options or empty, auth_token will raise an exception on +# initialization. (string value) +# Allowed values: None, MAC, ENCRYPT +#memcache_security_strategy = None + +# Verify HTTPS connections. (boolean value) +#insecure = false + # Used to control the use and type of token binding. Can be set to: # "disabled" to not check token binding. "permissive" (default) to # validate binding information if the bind type is of a form known to @@ -465,11 +386,21 @@ # binding method that must be present in tokens. (string value) #enforce_token_bind = permissive +# The region in which the identity server can be found. (string value) +#region_name = + # If true, the revocation list will be checked for cached tokens. This # requires that PKI tokens are configured on the identity server. 
# (boolean value) #check_revocations_for_cached = false +# Request timeout value for communicating with Identity API server. +# (integer value) +#http_connect_timeout = + +# Directory used to cache files related to PKI tokens. (string value) +#signing_dir = + # Hash algorithms to use for hashing PKI tokens. This may be a single # algorithm or multiple. The algorithms are those supported by Python # standard hashlib.new(). The hashes will be tried in the order given, @@ -481,42 +412,15 @@ # value) #hash_algorithms = md5 -# Prefix to prepend at the beginning of the path. Deprecated, use -# identity_uri. (string value) -#auth_admin_prefix = - -# Host providing the admin Identity API endpoint. Deprecated, use -# identity_uri. (string value) -#auth_host = 127.0.0.1 - -# Port of the admin Identity API endpoint. Deprecated, use -# identity_uri. (integer value) -#auth_port = 35357 - -# Protocol of the admin Identity API endpoint (http or https). -# Deprecated, use identity_uri. (string value) -#auth_protocol = https - -# Complete admin Identity API endpoint. This should specify the -# unversioned root endpoint e.g. https://localhost:35357/ (string +# Optionally specify a list of memcached server(s) to use for caching. +# If left undefined, tokens will instead be cached in-process. (list # value) -#identity_uri = +# Deprecated group/name - [DEFAULT]/memcache_servers +#memcached_servers = -# This option is deprecated and may be removed in a future release. -# Single shared secret with the Keystone configuration used for -# bootstrapping a Keystone installation, or otherwise bypassing the -# normal authentication process. This option should not be used, use -# `admin_user` and `admin_password` instead. (string value) -#admin_token = - -# Service username. (string value) -#admin_user = - -# Service user password. (string value) -#admin_password = - -# Service tenant name. 
(string value) -#admin_tenant_name = admin +# Required if identity server requires client certificate (string +# value) +#certfile = # Authentication type to load (unknown value) # Deprecated group/name - [DEFAULT]/auth_plugin @@ -526,6 +430,68 @@ # value) #auth_section = +# In order to prevent excessive effort spent validating tokens, the +# middleware caches previously-seen tokens for a configurable duration +# (in seconds). Set to -1 to disable caching completely. (integer +# value) +#token_cache_time = 300 + +# API version of the admin Identity API endpoint. (string value) +#auth_version = + +# Determines the frequency at which the list of revoked tokens is +# retrieved from the Identity service (in seconds). A high number of +# revocation events combined with a low cache duration may +# significantly reduce performance. (integer value) +#revocation_cache_time = 10 + +# Complete public Identity API endpoint. (string value) +#auth_uri = + +# (Optional) Socket timeout in seconds for communicating with a +# memcached server. (integer value) +#memcache_pool_socket_timeout = 3 + +# (Optional, mandatory if memcache_security_strategy is defined) This +# string is used for key derivation. (string value) +#memcache_secret_key = + +# Do not handle authorization requests within the middleware, but +# delegate the authorization decision to downstream WSGI components. +# (boolean value) +#delay_auth_decision = false + +# (Optional) Use the advanced (eventlet safe) memcached client pool. +# The advanced pool will only work under python 2.x. (boolean value) +#memcache_use_advanced_pool = false + +# (Optional) Maximum total number of open connections to every +# memcached server. (integer value) +#memcache_pool_maxsize = 10 + +# How many times are we trying to reconnect when communicating with +# Identity API Server. (integer value) +#http_request_max_retries = 3 + +# (Optional) Number of seconds memcached server is considered dead +# before it is tried again. 
(integer value) +#memcache_pool_dead_retry = 300 + +# (Optional) Number of seconds a connection to memcached is held +# unused in the pool before it is closed. (integer value) +#memcache_pool_unused_timeout = 60 + +# (Optional) Number of seconds that an operation will wait to get a +# memcached client connection from the pool. (integer value) +#memcache_pool_conn_get_timeout = 10 + +# Env key for the swift cache. (string value) +#cache = + +# Required if identity server requires client certificate (string +# value) +#keyfile = + [matchmaker_redis] @@ -533,6 +499,9 @@ # From oslo.messaging # +# Password for Redis server (optional). (string value) +#password = + # Host to locate redis. (string value) #host = 127.0.0.1 @@ -541,9 +510,6 @@ # Maximum value: 65535 #port = 6379 -# Password for Redis server (optional). (string value) -#password = - [oslo_messaging_amqp] @@ -551,33 +517,13 @@ # From oslo.messaging # -# address prefix used when sending to a specific server (string value) -# Deprecated group/name - [amqp1]/server_request_prefix -#server_request_prefix = exclusive - -# address prefix used when broadcasting to all servers (string value) -# Deprecated group/name - [amqp1]/broadcast_prefix -#broadcast_prefix = broadcast - -# address prefix when sending to any server in group (string value) -# Deprecated group/name - [amqp1]/group_request_prefix -#group_request_prefix = unicast - -# Name for the AMQP container (string value) -# Deprecated group/name - [amqp1]/container_name -#container_name = - # Timeout for inactive connections (in seconds) (integer value) # Deprecated group/name - [amqp1]/idle_timeout #idle_timeout = 0 -# Debug: dump AMQP frames to stdout (boolean value) -# Deprecated group/name - [amqp1]/trace -#trace = false - -# CA certificate PEM file to verify server certificate (string value) -# Deprecated group/name - [amqp1]/ssl_ca_file -#ssl_ca_file = +# Password for message broker authentication (string value) +# Deprecated group/name - 
[amqp1]/password +#password = # Identifying certificate PEM file to present to clients (string # value) @@ -589,34 +535,54 @@ # Deprecated group/name - [amqp1]/ssl_key_file #ssl_key_file = -# Password for decrypting ssl_key_file (if encrypted) (string value) -# Deprecated group/name - [amqp1]/ssl_key_password -#ssl_key_password = - -# Accept clients using either SSL or plain TCP (boolean value) -# Deprecated group/name - [amqp1]/allow_insecure_clients -#allow_insecure_clients = false - -# Space separated list of acceptable SASL mechanisms (string value) -# Deprecated group/name - [amqp1]/sasl_mechanisms -#sasl_mechanisms = +# address prefix when sending to any server in group (string value) +# Deprecated group/name - [amqp1]/group_request_prefix +#group_request_prefix = unicast # Path to directory that contains the SASL configuration (string # value) # Deprecated group/name - [amqp1]/sasl_config_dir #sasl_config_dir = +# Debug: dump AMQP frames to stdout (boolean value) +# Deprecated group/name - [amqp1]/trace +#trace = false + +# Password for decrypting ssl_key_file (if encrypted) (string value) +# Deprecated group/name - [amqp1]/ssl_key_password +#ssl_key_password = + +# address prefix used when sending to a specific server (string value) +# Deprecated group/name - [amqp1]/server_request_prefix +#server_request_prefix = exclusive + # Name of configuration file (without .conf suffix) (string value) # Deprecated group/name - [amqp1]/sasl_config_name #sasl_config_name = +# Name for the AMQP container (string value) +# Deprecated group/name - [amqp1]/container_name +#container_name = + +# Accept clients using either SSL or plain TCP (boolean value) +# Deprecated group/name - [amqp1]/allow_insecure_clients +#allow_insecure_clients = false + +# CA certificate PEM file to verify server certificate (string value) +# Deprecated group/name - [amqp1]/ssl_ca_file +#ssl_ca_file = + # User name for message broker authentication (string value) # Deprecated group/name - 
[amqp1]/username #username = -# Password for message broker authentication (string value) -# Deprecated group/name - [amqp1]/password -#password = +# address prefix used when broadcasting to all servers (string value) +# Deprecated group/name - [amqp1]/broadcast_prefix +#broadcast_prefix = broadcast + +# Space separated list of acceptable SASL mechanisms (string value) +# Deprecated group/name - [amqp1]/sasl_mechanisms +#sasl_mechanisms = [oslo_messaging_rabbit] @@ -625,51 +591,61 @@ # From oslo.messaging # -# Use durable queues in AMQP. (boolean value) -# Deprecated group/name - [DEFAULT]/amqp_durable_queues -# Deprecated group/name - [DEFAULT]/rabbit_durable_queues -#amqp_durable_queues = false - -# Auto-delete queues in AMQP. (boolean value) -# Deprecated group/name - [DEFAULT]/amqp_auto_delete -#amqp_auto_delete = false - -# SSL version to use (valid only if SSL enabled). Valid values are -# TLSv1 and SSLv23. SSLv2, SSLv3, TLSv1_1, and TLSv1_2 may be -# available on some distributions. (string value) -# Deprecated group/name - [DEFAULT]/kombu_ssl_version -#kombu_ssl_version = - -# SSL key file (valid only if SSL enabled). (string value) -# Deprecated group/name - [DEFAULT]/kombu_ssl_keyfile -#kombu_ssl_keyfile = +# The RabbitMQ password. (string value) +# Deprecated group/name - [DEFAULT]/rabbit_password +#rabbit_password = guest # SSL cert file (valid only if SSL enabled). (string value) # Deprecated group/name - [DEFAULT]/kombu_ssl_certfile #kombu_ssl_certfile = +# The RabbitMQ login method. (string value) +# Deprecated group/name - [DEFAULT]/rabbit_login_method +#rabbit_login_method = AMQPLAIN + +# How long to backoff for between retries when connecting to RabbitMQ. +# (integer value) +# Deprecated group/name - [DEFAULT]/rabbit_retry_backoff +#rabbit_retry_backoff = 2 + # SSL certification authority file (valid only if SSL enabled). # (string value) # Deprecated group/name - [DEFAULT]/kombu_ssl_ca_certs #kombu_ssl_ca_certs = +# The RabbitMQ virtual host. 
(string value) +# Deprecated group/name - [DEFAULT]/rabbit_virtual_host +#rabbit_virtual_host = / + # How long to wait before reconnecting in response to an AMQP consumer # cancel notification. (floating point value) # Deprecated group/name - [DEFAULT]/kombu_reconnect_delay #kombu_reconnect_delay = 1.0 +# How frequently to retry connecting with RabbitMQ. (integer value) +#rabbit_retry_interval = 1 + # How long to wait a missing client beforce abandoning to send it its # replies. This value should not be longer than rpc_response_timeout. # (integer value) # Deprecated group/name - [DEFAULT]/kombu_reconnect_timeout #kombu_missing_consumer_retry_timeout = 60 +# Deprecated, use rpc_backend=kombu+memory or rpc_backend=fake +# (boolean value) +# Deprecated group/name - [DEFAULT]/fake_rabbit +#fake_rabbit = false + # Determines how the next RabbitMQ node is chosen in case the one we # are currently connected to becomes unavailable. Takes effect only if # more than one RabbitMQ node is provided in config. (string value) # Allowed values: round-robin, shuffle #kombu_failover_strategy = round-robin +# How often times during the heartbeat_timeout_threshold we check the +# heartbeat. (integer value) +#heartbeat_rate = 2 + # The RabbitMQ broker address where a single node is used. (string # value) # Deprecated group/name - [DEFAULT]/rabbit_host @@ -681,61 +657,51 @@ # Deprecated group/name - [DEFAULT]/rabbit_port #rabbit_port = 5672 -# RabbitMQ HA cluster host:port pairs. (list value) -# Deprecated group/name - [DEFAULT]/rabbit_hosts -#rabbit_hosts = $rabbit_host:$rabbit_port +# Use HA queues in RabbitMQ (x-ha-policy: all). If you change this +# option, you must wipe the RabbitMQ database. (boolean value) +# Deprecated group/name - [DEFAULT]/rabbit_ha_queues +#rabbit_ha_queues = false -# Connect over SSL for RabbitMQ. (boolean value) -# Deprecated group/name - [DEFAULT]/rabbit_use_ssl -#rabbit_use_ssl = false - -# The RabbitMQ userid. 
(string value) -# Deprecated group/name - [DEFAULT]/rabbit_userid -#rabbit_userid = guest - -# The RabbitMQ password. (string value) -# Deprecated group/name - [DEFAULT]/rabbit_password -#rabbit_password = guest - -# The RabbitMQ login method. (string value) -# Deprecated group/name - [DEFAULT]/rabbit_login_method -#rabbit_login_method = AMQPLAIN - -# The RabbitMQ virtual host. (string value) -# Deprecated group/name - [DEFAULT]/rabbit_virtual_host -#rabbit_virtual_host = / - -# How frequently to retry connecting with RabbitMQ. (integer value) -#rabbit_retry_interval = 1 - -# How long to backoff for between retries when connecting to RabbitMQ. -# (integer value) -# Deprecated group/name - [DEFAULT]/rabbit_retry_backoff -#rabbit_retry_backoff = 2 +# Use durable queues in AMQP. (boolean value) +# Deprecated group/name - [DEFAULT]/amqp_durable_queues +# Deprecated group/name - [DEFAULT]/rabbit_durable_queues +#amqp_durable_queues = false # Maximum number of RabbitMQ connection retries. Default is 0 # (infinite retry count). (integer value) # Deprecated group/name - [DEFAULT]/rabbit_max_retries #rabbit_max_retries = 0 -# Use HA queues in RabbitMQ (x-ha-policy: all). If you change this -# option, you must wipe the RabbitMQ database. (boolean value) -# Deprecated group/name - [DEFAULT]/rabbit_ha_queues -#rabbit_ha_queues = false +# RabbitMQ HA cluster host:port pairs. (list value) +# Deprecated group/name - [DEFAULT]/rabbit_hosts +#rabbit_hosts = $rabbit_host:$rabbit_port + +# Auto-delete queues in AMQP. (boolean value) +# Deprecated group/name - [DEFAULT]/amqp_auto_delete +#amqp_auto_delete = false # Number of seconds after which the Rabbit broker is considered down # if heartbeat's keep-alive fails (0 disable the heartbeat). # EXPERIMENTAL (integer value) #heartbeat_timeout_threshold = 60 -# How often times during the heartbeat_timeout_threshold we check the -# heartbeat. (integer value) -#heartbeat_rate = 2 +# Connect over SSL for RabbitMQ. 
(boolean value) +# Deprecated group/name - [DEFAULT]/rabbit_use_ssl +#rabbit_use_ssl = false -# Deprecated, use rpc_backend=kombu+memory or rpc_backend=fake -# (boolean value) -# Deprecated group/name - [DEFAULT]/fake_rabbit -#fake_rabbit = false +# SSL version to use (valid only if SSL enabled). Valid values are +# TLSv1 and SSLv23. SSLv2, SSLv3, TLSv1_1, and TLSv1_2 may be +# available on some distributions. (string value) +# Deprecated group/name - [DEFAULT]/kombu_ssl_version +#kombu_ssl_version = + +# The RabbitMQ userid. (string value) +# Deprecated group/name - [DEFAULT]/rabbit_userid +#rabbit_userid = guest + +# SSL key file (valid only if SSL enabled). (string value) +# Deprecated group/name - [DEFAULT]/kombu_ssl_keyfile +#kombu_ssl_keyfile = [watcher_applier] @@ -744,21 +710,25 @@ # From watcher # -# The number of worker (integer value) -#applier_worker = 1 +# Number of workers for applier, default value is 1. (integer value) +# Minimum value: 1 +#workers = 1 + +# Select the engine to use to execute the workflow (string value) +#workflow_engine = taskflow # The topic name used forcontrol events, this topic used for rpc call # (string value) #topic_control = watcher.applier.control -# The topic name used for status events, this topic is used so as to -# notifythe others components of the system (string value) -#topic_status = watcher.applier.status - # The identifier used by watcher module on the message broker (string # value) #publisher_id = watcher.applier.api +# The topic name used for status events, this topic is used so as to +# notifythe others components of the system (string value) +#topic_status = watcher.applier.status + [watcher_decision_engine] @@ -766,22 +736,22 @@ # From watcher # -# The topic name used forcontrol events, this topic used for rpc call -# (string value) -#topic_control = watcher.decision.control - -# The topic name used for status events, this topic is used so as to -# notifythe others components of the system (string value) 
-#topic_status = watcher.decision.status - # The identifier used by watcher module on the message broker (string # value) #publisher_id = watcher.decision.api +# The topic name used forcontrol events, this topic used for rpc call +# (string value) +#topic_control = watcher.decision.control + # The maximum number of threads that can be used to execute strategies # (integer value) #max_workers = 2 +# The topic name used for status events, this topic is used so as to +# notifythe others components of the system (string value) +#topic_status = watcher.decision.status + [watcher_goals] diff --git a/requirements.txt b/requirements.txt index d30f66e5b..2e66858f0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,4 +26,5 @@ python-openstackclient>=1.5.0 six>=1.9.0 SQLAlchemy>=0.9.9,<1.1.0 stevedore>=1.5.0 # Apache-2.0 +taskflow>=1.25.0 # Apache-2.0 WSME>=0.7 diff --git a/setup.cfg b/setup.cfg index aef6ff09f..80ace47e7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -47,9 +47,13 @@ watcher_strategies = outlet_temp_control = watcher.decision_engine.strategy.strategies.outlet_temp_control:OutletTempControl watcher_actions = - migrate = watcher.applier.primitives.migration:Migrate - nop = watcher.applier.primitives.nop:Nop - change_nova_service_state = watcher.applier.primitives.change_nova_service_state:ChangeNovaServiceState + migrate = watcher.applier.actions.migration:Migrate + nop = watcher.applier.actions.nop:Nop + sleep = watcher.applier.actions.sleep:Sleep + change_nova_service_state = watcher.applier.actions.change_nova_service_state:ChangeNovaServiceState + +watcher_workflow_engines = + taskflow = watcher.applier.workflow_engine.default:DefaultWorkFlowEngine watcher_planners = default = watcher.decision_engine.planner.default:DefaultPlanner diff --git a/watcher/applier/action_plan/default.py b/watcher/applier/action_plan/default.py index fa05e1bf4..cf9ba5625 100644 --- a/watcher/applier/action_plan/default.py +++ b/watcher/applier/action_plan/default.py @@ -18,51 
+18,51 @@ # from oslo_log import log -from watcher.applier.action_plan.base import BaseActionPlanHandler -from watcher.applier.default import DefaultApplier -from watcher.applier.messaging.events import Events -from watcher.common.messaging.events.event import Event -from watcher.objects.action_plan import ActionPlan -from watcher.objects.action_plan import Status +from watcher.applier.action_plan import base +from watcher.applier import default +from watcher.applier.messaging import event_types +from watcher.common.messaging.events import event +from watcher import objects LOG = log.getLogger(__name__) -class DefaultActionPlanHandler(BaseActionPlanHandler): - def __init__(self, context, manager_applier, action_plan_uuid): +class DefaultActionPlanHandler(base.BaseActionPlanHandler): + def __init__(self, context, applier_manager, action_plan_uuid): super(DefaultActionPlanHandler, self).__init__() self.ctx = context self.action_plan_uuid = action_plan_uuid - self.manager_applier = manager_applier + self.applier_manager = applier_manager def notify(self, uuid, event_type, state): - action_plan = ActionPlan.get_by_uuid(self.ctx, uuid) + action_plan = objects.ActionPlan.get_by_uuid(self.ctx, uuid) action_plan.state = state action_plan.save() - event = Event() - event.type = event_type - event.data = {} + ev = event.Event() + ev.type = event_type + ev.data = {} payload = {'action_plan__uuid': uuid, 'action_plan_state': state} - self.manager_applier.topic_status.publish_event(event.type.name, + self.applier_manager.topic_status.publish_event(ev.type.name, payload) def execute(self): try: # update state self.notify(self.action_plan_uuid, - Events.LAUNCH_ACTION_PLAN, - Status.ONGOING) - applier = DefaultApplier(self.manager_applier, self.ctx) + event_types.EventTypes.LAUNCH_ACTION_PLAN, + objects.action_plan.Status.ONGOING) + applier = default.DefaultApplier(self.applier_manager, self.ctx) result = applier.execute(self.action_plan_uuid) except Exception as e: + 
LOG.exception(e) result = False - LOG.error("Launch Action Plan " + unicode(e)) finally: if result is True: - status = Status.SUCCEEDED + status = objects.action_plan.Status.SUCCEEDED else: - status = Status.FAILED + status = objects.action_plan.Status.FAILED # update state - self.notify(self.action_plan_uuid, Events.LAUNCH_ACTION_PLAN, + self.notify(self.action_plan_uuid, + event_types.EventTypes.LAUNCH_ACTION_PLAN, status) diff --git a/watcher/applier/execution/__init__.py b/watcher/applier/actions/__init__.py similarity index 100% rename from watcher/applier/execution/__init__.py rename to watcher/applier/actions/__init__.py diff --git a/watcher/applier/primitives/base.py b/watcher/applier/actions/base.py similarity index 86% rename from watcher/applier/primitives/base.py rename to watcher/applier/actions/base.py index ff663d3a5..86f930c04 100644 --- a/watcher/applier/primitives/base.py +++ b/watcher/applier/actions/base.py @@ -32,16 +32,15 @@ the appropriate commands to Nova for this type of """ import abc -import six -from watcher.applier import promise +import six @six.add_metaclass(abc.ABCMeta) -class BasePrimitive(object): +class BaseAction(object): def __init__(self): - self._input_parameters = None - self._applies_to = None + self._input_parameters = {} + self._applies_to = "" @property def input_parameters(self): @@ -59,12 +58,18 @@ class BasePrimitive(object): def applies_to(self, a): self._applies_to = a - @promise.Promise @abc.abstractmethod def execute(self): raise NotImplementedError() - @promise.Promise @abc.abstractmethod - def undo(self): + def revert(self): + raise NotImplementedError() + + @abc.abstractmethod + def precondition(self): + raise NotImplementedError() + + @abc.abstractmethod + def postcondition(self): raise NotImplementedError() diff --git a/watcher/applier/primitives/change_nova_service_state.py b/watcher/applier/actions/change_nova_service_state.py similarity index 80% rename from 
watcher/applier/primitives/change_nova_service_state.py rename to watcher/applier/actions/change_nova_service_state.py index ec8e673e1..b7bb40524 100644 --- a/watcher/applier/primitives/change_nova_service_state.py +++ b/watcher/applier/actions/change_nova_service_state.py @@ -19,30 +19,23 @@ from watcher._i18n import _ -from watcher.applier.primitives import base -from watcher.applier import promise +from watcher.applier.actions import base from watcher.common import exception from watcher.common import keystone as kclient from watcher.common import nova as nclient from watcher.decision_engine.model import hypervisor_state as hstate -class ChangeNovaServiceState(base.BasePrimitive): - def __init__(self): - """This class allows us to change the state of nova-compute service.""" - super(ChangeNovaServiceState, self).__init__() - self._host = self.applies_to - self._state = self.input_parameters.get('state') +class ChangeNovaServiceState(base.BaseAction): @property def host(self): - return self._host + return self.applies_to @property def state(self): - return self._state + return self.input_parameters.get('state') - @promise.Promise def execute(self): target_state = None if self.state == hstate.HypervisorState.OFFLINE.value: @@ -51,8 +44,7 @@ class ChangeNovaServiceState(base.BasePrimitive): target_state = True return self.nova_manage_service(target_state) - @promise.Promise - def undo(self): + def revert(self): target_state = None if self.state == hstate.HypervisorState.OFFLINE.value: target_state = True @@ -72,3 +64,9 @@ class ChangeNovaServiceState(base.BasePrimitive): return wrapper.enable_service_nova_compute(self.host) else: return wrapper.disable_service_nova_compute(self.host) + + def precondition(self): + pass + + def postcondition(self): + pass diff --git a/watcher/applier/primitives/factory.py b/watcher/applier/actions/factory.py similarity index 95% rename from watcher/applier/primitives/factory.py rename to watcher/applier/actions/factory.py index 
40c67d163..d21600adc 100644 --- a/watcher/applier/primitives/factory.py +++ b/watcher/applier/actions/factory.py @@ -19,7 +19,7 @@ from __future__ import unicode_literals from oslo_log import log -from watcher.applier.primitives.loading import default +from watcher.applier.actions.loading import default LOG = log.getLogger(__name__) diff --git a/watcher/applier/primitives/__init__.py b/watcher/applier/actions/loading/__init__.py similarity index 100% rename from watcher/applier/primitives/__init__.py rename to watcher/applier/actions/loading/__init__.py diff --git a/watcher/applier/primitives/loading/default.py b/watcher/applier/actions/loading/default.py similarity index 89% rename from watcher/applier/primitives/loading/default.py rename to watcher/applier/actions/loading/default.py index 0540feb76..3ebab3742 100644 --- a/watcher/applier/primitives/loading/default.py +++ b/watcher/applier/actions/loading/default.py @@ -19,11 +19,11 @@ from __future__ import unicode_literals from oslo_log import log -from watcher.common.loader.default import DefaultLoader +from watcher.common.loader import default LOG = log.getLogger(__name__) -class DefaultActionLoader(DefaultLoader): +class DefaultActionLoader(default.DefaultLoader): def __init__(self): super(DefaultActionLoader, self).__init__(namespace='watcher_actions') diff --git a/watcher/applier/primitives/migration.py b/watcher/applier/actions/migration.py similarity index 57% rename from watcher/applier/primitives/migration.py rename to watcher/applier/actions/migration.py index 17f07f974..dbb447d1f 100644 --- a/watcher/applier/primitives/migration.py +++ b/watcher/applier/actions/migration.py @@ -17,27 +17,42 @@ # limitations under the License. 
# +from oslo_log import log -from watcher.applier.primitives import base -from watcher.applier import promise +from watcher.applier.actions import base from watcher.common import exception from watcher.common import keystone as kclient from watcher.common import nova as nclient +LOG = log.getLogger(__name__) -class Migrate(base.BasePrimitive): - def __init__(self): - super(Migrate, self).__init__() - self.instance_uuid = self.applies_to - self.migration_type = self.input_parameters.get('migration_type') + +class Migrate(base.BaseAction): + @property + def instance_uuid(self): + return self.applies_to + + @property + def migration_type(self): + return self.input_parameters.get('migration_type') + + @property + def dst_hypervisor(self): + return self.input_parameters.get('dst_hypervisor') + + @property + def src_hypervisor(self): + return self.input_parameters.get('src_hypervisor') def migrate(self, destination): keystone = kclient.KeystoneClient() wrapper = nclient.NovaClient(keystone.get_credentials(), session=keystone.get_session()) + LOG.debug("Migrate instance %s to %s ", self.instance_uuid, + destination) instance = wrapper.find_instance(self.instance_uuid) if instance: - if self.migration_type is 'live': + if self.migration_type == 'live': return wrapper.live_migrate_instance( instance_id=self.instance_uuid, dest_hostname=destination) else: @@ -45,10 +60,17 @@ class Migrate(base.BasePrimitive): else: raise exception.InstanceNotFound(name=self.instance_uuid) - @promise.Promise def execute(self): - return self.migrate(self.input_parameters.get('dst_hypervisor_uuid')) + return self.migrate(destination=self.dst_hypervisor) - @promise.Promise - def undo(self): - return self.migrate(self.input_parameters.get('src_hypervisor_uuid')) + def revert(self): + return self.migrate(destination=self.src_hypervisor) + + def precondition(self): + # todo(jed) check if the instance exist/ check if the instance is on + # the src_hypervisor + pass + + def postcondition(self): + # 
todo(jed) we can imagine to check extra parameters (network response, etc) + pass diff --git a/watcher/applier/primitives/nop.py b/watcher/applier/actions/nop.py similarity index 67% rename from watcher/applier/primitives/nop.py rename to watcher/applier/actions/nop.py index d0e3ee4ff..afefc0d7d 100644 --- a/watcher/applier/primitives/nop.py +++ b/watcher/applier/actions/nop.py @@ -19,23 +19,28 @@ from oslo_log import log - -from watcher.applier.primitives import base -from watcher.applier import promise +from watcher.applier.actions import base LOG = log.getLogger(__name__) -class Nop(base.BasePrimitive): +class Nop(base.BaseAction): + + @property + def message(self): + return self.input_parameters.get('message') - @promise.Promise def execute(self): - LOG.debug("executing action NOP message:%s ", - self.input_parameters.get('message')) + LOG.debug("executing action NOP message:%s ", self.message) return True - @promise.Promise - def undo(self): - LOG.debug("undo action NOP") + def revert(self): + LOG.debug("revert action NOP") return True + + def precondition(self): + pass + + def postcondition(self): + pass diff --git a/watcher/applier/actions/sleep.py b/watcher/applier/actions/sleep.py new file mode 100644 index 000000000..8256e8c28 --- /dev/null +++ b/watcher/applier/actions/sleep.py @@ -0,0 +1,48 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2015 b<>com +# +# Authors: Jean-Emile DARTOIS +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +import time + +from oslo_log import log + +from watcher.applier.actions import base + + +LOG = log.getLogger(__name__) + + +class Sleep(base.BaseAction): + + @property + def duration(self): + return int(self.input_parameters.get('duration')) + + def execute(self): + LOG.debug("Starting action Sleep duration:%s ", self.duration) + time.sleep(self.duration) + return True + + def revert(self): + LOG.debug("revert action Sleep") + return True + + def precondition(self): + pass + + def postcondition(self): + pass diff --git a/watcher/applier/default.py b/watcher/applier/default.py index 4d070ca6d..d2b4529c5 100644 --- a/watcher/applier/default.py +++ b/watcher/applier/default.py @@ -16,24 +16,48 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from oslo_config import cfg +from oslo_log import log from watcher.applier import base -from watcher.applier.execution import default +from watcher.applier.workflow_engine.loading import default from watcher import objects +LOG = log.getLogger(__name__) +CONF = cfg.CONF + class DefaultApplier(base.BaseApplier): - def __init__(self, manager_applier, context): + def __init__(self, applier_manager, context): super(DefaultApplier, self).__init__() - self.manager_applier = manager_applier - self.context = context - self.executor = default.DefaultActionPlanExecutor(manager_applier, - context) + self._applier_manager = applier_manager + self._loader = default.DefaultWorkFlowEngineLoader() + self._engine = None + self._context = context + + @property + def context(self): + return self._context + + @property + def applier_manager(self): + return self._applier_manager + + @property + def engine(self): + if self._engine is None: + selected_workflow_engine = CONF.watcher_applier.workflow_engine + LOG.debug("Loading workflow engine %s ", selected_workflow_engine) + self._engine = self._loader.load(name=selected_workflow_engine) + self._engine.context = self.context + 
self._engine.applier_manager = self.applier_manager + return self._engine def execute(self, action_plan_uuid): + LOG.debug("Executing action plan %s ", action_plan_uuid) action_plan = objects.ActionPlan.get_by_uuid(self.context, action_plan_uuid) # todo(jed) remove direct access to dbapi need filter in object filters = {'action_plan_id': action_plan.id} actions = objects.Action.dbapi.get_action_list(self.context, filters) - return self.executor.execute(actions) + return self.engine.execute(actions) diff --git a/watcher/applier/execution/default.py b/watcher/applier/execution/default.py deleted file mode 100644 index 43b18dcd7..000000000 --- a/watcher/applier/execution/default.py +++ /dev/null @@ -1,57 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Authors: Jean-Emile DARTOIS -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# -from oslo_log import log - -from watcher._i18n import _LE -from watcher.applier.execution import base -from watcher.applier.execution import deploy_phase -from watcher.objects import action_plan - -LOG = log.getLogger(__name__) - - -class DefaultActionPlanExecutor(base.BaseActionPlanExecutor): - def __init__(self, manager_applier, context): - super(DefaultActionPlanExecutor, self).__init__(manager_applier, - context) - self.deploy = deploy_phase.DeployPhase(self) - - def execute(self, actions): - for action in actions: - try: - self.notify(action, action_plan.Status.ONGOING) - loaded_action = self.action_factory.make_action(action) - result = self.deploy.execute_primitive(loaded_action) - if result is False: - self.notify(action, action_plan.Status.FAILED) - self.deploy.rollback() - return False - else: - self.deploy.populate(loaded_action) - self.notify(action, action_plan.Status.SUCCEEDED) - except Exception as e: - LOG.expection(e) - LOG.debug('The ActionPlanExecutor failed to execute the action' - ' %s ', action) - - LOG.error(_LE("Trigger a rollback")) - self.notify(action, action_plan.Status.FAILED) - self.deploy.rollback() - return False - return True diff --git a/watcher/applier/execution/deploy_phase.py b/watcher/applier/execution/deploy_phase.py deleted file mode 100644 index a7985d7d7..000000000 --- a/watcher/applier/execution/deploy_phase.py +++ /dev/null @@ -1,56 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Authors: Jean-Emile DARTOIS -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. -# -from oslo_log import log - -LOG = log.getLogger(__name__) - - -class DeployPhase(object): - def __init__(self, executor): - # todo(jed) oslo_conf 10 secondes - self._max_timeout = 100000 - self._actions = [] - self._executor = executor - - @property - def actions(self): - return self._actions - - @property - def max_timeout(self): - return self._max_timeout - - @max_timeout.setter - def max_timeout(self, m): - self._max_timeout = m - - def populate(self, action): - self._actions.append(action) - - def execute_primitive(self, primitive): - future = primitive.execute(primitive) - return future.result(self.max_timeout) - - def rollback(self): - reverted = sorted(self.actions, reverse=True) - for primitive in reverted: - try: - self.execute_primitive(primitive) - except Exception as e: - LOG.error(e) diff --git a/watcher/applier/manager.py b/watcher/applier/manager.py index d4150ca03..ce933ba30 100644 --- a/watcher/applier/manager.py +++ b/watcher/applier/manager.py @@ -16,20 +16,24 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -from concurrent.futures import ThreadPoolExecutor from oslo_config import cfg from oslo_log import log -from watcher.applier.messaging.trigger import TriggerActionPlan -from watcher.common.messaging.messaging_core import MessagingCore +from watcher.applier.messaging import trigger +from watcher.common.messaging import messaging_core LOG = log.getLogger(__name__) CONF = cfg.CONF + # Register options APPLIER_MANAGER_OPTS = [ - cfg.IntOpt('applier_worker', default='1', help='The number of worker'), + cfg.IntOpt('workers', + default='1', + min=1, + required=True, + help='Number of workers for applier, default value is 1.'), cfg.StrOpt('topic_control', default='watcher.applier.control', help='The topic name used for' @@ -45,7 +49,11 @@ APPLIER_MANAGER_OPTS = [ cfg.StrOpt('publisher_id', default='watcher.applier.api', help='The identifier used by watcher ' - 'module on the message broker') + 'module on the message broker'), + cfg.StrOpt('workflow_engine', + default='taskflow', + required=True, + help='Select the engine to use to execute the workflow') ] opt_group = cfg.OptGroup(name='watcher_applier', @@ -55,7 +63,7 @@ CONF.register_group(opt_group) CONF.register_opts(APPLIER_MANAGER_OPTS, opt_group) -class ApplierManager(MessagingCore): +class ApplierManager(messaging_core.MessagingCore): def __init__(self): super(ApplierManager, self).__init__( CONF.watcher_applier.publisher_id, @@ -63,10 +71,7 @@ class ApplierManager(MessagingCore): CONF.watcher_applier.topic_status, api_version=self.API_VERSION, ) - # shared executor of the workflow - self.executor = ThreadPoolExecutor(max_workers=1) - # trigger action_plan - self.topic_control.add_endpoint(TriggerActionPlan(self)) + self.topic_control.add_endpoint(trigger.TriggerActionPlan(self)) def join(self): self.topic_control.join() diff --git a/watcher/applier/messaging/events.py b/watcher/applier/messaging/event_types.py similarity index 96% rename from watcher/applier/messaging/events.py rename to 
watcher/applier/messaging/event_types.py index eb6de5c74..d6b916964 100644 --- a/watcher/applier/messaging/events.py +++ b/watcher/applier/messaging/event_types.py @@ -20,6 +20,6 @@ import enum -class Events(enum.Enum): +class EventTypes(enum.Enum): LAUNCH_ACTION_PLAN = "launch_action_plan" LAUNCH_ACTION = "launch_action" diff --git a/watcher/applier/messaging/trigger.py b/watcher/applier/messaging/trigger.py index b66295908..1c4b3a756 100644 --- a/watcher/applier/messaging/trigger.py +++ b/watcher/applier/messaging/trigger.py @@ -16,30 +16,35 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from concurrent import futures + +from oslo_config import cfg from oslo_log import log -from watcher.applier.action_plan.default import DefaultActionPlanHandler +from watcher.applier.action_plan import default LOG = log.getLogger(__name__) +CONF = cfg.CONF class TriggerActionPlan(object): - def __init__(self, manager_applier): - self.manager_applier = manager_applier + def __init__(self, applier_manager): + self.applier_manager = applier_manager + workers = CONF.watcher_applier.workers + self.executor = futures.ThreadPoolExecutor(max_workers=workers) def do_launch_action_plan(self, context, action_plan_uuid): try: - cmd = DefaultActionPlanHandler(context, - self.manager_applier, - action_plan_uuid) + cmd = default.DefaultActionPlanHandler(context, + self.applier_manager, + action_plan_uuid) cmd.execute() except Exception as e: LOG.exception(e) def launch_action_plan(self, context, action_plan_uuid): - LOG.debug("Trigger ActionPlan %s" % action_plan_uuid) + LOG.debug("Trigger ActionPlan %s", action_plan_uuid) # submit - self.manager_applier.executor.submit(self.do_launch_action_plan, - context, - action_plan_uuid) + self.executor.submit(self.do_launch_action_plan, context, + action_plan_uuid) return action_plan_uuid diff --git a/watcher/applier/promise.py b/watcher/applier/promise.py deleted file mode 100644 index 
dbede895e..000000000 --- a/watcher/applier/promise.py +++ /dev/null @@ -1,50 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Authors: Jean-Emile DARTOIS -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from concurrent.futures import Future -from concurrent.futures import ThreadPoolExecutor - - -class Promise(object): - executor = ThreadPoolExecutor( - max_workers=10) - - def __init__(self, func): - self.func = func - - def resolve(self, *args, **kwargs): - resolved_args = [] - resolved_kwargs = {} - - for i, arg in enumerate(args): - if isinstance(arg, Future): - resolved_args.append(arg.result()) - else: - resolved_args.append(arg) - - for kw, arg in kwargs.items(): - if isinstance(arg, Future): - resolved_kwargs[kw] = arg.result() - else: - resolved_kwargs[kw] = arg - - return self.func(*resolved_args, **resolved_kwargs) - - def __call__(self, *args, **kwargs): - return self.executor.submit(self.resolve, *args, **kwargs) diff --git a/watcher/applier/rpcapi.py b/watcher/applier/rpcapi.py index bae28074e..e3c4f6db6 100644 --- a/watcher/applier/rpcapi.py +++ b/watcher/applier/rpcapi.py @@ -23,8 +23,8 @@ import oslo_messaging as om from watcher.applier.manager import APPLIER_MANAGER_OPTS from watcher.applier.manager import opt_group from watcher.common import exception -from watcher.common.messaging.messaging_core import MessagingCore -from watcher.common.messaging.notification_handler import NotificationHandler +from 
watcher.common.messaging import messaging_core +from watcher.common.messaging import notification_handler as notification from watcher.common import utils @@ -34,7 +34,7 @@ CONF.register_group(opt_group) CONF.register_opts(APPLIER_MANAGER_OPTS, opt_group) -class ApplierAPI(MessagingCore): +class ApplierAPI(messaging_core.MessagingCore): def __init__(self): super(ApplierAPI, self).__init__( @@ -43,7 +43,7 @@ class ApplierAPI(MessagingCore): CONF.watcher_applier.topic_status, api_version=self.API_VERSION, ) - self.handler = NotificationHandler(self.publisher_id) + self.handler = notification.NotificationHandler(self.publisher_id) self.handler.register_observer(self) self.topic_status.add_endpoint(self.handler) transport = om.get_transport(CONF) diff --git a/watcher/applier/primitives/loading/__init__.py b/watcher/applier/workflow_engine/__init__.py similarity index 100% rename from watcher/applier/primitives/loading/__init__.py rename to watcher/applier/workflow_engine/__init__.py diff --git a/watcher/applier/execution/base.py b/watcher/applier/workflow_engine/base.py similarity index 71% rename from watcher/applier/execution/base.py rename to watcher/applier/workflow_engine/base.py index bb147bbe7..5e42eec27 100644 --- a/watcher/applier/execution/base.py +++ b/watcher/applier/workflow_engine/base.py @@ -20,26 +20,34 @@ import abc import six -from watcher.applier.messaging import events -from watcher.applier.primitives import factory +from watcher.applier.actions import factory +from watcher.applier.messaging import event_types from watcher.common.messaging.events import event from watcher import objects @six.add_metaclass(abc.ABCMeta) -class BaseActionPlanExecutor(object): - def __init__(self, manager_applier, context): - self._manager_applier = manager_applier - self._context = context +class BaseWorkFlowEngine(object): + def __init__(self): + self._applier_manager = None + self._context = None self._action_factory = factory.ActionFactory() @property def 
context(self): return self._context + @context.setter + def context(self, c): + self._context = c + @property - def manager_applier(self): - return self._manager_applier + def applier_manager(self): + return self._applier_manager + + @applier_manager.setter + def applier_manager(self, a): + self._applier_manager = a @property def action_factory(self): @@ -50,11 +58,11 @@ class BaseActionPlanExecutor(object): db_action.state = state db_action.save() ev = event.Event() - ev.type = events.Events.LAUNCH_ACTION + ev.type = event_types.EventTypes.LAUNCH_ACTION ev.data = {} payload = {'action_uuid': action.uuid, 'action_state': state} - self.manager_applier.topic_status.publish_event(ev.type.name, + self.applier_manager.topic_status.publish_event(ev.type.name, payload) @abc.abstractmethod diff --git a/watcher/applier/workflow_engine/default.py b/watcher/applier/workflow_engine/default.py new file mode 100644 index 000000000..97b3d75d3 --- /dev/null +++ b/watcher/applier/workflow_engine/default.py @@ -0,0 +1,159 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 b<>com +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from oslo_log import log +from taskflow import engines +from taskflow.patterns import graph_flow as gf +from taskflow import task + +from watcher._i18n import _LE, _LW, _LC +from watcher.applier.workflow_engine import base +from watcher.objects import action as obj_action + +LOG = log.getLogger(__name__) + + +class DefaultWorkFlowEngine(base.BaseWorkFlowEngine): + def decider(self, history): + # FIXME(jed) not possible with the current Watcher Planner + # + # decider – A callback function that will be expected to + # decide at runtime whether v should be allowed to execute + # (or whether the execution of v should be ignored, + # and therefore not executed). It is expected to take a single + # keyword argument history which will be the execution results of + # all u decidable links that have v as a target. It is expected + # to return a single boolean + # (True to allow v execution or False to not). + return True + + def execute(self, actions): + try: + # NOTE(jed) We want to have a strong separation of concerns + # between the Watcher planner and the Watcher Applier in order + # to have the possibility to support several workflow engines. + # We want to provide the 'taskflow' engine by + # default although we still want to leave the possibility for + # the users to change it. + # todo(jed) we need to change the way the actions are stored. + # The current implementation only uses a linked list of actions. 
+ # todo(jed) add olso conf for retry and name + flow = gf.Flow("watcher_flow") + previous = None + for a in actions: + task = TaskFlowActionContainer(a, self) + flow.add(task) + if previous is None: + previous = task + # we have only one Action in the Action Plan + if len(actions) == 1: + nop = TaskFlowNop() + flow.add(nop) + flow.link(previous, nop) + else: + # decider == guard (UML) + flow.link(previous, task, decider=self.decider) + previous = task + + e = engines.load(flow) + e.run() + return True + except Exception as e: + LOG.exception(e) + return False + + +class TaskFlowActionContainer(task.Task): + def __init__(self, db_action, engine): + name = "action_type:{0} uuid:{1}".format(db_action.action_type, + db_action.uuid) + super(TaskFlowActionContainer, self).__init__(name=name) + self._db_action = db_action + self._engine = engine + self.loaded_action = None + + @property + def action(self): + if self.loaded_action is None: + action = self.engine.action_factory.make_action(self._db_action) + self.loaded_action = action + return self.loaded_action + + @property + def engine(self): + return self._engine + + def pre_execute(self): + try: + self.engine.notify(self._db_action, + obj_action.Status.ONGOING) + LOG.debug("Precondition action %s", self.name) + self.action.precondition() + except Exception as e: + LOG.exception(e) + self.engine.notify(self._db_action, + obj_action.Status.FAILED) + raise + + def execute(self, *args, **kwargs): + try: + LOG.debug("Running action %s", self.name) + + # todo(jed) remove return (true or false) raise an Exception + result = self.action.execute() + if result is not True: + self.engine.notify(self._db_action, + obj_action.Status.FAILED) + else: + self.engine.notify(self._db_action, + obj_action.Status.SUCCEEDED) + except Exception as e: + LOG.exception(e) + LOG.error(_LE('The WorkFlow Engine has failed ' + 'to execute the action %s'), self.name) + + self.engine.notify(self._db_action, + obj_action.Status.FAILED) + raise + + 
def post_execute(self): + try: + LOG.debug("postcondition action %s", self.name) + self.action.postcondition() + except Exception as e: + LOG.exception(e) + self.engine.notify(self._db_action, + obj_action.Status.FAILED) + raise + + def revert(self, *args, **kwargs): + LOG.warning(_LW("Revert action %s"), self.name) + try: + # todo(jed) do we need to update the states in case of failure ? + self.action.revert() + except Exception as e: + LOG.exception(e) + LOG.critical(_LC("Oops! We need disaster recover plan")) + + +class TaskFlowNop(task.Task): + """This class is used in case the workflow has only one Action. + + We need at least two atoms to create a link + """ + def execute(self): + pass diff --git a/watcher/tests/applier/execution/__init__.py b/watcher/applier/workflow_engine/loading/__init__.py similarity index 100% rename from watcher/tests/applier/execution/__init__.py rename to watcher/applier/workflow_engine/loading/__init__.py diff --git a/watcher/applier/workflow_engine/loading/default.py b/watcher/applier/workflow_engine/loading/default.py new file mode 100644 index 000000000..6494987e1 --- /dev/null +++ b/watcher/applier/workflow_engine/loading/default.py @@ -0,0 +1,30 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2015 b<>com +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from __future__ import unicode_literals + +from oslo_log import log + +from watcher.common.loader import default + +LOG = log.getLogger(__name__) + + +class DefaultWorkFlowEngineLoader(default.DefaultLoader): + def __init__(self): + super(DefaultWorkFlowEngineLoader, self).__init__( + namespace='watcher_workflow_engines') diff --git a/watcher/common/nova.py b/watcher/common/nova.py index 636f00223..c5d0ffccb 100644 --- a/watcher/common/nova.py +++ b/watcher/common/nova.py @@ -328,7 +328,7 @@ class NovaClient(object): return False def live_migrate_instance(self, instance_id, dest_hostname, - block_migration=True, retry=120): + block_migration=False, retry=120): """This method does a live migration of a given instance This method uses the Nova built-in live_migrate() diff --git a/watcher/decision_engine/planner/default.py b/watcher/decision_engine/planner/default.py index 1d37ca426..35a2bc6b5 100644 --- a/watcher/decision_engine/planner/default.py +++ b/watcher/decision_engine/planner/default.py @@ -30,8 +30,9 @@ LOG = log.getLogger(__name__) class DefaultPlanner(base.BasePlanner): priorities = { 'nop': 0, - 'migrate': 1, + 'sleep': 1, 'change_nova_service_state': 2, + 'migrate': 3, } def create_action(self, @@ -53,7 +54,7 @@ class DefaultPlanner(base.BasePlanner): return action def schedule(self, context, audit_id, solution): - LOG.debug('Create an action plan for the audit uuid') + LOG.debug('Create an action plan for the audit uuid: %s ', audit_id) action_plan = self._create_action_plan(context, audit_id) actions = list(solution.actions) @@ -76,18 +77,20 @@ class DefaultPlanner(base.BasePlanner): action_plan.first_action_id = None action_plan.save() else: + # create the first action parent_action = self._create_action(context, scheduled[0][1], None) + # remove first scheduled.pop(0) action_plan.first_action_id = parent_action.id action_plan.save() for s_action in scheduled: - action = self._create_action(context, s_action[1], - parent_action) - parent_action 
= action + current_action = self._create_action(context, s_action[1], + parent_action) + parent_action = current_action return action_plan @@ -105,16 +108,19 @@ class DefaultPlanner(base.BasePlanner): return new_action_plan def _create_action(self, context, _action, parent_action): - action_description = str(_action) - LOG.debug("Create a action for the following resquest : %s" - % action_description) + try: + LOG.debug("Creating the %s in watcher db", + _action.get("action_type")) - new_action = objects.Action(context, **_action) - new_action.create(context) - new_action.save() + new_action = objects.Action(context, **_action) + new_action.create(context) + new_action.save() - if parent_action: - parent_action.next = new_action.id - parent_action.save() + if parent_action: + parent_action.next = new_action.id + parent_action.save() - return new_action + return new_action + except Exception as exc: + LOG.exception(exc) + raise diff --git a/watcher/decision_engine/strategy/strategies/__init__.py b/watcher/decision_engine/strategy/strategies/__init__.py index e69de29bb..5b7521b26 100644 --- a/watcher/decision_engine/strategy/strategies/__init__.py +++ b/watcher/decision_engine/strategy/strategies/__init__.py @@ -0,0 +1,27 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 b<>com +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from watcher.decision_engine.strategy.strategies import basic_consolidation +from watcher.decision_engine.strategy.strategies import dummy_strategy +from watcher.decision_engine.strategy.strategies import outlet_temp_control + +BasicConsolidation = basic_consolidation.BasicConsolidation +OutletTempControl = outlet_temp_control.OutletTempControl +DummyStrategy = dummy_strategy.DummyStrategy + + +__all__ = (BasicConsolidation, OutletTempControl, DummyStrategy) diff --git a/watcher/decision_engine/strategy/strategies/basic_consolidation.py b/watcher/decision_engine/strategy/strategies/basic_consolidation.py index f332e26bd..5b2172d67 100644 --- a/watcher/decision_engine/strategy/strategies/basic_consolidation.py +++ b/watcher/decision_engine/strategy/strategies/basic_consolidation.py @@ -336,11 +336,11 @@ class BasicConsolidation(BaseStrategy): def add_migration(self, applies_to, migration_type, - src_hypervisor_uuid, - dst_hypervisor_uuid): + src_hypervisor, + dst_hypervisor): parameters = {'migration_type': migration_type, - 'src_hypervisor_uuid': src_hypervisor_uuid, - 'dst_hypervisor_uuid': dst_hypervisor_uuid} + 'src_hypervisor': src_hypervisor, + 'dst_hypervisor': dst_hypervisor} self.solution.add_action(action_type=self.MIGRATION, applies_to=applies_to, input_parameters=parameters) diff --git a/watcher/decision_engine/strategy/strategies/dummy_strategy.py b/watcher/decision_engine/strategy/strategies/dummy_strategy.py index 73f811dfc..a9b3340b1 100644 --- a/watcher/decision_engine/strategy/strategies/dummy_strategy.py +++ b/watcher/decision_engine/strategy/strategies/dummy_strategy.py @@ -28,6 +28,7 @@ class DummyStrategy(BaseStrategy): DEFAULT_DESCRIPTION = "Dummy Strategy" NOP = "nop" + SLEEP = "sleep" def __init__(self, name=DEFAULT_NAME, description=DEFAULT_DESCRIPTION): super(DummyStrategy, self).__init__(name, description) @@ -38,6 +39,12 @@ class DummyStrategy(BaseStrategy): applies_to="", input_parameters=parameters) - # todo(jed) add a new action 
to test the flow - # with two differents actions + parameters = {'message': 'Welcome'} + self.solution.add_action(action_type=self.NOP, + applies_to="", + input_parameters=parameters) + + self.solution.add_action(action_type=self.SLEEP, + applies_to="", + input_parameters={'duration': '5'}) return self.solution diff --git a/watcher/decision_engine/strategy/strategies/outlet_temp_control.py b/watcher/decision_engine/strategy/strategies/outlet_temp_control.py index 1e093ea17..7b7f3954b 100644 --- a/watcher/decision_engine/strategy/strategies/outlet_temp_control.py +++ b/watcher/decision_engine/strategy/strategies/outlet_temp_control.py @@ -237,8 +237,8 @@ class OutletTempControl(BaseStrategy): mig_src_hypervisor, mig_dst_hypervisor): parameters = {'migration_type': 'live', - 'src_hypervisor_uuid': mig_src_hypervisor, - 'dst_hypervisor_uuid': mig_dst_hypervisor} + 'src_hypervisor': mig_src_hypervisor, + 'dst_hypervisor': mig_dst_hypervisor} self.solution.add_action(action_type=self.MIGRATION, applies_to=vm_src, input_parameters=parameters) diff --git a/watcher/locale/fr/LC_MESSAGES/watcher.po b/watcher/locale/fr/LC_MESSAGES/watcher.po index 54ca4e7c6..5000eabcf 100644 --- a/watcher/locale/fr/LC_MESSAGES/watcher.po +++ b/watcher/locale/fr/LC_MESSAGES/watcher.po @@ -8,7 +8,7 @@ msgid "" msgstr "" "Project-Id-Version: python-watcher 0.21.1.dev32\n" "Report-Msgid-Bugs-To: EMAIL@ADDRESS\n" -"POT-Creation-Date: 2016-01-15 10:25+0100\n" +"POT-Creation-Date: 2016-01-19 17:54+0100\n" "PO-Revision-Date: 2015-12-11 15:42+0100\n" "Last-Translator: FULL NAME \n" "Language: fr\n" @@ -71,14 +71,24 @@ msgstr "" msgid "Error parsing HTTP response: %s" msgstr "" -#: watcher/applier/execution/default.py:52 -msgid "Trigger a rollback" -msgstr "" - -#: watcher/applier/primitives/change_nova_service_state.py:66 +#: watcher/applier/actions/change_nova_service_state.py:58 msgid "The target state is not defined" msgstr "" +#: watcher/applier/workflow_engine/default.py:69 +#, python-format 
+msgid "The WorkFlow Engine has failed to execute the action %s" +msgstr "Le moteur de workflow a échoué lors de l'exécution de l'action %s" + +#: watcher/applier/workflow_engine/default.py:77 +#, python-format +msgid "Revert action %s" +msgstr "Annulation de l'action %s" + +#: watcher/applier/workflow_engine/default.py:83 +msgid "Oops! We need disaster recover plan" +msgstr "Oops! Nous avons besoin d'un plan de reprise d'activité" + #: watcher/cmd/api.py:46 watcher/cmd/applier.py:39 #: watcher/cmd/decisionengine.py:40 #, python-format @@ -353,7 +363,7 @@ msgstr "" msgid "'obj' argument type is not valid" msgstr "" -#: watcher/decision_engine/planner/default.py:75 +#: watcher/decision_engine/planner/default.py:76 msgid "The action plan is empty" msgstr "" @@ -547,3 +557,9 @@ msgstr "" #~ msgid "The hypervisor could not be found" #~ msgstr "" +#~ msgid "Trigger a rollback" +#~ msgstr "" + +#~ msgid "The WorkFlow Engine has failedto execute the action %s" +#~ msgstr "" + diff --git a/watcher/locale/watcher.pot b/watcher/locale/watcher.pot index b51c39576..80394f0ce 100644 --- a/watcher/locale/watcher.pot +++ b/watcher/locale/watcher.pot @@ -7,9 +7,9 @@ #, fuzzy msgid "" msgstr "" -"Project-Id-Version: python-watcher 0.22.1.dev19\n" +"Project-Id-Version: python-watcher 0.22.1.dev28\n" "Report-Msgid-Bugs-To: EMAIL@ADDRESS\n" -"POT-Creation-Date: 2016-01-15 10:25+0100\n" +"POT-Creation-Date: 2016-01-19 17:54+0100\n" "PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n" "Last-Translator: FULL NAME \n" "Language-Team: LANGUAGE \n" @@ -70,12 +70,22 @@ msgstr "" msgid "Error parsing HTTP response: %s" msgstr "" -#: watcher/applier/execution/default.py:52 -msgid "Trigger a rollback" +#: watcher/applier/actions/change_nova_service_state.py:58 +msgid "The target state is not defined" msgstr "" -#: watcher/applier/primitives/change_nova_service_state.py:66 -msgid "The target state is not defined" +#: watcher/applier/workflow_engine/default.py:69 +#, python-format +msgid "The WorkFlow 
Engine has failed to execute the action %s" +msgstr "" + +#: watcher/applier/workflow_engine/default.py:77 +#, python-format +msgid "Revert action %s" +msgstr "" + +#: watcher/applier/workflow_engine/default.py:83 +msgid "Oops! We need disaster recover plan" msgstr "" #: watcher/cmd/api.py:46 watcher/cmd/applier.py:39 @@ -351,7 +361,7 @@ msgstr "" msgid "'obj' argument type is not valid" msgstr "" -#: watcher/decision_engine/planner/default.py:75 +#: watcher/decision_engine/planner/default.py:76 msgid "The action plan is empty" msgstr "" diff --git a/watcher/tests/applier/action_plan/test_default_action_handler.py b/watcher/tests/applier/action_plan/test_default_action_handler.py index dbd099bb1..2e139c838 100644 --- a/watcher/tests/applier/action_plan/test_default_action_handler.py +++ b/watcher/tests/applier/action_plan/test_default_action_handler.py @@ -20,7 +20,7 @@ from mock import call from mock import MagicMock from watcher.applier.action_plan.default import DefaultActionPlanHandler -from watcher.applier.messaging.events import Events +from watcher.applier.messaging.event_types import EventTypes from watcher.objects.action_plan import Status from watcher.objects import ActionPlan from watcher.tests.db.base import DbTestCase @@ -33,17 +33,7 @@ class TestDefaultActionPlanHandler(DbTestCase): self.action_plan = obj_utils.create_test_action_plan( self.context) - def test_launch_action_plan_wihout_errors(self): - try: - - command = DefaultActionPlanHandler(self.context, MagicMock(), - self.action_plan.uuid) - command.execute() - except Exception as e: - self.fail( - "The ActionPlan should be trigged wihtour error" + unicode(e)) - - def test_launch_action_plan_state_failed(self): + def test_launch_action_plan(self): command = DefaultActionPlanHandler(self.context, MagicMock(), self.action_plan.uuid) command.execute() @@ -57,10 +47,10 @@ class TestDefaultActionPlanHandler(DbTestCase): self.action_plan.uuid) command.execute() - call_on_going = 
call(Events.LAUNCH_ACTION_PLAN.name, { + call_on_going = call(EventTypes.LAUNCH_ACTION_PLAN.name, { 'action_plan_status': Status.ONGOING, 'action_plan__uuid': self.action_plan.uuid}) - call_succeeded = call(Events.LAUNCH_ACTION_PLAN.name, { + call_succeeded = call(EventTypes.LAUNCH_ACTION_PLAN.name, { 'action_plan_status': Status.SUCCEEDED, 'action_plan__uuid': self.action_plan.uuid}) diff --git a/watcher/tests/applier/actions/loading/__init__.py b/watcher/tests/applier/actions/loading/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/watcher/tests/applier/actions/loading/test_default_actions_loader.py b/watcher/tests/applier/actions/loading/test_default_actions_loader.py new file mode 100644 index 000000000..57716e536 --- /dev/null +++ b/watcher/tests/applier/actions/loading/test_default_actions_loader.py @@ -0,0 +1,32 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 b<>com +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+from __future__ import unicode_literals + +from watcher.applier.actions import base as abase +from watcher.applier.actions.loading import default +from watcher.tests import base + + +class TestDefaultActionLoader(base.TestCase): + def setUp(self): + super(TestDefaultActionLoader, self).setUp() + self.loader = default.DefaultActionLoader() + + def test_endpoints(self): + for endpoint in self.loader.list_available(): + loaded = self.loader.load(endpoint) + self.assertIsNotNone(loaded) + self.assertIsInstance(loaded, abase.BaseAction) diff --git a/watcher/tests/applier/execution/test_default_action_plan_executor.py b/watcher/tests/applier/execution/test_default_action_plan_executor.py deleted file mode 100644 index 99588d70c..000000000 --- a/watcher/tests/applier/execution/test_default_action_plan_executor.py +++ /dev/null @@ -1,56 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Authors: Jean-Emile DARTOIS -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# -import mock - -from watcher.applier.execution import default -from watcher.common import utils -from watcher import objects -from watcher.tests.db import base - - -class TestDefaultActionPlanExecutor(base.DbTestCase): - def setUp(self): - super(TestDefaultActionPlanExecutor, self).setUp() - self.executor = default.DefaultActionPlanExecutor(mock.MagicMock(), - self.context) - - def test_execute(self): - actions = mock.MagicMock() - result = self.executor.execute(actions) - self.assertEqual(result, True) - - def test_execute_with_actions(self): - actions = [] - action = { - 'uuid': utils.generate_uuid(), - 'action_plan_id': 0, - 'action_type': "nop", - 'applies_to': '', - 'input_parameters': {'state': 'OFFLINE'}, - 'state': objects.action.Status.PENDING, - 'alarm': None, - 'next': None, - } - new_action = objects.Action(self.context, **action) - new_action.create(self.context) - new_action.save() - actions.append(objects.Action.get_by_uuid(self.context, - action['uuid'])) - result = self.executor.execute(actions) - self.assertEqual(result, True) diff --git a/watcher/tests/applier/messaging/test_launch_action_plan_endpoint.py b/watcher/tests/applier/messaging/test_trigger_action_plan_endpoint.py similarity index 87% rename from watcher/tests/applier/messaging/test_launch_action_plan_endpoint.py rename to watcher/tests/applier/messaging/test_trigger_action_plan_endpoint.py index 2ce121bb5..2231dd8f9 100644 --- a/watcher/tests/applier/messaging/test_launch_action_plan_endpoint.py +++ b/watcher/tests/applier/messaging/test_trigger_action_plan_endpoint.py @@ -18,8 +18,9 @@ # -from mock import MagicMock -from watcher.applier.messaging.trigger import TriggerActionPlan +import mock + +from watcher.applier.messaging import trigger from watcher.common import utils from watcher.tests import base @@ -27,8 +28,8 @@ from watcher.tests import base class TestTriggerActionPlan(base.TestCase): def __init__(self, *args, **kwds): super(TestTriggerActionPlan, self).__init__(*args, 
**kwds) - self.applier = MagicMock() - self.endpoint = TriggerActionPlan(self.applier) + self.applier = mock.MagicMock() + self.endpoint = trigger.TriggerActionPlan(self.applier) def setUp(self): super(TestTriggerActionPlan, self).setUp() diff --git a/watcher/tests/applier/workflow_engine/__init__.py b/watcher/tests/applier/workflow_engine/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/watcher/tests/applier/workflow_engine/loading/__init__.py b/watcher/tests/applier/workflow_engine/loading/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/watcher/tests/applier/workflow_engine/loading/test_default_engine_loader.py b/watcher/tests/applier/workflow_engine/loading/test_default_engine_loader.py new file mode 100644 index 000000000..d9ee39a29 --- /dev/null +++ b/watcher/tests/applier/workflow_engine/loading/test_default_engine_loader.py @@ -0,0 +1,32 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 b<>com +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+from __future__ import unicode_literals + +from watcher.applier.workflow_engine import base as wbase +from watcher.applier.workflow_engine.loading import default +from watcher.tests import base + + +class TestDefaultActionLoader(base.TestCase): + def setUp(self): + super(TestDefaultActionLoader, self).setUp() + self.loader = default.DefaultWorkFlowEngineLoader() + + def test_endpoints(self): + for endpoint in self.loader.list_available(): + loaded = self.loader.load(endpoint) + self.assertIsNotNone(loaded) + self.assertIsInstance(loaded, wbase.BaseWorkFlowEngine) diff --git a/watcher/tests/applier/workflow_engine/test_default_workflow_engine.py b/watcher/tests/applier/workflow_engine/test_default_workflow_engine.py new file mode 100644 index 000000000..ff6ff34fc --- /dev/null +++ b/watcher/tests/applier/workflow_engine/test_default_workflow_engine.py @@ -0,0 +1,164 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2015 b<>com +# +# Authors: Jean-Emile DARTOIS +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +import abc +import mock + +import six +from stevedore import driver +from stevedore import extension + +from watcher.applier.actions import base as abase +from watcher.applier.workflow_engine import default as tflow +from watcher.common import utils +from watcher import objects +from watcher.tests.db import base + + +@six.add_metaclass(abc.ABCMeta) +class FakeAction(abase.BaseAction): + def precondition(self): + pass + + def revert(self): + pass + + def execute(self): + raise Exception() + + @classmethod + def namespace(cls): + return "TESTING" + + @classmethod + def get_name(cls): + return 'fake_action' + + +class TestDefaultWorkFlowEngine(base.DbTestCase): + def setUp(self): + super(TestDefaultWorkFlowEngine, self).setUp() + self.engine = tflow.DefaultWorkFlowEngine() + self.engine.context = self.context + self.engine.applier_manager = mock.MagicMock() + + def test_execute(self): + actions = mock.MagicMock() + result = self.engine.execute(actions) + self.assertEqual(result, True) + + def create_action(self, action_type, applies_to, parameters, next): + action = { + 'uuid': utils.generate_uuid(), + 'action_plan_id': 0, + 'action_type': action_type, + 'applies_to': applies_to, + 'input_parameters': parameters, + 'state': objects.action.Status.PENDING, + 'alarm': None, + 'next': next, + } + new_action = objects.Action(self.context, **action) + new_action.create(self.context) + new_action.save() + return new_action + + def check_action_state(self, action, expected_state): + to_check = objects.Action.get_by_uuid(self.context, action.uuid) + self.assertEqual(to_check.state, expected_state) + + def check_actions_state(self, actions, expected_state): + for a in actions: + self.check_action_state(a, expected_state) + + def test_execute_with_no_actions(self): + actions = [] + result = self.engine.execute(actions) + self.assertEqual(result, True) + + def test_execute_with_one_action(self): + actions = [self.create_action("nop", "", {'message': 'test'}, None)] + result = 
self.engine.execute(actions) + self.assertEqual(result, True) + self.check_actions_state(actions, objects.action.Status.SUCCEEDED) + + def test_execute_with_two_actions(self): + actions = [] + next = self.create_action("sleep", "", {'duration': '0'}, None) + first = self.create_action("nop", "", {'message': 'test'}, next.id) + + actions.append(first) + actions.append(next) + + result = self.engine.execute(actions) + self.assertEqual(result, True) + self.check_actions_state(actions, objects.action.Status.SUCCEEDED) + + def test_execute_with_three_actions(self): + actions = [] + next2 = self.create_action("nop", "vm1", {'message': 'next'}, None) + next = self.create_action("sleep", "vm1", {'duration': '0'}, next2.id) + first = self.create_action("nop", "vm1", {'message': 'hello'}, next.id) + self.check_action_state(first, objects.action.Status.PENDING) + self.check_action_state(next, objects.action.Status.PENDING) + self.check_action_state(next2, objects.action.Status.PENDING) + + actions.append(first) + actions.append(next) + actions.append(next2) + + result = self.engine.execute(actions) + self.assertEqual(result, True) + self.check_actions_state(actions, objects.action.Status.SUCCEEDED) + + def test_execute_with_exception(self): + actions = [] + next2 = self.create_action("no_exist", + "vm1", {'message': 'next'}, None) + next = self.create_action("sleep", "vm1", + {'duration': '0'}, next2.id) + first = self.create_action("nop", "vm1", + {'message': 'hello'}, next.id) + + self.check_action_state(first, objects.action.Status.PENDING) + self.check_action_state(next, objects.action.Status.PENDING) + self.check_action_state(next2, objects.action.Status.PENDING) + actions.append(first) + actions.append(next) + actions.append(next2) + + result = self.engine.execute(actions) + self.assertEqual(result, False) + self.check_action_state(first, objects.action.Status.SUCCEEDED) + self.check_action_state(next, objects.action.Status.SUCCEEDED) + self.check_action_state(next2, 
objects.action.Status.FAILED) + + @mock.patch("watcher.common.loader.default.DriverManager") + def test_execute_with_action_exception(self, m_driver): + m_driver.return_value = driver.DriverManager.make_test_instance( + extension=extension.Extension(name=FakeAction.get_name(), + entry_point="%s:%s" % ( + FakeAction.__module__, + FakeAction.__name__), + plugin=FakeAction, + obj=None), + namespace=FakeAction.namespace()) + actions = [self.create_action("dontcare", "vm1", {}, None)] + result = self.engine.execute(actions) + self.assertEqual(result, False) + self.check_action_state(actions[0], objects.action.Status.FAILED) diff --git a/watcher/tests/decision_engine/planner/test_default_planner.py b/watcher/tests/decision_engine/planner/test_default_planner.py index 239285449..e5ab5f2f5 100644 --- a/watcher/tests/decision_engine/planner/test_default_planner.py +++ b/watcher/tests/decision_engine/planner/test_default_planner.py @@ -18,25 +18,26 @@ import mock from watcher.common import utils from watcher.db import api as db_api -from watcher.decision_engine.planner.default import DefaultPlanner -from watcher.decision_engine.solution.default import DefaultSolution -from watcher.decision_engine.strategy.strategies.basic_consolidation import \ - BasicConsolidation +from watcher.decision_engine.planner import default as pbase +from watcher.decision_engine.solution import default as dsol +from watcher.decision_engine.strategy import strategies +from watcher import objects from watcher.tests.db import base from watcher.tests.db import utils as db_utils -from watcher.tests.decision_engine.strategy.strategies.faker_cluster_state \ - import FakerModelCollector -from watcher.tests.decision_engine.strategy.strategies.faker_metrics_collector \ - import FakerMetricsCollector +from watcher.tests.decision_engine.strategy.strategies \ + import faker_cluster_state +from watcher.tests.decision_engine.strategy.strategies \ + import faker_metrics_collector as fake from watcher.tests.objects 
import utils as obj_utils class SolutionFaker(object): @staticmethod def build(): - metrics = FakerMetricsCollector() - current_state_cluster = FakerModelCollector() - sercon = BasicConsolidation("basic", "Basic offline consolidation") + metrics = fake.FakerMetricsCollector() + current_state_cluster = faker_cluster_state.FakerModelCollector() + sercon = strategies.BasicConsolidation("basic", + "Basic offline consolidation") sercon.ceilometer = mock.\ MagicMock(get_statistics=metrics.mock_get_statistics) return sercon.execute(current_state_cluster.generate_scenario_1()) @@ -45,9 +46,10 @@ class SolutionFaker(object): class SolutionFakerSingleHyp(object): @staticmethod def build(): - metrics = FakerMetricsCollector() - current_state_cluster = FakerModelCollector() - sercon = BasicConsolidation("basic", "Basic offline consolidation") + metrics = fake.FakerMetricsCollector() + current_state_cluster = faker_cluster_state.FakerModelCollector() + sercon = strategies.BasicConsolidation("basic", + "Basic offline consolidation") sercon.ceilometer = \ mock.MagicMock(get_statistics=metrics.mock_get_statistics) @@ -57,9 +59,9 @@ class SolutionFakerSingleHyp(object): class TestActionScheduling(base.DbTestCase): def test_schedule_actions(self): - default_planner = DefaultPlanner() + default_planner = pbase.DefaultPlanner() audit = db_utils.create_test_audit(uuid=utils.generate_uuid()) - solution = DefaultSolution() + solution = dsol.DefaultSolution() parameters = { "src_uuid_hypervisor": "server1", @@ -70,7 +72,7 @@ class TestActionScheduling(base.DbTestCase): input_parameters=parameters) with mock.patch.object( - DefaultPlanner, "create_action", + pbase.DefaultPlanner, "create_action", wraps=default_planner.create_action) as m_create_action: action_plan = default_planner.schedule( self.context, audit.id, solution @@ -78,12 +80,46 @@ class TestActionScheduling(base.DbTestCase): self.assertIsNotNone(action_plan.uuid) self.assertEqual(m_create_action.call_count, 1) + filters = 
{'action_plan_id': action_plan.id} + actions = objects.Action.dbapi.get_action_list(self.context, filters) + self.assertEqual(actions[0].action_type, "migrate") + + def test_schedule_two_actions(self): + default_planner = pbase.DefaultPlanner() + audit = db_utils.create_test_audit(uuid=utils.generate_uuid()) + solution = dsol.DefaultSolution() + + parameters = { + "src_uuid_hypervisor": "server1", + "dst_uuid_hypervisor": "server2", + } + solution.add_action(action_type="migrate", + applies_to="b199db0c-1408-4d52-b5a5-5ca14de0ff36", + input_parameters=parameters) + + solution.add_action(action_type="nop", + applies_to="", + input_parameters={}) + + with mock.patch.object( + pbase.DefaultPlanner, "create_action", + wraps=default_planner.create_action) as m_create_action: + action_plan = default_planner.schedule( + self.context, audit.id, solution + ) + self.assertIsNotNone(action_plan.uuid) + self.assertEqual(m_create_action.call_count, 2) + # check order + filters = {'action_plan_id': action_plan.id} + actions = objects.Action.dbapi.get_action_list(self.context, filters) + self.assertEqual(actions[0].action_type, "nop") + self.assertEqual(actions[1].action_type, "migrate") class TestDefaultPlanner(base.DbTestCase): def setUp(self): super(TestDefaultPlanner, self).setUp() - self.default_planner = DefaultPlanner() + self.default_planner = pbase.DefaultPlanner() obj_utils.create_test_audit_template(self.context) p = mock.patch.object(db_api.BaseConnection, 'create_action_plan')