Add new get location api

This change adds a new location api GET
/v2/images/{image_id}/locations to get the locations associated
to the image.
This operation will be allowed to service user only,
and validated by the new policy rule `fetch_image_location`.

Implements: blueprint new-location-apis
Change-Id: I9d14465a83e76c73e12cec3b96d42e568ab97072
This commit is contained in:
Pranali Deore 2023-05-07 17:34:10 +00:00
parent b83f38cf25
commit 5369a825ed
9 changed files with 291 additions and 0 deletions

View File

@ -1217,6 +1217,29 @@ class ImagesController(object):
return image_id
def get_locations(self, req, image_id):
image_repo = self.gateway.get_repo(req.context)
try:
image = image_repo.get(image_id)
# NOTE(pdeore): This is the right place to check whether user
# have permission to get the image locations
api_pol = api_policy.ImageAPIPolicy(req.context, image,
self.policy)
api_pol.get_locations()
locations = list(image.locations)
for loc in locations:
loc.pop('id', None)
loc.pop('status', None)
except exception.NotFound as e:
raise webob.exc.HTTPNotFound(explanation=e.msg)
except exception.Forbidden as e:
LOG.debug("User not permitted to get the image locations.")
raise webob.exc.HTTPForbidden(explanation=e.msg)
return locations
class RequestDeserializer(wsgi.JSONRequestDeserializer):

View File

@ -235,6 +235,9 @@ class ImageAPIPolicy(APIPolicyBase):
def add_location(self):
self._enforce('add_image_location')
def get_locations(self):
self._enforce('fetch_image_location')
def add_image(self):
try:
self._enforce('add_image')

View File

