Use Keystone client session.Session

This replaces the restapi requests wrapper with the one from Keystone client so
we can take advantage of the auth plugins.

As a first step only the v2 and v3 token and password plugins are supported.
This maintainis no changes to the command options or environment variables.

The next steps will include reworking the other API client interfaces to
fully utilize the single auth session.

Blueprint: ksc-session-auth
Change-Id: I47ec63291e4c3cf36c8061299a4764f60b36ab89
This commit is contained in:
Dean Troyer 2014-08-22 17:26:07 -05:00
parent 3317e0abf6
commit ae957b176e
10 changed files with 102 additions and 742 deletions

View File

@ -19,7 +19,9 @@ import logging
import pkg_resources
import sys
from openstackclient.common import restapi
from keystoneclient.auth.identity import v2 as v2_auth
from keystoneclient.auth.identity import v3 as v3_auth
from keystoneclient import session
from openstackclient.identity import client as identity_client
@ -80,24 +82,68 @@ class ClientManager(object):
self._cacert = verify
self._insecure = False
self.session = restapi.RESTApi(
verify=verify,
debug=True,
)
ver_prefix = identity_client.AUTH_VERSIONS[
self._api_version[identity_client.API_NAME]
]
# Get logging from root logger
root_logger = logging.getLogger('')
LOG.setLevel(root_logger.getEffectiveLevel())
restapi_logger = logging.getLogger('restapi')
restapi_logger.setLevel(root_logger.getEffectiveLevel())
# NOTE(dtroyer): These plugins are hard-coded for the first step
# in using the new Keystone auth plugins.
if self._url:
LOG.debug('Using token auth %s', ver_prefix)
if ver_prefix == 'v2':
self.auth = v2_auth.Token(
auth_url=url,
token=token,
)
else:
self.auth = v3_auth.Token(
auth_url=url,
token=token,
)
else:
LOG.debug('Using password auth %s', ver_prefix)
if ver_prefix == 'v2':
self.auth = v2_auth.Password(
auth_url=auth_url,
username=username,
password=password,
trust_id=trust_id,
tenant_id=project_id,
tenant_name=project_name,
)
else:
self.auth = v3_auth.Password(
auth_url=auth_url,
username=username,
password=password,
trust_id=trust_id,
user_domain_id=user_domain_id,
user_domain_name=user_domain_name,
domain_id=domain_id,
domain_name=domain_name,
project_id=project_id,
project_name=project_name,
project_domain_id=project_domain_id,
project_domain_name=project_domain_name,
)
self.session = session.Session(
auth=self.auth,
verify=verify,
)
self.auth_ref = None
if not self._url:
# Trigger the auth call
self.auth_ref = self.session.auth.get_auth_ref(self.session)
# Populate other password flow attributes
self.auth_ref = self.identity.auth_ref
self._token = self.identity.auth_token
self._service_catalog = self.identity.service_catalog
self._token = self.session.auth.get_token(self.session)
self._service_catalog = self.auth_ref.service_catalog
return

View File

