Move the code over for now

This commit is contained in:
Joshua Harlow 2013-05-07 10:49:44 -07:00
parent d6e3d62f7e
commit c5fff21053
27 changed files with 1370 additions and 0 deletions

176
LICENSE Normal file
View File

@ -0,0 +1,176 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.

0
setup.py Normal file
View File

37
taskflow/__init__.py Normal file
View File

@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Useful to know when other tasks are being activated and finishing.
STARTING = 'STARTING'
COMPLETED = 'COMPLETED'
ERRORED = 'ERRORED'
class Failure(object):
"""When a task failure occurs the following object will be given to revert
and can be used to interrogate what caused the failure."""
def __init__(self, task, name, workflow, exception):
self.task = task
self.name = name
self.workflow = workflow
self.exception = exception

59
taskflow/job.py Normal file
View File

@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
CLAIMED = 'claimed'
UNCLAIMED = 'unclaimed'
class Job(object):
__metaclass__ = abc.ABCMeta
def __init__(self, name, type, reservation):
self.name = name
# A link back to the reservation which
# can be used to query information about
# this job (and its subsequent
# workflows and tasks).
self.reservation = reservation
# TBD - likely more details about this job
self.details = None
self.state = UNCLAIMED
self.owner = None
@abc.abstractproperty
def type(self):
# Returns which type of job this is.
#
# For example, a 'run_instance' job, or a 'delete_instance' job could
# be possible types.
raise NotImplementedError()
@abc.abstractmethod
def claim(self, owner):
# This can be used to transition this job from unclaimed to claimed.
raise NotImplementedError()
@abc.abstractmethod
def consume(self):
# This can be used to transition this job from active to finished.
#
# During said transition the job and any details of it may be removed
# from some backing storage (if applicable).
raise NotImplementedError()

View File

@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The jobboard service for Nova. Different implementations can be plugged
according to the Nova configuration.
"""
from nova.workflow.jobboard import api
API = api.API

95
taskflow/jobboard/api.py Normal file
View File

@ -0,0 +1,95 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import contextlib
from oslo.config import cfg
from nova.openstack.common import importutils
from nova.openstack.common import log as logging
LOG = logging.getLogger(__name__)
jobboard_driver_opt = cfg.StrOpt('job_board_driver',
default='memory',
help='The driver that satisfies the '
'job providing service ( '
'valid options are: db, mq, zk, memory)')
CONF = cfg.CONF
CONF.register_opts([jobboard_driver_opt])
class API(object):
_driver = None
_driver_name_class_mapping = {
# TBD
}
def __new__(cls, *args, **kwargs):
'''Create an instance of the lock provider API.
args and kwargs are passed down to the job board driver when it gets
created.
'''
if not cls._driver:
LOG.debug(_('Job board driver defined as an instance of %s'),
str(CONF.job_board_driver))
driver_name = CONF.job_board_driver
try:
driver_class = cls._driver_name_class_mapping[driver_name]
except KeyError:
raise TypeError(_("Unknown job board driver name: %s")
% driver_name)
cls._driver = importutils.import_object(driver_class,
*args, **kwargs)
utils.check_isinstance(cls._driver, JobBoardDriver)
return super(API, cls).__new__(cls)
class JobBoardDriver(object):
"""Base class for job board drivers."""
__metaclass__ = abc.ABCMeta
def __init__(self):
self._listeners = []
@abc.abstractmethod
def post(self, job):
raise NotImplementedError()
def _notify_posted(self, job):
for i in self._listeners:
i.notify_posted(job)
@abc.abstractmethod
def await(self, blocking=True):
raise NotImplementedError()
def subscribe(self, listener):
self._listeners.append(listener)
def unsubscribe(self, listener):
if listener in self._listeners:
self._listeners.remove(listener)
def close(self):
"""Allows the job board provider to free any resources that it has."""
pass

View File

@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kazoo import client as kazoo_client
from nova.workflow.jobboard import api
from oslo.config import cfg
CONF = cfg.CONF
CONF.import_opt('address', 'nova.servicegroup.zk', group='zookeeper')
CONF.import_opt('recv_timeout', 'nova.servicegroup.zk', group='zookeeper')
class JobBoard(api.JobBoardDriver):
def __init__(self):
super(JobBoard, self).__init__()
self._client = kazoo_client.KazooClient(hosts=CONF.address,
timeout=CONF.recv_timeout)
self._client.start()
def post(self, job):

View File

@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The lock provider service for Nova. Different implementations can be plugged
according to the Nova configuration.
"""
from nova.workflow.locks import api
API = api.API