@ -498,6 +498,10 @@ class API(wsgi.Router):
controller=images_resource,
action='add_location',
conditions={'method': ['POST']})
mapper.connect('/images/{image_id}/locations',
controller=images_resource,
action='get_locations',
conditions={'method': ['GET']})
mapper.connect('/images/{image_id}/locations',
controller=reject_method_resource,
action='reject',

View File

@ -88,6 +88,8 @@ SERVICE_OR_PROJECT_MEMBER = (
f'rule:service_api or ({PROJECT_MEMBER} and project_id:%(owner)s)'
)
SERVICE = 'rule:service_api'
rules = [
policy.RuleDefault(name='default', check_str='',
description='Defines the default rule used for '

View File

@ -198,6 +198,16 @@ image_policies = [
'method': 'POST'}
],
),
policy.DocumentedRuleDefault(
name="fetch_image_location",
check_str=base.SERVICE,
scope_types=['project'],
description='Show all locations associated to given image',
operations=[
{'path': '/v2/images/{image_id}/locations',
'method': 'GET'}
],
),
policy.DocumentedRuleDefault(
name="add_member",

View File

@ -3978,6 +3978,109 @@ class TestImages(functional.FunctionalTest):
self.stop_servers()
def test_get_location(self):
self.start_servers(**self.__dict__.copy())
# Create an image
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json'})
data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki',
'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(http.CREATED, response.status_code)
# Returned image entity should have a generated id and status
image = jsonutils.loads(response.text)
image_id = image['id']
self.assertEqual('queued', image['status'])
# Get locations of `queued` image
headers = self._headers({'X-Roles': 'service'})
path = self._url('/v2/images/%s/locations' % image_id)
response = requests.get(path, headers=headers)
self.assertEqual(200, response.status_code, response.text)
self.assertEqual(0, len(jsonutils.loads(response.text)))
self.assertEqual('queued', image['status'])
# Get location of invalid image
image_id = str(uuid.uuid4())
path = self._url('/v2/images/%s/locations' % image_id)
response = requests.get(path, headers=headers)
self.assertEqual(http.NOT_FOUND, response.status_code, response.text)
# Add Location with valid URL and image owner
image_id = image['id']
path = self._url('/v2/images/%s/locations' % image_id)
url = 'http://127.0.0.1:%s/foo_image' % self.http_port1
data = jsonutils.dumps({'url': url})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(202, response.status_code, response.text)
path = self._url('/v2/images/%s' % image_id)
headers = self._headers({'content-type': 'application/json'})
func_utils.wait_for_status(self, request_path=path,
request_headers=headers,
status='active',
max_sec=10,
delay_sec=0.2,
start_delay_sec=1)
# Get Locations not allowed for any other user
headers = self._headers({'X-Roles': 'admin,member'})
path = self._url('/v2/images/%s/locations' % image_id)
response = requests.get(path, headers=headers)
self.assertEqual(http.FORBIDDEN, response.status_code, response.text)
# Get Locations allowed only for service user
headers = self._headers({'X-Roles': 'service'})
path = self._url('/v2/images/%s/locations' % image_id)
response = requests.get(path, headers=headers)
self.assertEqual(200, response.status_code, response.text)
self.stop_servers()
def test_get_location_with_data_upload(self):
self.start_servers(**self.__dict__.copy())
# Create an image
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json'})
data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki',
'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(http.CREATED, response.status_code)
# Returned image entity should have a generated id and status
image = jsonutils.loads(response.text)
image_id = image['id']
self.assertEqual('queued', image['status'])
# Upload some image data
path = self._url('/v2/images/%s/file' % image_id)
headers = self._headers({'Content-Type': 'application/octet-stream'})
image_data = b'ZZZZZ'
response = requests.put(path, headers=headers, data=image_data)
self.assertEqual(http.NO_CONTENT, response.status_code)
expect_c = str(md5(image_data, usedforsecurity=False).hexdigest())
expect_h = str(hashlib.sha512(image_data).hexdigest())
func_utils.verify_image_hashes_and_status(self, image_id, expect_c,
expect_h, 'active',
size=len(image_data))
# Get Locations not allowed for any other user
headers = self._headers({'X-Roles': 'admin,member'})
path = self._url('/v2/images/%s/locations' % image_id)
response = requests.get(path, headers=headers)
self.assertEqual(http.FORBIDDEN, response.status_code, response.text)
# Get Locations allowed only for service user
headers = self._headers({'X-Roles': 'service'})
path = self._url('/v2/images/%s/locations' % image_id)
response = requests.get(path, headers=headers)
self.assertEqual(200, response.status_code, response.text)
output = jsonutils.loads(response.text)
self.assertTrue(output[0]['url'])
self.stop_servers()
class TestImagesIPv6(functional.FunctionalTest):
"""Verify that API and REG servers running IPv6 can communicate"""
@ -7952,3 +8055,62 @@ class TestMultipleBackendsLocationApi(functional.SynchronousAPIBase):
image = jsonutils.loads(resp.text)
self.assertEqual(expect_c, image['checksum'])
self.assertEqual(expect_h, image['os_hash_value'])
def test_get_location(self):
self._setup_multiple_stores()
# Create an image
path = '/v2/images'
headers = self._headers({'content-type': 'application/json'})
data = {'name': 'image-1', 'disk_format': 'aki',
'container_format': 'aki'}
response = self.api_post(path, headers=headers, json=data)
self.assertEqual(http.CREATED, response.status_code)
# Returned image entity should have a generated id and status
image = jsonutils.loads(response.text)
image_id = image['id']
self.assertEqual('queued', image['status'])
# Get location of `queued` image
headers = self._headers({'X-Roles': 'service'})
path = '/v2/images/%s/locations' % image_id
response = self.api_get(path, headers=headers)
self.assertEqual(200, response.status_code, response.text)
self.assertEqual(0, len(jsonutils.loads(response.text)))
# Get location of invalid image
image_id = str(uuid.uuid4())
path = '/v2/images/%s/locations' % image_id
response = self.api_get(path, headers=headers)
self.assertEqual(http.NOT_FOUND, response.status_code, response.text)
# Add Location with valid URL and image owner
image_id = image['id']
path = '/v2/images/%s/locations' % image_id
url = 'http://127.0.0.1:%s/store1/foo_image' % self.http_port0
data = {'url': url}
response = self.api_post(path, headers=headers, json=data)
self.assertEqual(202, response.status_code, response.text)
path = '/v2/images/%s' % image_id
headers = self._headers({'content-type': 'application/json'})
func_utils.wait_for_status(self, request_path=path,
request_headers=headers,
status='active',
max_sec=10,
delay_sec=0.2,
start_delay_sec=1, multistore=True)
# Get Locations not allowed for any other user
headers = self._headers({'X-Roles': 'admin,member'})
path = '/v2/images/%s/locations' % image_id
response = self.api_get(path, headers=headers)
self.assertEqual(http.FORBIDDEN, response.status_code, response.text)
# Get Locations allowed only for service user
headers = self._headers({'X-Roles': 'service'})
path = '/v2/images/%s/locations' % image_id
response = self.api_get(path, headers=headers)
self.assertEqual(200, response.status_code, response.text)
output = jsonutils.loads(response.text)
self.assertEqual(url, output[0]['url'])

View File

@ -530,6 +530,12 @@ class TestDefaultPolicyCheckStrings(base.IsolatedUnitTest):
)
self.assertEqual(expected, base_policy.SERVICE_OR_PROJECT_MEMBER)
def test_service_check_string(self):
expected = (
'rule:service_api'
)
self.assertEqual(expected, base_policy.SERVICE)
class TestImageTarget(base.IsolatedUnitTest):
def test_image_target_ignores_locations(self):

View File

@ -4246,6 +4246,73 @@ class TestImagesController(base.IsolatedUnitTest):
self.controller.add_location,
request, image_id, req_body)
def test_get_locations_by_owner_or_admin(self):
url = '%s/fake_location_1' % BASE_URI
image_id = str(uuid.uuid4())
self.images = [
_db_fixture(image_id, owner=TENANT1, checksum=CHKSUM,
name='1',
disk_format='raw',
container_format='bare',
status='active',
locations=[{'url': url,
'metadata': {}, 'status': 'active'}]),
]
self.db.image_create(None, self.images[0])
enforcer = unit_test_utils.enforcer_from_rules({
"get_image": "",
"fetch_image_location": "role:service"
})
self.controller.policy = enforcer
request = unit_test_utils.get_fake_request(roles=['admin', 'member'])
self.assertRaises(
webob.exc.HTTPForbidden,
self.controller.get_locations, request, image_id)
def test_get_locations(self):
image_id = str(uuid.uuid4())
url = '%s/fake_location_1' % BASE_URI
self.images = [
_db_fixture(image_id, owner=TENANT1, checksum=CHKSUM,
name='1',
disk_format='raw',
container_format='bare',
status='active',
locations=[{'url': url,
'metadata': {}, 'status': 'active'}]),
]
self.db.image_create(None, self.images[0])
enforcer = unit_test_utils.enforcer_from_rules({
"get_image": "",
"fetch_image_location": "role:service"
})
self.controller.policy = enforcer
request = unit_test_utils.get_fake_request(roles=['service'])
output = self.controller.get_locations(request, image_id)
self.assertEqual(1, len(output))
self.assertEqual(url, output[0]['url'])
def test_get_locations_of_non_existing_image(self):
url = '%s/fake_location_1' % BASE_URI
image_id = str(uuid.uuid4())
self.images = [
_db_fixture(image_id, owner=TENANT1, checksum=CHKSUM,
name='1',
disk_format='raw',
container_format='bare',
status='active',
locations=[{'url': url,
'metadata': {}, 'status': 'active'}]),
]
self.db.image_create(None, self.images[0])
request = unit_test_utils.get_fake_request(roles=['member'])
self.assertRaisesRegex(
webob.exc.HTTPNotFound,
'No image found with ID .*',
self.controller.get_locations,
request, str(uuid.uuid4()))
class TestImagesControllerPolicies(base.IsolatedUnitTest):
@ -4392,6 +4459,13 @@ class TestImagesControllerPolicies(base.IsolatedUnitTest):
self.assertRaises(webob.exc.HTTPForbidden,
self.controller.add_location, request, UUID1, body)
def test_get_locations_unauthorized(self):
rules = {"fetch_image_location": False}
self.policy.set_rules(rules)
request = unit_test_utils.get_fake_request()
self.assertRaises(webob.exc.HTTPForbidden,
self.controller.get_locations, request, UUID1)
class TestImagesDeserializer(test_utils.BaseTestCase):

View File

@ -0,0 +1,7 @@
---
features:
- |
This release brings the additional functionality of get locations
associated to an image accessible to only service users
i.e., consumers like cinder and nova for OSSN-0090 and OSSN-0065.