#!/usr/bin/env python
# Copyright (C) 2015 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

# Manage Jenkins plugin module registry.

import inspect
import logging
import operator
import packaging.version
import pkg_resources
import sys
import types

from six import PY2

from jenkins.plugins import PluginVersion
from jenkins_jobs.errors import JenkinsJobsException

__all__ = ["ModuleRegistry"]

logger = logging.getLogger(__name__)

# inspect.getargspec only exists on Python 2; getfullargspec replaces it on 3.
getargspec = inspect.getargspec if PY2 else inspect.getfullargspec


class ModuleRegistry(object):
    """Registry of modules, component entry points, handlers and macros.

    Modules are discovered through the ``jenkins_jobs.modules`` entry point
    group.  Component entry points are discovered lazily, per component list
    type, the first time :meth:`dispatch` needs them.
    """

    # Both caches are class attributes, so they are shared by every registry
    # instance created in the process.
    # component_list_type -> {entry point name: loaded callable}
    _entry_points_cache = {}
    # module entry point -> component_list_type of the loaded module class
    _component_type_cache = {}

    def __init__(self, jjb_config, plugins_list=None):
        """Load every module registered under ``jenkins_jobs.modules``.

        :arg jjb_config: configuration object, stored as ``self.jjb_config``
        :arg plugins_list: optional iterable of plugin info mappings (with
            ``shortName``, ``longName`` and ``version`` keys) used to answer
            :meth:`get_plugin_version` queries
        """
        self.modules = []
        self.modules_by_component_type = {}
        self.handlers = {}
        self.jjb_config = jjb_config
        # Macro names for which the "masking an inbuilt definition" warning
        # has already been emitted.
        self.masked_warned = {}
        self._macros = {}

        if plugins_list is None:
            self._plugin_version = {}
        else:
            # PluginVersion by short and long plugin name.
            self._plugin_version = self._get_plugins_versions(plugins_list)

        for entrypoint in pkg_resources.iter_entry_points(
            group="jenkins_jobs.modules"
        ):
            Mod = entrypoint.load()
            mod = Mod(self)
            self.modules.append(mod)
            if mod.component_type is not None:
                self.modules_by_component_type[mod.component_type] = entrypoint
        # Sort once after loading; the sort is stable, so the result matches
        # re-sorting after every append.
        self.modules.sort(key=operator.attrgetter("sequence"))

    @staticmethod
    def _get_plugins_versions(plugins_list):
        """Build a mapping of plugin short/long name to plugin version.

        A plugin whose version compares equal to ``None`` is given
        ``str(sys.maxsize)`` as version (written back into ``plugin_info``).
        Versions that are not valid PEP 440 versions are kept, but a warning
        is logged for each of them.

        :arg plugins_list: iterable of plugin info mappings
        :returns: dict keyed by every non-empty ``shortName`` / ``longName``
        """
        plugin_version = {}
        for plugin_info in plugins_list:
            short_name = plugin_info.get("shortName")
            long_name = plugin_info.get("longName")

            version = plugin_info["version"]
            if version == None:  # noqa: E711; should call PluginInfo.__eq__.
                # Store the fallback through plugin_info and read it back
                # rather than using the string directly.
                # NOTE(review): presumably the mapping converts the value to
                # a PluginVersion on assignment -- confirm against the
                # plugin info type supplied by callers.
                plugin_info["version"] = str(sys.maxsize)
                version = plugin_info["version"]

            try:
                packaging.version.Version(version)
            except packaging.version.InvalidVersion:
                plugin_name = short_name or long_name
                if plugin_name:
                    logger.warning(
                        "Version %s for plugin %s does not conform to PEP440",
                        version,
                        plugin_name,
                    )
                else:
                    logger.warning(
                        "Version %s does not conform to PEP440", version
                    )

            if short_name:
                plugin_version[short_name] = version
            if long_name:
                plugin_version[long_name] = version
        return plugin_version

    @staticmethod
    def _filter_kwargs(func, **kwargs):
        """Return only those ``kwargs`` that ``func`` names as arguments."""
        accepted = getargspec(func).args
        return {key: value for key, value in kwargs.items() if key in accepted}

    def get_plugin_version(self, plugin_name, alt_plugin_name=None, default=None):
        """Provide plugin version to be used from a module's impl of
        Base.gen_xml.

        The return value is a plugin version obtained directly from a running
        Jenkins instance. This allows module authors to differentiate
        generated XML output based on it.

        :arg str plugin_name: Either the shortName or longName of a plugin as
            seen in a query that looks like:
            ``http://<jenkins-hostname>/pluginManager/api/json?pretty&depth=2``
        :arg str alt_plugin_name: Alternative plugin name. Used if plugin_name
            is missing in plugin list.
        :arg str default: Default value. Used if plugin name is missing in
            plugin list.
        :returns: the recorded version of the plugin; ``PluginVersion(default)``
            when neither name is known and a default is given; otherwise a
            ``PluginVersion`` built from ``sys.maxsize``.

        During a 'test' run, it is possible to override JJB's query to a live
        Jenkins instance by passing it a path to a file containing a YAML list
        of dictionaries that mimics the plugin properties you want your test
        output to reflect::

            jenkins-jobs test -p /path/to/plugins-info.yaml

        Below is example YAML that might be included in
        /path/to/plugins-info.yaml.

        .. literalinclude:: /../../tests/cmd/fixtures/plugins-info.yaml
        """
        try:
            return self._plugin_version[plugin_name]
        except KeyError:
            pass
        if alt_plugin_name:
            try:
                return self._plugin_version[alt_plugin_name]
            except KeyError:
                pass
        if default is not None:
            return PluginVersion(default)
        # Assume latest version of plugin is preferred config format.
        return PluginVersion(str(sys.maxsize))

    def registerHandler(self, category, name, method):
        """Register ``method`` as the handler ``name`` within ``category``."""
        self.handlers.setdefault(category, {})[name] = method

    def getHandler(self, category, name):
        """Return the handler registered as ``name`` within ``category``.

        :raises KeyError: if the category or the name is not registered
        """
        return self.handlers[category][name]

    @property
    def macros(self):
        """Macros previously stored with :meth:`set_macros`."""
        return self._macros

    def set_macros(self, macros):
        """Replace the macros consulted by :meth:`dispatch`."""
        self._macros = macros

    def amend_job_dicts(self, job_data_list):
        """Let every module amend every job until no module changes anything.

        Each pass calls ``module.amend_job_dict(job.data)`` for every job and
        module combination; passes repeat while at least one call returns a
        truthy value.
        """
        while True:
            changed = False
            for job in job_data_list:
                for module in self.modules:
                    if module.amend_job_dict(job.data):
                        changed = True
            if not changed:
                break

    def get_component_list_type(self, entry_point):
        """Return ``component_list_type`` of the module behind ``entry_point``.

        The result is cached per entry point.
        """
        if entry_point in self._component_type_cache:
            return self._component_type_cache[entry_point]

        # pkg_resources.EntryPoint.load() is costly, cache it.
        component_list_type = entry_point.load().component_list_type
        # Log through the module logger: the module-level logging.info()
        # would implicitly configure the root logger when it has no handlers.
        logger.info("Caching type %s of %s", component_list_type, entry_point)
        self._component_type_cache[entry_point] = component_list_type
        return component_list_type

    def dispatch(
        self,
        component_type,
        xml_parent,
        component,
        template_data=None,
        job_data=None,
        component_pos=None,
    ):
        """This is a method that you can call from your implementation of
        Base.gen_xml or component. It allows modules to define a type of
        component, and benefit from extensibility via Python entry points and
        Jenkins Job Builder :ref:`Macros <macro>`.

        :arg str component_type: the name of the component
          (e.g., `builder`)
        :arg xml_parent: the parent XML element
        :arg component: component definition
        :arg dict template_data: not used by this method; accepted so that
          existing callers passing it keep working
        :arg dict job_data: full job definition
        :arg component_pos: position attached to the error context when a
          component fails; for entry points it is only used when
          ``component`` has no ``pos`` attribute of its own
        :raises JenkinsJobsException: if the component type is unknown, or if
          ``component`` names neither a macro nor an entry point

        See :py:class:`jenkins_jobs.modules.base.Base` for how to register
        components of a module.

        See the Publishers module for a simple example of how to use
        this method.
        """
        if component_type not in self.modules_by_component_type:
            raise JenkinsJobsException(
                "Unknown component type: '{0}'.".format(component_type)
            )

        entry_point = self.modules_by_component_type[component_type]
        component_list_type = self.get_component_list_type(entry_point)

        if isinstance(component, dict):
            # The component is a singleton dictionary of name: dict(args)
            name, component_data = next(iter(component.items()))
        else:
            # The component is a simple string name, eg "run-tests"
            name = component
            component_data = {}

        # Look for a component function defined in an entry point
        eps = self._entry_points_cache.get(component_list_type)
        if eps is None:
            eps = self._load_eps(
                component_list_type, component_type, entry_point, name
            )

        macro = self.macros.get(component_type, {}).get(name)
        if macro:
            # A macro takes precedence over an entry point of the same name.
            try:
                self._dispatch_macro(
                    component_data,
                    component_type,
                    eps,
                    job_data,
                    macro,
                    name,
                    xml_parent,
                )
            except JenkinsJobsException as x:
                if component_pos is not None:
                    raise x.with_context(
                        f"While expanding {component_type} macro call {name!r}",
                        pos=component_pos,
                    )
                raise
        elif name in eps:
            try:
                func = eps[name]
                kwargs = self._filter_kwargs(func, job_data=job_data)
                func(self, xml_parent, component_data, **kwargs)
            except JenkinsJobsException as x:
                context = f"In {component_type} {name!r}"
                if hasattr(component, "pos"):
                    raise x.with_context(context, pos=component.pos)
                # A plain str/dict component carries no position.  Fall back
                # to the caller-supplied one, or re-raise unchanged, so the
                # real error is not hidden behind an AttributeError.
                if component_pos is not None:
                    raise x.with_context(context, pos=component_pos)
                raise
        else:
            raise JenkinsJobsException(
                "Unknown entry point or macro '{0}' "
                "for component type: '{1}'.".format(name, component_type)
            )

    def _dispatch_macro(
        self, component_data, component_type, eps, job_data, macro, name, xml_parent
    ):
        """Expand ``macro`` in place of the component called ``name``.

        Warns once per macro name when the macro masks an entry point of the
        same name.
        """
        if name in eps and name not in self.masked_warned:
            self.masked_warned[name] = True
            logger.warning(
                "You have a macro ('%s') defined for '%s' "
                "component type that is masking an inbuilt "
                "definition",
                name,
                component_type,
            )
        if component_data is None:
            component_data = {}
        # On a key clash, values from job_data override the component's own.
        params = {**component_data, **(job_data or {})}
        macro.dispatch_elements(self, xml_parent, component_data, job_data, params)

    def _load_eps(self, component_list_type, component_type, entry_point, name):
        """Collect, load and cache the entry points of a component list type.

        Entry points come from two sources: functions of the module behind
        ``entry_point`` whose docstring starts with ``yaml: <name>``, and
        entry points registered under the
        ``jenkins_jobs.<component_list_type>`` group.

        :arg component_list_type: cache key and entry point group suffix
        :arg component_type: component type, used in the error message
        :arg entry_point: entry point of the module to scan for functions
        :arg name: name of the component being dispatched; not used here
        :returns: dict mapping entry point name to the loaded object
        :raises JenkinsJobsException: if two entry points share a name
        """
        logger.debug("Caching entrypoints for %s", component_list_type)
        module_eps = []

        # auto build entry points by inferring from base component_types
        module_ep = pkg_resources.EntryPoint(
            "__all__", entry_point.module_name, dist=entry_point.dist
        )
        module = module_ep.load()
        namespace = vars(module)
        func_eps = [
            obj
            for obj in (namespace.get(attr) for attr in dir(module))
            if isinstance(obj, types.FunctionType)
        ]

        for func_ep in func_eps:
            try:
                # extract entry point based on docstring
                name_line = func_ep.__doc__.split("\n")
                if not name_line[0].startswith("yaml:"):
                    logger.debug("Ignoring '%s' as an entry point", name_line)
                    continue
                ep_name = name_line[0].split(" ")[1]
            except (AttributeError, IndexError):
                # AttributeError by docstring not being defined as
                # a string to have split called on it.
                # IndexError raised by name_line not containing anything
                # after the 'yaml:' string.
                logger.debug(
                    "Not including func '%s' as an entry point", func_ep.__name__
                )
                continue

            module_eps.append(
                pkg_resources.EntryPoint(
                    ep_name,
                    entry_point.module_name,
                    dist=entry_point.dist,
                    attrs=(func_ep.__name__,),
                )
            )
            logger.debug(
                "Adding auto EP '%s=%s:%s'",
                ep_name,
                entry_point.module_name,
                func_ep.__name__,
            )

        # load from explicitly defined entry points
        module_eps.extend(
            pkg_resources.iter_entry_points(
                group="jenkins_jobs.{0}".format(component_list_type)
            )
        )

        eps = {}
        for candidate in module_eps:
            if candidate.name in eps:
                # Report the entry point that is actually duplicated, not the
                # component that happened to trigger the loading.
                raise JenkinsJobsException(
                    "Duplicate entry point found for component type: "
                    "'{0}', name: '{1}'".format(component_type, candidate.name)
                )
            eps[candidate.name] = candidate.load()

        # cache both sets of entry points
        self._entry_points_cache[component_list_type] = eps
        logger.debug("Cached entry point group %s = %s", component_list_type, eps)
        return eps