From 44f17d005ff53008144ca7c509bcb1307d66b23f Mon Sep 17 00:00:00 2001 From: Takashi Kajinami Date: Tue, 17 May 2022 22:56:45 +0900 Subject: [PATCH] Remove six This library no longer supports Python 2, thus usage of six can be removed. This also removes workaround about pickle library used in Python 2 only. Change-Id: I19d298cf0f402d65f0b142dea0bf35cf992332a9 --- doc/source/user/utils.rst | 5 --- requirements.txt | 3 -- taskflow/atom.py | 25 ++++++------ taskflow/conductors/backends/impl_executor.py | 4 +- .../conductors/backends/impl_nonblocking.py | 3 +- taskflow/conductors/base.py | 4 +- taskflow/deciders.py | 4 +- .../engines/action_engine/actions/base.py | 5 +-- taskflow/engines/action_engine/compiler.py | 3 +- taskflow/engines/action_engine/completer.py | 4 +- taskflow/engines/action_engine/deciders.py | 9 ++--- taskflow/engines/action_engine/engine.py | 10 ++--- taskflow/engines/action_engine/executor.py | 4 +- .../engines/action_engine/process_executor.py | 36 ++++------------- taskflow/engines/base.py | 5 +-- taskflow/engines/helpers.py | 3 +- taskflow/engines/worker_based/executor.py | 3 +- taskflow/engines/worker_based/protocol.py | 12 +++--- taskflow/engines/worker_based/proxy.py | 5 +-- taskflow/engines/worker_based/types.py | 13 +++---- taskflow/examples/example_utils.py | 2 +- .../jobboard_produce_consume_colors.py | 24 ++++++------ taskflow/examples/parallel_table_multiply.py | 7 ++-- taskflow/examples/run_by_iter.py | 4 +- taskflow/examples/share_engine_thread.py | 3 +- taskflow/examples/simple_map_reduce.py | 4 +- taskflow/examples/tox_conductor.py | 5 +-- taskflow/examples/wbe_event_sender.py | 4 +- taskflow/examples/wbe_mandelbrot.py | 12 +++--- taskflow/exceptions.py | 17 ++++---- taskflow/flow.py | 6 +-- taskflow/jobs/backends/impl_redis.py | 12 +++--- taskflow/jobs/backends/impl_zookeeper.py | 5 +-- taskflow/jobs/base.py | 16 ++++---- taskflow/listeners/base.py | 4 +- taskflow/listeners/claims.py | 4 +- taskflow/listeners/timing.py | 3 +- taskflow/patterns/graph_flow.py | 7 ++-- taskflow/persistence/backends/impl_memory.py | 3 +- .../persistence/backends/impl_sqlalchemy.py | 13 +++---- taskflow/persistence/base.py | 8 +--- taskflow/persistence/models.py | 20 +++++----- taskflow/persistence/path_based.py | 7 +--- taskflow/retry.py | 6 +-- taskflow/storage.py | 23 ++++++----- taskflow/task.py | 19 ++++----- taskflow/test.py | 3 +- taskflow/tests/test_examples.py | 5 +-- .../tests/unit/action_engine/test_builder.py | 15 ++++--- taskflow/tests/unit/jobs/test_redis_job.py | 3 +- taskflow/tests/unit/jobs/test_zk_job.py | 11 +++--- .../unit/persistence/test_sql_persistence.py | 5 +-- taskflow/tests/unit/test_engines.py | 5 +-- taskflow/tests/unit/test_exceptions.py | 4 -- taskflow/tests/unit/test_failure.py | 24 ++++-------- taskflow/tests/unit/test_listeners.py | 5 +-- taskflow/tests/unit/test_types.py | 2 +- taskflow/tests/unit/test_utils.py | 3 +- taskflow/tests/unit/test_utils_binary.py | 17 +++----- taskflow/tests/unit/test_utils_iter_utils.py | 39 +++++++++---------- .../tests/unit/worker_based/test_server.py | 4 +- .../tests/unit/worker_based/test_worker.py | 4 +- taskflow/tests/utils.py | 15 +++---- taskflow/types/failure.py | 18 +++++---- taskflow/types/graph.py | 6 +-- taskflow/types/notifier.py | 11 +++--- taskflow/types/sets.py | 4 +- taskflow/types/timing.py | 4 +- taskflow/types/tree.py | 7 ++-- taskflow/utils/banner.py | 6 +-- taskflow/utils/iter_utils.py | 8 ++-- taskflow/utils/kazoo_utils.py | 10 ++--- taskflow/utils/misc.py | 27 ++++++------- taskflow/utils/mixins.py | 35 ----------------- taskflow/utils/redis_utils.py | 4 +- taskflow/utils/threading_utils.py | 6 +-- tools/schema_generator.py | 7 ++-- tools/speed_test.py | 5 +-- tox.ini | 1 - 79 files changed, 266 insertions(+), 455 deletions(-) delete mode 100644 taskflow/utils/mixins.py diff --git a/doc/source/user/utils.rst b/doc/source/user/utils.rst index 3c8c9b1d3..0a482f9ee 100644 --- a/doc/source/user/utils.rst +++ b/doc/source/user/utils.rst @@ -43,11 +43,6 @@ Miscellaneous .. automodule:: taskflow.utils.misc -Mixins -~~~~~~ - -.. automodule:: taskflow.utils.mixins - Persistence ~~~~~~~~~~~ diff --git a/requirements.txt b/requirements.txt index f1cdc8005..32ded5c76 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,9 +7,6 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0 # Packages needed for using this library. -# Python 2->3 compatibility library. -six>=1.10.0 # MIT - # For async and/or periodic work futurist>=1.2.0 # Apache-2.0 diff --git a/taskflow/atom.py b/taskflow/atom.py index 6e41204d8..d7757227b 100644 --- a/taskflow/atom.py +++ b/taskflow/atom.py @@ -21,8 +21,6 @@ from collections import abc as cabc import itertools from oslo_utils import reflection -import six -from six.moves import zip as compat_zip from taskflow.types import sets from taskflow.utils import misc @@ -47,7 +45,7 @@ def _save_as_to_mapping(save_as): # atom returns is pretty crucial for other later operations. if save_as is None: return collections.OrderedDict() - if isinstance(save_as, six.string_types): + if isinstance(save_as, str): # NOTE(harlowja): this means that your atom will only return one item # instead of a dictionary-like object or a indexable object (like a # list or tuple). @@ -83,7 +81,7 @@ def _build_rebind_dict(req_args, rebind_args): # the required argument names (if they are the same length then # this determines how to remap the required argument names to the # rebound ones). - rebind = collections.OrderedDict(compat_zip(req_args, rebind_args)) + rebind = collections.OrderedDict(zip(req_args, rebind_args)) if len(req_args) < len(rebind_args): # Extra things were rebound, that may be because of *args # or **kwargs (or some other reason); so just keep all of them @@ -128,7 +126,7 @@ def _build_arg_mapping(atom_name, reqs, rebind_args, function, do_infer, # Add additional manually provided requirements to required mappings. if reqs: - if isinstance(reqs, six.string_types): + if isinstance(reqs, str): required.update({reqs: reqs}) else: required.update((a, a) for a in reqs) @@ -139,7 +137,7 @@ def _build_arg_mapping(atom_name, reqs, rebind_args, function, do_infer, # Determine if there are optional arguments that we may or may not take. if do_infer: opt_args = sets.OrderedSet(all_args) - opt_args = opt_args - set(itertools.chain(six.iterkeys(required), + opt_args = opt_args - set(itertools.chain(required.keys(), iter(ignore_list))) optional = collections.OrderedDict((a, a) for a in opt_args) else: @@ -147,7 +145,7 @@ def _build_arg_mapping(atom_name, reqs, rebind_args, function, do_infer, # Check if we are given some extra arguments that we aren't able to accept. if not reflection.accepts_kwargs(function): - extra_args = sets.OrderedSet(six.iterkeys(required)) + extra_args = sets.OrderedSet(required.keys()) extra_args -= all_args if extra_args: raise ValueError('Extra arguments given to atom %s: %s' @@ -161,8 +159,7 @@ def _build_arg_mapping(atom_name, reqs, rebind_args, function, do_infer, return required, optional -@six.add_metaclass(abc.ABCMeta) -class Atom(object): +class Atom(object, metaclass=abc.ABCMeta): """An unit of work that causes a flow to progress (in some manner). An atom is a named object that operates with input data to perform @@ -299,13 +296,13 @@ class Atom(object): # key value, then well there is no rebinding happening, otherwise # there will be. rebind = collections.OrderedDict() - for (arg_name, bound_name) in itertools.chain(six.iteritems(required), - six.iteritems(optional)): + for (arg_name, bound_name) in itertools.chain(required.items(), + optional.items()): rebind.setdefault(arg_name, bound_name) - requires = sets.OrderedSet(six.itervalues(required)) - optional = sets.OrderedSet(six.itervalues(optional)) + requires = sets.OrderedSet(required.values()) + optional = sets.OrderedSet(optional.values()) if self.inject: - inject_keys = frozenset(six.iterkeys(self.inject)) + inject_keys = frozenset(self.inject.keys()) requires -= inject_keys optional -= inject_keys return rebind, requires, optional diff --git a/taskflow/conductors/backends/impl_executor.py b/taskflow/conductors/backends/impl_executor.py index ab55821b0..298bc8747 100644 --- a/taskflow/conductors/backends/impl_executor.py +++ b/taskflow/conductors/backends/impl_executor.py @@ -20,7 +20,6 @@ import threading from oslo_utils import excutils from oslo_utils import timeutils -import six from taskflow.conductors import base from taskflow import exceptions as excp @@ -34,8 +33,7 @@ from taskflow.utils import misc LOG = logging.getLogger(__name__) -@six.add_metaclass(abc.ABCMeta) -class ExecutorConductor(base.Conductor): +class ExecutorConductor(base.Conductor, metaclass=abc.ABCMeta): """Dispatches jobs from blocking :py:meth:`.run` method to some executor. This conductor iterates over jobs in the provided jobboard (waiting for diff --git a/taskflow/conductors/backends/impl_nonblocking.py b/taskflow/conductors/backends/impl_nonblocking.py index 76893d703..dcdd2388a 100644 --- a/taskflow/conductors/backends/impl_nonblocking.py +++ b/taskflow/conductors/backends/impl_nonblocking.py @@ -13,7 +13,6 @@ # under the License. import futurist -import six from taskflow.conductors.backends import impl_executor from taskflow.utils import threading_utils as tu @@ -63,7 +62,7 @@ class NonBlockingConductor(impl_executor.ExecutorConductor): if executor_factory is None: self._executor_factory = self._default_executor_factory else: - if not six.callable(executor_factory): + if not callable(executor_factory): raise ValueError("Provided keyword argument 'executor_factory'" " must be callable") self._executor_factory = executor_factory diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py index 1f0da0249..2f9296ded 100644 --- a/taskflow/conductors/base.py +++ b/taskflow/conductors/base.py @@ -17,7 +17,6 @@ import os import threading import fasteners -import six from taskflow import engines from taskflow import exceptions as excp @@ -26,8 +25,7 @@ from taskflow.types import notifier from taskflow.utils import misc -@six.add_metaclass(abc.ABCMeta) -class Conductor(object): +class Conductor(object, metaclass=abc.ABCMeta): """Base for all conductor implementations. Conductors act as entities which extract jobs from a jobboard, assign diff --git a/taskflow/deciders.py b/taskflow/deciders.py index c96b32185..9e316635f 100644 --- a/taskflow/deciders.py +++ b/taskflow/deciders.py @@ -14,8 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -import six - from taskflow.utils import misc @@ -74,7 +72,7 @@ class Depth(misc.StrEnum): if isinstance(desired_depth, cls): # Nothing to do in the first place... return desired_depth - if not isinstance(desired_depth, six.string_types): + if not isinstance(desired_depth, str): raise TypeError("Unexpected desired depth type, string type" " expected, not %s" % type(desired_depth)) try: diff --git a/taskflow/engines/action_engine/actions/base.py b/taskflow/engines/action_engine/actions/base.py index 3a014e122..e4be82557 100644 --- a/taskflow/engines/action_engine/actions/base.py +++ b/taskflow/engines/action_engine/actions/base.py @@ -16,13 +16,10 @@ import abc -import six - from taskflow import states -@six.add_metaclass(abc.ABCMeta) -class Action(object): +class Action(object, metaclass=abc.ABCMeta): """An action that handles executing, state changes, ... of atoms.""" NO_RESULT = object() diff --git a/taskflow/engines/action_engine/compiler.py b/taskflow/engines/action_engine/compiler.py index c72506ad9..f1e0e5469 100644 --- a/taskflow/engines/action_engine/compiler.py +++ b/taskflow/engines/action_engine/compiler.py @@ -18,7 +18,6 @@ import threading import fasteners from oslo_utils import excutils -import six from taskflow import flow from taskflow import logging @@ -165,7 +164,7 @@ class FlowCompiler(object): decomposed = dict( (child, self._deep_compiler_func(child, parent=tree_node)[0]) for child in flow) - decomposed_graphs = list(six.itervalues(decomposed)) + decomposed_graphs = list(decomposed.values()) graph = gr.merge_graphs(graph, *decomposed_graphs, overlap_detector=_overlap_occurrence_detector) for u, v, attr_dict in flow.iter_links(): diff --git a/taskflow/engines/action_engine/completer.py b/taskflow/engines/action_engine/completer.py index 59a2dbf32..028b64a14 100644 --- a/taskflow/engines/action_engine/completer.py +++ b/taskflow/engines/action_engine/completer.py @@ -19,7 +19,6 @@ import weakref from oslo_utils import reflection from oslo_utils import strutils -import six from taskflow.engines.action_engine import compiler as co from taskflow.engines.action_engine import executor as ex @@ -30,8 +29,7 @@ from taskflow import states as st LOG = logging.getLogger(__name__) -@six.add_metaclass(abc.ABCMeta) -class Strategy(object): +class Strategy(object, metaclass=abc.ABCMeta): """Failure resolution strategy base class.""" strategy = None diff --git a/taskflow/engines/action_engine/deciders.py b/taskflow/engines/action_engine/deciders.py index 4c0c04ba7..b8c399a47 100644 --- a/taskflow/engines/action_engine/deciders.py +++ b/taskflow/engines/action_engine/deciders.py @@ -17,8 +17,6 @@ import abc import itertools -import six - from taskflow import deciders from taskflow.engines.action_engine import compiler from taskflow.engines.action_engine import traversal @@ -28,8 +26,7 @@ from taskflow import states LOG = logging.getLogger(__name__) -@six.add_metaclass(abc.ABCMeta) -class Decider(object): +class Decider(object, metaclass=abc.ABCMeta): """Base class for deciders. Provides interface to be implemented by sub-classes. @@ -135,7 +132,7 @@ class IgnoreDecider(Decider): states_intentions = runtime.storage.get_atoms_states( ed.from_node.name for ed in self._edge_deciders if ed.kind in compiler.ATOMS) - for atom_name in six.iterkeys(states_intentions): + for atom_name in states_intentions.keys(): atom_state, _atom_intention = states_intentions[atom_name] if atom_state != states.IGNORE: history[atom_name] = runtime.storage.get(atom_name) @@ -155,7 +152,7 @@ class IgnoreDecider(Decider): LOG.trace("Out of %s deciders there were %s 'do no run it'" " voters, %s 'do run it' voters and %s 'ignored'" " voters for transition to atom '%s' given history %s", - sum(len(eds) for eds in six.itervalues(voters)), + sum(len(eds) for eds in voters.values()), list(ed.from_node.name for ed in voters['do_not_run_it']), list(ed.from_node.name for ed in voters['run_it']), diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index cf7c042e2..4895ac3a1 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -22,11 +22,11 @@ import threading from automaton import runners from concurrent import futures import fasteners +import functools import networkx as nx from oslo_utils import excutils from oslo_utils import strutils from oslo_utils import timeutils -import six from taskflow.engines.action_engine import builder from taskflow.engines.action_engine import compiler @@ -65,7 +65,7 @@ def _pre_check(check_compiled=True, check_storage_ensured=True, def decorator(meth): do_what = meth.__name__ - @six.wraps(meth) + @functools.wraps(meth) def wrapper(self, *args, **kwargs): if check_compiled and not self._compiled: raise exc.InvalidState("Can not %s an engine which" @@ -335,8 +335,8 @@ class ActionEngine(base.Engine): e_failures = self.storage.get_execute_failures() r_failures = self.storage.get_revert_failures() er_failures = itertools.chain( - six.itervalues(e_failures), - six.itervalues(r_failures)) + e_failures.values(), + r_failures.values()) failure.Failure.reraise_if_any(er_failures) finally: if w is not None: @@ -594,7 +594,7 @@ String (case insensitive) Executor used executor_cls = cls._default_executor_cls # Match the desired executor to a class that will work with it... desired_executor = options.get('executor') - if isinstance(desired_executor, six.string_types): + if isinstance(desired_executor, str): matched_executor_cls = None for m in cls._executor_str_matchers: if m.matches(desired_executor): diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index 5dbaf58e3..d0c2ecf01 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -17,7 +17,6 @@ import abc import futurist -import six from taskflow import task as ta from taskflow.types import failure @@ -106,8 +105,7 @@ class SerialRetryExecutor(object): return fut -@six.add_metaclass(abc.ABCMeta) -class TaskExecutor(object): +class TaskExecutor(object, metaclass=abc.ABCMeta): """Executes and reverts tasks. This class takes task and its arguments and executes or reverts it. diff --git a/taskflow/engines/action_engine/process_executor.py b/taskflow/engines/action_engine/process_executor.py index 4115653f8..d20f7a3f6 100644 --- a/taskflow/engines/action_engine/process_executor.py +++ b/taskflow/engines/action_engine/process_executor.py @@ -30,7 +30,6 @@ import time import futurist from oslo_utils import excutils -import six from taskflow.engines.action_engine import executor as base from taskflow import logging @@ -80,19 +79,6 @@ SCHEMAS = { }, } -# See http://bugs.python.org/issue1457119 for why this is so complex... -_DECODE_ENCODE_ERRORS = [pickle.PickleError, TypeError] -try: - import cPickle - _DECODE_ENCODE_ERRORS.append(cPickle.PickleError) - del cPickle -except (ImportError, AttributeError): - pass -_DECODE_ENCODE_ERRORS = tuple(_DECODE_ENCODE_ERRORS) - -# Use the best pickle from here on out... -from six.moves import cPickle as pickle - class UnknownSender(Exception): """Exception raised when message from unknown sender is recvd.""" @@ -142,13 +128,13 @@ class Reader(object): ]) def __init__(self, auth_key, dispatch_func, msg_limit=-1): - if not six.callable(dispatch_func): + if not callable(dispatch_func): raise ValueError("Expected provided dispatch function" " to be callable") self.auth_key = auth_key self.dispatch_func = dispatch_func msg_limiter = iter_utils.iter_forever(msg_limit) - self.msg_count = six.next(msg_limiter) + self.msg_count = next(msg_limiter) self._msg_limiter = msg_limiter self._buffer = misc.BytesIO() self._state = None @@ -200,7 +186,7 @@ class Reader(object): # (instead of the receiver discarding it after the fact)... functools.partial(_decode_message, self.auth_key, data, self._memory['mac'])) - self.msg_count = six.next(self._msg_limiter) + self.msg_count = next(self._msg_limiter) self._memory.clear() def _transition(self): @@ -267,7 +253,7 @@ def _create_random_string(desired_length): def _calculate_hmac(auth_key, body): mac = hmac.new(auth_key, body, hashlib.md5).hexdigest() - if isinstance(mac, six.text_type): + if isinstance(mac, str): mac = mac.encode("ascii") return mac @@ -427,11 +413,8 @@ class DispatcherHandler(asyncore.dispatcher): CHUNK_SIZE = 8192 def __init__(self, sock, addr, dispatcher): - if six.PY2: - asyncore.dispatcher.__init__(self, map=dispatcher.map, sock=sock) - else: - super(DispatcherHandler, self).__init__(map=dispatcher.map, - sock=sock) + super(DispatcherHandler, self).__init__(map=dispatcher.map, + sock=sock) self.blobs_to_write = list(dispatcher.challenge_pieces) self.reader = Reader(dispatcher.auth_key, self._dispatch) self.targets = dispatcher.targets @@ -508,7 +491,7 @@ class DispatcherHandler(asyncore.dispatcher): except (IOError, UnknownSender): LOG.warning("Invalid received message", exc_info=True) self.handle_close() - except _DECODE_ENCODE_ERRORS: + except (pickle.PickleError, TypeError): LOG.warning("Badly formatted message", exc_info=True) self.handle_close() except (ValueError, su.ValidationError): @@ -526,10 +509,7 @@ class Dispatcher(asyncore.dispatcher): MAX_BACKLOG = 5 def __init__(self, map, auth_key, identity): - if six.PY2: - asyncore.dispatcher.__init__(self, map=map) - else: - super(Dispatcher, self).__init__(map=map) + super(Dispatcher, self).__init__(map=map) self.identity = identity self.challenge_pieces = _encode_message(auth_key, CHALLENGE, identity, reverse=True) diff --git a/taskflow/engines/base.py b/taskflow/engines/base.py index 84d227e3f..92ecdbd16 100644 --- a/taskflow/engines/base.py +++ b/taskflow/engines/base.py @@ -17,14 +17,11 @@ import abc -import six - from taskflow.types import notifier from taskflow.utils import misc -@six.add_metaclass(abc.ABCMeta) -class Engine(object): +class Engine(object, metaclass=abc.ABCMeta): """Base for all engines implementations. :ivar Engine.notifier: A notification object that will dispatch diff --git a/taskflow/engines/helpers.py b/taskflow/engines/helpers.py index 9bcbc12d1..e3f2a8be9 100644 --- a/taskflow/engines/helpers.py +++ b/taskflow/engines/helpers.py @@ -18,7 +18,6 @@ import contextlib from oslo_utils import importutils from oslo_utils import reflection -import six import stevedore.driver from taskflow import exceptions as exc @@ -68,7 +67,7 @@ def _fetch_factory(factory_name): def _fetch_validate_factory(flow_factory): - if isinstance(flow_factory, six.string_types): + if isinstance(flow_factory, str): factory_fun = _fetch_factory(flow_factory) factory_name = flow_factory else: diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index 021503788..14139bf9e 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -18,7 +18,6 @@ import functools import threading from oslo_utils import timeutils -import six from taskflow.engines.action_engine import executor from taskflow.engines.worker_based import dispatcher @@ -141,7 +140,7 @@ class WorkerTaskExecutor(executor.TaskExecutor): if not self._ongoing_requests: return with self._ongoing_requests_lock: - ongoing_requests_uuids = set(six.iterkeys(self._ongoing_requests)) + ongoing_requests_uuids = set(self._ongoing_requests.keys()) waiting_requests = {} expired_requests = {} for request_uuid in ongoing_requests_uuids: diff --git a/taskflow/engines/worker_based/protocol.py b/taskflow/engines/worker_based/protocol.py index 78991ffd6..a71dbf387 100644 --- a/taskflow/engines/worker_based/protocol.py +++ b/taskflow/engines/worker_based/protocol.py @@ -25,7 +25,6 @@ import futurist from oslo_serialization import jsonutils from oslo_utils import reflection from oslo_utils import timeutils -import six from taskflow.engines.action_engine import executor from taskflow import exceptions as excp @@ -148,8 +147,7 @@ def failure_to_dict(failure): return failure.to_dict(include_args=False) -@six.add_metaclass(abc.ABCMeta) -class Message(object): +class Message(object, metaclass=abc.ABCMeta): """Base class for all message types.""" def __repr__(self): @@ -292,7 +290,7 @@ class Request(Message): }, 'action': { "type": "string", - "enum": list(six.iterkeys(ACTION_TO_EVENT)), + "enum": list(ACTION_TO_EVENT.keys()), }, # Keyword arguments that end up in the revert() or execute() # method of the remote task. @@ -367,7 +365,7 @@ class Request(Message): request['result'] = ('success', result) if self._failures: request['failures'] = {} - for atom_name, failure in six.iteritems(self._failures): + for atom_name, failure in self._failures.items(): request['failures'][atom_name] = failure_to_dict(failure) return request @@ -431,7 +429,7 @@ class Request(Message): # Validate all failure dictionaries that *may* be present... failures = [] if 'failures' in data: - failures.extend(six.itervalues(data['failures'])) + failures.extend(data['failures'].values()) result = data.get('result') if result is not None: result_data_type, result_data = result @@ -470,7 +468,7 @@ class Request(Message): arguments['result'] = result_data if failures is not None: arguments['failures'] = {} - for task, fail_data in six.iteritems(failures): + for task, fail_data in failures.items(): arguments['failures'][task] = ft.Failure.from_dict(fail_data) return _WorkUnit(task_cls, task_name, action, arguments) diff --git a/taskflow/engines/worker_based/proxy.py b/taskflow/engines/worker_based/proxy.py index e58c7a2ef..eff6cfb02 100644 --- a/taskflow/engines/worker_based/proxy.py +++ b/taskflow/engines/worker_based/proxy.py @@ -19,7 +19,6 @@ import threading import kombu from kombu import exceptions as kombu_exceptions -import six from taskflow.engines.worker_based import dispatcher from taskflow import logging @@ -85,7 +84,7 @@ class Proxy(object): ensure_options = self.DEFAULT_RETRY_OPTIONS.copy() if retry_options is not None: # Override the defaults with any user provided values... - for k in set(six.iterkeys(ensure_options)): + for k in set(ensure_options.keys()): if k in retry_options: # Ensure that the right type is passed in... val = retry_options[k] @@ -154,7 +153,7 @@ class Proxy(object): def publish(self, msg, routing_key, reply_to=None, correlation_id=None): """Publish message to the named exchange with given routing key.""" - if isinstance(routing_key, six.string_types): + if isinstance(routing_key, str): routing_keys = [routing_key] else: routing_keys = routing_key diff --git a/taskflow/engines/worker_based/types.py b/taskflow/engines/worker_based/types.py index b2334a3a6..bad803ebd 100644 --- a/taskflow/engines/worker_based/types.py +++ b/taskflow/engines/worker_based/types.py @@ -19,7 +19,6 @@ import threading from oslo_utils import reflection from oslo_utils import timeutils -import six from taskflow.engines.worker_based import protocol as pr from taskflow import logging @@ -39,7 +38,7 @@ class TopicWorker(object): def __init__(self, topic, tasks, identity=_NO_IDENTITY): self.tasks = [] for task in tasks: - if not isinstance(task, six.string_types): + if not isinstance(task, str): task = reflection.get_class_name(task) self.tasks.append(task) self.topic = topic @@ -47,7 +46,7 @@ class TopicWorker(object): self.last_seen = None def performs(self, task): - if not isinstance(task, six.string_types): + if not isinstance(task, str): task = reflection.get_class_name(task) return task in self.tasks @@ -215,18 +214,18 @@ class ProxyWorkerFinder(object): dead_workers = {} with self._cond: now = timeutils.now() - for topic, worker in six.iteritems(self._workers): + for topic, worker in self._workers.items(): if worker.last_seen is None: continue secs_since_last_seen = max(0, now - worker.last_seen) if secs_since_last_seen >= self._worker_expiry: dead_workers[topic] = (worker, secs_since_last_seen) - for topic in six.iterkeys(dead_workers): + for topic in dead_workers.keys(): self._workers.pop(topic) if dead_workers: self._cond.notify_all() if dead_workers and LOG.isEnabledFor(logging.INFO): - for worker, secs_since_last_seen in six.itervalues(dead_workers): + for worker, secs_since_last_seen in dead_workers.values(): LOG.info("Removed worker '%s' as it has not responded to" " notification requests in %0.3f seconds", worker, secs_since_last_seen) @@ -245,7 +244,7 @@ class ProxyWorkerFinder(object): """Gets a worker that can perform a given task.""" available_workers = [] with self._cond: - for worker in six.itervalues(self._workers): + for worker in self._workers.values(): if worker.performs(task): available_workers.append(worker) if available_workers: diff --git a/taskflow/examples/example_utils.py b/taskflow/examples/example_utils.py index be08e53aa..1c95ca7cd 100644 --- a/taskflow/examples/example_utils.py +++ b/taskflow/examples/example_utils.py @@ -21,7 +21,7 @@ import shutil import sys import tempfile -from six.moves import urllib_parse +from urllib import parse as urllib_parse from taskflow import exceptions from taskflow.persistence import backends diff --git a/taskflow/examples/jobboard_produce_consume_colors.py b/taskflow/examples/jobboard_produce_consume_colors.py index 54983c474..a2553a987 100644 --- a/taskflow/examples/jobboard_produce_consume_colors.py +++ b/taskflow/examples/jobboard_produce_consume_colors.py @@ -30,8 +30,6 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)) sys.path.insert(0, top_dir) -import six -from six.moves import range as compat_range from zake import fake_client from taskflow import exceptions as excp @@ -139,7 +137,7 @@ def producer(ident, client): name = "P-%s" % (ident) safe_print(name, "started") with backends.backend(name, SHARED_CONF.copy(), client=client) as board: - for i in compat_range(0, PRODUCER_UNITS): + for i in range(0, PRODUCER_UNITS): job_name = "%s-%s" % (name, i) details = { 'color': random.choice(['red', 'blue']), @@ -151,22 +149,22 @@ def producer(ident, client): def main(): - if six.PY3: - # TODO(harlowja): Hack to make eventlet work right, remove when the - # following is fixed: https://github.com/eventlet/eventlet/issues/230 - from taskflow.utils import eventlet_utils as _eu # noqa - try: - import eventlet as _eventlet # noqa - except ImportError: - pass + # TODO(harlowja): Hack to make eventlet work right, remove when the + # following is fixed: https://github.com/eventlet/eventlet/issues/230 + from taskflow.utils import eventlet_utils as _eu # noqa + try: + import eventlet as _eventlet # noqa + except ImportError: + pass + with contextlib.closing(fake_client.FakeClient()) as c: created = [] - for i in compat_range(0, PRODUCERS): + for i in range(0, PRODUCERS): p = threading_utils.daemon_thread(producer, i + 1, c) created.append(p) p.start() consumed = collections.deque() - for i in compat_range(0, WORKERS): + for i in range(0, WORKERS): w = threading_utils.daemon_thread(worker, i + 1, c, consumed) created.append(w) w.start() diff --git a/taskflow/examples/parallel_table_multiply.py b/taskflow/examples/parallel_table_multiply.py index 5cd8e9c82..3eaa15eeb 100644 --- a/taskflow/examples/parallel_table_multiply.py +++ b/taskflow/examples/parallel_table_multiply.py @@ -28,7 +28,6 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), sys.path.insert(0, top_dir) import futurist -from six.moves import range as compat_range from taskflow import engines from taskflow.patterns import unordered_flow as uf @@ -86,9 +85,9 @@ def main(): tbl = [] cols = random.randint(1, 100) rows = random.randint(1, 100) - for _i in compat_range(0, rows): + for _i in range(0, rows): row = [] - for _j in compat_range(0, cols): + for _j in range(0, cols): row.append(random.random()) tbl.append(row) @@ -112,7 +111,7 @@ def main(): # # TODO(harlowja): probably easier just to sort instead of search... computed_tbl = [] - for i in compat_range(0, len(tbl)): + for i in range(0, len(tbl)): for t in f: if t.index == i: computed_tbl.append(e.storage.get(t.name)) diff --git a/taskflow/examples/run_by_iter.py b/taskflow/examples/run_by_iter.py index 37087ec94..061ba2ee9 100644 --- a/taskflow/examples/run_by_iter.py +++ b/taskflow/examples/run_by_iter.py @@ -18,8 +18,6 @@ import logging import os import sys -import six - logging.basicConfig(level=logging.ERROR) self_dir = os.path.abspath(os.path.dirname(__file__)) @@ -81,6 +79,6 @@ for f in flows: while engine_iters: for it in list(engine_iters): try: - print(six.next(it)) + print(next(it)) except StopIteration: engine_iters.remove(it) diff --git a/taskflow/examples/share_engine_thread.py b/taskflow/examples/share_engine_thread.py index 5223721b1..6372138d9 100644 --- a/taskflow/examples/share_engine_thread.py +++ b/taskflow/examples/share_engine_thread.py @@ -28,7 +28,6 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), sys.path.insert(0, top_dir) import futurist -import six from taskflow import engines from taskflow.patterns import unordered_flow as uf @@ -73,7 +72,7 @@ with futurist.ThreadPoolExecutor() as ex: # and there is no more engine work to be done. for it in cloned_iters: try: - six.next(it) + next(it) except StopIteration: try: iters.remove(it) diff --git a/taskflow/examples/simple_map_reduce.py b/taskflow/examples/simple_map_reduce.py index 6476b488a..bfaabdeda 100644 --- a/taskflow/examples/simple_map_reduce.py +++ b/taskflow/examples/simple_map_reduce.py @@ -33,8 +33,6 @@ sys.path.insert(0, self_dir) # produced values and perform a final summation and this result will then be # printed (and verified to ensure the calculation was as expected). -import six - from taskflow import engines from taskflow.patterns import linear_flow from taskflow.patterns import unordered_flow @@ -51,7 +49,7 @@ class TotalReducer(task.Task): def execute(self, *args, **kwargs): # Reduces all mapped summed outputs into a single value. total = 0 - for (k, v) in six.iteritems(kwargs): + for (k, v) in kwargs.items(): # If any other kwargs was passed in, we don't want to use those # in the calculation of the total... if k.startswith('reduction_'): diff --git a/taskflow/examples/tox_conductor.py b/taskflow/examples/tox_conductor.py index 66e575b55..7490650ab 100644 --- a/taskflow/examples/tox_conductor.py +++ b/taskflow/examples/tox_conductor.py @@ -34,7 +34,6 @@ sys.path.insert(0, top_dir) from oslo_utils import timeutils from oslo_utils import uuidutils -import six from zake import fake_client from taskflow.conductors import backends as conductors @@ -114,7 +113,7 @@ def review_iter(): """Makes reviews (never-ending iterator/generator).""" review_id_gen = itertools.count(0) while True: - review_id = six.next(review_id_gen) + review_id = next(review_id_gen) review = { 'id': review_id, } @@ -172,7 +171,7 @@ def generate_reviewer(client, saver, name=NAME): review_generator = review_iter() with contextlib.closing(jb): while not no_more.is_set(): - review = six.next(review_generator) + review = next(review_generator) details = { 'store': { 'review': review, diff --git a/taskflow/examples/wbe_event_sender.py b/taskflow/examples/wbe_event_sender.py index 9f9dbd82c..2d726834d 100644 --- a/taskflow/examples/wbe_event_sender.py +++ b/taskflow/examples/wbe_event_sender.py @@ -25,8 +25,6 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)) sys.path.insert(0, top_dir) -from six.moves import range as compat_range - from taskflow import engines from taskflow.engines.worker_based import worker from taskflow.patterns import linear_flow as lf @@ -124,7 +122,7 @@ if __name__ == "__main__": try: # Create a set of worker threads to simulate actual remote workers... print('Running %s workers.' % (MEMORY_WORKERS)) - for i in compat_range(0, MEMORY_WORKERS): + for i in range(0, MEMORY_WORKERS): # Give each one its own unique topic name so that they can # correctly communicate with the engine (they will all share the # same exchange). diff --git a/taskflow/examples/wbe_mandelbrot.py b/taskflow/examples/wbe_mandelbrot.py index 48db5e65c..bc954fea9 100644 --- a/taskflow/examples/wbe_mandelbrot.py +++ b/taskflow/examples/wbe_mandelbrot.py @@ -24,8 +24,6 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)) sys.path.insert(0, top_dir) -from six.moves import range as compat_range - from taskflow import engines from taskflow.engines.worker_based import worker from taskflow.patterns import unordered_flow as uf @@ -84,7 +82,7 @@ class MandelCalculator(task.Task): def mandelbrot(x, y, max_iters): c = complex(x, y) z = 0.0j - for i in compat_range(max_iters): + for i in range(max_iters): z = z * z + c if (z.real * z.real + z.imag * z.imag) >= 4: return i @@ -95,10 +93,10 @@ class MandelCalculator(task.Task): pixel_size_x = (max_x - min_x) / width pixel_size_y = (max_y - min_y) / height block = [] - for y in compat_range(chunk[0], chunk[1]): + for y in range(chunk[0], chunk[1]): row = [] imag = min_y + y * pixel_size_y - for x in compat_range(0, width): + for x in range(0, width): real = min_x + x * pixel_size_x row.append(mandelbrot(real, imag, max_iters)) block.append(row) @@ -133,7 +131,7 @@ def calculate(engine_conf): # Compose our workflow. height, _width = IMAGE_SIZE chunk_size = int(math.ceil(height / float(CHUNK_COUNT))) - for i in compat_range(0, CHUNK_COUNT): + for i in range(0, CHUNK_COUNT): chunk_name = 'chunk_%s' % i task_name = "calculation_%s" % i # Break the calculation up into chunk size pieces. @@ -225,7 +223,7 @@ def create_fractal(): try: # Create a set of workers to simulate actual remote workers. print('Running %s workers.' % (WORKERS)) - for i in compat_range(0, WORKERS): + for i in range(0, WORKERS): worker_conf['topic'] = 'calculator_%s' % (i + 1) worker_topics.append(worker_conf['topic']) w = worker.Worker(**worker_conf) diff --git a/taskflow/exceptions.py b/taskflow/exceptions.py index fab32a939..0d9df104c 100644 --- a/taskflow/exceptions.py +++ b/taskflow/exceptions.py @@ -14,13 +14,12 @@ # License for the specific language governing permissions and limitations # under the License. +import io import os import traceback from oslo_utils import excutils from oslo_utils import reflection -import six -from taskflow.utils import mixins def raise_with_cause(exc_cls, message, *args, **kwargs): @@ -89,7 +88,7 @@ class TaskFlowException(Exception): if indent < 0: raise ValueError("Provided 'indent' must be greater than" " or equal to zero instead of %s" % indent) - buf = six.StringIO() + buf = io.StringIO() if show_root_class: buf.write(reflection.get_class_name(self, fully_qualified=False)) buf.write(": ") @@ -244,7 +243,7 @@ class NotImplementedError(NotImplementedError): """ -class WrappedFailure(mixins.StrMixin, Exception): +class WrappedFailure(Exception): """Wraps one or several failure objects. When exception/s cannot be re-raised (for example, because the value and @@ -298,17 +297,17 @@ class WrappedFailure(mixins.StrMixin, Exception): return None def __bytes__(self): - buf = six.BytesIO() + buf = io.BytesIO() buf.write(b'WrappedFailure: [') - causes_gen = (six.binary_type(cause) for cause in self._causes) + causes_gen = (bytes(cause) for cause in self._causes) buf.write(b", ".join(causes_gen)) buf.write(b']') return buf.getvalue() - def __unicode__(self): - buf = six.StringIO() + def __str__(self): + buf = io.StringIO() buf.write(u'WrappedFailure: [') - causes_gen = (six.text_type(cause) for cause in self._causes) + causes_gen = (str(cause) for cause in self._causes) buf.write(u", ".join(causes_gen)) buf.write(u']') return buf.getvalue() diff --git a/taskflow/flow.py b/taskflow/flow.py index 3b974f7b0..8fac64740 100644 --- a/taskflow/flow.py +++ b/taskflow/flow.py @@ -17,7 +17,6 @@ import abc from oslo_utils import reflection -import six # Link metadata keys that have inherent/special meaning. # @@ -43,8 +42,7 @@ _CHOP_PAT_LEN = len(_CHOP_PAT) LINK_DECIDER_DEPTH = 'decider_depth' -@six.add_metaclass(abc.ABCMeta) -class Flow(object): +class Flow(object, metaclass=abc.ABCMeta): """The base abstract class of all flow implementations. A flow is a structure that defines relationships between tasks. You can @@ -60,7 +58,7 @@ class Flow(object): """ def __init__(self, name, retry=None): - self._name = six.text_type(name) + self._name = str(name) self._retry = retry # NOTE(akarpinska): if retry doesn't have a name, # the name of its owner will be assigned diff --git a/taskflow/jobs/backends/impl_redis.py b/taskflow/jobs/backends/impl_redis.py index 8c1e51136..30fa0f309 100644 --- a/taskflow/jobs/backends/impl_redis.py +++ b/taskflow/jobs/backends/impl_redis.py @@ -30,8 +30,6 @@ from oslo_utils import timeutils from oslo_utils import uuidutils from redis import exceptions as redis_exceptions from redis import sentinel -import six -from six.moves import range as compat_range from taskflow import exceptions as exc from taskflow.jobs import base @@ -620,9 +618,9 @@ return cmsgpack.pack(result) key_pieces = [key_piece] if more_key_pieces: key_pieces.extend(more_key_pieces) - for i in compat_range(0, len(namespace_pieces)): + for i in range(0, len(namespace_pieces)): namespace_pieces[i] = misc.binary_encode(namespace_pieces[i]) - for i in compat_range(0, len(key_pieces)): + for i in range(0, len(key_pieces)): key_pieces[i] = misc.binary_encode(key_pieces[i]) namespace = b"".join(namespace_pieces) key = self.KEY_PIECE_SEP.join(key_pieces) @@ -696,7 +694,7 @@ return cmsgpack.pack(result) 'already_claimed': self.SCRIPT_ALREADY_CLAIMED, } prepared_scripts = {} - for n, raw_script_tpl in six.iteritems(self.SCRIPT_TEMPLATES): + for n, raw_script_tpl in self.SCRIPT_TEMPLATES.items(): script_tpl = string.Template(raw_script_tpl) script_blob = script_tpl.substitute(**script_params) script = self._client.register_script(script_blob) @@ -761,7 +759,7 @@ return cmsgpack.pack(result) }) with _translate_failures(): raw_posting = self._dumps(posting) - raw_job_uuid = six.b(job_uuid) + raw_job_uuid = job_uuid.encode('latin-1') was_posted = bool(self._client.hsetnx(self.listings_key, raw_job_uuid, raw_posting)) if not was_posted: @@ -813,7 +811,7 @@ return cmsgpack.pack(result) with _translate_failures(): raw_postings = self._client.hgetall(self.listings_key) postings = [] - for raw_job_key, raw_posting in six.iteritems(raw_postings): + for raw_job_key, raw_posting in raw_postings.items(): try: job_data = self._loads(raw_posting) try: diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 6ee222274..fc9399a26 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -30,7 +30,6 @@ from oslo_serialization import jsonutils from oslo_utils import excutils from oslo_utils import timeutils from oslo_utils import uuidutils -import six from taskflow.conductors import base as c_base from taskflow import exceptions as excp @@ -373,7 +372,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): if ensure_fresh: self._force_refresh() with self._job_cond: - return sorted(six.itervalues(self._known_jobs)) + return sorted(self._known_jobs.values()) def _force_refresh(self): try: @@ -479,7 +478,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): investigate_paths = [] pending_removals = [] with self._job_cond: - for path in six.iterkeys(self._known_jobs): + for path in self._known_jobs.keys(): if path not in child_paths: pending_removals.append(path) for path in child_paths: diff --git a/taskflow/jobs/base.py b/taskflow/jobs/base.py index 4e9bf1a26..3bf5198c7 100644 --- a/taskflow/jobs/base.py +++ b/taskflow/jobs/base.py @@ -18,12 +18,12 @@ import abc import collections import contextlib +import functools import time import enum from oslo_utils import timeutils from oslo_utils import uuidutils -import six from taskflow import exceptions as excp from taskflow import states @@ -105,8 +105,7 @@ class JobPriority(enum.Enum): return tuple(values) -@six.add_metaclass(abc.ABCMeta) -class Job(object): +class Job(object, metaclass=abc.ABCMeta): """A abstraction that represents a named and trackable unit of work. A job connects a logbook, a owner, a priority, last modified and created @@ -195,7 +194,7 @@ class Job(object): return False if self.state == states.COMPLETE: return True - sleepy_secs = six.next(delay_gen) + sleepy_secs = next(delay_gen) if w is not None: sleepy_secs = min(w.leftover(), sleepy_secs) sleep_func(sleepy_secs) @@ -269,7 +268,7 @@ class Job(object): self.uuid, self.details) -class JobBoardIterator(six.Iterator): +class JobBoardIterator(object): """Iterator over a jobboard that iterates over potential jobs. It provides the following attributes: @@ -342,8 +341,7 @@ class JobBoardIterator(six.Iterator): return job -@six.add_metaclass(abc.ABCMeta) -class JobBoard(object): +class JobBoard(object, metaclass=abc.ABCMeta): """A place where jobs can be posted, reposted, claimed and transferred. There can be multiple implementations of this job board, depending on the @@ -559,9 +557,9 @@ class NotifyingJobBoard(JobBoard): def check_who(meth): - @six.wraps(meth) + @functools.wraps(meth) def wrapper(self, job, who, *args, **kwargs): - if not isinstance(who, six.string_types): + if not isinstance(who, str): raise TypeError("Job applicant must be a string type") if len(who) == 0: raise ValueError("Job applicant must be non-empty") diff --git a/taskflow/listeners/base.py b/taskflow/listeners/base.py index f5113a4e6..462a12132 100644 --- a/taskflow/listeners/base.py +++ b/taskflow/listeners/base.py @@ -17,7 +17,6 @@ import abc from oslo_utils import excutils -import six from taskflow import logging from taskflow import states @@ -161,8 +160,7 @@ class Listener(object): self._engine, exc_info=True) -@six.add_metaclass(abc.ABCMeta) -class DumpingListener(Listener): +class DumpingListener(Listener, metaclass=abc.ABCMeta): """Abstract base class for dumping listeners. This provides a simple listener that can be attached to an engine which can diff --git a/taskflow/listeners/claims.py b/taskflow/listeners/claims.py index dac74ce41..000d4a475 100644 --- a/taskflow/listeners/claims.py +++ b/taskflow/listeners/claims.py @@ -17,8 +17,6 @@ import logging import os -import six - from taskflow import exceptions from taskflow.listeners import base from taskflow import states @@ -58,7 +56,7 @@ class CheckingClaimListener(base.Listener): if on_job_loss is None: self._on_job_loss = self._suspend_engine_on_loss else: - if not six.callable(on_job_loss): + if not callable(on_job_loss): raise ValueError("Custom 'on_job_loss' handler must be" " callable") self._on_job_loss = on_job_loss diff --git a/taskflow/listeners/timing.py b/taskflow/listeners/timing.py index 8634ee00a..587be8c8b 100644 --- a/taskflow/listeners/timing.py +++ b/taskflow/listeners/timing.py @@ -15,7 +15,6 @@ # under the License. import itertools -import six import time from oslo_utils import timeutils @@ -58,7 +57,7 @@ class DurationListener(base.Listener): super(DurationListener, self).deregister() # There should be none that still exist at deregistering time, so log a # warning if there were any that somehow still got left behind... - for item_type, timers in six.iteritems(self._timers): + for item_type, timers in self._timers.items(): leftover_timers = len(timers) if leftover_timers: LOG.warning("%s %s(s) did not enter %s states", diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py index 52c8178a7..903e81fe8 100644 --- a/taskflow/patterns/graph_flow.py +++ b/taskflow/patterns/graph_flow.py @@ -15,8 +15,7 @@ # under the License. import collections - -import six +import functools from taskflow import deciders as de from taskflow import exceptions as exc @@ -109,7 +108,7 @@ class Flow(flow.Flow): if not self._graph.has_node(v): raise ValueError("Node '%s' not found to link to" % (v)) if decider is not None: - if not six.callable(decider): + if not callable(decider): raise ValueError("Decider boolean callback must be callable") self._swap(self._link(u, v, manual=True, decider=decider, decider_depth=decider_depth)) @@ -316,7 +315,7 @@ class Flow(flow.Flow): def _reset_cached_subgraph(func): """Resets cached subgraph after execution, in case it was affected.""" - @six.wraps(func) + @functools.wraps(func) def wrapper(self, *args, **kwargs): result = func(self, *args, **kwargs) self._subgraph = None diff --git a/taskflow/persistence/backends/impl_memory.py b/taskflow/persistence/backends/impl_memory.py index 3b6f10cd8..d7fdef86b 100644 --- a/taskflow/persistence/backends/impl_memory.py +++ b/taskflow/persistence/backends/impl_memory.py @@ -21,7 +21,6 @@ import itertools import posixpath as pp import fasteners -import six from taskflow import exceptions as exc from taskflow.persistence import path_based @@ -261,7 +260,7 @@ class FakeFilesystem(object): if 'target' in node.metadata: return "%s (link to %s)" % (node.item, node.metadata['target']) else: - return six.text_type(node.item) + return str(node.item) def pformat(self): """Pretty format this in-memory filesystem.""" diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py index 2dbe6ff32..3ac0f3db2 100644 --- a/taskflow/persistence/backends/impl_sqlalchemy.py +++ b/taskflow/persistence/backends/impl_sqlalchemy.py @@ -22,7 +22,6 @@ import threading import time from oslo_utils import strutils -import six import sqlalchemy as sa from sqlalchemy import exc as sa_exc from sqlalchemy import pool as sa_pool @@ -116,7 +115,7 @@ def _log_statements(log_level, conn, cursor, statement, parameters, *args): def _in_any(reason, err_haystack): """Checks if any elements of the haystack are in the given reason.""" for err in err_haystack: - if reason.find(six.text_type(err)) != -1: + if reason.find(str(err)) != -1: return True return False @@ -173,10 +172,10 @@ def _ping_listener(dbapi_conn, connection_rec, connection_proxy): try: dbapi_conn.cursor().execute('select 1') except dbapi_conn.OperationalError as ex: - if _in_any(six.text_type(ex.args[0]), MY_SQL_GONE_WAY_AWAY_ERRORS): + if _in_any(str(ex.args[0]), MY_SQL_GONE_WAY_AWAY_ERRORS): LOG.warning('Got mysql server has gone away', exc_info=True) raise sa_exc.DisconnectionError("Database server went away") - elif _in_any(six.text_type(ex.args[0]), POSTGRES_GONE_WAY_AWAY_ERRORS): + elif _in_any(str(ex.args[0]), POSTGRES_GONE_WAY_AWAY_ERRORS): LOG.warning('Got postgres server has gone away', exc_info=True) raise sa_exc.DisconnectionError("Database server went away") else: @@ -285,13 +284,13 @@ class SQLAlchemyBackend(base.Backend): txn_isolation_levels = conf.pop('isolation_levels', DEFAULT_TXN_ISOLATION_LEVELS) level_applied = False - for (driver, level) in six.iteritems(txn_isolation_levels): + for (driver, level) in txn_isolation_levels.items(): if driver == e_url.drivername: engine_args['isolation_level'] = level level_applied = True break if not level_applied: - for (driver, level) in six.iteritems(txn_isolation_levels): + for (driver, level) in txn_isolation_levels.items(): if e_url.drivername.find(driver) != -1: engine_args['isolation_level'] = level break @@ -362,7 +361,7 @@ class Connection(base.Connection): def _retry_on_exception(exc): LOG.warning("Engine connection (validate) failed due to '%s'", exc) if isinstance(exc, sa_exc.OperationalError) and \ - _is_db_connection_error(six.text_type(exc.args[0])): + _is_db_connection_error(str(exc.args[0])): # We may be able to fix this by retrying... return True if isinstance(exc, (sa_exc.TimeoutError, diff --git a/taskflow/persistence/base.py b/taskflow/persistence/base.py index 7f08c9253..dc041f7fc 100644 --- a/taskflow/persistence/base.py +++ b/taskflow/persistence/base.py @@ -16,13 +16,10 @@ import abc -import six - from taskflow.persistence import models -@six.add_metaclass(abc.ABCMeta) -class Backend(object): +class Backend(object, metaclass=abc.ABCMeta): """Base class for persistence backends.""" def __init__(self, conf): @@ -42,8 +39,7 @@ class Backend(object): """Closes any resources this backend has open.""" -@six.add_metaclass(abc.ABCMeta) -class Connection(object): +class Connection(object, metaclass=abc.ABCMeta): """Base class for backend connections.""" @abc.abstractproperty diff --git a/taskflow/persistence/models.py b/taskflow/persistence/models.py index 0c3385a80..8d3235d0d 100644 --- a/taskflow/persistence/models.py +++ b/taskflow/persistence/models.py @@ -21,7 +21,6 @@ import os from oslo_utils import timeutils from oslo_utils import uuidutils -import six from taskflow import exceptions as exc from taskflow import states @@ -259,7 +258,7 @@ class LogBook(object): return self._name def __iter__(self): - for fd in six.itervalues(self._flowdetails_by_id): + for fd in self._flowdetails_by_id.values(): yield fd def __len__(self): @@ -464,15 +463,14 @@ class FlowDetail(object): return self._name def __iter__(self): - for ad in six.itervalues(self._atomdetails_by_id): + for ad in self._atomdetails_by_id.values(): yield ad def __len__(self): return len(self._atomdetails_by_id) -@six.add_metaclass(abc.ABCMeta) -class AtomDetail(object): +class AtomDetail(object, metaclass=abc.ABCMeta): """A collection of atom specific runtime information and metadata. This is a base **abstract** class that contains attributes that are used @@ -887,7 +885,7 @@ class RetryDetail(AtomDetail): # contain tracebacks, which are not copyable. for (data, failures) in self.results: copied_failures = {} - for (key, failure) in six.iteritems(failures): + for (key, failure) in failures.items(): copied_failures[key] = failure results.append((data, copied_failures)) clone.results = results @@ -980,7 +978,7 @@ class RetryDetail(AtomDetail): new_results = [] for (data, failures) in results: new_failures = {} - for (key, data) in six.iteritems(failures): + for (key, data) in failures.items(): new_failures[key] = ft.Failure.from_dict(data) new_results.append((data, new_failures)) return new_results @@ -998,7 +996,7 @@ class RetryDetail(AtomDetail): new_results = [] for (data, failures) in results: new_failures = {} - for (key, failure) in six.iteritems(failures): + for (key, failure) in failures.items(): new_failures[key] = failure.to_dict() new_results.append((data, new_failures)) return new_results @@ -1041,7 +1039,7 @@ class RetryDetail(AtomDetail): # contain tracebacks, which are not copyable. for (data, failures) in other.results: copied_failures = {} - for (key, failure) in six.iteritems(failures): + for (key, failure) in failures.items(): if deep_copy: copied_failures[key] = failure.copy() else: @@ -1056,8 +1054,8 @@ _DETAIL_TO_NAME = { TaskDetail: 'TASK_DETAIL', } _NAME_TO_DETAIL = dict((name, cls) - for (cls, name) in six.iteritems(_DETAIL_TO_NAME)) -ATOM_TYPES = list(six.iterkeys(_NAME_TO_DETAIL)) + for (cls, name) in _DETAIL_TO_NAME.items()) +ATOM_TYPES = list(_NAME_TO_DETAIL.keys()) def atom_detail_class(atom_type): diff --git a/taskflow/persistence/path_based.py b/taskflow/persistence/path_based.py index f2d411b2a..6fa3c05eb 100644 --- a/taskflow/persistence/path_based.py +++ b/taskflow/persistence/path_based.py @@ -15,15 +15,13 @@ # under the License. import abc -import six from taskflow import exceptions as exc from taskflow.persistence import base from taskflow.persistence import models -@six.add_metaclass(abc.ABCMeta) -class PathBasedBackend(base.Backend): +class PathBasedBackend(base.Backend, metaclass=abc.ABCMeta): """Base class for persistence backends that address data by path Subclasses of this backend write logbooks, flow details, and atom details @@ -48,8 +46,7 @@ class PathBasedBackend(base.Backend): return self._path -@six.add_metaclass(abc.ABCMeta) -class PathBasedConnection(base.Connection): +class PathBasedConnection(base.Connection, metaclass=abc.ABCMeta): """Base class for path based backend connections.""" def __init__(self, backend): diff --git a/taskflow/retry.py b/taskflow/retry.py index c0198153a..4935bbfaa 100644 --- a/taskflow/retry.py +++ b/taskflow/retry.py @@ -18,7 +18,6 @@ import abc import enum -import six from taskflow import atom from taskflow import exceptions as exc @@ -100,7 +99,7 @@ class History(object): self._contents[index], ] for (provided, outcomes) in contents: - for (owner, outcome) in six.iteritems(outcomes): + for (owner, outcome) in outcomes.items(): yield (owner, outcome) def __len__(self): @@ -136,8 +135,7 @@ class History(object): return iter(self._contents) -@six.add_metaclass(abc.ABCMeta) -class Retry(atom.Atom): +class Retry(atom.Atom, metaclass=abc.ABCMeta): """A class that can decide how to resolve execution failures. This abstract base class is used to inherit from and provide different diff --git a/taskflow/storage.py b/taskflow/storage.py index 7ddd7efe4..b41b5dfc0 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -20,7 +20,6 @@ import functools import fasteners from oslo_utils import reflection from oslo_utils import uuidutils -import six import tenacity from taskflow import exceptions @@ -335,7 +334,7 @@ class Storage(object): except exceptions.NotFound: pass else: - names_iter = six.iterkeys(source.results) + names_iter = source.results.keys() self._set_result_mapping(source.name, dict((name, name) for name in names_iter)) @@ -628,7 +627,7 @@ class Storage(object): result_mapping = self._result_mappings.get(atom_name) if not result_mapping: return - for name, index in six.iteritems(result_mapping): + for name, index in result_mapping.items(): try: _item_from(container, index) except _EXTRACTION_EXCEPTIONS: @@ -731,7 +730,7 @@ class Storage(object): @fasteners.read_locked def _get_failures(self, fail_cache_key): failures = {} - for atom_name, fail_cache in six.iteritems(self._failures): + for atom_name, fail_cache in self._failures.items(): try: failures[atom_name] = fail_cache[fail_cache_key] except KeyError: @@ -771,7 +770,7 @@ class Storage(object): @fasteners.read_locked def has_failures(self): """Returns true if there are **any** failures in storage.""" - for fail_cache in six.itervalues(self._failures): + for fail_cache in self._failures.values(): if fail_cache: return True return False @@ -898,11 +897,11 @@ class Storage(object): clone.results.update(pairs) result = self._with_connection(self._save_atom_detail, source, clone) - return (self.injector_name, six.iterkeys(result.results)) + return (self.injector_name, result.results.keys()) def save_transient(): self._transients.update(pairs) - return (_TRANSIENT_PROVIDER, six.iterkeys(self._transients)) + return (_TRANSIENT_PROVIDER, self._transients.keys()) if transient: provider_name, names = save_transient() @@ -937,7 +936,7 @@ class Storage(object): if mapping: provider_mapping.update(mapping) # Ensure the reverse mapping/index is updated (for faster lookups). - for name, index in six.iteritems(provider_mapping): + for name, index in provider_mapping.items(): entries = self._reverse_mapping.setdefault(name, []) provider = _Provider(provider_name, index) if provider not in entries: @@ -1002,13 +1001,13 @@ class Storage(object): self._injected_args.get(atom_name, {}), source.meta.get(META_INJECTED, {}), ] - missing = set(six.iterkeys(args_mapping)) + missing = set(args_mapping.keys()) locator = _ProviderLocator( self._transients, self._fetch_providers, lambda atom_name: self._get(atom_name, 'last_results', 'failure', _EXECUTE_STATES_WITH_RESULTS, states.EXECUTE)) - for (bound_name, name) in six.iteritems(args_mapping): + for (bound_name, name) in args_mapping.items(): if LOG.isEnabledFor(logging.TRACE): LOG.trace("Looking for %r <= %r for atom '%s'", bound_name, name, atom_name) @@ -1041,7 +1040,7 @@ class Storage(object): if many_handler is None: many_handler = _many_handler results = {} - for name in six.iterkeys(self._reverse_mapping): + for name in self._reverse_mapping.keys(): try: results[name] = self.fetch(name, many_handler=many_handler) except exceptions.NotFound: @@ -1079,7 +1078,7 @@ class Storage(object): self._get(atom_name, 'last_results', 'failure', _EXECUTE_STATES_WITH_RESULTS, states.EXECUTE) mapped_args = {} - for (bound_name, name) in six.iteritems(args_mapping): + for (bound_name, name) in args_mapping.items(): if LOG.isEnabledFor(logging.TRACE): if atom_name: LOG.trace("Looking for %r <= %r for atom '%s'", diff --git a/taskflow/task.py b/taskflow/task.py index 3ff282d96..fea62a11e 100644 --- a/taskflow/task.py +++ b/taskflow/task.py @@ -18,11 +18,9 @@ import abc import copy +import functools from oslo_utils import reflection -import six -from six.moves import map as compat_map -from six.moves import reduce as compat_reduce from taskflow import atom from taskflow import logging @@ -43,8 +41,7 @@ REVERT_FLOW_FAILURES = 'flow_failures' EVENT_UPDATE_PROGRESS = 'update_progress' -@six.add_metaclass(abc.ABCMeta) -class Task(atom.Atom): +class Task(atom.Atom, metaclass=abc.ABCMeta): """An abstraction that defines a potential piece of work. This potential piece of work is expected to be able to contain @@ -125,11 +122,11 @@ class FunctorTask(Task): def __init__(self, execute, name=None, provides=None, requires=None, auto_extract=True, rebind=None, revert=None, version=None, inject=None): - if not six.callable(execute): + if not callable(execute): raise ValueError("Function to use for executing must be" " callable") if revert is not None: - if not six.callable(revert): + if not callable(revert): raise ValueError("Function to use for reverting must" " be callable") if name is None: @@ -175,7 +172,7 @@ class ReduceFunctorTask(Task): def __init__(self, functor, requires, name=None, provides=None, auto_extract=True, rebind=None, inject=None): - if not six.callable(functor): + if not callable(functor): raise ValueError("Function to use for reduce must be callable") f_args = reflection.get_callable_args(functor) @@ -204,7 +201,7 @@ class ReduceFunctorTask(Task): def execute(self, *args, **kwargs): l = [kwargs[r] for r in self.requires] - return compat_reduce(self._functor, l) + return functools.reduce(self._functor, l) class MapFunctorTask(Task): @@ -224,7 +221,7 @@ class MapFunctorTask(Task): def __init__(self, functor, requires, name=None, provides=None, auto_extract=True, rebind=None, inject=None): - if not six.callable(functor): + if not callable(functor): raise ValueError("Function to use for map must be callable") f_args = reflection.get_callable_args(functor) @@ -247,4 +244,4 @@ class MapFunctorTask(Task): def execute(self, *args, **kwargs): l = [kwargs[r] for r in self.requires] - return list(compat_map(self._functor, l)) + return list(map(self._functor, l)) diff --git a/taskflow/test.py b/taskflow/test.py index a5d880a85..8cd8e9890 100644 --- a/taskflow/test.py +++ b/taskflow/test.py @@ -20,7 +20,6 @@ from unittest import mock import fixtures from oslotest import base -import six from testtools import compat from testtools import matchers @@ -105,7 +104,7 @@ class TestCase(base.BaseTestCase): # Testtools seems to want equals objects instead of just keys? compare_dict = {} - for k in list(six.iterkeys(expected)): + for k in list(expected.keys()): if not isinstance(expected[k], matchers.Equals): compare_dict[k] = matchers.Equals(expected[k]) else: diff --git a/taskflow/tests/test_examples.py b/taskflow/tests/test_examples.py index 62142909c..bcb09e146 100644 --- a/taskflow/tests/test_examples.py +++ b/taskflow/tests/test_examples.py @@ -34,8 +34,6 @@ import re import subprocess import sys -import six - from taskflow import test ROOT_DIR = os.path.abspath( @@ -118,8 +116,7 @@ class ExampleAdderMeta(type): return type.__new__(cls, name, parents, dct) -@six.add_metaclass(ExampleAdderMeta) -class ExamplesTestCase(test.TestCase): +class ExamplesTestCase(test.TestCase, metaclass=ExampleAdderMeta): """Runs the examples, and checks the outputs against expected outputs.""" def _check_example(self, name): diff --git a/taskflow/tests/unit/action_engine/test_builder.py b/taskflow/tests/unit/action_engine/test_builder.py index 1bb79b838..72b83ec6d 100644 --- a/taskflow/tests/unit/action_engine/test_builder.py +++ b/taskflow/tests/unit/action_engine/test_builder.py @@ -16,7 +16,6 @@ from automaton import exceptions as excp from automaton import runners -import six from taskflow.engines.action_engine import builder from taskflow.engines.action_engine import compiler @@ -70,30 +69,30 @@ class BuildersTest(test.TestCase): flow, initial_state=st.RUNNING) it = machine_runner.run_iter(builder.START) - prior_state, new_state = six.next(it) + prior_state, new_state = next(it) self.assertEqual(st.RESUMING, new_state) self.assertEqual(0, len(memory.failures)) - prior_state, new_state = six.next(it) + prior_state, new_state = next(it) self.assertEqual(st.SCHEDULING, new_state) self.assertEqual(0, len(memory.failures)) - prior_state, new_state = six.next(it) + prior_state, new_state = next(it) self.assertEqual(st.WAITING, new_state) self.assertEqual(0, len(memory.failures)) - prior_state, new_state = six.next(it) + prior_state, new_state = next(it) self.assertEqual(st.ANALYZING, new_state) self.assertEqual(0, len(memory.failures)) - prior_state, new_state = six.next(it) + prior_state, new_state = next(it) self.assertEqual(builder.GAME_OVER, new_state) self.assertEqual(0, len(memory.failures)) - prior_state, new_state = six.next(it) + prior_state, new_state = next(it) self.assertEqual(st.SUCCESS, new_state) self.assertEqual(0, len(memory.failures)) - self.assertRaises(StopIteration, six.next, it) + self.assertRaises(StopIteration, next, it) def test_run_iterations_reverted(self): flow = lf.Flow("root") diff --git a/taskflow/tests/unit/jobs/test_redis_job.py b/taskflow/tests/unit/jobs/test_redis_job.py index 9e6bdc7d7..320827c8b 100644 --- a/taskflow/tests/unit/jobs/test_redis_job.py +++ b/taskflow/tests/unit/jobs/test_redis_job.py @@ -18,7 +18,6 @@ import time from unittest import mock from oslo_utils import uuidutils -import six import testtools from taskflow import exceptions as excp @@ -44,7 +43,7 @@ class RedisJobboardTest(test.TestCase, base.BoardTestMixin): namespace = uuidutils.generate_uuid() client = ru.RedisClient() config = { - 'namespace': six.b("taskflow-%s" % namespace), + 'namespace': ("taskflow-%s" % namespace).encode('latin-1'), } kwargs = { 'client': client, diff --git a/taskflow/tests/unit/jobs/test_zk_job.py b/taskflow/tests/unit/jobs/test_zk_job.py index 527f253ed..d93f148f8 100644 --- a/taskflow/tests/unit/jobs/test_zk_job.py +++ b/taskflow/tests/unit/jobs/test_zk_job.py @@ -21,7 +21,6 @@ from kazoo.protocol import paths as k_paths from kazoo.recipe import watchers from oslo_serialization import jsonutils from oslo_utils import uuidutils -import six import testtools from zake import fake_client from zake import utils as zake_utils @@ -171,7 +170,7 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin): # Forcefully delete the owner from the backend storage to make # sure the job becomes unclaimed (this may happen if some admin # manually deletes the lock). - paths = list(six.iteritems(self.client.storage.paths)) + paths = list(self.client.storage.paths.items()) for (path, value) in paths: if path in self.bad_paths: continue @@ -192,7 +191,7 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin): # Forcefully delete the lock from the backend storage to make # sure the job becomes unclaimed (this may happen if some admin # manually deletes the lock). - paths = list(six.iteritems(self.client.storage.paths)) + paths = list(self.client.storage.paths.items()) for (path, value) in paths: if path in self.bad_paths: continue @@ -215,7 +214,7 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin): trashed = [] jobs = [] - paths = list(six.iteritems(self.client.storage.paths)) + paths = list(self.client.storage.paths.items()) for (path, value) in paths: if path in self.bad_paths: continue @@ -244,14 +243,14 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin): # Remove paths that got created due to the running process that we are # not interested in... paths = {} - for (path, data) in six.iteritems(self.client.storage.paths): + for (path, data) in self.client.storage.paths.items(): if path in self.bad_paths: continue paths[path] = data # Check the actual data that was posted. self.assertEqual(1, len(paths)) - path_key = list(six.iterkeys(paths))[0] + path_key = list(paths.keys())[0] self.assertTrue(len(paths[path_key]['data']) > 0) self.assertDictEqual({ 'uuid': posted_job.uuid, diff --git a/taskflow/tests/unit/persistence/test_sql_persistence.py b/taskflow/tests/unit/persistence/test_sql_persistence.py index 973bf9f72..5a1917c47 100644 --- a/taskflow/tests/unit/persistence/test_sql_persistence.py +++ b/taskflow/tests/unit/persistence/test_sql_persistence.py @@ -20,7 +20,6 @@ import os import random import tempfile -import six import testtools @@ -123,8 +122,8 @@ class SqlitePersistenceTest(test.TestCase, base.PersistenceTestMixin): self.db_location = None -@six.add_metaclass(abc.ABCMeta) -class BackendPersistenceTestMixin(base.PersistenceTestMixin): +class BackendPersistenceTestMixin(base.PersistenceTestMixin, + metaclass=abc.ABCMeta): """Specifies a backend type and does required setup and teardown.""" def _get_connection(self): diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index 81f0e0842..445442948 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -20,7 +20,6 @@ import functools import threading import futurist -import six import testtools import taskflow.engines @@ -350,7 +349,7 @@ class EngineLinearFlowTest(utils.EngineTestBase): engine_it = engine.run_iter() while True: try: - engine_state = six.next(engine_it) + engine_state = next(engine_it) if engine_state not in engine_states: engine_states[engine_state] = 1 else: @@ -1318,7 +1317,7 @@ class EngineGraphConditionalFlowTest(utils.EngineTestBase): def test_graph_flow_conditional_history(self): def even_odd_decider(history, allowed): - total = sum(six.itervalues(history)) + total = sum(history.values()) if total == allowed: return True return False diff --git a/taskflow/tests/unit/test_exceptions.py b/taskflow/tests/unit/test_exceptions.py index c542ae830..4c20f7c6e 100644 --- a/taskflow/tests/unit/test_exceptions.py +++ b/taskflow/tests/unit/test_exceptions.py @@ -16,9 +16,6 @@ import string -import six -import testtools - from taskflow import exceptions as exc from taskflow import test @@ -109,7 +106,6 @@ class TestExceptions(test.TestCase): ex = exc.TaskFlowException("Broken") self.assertRaises(ValueError, ex.pformat, indent=-100) - @testtools.skipIf(not six.PY3, 'py3.x is not available') def test_raise_with_cause(self): capture = None try: diff --git a/taskflow/tests/unit/test_failure.py b/taskflow/tests/unit/test_failure.py index 217f1b308..1c25ac7fa 100644 --- a/taskflow/tests/unit/test_failure.py +++ b/taskflow/tests/unit/test_failure.py @@ -14,12 +14,10 @@ # License for the specific language governing permissions and limitations # under the License. +import pickle import sys from oslo_utils import encodeutils -import six -from six.moves import cPickle as pickle -import testtools from taskflow import exceptions from taskflow import test @@ -338,23 +336,16 @@ class NonAsciiExceptionsTestCase(test.TestCase): fail = failure.Failure.from_exception(excp) self.assertEqual(encodeutils.exception_to_unicode(excp), fail.exception_str) - # This is slightly different on py2 vs py3... due to how - # __str__ or __unicode__ is called and what is expected from - # both... - if six.PY2: - msg = encodeutils.exception_to_unicode(excp) - expected = 'Failure: ValueError: %s' % msg.encode('utf-8') - else: - expected = u'Failure: ValueError: \xc8' + expected = u'Failure: ValueError: \xc8' self.assertEqual(expected, str(fail)) def test_exception_non_ascii_unicode(self): hi_ru = u'привет' fail = failure.Failure.from_exception(ValueError(hi_ru)) self.assertEqual(hi_ru, fail.exception_str) - self.assertIsInstance(fail.exception_str, six.text_type) + self.assertIsInstance(fail.exception_str, str) self.assertEqual(u'Failure: ValueError: %s' % hi_ru, - six.text_type(fail)) + str(fail)) def test_wrapped_failure_non_ascii_unicode(self): hi_cn = u'嗨' @@ -364,7 +355,7 @@ class NonAsciiExceptionsTestCase(test.TestCase): wrapped_fail = exceptions.WrappedFailure([fail]) expected_result = (u"WrappedFailure: " "[Failure: ValueError: %s]" % (hi_cn)) - self.assertEqual(expected_result, six.text_type(wrapped_fail)) + self.assertEqual(expected_result, str(wrapped_fail)) def test_failure_equality_with_non_ascii_str(self): bad_string = chr(200) @@ -379,7 +370,6 @@ class NonAsciiExceptionsTestCase(test.TestCase): self.assertEqual(fail, copied) -@testtools.skipIf(not six.PY3, 'this test only works on python 3.x') class FailureCausesTest(test.TestCase): @classmethod @@ -392,7 +382,7 @@ class FailureCausesTest(test.TestCase): cls._raise_many(messages) raise e except RuntimeError as e1: - six.raise_from(e, e1) + raise e from e1 def test_causes(self): f = None @@ -467,7 +457,7 @@ class FailureCausesTest(test.TestCase): self._raise_many(["Still still not working", "Still not working", "Not working"]) except RuntimeError as e: - six.raise_from(e, None) + raise e from None except RuntimeError: f = failure.Failure() diff --git a/taskflow/tests/unit/test_listeners.py b/taskflow/tests/unit/test_listeners.py index 3e2f8806e..b14f7f25d 100644 --- a/taskflow/tests/unit/test_listeners.py +++ b/taskflow/tests/unit/test_listeners.py @@ -21,7 +21,6 @@ import time from oslo_serialization import jsonutils from oslo_utils import reflection -import six from zake import fake_client import taskflow.engines @@ -111,7 +110,7 @@ class TestClaimListener(test.TestCase, EngineMakerMixin): children = self.client.storage.get_children("/taskflow", only_direct=False) removed = 0 - for p, data in six.iteritems(children): + for p, data in children.items(): if p.endswith(".lock"): self.client.storage.pop(p) removed += 1 @@ -121,7 +120,7 @@ class TestClaimListener(test.TestCase, EngineMakerMixin): children = self.client.storage.get_children("/taskflow", only_direct=False) altered = 0 - for p, data in six.iteritems(children): + for p, data in children.items(): if p.endswith(".lock"): self.client.set(p, misc.binary_encode( jsonutils.dumps({'owner': new_owner}))) diff --git a/taskflow/tests/unit/test_types.py b/taskflow/tests/unit/test_types.py index fd97cd666..eca26cd34 100644 --- a/taskflow/tests/unit/test_types.py +++ b/taskflow/tests/unit/test_types.py @@ -15,7 +15,7 @@ # under the License. import networkx as nx -from six.moves import cPickle as pickle +import pickle from taskflow import test from taskflow.types import graph diff --git a/taskflow/tests/unit/test_utils.py b/taskflow/tests/unit/test_utils.py index 72bddd289..168e517ed 100644 --- a/taskflow/tests/unit/test_utils.py +++ b/taskflow/tests/unit/test_utils.py @@ -20,7 +20,6 @@ import random import string import time -import six import testscenarios from taskflow import test @@ -241,7 +240,7 @@ class TestCountdownIter(test.TestCase): def test_invalid_decr(self): it = misc.countdown_iter(10, -1) - self.assertRaises(ValueError, six.next, it) + self.assertRaises(ValueError, next, it) class TestMergeUri(test.TestCase): diff --git a/taskflow/tests/unit/test_utils_binary.py b/taskflow/tests/unit/test_utils_binary.py index 773f38934..0ba930f7d 100644 --- a/taskflow/tests/unit/test_utils_binary.py +++ b/taskflow/tests/unit/test_utils_binary.py @@ -14,24 +14,19 @@ # License for the specific language governing permissions and limitations # under the License. -import six - from taskflow import test from taskflow.utils import misc def _bytes(data): - if six.PY3: - return data.encode(encoding='utf-8') - else: - return data + return data.encode(encoding='utf-8') class BinaryEncodeTest(test.TestCase): def _check(self, data, expected_result): result = misc.binary_encode(data) - self.assertIsInstance(result, six.binary_type) + self.assertIsInstance(result, bytes) self.assertEqual(expected_result, result) def test_simple_binary(self): @@ -50,7 +45,7 @@ class BinaryEncodeTest(test.TestCase): def test_unicode_other_encoding(self): result = misc.binary_encode(u'mañana', 'latin-1') - self.assertIsInstance(result, six.binary_type) + self.assertIsInstance(result, bytes) self.assertEqual(u'mañana'.encode('latin-1'), result) @@ -58,7 +53,7 @@ class BinaryDecodeTest(test.TestCase): def _check(self, data, expected_result): result = misc.binary_decode(data) - self.assertIsInstance(result, six.text_type) + self.assertIsInstance(result, str) self.assertEqual(expected_result, result) def test_simple_text(self): @@ -78,7 +73,7 @@ class BinaryDecodeTest(test.TestCase): def test_unicode_other_encoding(self): data = u'mañana'.encode('latin-1') result = misc.binary_decode(data, 'latin-1') - self.assertIsInstance(result, six.text_type) + self.assertIsInstance(result, str) self.assertEqual(u'mañana', result) @@ -94,7 +89,7 @@ class DecodeJsonTest(test.TestCase): def test_handles_invalid_unicode(self): self.assertRaises(ValueError, misc.decode_json, - six.b('{"\xf1": 1}')) + '{"\xf1": 1}'.encode('latin-1')) def test_handles_bad_json(self): self.assertRaises(ValueError, misc.decode_json, diff --git a/taskflow/tests/unit/test_utils_iter_utils.py b/taskflow/tests/unit/test_utils_iter_utils.py index 4a5ff4b9a..c4109d893 100644 --- a/taskflow/tests/unit/test_utils_iter_utils.py +++ b/taskflow/tests/unit/test_utils_iter_utils.py @@ -16,9 +16,6 @@ import string -import six -from six.moves import range as compat_range - from taskflow import test from taskflow.utils import iter_utils @@ -46,22 +43,22 @@ class IterUtilsTest(test.TestCase): def test_generate_delays(self): it = iter_utils.generate_delays(1, 60) - self.assertEqual(1, six.next(it)) - self.assertEqual(2, six.next(it)) - self.assertEqual(4, six.next(it)) - self.assertEqual(8, six.next(it)) - self.assertEqual(16, six.next(it)) - self.assertEqual(32, six.next(it)) - self.assertEqual(60, six.next(it)) - self.assertEqual(60, six.next(it)) + self.assertEqual(1, next(it)) + self.assertEqual(2, next(it)) + self.assertEqual(4, next(it)) + self.assertEqual(8, next(it)) + self.assertEqual(16, next(it)) + self.assertEqual(32, next(it)) + self.assertEqual(60, next(it)) + self.assertEqual(60, next(it)) def test_generate_delays_custom_multiplier(self): it = iter_utils.generate_delays(1, 60, multiplier=4) - self.assertEqual(1, six.next(it)) - self.assertEqual(4, six.next(it)) - self.assertEqual(16, six.next(it)) - self.assertEqual(60, six.next(it)) - self.assertEqual(60, six.next(it)) + self.assertEqual(1, next(it)) + self.assertEqual(4, next(it)) + self.assertEqual(16, next(it)) + self.assertEqual(60, next(it)) + self.assertEqual(60, next(it)) def test_generate_delays_bad(self): self.assertRaises(ValueError, iter_utils.generate_delays, -1, -1) @@ -99,7 +96,7 @@ class IterUtilsTest(test.TestCase): self.assertRaises(ValueError, iter_utils.fill, 2, 2) def test_fill_many_empty(self): - result = list(iter_utils.fill(compat_range(0, 50), 500)) + result = list(iter_utils.fill(range(0, 50), 500)) self.assertEqual(450, sum(1 for x in result if x is None)) self.assertEqual(50, sum(1 for x in result if x is not None)) @@ -134,10 +131,10 @@ class IterUtilsTest(test.TestCase): def test_count(self): self.assertEqual(0, iter_utils.count([])) self.assertEqual(1, iter_utils.count(['a'])) - self.assertEqual(10, iter_utils.count(compat_range(0, 10))) - self.assertEqual(1000, iter_utils.count(compat_range(0, 1000))) - self.assertEqual(0, iter_utils.count(compat_range(0))) - self.assertEqual(0, iter_utils.count(compat_range(-1))) + self.assertEqual(10, iter_utils.count(range(0, 10))) + self.assertEqual(1000, iter_utils.count(range(0, 1000))) + self.assertEqual(0, iter_utils.count(range(0))) + self.assertEqual(0, iter_utils.count(range(-1))) def test_bad_while_is_not(self): self.assertRaises(ValueError, iter_utils.while_is_not, 2, 'a') diff --git a/taskflow/tests/unit/worker_based/test_server.py b/taskflow/tests/unit/worker_based/test_server.py index 5380204fa..2261628e1 100644 --- a/taskflow/tests/unit/worker_based/test_server.py +++ b/taskflow/tests/unit/worker_based/test_server.py @@ -14,8 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -import six - from taskflow.engines.worker_based import endpoint as ep from taskflow.engines.worker_based import protocol as pr from taskflow.engines.worker_based import server @@ -143,7 +141,7 @@ class TestServer(test.MockTestCase): (self.task.name, self.task.name, 'revert', dict(arguments=self.task_args, failures=dict((i, utils.FailureMatcher(f)) - for i, f in six.iteritems(failures)))), + for i, f in failures.items()))), (task_cls, task_name, action, task_args)) @mock.patch("taskflow.engines.worker_based.server.LOG.critical") diff --git a/taskflow/tests/unit/worker_based/test_worker.py b/taskflow/tests/unit/worker_based/test_worker.py index 0e49a562b..3b068a0b0 100644 --- a/taskflow/tests/unit/worker_based/test_worker.py +++ b/taskflow/tests/unit/worker_based/test_worker.py @@ -14,8 +14,8 @@ # License for the specific language governing permissions and limitations # under the License. +import io from oslo_utils import reflection -import six from taskflow.engines.worker_based import endpoint from taskflow.engines.worker_based import worker @@ -66,7 +66,7 @@ class TestWorker(test.MockTestCase): self.assertEqual(master_mock_calls, self.master_mock.mock_calls) def test_banner_writing(self): - buf = six.StringIO() + buf = io.StringIO() w = self.worker() w.run(banner_writer=buf.write) w.wait() diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index 58cd9ab7e..6c2f13a77 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -21,7 +21,6 @@ import time from oslo_utils import timeutils import redis -import six from taskflow import exceptions from taskflow.listeners import capturing @@ -137,7 +136,7 @@ class GiveBackRevert(task.Task): result = kwargs.get('result') # If this somehow fails, timeout, or other don't send back a # valid result... - if isinstance(result, six.integer_types): + if isinstance(result, int): return result + 1 @@ -153,12 +152,8 @@ class LongArgNameTask(task.Task): return long_arg_name -if six.PY3: - RUNTIME_ERROR_CLASSES = ['RuntimeError', 'Exception', - 'BaseException', 'object'] -else: - RUNTIME_ERROR_CLASSES = ['RuntimeError', 'StandardError', 'Exception', - 'BaseException', 'object'] +RUNTIME_ERROR_CLASSES = ['RuntimeError', 'Exception', 'BaseException', + 'object'] class ProvidesRequiresTask(task.Task): @@ -410,11 +405,11 @@ class WaitForOneFromTask(ProgressingTask): def __init__(self, name, wait_for, wait_states, **kwargs): super(WaitForOneFromTask, self).__init__(name, **kwargs) - if isinstance(wait_for, six.string_types): + if isinstance(wait_for, str): self.wait_for = [wait_for] else: self.wait_for = wait_for - if isinstance(wait_states, six.string_types): + if isinstance(wait_states, str): self.wait_states = [wait_states] else: self.wait_states = wait_states diff --git a/taskflow/types/failure.py b/taskflow/types/failure.py index fc35bc90d..d5e9df130 100644 --- a/taskflow/types/failure.py +++ b/taskflow/types/failure.py @@ -16,17 +16,16 @@ import collections import copy +import io import os import sys import traceback from oslo_utils import encodeutils from oslo_utils import reflection -import six from taskflow import exceptions as exc from taskflow.utils import iter_utils -from taskflow.utils import mixins from taskflow.utils import schema_utils as su @@ -68,7 +67,7 @@ def _are_equal_exc_info_tuples(ei1, ei2): return tb1 == tb2 -class Failure(mixins.StrMixin): +class Failure(): """An immutable object that represents failure. Failure objects encapsulate exception information so that they can be @@ -210,7 +209,7 @@ class Failure(mixins.StrMixin): if kwargs: raise TypeError( 'Failure.__init__ got unexpected keyword argument(s): %s' - % ', '.join(six.iterkeys(kwargs))) + % ', '.join(kwargs.keys())) @classmethod def from_exception(cls, exception): @@ -343,7 +342,12 @@ class Failure(mixins.StrMixin): def reraise(self): """Re-raise captured exception.""" if self._exc_info: - six.reraise(*self._exc_info) + tp, value, tb = self._exc_info + if value is None: + value = tp() + if value.__traceback__ is not tb: + raise value.with_traceback(tb) + raise value else: raise exc.WrappedFailure([self]) @@ -424,12 +428,12 @@ class Failure(mixins.StrMixin): self._causes = tuple(self._extract_causes_iter(self.exception)) return self._causes - def __unicode__(self): + def __str__(self): return self.pformat() def pformat(self, traceback=False): """Pretty formats the failure object into a string.""" - buf = six.StringIO() + buf = io.StringIO() if not self._exc_type_names: buf.write('Failure: %s' % (self._exception_str)) else: diff --git a/taskflow/types/graph.py b/taskflow/types/graph.py index ad518e92d..5d6912797 100644 --- a/taskflow/types/graph.py +++ b/taskflow/types/graph.py @@ -15,11 +15,11 @@ # under the License. import collections +import io import os import networkx as nx from networkx.drawing import nx_pydot -import six def _common_format(g, edge_notation): @@ -128,7 +128,7 @@ class DiGraph(nx.DiGraph): cycles = list(nx.cycles.recursive_simple_cycles(self)) lines.append("Cycles: %s" % len(cycles)) for cycle in cycles: - buf = six.StringIO() + buf = io.StringIO() buf.write("%s" % (cycle[0])) for i in range(1, len(cycle)): buf.write(" --> %s" % (cycle[i])) @@ -251,7 +251,7 @@ def merge_graphs(graph, *graphs, **kwargs): tmp_graph = graph allow_overlaps = kwargs.get('allow_overlaps', False) overlap_detector = kwargs.get('overlap_detector') - if overlap_detector is not None and not six.callable(overlap_detector): + if overlap_detector is not None and not callable(overlap_detector): raise ValueError("Overlap detection callback expected to be callable") elif overlap_detector is None: overlap_detector = (lambda to_graph, from_graph: diff --git a/taskflow/types/notifier.py b/taskflow/types/notifier.py index 7c6af371e..f10637e75 100644 --- a/taskflow/types/notifier.py +++ b/taskflow/types/notifier.py @@ -20,7 +20,6 @@ import copy import logging from oslo_utils import reflection -import six LOG = logging.getLogger(__name__) @@ -164,7 +163,7 @@ class Notifier(object): :rtype: number """ count = 0 - for (_event_type, listeners) in six.iteritems(self._topics): + for (_event_type, listeners) in self._topics.items(): count += len(listeners) return count @@ -235,10 +234,10 @@ class Notifier(object): :param kwargs: key-value pair arguments :type kwargs: dictionary """ - if not six.callable(callback): + if not callable(callback): raise ValueError("Event callback must be callable") if details_filter is not None: - if not six.callable(details_filter): + if not callable(details_filter): raise ValueError("Details filter must be callable") if not self.can_be_registered(event_type): raise ValueError("Disallowed event type '%s' can not have a" @@ -280,7 +279,7 @@ class Notifier(object): def copy(self): c = copy.copy(self) c._topics = collections.defaultdict(list) - for (event_type, listeners) in six.iteritems(self._topics): + for (event_type, listeners) in self._topics.items(): c._topics[event_type] = listeners[:] return c @@ -292,7 +291,7 @@ class Notifier(object): itself wraps a provided callback (and its details filter callback, if any). """ - for event_type, listeners in six.iteritems(self._topics): + for event_type, listeners in self._topics.items(): if listeners: yield (event_type, listeners) diff --git a/taskflow/types/sets.py b/taskflow/types/sets.py index 1a33ed32d..805f54b27 100644 --- a/taskflow/types/sets.py +++ b/taskflow/types/sets.py @@ -18,8 +18,6 @@ import collections from collections import abc import itertools -import six - # Used for values that don't matter in sets backed by dicts... _sentinel = object() @@ -59,7 +57,7 @@ class OrderedSet(abc.Set, abc.Hashable): return len(self._data) def __iter__(self): - for value in six.iterkeys(self._data): + for value in self._data.keys(): yield value def __setstate__(self, items): diff --git a/taskflow/types/timing.py b/taskflow/types/timing.py index a160f6e56..cdab1cd67 100644 --- a/taskflow/types/timing.py +++ b/taskflow/types/timing.py @@ -16,8 +16,6 @@ import threading -import six - class Timeout(object): """An object which represents a timeout. @@ -62,7 +60,7 @@ def convert_to_timeout(value=None, default_value=None, """ if value is None: value = default_value - if isinstance(value, (int, float) + six.string_types): + if isinstance(value, (int, float, str)): return Timeout(float(value), event_factory=event_factory) elif isinstance(value, Timeout): return value diff --git a/taskflow/types/tree.py b/taskflow/types/tree.py index 3681694b5..2aef2a102 100644 --- a/taskflow/types/tree.py +++ b/taskflow/types/tree.py @@ -15,11 +15,10 @@ # under the License. import collections +import io import itertools import os -import six - from taskflow.types import graph from taskflow.utils import iter_utils from taskflow.utils import misc @@ -279,9 +278,9 @@ class Node(object): """ if stringify_node is None: # Default to making a unicode string out of the nodes item... - stringify_node = lambda node: six.text_type(node.item) + stringify_node = lambda node: str(node.item) expected_lines = self.child_count(only_direct=False) + 1 - buff = six.StringIO() + buff = io.StringIO() conn = vertical_conn + horizontal_conn stop_at_parent = self for i, node in enumerate(self.dfs_iter(include_self=True), 1): diff --git a/taskflow/utils/banner.py b/taskflow/utils/banner.py index a40eea611..0c2adb39e 100644 --- a/taskflow/utils/banner.py +++ b/taskflow/utils/banner.py @@ -17,8 +17,6 @@ import os import string -import six - from taskflow.utils import misc from taskflow import version @@ -62,7 +60,7 @@ def make_banner(what, chapters): buf.write_nl(BANNER_HEADER) if chapters: buf.write_nl("*%s*" % what) - chapter_names = sorted(six.iterkeys(chapters)) + chapter_names = sorted(chapters.keys()) else: buf.write("*%s*" % what) chapter_names = [] @@ -73,7 +71,7 @@ def make_banner(what, chapters): else: buf.write("%s:" % (chapter_name)) if isinstance(chapter_contents, dict): - section_names = sorted(six.iterkeys(chapter_contents)) + section_names = sorted(chapter_contents.keys()) for j, section_name in enumerate(section_names): if j + 1 < len(section_names): buf.write_nl(" %s => %s" diff --git a/taskflow/utils/iter_utils.py b/taskflow/utils/iter_utils.py index 8591d2357..ebf410760 100644 --- a/taskflow/utils/iter_utils.py +++ b/taskflow/utils/iter_utils.py @@ -15,15 +15,13 @@ # under the License. from collections import abc +import functools import itertools -import six -from six.moves import range as compat_range - def _ensure_iterable(func): - @six.wraps(func) + @functools.wraps(func) def wrapper(it, *args, **kwargs): if not isinstance(it, abc.Iterable): raise ValueError("Iterable expected, but '%s' is not" @@ -147,5 +145,5 @@ def iter_forever(limit): while True: yield next(i) else: - for i in compat_range(0, limit): + for i in range(0, limit): yield i diff --git a/taskflow/utils/kazoo_utils.py b/taskflow/utils/kazoo_utils.py index 2d856bd3f..505c1019b 100644 --- a/taskflow/utils/kazoo_utils.py +++ b/taskflow/utils/kazoo_utils.py @@ -17,8 +17,6 @@ from kazoo import client from kazoo import exceptions as k_exc from oslo_utils import reflection -import six -from six.moves import zip as compat_zip from taskflow import exceptions as exc from taskflow import logging @@ -28,11 +26,11 @@ LOG = logging.getLogger(__name__) def _parse_hosts(hosts): - if isinstance(hosts, six.string_types): + if isinstance(hosts, str): return hosts.strip() if isinstance(hosts, (dict)): host_ports = [] - for (k, v) in six.iteritems(hosts): + for (k, v) in hosts.items(): host_ports.append("%s:%s" % (k, v)) hosts = host_ports if isinstance(hosts, (list, set, tuple)): @@ -89,7 +87,7 @@ def checked_commit(txn): return [] results = txn.commit() failures = [] - for op, result in compat_zip(txn.operations, results): + for op, result in zip(txn.operations, results): if isinstance(result, k_exc.KazooException): failures.append((op, result)) if len(results) < len(txn.operations): @@ -211,7 +209,7 @@ def make_client(conf): if 'connection_retry' in conf: client_kwargs['connection_retry'] = conf['connection_retry'] hosts = _parse_hosts(conf.get("hosts", "localhost:2181")) - if not hosts or not isinstance(hosts, six.string_types): + if not hosts or not isinstance(hosts, str): raise TypeError("Invalid hosts format, expected " "non-empty string/list, not '%s' (%s)" % (hosts, type(hosts))) diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index 65a93e2c4..8c0f90405 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -18,7 +18,9 @@ import collections.abc import contextlib import datetime +import functools import inspect +import io import os import re import socket @@ -33,13 +35,12 @@ from oslo_utils import encodeutils from oslo_utils import importutils from oslo_utils import netutils from oslo_utils import reflection -import six from taskflow.types import failure UNKNOWN_HOSTNAME = "" -NUMERIC_TYPES = six.integer_types + (float,) +NUMERIC_TYPES = (int, float) # NOTE(imelnikov): regular expression to get scheme from URI, # see RFC 3986 section 3.1 @@ -57,7 +58,7 @@ class StrEnum(str, enum.Enum): return super(StrEnum, cls).__new__(cls, *args, **kwargs) -class StringIO(six.StringIO): +class StringIO(io.StringIO): """String buffer with some small additions.""" def write_nl(self, value, linesep=os.linesep): @@ -65,7 +66,7 @@ class StringIO(six.StringIO): self.write(linesep) -class BytesIO(six.BytesIO): +class BytesIO(io.BytesIO): """Byte buffer with some small additions.""" def reset(self): @@ -118,7 +119,7 @@ def countdown_iter(start_at, decr=1): def extract_driver_and_conf(conf, conf_key): """Common function to get a driver name and its configuration.""" - if isinstance(conf, six.string_types): + if isinstance(conf, str): conf = {conf_key: conf} maybe_uri = conf[conf_key] try: @@ -161,7 +162,7 @@ def merge_uri(uri, conf): for (k, v, is_not_empty_value_func) in specials: if is_not_empty_value_func(v): conf.setdefault(k, v) - for (k, v) in six.iteritems(uri.params()): + for (k, v) in uri.params().items(): conf.setdefault(k, v) return conf @@ -182,7 +183,7 @@ def find_subclasses(locations, base_cls, exclude_hidden=True): derived = set() for item in locations: module = None - if isinstance(item, six.string_types): + if isinstance(item, str): try: pkg, cls = item.split(':') except ValueError: @@ -221,7 +222,7 @@ def pick_first_not_none(*values): def parse_uri(uri): """Parses a uri into its components.""" # Do some basic validation before continuing... - if not isinstance(uri, six.string_types): + if not isinstance(uri, str): raise TypeError("Can only parse string types to uri data, " "and not '%s' (%s)" % (uri, type(uri))) match = _SCHEME_REGEX.match(uri) @@ -236,7 +237,7 @@ def disallow_when_frozen(excp_cls): def decorator(f): - @six.wraps(f) + @functools.wraps(f) def wrapper(self, *args, **kwargs): if self.frozen: raise excp_cls() @@ -274,7 +275,7 @@ def binary_encode(text, encoding='utf-8', errors='strict'): Does nothing if data is already a binary string (raises on unknown types). """ - if isinstance(text, six.binary_type): + if isinstance(text, bytes): return text else: return encodeutils.safe_encode(text, encoding=encoding, @@ -286,7 +287,7 @@ def binary_decode(data, encoding='utf-8', errors='strict'): Does nothing if data is already a text string (raises on unknown types). """ - if isinstance(data, six.text_type): + if isinstance(data, str): return data else: return encodeutils.safe_decode(data, incoming=encoding, @@ -426,7 +427,7 @@ def get_version_string(obj): if isinstance(obj_version, (list, tuple)): obj_version = '.'.join(str(item) for item in obj_version) if obj_version is not None and not isinstance(obj_version, - six.string_types): + str): obj_version = str(obj_version) return obj_version @@ -524,7 +525,7 @@ def is_iterable(obj): :param obj: object to be tested for iterable :return: True if object is iterable and is not a string """ - return (not isinstance(obj, six.string_types) and + return (not isinstance(obj, str) and isinstance(obj, collections.abc.Iterable)) diff --git a/taskflow/utils/mixins.py b/taskflow/utils/mixins.py deleted file mode 100644 index 5bb0fa47f..000000000 --- a/taskflow/utils/mixins.py +++ /dev/null @@ -1,35 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import six - - -class StrMixin(object): - """Mixin that helps deal with the PY2 and PY3 method differences. - - http://lucumr.pocoo.org/2011/1/22/forwards-compatible-python/ explains - why this is quite useful... - """ - - if six.PY2: - def __str__(self): - try: - return self.__bytes__() - except AttributeError: - return self.__unicode__().encode('utf-8') - else: - def __str__(self): - return self.__unicode__() diff --git a/taskflow/utils/redis_utils.py b/taskflow/utils/redis_utils.py index 0d0407382..373ab2dc7 100644 --- a/taskflow/utils/redis_utils.py +++ b/taskflow/utils/redis_utils.py @@ -15,15 +15,15 @@ # under the License. import enum +import functools import redis from redis import exceptions as redis_exceptions -import six def _raise_on_closed(meth): - @six.wraps(meth) + @functools.wraps(meth) def wrapper(self, *args, **kwargs): if self.closed: raise redis_exceptions.ConnectionError("Connection has been" diff --git a/taskflow/utils/threading_utils.py b/taskflow/utils/threading_utils.py index ed5546839..74b4ebbbc 100644 --- a/taskflow/utils/threading_utils.py +++ b/taskflow/utils/threading_utils.py @@ -14,13 +14,11 @@ # License for the specific language governing permissions and limitations # under the License. +import _thread import collections import multiprocessing import threading -import six -from six.moves import _thread - from taskflow.utils import misc @@ -106,7 +104,7 @@ class ThreadBundle(object): before_join, after_join) for attr_name in builder.fields: cb = getattr(builder, attr_name) - if not six.callable(cb): + if not callable(cb): raise ValueError("Provided callback for argument" " '%s' must be callable" % attr_name) with self._lock: diff --git a/tools/schema_generator.py b/tools/schema_generator.py index 3685a0a13..c84e24b73 100755 --- a/tools/schema_generator.py +++ b/tools/schema_generator.py @@ -17,7 +17,6 @@ import contextlib import re -import six import tabulate from taskflow.persistence.backends import impl_sqlalchemy @@ -37,9 +36,9 @@ SCHEMA_QUERY = "pragma table_info(%s)" def to_bool_string(val): if isinstance(val, (int, bool)): - return six.text_type(bool(val)) - if not isinstance(val, six.string_types): - val = six.text_type(val) + return str(bool(val)) + if not isinstance(val, str): + val = str(val) if val.lower() in ('0', 'false'): return 'False' if val.lower() in ('1', 'true'): diff --git a/tools/speed_test.py b/tools/speed_test.py index f9da37ace..39092e661 100644 --- a/tools/speed_test.py +++ b/tools/speed_test.py @@ -18,11 +18,10 @@ Profile a simple engine build/load/compile/prepare/validate/run. import argparse import cProfile as profiler +import io import pstats from oslo_utils import timeutils -import six -from six.moves import range as compat_range from taskflow import engines from taskflow.patterns import linear_flow as lf @@ -50,7 +49,7 @@ class ProfileIt(object): def __exit__(self, exc_tp, exc_v, exc_tb): self.profile.disable() - buf = six.StringIO() + buf = io.StringIO() ps = pstats.Stats(self.profile, stream=buf) ps = ps.sort_stats(*self.stats_ordering) percent_limit = max(0.0, max(1.0, self.args.limit / 100.0)) diff --git a/tox.ini b/tox.ini index 752cd5e6a..85031e43c 100644 --- a/tox.ini +++ b/tox.ini @@ -61,7 +61,6 @@ ignore = E305,E402,E721,E731,E741,W503,W504 [hacking] import_exceptions = - six.moves taskflow.test.mock unittest.mock