128
taskflow/locks/api.py Normal file
View File

@ -0,0 +1,128 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import contextlib
from oslo.config import cfg
from nova.openstack.common import importutils
from nova.openstack.common import log as logging
from nova import utils
LOG = logging.getLogger(__name__)
lockprovider_driver_opt = cfg.StrOpt('lock_provider_driver',
default='memory',
help='The driver that satisfies the '
'lock providing service ('
'valid options are: db, file, '
'memory, zk)')
CONF = cfg.CONF
CONF.register_opts([lockprovider_driver_opt])
class API(object):
_driver = None
_driver_name_class_mapping = {
# This can attempt to provide a distributed lock but extreme! care
# has to be taken to know when to expire a database lock, as well as
# extreme! care has to be taken when acquiring said lock. It is likely
# not possible to guarantee locking consistently and correctness when
# using a database to do locking.
'db': 'nova.locks.drivers.db.LockProvider',
# This can provide per system level locks but can not provide across
# system level locks.
'file': 'nova.locks.drivers.file.LockProvider',
# Note this driver can be used for distributed locking of resources.
'zk': 'nova.locks.drivers.zk.LockProvider',
# This driver is pretty much only useful for testing since it can
# only provide per-process locking using greenlet/thread level locking.
'memory': 'nova.locks.drivers.memory.LockProvider',
}
def __new__(cls, *args, **kwargs):
'''Create an instance of the lock provider API.
args and kwargs are passed down to the lock provider driver when it
gets created (if applicable).
'''
if not cls._driver:
LOG.debug(_('Lock provider driver defined as an instance of %s'),
str(CONF.lock_provider_driver))
driver_name = CONF.lock_provider_driver
try:
driver_class = cls._driver_name_class_mapping[driver_name]
except KeyError:
raise TypeError(_("Unknown lock provider driver name: %s")
% driver_name)
cls._driver = importutils.import_object(driver_class,
*args, **kwargs)
utils.check_isinstance(cls._driver, LockProvider)
return super(API, cls).__new__(cls)
class Lock(object):
"""Base class for what a lock (distributed or local or in-between) should
provide"""
__metaclass__ = abc.ABCMeta
def __init__(self, resource_uri, blocking=True):
self.uri = resource_uri
self.blocking = blocking
@abc.abstractmethod
def acquire(self):
raise NotImplementedError()
@abc.abstractmethod
def release(self):
raise NotImplementedError()
@abc.abstractmethod
def is_locked(self):
raise NotImplementedError()
@abc.abstractmethod
def cancel(self):
raise NotImplementedError()
def __enter__(self):
self.acquire()
def __exit__(self, type, value, traceback):
self.release()
class LockProvider(object):
"""Base class for lock provider drivers."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def provide(self, resource_uri, blocking=True):
"""Returns a single lock object which can be used to acquire the lock
on the given resource uri"""
raise NotImplementedError()
def close(self):
"""Allows the lock provider to free any resources that it has."""
pass

View File

@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova.workflow.locks import api
class Lock(api.Lock):
pass
class LockProvider(api.LockProvider):
def provide(self, resource_uri, blocking=True):
return Lock(resource_uri, blocking)

View File

@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova.workflow.locks import api
class Lock(api.Lock):
pass
class LockProvider(api.LockProvider):
def provide(self, resource_uri, blocking=True):
return Lock(resource_uri, blocking)

View File

@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova.workflow.locks import api
class Lock(api.Lock):
pass
class LockProvider(api.LockProvider):
def provide(self, resource_uri, blocking=True):
return Lock(resource_uri, blocking)

View File

@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kazoo import client as kazoo_client
from kazoo.recipe import lock as kazoo_lock
from nova.workflow.locks import api
from oslo.config import cfg
CONF = cfg.CONF
CONF.import_opt('address', 'nova.servicegroup.zk', group='zookeeper')
CONF.import_opt('recv_timeout', 'nova.servicegroup.zk', group='zookeeper')
class Lock(api.Lock):
def __init__(self, resource_uri, blocking, client):
super(Lock, self).__init__(resource_uri, blocking)
self._client = client
self._lock = kazoo_lock.Lock(client, resource_uri)
def acquire(self):
return self._lock.acquire(self._blocking)
def is_locked(self):
return self._lock.is_acquired
def release(self):
return self._lock.release()
def cancel(self):
return self._lock.cancel()
class LockProvider(api.LockProvider):
def __init__(self, *args, **kwargs):
self._client = kazoo_client.KazooClient(hosts=CONF.address,
timeout=CONF.recv_timeout)
self._client.start()
def provide(self, resource_uri, blocking=True):
return Lock(self._client, resource_uri, blocking)
def close(self):
if self._client:
self._client.stop()

View File

@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The logbook provider service for Nova. Different implementations can be plugged
according to the Nova configuration.
"""
from nova.workflow.logbook import api
API = api.API

