From a0da527edce0dc05813db75008ac763bd4d3ac54 Mon Sep 17 00:00:00 2001 From: Stephen Finucane Date: Fri, 5 Sep 2025 12:15:59 +0100 Subject: [PATCH] identity: Add tokens This is mostly handled by keystoneauth, but there are a few non-auth APIs that we can currently only handle with keystoneclient. Close that gap. Change-Id: Iff8bbcf982b817098b42e69201f125202b41fb04 Signed-off-by: Stephen Finucane --- doc/source/user/proxies/identity_v3.rst | 7 + .../user/resources/identity/v3/token.rst | 12 ++ openstack/identity/v3/_proxy.py | 39 ++++ openstack/identity/v3/token.py | 115 ++++++++++ .../tests/unit/identity/v3/test_proxy.py | 32 +++ .../tests/unit/identity/v3/test_token.py | 198 ++++++++++++++++++ 6 files changed, 403 insertions(+) create mode 100644 doc/source/user/resources/identity/v3/token.rst create mode 100644 openstack/identity/v3/token.py create mode 100644 openstack/tests/unit/identity/v3/test_token.py diff --git a/doc/source/user/proxies/identity_v3.rst b/doc/source/user/proxies/identity_v3.rst index fe02e2fbc..e51f3c94e 100644 --- a/doc/source/user/proxies/identity_v3.rst +++ b/doc/source/user/proxies/identity_v3.rst @@ -86,6 +86,13 @@ User Operations :members: create_user, update_user, delete_user, get_user, find_user, users, user_groups +Token Operations +^^^^^^^^^^^^^^^^ + +.. autoclass:: openstack.identity.v3._proxy.Proxy + :noindex: + :members: validate_token, check_token, revoke_token + Trust Operations ^^^^^^^^^^^^^^^^ diff --git a/doc/source/user/resources/identity/v3/token.rst b/doc/source/user/resources/identity/v3/token.rst new file mode 100644 index 000000000..106390c57 --- /dev/null +++ b/doc/source/user/resources/identity/v3/token.rst @@ -0,0 +1,12 @@ +openstack.identity.v3.token +=========================== + +.. automodule:: openstack.identity.v3.token + +The Token Class +--------------- + +The ``Token`` class inherits from :class:`~openstack.resource.Resource`. + +.. autoclass:: openstack.identity.v3.token.Token + :members: diff --git a/openstack/identity/v3/_proxy.py b/openstack/identity/v3/_proxy.py index b0e78818b..15dbc84ef 100644 --- a/openstack/identity/v3/_proxy.py +++ b/openstack/identity/v3/_proxy.py @@ -53,6 +53,7 @@ from openstack.identity.v3 import ( from openstack.identity.v3 import service as _service from openstack.identity.v3 import service_provider as _service_provider from openstack.identity.v3 import system as _system +from openstack.identity.v3 import token as _token from openstack.identity.v3 import trust as _trust from openstack.identity.v3 import user as _user from openstack import proxy @@ -87,6 +88,7 @@ class Proxy(proxy.Proxy): "service": _service.Service, "system": _system.System, "trust": _trust.Trust, + "token": _token.Token, "user": _user.User, } @@ -976,6 +978,43 @@ class Proxy(proxy.Proxy): """ return self._update(_user.User, user, **attrs) + # ========== Tokens ========== + + def validate_token( + self, token: str, nocatalog: bool = False, allow_expired: bool = False + ) -> _token.Token: + """Validate a token + + :param token: The token to validate. + :param nocatalog: Whether the returned token should not include a + catalog. + :param allow_expired: Whether to allow expired tokens. + + :returns: A :class:`~openstack.identity.v3.token.Token`. + """ + return _token.Token.validate( + self, token, nocatalog=nocatalog, allow_expired=allow_expired + ) + + def check_token(self, token: str, allow_expired: bool = False) -> bool: + """Check if a token is valid. + + :param token: The token to check. + :param allow_expired: Whether to allow expired tokens. + + :returns: True if valid, else False. + """ + return _token.Token.check(self, token, allow_expired=allow_expired) + + def revoke_token(self, token: str) -> None: + """Revoke a token. + + :param token: The token to revoke. + + :returns: None + """ + _token.Token.revoke(self, token) + # ========== Trusts ========== def create_trust(self, **attrs): diff --git a/openstack/identity/v3/token.py b/openstack/identity/v3/token.py new file mode 100644 index 000000000..54518858e --- /dev/null +++ b/openstack/identity/v3/token.py @@ -0,0 +1,115 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from keystoneauth1 import adapter + +from openstack import exceptions +from openstack import resource + + +class Token(resource.Resource): + resource_key = 'token' + base_path = '/auth/tokens' + + # capabilities + allow_fetch = False + allow_delete = False + allow_list = False + allow_head = False + + # Properties + #: An authentication token. This is used rather than X-Auth-Token to allow + #: users check or revoke a token other than their own. + subject_token = resource.Header('x-subject-token') + + #: A list of one or two audit IDs. An audit ID is a unique, randomly + #: generated, URL-safe string that you can use to track a token. The first + #: audit ID is the current audit ID for the token. The second audit ID is + #: present for only re-scoped tokens and is the audit ID from the token + #: before it was re-scoped. A re- scoped token is one that was exchanged + #: for another token of the same or different scope. You can use these + #: audit IDs to track the use of a token or chain of tokens across multiple + #: requests and endpoints without exposing the token ID to non-privileged + #: users. + audit_ids = resource.Body('audit_ids', type=list) + #: The service catalog. + catalog = resource.Body('catalog', type=list, list_type=dict) + #: The date and time when the token expires. + expires_at = resource.Body('expires_at') + #: The date and time when the token was issued. + issued_at = resource.Body('issued_at') + #: The authentication method. + methods = resource.Body('methods', type=list) + #: The user that owns the token. + user = resource.Body('user', type=dict) + #: The project that the token is scoped to, if any. + project = resource.Body('project', type=dict) + #: The domain that the token is scoped to, if any. + domain = resource.Body('domain', type=dict) + #: Whether the project, if set, is acting as a domain. + is_domain = resource.Body('is_domain', type=bool) + #: The parts of the system the token is scoped to, if system-scoped. + system = resource.Body('system', type=dict) + #: The roles associated with the user. + roles = resource.Body('roles', type=list, list_type=dict) + + @classmethod + def validate( + cls, + session: adapter.Adapter, + token: str, + *, + nocatalog: bool = False, + allow_expired: bool = False, + ) -> 'Token': + path = cls.base_path + + params: dict[str, bool] = {} + if nocatalog: + params['nocatalog'] = nocatalog + if allow_expired: + params['allow_expired'] = allow_expired + + response = session.get( + path, headers={'x-subject-token': token}, params=params + ) + exceptions.raise_from_response(response) + + ret = cls() + ret._translate_response( + response, resource_response_key=cls.resource_key + ) + return ret + + @classmethod + def check( + cls, + session: adapter.Adapter, + token: str, + *, + allow_expired: bool = False, + ) -> bool: + params: dict[str, bool] = {} + if allow_expired: + params['allow_expired'] = allow_expired + + response = session.head( + cls.base_path, headers={'x-subject-token': token}, params=params + ) + return response.status_code == 200 + + @classmethod + def revoke(cls, session: adapter.Adapter, token: str) -> None: + response = session.delete( + cls.base_path, headers={'x-subject-token': token} + ) + exceptions.raise_from_response(response) diff --git a/openstack/tests/unit/identity/v3/test_proxy.py b/openstack/tests/unit/identity/v3/test_proxy.py index 13b930b77..fd153d978 100644 --- a/openstack/tests/unit/identity/v3/test_proxy.py +++ b/openstack/tests/unit/identity/v3/test_proxy.py @@ -394,6 +394,38 @@ class TestIdentityProxyUser(TestIdentityProxyBase): ) +class TestIdentityProxyToken(TestIdentityProxyBase): + def test_token_validate(self): + self._verify( + "openstack.identity.v3.token.Token.validate", + self.proxy.validate_token, + method_args=['token'], + method_kwargs={'nocatalog': False, 'allow_expired': False}, + expected_args=[self.proxy, 'token'], + expected_kwargs={'nocatalog': False, 'allow_expired': False}, + ) + + def test_token_check(self): + self._verify( + "openstack.identity.v3.token.Token.check", + self.proxy.check_token, + method_args=['token'], + method_kwargs={'allow_expired': False}, + expected_args=[self.proxy, 'token'], + expected_kwargs={'allow_expired': False}, + ) + + def test_token_revoke(self): + self._verify( + "openstack.identity.v3.token.Token.revoke", + self.proxy.revoke_token, + method_args=['token'], + method_kwargs={}, + expected_args=[self.proxy, 'token'], + expected_kwargs={}, + ) + + class TestIdentityProxyTrust(TestIdentityProxyBase): def test_trust_create_attrs(self): self.verify_create(self.proxy.create_trust, trust.Trust) diff --git a/openstack/tests/unit/identity/v3/test_token.py b/openstack/tests/unit/identity/v3/test_token.py new file mode 100644 index 000000000..bc78f3c43 --- /dev/null +++ b/openstack/tests/unit/identity/v3/test_token.py @@ -0,0 +1,198 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from unittest import mock + +from keystoneauth1 import adapter + +from openstack import exceptions +from openstack.identity.v3 import token +from openstack.tests.unit import base + +IDENTIFIER = 'IDENTIFIER' +TOKEN_DATA = { + 'audit_ids': ['VcxU2JEMTjufVx7sVk7bPw'], + 'catalog': [ + { + 'endpoints': [ + { + 'id': '068d1b359ee84b438266cb736d81de97', + 'interface': 'public', + 'region': 'RegionOne', + 'region_id': 'RegionOne', + 'url': 'http://example.com/v2.1', + } + ], + 'id': '050726f278654128aba89757ae25950c', + 'name': 'nova', + 'type': 'compute', + } + ], + 'domain': {'id': 'default', 'name': 'Default'}, + 'expires_at': '2013-02-27T18:30:59.999999Z', + 'issued_at': '2013-02-27T16:30:59.999999Z', + 'methods': ['password'], + 'project': { + 'domain': {'id': 'default', 'name': 'Default'}, + 'id': '8538a3f13f9541b28c2620eb19065e45', + 'name': 'admin', + }, + 'roles': [{'id': 'c703057be878458588961ce9a0ce686b', 'name': 'admin'}], + 'system': {'all': True}, + 'user': { + 'domain': {'id': 'default', 'name': 'Default'}, + 'id': '10a2e6e717a245d9acad3e5f97aeca3d', + 'name': 'admin', + 'password_expires_at': None, + }, + 'is_domain': False, +} + +EXAMPLE = {'token': TOKEN_DATA} + + +class TestToken(base.TestCase): + def setUp(self): + super().setUp() + self.session = mock.Mock(spec=adapter.Adapter) + + def test_basic(self): + sot = token.Token() + self.assertEqual('token', sot.resource_key) + self.assertEqual('/auth/tokens', sot.base_path) + self.assertFalse(sot.allow_fetch) + self.assertFalse(sot.allow_delete) + self.assertFalse(sot.allow_list) + self.assertFalse(sot.allow_head) + + def test_make_it(self): + sot = token.Token(**TOKEN_DATA) + self.assertEqual(TOKEN_DATA['audit_ids'], sot.audit_ids) + self.assertEqual(TOKEN_DATA['catalog'], sot.catalog) + self.assertEqual(TOKEN_DATA['expires_at'], sot.expires_at) + self.assertEqual(TOKEN_DATA['issued_at'], sot.issued_at) + self.assertEqual(TOKEN_DATA['methods'], sot.methods) + self.assertEqual(TOKEN_DATA['user'], sot.user) + self.assertEqual(TOKEN_DATA['project'], sot.project) + self.assertEqual(TOKEN_DATA['domain'], sot.domain) + self.assertEqual(TOKEN_DATA['is_domain'], sot.is_domain) + self.assertEqual(TOKEN_DATA['system'], sot.system) + self.assertEqual(TOKEN_DATA['roles'], sot.roles) + + def test_validate(self): + response = mock.Mock() + response.status_code = 200 + response.json.return_value = EXAMPLE + response.headers = {'content-type': 'application/json'} + self.session.get.return_value = response + + result = token.Token.validate(self.session, 'token') + + self.session.get.assert_called_once_with( + '/auth/tokens', headers={'x-subject-token': 'token'}, params={} + ) + self.assertIsInstance(result, token.Token) + + def test_validate_with_params(self): + response = mock.Mock() + response.status_code = 200 + response.json.return_value = EXAMPLE + response.headers = {'content-type': 'application/json'} + self.session.get.return_value = response + + result = token.Token.validate( + self.session, 'token', nocatalog=True, allow_expired=True + ) + + self.session.get.assert_called_once_with( + '/auth/tokens', + headers={'x-subject-token': 'token'}, + params={'nocatalog': True, 'allow_expired': True}, + ) + self.assertIsInstance(result, token.Token) + + def test_validate_error(self): + response = mock.Mock() + response.status_code = 404 + response.json.return_value = {} + response.headers = {'content-type': 'application/json'} + self.session.get.return_value = response + + self.assertRaises( + exceptions.NotFoundException, + token.Token.validate, + self.session, + 'token', + ) + + def test_check(self): + response = mock.Mock() + response.status_code = 200 + self.session.head.return_value = response + + result = token.Token.check(self.session, 'token') + + self.session.head.assert_called_once_with( + '/auth/tokens', headers={'x-subject-token': 'token'}, params={} + ) + self.assertTrue(result) + + def test_check_with_param(self): + response = mock.Mock() + response.status_code = 200 + self.session.head.return_value = response + + result = token.Token.check(self.session, 'token', allow_expired=True) + + self.session.head.assert_called_once_with( + '/auth/tokens', + headers={'x-subject-token': 'token'}, + params={'allow_expired': True}, + ) + self.assertTrue(result) + + def test_check_invalid_token(self): + response = mock.Mock() + response.status_code = 404 + self.session.head.return_value = response + + result = token.Token.check(self.session, 'token') + + self.session.head.assert_called_once_with( + '/auth/tokens', headers={'x-subject-token': 'token'}, params={} + ) + self.assertFalse(result) + + def test_revoke(self): + response = mock.Mock() + response.status_code = 204 + self.session.delete.return_value = response + + token.Token.revoke(self.session, 'token') + + self.session.delete.assert_called_once_with( + '/auth/tokens', headers={'x-subject-token': 'token'} + ) + + def test_revoke_error(self): + response = mock.Mock() + response.status_code = 404 + response.json.return_value = {} + response.headers = {'content-type': 'application/json'} + self.session.delete.return_value = response + + self.assertRaises( + exceptions.NotFoundException, + token.Token.revoke, + self.session, + 'token', + )