diff --git a/doc/source/user/jobs.rst b/doc/source/user/jobs.rst index 31a1f0183..db4b76d30 100644 --- a/doc/source/user/jobs.rst +++ b/doc/source/user/jobs.rst @@ -288,6 +288,40 @@ optionally expire after a given amount of time). See :py:class:`~taskflow.jobs.backends.impl_redis.RedisJobBoard` for implementation details. +Etcd +---- + +**Board type**: ``'etcd'`` + +Uses `etcd`_ to provide the jobboard capabilities by using Etcd key values data +structures and individual job ownership key (that can optionally expire after a +given amount of time). + +Additional *kwarg* parameters: + +* ``persistence``: a class that provides a :doc:`persistence ` + backend interface; it will be used for loading jobs logbooks for usage at + runtime or for usage before a job is claimed for introspection. + +Additional *configuration* parameters: + +* ``path``: the Etcd path to store job information (*defaults* to + ``jobboard``) +* ``host``: the Etcd host to connect to (*defaults* to ``localhost``) +* ``port``: the port of the Etcd server (*defaults* to ``2379``) +* ``api_path``: the path of the Etcd API endpoint +* ``protocol``: the protocol used to communicate with the server (*defaults* to + ``http``, choices are ``http`` or ``https``) +* ``ca_cert``, ``cert_key`` and ``cert_cert``: the certificate information + passed to Etcd3gw for ``https`` communications +* ``timeout``: the timeout used when performing operations with Etcd +* ``ttl``: the default time-to-live when claiming a job (*defaults* to + ``None``) + +.. note:: + See :py:class:`~taskflow.jobs.backends.impl_etcd.EtcdJobBoard` + for implementation details. + Considerations ============== @@ -356,6 +390,11 @@ Redis .. automodule:: taskflow.jobs.backends.impl_redis +Etcd +---- + +.. automodule:: taskflow.jobs.backends.impl_etcd + Hierarchy ========= @@ -363,6 +402,7 @@ Hierarchy taskflow.jobs.base taskflow.jobs.backends.impl_redis taskflow.jobs.backends.impl_zookeeper + taskflow.jobs.backends.impl_etcd :parts: 1 .. _paradigm shift: https://wiki.openstack.org/wiki/TaskFlow/Paradigm_shifts#Workflow_ownership_transfer @@ -370,3 +410,4 @@ Hierarchy .. _kazoo: https://kazoo.readthedocs.io/en/latest/ .. _stevedore: https://docs.openstack.org/stevedore/latest .. _redis: https://redis.io/ +.. _etcd: https://etcd.io/ diff --git a/releasenotes/notes/etcd-jobboard-backend-8a9fea2238fb0f12.yaml b/releasenotes/notes/etcd-jobboard-backend-8a9fea2238fb0f12.yaml new file mode 100644 index 000000000..19775236b --- /dev/null +++ b/releasenotes/notes/etcd-jobboard-backend-8a9fea2238fb0f12.yaml @@ -0,0 +1,6 @@ +--- +features: + - | + Added an Etcd-based backend for jobboard. This backend is similar to the + Redis backend, it requires that the consumer extends the expiry of the job + that is being running. diff --git a/setup.cfg b/setup.cfg index 41aef5656..54e6d2249 100644 --- a/setup.cfg +++ b/setup.cfg @@ -34,6 +34,7 @@ packages = taskflow.jobboards = zookeeper = taskflow.jobs.backends.impl_zookeeper:ZookeeperJobBoard redis = taskflow.jobs.backends.impl_redis:RedisJobBoard + etcd = taskflow.jobs.backends.impl_etcd:EtcdJobBoard taskflow.conductors = blocking = taskflow.conductors.backends.impl_blocking:BlockingConductor @@ -63,6 +64,8 @@ zookeeper = zake>=0.1.6 # Apache-2.0 redis = redis>=4.0.0 # MIT +etcd = + etcd3gw>=2.0.0 # Apache-2.0 workers = kombu>=4.3.0 # BSD eventlet = diff --git a/taskflow/jobs/backends/impl_etcd.py b/taskflow/jobs/backends/impl_etcd.py new file mode 100644 index 000000000..36b47330c --- /dev/null +++ b/taskflow/jobs/backends/impl_etcd.py @@ -0,0 +1,610 @@ +# Copyright (C) Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import threading +import typing + +import etcd3gw +import fasteners +from oslo_serialization import jsonutils +from oslo_utils import timeutils +from oslo_utils import uuidutils + +from taskflow import exceptions as exc +from taskflow.jobs import base +from taskflow import logging +from taskflow import states +from taskflow.utils import misc +if typing.TYPE_CHECKING: + from taskflow.types import entity + +LOG = logging.getLogger(__name__) + + +class EtcdJob(base.Job): + """An Etcd job.""" + + board: 'EtcdJobBoard' + + def __init__(self, board: 'EtcdJobBoard', name, client, key, + uuid=None, details=None, backend=None, + book=None, book_data=None, + priority=base.JobPriority.NORMAL, + sequence=None, created_on=None): + super().__init__(board, name, uuid=uuid, details=details, + backend=backend, book=book, book_data=book_data) + + self._client = client + self._key = key + self._priority = priority + self._sequence = sequence + self._created_on = created_on + self._root = board._root_path + + self._lease = None + + @property + def key(self): + return self._key + + @property + def last_modified(self): + try: + raw_data = self.board.get_last_modified(self) + data = jsonutils.loads(raw_data) + ret = timeutils.parse_strtime(data["last_modified"]) + return ret + except Exception: + LOG.exception("Cannot read load_modified key.") + return 0 + + @property + def created_on(self): + return self._created_on + + @property + def state(self): + """Access the current state of this job.""" + owner, data = self.board.get_owner_and_data(self) + if not data: + if owner is not None: + LOG.info(f"Owner key was found for job {self.uuid}, " + f"but the key {self.key} is missing") + return states.COMPLETE + if not owner: + return states.UNCLAIMED + return states.CLAIMED + + @property + def sequence(self): + return self._sequence + + @property + def priority(self): + return self._priority + + @property + def lease(self): + if not self._lease: + owner_data = self.board.get_owner_data(self) + if 'lease_id' not in owner_data: + return None + lease_id = owner_data['lease_id'] + self._lease = etcd3gw.Lease(id=lease_id, + client=self._client) + return self._lease + + def expires_in(self): + """How many seconds until the claim expires.""" + if self.lease is None: + return -1 + return self.lease.ttl() + + def extend_expiry(self, expiry): + """Extends the owner key (aka the claim) expiry for this job. + + Returns ``True`` if the expiry request was performed + otherwise ``False``. + """ + if self.lease is None: + return False + ret = self.lease.refresh() + return (ret > 0) + + @property + def root(self): + return self._root + + def __lt__(self, other): + if not isinstance(other, EtcdJob): + return NotImplemented + if self.root == other.root: + if self.priority == other.priority: + return self.sequence < other.sequence + else: + ordered = base.JobPriority.reorder( + (self.priority, self), (other.priority, other)) + if ordered[0] is self: + return False + return True + else: + # Different jobboards with different roots... + return self.root < other.root + + def __eq__(self, other): + if not isinstance(other, EtcdJob): + return NotImplemented + return ((self.root, self.sequence, self.priority) == + (other.root, other.sequence, other.priority)) + + def __ne__(self, other): + return not self.__eq__(other) + + def __hash__(self): + return hash(self.key) + + +class EtcdJobBoard(base.JobBoard): + """A jobboard backed by `etcd`_. + + This jobboard creates sequenced key/value pairs in etcd. Each key + represents a job and its associated value contains the parameter of the + job encoded in + json. + The users of the jobboard can iterate over the available job and decide if + they want to attempt to claim one job by calling the :meth:`.claim` method. + Claiming a job consists in atomically create a key based on the key of job + and the ".lock" postfix. If the atomic creation of the key is successful + the job belongs to the user. Any attempt to lock an already locked job + will fail. + When a job is complete, the user consumes the job by calling the + :meth:`.consume` method, it deletes the job and the lock from etcd. + Alternatively, a user can trash (:meth:`.trash`) or abandon + (:meth:`.abandon`) if they want to delete the job or leave it for another + user. + Etcd doesn't provide a method for unlocking the jobs when a consumer dies. + The Etcd jobboard provides timed expirations, based on a global ``ttl`` + configuration setting or the ``expiry`` parameter of the :meth:`.claim` + method. When this time-to-live/expiry is reached, the job is automatically + unlocked and another consumer can claim it. If it is expected that a task + of a job takes more time than the defined time-to-live, the + consumer can refresh the timer by calling the :meth:`EtcdJob.extend_expiry` + function. + + .. _etcd: https://etcd.io/ + """ + ROOT_PATH = "/taskflow/jobs" + + TRASH_PATH = "/taskflow/.trash" + + DEFAULT_PATH = "jobboard" + + JOB_PREFIX = "job" + + SEQUENCE_KEY = "sequence" + + DATA_POSTFIX = ".data" + + LOCK_POSTFIX = ".lock" + + LAST_MODIFIED_POSTFIX = ".last_modified" + + ETCD_CONFIG_OPTIONS = ( + ("host", str), + ("port", int), + ("protocol", str), + ("ca_cert", str), + ("cert_key", str), + ("cert_cert", str), + ("timeout", int), + ("api_path", str), + ) + + INIT_STATE = 'init' + CONNECTED_STATE = 'connected' + FETCH_STATE = 'fetched' + + _client: etcd3gw.Etcd3Client + + def __init__(self, name, conf, client=None, persistence=None): + super().__init__(name, conf) + + self._client = client + self._persistence = persistence + self._state = self.INIT_STATE + + path_elems = [self.ROOT_PATH, + self._conf.get("path", self.DEFAULT_PATH)] + self._root_path = self.join(*path_elems) + + self._job_cache = {} + self._job_cond = threading.Condition() + + self._open_close_lock = threading.RLock() + + self._watcher_thd = None + self._thread_cancel = None + self._watcher = None + self._watcher_cancel = None + + def join(self, root, *args): + return "/".join([root] + [a.strip("/") for a in args]) + + def incr(self, key): + """Atomically increment an integer, create it if it doesn't exist""" + while True: + value = self._client.get(key) + if not value: + res = self._client.create(key, 1) + if res: + return 1 + # Another thread has just created the key after we failed to + # read it, retry to get the new current value + continue + + value = int(value[0]) + next_value = value + 1 + + res = self._client.replace(key, value, next_value) + if res: + return next_value + + def get_one(self, key): + if self._client is None: + raise exc.JobFailure(f"Cannot read key {key}, client is closed") + value = self._client.get(key) + if not value: + return None + return value[0] + + def _fetch_jobs(self, only_unclaimed=False, ensure_fresh=False): + # TODO(gthiemonge) only_unclaimed is ignored + if ensure_fresh or self._state != self.FETCH_STATE: + self._ensure_fresh() + return sorted(self._job_cache.values()) + + def _ensure_fresh(self): + prefix = self.join(self._root_path, self.JOB_PREFIX) + jobs = self._client.get_prefix(prefix) + listed_jobs = {} + for job in jobs: + data, metadata = job + key = misc.binary_decode(metadata['key']) + if key.endswith(self.DATA_POSTFIX): + key = key.rstrip(self.DATA_POSTFIX) + listed_jobs[key] = data + + removed_jobs = [] + with self._job_cond: + for key in self._job_cache.keys(): + if key not in listed_jobs: + removed_jobs.append(key) + for key in removed_jobs: + self._remove_job_from_cache(key) + + for key, data in listed_jobs.items(): + self._process_incoming_job(key, data) + self._state = self.FETCH_STATE + + def _process_incoming_job(self, key, data): + try: + job_data = jsonutils.loads(data) + except jsonutils.json.JSONDecodeError: + msg = ("Incorrectly formatted job data found at " + f"key: {key}") + LOG.warning(msg, exc_info=True) + LOG.info("Deleting invalid job data at key: %s", key) + self._client.delete(key) + raise exc.JobFailure(msg) + + with self._job_cond: + if key not in self._job_cache: + job_priority = base.JobPriority.convert(job_data["priority"]) + new_job = EtcdJob(self, + job_data["name"], + self._client, + key, + uuid=job_data["uuid"], + details=job_data.get("details", {}), + backend=self._persistence, + book_data=job_data.get("book"), + priority=job_priority, + sequence=job_data["sequence"]) + self._job_cache[key] = new_job + self._job_cond.notify_all() + + def _remove_job_from_cache(self, key): + """Remove job from cache.""" + with self._job_cond: + if key in self._job_cache: + self._job_cache.pop(key, None) + + def _board_removal_func(self, job): + try: + self._remove_job_from_cache(job.key) + self._client.delete_prefix(job.key) + except Exception: + LOG.exception(f"Failed to delete prefix {job.key}") + + def iterjobs(self, only_unclaimed=False, ensure_fresh=False): + """Returns an iterator of jobs that are currently on this board.""" + return base.JobBoardIterator( + self, LOG, only_unclaimed=only_unclaimed, + ensure_fresh=ensure_fresh, + board_fetch_func=self._fetch_jobs, + board_removal_func=self._board_removal_func) + + def wait(self, timeout=None): + """Waits a given amount of time for **any** jobs to be posted.""" + # Wait until timeout expires (or forever) for jobs to appear. + watch = timeutils.StopWatch(duration=timeout) + watch.start() + with self._job_cond: + while True: + if not self._job_cache: + if watch.expired(): + raise exc.NotFound("Expired waiting for jobs to" + " arrive; waited %s seconds" + % watch.elapsed()) + # This is done since the given timeout can not be provided + # to the condition variable, since we can not ensure that + # when we acquire the condition that there will actually + # be jobs (especially if we are spuriously awaken), so we + # must recalculate the amount of time we really have left. + self._job_cond.wait(watch.leftover(return_none=True)) + else: + curr_jobs = self._fetch_jobs() + fetch_func = lambda ensure_fresh: curr_jobs + removal_func = lambda a_job: self._remove_job_from_cache( + a_job.key) + return base.JobBoardIterator( + self, LOG, board_fetch_func=fetch_func, + board_removal_func=removal_func) + + @property + def job_count(self): + """Returns how many jobs are on this jobboard.""" + return len(self._job_cache) + + def get_owner_data(self, job: EtcdJob) -> typing.Optional[dict]: + owner_key = job.key + self.LOCK_POSTFIX + owner_data = self.get_one(owner_key) + if not owner_data: + return None + return jsonutils.loads(owner_data) + + def find_owner(self, job: EtcdJob) -> typing.Optional[dict]: + """Gets the owner of the job if one exists.""" + data = self.get_owner_data(job) + if data: + return data['owner'] + return None + + def get_data(self, job: EtcdJob) -> bytes: + key = job.key + self.DATA_POSTFIX + return self.get_one(key) + + def get_owner_and_data(self, job: EtcdJob) -> tuple[ + typing.Optional[str], typing.Optional[bytes]]: + if self._client is None: + raise exc.JobFailure("Cannot retrieve information, " + "not connected") + + job_data = None + job_owner = None + + for data, metadata in self._client.get_prefix(job.key + "."): + key = misc.binary_decode(metadata["key"]) + if key.endswith(self.DATA_POSTFIX): + # bytes? + job_data = data + elif key.endswith(self.LOCK_POSTFIX): + data = jsonutils.loads(data) + job_owner = data["owner"] + + return job_owner, job_data + + def set_last_modified(self, job: EtcdJob): + key = job.key + self.LAST_MODIFIED_POSTFIX + + now = timeutils.utcnow() + self._client.put(key, jsonutils.dumps({"last_modified": now})) + + def get_last_modified(self, job: EtcdJob): + key = job.key + self.LAST_MODIFIED_POSTFIX + + return self.get_one(key) + + def post(self, name, book=None, details=None, + priority=base.JobPriority.NORMAL) -> EtcdJob: + """Atomically creates and posts a job to the jobboard.""" + job_priority = base.JobPriority.convert(priority) + job_uuid = uuidutils.generate_uuid() + job_posting = base.format_posting(job_uuid, name, + created_on=timeutils.utcnow(), + book=book, details=details, + priority=job_priority) + seq = self.incr(self.join(self._root_path, self.SEQUENCE_KEY)) + key = self.join(self._root_path, f"{self.JOB_PREFIX}{seq}") + + job_posting["sequence"] = seq + raw_job_posting = jsonutils.dumps(job_posting) + + data_key = key + self.DATA_POSTFIX + + self._client.create(data_key, raw_job_posting) + job = EtcdJob(self, name, self._client, key, + uuid=job_uuid, + details=details, + backend=self._persistence, + book=book, + book_data=job_posting.get('book'), + priority=job_priority, + sequence=seq) + with self._job_cond: + self._job_cache[key] = job + self._job_cond.notify_all() + return job + + @base.check_who + def claim(self, job, who, expiry=None): + """Atomically attempts to claim the provided job.""" + owner_key = job.key + self.LOCK_POSTFIX + + ttl = expiry or self._conf.get('ttl', None) + + if ttl: + lease = self._client.lease(ttl=ttl) + else: + lease = None + + owner_dict = { + "owner": who, + } + if lease: + owner_dict["lease_id"] = lease.id + + owner_value = jsonutils.dumps(owner_dict) + + # Create a lock for the job, if the lock already exists, the job + # is owned by another worker + created = self._client.create(owner_key, owner_value, lease=lease) + if not created: + # Creation is denied, revoke the lease, we cannot claim the job. + if lease: + lease.revoke() + + owner = self.find_owner(job) + if owner: + message = f"Job {job.uuid} already claimed by '{owner}'" + else: + message = f"Job {job.uuid} already claimed" + raise exc.UnclaimableJob(message) + + # Ensure that the job still exists, it may have been claimed and + # consumed by another thread before we enter this function + if not self.get_data(job): + # Revoke the lease + if lease: + lease.revoke() + else: + self._client.delete(owner_key) + raise exc.UnclaimableJob(f"Job {job.uuid} already deleted.") + + self.set_last_modified(job) + + @base.check_who + def consume(self, job, who): + """Permanently (and atomically) removes a job from the jobboard.""" + owner, data = self.get_owner_and_data(job) + if data is None or owner is None: + raise exc.NotFound(f"Cannot find job {job.uuid}") + if owner != who: + raise exc.JobFailure(f"Cannot consume a job {job.uuid}" + f" which is not owned by {who}") + + self._client.delete_prefix(job.key + ".") + self._remove_job_from_cache(job.key) + + @base.check_who + def abandon(self, job, who): + """Atomically attempts to abandon the provided job.""" + owner, data = self.get_owner_and_data(job) + if data is None or owner is None: + raise exc.NotFound(f"Cannot find job {job.uuid}") + if owner != who: + raise exc.JobFailure(f"Cannot abandon a job {job.uuid}" + f" which is not owned by {who}") + + owner_key = job.key + self.LOCK_POSTFIX + self._client.delete(owner_key) + + @base.check_who + def trash(self, job, who): + """Trash the provided job.""" + owner, data = self.get_owner_and_data(job) + if data is None or owner is None: + raise exc.NotFound(f"Cannot find job {job.uuid}") + if owner != who: + raise exc.JobFailure(f"Cannot trash a job {job.uuid} " + f"which is not owned by {who}") + + trash_key = job.key.replace(self.ROOT_PATH, self.TRASH_PATH) + self._client.create(trash_key, data) + self._client.delete_prefix(job.key + ".") + self._remove_job_from_cache(job.key) + + def register_entity(self, entity: 'entity.Entity'): + """Register an entity to the jobboard('s backend), e.g: a conductor""" + # TODO(gthiemonge) Doesn't seem to be useful with Etcd + + @property + def connected(self): + """Returns if this jobboard is connected.""" + return self._client is not None + + @fasteners.locked(lock='_open_close_lock') + def connect(self): + """Opens the connection to any backend system.""" + if self._client is None: + etcd_conf = {} + for config_opts in self.ETCD_CONFIG_OPTIONS: + key, value_type = config_opts + if key in self._conf: + etcd_conf[key] = value_type(self._conf[key]) + + self._client = etcd3gw.Etcd3Client(**etcd_conf) + self._state = self.CONNECTED_STATE + + watch_url = self.join(self._root_path, self.JOB_PREFIX) + self._thread_cancel = threading.Event() + try: + (self._watcher, + self._watcher_cancel) = self._client.watch_prefix(watch_url) + except etcd3gw.exceptions.ConnectionFailedError: + exc.raise_with_cause(exc.JobFailure, + "Failed to connect to Etcd") + self._watcher_thd = threading.Thread(target=self._watcher_thread) + self._watcher_thd.start() + + def _watcher_thread(self): + while not self._thread_cancel.is_set(): + for event in self._watcher: + if "kv" not in event: + continue + + key_value = event["kv"] + key = misc.binary_decode(key_value["key"]) + + if key.endswith(self.DATA_POSTFIX): + key = key.rstrip(self.DATA_POSTFIX) + if event.get("type") == "DELETE": + self._remove_job_from_cache(key) + else: + data = key_value["value"] + self._process_incoming_job(key, data) + + @fasteners.locked(lock='_open_close_lock') + def close(self): + """Close the connection to any backend system.""" + if self._client is not None: + if self._watcher_cancel is not None: + self._watcher_cancel() + if self._thread_cancel is not None: + self._thread_cancel.set() + if self._watcher_thd is not None: + self._watcher_thd.join() + del self._client + self._client = None + self._state = self.INIT_STATE diff --git a/taskflow/tests/unit/jobs/test_etcd_job.py b/taskflow/tests/unit/jobs/test_etcd_job.py new file mode 100644 index 000000000..0c46a8c13 --- /dev/null +++ b/taskflow/tests/unit/jobs/test_etcd_job.py @@ -0,0 +1,421 @@ +# Copyright (C) Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from unittest import mock + +from oslo_serialization import jsonutils +from oslo_utils import uuidutils +import testtools + +from taskflow import exceptions as exc +from taskflow.jobs.backends import impl_etcd +from taskflow.jobs import base as jobs_base +from taskflow import test +from taskflow.tests.unit.jobs import base +from taskflow.tests import utils as test_utils + +ETCD_AVAILABLE = test_utils.etcd_available() + + +class EtcdJobBoardMixin: + def create_board(self, conf=None, persistence=None): + self.path = f"test-{uuidutils.generate_uuid()}" + board_conf = { + "path": self.path, + } + if conf: + board_conf.update(conf) + board = impl_etcd.EtcdJobBoard("etcd", board_conf, + persistence=persistence) + return board._client, board + + +class MockedEtcdJobBoard(test.TestCase, EtcdJobBoardMixin): + + def test_create_board(self): + _, jobboard = self.create_board() + self.assertEqual(f"/taskflow/jobs/{self.path}", jobboard._root_path) + + _, jobboard = self.create_board({"path": "/testpath"}) + self.assertEqual("/taskflow/jobs/testpath", jobboard._root_path) + + @mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard.incr") + @mock.patch("threading.Condition") + @mock.patch("oslo_utils.uuidutils.generate_uuid") + @mock.patch("oslo_utils.timeutils.utcnow") + def test_post(self, + mock_utcnow: mock.Mock, + mock_generated_uuid: mock.Mock, + mock_cond: mock.Mock, + mock_incr: mock.Mock): + mock_incr.return_value = 12 + mock_generated_uuid.return_value = "uuid1" + mock_utcnow.return_value = "utcnow1" + + mock_book = mock.Mock() + mock_book.name = "book1_name" + mock_book.uuid = "book1_uuid" + mock_details = mock.Mock() + + _, jobboard = self.create_board() + jobboard._client = mock.Mock() + job = jobboard.post("post1", book=mock_book, + details=mock_details, + priority=jobs_base.JobPriority.NORMAL) + + expected_key = ( + f"/taskflow/jobs/{self.path}/job12") + expected_data_key = expected_key + jobboard.DATA_POSTFIX + expected_book_data = { + "name": "book1_name", + "uuid": "book1_uuid" + } + expected_job_posting = { + "uuid": "uuid1", + "name": "post1", + "priority": "NORMAL", + "created_on": "utcnow1", + "details": mock_details, + "book": expected_book_data, + "sequence": 12, + } + + mock_incr.assert_called_with(f"/taskflow/jobs/{self.path}/sequence") + + jobboard._client.create.assert_called_with( + expected_data_key, jsonutils.dumps(expected_job_posting)) + + self.assertEqual("post1", job.name) + self.assertEqual(expected_key, job.key) + self.assertEqual(mock_details, job.details) + self.assertEqual(mock_book, job.book) + self.assertEqual(expected_book_data, job._book_data) + self.assertEqual(jobs_base.JobPriority.NORMAL, job.priority) + self.assertEqual(12, job.sequence) + + self.assertEqual(1, len(jobboard._job_cache)) + self.assertEqual(job, jobboard._job_cache[expected_key]) + + @mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard." + "set_last_modified") + def test_claim(self, mock_set_last_modified): + who = "owner1" + lease_id = uuidutils.generate_uuid() + + _, jobboard = self.create_board(conf={"ttl": 37}) + jobboard._client = mock.Mock() + + mock_lease = mock.Mock(id=lease_id) + jobboard._client.lease.return_value = mock_lease + jobboard._client.create.return_value = True + jobboard._client.get.return_value = [mock.Mock()] + + job = impl_etcd.EtcdJob(jobboard, + "job7", + jobboard._client, + f"/taskflow/jobs/{self.path}/job7", + uuid=uuidutils.generate_uuid(), + details=mock.Mock(), + backend="etcd", + book=mock.Mock(), + book_data=mock.Mock(), + priority=jobs_base.JobPriority.NORMAL, + sequence=7, + created_on="date") + + jobboard.claim(job, who) + + jobboard._client.lease.assert_called_once_with(ttl=37) + + jobboard._client.create.assert_called_once_with( + f"{job.key}{jobboard.LOCK_POSTFIX}", + jsonutils.dumps({"owner": who, + "lease_id": lease_id}), + lease=mock_lease) + + jobboard._client.get.assert_called_once_with( + job.key + jobboard.DATA_POSTFIX) + mock_lease.revoke.assert_not_called() + + mock_set_last_modified.assert_called_once_with(job) + + @mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard." + "set_last_modified") + @mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard." + "find_owner") + def test_claim_already_claimed(self, mock_find_owner, + mock_set_last_modified): + who = "owner1" + lease_id = uuidutils.generate_uuid() + + mock_find_owner.return_value = who + + _, jobboard = self.create_board({"ttl": 37}) + jobboard._client = mock.Mock() + + mock_lease = mock.Mock(id=lease_id) + jobboard._client.lease.return_value = mock_lease + jobboard._client.create.return_value = False + jobboard._client.get.return_value = [] + + job = impl_etcd.EtcdJob(jobboard, + "job7", + jobboard._client, + f"/taskflow/jobs/{self.path}/job7", + uuid=uuidutils.generate_uuid(), + details=mock.Mock(), + backend="etcd", + book=mock.Mock(), + book_data=mock.Mock(), + priority=jobs_base.JobPriority.NORMAL, + sequence=7, + created_on="date") + + self.assertRaisesRegex(exc.UnclaimableJob, "already claimed by", + jobboard.claim, job, who) + + jobboard._client.lease.assert_called_once_with(ttl=37) + + jobboard._client.create.assert_called_once_with( + f"{job.key}{jobboard.LOCK_POSTFIX}", + jsonutils.dumps({"owner": who, + "lease_id": lease_id}), + lease=mock_lease) + + mock_lease.revoke.assert_called_once() + + mock_set_last_modified.assert_not_called() + + @mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard." + "set_last_modified") + def test_claim_deleted(self, mock_set_last_modified): + who = "owner1" + lease_id = uuidutils.generate_uuid() + + _, jobboard = self.create_board({"ttl": 37}) + jobboard._client = mock.Mock() + + mock_lease = mock.Mock(id=lease_id) + jobboard._client.lease.return_value = mock_lease + jobboard._client.create.return_value = True + jobboard._client.get.return_value = [] + + job = impl_etcd.EtcdJob(jobboard, + "job7", + jobboard._client, + f"/taskflow/jobs/{self.path}/job7", + uuid=uuidutils.generate_uuid(), + details=mock.Mock(), + backend="etcd", + book=mock.Mock(), + book_data=mock.Mock(), + priority=jobs_base.JobPriority.NORMAL, + sequence=7, + created_on="date") + + self.assertRaisesRegex(exc.UnclaimableJob, "already deleted", + jobboard.claim, job, who) + + jobboard._client.lease.assert_called_once_with(ttl=37) + + jobboard._client.create.assert_called_once_with( + f"{job.key}{jobboard.LOCK_POSTFIX}", + jsonutils.dumps({"owner": who, + "lease_id": lease_id}), + lease=mock_lease) + + jobboard._client.get.assert_called_once_with( + job.key + jobboard.DATA_POSTFIX) + mock_lease.revoke.assert_called_once() + + mock_set_last_modified.assert_not_called() + + @mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard." + "get_owner_and_data") + @mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard." + "_remove_job_from_cache") + def test_consume(self, mock__remove_job_from_cache, + mock_get_owner_and_data): + mock_get_owner_and_data.return_value = ["owner1", mock.Mock()] + + _, jobboard = self.create_board() + jobboard._client = mock.Mock() + + job = impl_etcd.EtcdJob(jobboard, + "job7", + jobboard._client, + f"/taskflow/jobs/{self.path}/job7") + jobboard.consume(job, "owner1") + + jobboard._client.delete_prefix.assert_called_once_with(job.key + ".") + mock__remove_job_from_cache.assert_called_once_with(job.key) + + @mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard." + "get_owner_and_data") + def test_consume_bad_owner(self, mock_get_owner_and_data): + mock_get_owner_and_data.return_value = ["owner2", mock.Mock()] + + _, jobboard = self.create_board() + jobboard._client = mock.Mock() + + job = impl_etcd.EtcdJob(jobboard, + "job7", + jobboard._client, + f"/taskflow/jobs/{self.path}/job7") + self.assertRaisesRegex(exc.JobFailure, "which is not owned", + jobboard.consume, job, "owner1") + + jobboard._client.delete_prefix.assert_not_called() + + @mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard." + "get_owner_and_data") + def test_abandon(self, mock_get_owner_and_data): + mock_get_owner_and_data.return_value = ["owner1", mock.Mock()] + + _, jobboard = self.create_board() + jobboard._client = mock.Mock() + + job = impl_etcd.EtcdJob(jobboard, + "job7", + jobboard._client, + f"/taskflow/jobs/{self.path}/job7") + jobboard.abandon(job, "owner1") + + jobboard._client.delete.assert_called_once_with( + f"{job.key}{jobboard.LOCK_POSTFIX}") + + @mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard." + "get_owner_and_data") + def test_abandon_bad_owner(self, mock_get_owner_and_data): + mock_get_owner_and_data.return_value = ["owner2", mock.Mock()] + + _, jobboard = self.create_board() + jobboard._client = mock.Mock() + + job = impl_etcd.EtcdJob(jobboard, + "job7", + jobboard._client, + f"/taskflow/jobs/{self.path}/job7") + self.assertRaisesRegex(exc.JobFailure, "which is not owned", + jobboard.abandon, job, "owner1") + + jobboard._client.delete.assert_not_called() + + @mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard." + "get_owner_and_data") + @mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard." + "_remove_job_from_cache") + def test_trash(self, mock__remove_job_from_cache, + mock_get_owner_and_data): + mock_get_owner_and_data.return_value = ["owner1", mock.Mock()] + + _, jobboard = self.create_board() + jobboard._client = mock.Mock() + + job = impl_etcd.EtcdJob(jobboard, + "job7", + jobboard._client, + f"/taskflow/jobs/{self.path}/job7") + jobboard.trash(job, "owner1") + + jobboard._client.create.assert_called_once_with( + f"/taskflow/.trash/{self.path}/job7", mock.ANY) + jobboard._client.delete_prefix.assert_called_once_with(job.key + ".") + mock__remove_job_from_cache.assert_called_once_with(job.key) + + @mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard." + "get_owner_and_data") + @mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard." + "_remove_job_from_cache") + def test_trash_bad_owner(self, mock__remove_job_from_cache, + mock_get_owner_and_data): + mock_get_owner_and_data.return_value = ["owner2", mock.Mock()] + + _, jobboard = self.create_board() + jobboard._client = mock.Mock() + + job = impl_etcd.EtcdJob(jobboard, + "job7", + jobboard._client, + f"/taskflow/jobs/{self.path}/job7") + self.assertRaisesRegex(exc.JobFailure, "which is not owned", + jobboard.trash, job, "owner1") + + jobboard._client.create.assert_not_called() + jobboard._client.delete_prefix.assert_not_called() + mock__remove_job_from_cache.assert_not_called() + + @mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard." + "get_owner_and_data") + @mock.patch("taskflow.jobs.backends.impl_etcd.EtcdJobBoard." + "_remove_job_from_cache") + def test_trash_deleted_job(self, mock__remove_job_from_cache, + mock_get_owner_and_data): + mock_get_owner_and_data.return_value = ["owner1", None] + + _, jobboard = self.create_board() + jobboard._client = mock.Mock() + + job = impl_etcd.EtcdJob(jobboard, + "job7", + jobboard._client, + f"/taskflow/jobs/{self.path}/job7") + self.assertRaisesRegex(exc.NotFound, "Cannot find job", + jobboard.trash, job, "owner1") + + jobboard._client.create.assert_not_called() + jobboard._client.delete_prefix.assert_not_called() + mock__remove_job_from_cache.assert_not_called() + + +@testtools.skipIf(not ETCD_AVAILABLE, 'Etcd is not available') +class EtcdJobBoardTest(test.TestCase, base.BoardTestMixin, EtcdJobBoardMixin): + def setUp(self): + super().setUp() + self.client, self.board = self.create_board() + + def test__incr(self): + key = uuidutils.generate_uuid() + + self.board.connect() + self.addCleanup(self.board.close) + self.addCleanup(self.board._client.delete, key) + + self.assertEqual(1, self.board.incr(key)) + self.assertEqual(2, self.board.incr(key)) + self.assertEqual(3, self.board.incr(key)) + + self.assertEqual(b'3', self.board.get_one(key)) + self.board.close() + + def test_get_one(self): + key1 = uuidutils.generate_uuid() + + self.board.connect() + self.addCleanup(self.board._client.delete, key1) + + # put data and get it + self.board._client.put(key1, "testset1") + self.assertEqual(b"testset1", self.board.get_one(key1)) + + # delete data and check that it's not found + self.board._client.delete(key1) + self.assertIsNone(self.board.get_one(key1)) + + # get a non-existant data + key2 = uuidutils.generate_uuid() + # (ensure it doesn't exist) + self.board._client.delete(key2) + self.assertIsNone(self.board.get_one(key2)) + self.board.close() diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index 2b027599e..ea147e07d 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -19,6 +19,7 @@ import string import threading import time +import etcd3gw from oslo_utils import timeutils import redis @@ -88,6 +89,15 @@ def redis_available(min_version): return ok +def etcd_available(): + client = etcd3gw.Etcd3Client() + try: + client.get("/") + except Exception: + return False + return True + + class NoopRetry(retry.AlwaysRevert): pass diff --git a/test-requirements.txt b/test-requirements.txt index aeb3776d6..743880eda 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -9,6 +9,9 @@ zake>=0.1.6 # Apache-2.0 # redis redis>=4.0.0 # MIT +# etcd3gw +etcd3gw>=2.0.0 # Apache-2.0 + # workers kombu>=4.3.0 # BSD