Assert problems in Glance raised by Bandit

Fix all assert problems raised by Bandit. Asserts are potentially
problematic, since Python optimization sometimes removes them, so code
needs to remain safe and functional without the assert.

Two asserts are safe to skip, so they are deleted for improved error
messages. Three asserts are probably necessary, and are converted to
exceptions. Two asserts are probably necessary, and are instead made to
fail safely, and `# nosec` is added to the assert line.

This also enables the assert test in bandit's configuration.

Change-Id: Ic69a204ceb15cac234c6b6bca3d950256a98016d
Partial-bug: 1511862
This commit is contained in:
Drew Varner 2015-11-03 13:39:04 -06:00 committed by Drew Varner
parent 625269f114
commit 62b5ebc718
11 changed files with 29 additions and 20 deletions

View File

@ -33,7 +33,7 @@ profiles:
include: include:
- any_other_function_with_shell_equals_true - any_other_function_with_shell_equals_true
# - assert_used # TODO: enable this test - assert_used
- blacklist_calls - blacklist_calls
- blacklist_import_func - blacklist_import_func

View File

@ -144,7 +144,6 @@ class ImagesController(object):
for change in changes: for change in changes:
change_method_name = '_do_%s' % change['op'] change_method_name = '_do_%s' % change['op']
assert hasattr(self, change_method_name)
change_method = getattr(self, change_method_name) change_method = getattr(self, change_method_name)
change_method(req, image, change) change_method(req, image, change)

View File

@ -114,7 +114,7 @@ class Controller(object):
:params excluded: List of methods to exclude. :params excluded: List of methods to exclude.
:params refiner: Callable to use as filter for methods. :params refiner: Callable to use as filter for methods.
:raises: AssertionError: If refiner is not callable. :raises TypeError: If refiner is not callable.
""" """
funcs = filter(lambda x: not x.startswith("_"), dir(resource)) funcs = filter(lambda x: not x.startswith("_"), dir(resource))
@ -126,7 +126,6 @@ class Controller(object):
funcs = [f for f in funcs if f not in excluded] funcs = [f for f in funcs if f not in excluded]
if refiner: if refiner:
assert callable(refiner), "Refiner must be callable"
funcs = filter(refiner, funcs) funcs = filter(refiner, funcs)
for name in funcs: for name in funcs:

View File

@ -163,8 +163,9 @@ class ImageRepo(object):
def get(self, image_id): def get(self, image_id):
try: try:
db_api_image = dict(self.db_api.image_get(self.context, image_id)) db_api_image = dict(self.db_api.image_get(self.context, image_id))
assert not db_api_image['deleted'] if db_api_image['deleted']:
except (exception.ImageNotFound, exception.Forbidden, AssertionError): raise exception.ImageNotFound()
except (exception.ImageNotFound, exception.Forbidden):
msg = _("No image found with ID %s") % image_id msg = _("No image found with ID %s") % image_id
raise exception.ImageNotFound(msg) raise exception.ImageNotFound(msg)
tags = self.db_api.image_tag_get_all(self.context, image_id) tags = self.db_api.image_tag_get_all(self.context, image_id)

View File

@ -330,17 +330,21 @@ def _paginate_query(query, model, limit, sort_keys, marker=None,
# the actual primary key, rather than assuming its id # the actual primary key, rather than assuming its id
LOG.warn(_LW('Id not in sort_keys; is sort_keys unique?')) LOG.warn(_LW('Id not in sort_keys; is sort_keys unique?'))
assert(not (sort_dir and sort_dirs)) assert(not (sort_dir and sort_dirs)) # nosec
# nosec: This function runs safely if the assertion fails.
# Default the sort direction to ascending # Default the sort direction to ascending
if sort_dirs is None and sort_dir is None: if sort_dir is None:
sort_dir = 'asc' sort_dir = 'asc'
# Ensure a per-column sort direction # Ensure a per-column sort direction
if sort_dirs is None: if sort_dirs is None:
sort_dirs = [sort_dir for _sort_key in sort_keys] sort_dirs = [sort_dir] * len(sort_keys)
assert(len(sort_dirs) == len(sort_keys)) assert(len(sort_dirs) == len(sort_keys)) # nosec
# nosec: This function runs safely if the assertion fails.
if len(sort_dirs) < len(sort_keys):
sort_dirs += [sort_dir] * (len(sort_keys) - len(sort_dirs))
# Add sorting # Add sorting
for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs): for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs):

