""" Copyright 2013 Rackspace, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ import abc import collections import threading import uuid import structlog from teeth_rest import encoding from teeth_rest import errors as rest_errors from teeth_agent import errors class AgentCommandStatus(object): RUNNING = 'RUNNING' SUCCEEDED = 'SUCCEEDED' FAILED = 'FAILED' class BaseCommandResult(encoding.Serializable): def __init__(self, command_name, command_params): self.id = str(uuid.uuid4()) self.command_name = command_name self.command_params = command_params self.command_status = AgentCommandStatus.RUNNING self.command_error = None self.command_result = None def serialize(self, view): return collections.OrderedDict([ ('id', self.id), ('command_name', self.command_name), ('command_params', self.command_params), ('command_status', self.command_status), ('command_error', self.command_error), ('command_result', self.command_result), ]) def is_done(self): return self.command_status != AgentCommandStatus.RUNNING def join(self): return self class SyncCommandResult(BaseCommandResult): def __init__(self, command_name, command_params, success, result_or_error): super(SyncCommandResult, self).__init__(command_name, command_params) if success: self.command_status = AgentCommandStatus.SUCCEEDED self.command_result = result_or_error else: self.command_status = AgentCommandStatus.FAILED self.command_error = result_or_error class AsyncCommandResult(BaseCommandResult): """A command that executes asynchronously in the background. Subclasses should override `execute` to implement actual command execution. """ def __init__(self, command_name, command_params): super(AsyncCommandResult, self).__init__(command_name, command_params) self.command_state_lock = threading.Lock() thread_name = 'agent-command-{}'.format(self.id) self.execution_thread = threading.Thread(target=self.run, name=thread_name) def serialize(self, view): with self.command_state_lock: return super(AsyncCommandResult, self).serialize(view) def start(self): self.execution_thread.start() return self def join(self): self.execution_thread.join() return self def is_done(self): with self.command_state_lock: return super(AsyncCommandResult, self).is_done() def run(self): try: result = self.execute() with self.command_state_lock: self.command_result = result self.command_status = AgentCommandStatus.SUCCEEDED except Exception as e: if not isinstance(e, rest_errors.RESTError): e = errors.CommandExecutionError(str(e)) with self.command_state_lock: self.command_error = e self.command_status = AgentCommandStatus.FAILED @abc.abstractmethod def execute(self): pass class BaseAgentMode(object): def __init__(self, name): super(BaseAgentMode, self).__init__() self.log = structlog.get_logger(agent_mode=name) self.name = name self.command_map = {} def execute(self, command_name, **kwargs): if command_name not in self.command_map: raise errors.InvalidCommandError(command_name) result = self.command_map[command_name](command_name, **kwargs) # In order to enable extremely succinct synchronous commands, we allow # them to return a value directly, and we'll handle wrapping it up in a # SyncCommandResult if not isinstance(result, BaseCommandResult): result = SyncCommandResult(command_name, kwargs, True, result) return result