Merge "Add parallelization options"
This commit is contained in:
commit
0fc023b911
@ -160,6 +160,13 @@ arguments after the job definition path. To update Foo1 and Foo2 run::
|
||||
|
||||
jenkins-jobs update /path/to/defs Foo1 Foo2
|
||||
|
||||
You can also enable the parallel execution of the program passing the workers
|
||||
option with a value of 0, 2, or higher. Use 0 to run as many workers as cores
|
||||
in the host that runs it, and 2 or higher to specify the number of workers to
|
||||
use::
|
||||
|
||||
jenkins-jobs update --workers 0 /path/to/defs
|
||||
|
||||
Passing Multiple Paths
|
||||
^^^^^^^^^^^^^^^^^^^^^^
|
||||
It is possible to pass multiple paths to JJB using colons as a path separator on
|
||||
|
@ -26,8 +26,10 @@ import jenkins
|
||||
import re
|
||||
from pprint import pformat
|
||||
import logging
|
||||
import time
|
||||
|
||||
from jenkins_jobs.constants import MAGIC_MANAGE_STRING
|
||||
from jenkins_jobs.parallel import parallelize
|
||||
from jenkins_jobs.parser import YamlParser
|
||||
from jenkins_jobs import utils
|
||||
|
||||
@ -135,6 +137,7 @@ class Jenkins(object):
|
||||
self._job_list = set(job['name'] for job in self.jobs)
|
||||
return self._job_list
|
||||
|
||||
@parallelize
|
||||
def update_job(self, job_name, xml):
|
||||
if self.is_job(job_name):
|
||||
logger.info("Reconfiguring jenkins job {0}".format(job_name))
|
||||
@ -303,6 +306,7 @@ class Builder(object):
|
||||
self.jenkins.delete_job(job)
|
||||
if(self.cache.is_cached(job)):
|
||||
self.cache.set(job, '')
|
||||
self.cache.save()
|
||||
|
||||
def delete_all_jobs(self):
|
||||
jobs = self.jenkins.get_jobs()
|
||||
@ -311,10 +315,23 @@ class Builder(object):
|
||||
# Need to clear the JJB cache after deletion
|
||||
self.cache.clear()
|
||||
|
||||
def update_job(self, input_fn, jobs_glob=None, output=None):
|
||||
@parallelize
|
||||
def changed(self, job):
|
||||
md5 = job.md5()
|
||||
changed = self.ignore_cache or self.cache.has_changed(job.name, md5)
|
||||
if not changed:
|
||||
logger.debug("'{0}' has not changed".format(job.name))
|
||||
return changed
|
||||
|
||||
def update_jobs(self, input_fn, jobs_glob=None, output=None,
|
||||
n_workers=None):
|
||||
orig = time.time()
|
||||
self.load_files(input_fn)
|
||||
self.parser.expandYaml(jobs_glob)
|
||||
self.parser.generateXML()
|
||||
step = time.time()
|
||||
logging.debug('%d XML files generated in %ss',
|
||||
len(self.parser.jobs), str(step - orig))
|
||||
|
||||
logger.info("Number of jobs generated: %d", len(self.parser.xml_jobs))
|
||||
self.parser.xml_jobs.sort(key=operator.attrgetter('name'))
|
||||
@ -328,9 +345,8 @@ class Builder(object):
|
||||
if not os.path.isdir(output):
|
||||
raise
|
||||
|
||||
updated_jobs = 0
|
||||
for job in self.parser.xml_jobs:
|
||||
if output:
|
||||
if output:
|
||||
for job in self.parser.xml_jobs:
|
||||
if hasattr(output, 'write'):
|
||||
# `output` is a file-like object
|
||||
logger.info("Job name: %s", job.name)
|
||||
@ -351,17 +367,54 @@ class Builder(object):
|
||||
logger.debug("Writing XML to '{0}'".format(output_fn))
|
||||
with io.open(output_fn, 'w', encoding='utf-8') as f:
|
||||
f.write(job.output().decode('utf-8'))
|
||||
continue
|
||||
md5 = job.md5()
|
||||
if (self.jenkins.is_job(job.name)
|
||||
and not self.cache.is_cached(job.name)):
|
||||
old_md5 = self.jenkins.get_job_md5(job.name)
|
||||
self.cache.set(job.name, old_md5)
|
||||
return self.parser.xml_jobs, len(self.parser.xml_jobs)
|
||||
|
||||
if self.cache.has_changed(job.name, md5) or self.ignore_cache:
|
||||
self.jenkins.update_job(job.name, job.output().decode('utf-8'))
|
||||
updated_jobs += 1
|
||||
self.cache.set(job.name, md5)
|
||||
# Filter out the jobs that did not change
|
||||
logging.debug('Filtering %d jobs for changed jobs',
|
||||
len(self.parser.xml_jobs))
|
||||
step = time.time()
|
||||
jobs = [job for job in self.parser.xml_jobs
|
||||
if self.changed(job)]
|
||||
logging.debug("Filtered for changed jobs in %ss",
|
||||
(time.time() - step))
|
||||
|
||||
if not jobs:
|
||||
return [], 0
|
||||
|
||||
# Update the jobs
|
||||
logging.debug('Updating jobs')
|
||||
step = time.time()
|
||||
p_params = [{'job': job} for job in jobs]
|
||||
results = self.parallel_update_job(
|
||||
n_workers=n_workers,
|
||||
parallelize=p_params)
|
||||
logging.debug("Parsing results")
|
||||
# generalize the result parsing, as a parallelized job always returns a
|
||||
# list
|
||||
if len(p_params) in (1, 0):
|
||||
results = [results]
|
||||
for result in results:
|
||||
if isinstance(result, Exception):
|
||||
raise result
|
||||
else:
|
||||
logger.debug("'{0}' has not changed".format(job.name))
|
||||
return self.parser.xml_jobs, updated_jobs
|
||||
# update in-memory cache
|
||||
j_name, j_md5 = result
|
||||
self.cache.set(j_name, j_md5)
|
||||
# write cache to disk
|
||||
self.cache.save()
|
||||
logging.debug("Updated %d jobs in %ss",
|
||||
len(jobs),
|
||||
time.time() - step)
|
||||
logging.debug("Total run took %ss", (time.time() - orig))
|
||||
return jobs, len(jobs)
|
||||
|
||||
@parallelize
|
||||
def parallel_update_job(self, job):
|
||||
self.jenkins.update_job(job.name, job.output().decode('utf-8'))
|
||||
return (job.name, job.md5())
|
||||
|
||||
def update_job(self, input_fn, jobs_glob=None, output=None):
|
||||
logging.warn('Current update_job function signature is deprecated and '
|
||||
'will change in future versions to the signature of the '
|
||||
'new parallel_update_job')
|
||||
return self.update_jobs(input_fn, jobs_glob, output)
|
||||
|
@ -104,6 +104,9 @@ def create_parser():
|
||||
parser_update.add_argument('--delete-old', help='delete obsolete jobs',
|
||||
action='store_true',
|
||||
dest='delete_old', default=False,)
|
||||
parser_update.add_argument('--workers', dest='n_workers', type=int,
|
||||
default=1, help='number of workers to use, 0 '
|
||||
'for autodetection and 1 for just one worker.')
|
||||
|
||||
# subparser: test
|
||||
parser_test = subparser.add_parser('test', parents=[recursive_parser])
|
||||
@ -325,17 +328,23 @@ def execute(options, config):
|
||||
logger.info("Deleting all jobs")
|
||||
builder.delete_all_jobs()
|
||||
elif options.command == 'update':
|
||||
if options.n_workers < 0:
|
||||
raise JenkinsJobsException(
|
||||
'Number of workers must be equal or greater than 0')
|
||||
|
||||
logger.info("Updating jobs in {0} ({1})".format(
|
||||
options.path, options.names))
|
||||
jobs, num_updated_jobs = builder.update_job(options.path,
|
||||
options.names)
|
||||
jobs, num_updated_jobs = builder.update_jobs(
|
||||
options.path, options.names,
|
||||
n_workers=options.n_workers)
|
||||
logger.info("Number of jobs updated: %d", num_updated_jobs)
|
||||
if options.delete_old:
|
||||
num_deleted_jobs = builder.delete_old_managed()
|
||||
logger.info("Number of jobs deleted: %d", num_deleted_jobs)
|
||||
elif options.command == 'test':
|
||||
builder.update_job(options.path, options.name,
|
||||
output=options.output_dir)
|
||||
builder.update_jobs(options.path, options.name,
|
||||
output=options.output_dir,
|
||||
n_workers=1)
|
||||
|
||||
|
||||
def version():
|
||||
|
151
jenkins_jobs/parallel.py
Normal file
151
jenkins_jobs/parallel.py
Normal file
@ -0,0 +1,151 @@
|
||||
#!/usr/bin/env python
|
||||
# Copyright (C) 2015 OpenStack, LLC.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# Parallel execution helper functions and classes
|
||||
|
||||
from functools import wraps
|
||||
import logging
|
||||
from multiprocessing import cpu_count
|
||||
import threading
|
||||
import traceback
|
||||
|
||||
try:
|
||||
import Queue as queue
|
||||
except ImportError:
|
||||
import queue
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TaskFunc(dict):
|
||||
"""
|
||||
Simple class to wrap around the information needed to run a function.
|
||||
"""
|
||||
def __init__(self, n_ord, func, args=None, kwargs=None):
|
||||
self['func'] = func
|
||||
self['args'] = args or []
|
||||
self['kwargs'] = kwargs or {}
|
||||
self['ord'] = n_ord
|
||||
|
||||
|
||||
class Worker(threading.Thread):
|
||||
"""
|
||||
Class that actually does the work, gets a TaskFunc through the queue,
|
||||
runs its function with the passed parameters and returns the result
|
||||
If the string 'done' is passed instead of a TaskFunc instance, the thread
|
||||
will end.
|
||||
"""
|
||||
def __init__(self, in_queue, out_queue):
|
||||
threading.Thread.__init__(self)
|
||||
self.in_queue = in_queue
|
||||
self.out_queue = out_queue
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
task = self.in_queue.get()
|
||||
if task == 'done':
|
||||
return
|
||||
try:
|
||||
res = task['func'](*task['args'],
|
||||
**task['kwargs'])
|
||||
except Exception as exc:
|
||||
res = exc
|
||||
traceback.print_exc()
|
||||
self.out_queue.put((task['ord'], res))
|
||||
|
||||
|
||||
def parallelize(func):
|
||||
@wraps(func)
|
||||
def parallelized(*args, **kwargs):
|
||||
"""
|
||||
This function will spawn workers and run the decorated function in
|
||||
parallel on the workers. It will not ensure the thread safety of the
|
||||
decorated function (the decorated function should be thread safe by
|
||||
itself). It accepts two special parameters:
|
||||
|
||||
:arg list parallelize: list of the arguments to pass to each of the
|
||||
runs, the results of each run will be returned in the same order.
|
||||
:arg int n_workers: number of workers to use, by default and if '0'
|
||||
passed will autodetect the number of cores and use that, if '1'
|
||||
passed, it will not use any workers and just run as if were not
|
||||
parallelized everything.
|
||||
|
||||
Example:
|
||||
|
||||
> @parallelize
|
||||
> def sample(param1, param2, param3):
|
||||
> return param1 + param2 + param3
|
||||
>
|
||||
> sample('param1', param2='val2',
|
||||
> parallelize=[
|
||||
> {'param3': 'val3'},
|
||||
> {'param3': 'val4'},
|
||||
> {'param3': 'val5'},
|
||||
> ])
|
||||
>
|
||||
['param1val2val3', 'param1val2val4', 'param1val2val5']
|
||||
|
||||
This will run the function `parallelized_function` 3 times, in
|
||||
parallel (depending on the number of detected cores) and return an
|
||||
array with the results of the executions in the same order the
|
||||
parameters were passed.
|
||||
"""
|
||||
n_workers = kwargs.pop('n_workers', 0)
|
||||
p_kwargs = kwargs.pop('parallelize', [])
|
||||
# if only one parameter is passed inside the parallelize dict, run the
|
||||
# original function as is, no need for pools
|
||||
if len(p_kwargs) == 1:
|
||||
kwargs.update(p_kwargs[0])
|
||||
if len(p_kwargs) in (1, 0):
|
||||
return func(*args, **kwargs)
|
||||
|
||||
# prepare the workers
|
||||
# If no number of workers passed or passed 0
|
||||
if not n_workers:
|
||||
n_workers = cpu_count()
|
||||
logging.debug("Running parallel %d workers", n_workers)
|
||||
worker_pool = []
|
||||
in_queue = queue.Queue()
|
||||
out_queue = queue.Queue()
|
||||
for n_worker in range(n_workers):
|
||||
new_worker = Worker(in_queue, out_queue)
|
||||
new_worker.setDaemon(True)
|
||||
logging.debug("Spawning worker %d", n_worker)
|
||||
new_worker.start()
|
||||
worker_pool.append(new_worker)
|
||||
|
||||
# Feed the workers
|
||||
n_ord = 0
|
||||
for f_kwargs in p_kwargs:
|
||||
f_kwargs.update(kwargs)
|
||||
in_queue.put(TaskFunc(n_ord, func, args, f_kwargs))
|
||||
n_ord += 1
|
||||
for _ in range(n_workers):
|
||||
in_queue.put('done')
|
||||
|
||||
# Wait for the results
|
||||
logging.debug("Waiting for workers to finish processing")
|
||||
results = []
|
||||
for _ in p_kwargs:
|
||||
new_res = out_queue.get()
|
||||
results.append(new_res)
|
||||
# cleanup
|
||||
for worker in worker_pool:
|
||||
worker.join()
|
||||
# Reorder the results
|
||||
results = [r[1] for r in sorted(results)]
|
||||
logging.debug("Parallel task finished")
|
||||
return results
|
||||
return parallelized
|
@ -85,8 +85,8 @@ class TestTests(CmdTestsBase):
|
||||
args.output_dir = mock.Mock(wraps=io.BytesIO())
|
||||
cmd.execute(args, self.config) # probably better to fail here
|
||||
|
||||
@mock.patch('jenkins_jobs.cmd.Builder.update_job')
|
||||
def test_multi_path(self, update_job_mock):
|
||||
@mock.patch('jenkins_jobs.cmd.Builder.update_jobs')
|
||||
def test_multi_path(self, update_jobs_mock):
|
||||
"""
|
||||
Run test mode and pass multiple paths.
|
||||
"""
|
||||
@ -98,14 +98,15 @@ class TestTests(CmdTestsBase):
|
||||
|
||||
cmd.execute(args, self.config)
|
||||
self.assertEqual(args.path, path_list)
|
||||
update_job_mock.assert_called_with(path_list, [],
|
||||
output=args.output_dir)
|
||||
update_jobs_mock.assert_called_with(path_list, [],
|
||||
output=args.output_dir,
|
||||
n_workers=mock.ANY)
|
||||
|
||||
@mock.patch('jenkins_jobs.cmd.Builder.update_job')
|
||||
@mock.patch('jenkins_jobs.cmd.Builder.update_jobs')
|
||||
@mock.patch('jenkins_jobs.cmd.os.path.isdir')
|
||||
@mock.patch('jenkins_jobs.cmd.os.walk')
|
||||
def test_recursive_multi_path(self, os_walk_mock, isdir_mock,
|
||||
update_job_mock):
|
||||
update_jobs_mock):
|
||||
"""
|
||||
Run test mode and pass multiple paths with recursive path option.
|
||||
"""
|
||||
@ -125,20 +126,22 @@ class TestTests(CmdTestsBase):
|
||||
|
||||
cmd.execute(args, self.config)
|
||||
|
||||
update_job_mock.assert_called_with(paths, [], output=args.output_dir)
|
||||
update_jobs_mock.assert_called_with(paths, [], output=args.output_dir,
|
||||
n_workers=mock.ANY)
|
||||
|
||||
args = self.parser.parse_args(['test', multipath])
|
||||
args.output_dir = mock.MagicMock()
|
||||
self.config.set('job_builder', 'recursive', 'True')
|
||||
cmd.execute(args, self.config)
|
||||
|
||||
update_job_mock.assert_called_with(paths, [], output=args.output_dir)
|
||||
update_jobs_mock.assert_called_with(paths, [], output=args.output_dir,
|
||||
n_workers=mock.ANY)
|
||||
|
||||
@mock.patch('jenkins_jobs.cmd.Builder.update_job')
|
||||
@mock.patch('jenkins_jobs.cmd.Builder.update_jobs')
|
||||
@mock.patch('jenkins_jobs.cmd.os.path.isdir')
|
||||
@mock.patch('jenkins_jobs.cmd.os.walk')
|
||||
def test_recursive_multi_path_with_excludes(self, os_walk_mock, isdir_mock,
|
||||
update_job_mock):
|
||||
update_jobs_mock):
|
||||
"""
|
||||
Run test mode and pass multiple paths with recursive path option.
|
||||
"""
|
||||
@ -160,7 +163,8 @@ class TestTests(CmdTestsBase):
|
||||
|
||||
cmd.execute(args, self.config)
|
||||
|
||||
update_job_mock.assert_called_with(paths, [], output=args.output_dir)
|
||||
update_jobs_mock.assert_called_with(paths, [], output=args.output_dir,
|
||||
n_workers=mock.ANY)
|
||||
|
||||
def test_console_output(self):
|
||||
"""
|
||||
|
@ -26,19 +26,19 @@ from tests.cmd.test_cmd import CmdTestsBase
|
||||
@mock.patch('jenkins_jobs.builder.Jenkins.get_plugins_info', mock.MagicMock)
|
||||
class UpdateTests(CmdTestsBase):
|
||||
|
||||
@mock.patch('jenkins_jobs.cmd.Builder.update_job')
|
||||
def test_update_jobs(self, update_job_mock):
|
||||
@mock.patch('jenkins_jobs.cmd.Builder.update_jobs')
|
||||
def test_update_jobs(self, update_jobs_mock):
|
||||
"""
|
||||
Test update_job is called
|
||||
"""
|
||||
# don't care about the value returned here
|
||||
update_job_mock.return_value = ([], 0)
|
||||
update_jobs_mock.return_value = ([], 0)
|
||||
|
||||
path = os.path.join(self.fixtures_path, 'cmd-002.yaml')
|
||||
args = self.parser.parse_args(['update', path])
|
||||
|
||||
cmd.execute(args, self.config)
|
||||
update_job_mock.assert_called_with([path], [])
|
||||
update_jobs_mock.assert_called_with([path], [], n_workers=mock.ANY)
|
||||
|
||||
@mock.patch('jenkins_jobs.builder.Jenkins.is_job', return_value=True)
|
||||
@mock.patch('jenkins_jobs.builder.Jenkins.get_jobs')
|
||||
@ -88,7 +88,7 @@ class UpdateTests(CmdTestsBase):
|
||||
# mocks to call real methods on a the above test object.
|
||||
b_inst = builder_mock.return_value
|
||||
b_inst.plugins_list = builder_obj.plugins_list
|
||||
b_inst.update_job.side_effect = builder_obj.update_job
|
||||
b_inst.update_jobs.side_effect = builder_obj.update_jobs
|
||||
b_inst.delete_old_managed.side_effect = builder_obj.delete_old_managed
|
||||
|
||||
def _get_jobs():
|
||||
|
0
tests/parallel/__init__.py
Normal file
0
tests/parallel/__init__.py
Normal file
70
tests/parallel/test_parallel.py
Normal file
70
tests/parallel/test_parallel.py
Normal file
@ -0,0 +1,70 @@
|
||||
# Copyright 2015 David Caro
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
from multiprocessing import cpu_count
|
||||
|
||||
from testtools import matchers
|
||||
from testtools import TestCase
|
||||
|
||||
from jenkins_jobs.parallel import parallelize
|
||||
from tests.base import mock
|
||||
|
||||
|
||||
class TestCaseParallel(TestCase):
|
||||
def test_parallel_correct_order(self):
|
||||
expected = list(range(10, 20))
|
||||
|
||||
@parallelize
|
||||
def parallel_test(num_base, num_extra):
|
||||
return num_base + num_extra
|
||||
|
||||
parallel_args = [{'num_extra': num} for num in range(10)]
|
||||
result = parallel_test(10, parallelize=parallel_args)
|
||||
self.assertThat(result, matchers.Equals(expected))
|
||||
|
||||
def test_parallel_time_less_than_serial(self):
|
||||
|
||||
@parallelize
|
||||
def wait(secs):
|
||||
time.sleep(secs)
|
||||
|
||||
before = time.time()
|
||||
# ten threads to make it as fast as possible
|
||||
wait(parallelize=[{'secs': 1} for _ in range(10)], n_workers=10)
|
||||
after = time.time()
|
||||
self.assertThat(after - before, matchers.LessThan(5))
|
||||
|
||||
def test_parallel_single_thread(self):
|
||||
expected = list(range(10, 20))
|
||||
|
||||
@parallelize
|
||||
def parallel_test(num_base, num_extra):
|
||||
return num_base + num_extra
|
||||
|
||||
parallel_args = [{'num_extra': num} for num in range(10)]
|
||||
result = parallel_test(10, parallelize=parallel_args, n_workers=1)
|
||||
self.assertThat(result, matchers.Equals(expected))
|
||||
|
||||
@mock.patch('jenkins_jobs.parallel.cpu_count', wraps=cpu_count)
|
||||
def test_use_auto_detect_cores(self, mockCpu_count):
|
||||
|
||||
@parallelize
|
||||
def parallel_test():
|
||||
return True
|
||||
|
||||
result = parallel_test(parallelize=[{} for _ in range(10)],
|
||||
n_workers=0)
|
||||
self.assertThat(result, matchers.Equals([True for _ in range(10)]))
|
||||
mockCpu_count.assert_called_once_with()
|
Loading…
Reference in New Issue
Block a user