Remove the task manager
The underlying openstacksdk library is shifting how task manager works, so stop trying to send it a task manager. This is a feature basically in place just for nodepool which is being expanded to be more usable by everyone. The likelihood that anyone other than nodepool is using it is ... very low. Change-Id: I04be3937589a805a5f9686c91a78933eebcfa022
This commit is contained in:
parent
d0da0b0323
commit
28e95889a0
@ -39,13 +39,6 @@ Most of the logging is set up to log to the root `shade` logger. There are
|
||||
additional sub-loggers that are used at times, primarily so that a user can
|
||||
decide to turn on or off a specific type of logging. They are listed below.
|
||||
|
||||
shade.task_manager
|
||||
`shade` uses a Task Manager to perform remote calls. The `shade.task_manager`
|
||||
logger emits messages at the start and end of each Task announging what
|
||||
it is going to run and then what it ran and how long it took. Logging
|
||||
`shade.task_manager` is a good way to get a trace of external actions shade
|
||||
is taking without full `HTTP Tracing`_.
|
||||
|
||||
shade.request_ids
|
||||
The `shade.request_ids` logger emits a log line at the end of each HTTP
|
||||
interaction with the OpenStack Request ID associated with the interaction.
|
||||
|
@ -0,0 +1,6 @@
|
||||
---
|
||||
upgrade:
|
||||
- |
|
||||
The ``manager`` parameter is no longer meaningful. This should have no
|
||||
impact as the only known consumer of the feature is nodepool which
|
||||
no longer uses shade.
|
@ -11,7 +11,6 @@
|
||||
# limitations under the License.
|
||||
|
||||
import base64
|
||||
import collections
|
||||
import copy
|
||||
import datetime
|
||||
import functools
|
||||
@ -35,7 +34,6 @@ import os
|
||||
from openstack.cloud import _utils
|
||||
from openstack.config import loader
|
||||
from openstack import connection
|
||||
from openstack import task_manager
|
||||
from openstack import utils
|
||||
|
||||
from shade import exc
|
||||
@ -135,7 +133,6 @@ class OpenStackCloud(
|
||||
super(OpenStackCloud, self).__init__(
|
||||
config=cloud_config,
|
||||
strict=strict,
|
||||
task_manager=manager,
|
||||
app_name=app_name,
|
||||
app_version=app_version,
|
||||
use_direct_get=use_direct_get,
|
||||
@ -7270,118 +7267,6 @@ class OpenStackCloud(
|
||||
endpoint, filename, headers,
|
||||
file_size, segment_size, use_slo)
|
||||
|
||||
def _upload_object(self, endpoint, filename, headers):
|
||||
return self._object_store_client.put(
|
||||
endpoint, headers=headers, data=open(filename, 'r'))
|
||||
|
||||
def _get_file_segments(self, endpoint, filename, file_size, segment_size):
|
||||
# Use an ordered dict here so that testing can replicate things
|
||||
segments = collections.OrderedDict()
|
||||
for (index, offset) in enumerate(range(0, file_size, segment_size)):
|
||||
remaining = file_size - (index * segment_size)
|
||||
segment = _utils.FileSegment(
|
||||
filename, offset,
|
||||
segment_size if segment_size < remaining else remaining)
|
||||
name = '{endpoint}/{index:0>6}'.format(
|
||||
endpoint=endpoint, index=index)
|
||||
segments[name] = segment
|
||||
return segments
|
||||
|
||||
def _object_name_from_url(self, url):
|
||||
'''Get container_name/object_name from the full URL called.
|
||||
|
||||
Remove the Swift endpoint from the front of the URL, and remove
|
||||
the leaving / that will leave behind.'''
|
||||
endpoint = self._object_store_client.get_endpoint()
|
||||
object_name = url.replace(endpoint, '')
|
||||
if object_name.startswith('/'):
|
||||
object_name = object_name[1:]
|
||||
return object_name
|
||||
|
||||
def _add_etag_to_manifest(self, segment_results, manifest):
|
||||
for result in segment_results:
|
||||
if 'Etag' not in result.headers:
|
||||
continue
|
||||
name = self._object_name_from_url(result.url)
|
||||
for entry in manifest:
|
||||
if entry['path'] == '/{name}'.format(name=name):
|
||||
entry['etag'] = result.headers['Etag']
|
||||
|
||||
def _upload_large_object(
|
||||
self, endpoint, filename,
|
||||
headers, file_size, segment_size, use_slo):
|
||||
# If the object is big, we need to break it up into segments that
|
||||
# are no larger than segment_size, upload each of them individually
|
||||
# and then upload a manifest object. The segments can be uploaded in
|
||||
# parallel, so we'll use the async feature of the TaskManager.
|
||||
|
||||
segment_futures = []
|
||||
segment_results = []
|
||||
retry_results = []
|
||||
retry_futures = []
|
||||
manifest = []
|
||||
|
||||
# Get an OrderedDict with keys being the swift location for the
|
||||
# segment, the value a FileSegment file-like object that is a
|
||||
# slice of the data for the segment.
|
||||
segments = self._get_file_segments(
|
||||
endpoint, filename, file_size, segment_size)
|
||||
|
||||
# Schedule the segments for upload
|
||||
for name, segment in segments.items():
|
||||
# Async call to put - schedules execution and returns a future
|
||||
segment_future = self._object_store_client.put(
|
||||
name, headers=headers, data=segment, run_async=True)
|
||||
segment_futures.append(segment_future)
|
||||
# TODO(mordred) Collect etags from results to add to this manifest
|
||||
# dict. Then sort the list of dicts by path.
|
||||
manifest.append(dict(
|
||||
path='/{name}'.format(name=name),
|
||||
size_bytes=segment.length))
|
||||
|
||||
# Try once and collect failed results to retry
|
||||
segment_results, retry_results = task_manager.wait_for_futures(
|
||||
segment_futures, raise_on_error=False)
|
||||
|
||||
self._add_etag_to_manifest(segment_results, manifest)
|
||||
|
||||
for result in retry_results:
|
||||
# Grab the FileSegment for the failed upload so we can retry
|
||||
name = self._object_name_from_url(result.url)
|
||||
segment = segments[name]
|
||||
segment.seek(0)
|
||||
# Async call to put - schedules execution and returns a future
|
||||
segment_future = self._object_store_client.put(
|
||||
name, headers=headers, data=segment, run_async=True)
|
||||
# TODO(mordred) Collect etags from results to add to this manifest
|
||||
# dict. Then sort the list of dicts by path.
|
||||
retry_futures.append(segment_future)
|
||||
|
||||
# If any segments fail the second time, just throw the error
|
||||
segment_results, retry_results = task_manager.wait_for_futures(
|
||||
retry_futures, raise_on_error=True)
|
||||
|
||||
self._add_etag_to_manifest(segment_results, manifest)
|
||||
|
||||
if use_slo:
|
||||
return self._finish_large_object_slo(endpoint, headers, manifest)
|
||||
else:
|
||||
return self._finish_large_object_dlo(endpoint, headers)
|
||||
|
||||
def _finish_large_object_slo(self, endpoint, headers, manifest):
|
||||
# TODO(mordred) send an etag of the manifest, which is the md5sum
|
||||
# of the concatenation of the etags of the results
|
||||
headers = headers.copy()
|
||||
return self._object_store_client.put(
|
||||
endpoint,
|
||||
params={'multipart-manifest': 'put'},
|
||||
headers=headers, data=json.dumps(manifest))
|
||||
|
||||
def _finish_large_object_dlo(self, endpoint, headers):
|
||||
headers = headers.copy()
|
||||
headers['X-Object-Manifest'] = endpoint
|
||||
return self._object_store_client.put(endpoint, headers=headers)
|
||||
|
||||
def update_object(self, container, name, metadata=None, **headers):
|
||||
"""Update the metadata of an object
|
||||
|
||||
|
@ -1,334 +0,0 @@
|
||||
# Copyright (C) 2011-2013 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
#
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import abc
|
||||
import concurrent.futures
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import types
|
||||
|
||||
import keystoneauth1.exceptions
|
||||
import six
|
||||
|
||||
from shade import _log
|
||||
from shade import exc
|
||||
from shade import meta
|
||||
|
||||
|
||||
def _is_listlike(obj):
|
||||
# NOTE(Shrews): Since the client API might decide to subclass one
|
||||
# of these result types, we use isinstance() here instead of type().
|
||||
return (
|
||||
isinstance(obj, list) or
|
||||
isinstance(obj, types.GeneratorType))
|
||||
|
||||
|
||||
def _is_objlike(obj):
|
||||
# NOTE(Shrews): Since the client API might decide to subclass one
|
||||
# of these result types, we use isinstance() here instead of type().
|
||||
return (
|
||||
not isinstance(obj, bool) and
|
||||
not isinstance(obj, int) and
|
||||
not isinstance(obj, float) and
|
||||
not isinstance(obj, six.string_types) and
|
||||
not isinstance(obj, set) and
|
||||
not isinstance(obj, tuple))
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class BaseTask(object):
|
||||
"""Represent a task to be performed on an OpenStack Cloud.
|
||||
|
||||
Some consumers need to inject things like rate-limiting or auditing
|
||||
around each external REST interaction. Task provides an interface
|
||||
to encapsulate each such interaction. Also, although shade itself
|
||||
operates normally in a single-threaded direct action manner, consuming
|
||||
programs may provide a multi-threaded TaskManager themselves. For that
|
||||
reason, Task uses threading events to ensure appropriate wait conditions.
|
||||
These should be a no-op in single-threaded applications.
|
||||
|
||||
A consumer is expected to overload the main method.
|
||||
|
||||
:param dict kw: Any args that are expected to be passed to something in
|
||||
the main payload at execution time.
|
||||
"""
|
||||
|
||||
def __init__(self, **kw):
|
||||
self._exception = None
|
||||
self._traceback = None
|
||||
self._result = None
|
||||
self._response = None
|
||||
self._finished = threading.Event()
|
||||
self.run_async = False
|
||||
self.args = kw
|
||||
self.name = type(self).__name__
|
||||
|
||||
@abc.abstractmethod
|
||||
def main(self, client):
|
||||
""" Override this method with the actual workload to be performed """
|
||||
|
||||
def done(self, result):
|
||||
self._result = result
|
||||
self._finished.set()
|
||||
|
||||
def exception(self, e, tb):
|
||||
self._exception = e
|
||||
self._traceback = tb
|
||||
self._finished.set()
|
||||
|
||||
def wait(self, raw=False):
|
||||
self._finished.wait()
|
||||
|
||||
if self._exception:
|
||||
six.reraise(type(self._exception), self._exception,
|
||||
self._traceback)
|
||||
|
||||
return self._result
|
||||
|
||||
def run(self, client):
|
||||
self._client = client
|
||||
try:
|
||||
# Retry one time if we get a retriable connection failure
|
||||
try:
|
||||
# Keep time for connection retrying logging
|
||||
start = time.time()
|
||||
self.done(self.main(client))
|
||||
except keystoneauth1.exceptions.RetriableConnectionFailure as e:
|
||||
end = time.time()
|
||||
dt = end - start
|
||||
if client.region_name:
|
||||
client.log.debug(str(e))
|
||||
client.log.debug(
|
||||
"Connection failure on %(cloud)s:%(region)s"
|
||||
" for %(name)s after %(secs)s seconds, retrying",
|
||||
{'cloud': client.name,
|
||||
'region': client.region_name,
|
||||
'secs': dt,
|
||||
'name': self.name})
|
||||
else:
|
||||
client.log.debug(
|
||||
"Connection failure on %(cloud)s for %(name)s after"
|
||||
" %(secs)s seconds, retrying",
|
||||
{'cloud': client.name, 'name': self.name, 'secs': dt})
|
||||
self.done(self.main(client))
|
||||
except Exception:
|
||||
raise
|
||||
except Exception as e:
|
||||
self.exception(e, sys.exc_info()[2])
|
||||
|
||||
|
||||
class Task(BaseTask):
|
||||
""" Shade specific additions to the BaseTask Interface. """
|
||||
|
||||
def wait(self, raw=False):
|
||||
super(Task, self).wait()
|
||||
|
||||
if raw:
|
||||
# Do NOT convert the result.
|
||||
return self._result
|
||||
|
||||
if _is_listlike(self._result):
|
||||
return meta.obj_list_to_munch(self._result)
|
||||
elif _is_objlike(self._result):
|
||||
return meta.obj_to_munch(self._result)
|
||||
else:
|
||||
return self._result
|
||||
|
||||
|
||||
class RequestTask(BaseTask):
|
||||
""" Extensions to the Shade Tasks to handle raw requests """
|
||||
|
||||
# It's totally legit for calls to not return things
|
||||
result_key = None
|
||||
|
||||
# keystoneauth1 throws keystoneauth1.exceptions.http.HttpError on !200
|
||||
def done(self, result):
|
||||
self._response = result
|
||||
|
||||
try:
|
||||
result_json = self._response.json()
|
||||
except Exception as e:
|
||||
result_json = self._response.text
|
||||
self._client.log.debug(
|
||||
'Could not decode json in response: %(e)s', {'e': str(e)})
|
||||
self._client.log.debug(result_json)
|
||||
|
||||
if self.result_key:
|
||||
self._result = result_json[self.result_key]
|
||||
else:
|
||||
self._result = result_json
|
||||
|
||||
self._request_id = self._response.headers.get('x-openstack-request-id')
|
||||
self._finished.set()
|
||||
|
||||
def wait(self, raw=False):
|
||||
super(RequestTask, self).wait()
|
||||
|
||||
if raw:
|
||||
# Do NOT convert the result.
|
||||
return self._result
|
||||
|
||||
if _is_listlike(self._result):
|
||||
return meta.obj_list_to_munch(
|
||||
self._result, request_id=self._request_id)
|
||||
elif _is_objlike(self._result):
|
||||
return meta.obj_to_munch(self._result, request_id=self._request_id)
|
||||
return self._result
|
||||
|
||||
|
||||
def _result_filter_cb(result):
|
||||
return result
|
||||
|
||||
|
||||
def generate_task_class(method, name, result_filter_cb):
|
||||
if name is None:
|
||||
if callable(method):
|
||||
name = method.__name__
|
||||
else:
|
||||
name = method
|
||||
|
||||
class RunTask(Task):
|
||||
def __init__(self, **kw):
|
||||
super(RunTask, self).__init__(**kw)
|
||||
self.name = name
|
||||
self._method = method
|
||||
|
||||
def wait(self, raw=False):
|
||||
super(RunTask, self).wait()
|
||||
|
||||
if raw:
|
||||
# Do NOT convert the result.
|
||||
return self._result
|
||||
return result_filter_cb(self._result)
|
||||
|
||||
def main(self, client):
|
||||
if callable(self._method):
|
||||
return method(**self.args)
|
||||
else:
|
||||
meth = getattr(client, self._method)
|
||||
return meth(**self.args)
|
||||
return RunTask
|
||||
|
||||
|
||||
class TaskManager(object):
|
||||
log = _log.setup_logging('shade.task_manager')
|
||||
|
||||
def __init__(
|
||||
self, client, name, result_filter_cb=None, workers=5, **kwargs):
|
||||
self.name = name
|
||||
self._client = client
|
||||
self._executor = concurrent.futures.ThreadPoolExecutor(
|
||||
max_workers=workers)
|
||||
if not result_filter_cb:
|
||||
self._result_filter_cb = _result_filter_cb
|
||||
else:
|
||||
self._result_filter_cb = result_filter_cb
|
||||
|
||||
def set_client(self, client):
|
||||
self._client = client
|
||||
|
||||
def stop(self):
|
||||
""" This is a direct action passthrough TaskManager """
|
||||
self._executor.shutdown(wait=True)
|
||||
|
||||
def run(self):
|
||||
""" This is a direct action passthrough TaskManager """
|
||||
pass
|
||||
|
||||
def submit_task(self, task, raw=False):
|
||||
"""Submit and execute the given task.
|
||||
|
||||
:param task: The task to execute.
|
||||
:param bool raw: If True, return the raw result as received from the
|
||||
underlying client call.
|
||||
"""
|
||||
return self.run_task(task=task, raw=raw)
|
||||
|
||||
def _run_task_async(self, task, raw=False):
|
||||
self.log.debug(
|
||||
"Manager %s submitting task %s", self.name, task.name)
|
||||
return self._executor.submit(self._run_task, task, raw=raw)
|
||||
|
||||
def run_task(self, task, raw=False):
|
||||
if hasattr(task, 'run_async') and task.run_async:
|
||||
return self._run_task_async(task, raw=raw)
|
||||
else:
|
||||
return self._run_task(task, raw=raw)
|
||||
|
||||
def _run_task(self, task, raw=False):
|
||||
self.log.debug(
|
||||
"Manager %s running task %s", self.name, task.name)
|
||||
start = time.time()
|
||||
task.run(self._client)
|
||||
end = time.time()
|
||||
dt = end - start
|
||||
self.log.debug(
|
||||
"Manager %s ran task %s in %ss", self.name, task.name, dt)
|
||||
|
||||
self.post_run_task(dt, task)
|
||||
|
||||
return task.wait(raw)
|
||||
|
||||
def post_run_task(self, elasped_time, task):
|
||||
pass
|
||||
|
||||
# Backwards compatibility
|
||||
submitTask = submit_task
|
||||
|
||||
def submit_function(
|
||||
self, method, name=None, result_filter_cb=None, **kwargs):
|
||||
""" Allows submitting an arbitrary method for work.
|
||||
|
||||
:param method: Method to run in the TaskManager. Can be either the
|
||||
name of a method to find on self.client, or a callable.
|
||||
"""
|
||||
if not result_filter_cb:
|
||||
result_filter_cb = self._result_filter_cb
|
||||
|
||||
task_class = generate_task_class(method, name, result_filter_cb)
|
||||
|
||||
return self._executor.submit_task(task_class(**kwargs))
|
||||
|
||||
|
||||
def wait_for_futures(futures, raise_on_error=True, log=None):
|
||||
'''Collect results or failures from a list of running future tasks.'''
|
||||
|
||||
results = []
|
||||
retries = []
|
||||
|
||||
# Check on each result as its thread finishes
|
||||
for completed in concurrent.futures.as_completed(futures):
|
||||
try:
|
||||
result = completed.result()
|
||||
# We have to do this here because munch_response doesn't
|
||||
# get called on async job results
|
||||
exc.raise_from_response(result)
|
||||
results.append(result)
|
||||
except (keystoneauth1.exceptions.RetriableConnectionFailure,
|
||||
exc.OpenStackCloudException) as e:
|
||||
if log:
|
||||
log.debug(
|
||||
"Exception processing async task: {e}".format(
|
||||
e=str(e)),
|
||||
exc_info=True)
|
||||
# If we get an exception, put the result into a list so we
|
||||
# can try again
|
||||
if raise_on_error:
|
||||
raise
|
||||
else:
|
||||
retries.append(result)
|
||||
return results, retries
|
@ -1,109 +0,0 @@
|
||||
# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
import concurrent.futures
|
||||
import mock
|
||||
|
||||
from shade import task_manager
|
||||
from shade.tests.unit import base
|
||||
|
||||
|
||||
class TestException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class TaskTest(task_manager.Task):
|
||||
def main(self, client):
|
||||
raise TestException("This is a test exception")
|
||||
|
||||
|
||||
class TaskTestGenerator(task_manager.Task):
|
||||
def main(self, client):
|
||||
yield 1
|
||||
|
||||
|
||||
class TaskTestInt(task_manager.Task):
|
||||
def main(self, client):
|
||||
return int(1)
|
||||
|
||||
|
||||
class TaskTestFloat(task_manager.Task):
|
||||
def main(self, client):
|
||||
return float(2.0)
|
||||
|
||||
|
||||
class TaskTestStr(task_manager.Task):
|
||||
def main(self, client):
|
||||
return "test"
|
||||
|
||||
|
||||
class TaskTestBool(task_manager.Task):
|
||||
def main(self, client):
|
||||
return True
|
||||
|
||||
|
||||
class TaskTestSet(task_manager.Task):
|
||||
def main(self, client):
|
||||
return set([1, 2])
|
||||
|
||||
|
||||
class TaskTestAsync(task_manager.Task):
|
||||
def __init__(self):
|
||||
super(task_manager.Task, self).__init__()
|
||||
self.run_async = True
|
||||
|
||||
def main(self, client):
|
||||
pass
|
||||
|
||||
|
||||
class TestTaskManager(base.RequestsMockTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestTaskManager, self).setUp()
|
||||
self.manager = task_manager.TaskManager(name='test', client=self)
|
||||
|
||||
def test_wait_re_raise(self):
|
||||
"""Test that Exceptions thrown in a Task is reraised correctly
|
||||
|
||||
This test is aimed to six.reraise(), called in Task::wait().
|
||||
Specifically, we test if we get the same behaviour with all the
|
||||
configured interpreters (e.g. py27, p34, pypy, ...)
|
||||
"""
|
||||
self.assertRaises(TestException, self.manager.submit_task, TaskTest())
|
||||
|
||||
def test_dont_munchify_int(self):
|
||||
ret = self.manager.submit_task(TaskTestInt())
|
||||
self.assertIsInstance(ret, int)
|
||||
|
||||
def test_dont_munchify_float(self):
|
||||
ret = self.manager.submit_task(TaskTestFloat())
|
||||
self.assertIsInstance(ret, float)
|
||||
|
||||
def test_dont_munchify_str(self):
|
||||
ret = self.manager.submit_task(TaskTestStr())
|
||||
self.assertIsInstance(ret, str)
|
||||
|
||||
def test_dont_munchify_bool(self):
|
||||
ret = self.manager.submit_task(TaskTestBool())
|
||||
self.assertIsInstance(ret, bool)
|
||||
|
||||
def test_dont_munchify_set(self):
|
||||
ret = self.manager.submit_task(TaskTestSet())
|
||||
self.assertIsInstance(ret, set)
|
||||
|
||||
@mock.patch.object(concurrent.futures.ThreadPoolExecutor, 'submit')
|
||||
def test_async(self, mock_submit):
|
||||
self.manager.submit_task(TaskTestAsync())
|
||||
self.assertTrue(mock_submit.called)
|
Loading…
x
Reference in New Issue
Block a user