Merge "Add sign-the-data signature verification"
This commit is contained in:
commit
cfae7533a3
@ -31,7 +31,7 @@ from oslo_serialization import base64
|
|||||||
from oslo_utils import encodeutils
|
from oslo_utils import encodeutils
|
||||||
|
|
||||||
from glance.common import exception
|
from glance.common import exception
|
||||||
from glance.i18n import _LE
|
from glance.i18n import _, _LE
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -70,6 +70,13 @@ MASK_GEN_ALGORITHMS = {
|
|||||||
}
|
}
|
||||||
|
|
||||||
# Required image property names
|
# Required image property names
|
||||||
|
(SIGNATURE, HASH_METHOD, KEY_TYPE, CERT_UUID) = (
|
||||||
|
'img_signature',
|
||||||
|
'img_signature_hash_method',
|
||||||
|
'img_signature_key_type',
|
||||||
|
'img_signature_certificate_uuid'
|
||||||
|
)
|
||||||
|
|
||||||
# TODO(bpoulos): remove when 'sign-the-hash' approach is no longer supported
|
# TODO(bpoulos): remove when 'sign-the-hash' approach is no longer supported
|
||||||
(OLD_SIGNATURE, OLD_HASH_METHOD, OLD_KEY_TYPE, OLD_CERT_UUID) = (
|
(OLD_SIGNATURE, OLD_HASH_METHOD, OLD_KEY_TYPE, OLD_CERT_UUID) = (
|
||||||
'signature',
|
'signature',
|
||||||
@ -135,6 +142,70 @@ KEY_TYPE_METHODS = {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def should_create_verifier(image_properties):
|
||||||
|
"""Determine whether a verifier should be created.
|
||||||
|
|
||||||
|
Using the image properties, determine whether existing properties indicate
|
||||||
|
that signature verification should be done.
|
||||||
|
|
||||||
|
:param image_properties: the key-value properties about the image
|
||||||
|
:return: True, if signature metadata properties exist, False otherwise
|
||||||
|
"""
|
||||||
|
return (image_properties is not None and
|
||||||
|
CERT_UUID in image_properties and
|
||||||
|
HASH_METHOD in image_properties and
|
||||||
|
SIGNATURE in image_properties and
|
||||||
|
KEY_TYPE in image_properties)
|
||||||
|
|
||||||
|
|
||||||
|
def get_verifier(context, image_properties):
|
||||||
|
"""Retrieve the image properties and use them to create a verifier.
|
||||||
|
|
||||||
|
:param context: the user context for authentication
|
||||||
|
:param image_properties: the key-value properties about the image
|
||||||
|
:return: instance of cryptography AsymmetricVerificationContext
|
||||||
|
:raises glance.common.exception.SignatureVerificationError: if building
|
||||||
|
the verifier fails
|
||||||
|
"""
|
||||||
|
if not should_create_verifier(image_properties):
|
||||||
|
raise exception.SignatureVerificationError(
|
||||||
|
_('Required image properties for signature verification do not'
|
||||||
|
' exist. Cannot verify signature.')
|
||||||
|
)
|
||||||
|
|
||||||
|
signature = get_signature(image_properties[SIGNATURE])
|
||||||
|
hash_method = get_hash_method(image_properties[HASH_METHOD])
|
||||||
|
signature_key_type = get_signature_key_type(
|
||||||
|
image_properties[KEY_TYPE])
|
||||||
|
public_key = get_public_key(context,
|
||||||
|
image_properties[CERT_UUID],
|
||||||
|
signature_key_type)
|
||||||
|
|
||||||
|
# create the verifier based on the signature key type
|
||||||
|
try:
|
||||||
|
verifier = KEY_TYPE_METHODS[signature_key_type](signature,
|
||||||
|
hash_method,
|
||||||
|
public_key,
|
||||||
|
image_properties)
|
||||||
|
except crypto_exception.UnsupportedAlgorithm as e:
|
||||||
|
msg = (_LE("Unable to create verifier since algorithm is "
|
||||||
|
"unsupported: %(e)s")
|
||||||
|
% {'e': encodeutils.exception_to_unicode(e)})
|
||||||
|
LOG.error(msg)
|
||||||
|
raise exception.SignatureVerificationError(
|
||||||
|
_('Unable to verify signature since the algorithm is unsupported '
|
||||||
|
'on this system')
|
||||||
|
)
|
||||||
|
|
||||||
|
if verifier:
|
||||||
|
return verifier
|
||||||
|
else:
|
||||||
|
# Error creating the verifier
|
||||||
|
raise exception.SignatureVerificationError(
|
||||||
|
_('Error occurred while creating the verifier')
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@debtcollector.removals.remove(message="This will be removed in the N cycle.")
|
@debtcollector.removals.remove(message="This will be removed in the N cycle.")
|
||||||
def should_verify_signature(image_properties):
|
def should_verify_signature(image_properties):
|
||||||
"""Determine whether a signature should be verified.
|
"""Determine whether a signature should be verified.
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
import collections
|
import collections
|
||||||
import copy
|
import copy
|
||||||
|
|
||||||
|
from cryptography import exceptions as crypto_exception
|
||||||
import debtcollector
|
import debtcollector
|
||||||
import glance_store as store
|
import glance_store as store
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
@ -391,16 +392,40 @@ class ImageProxy(glance.domain.proxy.Image):
|
|||||||
def set_data(self, data, size=None):
|
def set_data(self, data, size=None):
|
||||||
if size is None:
|
if size is None:
|
||||||
size = 0 # NOTE(markwash): zero -> unknown size
|
size = 0 # NOTE(markwash): zero -> unknown size
|
||||||
|
|
||||||
|
# Create the verifier for signature verification (if correct properties
|
||||||
|
# are present)
|
||||||
|
if (signature_utils.should_create_verifier(
|
||||||
|
self.image.extra_properties)):
|
||||||
|
# NOTE(bpoulos): if creating verifier fails, exception will be
|
||||||
|
# raised
|
||||||
|
verifier = signature_utils.get_verifier(
|
||||||
|
self.context, self.image.extra_properties)
|
||||||
|
else:
|
||||||
|
verifier = None
|
||||||
|
|
||||||
location, size, checksum, loc_meta = self.store_api.add_to_backend(
|
location, size, checksum, loc_meta = self.store_api.add_to_backend(
|
||||||
CONF,
|
CONF,
|
||||||
self.image.image_id,
|
self.image.image_id,
|
||||||
utils.LimitingReader(utils.CooperativeReader(data),
|
utils.LimitingReader(utils.CooperativeReader(data),
|
||||||
CONF.image_size_cap),
|
CONF.image_size_cap),
|
||||||
size,
|
size,
|
||||||
context=self.context)
|
context=self.context,
|
||||||
|
verifier=verifier)
|
||||||
|
|
||||||
self._verify_signature_if_needed(checksum)
|
self._verify_signature_if_needed(checksum)
|
||||||
|
|
||||||
|
# NOTE(bpoulos): if verification fails, exception will be raised
|
||||||
|
if verifier:
|
||||||
|
try:
|
||||||
|
verifier.verify()
|
||||||
|
LOG.info(_LI("Successfully verified signature for image %s"),
|
||||||
|
self.image.image_id)
|
||||||
|
except crypto_exception.InvalidSignature:
|
||||||
|
raise exception.SignatureVerificationError(
|
||||||
|
_('Signature verification failed')
|
||||||
|
)
|
||||||
|
|
||||||
self.image.locations = [{'url': location, 'metadata': loc_meta,
|
self.image.locations = [{'url': location, 'metadata': loc_meta,
|
||||||
'status': 'active'}]
|
'status': 'active'}]
|
||||||
self.image.size = size
|
self.image.size = size
|
||||||
|
@ -33,6 +33,14 @@ TEST_PRIVATE_KEY = rsa.generate_private_key(public_exponent=3,
|
|||||||
key_size=1024,
|
key_size=1024,
|
||||||
backend=default_backend())
|
backend=default_backend())
|
||||||
|
|
||||||
|
# Required image property names
|
||||||
|
(SIGNATURE, HASH_METHOD, KEY_TYPE, CERT_UUID) = (
|
||||||
|
signature_utils.SIGNATURE,
|
||||||
|
signature_utils.HASH_METHOD,
|
||||||
|
signature_utils.KEY_TYPE,
|
||||||
|
signature_utils.CERT_UUID
|
||||||
|
)
|
||||||
|
|
||||||
# Required image property names
|
# Required image property names
|
||||||
# TODO(bpoulos): remove when 'sign-the-hash' approach is no longer supported
|
# TODO(bpoulos): remove when 'sign-the-hash' approach is no longer supported
|
||||||
(OLD_SIGNATURE, OLD_HASH_METHOD, OLD_KEY_TYPE, OLD_CERT_UUID) = (
|
(OLD_SIGNATURE, OLD_HASH_METHOD, OLD_KEY_TYPE, OLD_CERT_UUID) = (
|
||||||
@ -330,6 +338,130 @@ class TestSignatureUtils(test_utils.BaseTestCase):
|
|||||||
signature_utils.verify_signature,
|
signature_utils.verify_signature,
|
||||||
None, checksum_hash, image_properties)
|
None, checksum_hash, image_properties)
|
||||||
|
|
||||||
|
def test_should_create_verifier(self):
|
||||||
|
image_props = {CERT_UUID: 'CERT_UUID',
|
||||||
|
HASH_METHOD: 'HASH_METHOD',
|
||||||
|
SIGNATURE: 'SIGNATURE',
|
||||||
|
KEY_TYPE: 'SIG_KEY_TYPE'}
|
||||||
|
self.assertTrue(signature_utils.should_create_verifier(image_props))
|
||||||
|
|
||||||
|
def test_should_create_verifier_fail(self):
|
||||||
|
bad_image_properties = [{CERT_UUID: 'CERT_UUID',
|
||||||
|
HASH_METHOD: 'HASH_METHOD',
|
||||||
|
SIGNATURE: 'SIGNATURE'},
|
||||||
|
{CERT_UUID: 'CERT_UUID',
|
||||||
|
HASH_METHOD: 'HASH_METHOD',
|
||||||
|
KEY_TYPE: 'SIG_KEY_TYPE'},
|
||||||
|
{CERT_UUID: 'CERT_UUID',
|
||||||
|
SIGNATURE: 'SIGNATURE',
|
||||||
|
KEY_TYPE: 'SIG_KEY_TYPE'},
|
||||||
|
{HASH_METHOD: 'HASH_METHOD',
|
||||||
|
SIGNATURE: 'SIGNATURE',
|
||||||
|
KEY_TYPE: 'SIG_KEY_TYPE'}]
|
||||||
|
|
||||||
|
for bad_props in bad_image_properties:
|
||||||
|
result = signature_utils.should_create_verifier(bad_props)
|
||||||
|
self.assertFalse(result)
|
||||||
|
|
||||||
|
@unittest.skipIf(not default_backend().hash_supported(hashes.SHA256()),
|
||||||
|
"SHA-2 hash algorithms not supported by backend")
|
||||||
|
@mock.patch('glance.common.signature_utils.get_public_key')
|
||||||
|
def test_verify_signature_PSS(self, mock_get_pub_key):
|
||||||
|
data = b'224626ae19824466f2a7f39ab7b80f7f'
|
||||||
|
mock_get_pub_key.return_value = TEST_PRIVATE_KEY.public_key()
|
||||||
|
for hash_name, hash_alg in signature_utils.HASH_METHODS.items():
|
||||||
|
signer = TEST_PRIVATE_KEY.signer(
|
||||||
|
padding.PSS(
|
||||||
|
mgf=padding.MGF1(hash_alg),
|
||||||
|
salt_length=padding.PSS.MAX_LENGTH
|
||||||
|
),
|
||||||
|
hash_alg
|
||||||
|
)
|
||||||
|
signer.update(data)
|
||||||
|
signature = base64.b64encode(signer.finalize())
|
||||||
|
image_props = {CERT_UUID:
|
||||||
|
'fea14bc2-d75f-4ba5-bccc-b5c924ad0693',
|
||||||
|
HASH_METHOD: hash_name,
|
||||||
|
KEY_TYPE: 'RSA-PSS',
|
||||||
|
SIGNATURE: signature}
|
||||||
|
verifier = signature_utils.get_verifier(None, image_props)
|
||||||
|
verifier.update(data)
|
||||||
|
verifier.verify()
|
||||||
|
|
||||||
|
@unittest.skipIf(not default_backend().hash_supported(hashes.SHA256()),
|
||||||
|
"SHA-2 hash algorithms not supported by backend")
|
||||||
|
@mock.patch('glance.common.signature_utils.get_public_key')
|
||||||
|
def test_verify_signature_bad_signature(self, mock_get_pub_key):
|
||||||
|
data = b'224626ae19824466f2a7f39ab7b80f7f'
|
||||||
|
mock_get_pub_key.return_value = TEST_PRIVATE_KEY.public_key()
|
||||||
|
image_properties = {CERT_UUID:
|
||||||
|
'fea14bc2-d75f-4ba5-bccc-b5c924ad0693',
|
||||||
|
HASH_METHOD: 'SHA-256',
|
||||||
|
KEY_TYPE: 'RSA-PSS',
|
||||||
|
SIGNATURE: 'BLAH'}
|
||||||
|
verifier = signature_utils.get_verifier(None, image_properties)
|
||||||
|
verifier.update(data)
|
||||||
|
self.assertRaises(crypto_exception.InvalidSignature,
|
||||||
|
verifier.verify)
|
||||||
|
|
||||||
|
@mock.patch('glance.common.signature_utils.get_public_key')
|
||||||
|
def test_verify_signature_unsupported_algorithm(self,
|
||||||
|
mock_get_pub_key):
|
||||||
|
public_key = TEST_PRIVATE_KEY.public_key()
|
||||||
|
public_key.verifier = mock.MagicMock(
|
||||||
|
side_effect=crypto_exception.UnsupportedAlgorithm(
|
||||||
|
"When OpenSSL is older than 1.0.1 then only SHA1 is "
|
||||||
|
"supported with MGF1.",
|
||||||
|
crypto_exception._Reasons.UNSUPPORTED_HASH))
|
||||||
|
mock_get_pub_key.return_value = public_key
|
||||||
|
image_properties = {CERT_UUID:
|
||||||
|
'fea14bc2-d75f-4ba5-bccc-b5c924ad0693',
|
||||||
|
HASH_METHOD: 'SHA-256',
|
||||||
|
KEY_TYPE: 'RSA-PSS',
|
||||||
|
SIGNATURE: 'BLAH'}
|
||||||
|
self.assertRaisesRegexp(exception.SignatureVerificationError,
|
||||||
|
'Unable to verify signature since the '
|
||||||
|
'algorithm is unsupported on this system',
|
||||||
|
signature_utils.get_verifier,
|
||||||
|
None, image_properties)
|
||||||
|
|
||||||
|
@mock.patch('glance.common.signature_utils.should_create_verifier')
|
||||||
|
def test_verify_signature_invalid_image_props(self, mock_should):
|
||||||
|
mock_should.return_value = False
|
||||||
|
self.assertRaisesRegexp(exception.SignatureVerificationError,
|
||||||
|
'Required image properties for signature'
|
||||||
|
' verification do not exist. Cannot verify'
|
||||||
|
' signature.',
|
||||||
|
signature_utils.get_verifier,
|
||||||
|
None, None)
|
||||||
|
|
||||||
|
@mock.patch('glance.common.signature_utils.get_public_key')
|
||||||
|
def test_verify_signature_bad_sig_key_type(self, mock_get_pub_key):
|
||||||
|
mock_get_pub_key.return_value = TEST_PRIVATE_KEY.public_key()
|
||||||
|
image_properties = {CERT_UUID:
|
||||||
|
'fea14bc2-d75f-4ba5-bccc-b5c924ad0693',
|
||||||
|
HASH_METHOD: 'SHA-256',
|
||||||
|
KEY_TYPE: 'BLAH',
|
||||||
|
SIGNATURE: 'BLAH'}
|
||||||
|
self.assertRaisesRegexp(exception.SignatureVerificationError,
|
||||||
|
'Invalid signature key type: .*',
|
||||||
|
signature_utils.get_verifier,
|
||||||
|
None, image_properties)
|
||||||
|
|
||||||
|
@mock.patch('glance.common.signature_utils.get_public_key')
|
||||||
|
def test_get_verifier_none(self, mock_get_pub_key):
|
||||||
|
mock_get_pub_key.return_value = BadPublicKey()
|
||||||
|
image_properties = {CERT_UUID:
|
||||||
|
'fea14bc2-d75f-4ba5-bccc-b5c924ad0693',
|
||||||
|
HASH_METHOD: 'SHA-256',
|
||||||
|
KEY_TYPE: 'RSA-PSS',
|
||||||
|
SIGNATURE: 'BLAH'}
|
||||||
|
self.assertRaisesRegexp(exception.SignatureVerificationError,
|
||||||
|
'Error occurred while creating'
|
||||||
|
' the verifier',
|
||||||
|
signature_utils.get_verifier,
|
||||||
|
None, image_properties)
|
||||||
|
|
||||||
def test_get_signature(self):
|
def test_get_signature(self):
|
||||||
signature = b'A' * 256
|
signature = b'A' * 256
|
||||||
data = base64.b64encode(signature)
|
data = base64.b64encode(signature)
|
||||||
|
@ -15,6 +15,8 @@
|
|||||||
import glance_store
|
import glance_store
|
||||||
import mock
|
import mock
|
||||||
|
|
||||||
|
from debtcollector import removals
|
||||||
|
|
||||||
from glance.common import exception
|
from glance.common import exception
|
||||||
from glance.common import signature_utils
|
from glance.common import signature_utils
|
||||||
import glance.location
|
import glance.location
|
||||||
@ -188,7 +190,8 @@ class TestStoreImage(utils.BaseTestCase):
|
|||||||
self.store_api.get_from_backend,
|
self.store_api.get_from_backend,
|
||||||
image.locations[0]['url'], context={})
|
image.locations[0]['url'], context={})
|
||||||
|
|
||||||
def test_image_set_data_valid_signature(self):
|
@removals.remove(message="This will be removed in the N cycle.")
|
||||||
|
def test_old_image_set_data_valid_signature(self):
|
||||||
context = glance.context.RequestContext(user=USER1)
|
context = glance.context.RequestContext(user=USER1)
|
||||||
extra_properties = {
|
extra_properties = {
|
||||||
'signature_certificate_uuid': 'UUID',
|
'signature_certificate_uuid': 'UUID',
|
||||||
@ -199,7 +202,7 @@ class TestStoreImage(utils.BaseTestCase):
|
|||||||
image_stub = ImageStub(UUID2, status='queued',
|
image_stub = ImageStub(UUID2, status='queued',
|
||||||
extra_properties=extra_properties)
|
extra_properties=extra_properties)
|
||||||
self.stubs.Set(signature_utils, 'verify_signature',
|
self.stubs.Set(signature_utils, 'verify_signature',
|
||||||
unit_test_utils.fake_verify_signature)
|
unit_test_utils.fake_old_verify_signature)
|
||||||
image = glance.location.ImageProxy(image_stub, context,
|
image = glance.location.ImageProxy(image_stub, context,
|
||||||
self.store_api, self.store_utils)
|
self.store_api, self.store_utils)
|
||||||
image.set_data('YYYY', 4)
|
image.set_data('YYYY', 4)
|
||||||
@ -207,7 +210,8 @@ class TestStoreImage(utils.BaseTestCase):
|
|||||||
self.assertEqual('Z', image.checksum)
|
self.assertEqual('Z', image.checksum)
|
||||||
self.assertEqual('active', image.status)
|
self.assertEqual('active', image.status)
|
||||||
|
|
||||||
def test_image_set_data_invalid_signature(self):
|
@removals.remove(message="This will be removed in the N cycle.")
|
||||||
|
def test_old_image_set_data_invalid_signature(self):
|
||||||
context = glance.context.RequestContext(user=USER1)
|
context = glance.context.RequestContext(user=USER1)
|
||||||
extra_properties = {
|
extra_properties = {
|
||||||
'signature_certificate_uuid': 'UUID',
|
'signature_certificate_uuid': 'UUID',
|
||||||
@ -218,14 +222,15 @@ class TestStoreImage(utils.BaseTestCase):
|
|||||||
image_stub = ImageStub(UUID2, status='queued',
|
image_stub = ImageStub(UUID2, status='queued',
|
||||||
extra_properties=extra_properties)
|
extra_properties=extra_properties)
|
||||||
self.stubs.Set(signature_utils, 'verify_signature',
|
self.stubs.Set(signature_utils, 'verify_signature',
|
||||||
unit_test_utils.fake_verify_signature)
|
unit_test_utils.fake_old_verify_signature)
|
||||||
image = glance.location.ImageProxy(image_stub, context,
|
image = glance.location.ImageProxy(image_stub, context,
|
||||||
self.store_api, self.store_utils)
|
self.store_api, self.store_utils)
|
||||||
self.assertRaises(exception.SignatureVerificationError,
|
self.assertRaises(exception.SignatureVerificationError,
|
||||||
image.set_data,
|
image.set_data,
|
||||||
'YYYY', 4)
|
'YYYY', 4)
|
||||||
|
|
||||||
def test_image_set_data_invalid_signature_missing_metadata(self):
|
@removals.remove(message="This will be removed in the N cycle.")
|
||||||
|
def test_old_image_set_data_invalid_signature_missing_metadata(self):
|
||||||
context = glance.context.RequestContext(user=USER1)
|
context = glance.context.RequestContext(user=USER1)
|
||||||
extra_properties = {
|
extra_properties = {
|
||||||
'signature_hash_method': 'METHOD',
|
'signature_hash_method': 'METHOD',
|
||||||
@ -235,7 +240,65 @@ class TestStoreImage(utils.BaseTestCase):
|
|||||||
image_stub = ImageStub(UUID2, status='queued',
|
image_stub = ImageStub(UUID2, status='queued',
|
||||||
extra_properties=extra_properties)
|
extra_properties=extra_properties)
|
||||||
self.stubs.Set(signature_utils, 'verify_signature',
|
self.stubs.Set(signature_utils, 'verify_signature',
|
||||||
unit_test_utils.fake_verify_signature)
|
unit_test_utils.fake_old_verify_signature)
|
||||||
|
image = glance.location.ImageProxy(image_stub, context,
|
||||||
|
self.store_api, self.store_utils)
|
||||||
|
image.set_data('YYYY', 4)
|
||||||
|
self.assertEqual(UUID2, image.locations[0]['url'])
|
||||||
|
self.assertEqual('Z', image.checksum)
|
||||||
|
# Image is still active, since invalid signature was ignored
|
||||||
|
self.assertEqual('active', image.status)
|
||||||
|
|
||||||
|
@mock.patch('glance.location.LOG')
|
||||||
|
def test_image_set_data_valid_signature(self, mock_log):
|
||||||
|
context = glance.context.RequestContext(user=USER1)
|
||||||
|
extra_properties = {
|
||||||
|
'img_signature_certificate_uuid': 'UUID',
|
||||||
|
'img_signature_hash_method': 'METHOD',
|
||||||
|
'img_signature_key_type': 'TYPE',
|
||||||
|
'img_signature': 'VALID'
|
||||||
|
}
|
||||||
|
image_stub = ImageStub(UUID2, status='queued',
|
||||||
|
extra_properties=extra_properties)
|
||||||
|
self.stubs.Set(signature_utils, 'get_verifier',
|
||||||
|
unit_test_utils.fake_get_verifier)
|
||||||
|
image = glance.location.ImageProxy(image_stub, context,
|
||||||
|
self.store_api, self.store_utils)
|
||||||
|
image.set_data('YYYY', 4)
|
||||||
|
self.assertEqual('active', image.status)
|
||||||
|
mock_log.info.assert_called_once_with(
|
||||||
|
u'Successfully verified signature for image %s',
|
||||||
|
UUID2)
|
||||||
|
|
||||||
|
def test_image_set_data_invalid_signature(self):
|
||||||
|
context = glance.context.RequestContext(user=USER1)
|
||||||
|
extra_properties = {
|
||||||
|
'img_signature_certificate_uuid': 'UUID',
|
||||||
|
'img_signature_hash_method': 'METHOD',
|
||||||
|
'img_signature_key_type': 'TYPE',
|
||||||
|
'img_signature': 'INVALID'
|
||||||
|
}
|
||||||
|
image_stub = ImageStub(UUID2, status='queued',
|
||||||
|
extra_properties=extra_properties)
|
||||||
|
self.stubs.Set(signature_utils, 'get_verifier',
|
||||||
|
unit_test_utils.fake_get_verifier)
|
||||||
|
image = glance.location.ImageProxy(image_stub, context,
|
||||||
|
self.store_api, self.store_utils)
|
||||||
|
self.assertRaises(exception.SignatureVerificationError,
|
||||||
|
image.set_data,
|
||||||
|
'YYYY', 4)
|
||||||
|
|
||||||
|
def test_image_set_data_invalid_signature_missing_metadata(self):
|
||||||
|
context = glance.context.RequestContext(user=USER1)
|
||||||
|
extra_properties = {
|
||||||
|
'img_signature_hash_method': 'METHOD',
|
||||||
|
'img_signature_key_type': 'TYPE',
|
||||||
|
'img_signature': 'INVALID'
|
||||||
|
}
|
||||||
|
image_stub = ImageStub(UUID2, status='queued',
|
||||||
|
extra_properties=extra_properties)
|
||||||
|
self.stubs.Set(signature_utils, 'get_verifier',
|
||||||
|
unit_test_utils.fake_get_verifier)
|
||||||
image = glance.location.ImageProxy(image_stub, context,
|
image = glance.location.ImageProxy(image_stub, context,
|
||||||
self.store_api, self.store_utils)
|
self.store_api, self.store_utils)
|
||||||
image.set_data('YYYY', 4)
|
image.set_data('YYYY', 4)
|
||||||
|
@ -14,7 +14,10 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
|
||||||
|
from cryptography import exceptions as crypto_exception
|
||||||
|
from debtcollector import removals
|
||||||
import glance_store as store
|
import glance_store as store
|
||||||
|
import mock
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
from six.moves import urllib
|
from six.moves import urllib
|
||||||
@ -85,7 +88,8 @@ def fake_get_size_from_backend(uri, context=None):
|
|||||||
return 1
|
return 1
|
||||||
|
|
||||||
|
|
||||||
def fake_verify_signature(context, checksum_hash, image_properties):
|
@removals.remove(message="This will be removed in the N cycle.")
|
||||||
|
def fake_old_verify_signature(context, checksum_hash, image_properties):
|
||||||
if (image_properties is not None and 'signature' in image_properties and
|
if (image_properties is not None and 'signature' in image_properties and
|
||||||
image_properties['signature'] == 'VALID'):
|
image_properties['signature'] == 'VALID'):
|
||||||
return True
|
return True
|
||||||
@ -94,6 +98,17 @@ def fake_verify_signature(context, checksum_hash, image_properties):
|
|||||||
'Signature verification failed.')
|
'Signature verification failed.')
|
||||||
|
|
||||||
|
|
||||||
|
def fake_get_verifier(context, image_properties):
|
||||||
|
verifier = mock.Mock()
|
||||||
|
if (image_properties is not None and 'img_signature' in image_properties
|
||||||
|
and image_properties['img_signature'] == 'VALID'):
|
||||||
|
verifier.verify.return_value = None
|
||||||
|
else:
|
||||||
|
ex = crypto_exception.InvalidSignature()
|
||||||
|
verifier.verify.side_effect = ex
|
||||||
|
return verifier
|
||||||
|
|
||||||
|
|
||||||
class FakeDB(object):
|
class FakeDB(object):
|
||||||
|
|
||||||
def __init__(self, initialize=True):
|
def __init__(self, initialize=True):
|
||||||
@ -197,7 +212,7 @@ class FakeStoreAPI(object):
|
|||||||
return self.get_from_backend(location, context=context)[1]
|
return self.get_from_backend(location, context=context)[1]
|
||||||
|
|
||||||
def add_to_backend(self, conf, image_id, data, size,
|
def add_to_backend(self, conf, image_id, data, size,
|
||||||
scheme=None, context=None):
|
scheme=None, context=None, verifier=None):
|
||||||
store_max_size = 7
|
store_max_size = 7
|
||||||
current_store_size = 2
|
current_store_size = 2
|
||||||
for location in self.data.keys():
|
for location in self.data.keys():
|
||||||
|
Loading…
Reference in New Issue
Block a user