diff --git a/doc/source/execution.rst b/doc/source/execution.rst index 326b56c33..6946cf48f 100644 --- a/doc/source/execution.rst +++ b/doc/source/execution.rst @@ -160,6 +160,13 @@ arguments after the job definition path. To update Foo1 and Foo2 run:: jenkins-jobs update /path/to/defs Foo1 Foo2 +You can also enable parallel execution of the program by passing the +``--workers`` option with a value of 0, or of 2 or higher. Use 0 to run as +many workers as there are cores in the host, and 2 or higher to specify the +number of workers to use:: + + jenkins-jobs update --workers 0 /path/to/defs + Passing Multiple Paths ^^^^^^^^^^^^^^^^^^^^^^ It is possible to pass multiple paths to JJB using colons as a path separator on diff --git a/jenkins_jobs/builder.py b/jenkins_jobs/builder.py index 66ecd4918..c9c4edf14 100644 --- a/jenkins_jobs/builder.py +++ b/jenkins_jobs/builder.py @@ -26,8 +26,10 @@ import jenkins import re from pprint import pformat import logging +import time from jenkins_jobs.constants import MAGIC_MANAGE_STRING +from jenkins_jobs.parallel import parallelize from jenkins_jobs.parser import YamlParser from jenkins_jobs import utils @@ -135,6 +137,7 @@ class Jenkins(object): self._job_list = set(job['name'] for job in self.jobs) return self._job_list + @parallelize def update_job(self, job_name, xml): if self.is_job(job_name): logger.info("Reconfiguring jenkins job {0}".format(job_name)) @@ -303,6 +306,7 @@ class Builder(object): self.jenkins.delete_job(job) if(self.cache.is_cached(job)): self.cache.set(job, '') + self.cache.save() def delete_all_jobs(self): jobs = self.jenkins.get_jobs() @@ -311,10 +315,23 @@ class Builder(object): # Need to clear the JJB cache after deletion self.cache.clear() - def update_job(self, input_fn, jobs_glob=None, output=None): + @parallelize + def changed(self, job): + md5 = job.md5() + changed = self.ignore_cache or self.cache.has_changed(job.name, md5) + if not changed: + logger.debug("'{0}' has not changed".format(job.name)) + 
return changed + + def update_jobs(self, input_fn, jobs_glob=None, output=None, + n_workers=None): + orig = time.time() self.load_files(input_fn) self.parser.expandYaml(jobs_glob) self.parser.generateXML() + step = time.time() + logging.debug('%d XML files generated in %ss', + len(self.parser.jobs), str(step - orig)) logger.info("Number of jobs generated: %d", len(self.parser.xml_jobs)) self.parser.xml_jobs.sort(key=operator.attrgetter('name')) @@ -328,9 +345,8 @@ class Builder(object): if not os.path.isdir(output): raise - updated_jobs = 0 - for job in self.parser.xml_jobs: - if output: + if output: + for job in self.parser.xml_jobs: if hasattr(output, 'write'): # `output` is a file-like object logger.info("Job name: %s", job.name) @@ -351,17 +367,54 @@ class Builder(object): logger.debug("Writing XML to '{0}'".format(output_fn)) with io.open(output_fn, 'w', encoding='utf-8') as f: f.write(job.output().decode('utf-8')) - continue - md5 = job.md5() - if (self.jenkins.is_job(job.name) - and not self.cache.is_cached(job.name)): - old_md5 = self.jenkins.get_job_md5(job.name) - self.cache.set(job.name, old_md5) + return self.parser.xml_jobs, len(self.parser.xml_jobs) - if self.cache.has_changed(job.name, md5) or self.ignore_cache: - self.jenkins.update_job(job.name, job.output().decode('utf-8')) - updated_jobs += 1 - self.cache.set(job.name, md5) + # Filter out the jobs that did not change + logging.debug('Filtering %d jobs for changed jobs', + len(self.parser.xml_jobs)) + step = time.time() + jobs = [job for job in self.parser.xml_jobs + if self.changed(job)] + logging.debug("Filtered for changed jobs in %ss", + (time.time() - step)) + + if not jobs: + return [], 0 + + # Update the jobs + logging.debug('Updating jobs') + step = time.time() + p_params = [{'job': job} for job in jobs] + results = self.parallel_update_job( + n_workers=n_workers, + parallelize=p_params) + logging.debug("Parsing results") + # generalize the result parsing, as a parallelized job always 
returns a + # list + if len(p_params) in (1, 0): + results = [results] + for result in results: + if isinstance(result, Exception): + raise result else: - logger.debug("'{0}' has not changed".format(job.name)) - return self.parser.xml_jobs, updated_jobs + # update in-memory cache + j_name, j_md5 = result + self.cache.set(j_name, j_md5) + # write cache to disk + self.cache.save() + logging.debug("Updated %d jobs in %ss", + len(jobs), + time.time() - step) + logging.debug("Total run took %ss", (time.time() - orig)) + return jobs, len(jobs) + + @parallelize + def parallel_update_job(self, job): + self.jenkins.update_job(job.name, job.output().decode('utf-8')) + return (job.name, job.md5()) + + def update_job(self, input_fn, jobs_glob=None, output=None): + logging.warn('Current update_job function signature is deprecated and ' + 'will change in future versions to the signature of the ' + 'new parallel_update_job') + return self.update_jobs(input_fn, jobs_glob, output) diff --git a/jenkins_jobs/cmd.py b/jenkins_jobs/cmd.py index 898e588e9..0f82251a7 100755 --- a/jenkins_jobs/cmd.py +++ b/jenkins_jobs/cmd.py @@ -104,6 +104,9 @@ def create_parser(): parser_update.add_argument('--delete-old', help='delete obsolete jobs', action='store_true', dest='delete_old', default=False,) + parser_update.add_argument('--workers', dest='n_workers', type=int, + default=1, help='number of workers to use, 0 ' + 'for autodetection and 1 for just one worker.') # subparser: test parser_test = subparser.add_parser('test', parents=[recursive_parser]) @@ -325,17 +328,23 @@ def execute(options, config): logger.info("Deleting all jobs") builder.delete_all_jobs() elif options.command == 'update': + if options.n_workers < 0: + raise JenkinsJobsException( + 'Number of workers must be equal or greater than 0') + logger.info("Updating jobs in {0} ({1})".format( options.path, options.names)) - jobs, num_updated_jobs = builder.update_job(options.path, - options.names) + jobs, num_updated_jobs = 
builder.update_jobs( + options.path, options.names, + n_workers=options.n_workers) logger.info("Number of jobs updated: %d", num_updated_jobs) if options.delete_old: num_deleted_jobs = builder.delete_old_managed() logger.info("Number of jobs deleted: %d", num_deleted_jobs) elif options.command == 'test': - builder.update_job(options.path, options.name, - output=options.output_dir) + builder.update_jobs(options.path, options.name, + output=options.output_dir, + n_workers=1) def version(): diff --git a/jenkins_jobs/parallel.py b/jenkins_jobs/parallel.py new file mode 100644 index 000000000..617c05e36 --- /dev/null +++ b/jenkins_jobs/parallel.py @@ -0,0 +1,151 @@ +#!/usr/bin/env python +# Copyright (C) 2015 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# Parallel execution helper functions and classes + +from functools import wraps +import logging +from multiprocessing import cpu_count +import threading +import traceback + +try: + import Queue as queue +except ImportError: + import queue + +logger = logging.getLogger(__name__) + + +class TaskFunc(dict): + """ + Simple class to wrap around the information needed to run a function. 
+ """ + def __init__(self, n_ord, func, args=None, kwargs=None): + self['func'] = func + self['args'] = args or [] + self['kwargs'] = kwargs or {} + self['ord'] = n_ord + + +class Worker(threading.Thread): + """ + Class that actually does the work, gets a TaskFunc through the queue, + runs its function with the passed parameters and returns the result. + If the string 'done' is passed instead of a TaskFunc instance, the thread + will end. + """ + def __init__(self, in_queue, out_queue): + threading.Thread.__init__(self) + self.in_queue = in_queue + self.out_queue = out_queue + + def run(self): + while True: + task = self.in_queue.get() + if task == 'done': + return + try: + res = task['func'](*task['args'], + **task['kwargs']) + except Exception as exc: + res = exc + traceback.print_exc() + self.out_queue.put((task['ord'], res)) + + +def parallelize(func): + @wraps(func) + def parallelized(*args, **kwargs): + """ + This function will spawn workers and run the decorated function in + parallel on the workers. It will not ensure the thread safety of the + decorated function (the decorated function should be thread safe by + itself). It accepts two special parameters: + + :arg list parallelize: list of the arguments to pass to each of the + runs, the results of each run will be returned in the same order. + :arg int n_workers: number of workers to use; by default, or if '0' + is passed, it will autodetect the number of cores and use that; if + '1' is passed, a single worker will run the calls one after + another. 
+ + Example: + + > @parallelize + > def sample(param1, param2, param3): + > return param1 + param2 + param3 + > + > sample('param1', param2='val2', + > parallelize=[ + > {'param3': 'val3'}, + > {'param3': 'val4'}, + > {'param3': 'val5'}, + > ]) + > + ['param1val2val3', 'param1val2val4', 'param1val2val5'] + + This will run the function `sample` 3 times, in + parallel (depending on the number of detected cores) and return an + array with the results of the executions in the same order the + parameters were passed. + """ + n_workers = kwargs.pop('n_workers', 0) + p_kwargs = kwargs.pop('parallelize', []) + # if only one parameter is passed inside the parallelize list, run the + # original function as is, no need for pools + if len(p_kwargs) == 1: + kwargs.update(p_kwargs[0]) + if len(p_kwargs) in (1, 0): + return func(*args, **kwargs) + + # prepare the workers + # If no number of workers passed or passed 0 + if not n_workers: + n_workers = cpu_count() + logging.debug("Running parallel %d workers", n_workers) + worker_pool = [] + in_queue = queue.Queue() + out_queue = queue.Queue() + for n_worker in range(n_workers): + new_worker = Worker(in_queue, out_queue) + new_worker.setDaemon(True) + logging.debug("Spawning worker %d", n_worker) + new_worker.start() + worker_pool.append(new_worker) + + # Feed the workers + n_ord = 0 + for f_kwargs in p_kwargs: + f_kwargs.update(kwargs) + in_queue.put(TaskFunc(n_ord, func, args, f_kwargs)) + n_ord += 1 + for _ in range(n_workers): + in_queue.put('done') + + # Wait for the results + logging.debug("Waiting for workers to finish processing") + results = [] + for _ in p_kwargs: + new_res = out_queue.get() + results.append(new_res) + # cleanup + for worker in worker_pool: + worker.join() + # Reorder the results + results = [r[1] for r in sorted(results)] + logging.debug("Parallel task finished") + return results + return parallelized diff --git a/tests/cmd/subcommands/test_test.py b/tests/cmd/subcommands/test_test.py 
index 883d1df12..5a834c602 100644 --- a/tests/cmd/subcommands/test_test.py +++ b/tests/cmd/subcommands/test_test.py @@ -85,8 +85,8 @@ class TestTests(CmdTestsBase): args.output_dir = mock.Mock(wraps=io.BytesIO()) cmd.execute(args, self.config) # probably better to fail here - @mock.patch('jenkins_jobs.cmd.Builder.update_job') - def test_multi_path(self, update_job_mock): + @mock.patch('jenkins_jobs.cmd.Builder.update_jobs') + def test_multi_path(self, update_jobs_mock): """ Run test mode and pass multiple paths. """ @@ -98,14 +98,15 @@ class TestTests(CmdTestsBase): cmd.execute(args, self.config) self.assertEqual(args.path, path_list) - update_job_mock.assert_called_with(path_list, [], - output=args.output_dir) + update_jobs_mock.assert_called_with(path_list, [], + output=args.output_dir, + n_workers=mock.ANY) - @mock.patch('jenkins_jobs.cmd.Builder.update_job') + @mock.patch('jenkins_jobs.cmd.Builder.update_jobs') @mock.patch('jenkins_jobs.cmd.os.path.isdir') @mock.patch('jenkins_jobs.cmd.os.walk') def test_recursive_multi_path(self, os_walk_mock, isdir_mock, - update_job_mock): + update_jobs_mock): """ Run test mode and pass multiple paths with recursive path option. 
""" @@ -125,20 +126,22 @@ class TestTests(CmdTestsBase): cmd.execute(args, self.config) - update_job_mock.assert_called_with(paths, [], output=args.output_dir) + update_jobs_mock.assert_called_with(paths, [], output=args.output_dir, + n_workers=mock.ANY) args = self.parser.parse_args(['test', multipath]) args.output_dir = mock.MagicMock() self.config.set('job_builder', 'recursive', 'True') cmd.execute(args, self.config) - update_job_mock.assert_called_with(paths, [], output=args.output_dir) + update_jobs_mock.assert_called_with(paths, [], output=args.output_dir, + n_workers=mock.ANY) - @mock.patch('jenkins_jobs.cmd.Builder.update_job') + @mock.patch('jenkins_jobs.cmd.Builder.update_jobs') @mock.patch('jenkins_jobs.cmd.os.path.isdir') @mock.patch('jenkins_jobs.cmd.os.walk') def test_recursive_multi_path_with_excludes(self, os_walk_mock, isdir_mock, - update_job_mock): + update_jobs_mock): """ Run test mode and pass multiple paths with recursive path option. """ @@ -160,7 +163,8 @@ class TestTests(CmdTestsBase): cmd.execute(args, self.config) - update_job_mock.assert_called_with(paths, [], output=args.output_dir) + update_jobs_mock.assert_called_with(paths, [], output=args.output_dir, + n_workers=mock.ANY) def test_console_output(self): """ diff --git a/tests/cmd/subcommands/test_update.py b/tests/cmd/subcommands/test_update.py index f5ee0c226..f9919d8bf 100644 --- a/tests/cmd/subcommands/test_update.py +++ b/tests/cmd/subcommands/test_update.py @@ -26,19 +26,19 @@ from tests.cmd.test_cmd import CmdTestsBase @mock.patch('jenkins_jobs.builder.Jenkins.get_plugins_info', mock.MagicMock) class UpdateTests(CmdTestsBase): - @mock.patch('jenkins_jobs.cmd.Builder.update_job') - def test_update_jobs(self, update_job_mock): + @mock.patch('jenkins_jobs.cmd.Builder.update_jobs') + def test_update_jobs(self, update_jobs_mock): """ Test update_job is called """ # don't care about the value returned here - update_job_mock.return_value = ([], 0) + update_jobs_mock.return_value = 
([], 0) path = os.path.join(self.fixtures_path, 'cmd-002.yaml') args = self.parser.parse_args(['update', path]) cmd.execute(args, self.config) - update_job_mock.assert_called_with([path], []) + update_jobs_mock.assert_called_with([path], [], n_workers=mock.ANY) @mock.patch('jenkins_jobs.builder.Jenkins.is_job', return_value=True) @mock.patch('jenkins_jobs.builder.Jenkins.get_jobs') @@ -88,7 +88,7 @@ class UpdateTests(CmdTestsBase): # mocks to call real methods on a the above test object. b_inst = builder_mock.return_value b_inst.plugins_list = builder_obj.plugins_list - b_inst.update_job.side_effect = builder_obj.update_job + b_inst.update_jobs.side_effect = builder_obj.update_jobs b_inst.delete_old_managed.side_effect = builder_obj.delete_old_managed def _get_jobs(): diff --git a/tests/parallel/__init__.py b/tests/parallel/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/parallel/test_parallel.py b/tests/parallel/test_parallel.py new file mode 100644 index 000000000..ae63da0ae --- /dev/null +++ b/tests/parallel/test_parallel.py @@ -0,0 +1,70 @@ +# Copyright 2015 David Caro +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import time +from multiprocessing import cpu_count + +from testtools import matchers +from testtools import TestCase + +from jenkins_jobs.parallel import parallelize +from tests.base import mock + + +class TestCaseParallel(TestCase): + def test_parallel_correct_order(self): + expected = list(range(10, 20)) + + @parallelize + def parallel_test(num_base, num_extra): + return num_base + num_extra + + parallel_args = [{'num_extra': num} for num in range(10)] + result = parallel_test(10, parallelize=parallel_args) + self.assertThat(result, matchers.Equals(expected)) + + def test_parallel_time_less_than_serial(self): + + @parallelize + def wait(secs): + time.sleep(secs) + + before = time.time() + # ten threads to make it as fast as possible + wait(parallelize=[{'secs': 1} for _ in range(10)], n_workers=10) + after = time.time() + self.assertThat(after - before, matchers.LessThan(5)) + + def test_parallel_single_thread(self): + expected = list(range(10, 20)) + + @parallelize + def parallel_test(num_base, num_extra): + return num_base + num_extra + + parallel_args = [{'num_extra': num} for num in range(10)] + result = parallel_test(10, parallelize=parallel_args, n_workers=1) + self.assertThat(result, matchers.Equals(expected)) + + @mock.patch('jenkins_jobs.parallel.cpu_count', wraps=cpu_count) + def test_use_auto_detect_cores(self, mockCpu_count): + + @parallelize + def parallel_test(): + return True + + result = parallel_test(parallelize=[{} for _ in range(10)], + n_workers=0) + self.assertThat(result, matchers.Equals([True for _ in range(10)])) + mockCpu_count.assert_called_once_with()