Clean up keystone authentication

Organize keystone authentication code for testability and
better mainatainability.

Change-Id: Iab065a2047f3ef1ff2282223f4f06b1e83f07657
This commit is contained in:
Andras Kovi 2017-10-14 16:25:50 +02:00 committed by Adriano Petrich
parent 4d8d9e98be
commit 69e8925b20
4 changed files with 264 additions and 99 deletions

View File

@ -53,7 +53,7 @@ class Client(object):
# If auth url was provided then we perform an authentication, otherwise # If auth url was provided then we perform an authentication, otherwise
# just ignore this step # just ignore this step
if req.get('auth_url'): if req.get('auth_url') or req.get('target_auth_url'):
auth_handler = auth.get_auth_handler(auth_type) auth_handler = auth.get_auth_handler(auth_type)
auth_response = auth_handler.authenticate(req, session=session) auth_response = auth_handler.authenticate(req, session=session)
else: else:

View File

@ -12,6 +12,9 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging
import keystoneauth1.access.service_catalog as sc
import keystoneauth1.identity.generic as auth_plugin import keystoneauth1.identity.generic as auth_plugin
from keystoneauth1 import session as ks_session from keystoneauth1 import session as ks_session
import mistralclient.api.httpclient as api import mistralclient.api.httpclient as api
@ -19,87 +22,145 @@ from mistralclient import auth as mistral_auth
from oslo_serialization import jsonutils from oslo_serialization import jsonutils
LOG = logging.getLogger(__name__)
class KeystoneAuthHandler(mistral_auth.AuthHandler): class KeystoneAuthHandler(mistral_auth.AuthHandler):
def authenticate(self, req, session=None): def authenticate(self, req, session=None):
"""Perform authentication via Keystone. """Perform authentication via Keystone.
:param req: Request dict containing list of parameters required :param req: Request dict containing the parameters required
for Keystone authentication. for Keystone authentication.
""" """
if not isinstance(req, dict): reqs = self._separate_target_reqs(req)
raise TypeError('The input "req" is not typeof dict.') try:
result = self._authenticate(reqs, session)
except Exception as e:
if "Cannot use v2 authentication with domain scope" in str(e):
LOG.warning("Client tried to use v2 authentication with "
"domain scope. Domain parameters are assumed "
"to be erroneously set. Retrying "
"authentication without them. "
"Request parameters: %s" % str(reqs))
domainless_reqs = [reqs[0],
self._remove_domain(reqs[1])]
result = self._authenticate(domainless_reqs, session)
else:
raise
return result
session = session @staticmethod
mistral_url = req.get('mistral_url') def _separate_target_reqs(req):
endpoint_type = req.get('endpoint_type', 'publicURL') """Separates parameters into target and non-target ones.
service_type = req.get('service_type', 'workflowv2')
auth_url = req.get('auth_url') target_* parameters are rekeyed by removing the prefix.
username = req.get('username')
user_id = req.get('user_id') :param req: Request dict containing the parameters for Keystone
api_key = req.get('api_key') authentication.
auth_token = req.get('auth_token') :return: list of [non-target, target] request parameters
project_name = req.get('project_name') """
project_id = req.get('project_id') r = {}
region_name = req.get('region_name') target_r = {}
user_domain_name = req.get('user_domain_name') target_prefix = "target_"
user_domain_id = req.get('user_domain_id') for key in req:
project_domain_name = req.get('project_domain_name') if key.startswith(target_prefix):
project_domain_id = req.get('project_domain_id') target_r[key[len(target_prefix):]] = req[key]
target_auth_url = req.get('target_auth_url') else:
target_username = req.get('target_username') r[key] = req[key]
target_user_id = req.get('target_user_id')
target_api_key = req.get('target_api_key') return [r, target_r]
target_auth_token = req.get('target_auth_token')
target_project_name = req.get('target_project_name') @staticmethod
target_project_id = req.get('target_project_id') def _remove_domain(req):
target_user_domain_name = req.get('target_user_domain_name') """Remove all domain parameters from req.
target_user_domain_id = req.get('target_user_domain_id')
target_project_domain_name = req.get('target_project_domain_name') Keystoneauth with V2 does not accept domain parameters. This
target_project_domain_id = req.get('target_project_domain_id') is an incompatible change from Keystoneclient but it would
unnecessarily break clients of Mistral. It is safe to remove
domain parameters if V2 auth is targeted.
:param req: Request dict containing list of parameters required
:return: Request dict without domains
"""
r = {}
for key in req:
if "domain" not in key:
r[key] = req[key]
return r
@staticmethod
def _get_auth(api_key=None, auth_token=None, auth_url=None,
project_domain_id=None, project_domain_name=None,
project_id=None, project_name=None, user_domain_id=None,
user_domain_name=None, user_id=None, username=None,
**kwargs):
if project_name and project_id: if project_name and project_id:
raise RuntimeError( raise RuntimeError(
'Only project name or project id should be set' 'Only one of project_name or project_id should be set'
) )
if username and user_id: if username and user_id:
raise RuntimeError( raise RuntimeError(
'Only user name or user id should be set' 'Only one of username or user_id should be set'
) )
auth = {}
if auth_token:
auth = auth_plugin.Token(
auth_url=auth_url,
project_domain_id=project_domain_id,
project_domain_name=project_domain_name,
project_id=project_id,
project_name=project_name,
token=auth_token
)
elif api_key and (username or user_id):
auth = auth_plugin.Password(
auth_url=auth_url,
password=api_key,
project_domain_id=project_domain_id,
project_domain_name=project_domain_name,
project_id=project_id,
project_name=project_name,
user_domain_id=user_domain_id,
user_domain_name=user_domain_name,
user_id=user_id,
username=username
)
return auth
def _authenticate(self, reqs, session=None):
"""Performs authentication via Keystone.
:param reqs: Request dict containing list of parameters required
for Keystone authentication.
:return: Auth response dict
"""
if not isinstance(reqs[0], dict):
raise TypeError('The input "req" is not typeof dict.')
if not isinstance(reqs[1], dict):
raise TypeError('The input "req" is not typeof dict.')
auth_response = {} auth_response = {}
req = reqs[0]
cacert = req.get('cacert')
endpoint_type = req.get('endpoint_type', 'publicURL')
insecure = req.get('insecure')
mistral_url = req.get('mistral_url')
region_name = req.get('region_name')
service_type = req.get('service_type', 'workflowv2')
verify = self._verification_needed(cacert, insecure)
if not session: if not session:
if auth_token: auth = self._get_auth(**req)
auth = auth_plugin.Token(
auth_url=auth_url,
token=auth_token,
project_id=project_id,
project_name=project_name,
project_domain_name=project_domain_name,
project_domain_id=project_domain_id)
elif api_key and (username or user_id):
auth = auth_plugin.Password(
auth_url=auth_url,
username=username,
user_id=user_id,
password=api_key,
user_domain_name=user_domain_name,
user_domain_id=user_domain_id,
project_id=project_id,
project_name=project_name,
project_domain_name=project_domain_name,
project_domain_id=project_domain_id)
else:
raise RuntimeError("You must either provide a valid token or "
"a password (api_key) and a user.")
if auth: if auth:
session = ks_session.Session(auth=auth) session = ks_session.Session(auth=auth, verify=verify)
if session: if session:
if not mistral_url: if not mistral_url:
@ -115,47 +176,75 @@ class KeystoneAuthHandler(mistral_auth.AuthHandler):
auth_response['mistral_url'] = mistral_url auth_response['mistral_url'] = mistral_url
auth_response['session'] = session auth_response['session'] = session
if target_auth_url: target_req = reqs[1]
if target_auth_token:
target_auth = auth_plugin.Token(
auth_url=target_auth_url,
token=target_auth_token,
project_id=target_project_id,
project_name=target_project_name,
project_domain_name=target_project_domain_name,
project_domain_id=target_project_domain_id)
elif target_api_key and (target_username or target_user_id): if "auth_url" in target_req:
target_auth = auth_plugin.Password( target_auth = self._get_auth(**target_req)
auth_url=target_auth_url,
username=target_username,
user_id=target_user_id,
password=target_api_key,
user_domain_name=target_user_domain_name,
user_domain_id=target_user_domain_id,
project_id=target_project_id,
project_name=target_project_name,
project_domain_name=target_project_domain_name,
project_domain_id=target_project_domain_id)
else:
raise RuntimeError("You must either provide a valid token or "
"a password (target_api_key) and a user.")
target_session = ks_session.Session(auth=target_auth) if target_auth:
target_auth_headers = target_session.get_auth_headers() or {}
# NOTE: (sharatss) The target_auth_token is required here so that # target cacert and insecure
# it can be passed as a separate header later. cacert = target_req.get('cacert')
target_auth_token = target_auth_headers.get('X-Auth-Token') insecure = target_req.get('insecure')
auth_response.update({ verify = self._verification_needed(cacert, insecure)
api.TARGET_AUTH_TOKEN: target_auth_token,
api.TARGET_PROJECT_ID: target_session.get_project_id(), target_session = ks_session.Session(
api.TARGET_USER_ID: target_session.get_user_id(), auth=target_auth,
api.TARGET_AUTH_URI: target_auth_url, verify=verify
api.TARGET_SERVICE_CATALOG: jsonutils.dumps( )
target_auth.get_access(
target_session).service_catalog.catalog) target_auth_headers = target_session.get_auth_headers() or {}
})
target_auth_token = target_auth_headers.get('X-Auth-Token')
auth_response.update({
api.TARGET_AUTH_TOKEN: target_auth_token,
api.TARGET_PROJECT_ID: target_session.get_project_id(),
api.TARGET_USER_ID: target_session.get_user_id(),
api.TARGET_AUTH_URI: target_auth._plugin.auth_url,
})
access = target_auth.get_access(target_session)
service_catalog = access.service_catalog
if self._is_service_catalog_v2(service_catalog):
access_data = access._data["access"]
if not len(access_data['serviceCatalog']):
LOG.warning(
"Service Catalog empty, some authentication"
"credentials may be missing. This can cause"
"malfunction in the Mistral action executions.")
sc_json = jsonutils.dumps(access_data)
auth_response[api.TARGET_SERVICE_CATALOG] = sc_json
if not auth_response:
LOG.debug("No valid token or password + user provided. "
"Continuing without authentication")
return {}
return auth_response return auth_response
@staticmethod
def _verification_needed(cacert, insecure):
"""Return the verify parameter.
The value of verify can be either True/False or a cacert.
:param cacert None or path to CA cert file
:param insecure truthy value to switch on SSL verification
"""
if insecure is False or insecure is None:
verify = cacert or True
else:
verify = False
return verify
@staticmethod
def _is_service_catalog_v2(catalog):
"""Check if the service catalog is of type ServiceCatalogV2
:param catalog: the service catalog
:return: True if V2, False otherwise
"""
return type(catalog) is sc.ServiceCatalogV2