@ -1,332 +0,0 @@
# Copyright 2013 Nebula Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""REST API bits"""
import json
import logging
import requests
try:
from urllib.parse import urlencode # noqa
except ImportError:
from urllib import urlencode # noqa
USER_AGENT = 'RAPI'
_logger = logging.getLogger(__name__)
class RESTApi(object):
"""A REST API client that handles the interface from us to the server
RESTApi is requests.Session wrapper that knows how to do:
* JSON serialization/deserialization
* log requests in 'curl' format
* basic API boilerplate for create/delete/list/set/show verbs
* authentication is handled elsewhere and a token is passed in
The expectation that there will be a RESTApi object per authentication
token in use, i.e. project/username/auth_endpoint
On the other hand, a Client knows details about the specific REST Api that
it communicates with, such as the available endpoints, API versions, etc.
"""
def __init__(
self,
session=None,
auth_header=None,
user_agent=USER_AGENT,
verify=True,
logger=None,
debug=None,
):
"""Construct a new REST client
:param object session: A Session object to be used for
communicating with the identity service.
:param string auth_header: A token from an initialized auth_reference
to be used in the X-Auth-Token header
:param string user_agent: Set the User-Agent header in the requests
:param boolean/string verify: If ``True``, the SSL cert will be
verified. A CA_BUNDLE path can also be
provided.
:param logging.Logger logger: A logger to output to. (optional)
:param boolean debug: Enables debug logging of all request and
responses to identity service.
default False (optional)
"""
self.set_auth(auth_header)
self.debug = debug
if not session:
# We create a default session object
session = requests.Session()
self.session = session
self.session.verify = verify
self.session.user_agent = user_agent
if logger:
self.logger = logger
else:
self.logger = _logger
def set_auth(self, auth_header):
"""Sets the current auth blob"""
self.auth_header = auth_header
def set_header(self, header, content):
"""Sets passed in headers into the session headers
Replaces existing headers!!
"""
if content is None:
del self.session.headers[header]
else:
self.session.headers[header] = content
def request(self, method, url, **kwargs):
"""Make an authenticated (if token available) request
:param method: Request HTTP method
:param url: Request URL
:param data: Request body
:param json: Request body to be encoded as JSON
Overwrites ``data`` argument if present
"""
kwargs.setdefault('headers', {})
if self.auth_header:
kwargs['headers']['X-Auth-Token'] = self.auth_header
if 'json' in kwargs and isinstance(kwargs['json'], type({})):
kwargs['data'] = json.dumps(kwargs.pop('json'))
kwargs['headers']['Content-Type'] = 'application/json'
kwargs.setdefault('allow_redirects', True)
if self.debug:
self._log_request(method, url, **kwargs)
response = self.session.request(method, url, **kwargs)
if self.debug:
self._log_response(response)
return self._error_handler(response)
def _error_handler(self, response):
if response.status_code < 200 or response.status_code >= 300:
self.logger.debug(
"ERROR: %s",
response.text,
)
response.raise_for_status()
return response
# Convenience methods to mimic the ones provided by requests.Session
def delete(self, url, **kwargs):
"""Send a DELETE request. Returns :class:`requests.Response` object.
:param url: Request URL
:param \*\*kwargs: Optional arguments passed to ``request``
"""
return self.request('DELETE', url, **kwargs)
def get(self, url, **kwargs):
"""Send a GET request. Returns :class:`requests.Response` object.
:param url: Request URL
:param \*\*kwargs: Optional arguments passed to ``request``
"""
return self.request('GET', url, **kwargs)
def head(self, url, **kwargs):
"""Send a HEAD request. Returns :class:`requests.Response` object.
:param url: Request URL
:param \*\*kwargs: Optional arguments passed to ``request``
"""
kwargs.setdefault('allow_redirects', False)
return self.request('HEAD', url, **kwargs)
def options(self, url, **kwargs):
"""Send an OPTIONS request. Returns :class:`requests.Response` object.
:param url: Request URL
:param \*\*kwargs: Optional arguments passed to ``request``
"""
return self.request('OPTIONS', url, **kwargs)
def patch(self, url, data=None, json=None, **kwargs):
"""Send a PUT request. Returns :class:`requests.Response` object.
:param url: Request URL
:param data: Request body
:param json: Request body to be encoded as JSON
Overwrites ``data`` argument if present
:param \*\*kwargs: Optional arguments passed to ``request``
"""
if json:
kwargs['json'] = json
if data:
kwargs['data'] = data
return self.request('PATCH', url, **kwargs)
def post(self, url, data=None, json=None, **kwargs):
"""Send a POST request. Returns :class:`requests.Response` object.
:param url: Request URL
:param data: Request body
:param json: Request body to be encoded as JSON
Overwrites ``data`` argument if present
:param \*\*kwargs: Optional arguments passed to ``request``
"""
if json:
kwargs['json'] = json
if data:
kwargs['data'] = data
return self.request('POST', url, **kwargs)
def put(self, url, data=None, json=None, **kwargs):
"""Send a PUT request. Returns :class:`requests.Response` object.
:param url: Request URL
:param data: Request body
:param json: Request body to be encoded as JSON
Overwrites ``data`` argument if present
:param \*\*kwargs: Optional arguments passed to ``request``
"""
if json:
kwargs['json'] = json
if data:
kwargs['data'] = data
return self.request('PUT', url, **kwargs)
# Command verb methods
def create(self, url, data=None, response_key=None, **kwargs):
"""Create a new object via a POST request
:param url: Request URL
:param data: Request body, wil be JSON encoded
:param response_key: Dict key in response body to extract
:param \*\*kwargs: Optional arguments passed to ``request``
"""
response = self.request('POST', url, json=data, **kwargs)
if response_key:
return response.json()[response_key]
else:
return response.json()
def list(self, url, data=None, response_key=None, **kwargs):
"""Retrieve a list of objects via a GET or POST request
:param url: Request URL
:param data: Request body, will be JSON encoded
:param response_key: Dict key in response body to extract
:param \*\*kwargs: Optional arguments passed to ``request``
"""
if data:
response = self.request('POST', url, json=data, **kwargs)
else:
response = self.request('GET', url, **kwargs)
if response_key:
return response.json()[response_key]
else:
return response.json()
def set(self, url, data=None, response_key=None, **kwargs):
"""Update an object via a PUT request
:param url: Request URL
:param data: Request body
:param json: Request body to be encoded as JSON
Overwrites ``data`` argument if present
:param \*\*kwargs: Optional arguments passed to ``request``
"""
response = self.request('PUT', url, json=data)
if data:
if response_key:
return response.json()[response_key]
else:
return response.json()
else:
# Nothing to do here
return None
def show(self, url, response_key=None, **kwargs):
"""Retrieve a single object via a GET request
:param url: Request URL
:param response_key: Dict key in response body to extract
:param \*\*kwargs: Optional arguments passed to ``request``
"""
response = self.request('GET', url, **kwargs)
if response_key:
return response.json()[response_key]
else:
return response.json()
def _log_request(self, method, url, **kwargs):
if 'params' in kwargs and kwargs['params'] != {}:
url += '?' + urlencode(kwargs['params'])
string_parts = [
"curl -i",
"-X '%s'" % method,
"'%s'" % url,
]
for element in kwargs['headers']:
header = " -H '%s: %s'" % (element, kwargs['headers'][element])
string_parts.append(header)
self.logger.debug("REQ: %s" % " ".join(string_parts))
if 'data' in kwargs:
self.logger.debug(" REQ BODY: %r\n" % (kwargs['data']))
def _log_response(self, response):
self.logger.debug(
"RESP: [%s] %r\n",
response.status_code,
response.headers,
)
if response._content_consumed:
self.logger.debug(
" RESP BODY: %s\n",
response.text,
)
self.logger.debug(
" encoding: %s",
response.encoding,
)