View File

@ -328,14 +328,16 @@ def _get_all(context, session, filters=None, marker=None,
def _do_paginate_query(query, sort_keys=None, sort_dirs=None, def _do_paginate_query(query, sort_keys=None, sort_dirs=None,
marker=None, limit=None): marker=None, limit=None):
# Default the sort direction to ascending # Default the sort direction to ascending
if sort_dirs is None: sort_dir = 'asc'
sort_dir = 'asc'
# Ensure a per-column sort direction # Ensure a per-column sort direction
if sort_dirs is None: if sort_dirs is None:
sort_dirs = [sort_dir for _sort_key in sort_keys] sort_dirs = [sort_dir] * len(sort_keys)
assert(len(sort_dirs) == len(sort_keys)) assert(len(sort_dirs) == len(sort_keys)) # nosec
# nosec: This function runs safely if the assertion fails.
if len(sort_dirs) < len(sort_keys):
sort_dirs += [sort_dir] * (len(sort_keys) - len(sort_dirs))
# Add sorting # Add sorting
for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs): for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs):

View File

@ -98,7 +98,9 @@ def legacy_parse_uri(uri, to_quote):
raise exception.BadStoreUri(message=reason) raise exception.BadStoreUri(message=reason)
pieces = urlparse.urlparse(uri) pieces = urlparse.urlparse(uri)
assert pieces.scheme in ('swift', 'swift+http', 'swift+https') if pieces.scheme not in ('swift', 'swift+http', 'swift+https'):
raise exception.BadStoreUri(message="Unacceptable scheme: '%s'" %
pieces.scheme)
scheme = pieces.scheme scheme = pieces.scheme
netloc = pieces.netloc netloc = pieces.netloc
path = pieces.path.lstrip('/') path = pieces.path.lstrip('/')

View File

@ -162,7 +162,9 @@ def legacy_parse_uri(uri, to_quote):
raise exception.BadStoreUri(message=reason) raise exception.BadStoreUri(message=reason)
pieces = urlparse.urlparse(uri) pieces = urlparse.urlparse(uri)
assert pieces.scheme in ('swift', 'swift+http', 'swift+https') if pieces.scheme not in ('swift', 'swift+http', 'swift+https'):
raise exception.BadStoreUri(message="Unacceptable scheme: '%s'" %
pieces.scheme)
scheme = pieces.scheme scheme = pieces.scheme
netloc = pieces.netloc netloc = pieces.netloc
path = pieces.path.lstrip('/') path = pieces.path.lstrip('/')

View File

@ -108,7 +108,7 @@ class TestRPCController(base.IsolatedUnitTest):
controller = rpc.Controller() controller = rpc.Controller()
# Not callable # Not callable
self.assertRaises(AssertionError, self.assertRaises(TypeError,
controller.register, controller.register,
res, refiner="get_all_images") res, refiner="get_all_images")

View File

@ -420,7 +420,7 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
invalid_scheme_uri = ('http://acct:usr:pass@example.com' invalid_scheme_uri = ('http://acct:usr:pass@example.com'
'/container/obj-id') '/container/obj-id')
self.assertRaises(AssertionError, self.assertRaises(exception.BadStoreUri,
legacy_parse_uri_fn, legacy_parse_uri_fn,
invalid_scheme_uri, invalid_scheme_uri,
True) True)

View File

@ -1899,8 +1899,8 @@ class TestImagesController(base.IsolatedUnitTest):
change = {'op': 'test', 'path': 'options', 'value': 'puts'} change = {'op': 'test', 'path': 'options', 'value': 'puts'}
try: try:
self.controller.update(request, UUID1, [change]) self.controller.update(request, UUID1, [change])
except AssertionError: except AttributeError:
pass # AssertionError is the desired behavior pass # AttributeError is the desired behavior
else: else:
self.fail('Failed to raise AssertionError on %s' % change) self.fail('Failed to raise AssertionError on %s' % change)