From f74ce34d36415368090bb00598d0068b0e73ccf2 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Tue, 5 May 2015 18:07:37 +0000 Subject: [PATCH] Revert "Added parallelization options" This reverts commit 7100a7f225e60936f23d24c757c1b14c9b136ec4. The referenced commit, in addition to what it advertises, also filters the result of update_jobs to include only changed jobs. If there are no changed jobs, then, if the user specifies --delete-old, there are no jobs in the list of jobs to keep which comes from the result of update_jobs. In that case, all jobs are deleted. Or in other words, the logic in this change is that all jobs not updated are deleted. Proposing as a revert because this is currently causing serious problems for continuously deployed JJB instances, and the very common --delete-old code path should be properly tested with this change before it lands again. Change-Id: I98443f0c085e27ed8dfece6409434015ac24b306 --- doc/source/installation.rst | 7 -- jenkins_jobs/builder.py | 87 ++++------------- jenkins_jobs/cmd.py | 17 +--- jenkins_jobs/parallel.py | 151 ----------------------------- tests/cmd/subcommands/test_test.py | 26 +++-- tests/parallel/__init__.py | 0 tests/parallel/test_parallel.py | 74 -------------- 7 files changed, 32 insertions(+), 330 deletions(-) delete mode 100644 jenkins_jobs/parallel.py delete mode 100644 tests/parallel/__init__.py delete mode 100644 tests/parallel/test_parallel.py diff --git a/doc/source/installation.rst b/doc/source/installation.rst index 99ab24912..8cd3325c7 100644 --- a/doc/source/installation.rst +++ b/doc/source/installation.rst @@ -183,12 +183,6 @@ arguments after the job definition path. To update Foo1 and Foo2 run:: jenkins-jobs update /path/to/defs Foo1 Foo2 -You can also enable the parallel execution of the program passing the workers -option with a value of 0, 2, or higer. 
Use 0 to run as many workers as cores in -the host that runs it, and 2 or higher to specify the number of workers to use:: - - jenkins-jobs update --workers 0 /path/to/defs - Passing Multiple Paths ^^^^^^^^^^^^^^^^^^^^^^ It is possible to pass multiple paths to JJB using colons as a path separator on @@ -279,7 +273,6 @@ There is also a command to delete **all** jobs. jenkins-jobs delete-all - Globbed Parameters ^^^^^^^^^^^^^^^^^^ Jenkins job builder supports globbed parameters to identify jobs from a set of diff --git a/jenkins_jobs/builder.py b/jenkins_jobs/builder.py index 3d6838864..ccc5b1533 100644 --- a/jenkins_jobs/builder.py +++ b/jenkins_jobs/builder.py @@ -28,10 +28,8 @@ import jenkins import re from pprint import pformat import logging -import time from jenkins_jobs.constants import MAGIC_MANAGE_STRING -from jenkins_jobs.parallel import parallelize from jenkins_jobs.parser import YamlParser logger = logging.getLogger(__name__) @@ -143,7 +141,6 @@ class Jenkins(object): def __init__(self, url, user, password): self.jenkins = jenkins.Jenkins(url, user, password) - @parallelize def update_job(self, job_name, xml): if self.is_job(job_name): logger.info("Reconfiguring jenkins job {0}".format(job_name)) @@ -288,9 +285,8 @@ class Builder(object): logger.info("Removing jenkins job(s): %s" % ", ".join(jobs)) for job in jobs: self.jenkins.delete_job(job) - if self.cache.is_cached(job): + if(self.cache.is_cached(job)): self.cache.set(job, '') - self.cache.save() def delete_all_jobs(self): jobs = self.jenkins.get_jobs() @@ -298,23 +294,10 @@ class Builder(object): for job in jobs: self.delete_job(job['name']) - @parallelize - def changed(self, job): - md5 = job.md5() - changed = self.ignore_cache or self.cache.has_changed(job.name, md5) - if not changed: - logger.debug("'{0}' has not changed".format(job.name)) - return changed - - def update_jobs(self, input_fn, jobs_glob=None, output=None, - n_workers=None): - orig = time.time() + def update_job(self, input_fn, 
jobs_glob=None, output=None): self.load_files(input_fn) self.parser.expandYaml(jobs_glob) self.parser.generateXML() - step = time.time() - logging.debug('%d XML files generated in %ss', - len(self.parser.jobs), str(step - orig)) logger.info("Number of jobs generated: %d", len(self.parser.xml_jobs)) self.parser.xml_jobs.sort(key=operator.attrgetter('name')) @@ -328,8 +311,9 @@ class Builder(object): if not os.path.isdir(output): raise - if output: - for job in self.parser.xml_jobs: + updated_jobs = 0 + for job in self.parser.xml_jobs: + if output: if hasattr(output, 'write'): # `output` is a file-like object logger.info("Job name: %s", job.name) @@ -350,54 +334,17 @@ class Builder(object): f = open(output_fn, 'w') f.write(job.output()) f.close() - return self.parser.xml_jobs, len(self.parser.xml_jobs) + continue + md5 = job.md5() + if (self.jenkins.is_job(job.name) + and not self.cache.is_cached(job.name)): + old_md5 = self.jenkins.get_job_md5(job.name) + self.cache.set(job.name, old_md5) - # Filter out the jobs that did not change - logging.debug('Filtering %d jobs for changed jobs', - len(self.parser.xml_jobs)) - step = time.time() - jobs = [job for job in self.parser.xml_jobs - if self.changed(job)] - logging.debug("Filtered for changed jobs in %ss", - (time.time() - step)) - - if not jobs: - return [], 0 - - # Update the jobs - logging.debug('Updating jobs') - step = time.time() - p_params = [{'job': job} for job in jobs] - results = self.parallel_update_job( - n_workers=n_workers, - parallelize=p_params) - logging.debug("Parsing results") - # generalize the result parsing, as a parallelized job always returns a - # list - if len(p_params) in (1, 0): - results = [results] - for result in results: - if isinstance(result, Exception): - raise result + if self.cache.has_changed(job.name, md5) or self.ignore_cache: + self.jenkins.update_job(job.name, job.output()) + updated_jobs += 1 + self.cache.set(job.name, md5) else: - # update in-memory cache - j_name, j_md5 = 
result - self.cache.set(j_name, j_md5) - # write cache to disk - self.cache.save() - logging.debug("Updated %d jobs in %ss", - len(jobs), - time.time() - step) - logging.debug("Total run took %ss", (time.time() - orig)) - return jobs, len(jobs) - - @parallelize - def parallel_update_job(self, job): - self.jenkins.update_job(job.name, job.output()) - return (job.name, job.md5()) - - def update_job(self, input_fn, jobs_glob=None, output=None): - logging.warn('Current update_job function signature is deprecated and ' - 'will change in future versions to the signature of the ' - 'new parallel_update_job') - return self.update_jobs(input_fn, jobs_glob, output) + logger.debug("'{0}' has not changed".format(job.name)) + return self.parser.xml_jobs, updated_jobs diff --git a/jenkins_jobs/cmd.py b/jenkins_jobs/cmd.py index 3f66348b0..8c5dc1e03 100755 --- a/jenkins_jobs/cmd.py +++ b/jenkins_jobs/cmd.py @@ -105,9 +105,6 @@ def create_parser(): parser_update.add_argument('--delete-old', help='delete obsolete jobs', action='store_true', dest='delete_old', default=False,) - parser_update.add_argument('--workers', dest='n_workers', type=int, - default=1, help='number of workers to use, 0 ' - 'for autodetection and 1 for just one worker.') # subparser: test parser_test = subparser.add_parser('test', parents=[recursive_parser]) @@ -292,24 +289,18 @@ def execute(options, config): logger.info("Deleting all jobs") builder.delete_all_jobs() elif options.command == 'update': - if options.n_workers < 0: - raise JenkinsJobsException( - 'Number of workers must be equal or greater than 0') - logger.info("Updating jobs in {0} ({1})".format( options.path, options.names)) - jobs, num_updated_jobs = builder.update_jobs( - options.path, options.names, - n_workers=options.n_workers) + jobs, num_updated_jobs = builder.update_job(options.path, + options.names) logger.info("Number of jobs updated: %d", num_updated_jobs) if options.delete_old: num_deleted_jobs = builder.delete_old_managed( 
keep=[x.name for x in jobs]) logger.info("Number of jobs deleted: %d", num_deleted_jobs) elif options.command == 'test': - builder.update_jobs(options.path, options.name, - output=options.output_dir, - n_workers=1) + builder.update_job(options.path, options.name, + output=options.output_dir) def version(): diff --git a/jenkins_jobs/parallel.py b/jenkins_jobs/parallel.py deleted file mode 100644 index 100e6ba08..000000000 --- a/jenkins_jobs/parallel.py +++ /dev/null @@ -1,151 +0,0 @@ -#!/usr/bin/env python -# Copyright (C) 2012 OpenStack, LLC. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -# Parallel execution helper functions and classes - -from functools import wraps -import logging -from multiprocessing import cpu_count -import threading -import traceback - -try: - import Queue as queue -except ImportError: - import queue - -logger = logging.getLogger(__name__) - - -class TaskFunc(dict): - """ - Simple class to wrap around the information needed to run a function. - """ - def __init__(self, n_ord, func, args=None, kwargs=None): - self['func'] = func - self['args'] = args or [] - self['kwargs'] = kwargs or {} - self['ord'] = n_ord - - -class Worker(threading.Thread): - """ - Class that actually does the work, gets a TaskFunc through the queue, - runs its function with the passed parameters and returns the result - If the string 'done' is passed instead of a TaskFunc instance, the thread - will end. 
- """ - def __init__(self, in_queue, out_queue): - threading.Thread.__init__(self) - self.in_queue = in_queue - self.out_queue = out_queue - - def run(self): - while True: - task = self.in_queue.get() - if task == 'done': - return - try: - res = task['func'](*task['args'], - **task['kwargs']) - except Exception as exc: - res = exc - traceback.print_exc() - self.out_queue.put((task['ord'], res)) - - -def parallelize(func): - @wraps(func) - def parallelized(*args, **kwargs): - """ - This function will spawn workers and run the decorated function in - parallel on the workers. It will not ensure the thread safety of the - decorated function (the decorated function should be thread safe by - itself). It accepts two special parameters: - - :arg list parallelize: list of the arguments to pass to each of the - runs, the results of each run will be returned in the same order. - :arg int n_workers: number of workers to use, by default and if '0' - passed will autodetect the number of cores and use that, if '1' - passed, it will not use any workers and just run as if were not - parallelized everything. - - Example: - - > @parallelize - > def sample(param1, param2, param3): - > return param1 + param2 + param3 - > - > sample('param1', param2='val2', - > parallelize=[ - > {'param3': 'val3'}, - > {'param3': 'val4'}, - > {'param3': 'val5'}, - > ]) - > - ['param1val2val3', 'param1val2val4', 'param1val2val5'] - - This will run the function `parallelized_function` 3 times, in - parallel (depending on the number of detected cores) and return an - array with the results of the executions in the same order the - parameters were passed. 
- """ - n_workers = kwargs.pop('n_workers', 0) - p_kwargs = kwargs.pop('parallelize', []) - # if only one parameter is passed inside the parallelize dict, run the - # original function as is, no need for pools - if len(p_kwargs) == 1: - kwargs.update(p_kwargs[0]) - if len(p_kwargs) in (1, 0): - return func(*args, **kwargs) - - # prepare the workers - # If no number of workers passed or passed 0 - if not n_workers: - n_workers = cpu_count() - logging.debug("Running parallel %d workers", n_workers) - worker_pool = [] - in_queue = queue.Queue() - out_queue = queue.Queue() - for n_worker in range(n_workers): - new_worker = Worker(in_queue, out_queue) - new_worker.setDaemon(True) - logging.debug("Spawning worker %d", n_worker) - new_worker.start() - worker_pool.append(new_worker) - - # Feed the workers - n_ord = 0 - for f_kwargs in p_kwargs: - f_kwargs.update(kwargs) - in_queue.put(TaskFunc(n_ord, func, args, f_kwargs)) - n_ord += 1 - for _ in range(n_workers): - in_queue.put('done') - - # Wait for the results - logging.debug("Waiting for workers to finish processing") - results = [] - for _ in p_kwargs: - new_res = out_queue.get() - results.append(new_res) - # cleanup - for worker in worker_pool: - worker.join() - # Reorder the results - results = [r[1] for r in sorted(results)] - logging.debug("Parallel task finished") - return results - return parallelized diff --git a/tests/cmd/subcommands/test_test.py b/tests/cmd/subcommands/test_test.py index 4ad24df44..5e38d6cae 100644 --- a/tests/cmd/subcommands/test_test.py +++ b/tests/cmd/subcommands/test_test.py @@ -84,8 +84,8 @@ class TestTests(CmdTestsBase): args.output_dir = mock.MagicMock() cmd.execute(args, self.config) # probably better to fail here - @mock.patch('jenkins_jobs.cmd.Builder.update_jobs') - def test_multi_path(self, update_jobs_mock): + @mock.patch('jenkins_jobs.cmd.Builder.update_job') + def test_multi_path(self, update_job_mock): """ Run test mode and pass multiple paths. 
""" @@ -97,15 +97,14 @@ class TestTests(CmdTestsBase): cmd.execute(args, self.config) self.assertEqual(args.path, path_list) - update_jobs_mock.assert_called_with(path_list, [], - output=args.output_dir, - n_workers=mock.ANY) + update_job_mock.assert_called_with(path_list, [], + output=args.output_dir) - @mock.patch('jenkins_jobs.cmd.Builder.update_jobs') + @mock.patch('jenkins_jobs.cmd.Builder.update_job') @mock.patch('jenkins_jobs.cmd.os.path.isdir') @mock.patch('jenkins_jobs.cmd.os.walk') def test_recursive_multi_path(self, os_walk_mock, isdir_mock, - update_jobs_mock): + update_job_mock): """ Run test mode and pass multiple paths with recursive path option. """ @@ -125,21 +124,19 @@ class TestTests(CmdTestsBase): cmd.execute(args, self.config) - update_jobs_mock.assert_called_with(paths, [], output=args.output_dir, - n_workers=mock.ANY) + update_job_mock.assert_called_with(paths, [], output=args.output_dir) args = self.parser.parse_args(['test', multipath]) self.config.set('job_builder', 'recursive', 'True') cmd.execute(args, self.config) - update_jobs_mock.assert_called_with(paths, [], output=args.output_dir, - n_workers=mock.ANY) + update_job_mock.assert_called_with(paths, [], output=args.output_dir) - @mock.patch('jenkins_jobs.cmd.Builder.update_jobs') + @mock.patch('jenkins_jobs.cmd.Builder.update_job') @mock.patch('jenkins_jobs.cmd.os.path.isdir') @mock.patch('jenkins_jobs.cmd.os.walk') def test_recursive_multi_path_with_excludes(self, os_walk_mock, isdir_mock, - update_jobs_mock): + update_job_mock): """ Run test mode and pass multiple paths with recursive path option. 
""" @@ -161,8 +158,7 @@ class TestTests(CmdTestsBase): cmd.execute(args, self.config) - update_jobs_mock.assert_called_with(paths, [], output=args.output_dir, - n_workers=mock.ANY) + update_job_mock.assert_called_with(paths, [], output=args.output_dir) def test_console_output(self): """ diff --git a/tests/parallel/__init__.py b/tests/parallel/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/parallel/test_parallel.py b/tests/parallel/test_parallel.py deleted file mode 100644 index 78b601ed2..000000000 --- a/tests/parallel/test_parallel.py +++ /dev/null @@ -1,74 +0,0 @@ -# Copyright 2014 David Caro -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. 
- -from testtools import ( - TestCase, -) -from testtools.matchers import ( - Equals, - LessThan, -) -from jenkins_jobs.parallel import parallelize -import time -from mock import patch -from multiprocessing import cpu_count - - -class TestCaseParallel(TestCase): - def test_parallel_correct_order(self): - expected = list(range(10, 20)) - - @parallelize - def parallel_test(num_base, num_extra): - return num_base + num_extra - - parallel_args = [{'num_extra': num} for num in range(10)] - result = parallel_test(10, parallelize=parallel_args) - self.assertThat(result, Equals(expected)) - - def test_parallel_time_less_than_serial(self): - - @parallelize - def wait(secs): - time.sleep(secs) - - before = time.time() - # ten threads to make it as fast as possible - wait(parallelize=[{'secs': 1} for _ in range(10)], n_workers=10) - after = time.time() - self.assertThat(after - before, LessThan(5)) - - def test_parallel_single_thread(self): - expected = list(range(10, 20)) - - @parallelize - def parallel_test(num_base, num_extra): - return num_base + num_extra - - parallel_args = [{'num_extra': num} for num in range(10)] - result = parallel_test(10, parallelize=parallel_args, n_workers=1) - self.assertThat(result, Equals(expected)) - - @patch('multiprocessing.cpu_count') - def test_use_auto_detect_cores(self, mockCpu_count): - - @parallelize - def parallel_test(): - return True - - mockCpu_count.return_value = cpu_count() - result = parallel_test(parallelize=[{} for _ in range(10)], - n_workers=0) - self.assertThat(result, Equals([True for _ in range(10)])) - mockCpu_count.assert_called()