View File

@ -29,6 +29,12 @@ API_VERSIONS = {
'3': 'keystoneclient.v3.client.Client',
}
# Translate our API version to auth plugin version prefix
AUTH_VERSIONS = {
'2.0': 'v2',
'3': 'v3',
}
def make_client(instance):
"""Returns an identity service client."""
@ -38,6 +44,8 @@ def make_client(instance):
API_VERSIONS)
LOG.debug('Instantiating identity client: %s', identity_client)
# TODO(dtroyer): Something doesn't like the session.auth when using
# token auth, chase that down.
if instance._url:
LOG.debug('Using token auth')
client = identity_client(
@ -50,32 +58,14 @@ def make_client(instance):
else:
LOG.debug('Using password auth')
client = identity_client(
username=instance._username,
password=instance._password,
user_domain_id=instance._user_domain_id,
user_domain_name=instance._user_domain_name,
project_domain_id=instance._project_domain_id,
project_domain_name=instance._project_domain_name,
domain_id=instance._domain_id,
domain_name=instance._domain_name,
tenant_name=instance._project_name,
tenant_id=instance._project_id,
auth_url=instance._auth_url,
region_name=instance._region_name,
session=instance.session,
cacert=instance._cacert,
insecure=instance._insecure,
trust_id=instance._trust_id,
)
# TODO(dtroyer): the identity v2 role commands use this yet, fix that
# so we can remove it
instance.auth_ref = client.auth_ref
# NOTE(dtroyer): this is hanging around until restapi is replace by
# ksc session
instance.session.set_auth(
client.auth_ref.auth_token,
)
# TODO(dtroyer): the identity v2 role commands use this yet, fix that
# so we can remove it
if not instance._url:
instance.auth_ref = instance.auth.get_auth_ref(instance.session)
return client

View File

@ -33,9 +33,8 @@ class IssueToken(show.ShowOne):
def take_action(self, parsed_args):
self.log.debug('take_action(%s)', parsed_args)
identity_client = self.app.client_manager.identity
token = identity_client.service_catalog.get_token()
token = self.app.client_manager.auth_ref.service_catalog.get_token()
token['project_id'] = token.pop('tenant_id')
return zip(*sorted(six.iteritems(token)))

View File

@ -29,7 +29,7 @@ def create_container(
):
"""Create a container
:param session: a restapi object
:param session: an authenticated keystoneclient.session.Session object
:param url: endpoint
:param container: name of container to create
:returns: dict of returned headers
@ -53,7 +53,7 @@ def delete_container(
):
"""Delete a container
:param session: a restapi object
:param session: an authenticated keystoneclient.session.Session object
:param url: endpoint
:param container: name of container to delete
"""
@ -72,7 +72,7 @@ def list_containers(
):
"""Get containers in an account
:param session: a restapi object
:param session: an authenticated keystoneclient.session.Session object
:param url: endpoint
:param marker: marker query
:param limit: limit query
@ -127,7 +127,7 @@ def show_container(
):
"""Get container details
:param session: a restapi object
:param session: an authenticated keystoneclient.session.Session object
:param url: endpoint
:param container: name of container to show
:returns: dict of returned headers

View File

@ -32,7 +32,7 @@ def create_object(
):
"""Create an object, upload it to a container
:param session: a restapi object
:param session: an authenticated keystoneclient.session.Session object
:param url: endpoint
:param container: name of container to store object
:param object: local path to object
@ -61,7 +61,7 @@ def delete_object(
):
"""Delete an object stored in a container
:param session: a restapi object
:param session: an authenticated keystoneclient.session.Session object
:param url: endpoint
:param container: name of container that stores object
:param container: name of object to delete
@ -84,7 +84,7 @@ def list_objects(
):
"""Get objects in a container
:param session: a restapi object
:param session: an authenticated keystoneclient.session.Session object
:param url: endpoint
:param container: container name to get a listing for
:param marker: marker query
@ -158,7 +158,7 @@ def show_object(
):
"""Get object details
:param session: a restapi object
:param session: an authenticated keystoneclient.session.Session object
:param url: endpoint
:param container: container name to get a listing for
:returns: dict of object properties

View File

@ -13,8 +13,10 @@
# under the License.
#
import mock
from keystoneclient.auth.identity import v2 as auth_v2
from openstackclient.common import clientmanager
from openstackclient.common import restapi
from openstackclient.tests import utils
@ -25,6 +27,10 @@ USERNAME = "itchy"
PASSWORD = "scratchy"
SERVICE_CATALOG = {'sc': '123'}
API_VERSION = {
'identity': '2.0',
}
def FakeMakeClient(instance):
return FakeClient()
@ -52,6 +58,7 @@ class TestClientCache(utils.TestCase):
self.assertEqual(c.attr, c.attr)
@mock.patch('keystoneclient.session.Session')
class TestClientManager(utils.TestCase):
def setUp(self):
super(TestClientManager, self).setUp()
@ -59,12 +66,13 @@ class TestClientManager(utils.TestCase):
clientmanager.ClientManager.identity = \
clientmanager.ClientCache(FakeMakeClient)
def test_client_manager_token(self):
def test_client_manager_token(self, mock):
client_manager = clientmanager.ClientManager(
token=AUTH_TOKEN,
url=AUTH_URL,
verify=True,
api_version=API_VERSION,
)
self.assertEqual(
@ -76,19 +84,20 @@ class TestClientManager(utils.TestCase):
client_manager._url,
)
self.assertIsInstance(
client_manager.session,
restapi.RESTApi,
client_manager.auth,
auth_v2.Token,
)
self.assertFalse(client_manager._insecure)
self.assertTrue(client_manager._verify)
def test_client_manager_password(self):
def test_client_manager_password(self, mock):
client_manager = clientmanager.ClientManager(
auth_url=AUTH_URL,
username=USERNAME,
password=PASSWORD,
verify=False,
api_version=API_VERSION,
)
self.assertEqual(
@ -104,33 +113,20 @@ class TestClientManager(utils.TestCase):
client_manager._password,
)
self.assertIsInstance(
client_manager.session,
restapi.RESTApi,
client_manager.auth,
auth_v2.Password,
)
self.assertTrue(client_manager._insecure)
self.assertFalse(client_manager._verify)
# These need to stick around until the old-style clients are gone
self.assertEqual(
AUTH_REF,
client_manager.auth_ref,
)
self.assertEqual(
AUTH_TOKEN,
client_manager._token,
)
self.assertEqual(
SERVICE_CATALOG,
client_manager._service_catalog,
)
def test_client_manager_password_verify_ca(self):
def test_client_manager_password_verify_ca(self, mock):
client_manager = clientmanager.ClientManager(
auth_url=AUTH_URL,
username=USERNAME,
password=PASSWORD,
verify='cafile',
api_version=API_VERSION,
)
self.assertFalse(client_manager._insecure)

View File

@ -1,341 +0,0 @@
# Copyright 2013 Nebula Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Test rest module"""
import json
import mock
import requests
import six
from openstackclient.common import restapi
from openstackclient.tests import utils
fake_user_agent = 'test_rapi'
fake_auth = '11223344556677889900'
fake_url = 'http://gopher.com'
fake_key = 'gopher'
fake_keys = 'gophers'
fake_gopher_mac = {
'id': 'g1',
'name': 'mac',
'actor': 'Mel Blanc',
}
fake_gopher_tosh = {
'id': 'g2',
'name': 'tosh',
'actor': 'Stan Freeberg',
}
fake_gopher_single = {
fake_key: fake_gopher_mac,
}
fake_gopher_list = {
fake_keys:
[
fake_gopher_mac,
fake_gopher_tosh,
]
}
fake_headers = {
'User-Agent': fake_user_agent,
}
class FakeResponse(requests.Response):
def __init__(self, headers={}, status_code=200, data=None, encoding=None):
super(FakeResponse, self).__init__()
self.status_code = status_code
self.headers.update(headers)
self._content = json.dumps(data)
if not isinstance(self._content, six.binary_type):
self._content = self._content.encode()
@mock.patch('openstackclient.common.restapi.requests.Session')
class TestRESTApi(utils.TestCase):
def test_request_get(self, session_mock):
resp = FakeResponse(status_code=200, data=fake_gopher_single)
session_mock.return_value = mock.MagicMock(
request=mock.MagicMock(return_value=resp),
)
api = restapi.RESTApi(
user_agent=fake_user_agent,
)
gopher = api.request('GET', fake_url)
session_mock.return_value.request.assert_called_with(
'GET',
fake_url,
headers={},
allow_redirects=True,
)
self.assertEqual(gopher.status_code, 200)
self.assertEqual(gopher.json(), fake_gopher_single)
def test_request_get_return_300(self, session_mock):
resp = FakeResponse(status_code=300, data=fake_gopher_single)
session_mock.return_value = mock.MagicMock(
request=mock.MagicMock(return_value=resp),
)
api = restapi.RESTApi(
user_agent=fake_user_agent,
)
gopher = api.request('GET', fake_url)
session_mock.return_value.request.assert_called_with(
'GET',
fake_url,
headers={},
allow_redirects=True,
)
self.assertEqual(gopher.status_code, 300)
self.assertEqual(gopher.json(), fake_gopher_single)
def test_request_get_fail_404(self, session_mock):
resp = FakeResponse(status_code=404, data=fake_gopher_single)
session_mock.return_value = mock.MagicMock(
request=mock.MagicMock(return_value=resp),
)
api = restapi.RESTApi(
user_agent=fake_user_agent,
)
self.assertRaises(requests.HTTPError, api.request, 'GET', fake_url)
session_mock.return_value.request.assert_called_with(
'GET',
fake_url,
headers={},
allow_redirects=True,
)
def test_request_get_auth(self, session_mock):
resp = FakeResponse(data=fake_gopher_single)
session_mock.return_value = mock.MagicMock(
request=mock.MagicMock(return_value=resp),
headers=mock.MagicMock(return_value={}),
)
api = restapi.RESTApi(
auth_header=fake_auth,
user_agent=fake_user_agent,
)
gopher = api.request('GET', fake_url)
session_mock.return_value.request.assert_called_with(
'GET',
fake_url,
headers={
'X-Auth-Token': fake_auth,
},
allow_redirects=True,
)
self.assertEqual(gopher.json(), fake_gopher_single)
def test_request_post(self, session_mock):
resp = FakeResponse(data=fake_gopher_single)
session_mock.return_value = mock.MagicMock(
request=mock.MagicMock(return_value=resp),
)
api = restapi.RESTApi(
user_agent=fake_user_agent,
)
data = fake_gopher_tosh
gopher = api.request('POST', fake_url, json=data)
session_mock.return_value.request.assert_called_with(
'POST',
fake_url,
headers={
'Content-Type': 'application/json',
},
allow_redirects=True,
data=json.dumps(data),
)
self.assertEqual(gopher.json(), fake_gopher_single)
# Methods
# TODO(dtroyer): add the other method methods
def test_delete(self, session_mock):
resp = FakeResponse(status_code=200, data=None)
session_mock.return_value = mock.MagicMock(
request=mock.MagicMock(return_value=resp),
)
api = restapi.RESTApi()
gopher = api.delete(fake_url)
session_mock.return_value.request.assert_called_with(
'DELETE',
fake_url,
headers=mock.ANY,
allow_redirects=True,
)
self.assertEqual(gopher.status_code, 200)
# Commands
def test_create(self, session_mock):
resp = FakeResponse(data=fake_gopher_single)
session_mock.return_value = mock.MagicMock(
request=mock.MagicMock(return_value=resp),
)
api = restapi.RESTApi()
data = fake_gopher_mac
# Test no key
gopher = api.create(fake_url, data=data)
session_mock.return_value.request.assert_called_with(
'POST',
fake_url,
headers=mock.ANY,
allow_redirects=True,
data=json.dumps(data),
)
self.assertEqual(gopher, fake_gopher_single)
# Test with key
gopher = api.create(fake_url, data=data, response_key=fake_key)
session_mock.return_value.request.assert_called_with(
'POST',
fake_url,
headers=mock.ANY,
allow_redirects=True,
data=json.dumps(data),
)
self.assertEqual(gopher, fake_gopher_mac)
def test_list(self, session_mock):
resp = FakeResponse(data=fake_gopher_list)
session_mock.return_value = mock.MagicMock(
request=mock.MagicMock(return_value=resp),
)
# test base
api = restapi.RESTApi()
gopher = api.list(fake_url, response_key=fake_keys)
session_mock.return_value.request.assert_called_with(
'GET',
fake_url,
headers=mock.ANY,
allow_redirects=True,
)
self.assertEqual(gopher, [fake_gopher_mac, fake_gopher_tosh])
# test body
api = restapi.RESTApi()
data = {'qwerty': 1}
gopher = api.list(fake_url, response_key=fake_keys, data=data)
session_mock.return_value.request.assert_called_with(
'POST',
fake_url,
headers=mock.ANY,
allow_redirects=True,
data=json.dumps(data),
)
self.assertEqual(gopher, [fake_gopher_mac, fake_gopher_tosh])
# test query params
api = restapi.RESTApi()
params = {'qaz': '123'}
gophers = api.list(fake_url, response_key=fake_keys, params=params)
session_mock.return_value.request.assert_called_with(
'GET',
fake_url,
headers=mock.ANY,
allow_redirects=True,
params=params,
)
self.assertEqual(gophers, [fake_gopher_mac, fake_gopher_tosh])
def test_set(self, session_mock):
new_gopher = fake_gopher_single
new_gopher[fake_key]['name'] = 'Chip'
resp = FakeResponse(data=fake_gopher_single)
session_mock.return_value = mock.MagicMock(
request=mock.MagicMock(return_value=resp),
)
api = restapi.RESTApi()
data = fake_gopher_mac
data['name'] = 'Chip'
# Test no data, no key
gopher = api.set(fake_url)
session_mock.return_value.request.assert_called_with(
'PUT',
fake_url,
headers=mock.ANY,
allow_redirects=True,
json=None,
)
self.assertEqual(gopher, None)
# Test data, no key
gopher = api.set(fake_url, data=data)
session_mock.return_value.request.assert_called_with(
'PUT',
fake_url,
headers=mock.ANY,
allow_redirects=True,
data=json.dumps(data),
)
self.assertEqual(gopher, fake_gopher_single)
# NOTE:(dtroyer): Key and no data is not tested as without data
# the response_key is moot
# Test data and key
gopher = api.set(fake_url, data=data, response_key=fake_key)
session_mock.return_value.request.assert_called_with(
'PUT',
fake_url,
headers=mock.ANY,
allow_redirects=True,
data=json.dumps(data),
)
self.assertEqual(gopher, fake_gopher_mac)
def test_show(self, session_mock):
resp = FakeResponse(data=fake_gopher_single)
session_mock.return_value = mock.MagicMock(
request=mock.MagicMock(return_value=resp),
)
api = restapi.RESTApi()
# Test no key
gopher = api.show(fake_url)
session_mock.return_value.request.assert_called_with(
'GET',
fake_url,
headers=mock.ANY,
allow_redirects=True,
)
self.assertEqual(gopher, fake_gopher_single)
# Test with key
gopher = api.show(fake_url, response_key=fake_key)
session_mock.return_value.request.assert_called_with(
'GET',
fake_url,
headers=mock.ANY,
allow_redirects=True,
)
self.assertEqual(gopher, fake_gopher_mac)

View File

@ -125,7 +125,6 @@ class FakeIdentityv2Client(object):
def __init__(self, **kwargs):
self.roles = mock.Mock()
self.roles.resource_class = fakes.FakeResource(None, {})
self.service_catalog = mock.Mock()
self.services = mock.Mock()
self.services.resource_class = fakes.FakeResource(None, {})
self.tenants = mock.Mock()

View File

@ -13,6 +13,8 @@
# under the License.
#
import mock
from openstackclient.identity.v2_0 import token
from openstackclient.tests.identity.v2_0 import fakes as identity_fakes
@ -23,8 +25,9 @@ class TestToken(identity_fakes.TestIdentityv2):
super(TestToken, self).setUp()
# Get a shortcut to the Service Catalog Mock
self.sc_mock = self.app.client_manager.identity.service_catalog
self.sc_mock.reset_mock()
self.sc_mock = mock.Mock()
self.app.client_manager.auth_ref = mock.Mock()
self.app.client_manager.auth_ref.service_catalog = self.sc_mock
class TestTokenIssue(TestToken):