View File

@ -109,6 +109,8 @@ class BaseClientTests(base.BaseTestCase):
mistral_url_for_http = http_client_mock.call_args[0][0] mistral_url_for_http = http_client_mock.call_args[0][0]
self.assertEqual(MISTRAL_HTTP_URL, mistral_url_for_http) self.assertEqual(MISTRAL_HTTP_URL, mistral_url_for_http)
@mock.patch('mistralclient.auth.keystone.KeystoneAuthHandler'
'._is_service_catalog_v2', return_value=True)
@mock.patch('keystoneauth1.identity.generic.Password') @mock.patch('keystoneauth1.identity.generic.Password')
@mock.patch('keystoneauth1.session.Session') @mock.patch('keystoneauth1.session.Session')
@mock.patch('mistralclient.api.httpclient.HTTPClient') @mock.patch('mistralclient.api.httpclient.HTTPClient')
@ -116,14 +118,18 @@ class BaseClientTests(base.BaseTestCase):
self, self,
http_client_mock, http_client_mock,
session_mock, session_mock,
password_mock password_mock,
catalog_type_mock
): ):
session = mock.MagicMock() session = mock.MagicMock()
target_session = mock.MagicMock() target_session = mock.MagicMock()
session_mock.side_effect = [session, target_session] session_mock.side_effect = [session, target_session]
auth = mock.MagicMock() auth = mock.MagicMock()
password_mock.side_effect = [auth, auth] target_auth = mock.MagicMock()
target_auth._plugin.auth_url = AUTH_HTTP_URL_v3
password_mock.side_effect = [auth, target_auth]
get_endpoint = mock.Mock(return_value='http://mistral_host:8989/v2') get_endpoint = mock.Mock(return_value='http://mistral_host:8989/v2')
session.get_endpoint = get_endpoint session.get_endpoint = get_endpoint

