requirements/openstack_requirements/requirement.py
Dirk Mueller 4153903939 Add policy check on global-requirements.txt entries
We want to ensure that every entry in global-requirements has
at least a minimum version specified, and if there is an exclude,
the exclude is not outside the lower bounds.

Change-Id: Ie12d99d3317bd4c81de9a47a43fa99345b2b9664
2017-09-15 20:49:33 +00:00

239 lines
8.7 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Requirements handling."""
# This module has no IO at all, and none should be added.
import collections
import distutils.version
import packaging.specifiers
import pkg_resources
import re
# A header for the requirements file(s).
# TODO(lifeless): Remove this once constraints are in use.
_REQS_HEADER = [
'# The order of packages is significant, because pip processes '
'them in the order\n',
'# of appearance. Changing the order has an impact on the overall '
'integration\n',
'# process, which may cause wedges in the gate later.\n',
]
def cmp_specifier(a, b):
weight = {'>=': 0, '>': 0,
'==': 1, '~=': 1, '!=': 1,
'<': 2, '<=': 2}
a = a._spec
b = b._spec
wa, wb = weight[a[0]], weight[b[0]]
res = cmp(wa, wb)
if res != 0:
return res
else:
return cmp(distutils.version.LooseVersion(a[1]),
distutils.version.LooseVersion(b[1]))
class Requirement(collections.namedtuple('Requirement',
['package', 'location', 'specifiers',
'markers', 'comment', 'extras'])):
def __new__(cls, package, location, specifiers, markers, comment,
extras=None):
return super(Requirement, cls).__new__(
cls, package, location, specifiers, markers, comment,
frozenset(extras or ()))
def to_line(self, marker_sep=';', line_prefix='', comment_prefix=' ',
sort_specifiers=False):
comment_p = comment_prefix if self.package else ''
comment = (comment_p + self.comment if self.comment else '')
marker = marker_sep + self.markers if self.markers else ''
package = line_prefix + self.package if self.package else ''
location = self.location + '#egg=' if self.location else ''
extras = '[%s]' % ",".join(sorted(self.extras)) if self.extras else ''
specifiers = self.specifiers
if sort_specifiers:
_specifiers = packaging.specifiers.SpecifierSet(specifiers)
_specifiers = ['%s' % s for s in sorted(_specifiers,
cmp=cmp_specifier)]
specifiers = ','.join(_specifiers)
return '%s%s%s%s%s%s\n' % (location,
package,
extras,
specifiers,
marker,
comment)
Requirements = collections.namedtuple('Requirements', ['reqs'])
url_re = re.compile(
'^(?P<url>\s*(?:-e\s)?\s*(?:(?:[a-z]+\+)?(?:[a-z]+))://[^#]*)'
'#egg=(?P<name>[-\.\w]+)')
def canonical_name(req_name):
"""Return the canonical form of req_name."""
return pkg_resources.safe_name(req_name).lower()
def parse(content, permit_urls=False):
return to_dict(to_reqs(content, permit_urls=permit_urls))
def parse_line(req_line, permit_urls=False):
"""Parse a single line of a requirements file.
requirements files here are a subset of pip requirements files: we don't
try to parse URL entries, or pip options like -f and -e. Those are not
permitted in global-requirements.txt. If encountered in a synchronised
file such as requirements.txt or test-requirements.txt, they are illegal
but currently preserved as-is.
They may of course be used by local test configurations, just not
committed into the OpenStack reference branches.
:param permit_urls: If True, urls are parsed into Requirement tuples.
By default they are not, because they cannot be reflected into
setuptools kwargs, and thus the default is conservative. When
urls are permitted, -e *may* be supplied at the start of the line.
"""
end = len(req_line)
hash_pos = req_line.find('#')
if hash_pos < 0:
hash_pos = end
# Don't find urls that are in comments.
if '://' in req_line[:hash_pos]:
if permit_urls:
# We accept only a subset of urls here - they have to have an egg
# name so that we can tell what project its for without doing
# network access. Egg markers use a fragment, so we need to pull
# out url from the entire line.
m = url_re.match(req_line)
name = m.group('name')
location = m.group('url')
parse_start = m.end('name')
hash_pos = req_line[parse_start:].find('#')
if hash_pos < 0:
hash_pos = end
else:
hash_pos = hash_pos + parse_start
else:
# Trigger an early failure before we look for ':'
pkg_resources.Requirement.parse(req_line)
else:
parse_start = 0
location = ''
semi_pos = req_line.find(';', parse_start, hash_pos)
colon_pos = req_line.find(':', parse_start, hash_pos)
marker_pos = max(semi_pos, colon_pos)
if marker_pos < 0:
marker_pos = hash_pos
markers = req_line[marker_pos + 1:hash_pos].strip()
if hash_pos != end:
comment = req_line[hash_pos:]
else:
comment = ''
req_line = req_line[parse_start:marker_pos]
extras = ()
if parse_start:
# We parsed a url before
specifier = ''
elif req_line:
# Pulled out a requirement
parsed = pkg_resources.Requirement.parse(req_line)
name = parsed.project_name
extras = parsed.extras
specifier = str(parsed.specifier)
else:
# Comments / blank lines etc.
name = ''
specifier = ''
return Requirement(name, location, specifier, markers, comment, extras)
def to_content(reqs, marker_sep=';', line_prefix='', prefix=True):
lines = []
if prefix:
lines += _REQS_HEADER
for req in reqs.reqs:
lines.append(req.to_line(marker_sep, line_prefix))
return u''.join(lines)
def to_dict(req_sequence):
reqs = dict()
for req, req_line in req_sequence:
if req is not None:
key = canonical_name(req.package)
reqs.setdefault(key, []).append((req, req_line))
return reqs
def _pass_through(req_line, permit_urls=False):
"""Identify unparsable lines."""
if permit_urls:
return (req_line.startswith('http://tarballs.openstack.org/') or
req_line.startswith('-f'))
else:
return (req_line.startswith('http://tarballs.openstack.org/') or
req_line.startswith('-e') or
req_line.startswith('-f'))
def to_reqs(content, permit_urls=False):
for content_line in content.splitlines(True):
req_line = content_line.strip()
if _pass_through(req_line, permit_urls=permit_urls):
yield None, content_line
else:
yield parse_line(req_line, permit_urls=permit_urls), content_line
def check_reqs_bounds_policy(global_reqs):
"""Check that the global requirement version specifiers match the policy.
The policy is defined as
* There needs to be exactly one lower bound (>=1.2 defined)
* There can be one or more excludes (!=1.2.1, !=1.2.2)
* TODO: Clarify (non-) existance of upper caps
"""
for pkg_requirement in global_reqs.values():
req = pkg_requirement[0][0]
if req.package:
_specifiers = packaging.specifiers.SpecifierSet(req.specifiers)
lower_bound = set()
for spec in _specifiers:
if spec.operator == '>=':
lower_bound.add(spec)
if len(lower_bound) < 1:
yield ('Requirement %s needs a >= specifier' % req.package)
elif len(lower_bound) > 1:
yield ('Requirement %s has multiple >= specifier' %
req.package)
else:
lower_bound = lower_bound.pop()
for spec in _specifiers:
if spec.operator == '!=':
if not lower_bound.contains(spec.version):
yield('Requirement %s has a !=%s specifier '
'that is not >=%s' % (req.package,
spec.version,
lower_bound.version))