Merge "Rename "parallelize" decorator to "concurrent""
This commit is contained in:
commit
0a8928f51a
@ -31,7 +31,7 @@ import yaml
|
||||
import jenkins
|
||||
|
||||
from jenkins_jobs.constants import MAGIC_MANAGE_STRING
|
||||
from jenkins_jobs.parallel import parallelize
|
||||
from jenkins_jobs.parallel import concurrent
|
||||
from jenkins_jobs import utils
|
||||
|
||||
|
||||
@ -153,7 +153,6 @@ class Jenkins(object):
|
||||
self._job_list = set(job['name'] for job in self.jobs)
|
||||
return self._job_list
|
||||
|
||||
@parallelize
|
||||
def update_job(self, job_name, xml):
|
||||
if self.is_job(job_name):
|
||||
logger.info("Reconfiguring jenkins job {0}".format(job_name))
|
||||
@ -274,7 +273,6 @@ class Builder(object):
|
||||
# Need to clear the JJB cache after deletion
|
||||
self.cache.clear()
|
||||
|
||||
@parallelize
|
||||
def changed(self, job):
|
||||
md5 = job.md5()
|
||||
|
||||
@ -344,9 +342,9 @@ class Builder(object):
|
||||
p_params = [{'job': job} for job in jobs]
|
||||
results = self.parallel_update_job(
|
||||
n_workers=n_workers,
|
||||
parallelize=p_params)
|
||||
concurrent=p_params)
|
||||
logging.debug("Parsing results")
|
||||
# generalize the result parsing, as a parallelized job always returns a
|
||||
# generalize the result parsing, as a concurrent job always returns a
|
||||
# list
|
||||
if len(p_params) in (1, 0):
|
||||
results = [results]
|
||||
@ -365,7 +363,7 @@ class Builder(object):
|
||||
logging.debug("Total run took %ss", (time.time() - orig))
|
||||
return jobs, len(jobs)
|
||||
|
||||
@parallelize
|
||||
@concurrent
|
||||
def parallel_update_job(self, job):
|
||||
self.jenkins.update_job(job.name, job.output().decode('utf-8'))
|
||||
return (job.name, job.md5())
|
||||
|
@ -13,7 +13,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# Parallel execution helper functions and classes
|
||||
# Concurrent execution helper functions and classes
|
||||
|
||||
from functools import wraps
|
||||
import logging
|
||||
@ -66,30 +66,30 @@ class Worker(threading.Thread):
|
||||
self.out_queue.put((task['ord'], res))
|
||||
|
||||
|
||||
def parallelize(func):
|
||||
def concurrent(func):
|
||||
@wraps(func)
|
||||
def parallelized(*args, **kwargs):
|
||||
def concurrentized(*args, **kwargs):
|
||||
"""
|
||||
This function will spawn workers and run the decorated function in
|
||||
parallel on the workers. It will not ensure the thread safety of the
|
||||
decorated function (the decorated function should be thread safe by
|
||||
This function will spawn workers and run the decorated function
|
||||
concurrently on the workers. It will not ensure the thread safety of
|
||||
the decorated function (the decorated function should be thread safe by
|
||||
itself). It accepts two special parameters:
|
||||
|
||||
:arg list parallelize: list of the arguments to pass to each of the
|
||||
:arg list concurrentize: list of the arguments to pass to each of the
|
||||
runs, the results of each run will be returned in the same order.
|
||||
:arg int n_workers: number of workers to use, by default and if '0'
|
||||
passed will autodetect the number of cores and use that, if '1'
|
||||
passed, it will not use any workers and just run as if were not
|
||||
parallelized everything.
|
||||
concurrentized everything.
|
||||
|
||||
Example:
|
||||
|
||||
> @parallelize
|
||||
> @concurrent
|
||||
> def sample(param1, param2, param3):
|
||||
> return param1 + param2 + param3
|
||||
>
|
||||
> sample('param1', param2='val2',
|
||||
> parallelize=[
|
||||
> concurrent=[
|
||||
> {'param3': 'val3'},
|
||||
> {'param3': 'val4'},
|
||||
> {'param3': 'val5'},
|
||||
@ -97,14 +97,14 @@ def parallelize(func):
|
||||
>
|
||||
['param1val2val3', 'param1val2val4', 'param1val2val5']
|
||||
|
||||
This will run the function `parallelized_function` 3 times, in
|
||||
parallel (depending on the number of detected cores) and return an
|
||||
This will run the function `concurrentized_function` 3 times, in
|
||||
concurrent (depending on the number of detected cores) and return an
|
||||
array with the results of the executions in the same order the
|
||||
parameters were passed.
|
||||
"""
|
||||
n_workers = kwargs.pop('n_workers', 0)
|
||||
p_kwargs = kwargs.pop('parallelize', [])
|
||||
# if only one parameter is passed inside the parallelize dict, run the
|
||||
p_kwargs = kwargs.pop('concurrent', [])
|
||||
# if only one parameter is passed inside the concurrent dict, run the
|
||||
# original function as is, no need for pools
|
||||
if len(p_kwargs) == 1:
|
||||
kwargs.update(p_kwargs[0])
|
||||
@ -115,7 +115,7 @@ def parallelize(func):
|
||||
# If no number of workers passed or passed 0
|
||||
if not n_workers:
|
||||
n_workers = cpu_count()
|
||||
logging.debug("Running parallel %d workers", n_workers)
|
||||
logging.debug("Running concurrent %d workers", n_workers)
|
||||
worker_pool = []
|
||||
in_queue = queue.Queue()
|
||||
out_queue = queue.Queue()
|
||||
@ -146,6 +146,6 @@ def parallelize(func):
|
||||
worker.join()
|
||||
# Reorder the results
|
||||
results = [r[1] for r in sorted(results)]
|
||||
logging.debug("Parallel task finished")
|
||||
logging.debug("Concurrent task finished")
|
||||
return results
|
||||
return parallelized
|
||||
return concurrentized
|
||||
|
@ -18,7 +18,7 @@ from multiprocessing import cpu_count
|
||||
from testtools import matchers
|
||||
from testtools import TestCase
|
||||
|
||||
from jenkins_jobs.parallel import parallelize
|
||||
from jenkins_jobs.parallel import concurrent
|
||||
from tests.base import mock
|
||||
|
||||
|
||||
@ -26,45 +26,45 @@ class TestCaseParallel(TestCase):
|
||||
def test_parallel_correct_order(self):
|
||||
expected = list(range(10, 20))
|
||||
|
||||
@parallelize
|
||||
@concurrent
|
||||
def parallel_test(num_base, num_extra):
|
||||
return num_base + num_extra
|
||||
|
||||
parallel_args = [{'num_extra': num} for num in range(10)]
|
||||
result = parallel_test(10, parallelize=parallel_args)
|
||||
result = parallel_test(10, concurrent=parallel_args)
|
||||
self.assertThat(result, matchers.Equals(expected))
|
||||
|
||||
def test_parallel_time_less_than_serial(self):
|
||||
|
||||
@parallelize
|
||||
@concurrent
|
||||
def wait(secs):
|
||||
time.sleep(secs)
|
||||
|
||||
before = time.time()
|
||||
# ten threads to make it as fast as possible
|
||||
wait(parallelize=[{'secs': 1} for _ in range(10)], n_workers=10)
|
||||
wait(concurrent=[{'secs': 1} for _ in range(10)], n_workers=10)
|
||||
after = time.time()
|
||||
self.assertThat(after - before, matchers.LessThan(5))
|
||||
|
||||
def test_parallel_single_thread(self):
|
||||
expected = list(range(10, 20))
|
||||
|
||||
@parallelize
|
||||
@concurrent
|
||||
def parallel_test(num_base, num_extra):
|
||||
return num_base + num_extra
|
||||
|
||||
parallel_args = [{'num_extra': num} for num in range(10)]
|
||||
result = parallel_test(10, parallelize=parallel_args, n_workers=1)
|
||||
result = parallel_test(10, concurrent=parallel_args, n_workers=1)
|
||||
self.assertThat(result, matchers.Equals(expected))
|
||||
|
||||
@mock.patch('jenkins_jobs.parallel.cpu_count', wraps=cpu_count)
|
||||
def test_use_auto_detect_cores(self, mockCpu_count):
|
||||
|
||||
@parallelize
|
||||
@concurrent
|
||||
def parallel_test():
|
||||
return True
|
||||
|
||||
result = parallel_test(parallelize=[{} for _ in range(10)],
|
||||
result = parallel_test(concurrent=[{} for _ in range(10)],
|
||||
n_workers=0)
|
||||
self.assertThat(result, matchers.Equals([True for _ in range(10)]))
|
||||
mockCpu_count.assert_called_once_with()
|
||||
|
Loading…
Reference in New Issue
Block a user