View File

@ -0,0 +1,70 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mistralclient.auth.keystone
from mistralclient.tests.unit.v2 import base
class TestKeystone(base.BaseClientV2Test):
def setUp(self):
super(TestKeystone, self).setUp()
self.keystone = mistralclient.auth.keystone.KeystoneAuthHandler()
def test_get_auth_token(self):
auth = self.keystone._get_auth(
auth_token="token",
auth_url="url",
project_id="project_id",
)
self.assertEqual("url", auth.auth_url)
elements = auth.get_cache_id_elements()
self.assertIsNotNone(elements["token"])
self.assertIsNotNone(elements["project_id"])
def test_remove_domain(self):
params = {
"param1": "p",
"target_param2": "p2",
"user_domain_param3": "p3",
"target_project_domain_param4": "p4"
}
dedomained = self.keystone._remove_domain(params)
self.assertIn("param1", dedomained)
self.assertIn("target_param2", dedomained)
self.assertNotIn("user_domain_param3", dedomained)
self.assertNotIn("target_project_domain_param4", dedomained)
def test_separate_target_reqs(self):
params = {
"a": 1,
"target_b": 2,
"c": 3,
"target_d": 4,
"target_target": 5,
"param_target": 6
}
nontarget, target = self.keystone._separate_target_reqs(params)
self.assertIn("a", nontarget)
self.assertIn("c", nontarget)
self.assertIn("param_target", nontarget)
self.assertIn("b", target)
self.assertIn("d", target)
self.assertIn("target", target)
def test_verify(self):
self.assertTrue(self.keystone._verification_needed("", False))
self.assertFalse(self.keystone._verification_needed("", True))
self.assertFalse(self.keystone._verification_needed("cert", True))
self.assertEqual(
self.keystone._verification_needed("cert", False),
"cert"
)