151
taskflow/logbook/api.py Normal file
View File

@ -0,0 +1,151 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from oslo.config import cfg
"""Define APIs for the logbook providers."""
from nova.openstack.common import importutils
from nova.openstack.common import log as logging
LOG = logging.getLogger(__name__)
logprovider_driver_opt = cfg.StrOpt('logbook_provider_driver',
default='memory',
help='The driver for that satisfies the '
'remote lock providing service ( '
'valid options are: db, memory, zk)')
CONF = cfg.CONF
CONF.register_opts([logprovider_driver_opt])
class API(object):
_driver = None
_driver_name_class_mapping = {
# Note these drivers can be used for persistant logging.
'zk': 'nova.logbook.drivers.zk.ZooKeeperProviderDriver',
'db': 'nova.logbook.drivers.db.DBProviderDriver',
# This driver is pretty much only useful for testing.
'memory': 'nova.logbook.drivers.memory.MemoryProviderDriver',
}
def __new__(cls, *args, **kwargs):
'''Create an instance of the logbook provider API.
args and kwargs are passed down to the logbook provider driver when it
gets created.
'''
if not cls._driver:
LOG.debug(_('Logbook provider driver defined as an instance of %s'),
str(CONF.logbook_provider_driver))
driver_name = CONF.logbook_provider_driver
try:
driver_class = cls._driver_name_class_mapping[driver_name]
except KeyError:
raise TypeError(_("Unknown logbook provider driver name: %s")
% driver_name)
cls._driver = importutils.import_object(driver_class,
*args, **kwargs)
utils.check_isinstance(cls._driver, LogBookProviderDriver)
return super(API, cls).__new__(cls)
class RecordNotFound(Exception):
pass
class LogBook(object):
"""Base class for what a logbook (distributed or local or in-between)
should provide"""
__metaclass__ = abc.ABCMeta
def __init__(self, resource_uri):
self.uri = resource_uri
@abc.abstractmethod
def add_record(self, name, metadata=None):
"""Atomically adds a new entry to the given logbook with the supplied
metadata (if any)."""
raise NotImplementedError()
@abc.abstractmethod
def fetch_record(self, name):
"""Fetchs a record with the given name and returns any metadata about
said record."""
raise NotImplementedError()
@abc.abstractmethod
def __contains__(self, name):
"""Determines if any entry with the given name exists in this
logbook."""
raise NotImplementedError()
@abc.abstractmethod
def mark(self, name, metadata, merge_functor=None):
"""Marks the given logbook entry (which must exist) with the given
metadata, if said entry already exists then the provided merge functor
or a default function, will be activated to merge the existing metadata
with the supplied metadata."""
raise NotImplementedError()
@abc.abstractmethod
def __iter__(self):
"""Iterates over all names and metadata and provides back both of these
via a (name, metadata) tuple. The order will be in the same order that
they were added."""
raise NotImplementedError()
class LogBookProvider(object):
"""Base class for logbook provider drivers."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def provide(self, resource_uri):
"""Returns a new (if not existent) logbook object or used logbook
object (if already existent) which can be used to determine the actions
performed on a given resource uri"""
raise NotImplementedError()
def close(self):
"""Allows the log provider to free any resources that it has."""
pass

View File

@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,108 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kazoo import client as kazoo_client
from nova.openstack.common import jsonutils
from nova.workflow.logbook import api
from oslo.config import cfg
CONF = cfg.CONF
CONF.import_opt('address', 'nova.servicegroup.zk', group='zookeeper')
CONF.import_opt('recv_timeout', 'nova.servicegroup.zk', group='zookeeper')
class LogBook(api.LogBook):
prefix = "entry-"
def __init__(self, client, resource_uri):
super(LogBook, self).__init__(self, "%s/logbook" % (resource_uri))
self._client = client
self._paths_made = False
self._paths = [self.uri]
def _ensure_paths(self):
if self._paths_made:
return
for p in self._paths:
self._client.ensure_path(p)
self._paths_made = True
def add_record(self, name, metadata=None):
self._ensure_paths()
(path, value) = self._make_storage(name, metadata)
self._client.create(path, jsonutils.dumps(value), sequence=True)
def _make_storage(self, name, metadata):
path = "{root}/{prefix}".format(root=self.uri, prefix=self.prefix)
value = {
'name': name,
'metadata': metadata,
}
return (path, value)
def __iter__(self):
for c in self._client.get_children(self.uri + "/"):
if not c.startswith(self.prefix):
continue
(value, _zk) = self._client.get("%s/%s" % (self.uri, c))
value = jsonutils.loads(value)
yield (value['name'], value['metadata'])
def __contains__(self, name):
for (n, metadata) in self:
if name == n:
return True
return False
def mark(self, name, metadata, merge_func=None):
if merge_func is None:
merge_func = lambda old,new : new
for c in self._client.get_children(self.uri + "/"):
if not c.startswith(self.prefix):
continue
(value, _zk) = self._client.get("%s/%s" % (self.uri, c))
value = jsonutils.loads(value)
if value['name'] == name:
value['metadata'] = merge_func(value['metadata'], metadata)
self._client.set("%s/%s" % (self.uri, c),
jsonutils.dumps(value))
return
raise api.RecordNotFound()
def fetch_record(self, name):
for n, metadata in self:
if name == n:
return metadata
raise api.RecordNotFound()
class LogBookProvider(api.LogBookProvider):
def __init__(self, *args, **kwargs):
self._client = kazoo_client.KazooClient(hosts=CONF.address,
timeout=CONF.recv_timeout)
self._client.start()
def close(self):
if self._client:
self._client.stop()
def provide(self, resource_uri):
return LogBook(self._client, resource_uri)

View File

@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,139 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections as dict_provider
import copy
# OrderedDict is only in 2.7 or greater :-(
if not hasattr(dict_provider, 'OrderedDict'):
import ordereddict as dict_provider
from nova import workflow
from nova.openstack.common import excutils
from nova.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class Workflow(object):
"""A linear chain of independent tasks that can be applied as one unit or
rolled back as one unit."""
def __init__(self, name, tolerant=False, parents=None):
# The tasks which have been applied will be collected here so that they
# can be reverted in the correct order on failure.
self.reversions = []
self.name = name
# If this chain can ignore individual task reversion failure then this
# should be set to true, instead of the default value of false.
self.tolerant = tolerant
# Ordered dicts are used so that we can nicely refer to the tasks by
# name and easily fetch there results but also allow for the running
# of said tasks to happen in a linear order.
self.tasks = dict_provider.OrderedDict()
self.results = dict_provider.OrderedDict()
# If this workflow has a parent workflow/s which need to be reverted if
# this workflow fails then please include them here to allow this child
# to call the parents...
self.parents = parents
# This should be a functor that returns whether a given task has
# already ran by returning the return value of the task or returning
# 'None' if the task has not ran.
#
# NOTE(harlowja): This allows for resumption by skipping tasks which
# have already occurred. The previous return value is needed due to
# the contract we have with tasks that they will be given the value
# they returned if reversion is triggered.
self.result_fetcher = None
# Any objects that want to listen when a task starts/stops/completes
# or errors should be registered here. This can be used to monitor
# progress and record tasks finishing (so that it becomes possible to
# store the result of a task in some persistent or semi-persistent
# storage backend).
self.listeners = []
def __setitem__(self, name, task):
self.tasks[name] = task
def __getitem__(self, name):
return self.results[name]
def run(self, context, *args, **kwargs):
for (name, task) in self.tasks.iteritems():
try:
self._on_task_start(context, task, name)
# See if we have already ran this...
result = None
if self.result_fetcher:
result = self.result_fetcher(context, name, self)
if result is None:
result = task.apply(context, *args, **kwargs)
# Keep a pristine copy of the result in the results table
# so that if said result is altered by other further states
# the one here will not be.
self.results[name] = copy.deepcopy(result)
self._on_task_finish(context, task, name, result)
except Exception as ex:
with excutils.save_and_reraise_exception():
try:
self._on_task_error(context, task, name)
except Exception:
LOG.exception(_("Dropping exception catched when"
" notifying about existing task"
" exception."))
self.rollback(context,
workflow.Failure(task, name, self, ex))
def _on_task_error(self, context, task, name):
# Notify any listeners that the task has errored.
for i in self.listeners:
i.notify(context, workflow.ERRORED, self, task, name)
def _on_task_start(self, context, task, name):
# Notify any listeners that we are about to start the given task.
for i in self.listeners:
i.notify(context, workflow.STARTING, self, task, name)
def _on_task_finish(self, context, task, name, result):
# Notify any listeners that we are finishing the given task.
self.reversions.append((name, task))
for i in self.listeners:
i.notify(context, workflow.COMPLETED, self, task,
name, result=result)
def rollback(self, context, cause):
for (i, (name, task)) in enumerate(reversed(self.reversions)):
try:
task.revert(context, self.results[name], cause)
except Exception:
# Ex: WARN: Failed rolling back stage 1 (validate_request) of
# chain validation due to Y exception.
msg = _("Failed rolling back stage %(index)s (%(name)s)"
" of workflow %(workflow)s, due to inner exception.")
LOG.warn(msg % {'index': (i + 1), 'stage': name,
'workflow': self.name})
if not self.tolerant:
# NOTE(harlowja): LOG a msg AND re-raise the exception if
# the chain does not tolerate exceptions happening in the
# rollback method.
raise
if self.parents:
# Rollback any parents workflows if they exist...
for p in self.parents:
p.rollback(context, cause)

30
taskflow/reservation.py Normal file
View File

@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class Reservation(object):
"""This is an abstraction of a promise to complete some type of job which
can be returned to a user for later lookup on the progress of said
promise"""
def __init__(self, for_who, id):
self.for_who = for_who
self.id = id
def __str__(self):
return "Reservation: '%s' for '%s'" % (self.id, self.for_who)

45
taskflow/task.py Normal file
View File

@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
class Task(object):
"""An abstraction that defines a potential piece of work that can be
applied and can be reverted to undo the work as a single unit.
"""
__metaclass__ = abc.ABCMeta
def __str__(self):
return "Task: %s" % (self.__class__.__name__)
@abc.abstractmethod
def apply(self, context, *args, **kwargs):
"""Activate a given task which will perform some operation and return.
This method can be used to apply some given context and given set
of args and kwargs to accomplish some goal. Note that the result
that is returned needs to be serializable so that it can be passed
back into this task if reverting is triggered."""
raise NotImplementedError()
def revert(self, context, result, cause):
"""Revert this task using the given context, result that the apply
provided as well as any information which may have caused
said reversion."""
pass