Add base files and service-list command

This patch adds the following:
* All the base files.
* Support for service-list command.

Change-Id: I8e63e6b49c7730fec8ed12723b117bece75b6909
NOTE: Code is copied from magnumclient.
This commit is contained in:
Madhuri Kumari 2016-07-05 06:05:00 +05:30
parent 9407989637
commit e38ba88d4d
34 changed files with 4766 additions and 13 deletions

View File

@ -21,7 +21,11 @@ classifier =
[files]
packages =
python-zunclient
zunclient
[entry_points]
console_scripts =
zun = zunclient.shell:main
[build_sphinx]
source-dir = doc/source
@ -32,18 +36,18 @@ all_files = 1
upload-dir = doc/build/html
[compile_catalog]
directory = python-zunclient/locale
domain = python-zunclient
directory = zunclient/locale
domain = zunclient
[update_catalog]
domain = python-zunclient
output_dir = python-zunclient/locale
input_file = python-zunclient/locale/python-zunclient.pot
domain = zunclient
output_dir = zunclient/locale
input_file = zunclient/locale/zunclient.pot
[extract_messages]
keywords = _ gettext ngettext l_ lazy_gettext
mapping_file = babel.cfg
output_file = python-zunclient/locale/python-zunclient.pot
output_file = zunclient/locale/zunclient.pot
[build_releasenotes]
all_files = 1

View File

@ -60,7 +60,7 @@ commands = ./coverage.sh {posargs}
show-source = True
ignore = E123,E125
builtins = _
exclude=.venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build
exclude=.venv,.git,.tox,dist,doc,*lib/python*,*egg,build
[hacking]
import_exceptions = zunclient.openstack.common._i18n
import_exceptions = zunclient._i18n

View File

@ -16,4 +16,4 @@ import pbr.version
__version__ = pbr.version.VersionInfo(
'zunclient').version_string()
'python-zunclient').version_string()

24
zunclient/client.py Normal file
View File

@ -0,0 +1,24 @@
# Copyright (c) 2015 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from zunclient.v1 import client
def Client(version='1', **kwargs):
"""Factory function to create a new container service client."""
if version != '1':
raise ValueError(
"zun only has one API version. Valid values for 'version'"
" are '1'")
return client.Client(**kwargs)

View File

View File

View File

@ -0,0 +1,231 @@
# Copyright 2013 OpenStack Foundation
# Copyright 2013 Spanish National Research Council.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# E0202: An attribute inherited from %s hide this method
# pylint: disable=E0202
########################################################################
#
# THIS MODULE IS DEPRECATED
#
# Please refer to
# https://etherpad.openstack.org/p/kilo-zunclient-library-proposals for
# the discussion leading to this deprecation.
#
# We recommend checking out the python-openstacksdk project
# (https://launchpad.net/python-openstacksdk) instead.
#
########################################################################
import abc
import argparse
import os
import six
from stevedore import extension
from zunclient.common.apiclient import exceptions
_discovered_plugins = {}
def discover_auth_systems():
"""Discover the available auth-systems.
This won't take into account the old style auth-systems.
"""
global _discovered_plugins
_discovered_plugins = {}
def add_plugin(ext):
_discovered_plugins[ext.name] = ext.plugin
ep_namespace = "zunclient.common.apiclient.auth"
mgr = extension.ExtensionManager(ep_namespace)
mgr.map(add_plugin)
def load_auth_system_opts(parser):
"""Load options needed by the available auth-systems into a parser.
This function will try to populate the parser with options from the
available plugins.
"""
group = parser.add_argument_group("Common auth options")
BaseAuthPlugin.add_common_opts(group)
for name, auth_plugin in _discovered_plugins.items():
group = parser.add_argument_group(
"Auth-system '%s' options" % name,
conflict_handler="resolve")
auth_plugin.add_opts(group)
def load_plugin(auth_system):
try:
plugin_class = _discovered_plugins[auth_system]
except KeyError:
raise exceptions.AuthSystemNotFound(auth_system)
return plugin_class(auth_system=auth_system)
def load_plugin_from_args(args):
"""Load required plugin and populate it with options.
Try to guess auth system if it is not specified. Systems are tried in
alphabetical order.
:type args: argparse.Namespace
:raises: AuthPluginOptionsMissing
"""
auth_system = args.os_auth_system
if auth_system:
plugin = load_plugin(auth_system)
plugin.parse_opts(args)
plugin.sufficient_options()
return plugin
for plugin_auth_system in sorted(six.iterkeys(_discovered_plugins)):
plugin_class = _discovered_plugins[plugin_auth_system]
plugin = plugin_class()
plugin.parse_opts(args)
try:
plugin.sufficient_options()
except exceptions.AuthPluginOptionsMissing:
continue
return plugin
raise exceptions.AuthPluginOptionsMissing(["auth_system"])
@six.add_metaclass(abc.ABCMeta)
class BaseAuthPlugin(object):
"""Base class for authentication plugins.
An authentication plugin needs to override at least the authenticate
method to be a valid plugin.
"""
auth_system = None
opt_names = []
common_opt_names = [
"auth_system",
"username",
"password",
"tenant_name",
"token",
"auth_url",
]
def __init__(self, auth_system=None, **kwargs):
self.auth_system = auth_system or self.auth_system
self.opts = dict((name, kwargs.get(name))
for name in self.opt_names)
@staticmethod
def _parser_add_opt(parser, opt):
"""Add an option to parser in two variants.
:param opt: option name (with underscores)
"""
dashed_opt = opt.replace("_", "-")
env_var = "OS_%s" % opt.upper()
arg_default = os.environ.get(env_var, "")
arg_help = "Defaults to env[%s]." % env_var
parser.add_argument(
"--os-%s" % dashed_opt,
metavar="<%s>" % dashed_opt,
default=arg_default,
help=arg_help)
parser.add_argument(
"--os_%s" % opt,
metavar="<%s>" % dashed_opt,
help=argparse.SUPPRESS)
@classmethod
def add_opts(cls, parser):
"""Populate the parser with the options for this plugin."""
for opt in cls.opt_names:
# use `BaseAuthPlugin.common_opt_names` since it is never
# changed in child classes
if opt not in BaseAuthPlugin.common_opt_names:
cls._parser_add_opt(parser, opt)
@classmethod
def add_common_opts(cls, parser):
"""Add options that are common for several plugins."""
for opt in cls.common_opt_names:
cls._parser_add_opt(parser, opt)
@staticmethod
def get_opt(opt_name, args):
"""Return option name and value.
:param opt_name: name of the option, e.g., "username"
:param args: parsed arguments
"""
return (opt_name, getattr(args, "os_%s" % opt_name, None))
def parse_opts(self, args):
"""Parse the actual auth-system options if any.
This method is expected to populate the attribute `self.opts` with a
dict containing the options and values needed to make authentication.
"""
self.opts.update(dict(self.get_opt(opt_name, args)
for opt_name in self.opt_names))
def authenticate(self, http_client):
"""Authenticate using plugin defined method.
The method usually analyses `self.opts` and performs
a request to authentication server.
:param http_client: client object that needs authentication
:type http_client: HTTPClient
:raises: AuthorizationFailure
"""
self.sufficient_options()
self._do_authenticate(http_client)
@abc.abstractmethod
def _do_authenticate(self, http_client):
"""Protected method for authentication."""
def sufficient_options(self):
"""Check if all required options are present.
:raises: AuthPluginOptionsMissing
"""
missing = [opt
for opt in self.opt_names
if not self.opts.get(opt)]
if missing:
raise exceptions.AuthPluginOptionsMissing(missing)
@abc.abstractmethod
def token_and_endpoint(self, endpoint_type, service_type):
"""Return token and endpoint.
:param service_type: Service type of the endpoint
:type service_type: string
:param endpoint_type: Type of endpoint.
Possible values: public or publicURL,
internal or internalURL,
admin or adminURL
:type endpoint_type: string
:returns: tuple of token and endpoint strings
:raises: EndpointException
"""

View File

@ -0,0 +1,531 @@
# Copyright 2010 Jacob Kaplan-Moss
# Copyright 2011 OpenStack Foundation
# Copyright 2012 Grid Dynamics
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Base utilities to build API operation managers and objects on top of.
"""
########################################################################
#
# THIS MODULE IS DEPRECATED
#
# Please refer to
# https://etherpad.openstack.org/p/kilo-zunclient-library-proposals for
# the discussion leading to this deprecation.
#
# We recommend checking out the python-openstacksdk project
# (https://launchpad.net/python-openstacksdk) instead.
#
########################################################################
# E1102: %s is not callable
# pylint: disable=E1102
import abc
import copy
from oslo_utils import strutils
import six
from six.moves.urllib import parse
from zunclient.common.apiclient import exceptions
from zunclient.i18n import _
def getid(obj):
"""Return id if argument is a Resource.
Abstracts the common pattern of allowing both an object or an object's ID
(UUID) as a parameter when dealing with relationships.
"""
try:
if obj.uuid:
return obj.uuid
except AttributeError:
pass
try:
return obj.id
except AttributeError:
return obj
# TODO(aababilov): call run_hooks() in HookableMixin's child classes
class HookableMixin(object):
"""Mixin so classes can register and run hooks."""
_hooks_map = {}
@classmethod
def add_hook(cls, hook_type, hook_func):
"""Add a new hook of specified type.
:param cls: class that registers hooks
:param hook_type: hook type, e.g., '__pre_parse_args__'
:param hook_func: hook function
"""
if hook_type not in cls._hooks_map:
cls._hooks_map[hook_type] = []
cls._hooks_map[hook_type].append(hook_func)
@classmethod
def run_hooks(cls, hook_type, *args, **kwargs):
"""Run all hooks of specified type.
:param cls: class that registers hooks
:param hook_type: hook type, e.g., '__pre_parse_args__'
:param args: args to be passed to every hook function
:param kwargs: kwargs to be passed to every hook function
"""
hook_funcs = cls._hooks_map.get(hook_type) or []
for hook_func in hook_funcs:
hook_func(*args, **kwargs)
class BaseManager(HookableMixin):
"""Basic manager type providing common operations.
Managers interact with a particular type of API (servers, flavors, images,
etc.) and provide CRUD operations for them.
"""
resource_class = None
def __init__(self, client):
"""Initializes BaseManager with `client`.
:param client: instance of BaseClient descendant for HTTP requests
"""
super(BaseManager, self).__init__()
self.client = client
def _list(self, url, response_key=None, obj_class=None, json=None):
"""List the collection.
:param url: a partial URL, e.g., '/servers'
:param response_key: the key to be looked up in response dictionary,
e.g., 'servers'. If response_key is None - all response body
will be used.
:param obj_class: class for constructing the returned objects
(self.resource_class will be used by default)
:param json: data that will be encoded as JSON and passed in POST
request (GET will be sent by default)
"""
if json:
body = self.client.post(url, json=json).json()
else:
body = self.client.get(url).json()
if obj_class is None:
obj_class = self.resource_class
data = body[response_key] if response_key is not None else body
# NOTE(ja): keystone returns values as list as {'values': [ ... ]}
# unlike other services which just return the list...
try:
data = data['values']
except (KeyError, TypeError):
pass
return [obj_class(self, res, loaded=True) for res in data if res]
def _get(self, url, response_key=None):
"""Get an object from collection.
:param url: a partial URL, e.g., '/servers'
:param response_key: the key to be looked up in response dictionary,
e.g., 'server'. If response_key is None - all response body
will be used.
"""
body = self.client.get(url).json()
data = body[response_key] if response_key is not None else body
return self.resource_class(self, data, loaded=True)
def _head(self, url):
"""Retrieve request headers for an object.
:param url: a partial URL, e.g., '/servers'
"""
resp = self.client.head(url)
return resp.status_code == 204
def _post(self, url, json, response_key=None, return_raw=False):
"""Create an object.
:param url: a partial URL, e.g., '/servers'
:param json: data that will be encoded as JSON and passed in POST
request (GET will be sent by default)
:param response_key: the key to be looked up in response dictionary,
e.g., 'server'. If response_key is None - all response body
will be used.
:param return_raw: flag to force returning raw JSON instead of
Python object of self.resource_class
"""
body = self.client.post(url, json=json).json()
data = body[response_key] if response_key is not None else body
if return_raw:
return data
return self.resource_class(self, data)
def _put(self, url, json=None, response_key=None):
"""Update an object with PUT method.
:param url: a partial URL, e.g., '/servers'
:param json: data that will be encoded as JSON and passed in POST
request (GET will be sent by default)
:param response_key: the key to be looked up in response dictionary,
e.g., 'servers'. If response_key is None - all response body
will be used.
"""
resp = self.client.put(url, json=json)
# PUT requests may not return a body
if resp.content:
body = resp.json()
if response_key is not None:
return self.resource_class(self, body[response_key])
else:
return self.resource_class(self, body)
def _patch(self, url, json=None, response_key=None):
"""Update an object with PATCH method.
:param url: a partial URL, e.g., '/servers'
:param json: data that will be encoded as JSON and passed in POST
request (GET will be sent by default)
:param response_key: the key to be looked up in response dictionary,
e.g., 'servers'. If response_key is None - all response body
will be used.
"""
body = self.client.patch(url, json=json).json()
if response_key is not None:
return self.resource_class(self, body[response_key])
else:
return self.resource_class(self, body)
def _delete(self, url):
"""Delete an object.
:param url: a partial URL, e.g., '/servers/my-server'
"""
return self.client.delete(url)
@six.add_metaclass(abc.ABCMeta)
class ManagerWithFind(BaseManager):
"""Manager with additional `find()`/`findall()` methods."""
@abc.abstractmethod
def list(self):
pass
def find(self, **kwargs):
"""Find a single item with attributes matching ``**kwargs``.
This isn't very efficient: it loads the entire list then filters on
the Python side.
"""
matches = self.findall(**kwargs)
num_matches = len(matches)
if num_matches == 0:
msg = _("No %(name)s matching %(args)s.") % {
'name': self.resource_class.__name__,
'args': kwargs
}
raise exceptions.NotFound(msg)
elif num_matches > 1:
raise exceptions.NoUniqueMatch()
else:
return matches[0]
def findall(self, **kwargs):
"""Find all items with attributes matching ``**kwargs``.
This isn't very efficient: it loads the entire list then filters on
the Python side.
"""
found = []
searches = kwargs.items()
for obj in self.list():
try:
if all(getattr(obj, attr) == value
for (attr, value) in searches):
found.append(obj)
except AttributeError:
continue
return found
class CrudManager(BaseManager):
"""Base manager class for manipulating entities.
Children of this class are expected to define a `collection_key` and `key`.
- `collection_key`: Usually a plural noun by convention (e.g. `entities`);
used to refer collections in both URL's (e.g. `/v3/entities`) and JSON
objects containing a list of member resources (e.g. `{'entities': [{},
{}, {}]}`).
- `key`: Usually a singular noun by convention (e.g. `entity`); used to
refer to an individual member of the collection.
"""
collection_key = None
key = None
def build_url(self, base_url=None, **kwargs):
"""Builds a resource URL for the given kwargs.
Given an example collection where `collection_key = 'entities'` and
`key = 'entity'`, the following URL's could be generated.
By default, the URL will represent a collection of entities, e.g.::
/entities
If kwargs contains an `entity_id`, then the URL will represent a
specific member, e.g.::
/entities/{entity_id}
:param base_url: if provided, the generated URL will be appended to it
"""
url = base_url if base_url is not None else ''
url += '/%s' % self.collection_key
# do we have a specific entity?
entity_id = kwargs.get('%s_id' % self.key)
if entity_id is not None:
url += '/%s' % entity_id
return url
def _filter_kwargs(self, kwargs):
"""Drop null values and handle ids."""
for key, ref in kwargs.copy().items():
if ref is None:
kwargs.pop(key)
else:
if isinstance(ref, Resource):
kwargs.pop(key)
kwargs['%s_id' % key] = getid(ref)
return kwargs
def create(self, **kwargs):
kwargs = self._filter_kwargs(kwargs)
return self._post(
self.build_url(**kwargs),
{self.key: kwargs},
self.key)
def get(self, **kwargs):
kwargs = self._filter_kwargs(kwargs)
return self._get(
self.build_url(**kwargs),
self.key)
def head(self, **kwargs):
kwargs = self._filter_kwargs(kwargs)
return self._head(self.build_url(**kwargs))
def list(self, base_url=None, **kwargs):
"""List the collection.
:param base_url: if provided, the generated URL will be appended to it
"""
kwargs = self._filter_kwargs(kwargs)
return self._list(
'%(base_url)s%(query)s' % {
'base_url': self.build_url(base_url=base_url, **kwargs),
'query': '?%s' % parse.urlencode(kwargs) if kwargs else '',
},
self.collection_key)
def put(self, base_url=None, **kwargs):
"""Update an element.
:param base_url: if provided, the generated URL will be appended to it
"""
kwargs = self._filter_kwargs(kwargs)
return self._put(self.build_url(base_url=base_url, **kwargs))
def update(self, **kwargs):
kwargs = self._filter_kwargs(kwargs)
params = kwargs.copy()
params.pop('%s_id' % self.key)
return self._patch(
self.build_url(**kwargs),
{self.key: params},
self.key)
def delete(self, **kwargs):
kwargs = self._filter_kwargs(kwargs)
return self._delete(
self.build_url(**kwargs))
def find(self, base_url=None, **kwargs):
"""Find a single item with attributes matching ``**kwargs``.
:param base_url: if provided, the generated URL will be appended to it
"""
kwargs = self._filter_kwargs(kwargs)
rl = self._list(
'%(base_url)s%(query)s' % {
'base_url': self.build_url(base_url=base_url, **kwargs),
'query': '?%s' % parse.urlencode(kwargs) if kwargs else '',
},
self.collection_key)
num = len(rl)
if num == 0:
msg = _("No %(name)s matching %(args)s.") % {
'name': self.resource_class.__name__,
'args': kwargs
}
raise exceptions.NotFound(msg)
elif num > 1:
raise exceptions.NoUniqueMatch
else:
return rl[0]
class Extension(HookableMixin):
"""Extension descriptor."""
SUPPORTED_HOOKS = ('__pre_parse_args__', '__post_parse_args__')
manager_class = None
def __init__(self, name, module):
super(Extension, self).__init__()
self.name = name
self.module = module
self._parse_extension_module()
def _parse_extension_module(self):
self.manager_class = None
for attr_name, attr_value in self.module.__dict__.items():
if attr_name in self.SUPPORTED_HOOKS:
self.add_hook(attr_name, attr_value)
else:
try:
if issubclass(attr_value, BaseManager):
self.manager_class = attr_value
except TypeError:
pass
def __repr__(self):
return "<Extension '%s'>" % self.name
class Resource(object):
"""Base class for OpenStack resources (tenant, user, etc.).
This is pretty much just a bag for attributes.
"""
HUMAN_ID = False
NAME_ATTR = 'name'
def __init__(self, manager, info, loaded=False):
"""Populate and bind to a manager.
:param manager: BaseManager object
:param info: dictionary representing resource attributes
:param loaded: prevent lazy-loading if set to True
"""
self.manager = manager
self._info = info
self._add_details(info)
self._loaded = loaded
def __repr__(self):
reprkeys = sorted(k
for k in self.__dict__.keys()
if k[0] != '_' and k != 'manager')
info = ", ".join("%s=%s" % (k, getattr(self, k)) for k in reprkeys)
return "<%s %s>" % (self.__class__.__name__, info)
@property
def human_id(self):
"""Human-readable ID which can be used for bash completion."""
if self.HUMAN_ID:
name = getattr(self, self.NAME_ATTR, None)
if name is not None:
return strutils.to_slug(name)
return None
def _add_details(self, info):
for (k, v) in info.items():
try:
setattr(self, k, v)
self._info[k] = v
except AttributeError:
# In this case we already defined the attribute on the class
pass
def __getattr__(self, k):
if k not in self.__dict__:
# NOTE(bcwaldon): disallow lazy-loading if already loaded once
if not self.is_loaded():
self.get()
return self.__getattr__(k)
raise AttributeError(k)
else:
return self.__dict__[k]
def get(self):
"""Support for lazy loading details.
Some clients, such as novaclient have the option to lazy load the
details, details which can be loaded with this function.
"""
# set_loaded() first ... so if we have to bail, we know we tried.
self.set_loaded(True)
if not hasattr(self.manager, 'get'):
return
new = self.manager.get(self.id)
if new:
self._add_details(new._info)
self._add_details(
{'x_request_id': self.manager.client.last_request_id})
def __eq__(self, other):
if not isinstance(other, Resource):
return NotImplemented
# two resources of different types are not equal
if not isinstance(other, self.__class__):
return False
if hasattr(self, 'id') and hasattr(other, 'id'):
return self.id == other.id
return self._info == other._info
def is_loaded(self):
return self._loaded
def set_loaded(self, val):
self._loaded = val
def to_dict(self):
return copy.deepcopy(self._info)

View File

@ -0,0 +1,477 @@
# Copyright 2010 Jacob Kaplan-Moss
# Copyright 2011 Nebula, Inc.
# Copyright 2013 Alessio Ababilov
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Exception definitions.
"""
########################################################################
#
# THIS MODULE IS DEPRECATED
#
# Please refer to
# https://etherpad.openstack.org/p/kilo-zunclient-library-proposals for
# the discussion leading to this deprecation.
#
# We recommend checking out the python-openstacksdk project
# (https://launchpad.net/python-openstacksdk) instead.
#
########################################################################
import inspect
import sys
import six
from zunclient.i18n import _
class ClientException(Exception):
"""The base exception class for all exceptions this library raises."""
pass
class ValidationError(ClientException):
"""Error in validation on API client side."""
pass
class UnsupportedVersion(ClientException):
"""User is trying to use an unsupported version of the API."""
pass
class CommandError(ClientException):
"""Error in CLI tool."""
pass
class AuthorizationFailure(ClientException):
"""Cannot authorize API client."""
pass
class ConnectionError(ClientException):
"""Cannot connect to API service."""
pass
class ConnectionRefused(ConnectionError):
"""Connection refused while trying to connect to API service."""
pass
class AuthPluginOptionsMissing(AuthorizationFailure):
"""Auth plugin misses some options."""
def __init__(self, opt_names):
super(AuthPluginOptionsMissing, self).__init__(
_("Authentication failed. Missing options: %s") %
", ".join(opt_names))
self.opt_names = opt_names
class AuthSystemNotFound(AuthorizationFailure):
"""User has specified an AuthSystem that is not installed."""
def __init__(self, auth_system):
super(AuthSystemNotFound, self).__init__(
_("AuthSystemNotFound: %r") % auth_system)
self.auth_system = auth_system
class NoUniqueMatch(ClientException):
"""Multiple entities found instead of one."""
pass
class EndpointException(ClientException):
"""Something is rotten in Service Catalog."""
pass
class EndpointNotFound(EndpointException):
"""Could not find requested endpoint in Service Catalog."""
pass
class AmbiguousEndpoints(EndpointException):
"""Found more than one matching endpoint in Service Catalog."""
def __init__(self, endpoints=None):
super(AmbiguousEndpoints, self).__init__(
_("AmbiguousEndpoints: %r") % endpoints)
self.endpoints = endpoints
class HttpError(ClientException):
"""The base exception class for all HTTP exceptions."""
http_status = 0
message = _("HTTP Error")
def __init__(self, message=None, details=None,
response=None, request_id=None,
url=None, method=None, http_status=None):
self.http_status = http_status or self.http_status
self.message = message or self.message
self.details = details
self.request_id = request_id
self.response = response
self.url = url
self.method = method
formatted_string = "%s (HTTP %s)" % (self.message, self.http_status)
if request_id:
formatted_string += " (Request-ID: %s)" % request_id
super(HttpError, self).__init__(formatted_string)
class HTTPRedirection(HttpError):
"""HTTP Redirection."""
message = _("HTTP Redirection")
class HTTPClientError(HttpError):
"""Client-side HTTP error.
Exception for cases in which the client seems to have erred.
"""
message = _("HTTP Client Error")
class HttpServerError(HttpError):
"""Server-side HTTP error.
Exception for cases in which the server is aware that it has
erred or is incapable of performing the request.
"""
message = _("HTTP Server Error")
class MultipleChoices(HTTPRedirection):
"""HTTP 300 - Multiple Choices.
Indicates multiple options for the resource that the client may follow.
"""
http_status = 300
message = _("Multiple Choices")
class BadRequest(HTTPClientError):
"""HTTP 400 - Bad Request.
The request cannot be fulfilled due to bad syntax.
"""
http_status = 400
message = _("Bad Request")
class Unauthorized(HTTPClientError):
"""HTTP 401 - Unauthorized.
Similar to 403 Forbidden, but specifically for use when authentication
is required and has failed or has not yet been provided.
"""
http_status = 401
message = _("Unauthorized")
class PaymentRequired(HTTPClientError):
"""HTTP 402 - Payment Required.
Reserved for future use.
"""
http_status = 402
message = _("Payment Required")
class Forbidden(HTTPClientError):
"""HTTP 403 - Forbidden.
The request was a valid request, but the server is refusing to respond
to it.
"""
http_status = 403
message = _("Forbidden")
class NotFound(HTTPClientError):
"""HTTP 404 - Not Found.
The requested resource could not be found but may be available again
in the future.
"""
http_status = 404
message = _("Not Found")
class MethodNotAllowed(HTTPClientError):
"""HTTP 405 - Method Not Allowed.
A request was made of a resource using a request method not supported
by that resource.
"""
http_status = 405
message = _("Method Not Allowed")
class NotAcceptable(HTTPClientError):
"""HTTP 406 - Not Acceptable.
The requested resource is only capable of generating content not
acceptable according to the Accept headers sent in the request.
"""
http_status = 406
message = _("Not Acceptable")
class ProxyAuthenticationRequired(HTTPClientError):
"""HTTP 407 - Proxy Authentication Required.
The client must first authenticate itself with the proxy.
"""
http_status = 407
message = _("Proxy Authentication Required")
class RequestTimeout(HTTPClientError):
"""HTTP 408 - Request Timeout.
The server timed out waiting for the request.
"""
http_status = 408
message = _("Request Timeout")
class Conflict(HTTPClientError):
"""HTTP 409 - Conflict.
Indicates that the request could not be processed because of conflict
in the request, such as an edit conflict.
"""
http_status = 409
message = _("Conflict")
class Gone(HTTPClientError):
"""HTTP 410 - Gone.
Indicates that the resource requested is no longer available and will
not be available again.
"""
http_status = 410
message = _("Gone")
class LengthRequired(HTTPClientError):
"""HTTP 411 - Length Required.
The request did not specify the length of its content, which is
required by the requested resource.
"""
http_status = 411
message = _("Length Required")
class PreconditionFailed(HTTPClientError):
"""HTTP 412 - Precondition Failed.
The server does not meet one of the preconditions that the requester
put on the request.
"""
http_status = 412
message = _("Precondition Failed")
class RequestEntityTooLarge(HTTPClientError):
"""HTTP 413 - Request Entity Too Large.
The request is larger than the server is willing or able to process.
"""
http_status = 413
message = _("Request Entity Too Large")
def __init__(self, *args, **kwargs):
try:
self.retry_after = int(kwargs.pop('retry_after'))
except (KeyError, ValueError):
self.retry_after = 0
super(RequestEntityTooLarge, self).__init__(*args, **kwargs)
class RequestUriTooLong(HTTPClientError):
"""HTTP 414 - Request-URI Too Long.
The URI provided was too long for the server to process.
"""
http_status = 414
message = _("Request-URI Too Long")
class UnsupportedMediaType(HTTPClientError):
"""HTTP 415 - Unsupported Media Type.
The request entity has a media type which the server or resource does
not support.
"""
http_status = 415
message = _("Unsupported Media Type")
class RequestedRangeNotSatisfiable(HTTPClientError):
"""HTTP 416 - Requested Range Not Satisfiable.
The client has asked for a portion of the file, but the server cannot
supply that portion.
"""
http_status = 416
message = _("Requested Range Not Satisfiable")
class ExpectationFailed(HTTPClientError):
"""HTTP 417 - Expectation Failed.
The server cannot meet the requirements of the Expect request-header field.
"""
http_status = 417
message = _("Expectation Failed")
class UnprocessableEntity(HTTPClientError):
"""HTTP 422 - Unprocessable Entity.
The request was well-formed but was unable to be followed due to semantic
errors.
"""
http_status = 422
message = _("Unprocessable Entity")
class InternalServerError(HttpServerError):
"""HTTP 500 - Internal Server Error.
A generic error message, given when no more specific message is suitable.
"""
http_status = 500
message = _("Internal Server Error")
# NotImplemented is a python keyword.
class HttpNotImplemented(HttpServerError):
"""HTTP 501 - Not Implemented.
The server either does not recognize the request method, or it lacks
the ability to fulfill the request.
"""
http_status = 501
message = _("Not Implemented")
class BadGateway(HttpServerError):
"""HTTP 502 - Bad Gateway.
The server was acting as a gateway or proxy and received an invalid
response from the upstream server.
"""
http_status = 502
message = _("Bad Gateway")
class ServiceUnavailable(HttpServerError):
"""HTTP 503 - Service Unavailable.
The server is currently unavailable.
"""
http_status = 503
message = _("Service Unavailable")
class GatewayTimeout(HttpServerError):
"""HTTP 504 - Gateway Timeout.
The server was acting as a gateway or proxy and did not receive a timely
response from the upstream server.
"""
http_status = 504
message = _("Gateway Timeout")
class HttpVersionNotSupported(HttpServerError):
"""HTTP 505 - HttpVersion Not Supported.
The server does not support the HTTP protocol version used in the request.
"""
http_status = 505
message = _("HTTP Version Not Supported")
# _code_map contains all the classes that have http_status attribute.
_code_map = dict(
(getattr(obj, 'http_status', None), obj)
for name, obj in vars(sys.modules[__name__]).items()
if inspect.isclass(obj) and getattr(obj, 'http_status', False)
)
def from_response(response, method, url):
"""Returns an instance of :class:`HttpError` or subclass based on response.
:param response: instance of `requests.Response` class
:param method: HTTP method used for request
:param url: URL used for request
"""
req_id = response.headers.get("x-openstack-request-id")
# NOTE(hdd) true for older versions of nova and cinder
if not req_id:
req_id = response.headers.get("x-compute-request-id")
kwargs = {
"http_status": response.status_code,
"response": response,
"method": method,
"url": url,
"request_id": req_id,
}
if "retry-after" in response.headers:
kwargs["retry_after"] = response.headers["retry-after"]
content_type = response.headers.get("Content-Type", "")
if content_type.startswith("application/json"):
try:
body = response.json()
except ValueError:
pass
else:
if isinstance(body, dict):
error = body.get(list(body)[0])
if isinstance(error, dict):
kwargs["message"] = (error.get("message") or
error.get("faultstring"))
kwargs["details"] = (error.get("details") or
six.text_type(body))
elif content_type.startswith("text/"):
kwargs["details"] = getattr(response, 'text', '')
try:
cls = _code_map[response.status_code]
except KeyError:
if 500 <= response.status_code < 600:
cls = HttpServerError
elif 400 <= response.status_code < 500:
cls = HTTPClientError
else:
cls = HttpError
return cls(**kwargs)

146
zunclient/common/base.py Normal file
View File

@ -0,0 +1,146 @@
# -*- coding: utf-8 -*-
#
# Copyright 2012 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Base utilities to build API operation managers and objects on top of.
"""
import copy
import six.moves.urllib.parse as urlparse
from zunclient.common.apiclient import base
def getid(obj):
"""Wrapper to get object's ID.
Abstracts the common pattern of allowing both an object or an
object's ID (UUID) as a parameter when dealing with relationships.
"""
try:
return obj.id
except AttributeError:
return obj
class Manager(object):
"""Provides CRUD operations with a particular API."""
resource_class = None
def __init__(self, api):
self.api = api
def _create(self, url, body):
resp, body = self.api.json_request('POST', url, body=body)
if body:
return self.resource_class(self, body)
def _format_body_data(self, body, response_key):
if response_key:
try:
data = body[response_key]
except KeyError:
return []
else:
data = body
if not isinstance(data, list):
data = [data]
return data
def _list_pagination(self, url, response_key=None, obj_class=None,
limit=None):
"""Retrieve a list of items.
The Zun API is configured to return a maximum number of
items per request, (FIXME: see Zun's api.max_limit option). This
iterates over the 'next' link (pagination) in the responses,
to get the number of items specified by 'limit'. If 'limit'
is None this function will continue pagination until there are
no more values to be returned.
:param url: a partial URL, e.g. '/nodes'
:param response_key: the key to be looked up in response
dictionary, e.g. 'nodes'
:param obj_class: class for constructing the returned objects.
:param limit: maximum number of items to return. If None returns
everything.
"""
if obj_class is None:
obj_class = self.resource_class
if limit is not None:
limit = int(limit)
object_list = []
object_count = 0
limit_reached = False
while url:
resp, body = self.api.json_request('GET', url)
data = self._format_body_data(body, response_key)
for obj in data:
object_list.append(obj_class(self, obj, loaded=True))
object_count += 1
if limit and object_count >= limit:
# break the for loop
limit_reached = True
break
# break the while loop and return
if limit_reached:
break
url = body.get('next')
if url:
# NOTE(lucasagomes): We need to edit the URL to remove
# the scheme and netloc
url_parts = list(urlparse.urlparse(url))
url_parts[0] = url_parts[1] = ''
url = urlparse.urlunparse(url_parts)
return object_list
def _list(self, url, response_key=None, obj_class=None, body=None):
resp, body = self.api.json_request('GET', url)
if obj_class is None:
obj_class = self.resource_class
data = self._format_body_data(body, response_key)
return [obj_class(self, res, loaded=True) for res in data if res]
def _update(self, url, body, method='PATCH', response_key=None):
resp, body = self.api.json_request(method, url, body=body)
# PATCH/PUT requests may not return a body
if body:
return self.resource_class(self, body)
def _delete(self, url):
self.api.raw_request('DELETE', url)
class Resource(base.Resource):
"""Represents a particular instance of an object (tenant, user, etc).
This is pretty much just a bag for attributes.
"""
def to_dict(self):
return copy.deepcopy(self._info)

View File

@ -0,0 +1,289 @@
# Copyright 2012 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# W0603: Using the global statement
# W0621: Redefining name %s from outer scope
# pylint: disable=W0603,W0621
from __future__ import print_function
import getpass
import inspect
import os
import sys
import textwrap
from oslo_utils import encodeutils
from oslo_utils import strutils
import prettytable
import six
from six import moves
from zunclient.i18n import _
class MissingArgs(Exception):
"""Supplied arguments are not sufficient for calling a function."""
def __init__(self, missing):
self.missing = missing
msg = _("Missing arguments: %s") % ", ".join(missing)
super(MissingArgs, self).__init__(msg)
def validate_args(fn, *args, **kwargs):
"""Check that the supplied args are sufficient for calling a function.
>>> validate_args(lambda a: None)
Traceback (most recent call last):
...
MissingArgs: Missing argument(s): a
>>> validate_args(lambda a, b, c, d: None, 0, c=1)
Traceback (most recent call last):
...
MissingArgs: Missing argument(s): b, d
:param fn: the function to check
:param arg: the positional arguments supplied
:param kwargs: the keyword arguments supplied
"""
argspec = inspect.getargspec(fn)
num_defaults = len(argspec.defaults or [])
required_args = argspec.args[:len(argspec.args) - num_defaults]
def isbound(method):
return getattr(method, '__self__', None) is not None
if isbound(fn):
required_args.pop(0)
missing = [arg for arg in required_args if arg not in kwargs]
missing = missing[len(args):]
if missing:
raise MissingArgs(missing)
def arg(*args, **kwargs):
"""Decorator for CLI args.
Example:
>>> @arg("name", help="Name of the new entity")
... def entity_create(args):
... pass
"""
def _decorator(func):
add_arg(func, *args, **kwargs)
return func
return _decorator
def env(*args, **kwargs):
"""Returns the first environment variable set.
If all are empty, defaults to '' or keyword arg `default`.
"""
for arg in args:
value = os.environ.get(arg)
if value:
return value
return kwargs.get('default', '')
def add_arg(func, *args, **kwargs):
"""Bind CLI arguments to a shell.py `do_foo` function."""
if not hasattr(func, 'arguments'):
func.arguments = []
# NOTE(sirp): avoid dups that can occur when the module is shared across
# tests.
if (args, kwargs) not in func.arguments:
# Because of the semantics of decorator composition if we just append
# to the options list positional options will appear to be backwards.
func.arguments.insert(0, (args, kwargs))
def unauthenticated(func):
"""Adds 'unauthenticated' attribute to decorated function.
Usage:
>>> @unauthenticated
... def mymethod(f):
... pass
"""
func.unauthenticated = True
return func
def isunauthenticated(func):
"""Checks if the function does not require authentication.
Mark such functions with the `@unauthenticated` decorator.
:returns: bool
"""
return getattr(func, 'unauthenticated', False)
def print_list(objs, fields, formatters=None, sortby_index=0,
mixed_case_fields=None, field_labels=None):
"""Print a list or objects as a table, one row per object.
:param objs: iterable of :class:`Resource`
:param fields: attributes that correspond to columns, in order
:param formatters: `dict` of callables for field formatting
:param sortby_index: index of the field for sorting table rows
:param mixed_case_fields: fields corresponding to object attributes that
have mixed case names (e.g., 'serverId')
:param field_labels: Labels to use in the heading of the table, default to
fields.
"""
formatters = formatters or {}
mixed_case_fields = mixed_case_fields or []
field_labels = field_labels or fields
if len(field_labels) != len(fields):
raise ValueError(_("Field labels list %(labels)s has different number "
"of elements than fields list %(fields)s"),
{'labels': field_labels, 'fields': fields})
if sortby_index is None:
kwargs = {}
else:
kwargs = {'sortby': field_labels[sortby_index]}
pt = prettytable.PrettyTable(field_labels)
pt.align = 'l'
for o in objs:
row = []
for field in fields:
if field in formatters:
row.append(formatters[field](o))
else:
if field in mixed_case_fields:
field_name = field.replace(' ', '_')
else:
field_name = field.lower().replace(' ', '_')
data = getattr(o, field_name, '')
row.append(data)
pt.add_row(row)
if six.PY3:
print(encodeutils.safe_encode(pt.get_string(**kwargs)).decode())
else:
print(encodeutils.safe_encode(pt.get_string(**kwargs)))
def keys_and_vals_to_strs(dictionary):
"""Recursively convert a dictionary's keys and values to strings.
:param dictionary: dictionary whose keys/vals are to be converted to strs
"""
def to_str(k_or_v):
if isinstance(k_or_v, dict):
return keys_and_vals_to_strs(k_or_v)
elif isinstance(k_or_v, six.text_type):
return str(k_or_v)
else:
return k_or_v
return dict((to_str(k), to_str(v)) for k, v in dictionary.items())
def print_dict(dct, dict_property="Property", wrap=0):
"""Print a `dict` as a table of two columns.
:param dct: `dict` to print
:param dict_property: name of the first column
:param wrap: wrapping for the second column
"""
pt = prettytable.PrettyTable([dict_property, 'Value'])
pt.align = 'l'
for k, v in dct.items():
# convert dict to str to check length
if isinstance(v, dict):
v = six.text_type(keys_and_vals_to_strs(v))
if wrap > 0:
v = textwrap.fill(six.text_type(v), wrap)
# if value has a newline, add in multiple rows
# e.g. fault with stacktrace
if v and isinstance(v, six.string_types) and r'\n' in v:
lines = v.strip().split(r'\n')
col1 = k
for line in lines:
pt.add_row([col1, line])
col1 = ''
elif isinstance(v, list):
val = str([str(i) for i in v])
pt.add_row([k, val])
else:
pt.add_row([k, v])
if six.PY3:
print(encodeutils.safe_encode(pt.get_string()).decode())
else:
print(encodeutils.safe_encode(pt.get_string()))
def get_password(max_password_prompts=3):
"""Read password from TTY."""
verify = strutils.bool_from_string(env("OS_VERIFY_PASSWORD"))
pw = None
if hasattr(sys.stdin, "isatty") and sys.stdin.isatty():
# Check for Ctrl-D
try:
for __ in moves.range(max_password_prompts):
pw1 = getpass.getpass("OS Password: ")
if verify:
pw2 = getpass.getpass("Please verify: ")
else:
pw2 = pw1
if pw1 == pw2 and pw1:
pw = pw1
break
except EOFError:
pass
return pw
def service_type(stype):
"""Adds 'service_type' attribute to decorated function.
Usage:
.. code-block:: python
@service_type('volume')
def mymethod(f):
...
"""
def inner(f):
f.service_type = stype
return f
return inner
def get_service_type(f):
"""Retrieves service type from function."""
return getattr(f, 'service_type', None)
def pretty_choice_list(l):
return ', '.join("'%s'" % i for i in l)
def exit(msg=''):
if msg:
print (msg, file=sys.stderr)
sys.exit(1)

View File

@ -0,0 +1,400 @@
# -*- coding: utf-8 -*-
#
# Copyright 2012 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import json
import logging
import os
import socket
import ssl
from keystoneauth1 import adapter
import six
import six.moves.urllib.parse as urlparse
from zunclient import exceptions
LOG = logging.getLogger(__name__)
USER_AGENT = 'python-zunclient'
CHUNKSIZE = 1024 * 64 # 64kB
API_VERSION = '/v1'
def _extract_error_json(body):
"""Return error_message from the HTTP response body."""
error_json = {}
try:
body_json = json.loads(body)
if 'error_message' in body_json:
raw_msg = body_json['error_message']
error_json = json.loads(raw_msg)
else:
error_body = body_json['errors'][0]
raw_msg = error_body['title']
error_json = {'faultstring': error_body['title'],
'debuginfo': error_body['detail']}
except ValueError:
return {}
return error_json
class HTTPClient(object):
def __init__(self, endpoint, **kwargs):
self.endpoint = endpoint
self.auth_token = kwargs.get('token')
self.auth_ref = kwargs.get('auth_ref')
self.connection_params = self.get_connection_params(endpoint, **kwargs)
@staticmethod
def get_connection_params(endpoint, **kwargs):
parts = urlparse.urlparse(endpoint)
# trim API version and trailing slash from endpoint
path = parts.path
path = path.rstrip('/').rstrip(API_VERSION)
_args = (parts.hostname, parts.port, path)
_kwargs = {'timeout': (float(kwargs.get('timeout'))
if kwargs.get('timeout') else 600)}
if parts.scheme == 'https':
_class = VerifiedHTTPSConnection
_kwargs['ca_file'] = kwargs.get('ca_file', None)
_kwargs['cert_file'] = kwargs.get('cert_file', None)
_kwargs['key_file'] = kwargs.get('key_file', None)
_kwargs['insecure'] = kwargs.get('insecure', False)
elif parts.scheme == 'http':
_class = six.moves.http_client.HTTPConnection
else:
msg = 'Unsupported scheme: %s' % parts.scheme
raise exceptions.EndpointException(msg)
return (_class, _args, _kwargs)
def get_connection(self):
_class = self.connection_params[0]
try:
return _class(*self.connection_params[1][0:2],
**self.connection_params[2])
except six.moves.http_client.InvalidURL:
raise exceptions.EndpointException()
def log_curl_request(self, method, url, kwargs):
curl = ['curl -i -X %s' % method]
for (key, value) in kwargs['headers'].items():
header = '-H \'%s: %s\'' % (key, value)
curl.append(header)
conn_params_fmt = [
('key_file', '--key %s'),
('cert_file', '--cert %s'),
('ca_file', '--cacert %s'),
]
for (key, fmt) in conn_params_fmt:
value = self.connection_params[2].get(key)
if value:
curl.append(fmt % value)
if self.connection_params[2].get('insecure'):
curl.append('-k')
if 'body' in kwargs:
curl.append('-d \'%s\'' % kwargs['body'])
curl.append('%s/%s' % (self.endpoint, url.lstrip(API_VERSION)))
LOG.debug(' '.join(curl))
@staticmethod
def log_http_response(resp, body=None):
status = (resp.version / 10.0, resp.status, resp.reason)
dump = ['\nHTTP/%.1f %s %s' % status]
dump.extend(['%s: %s' % (k, v) for k, v in resp.getheaders()])
dump.append('')
if body:
dump.extend([body, ''])
LOG.debug('\n'.join(dump))
def _make_connection_url(self, url):
(_class, _args, _kwargs) = self.connection_params
base_url = _args[2]
return '%s/%s' % (base_url, url.lstrip('/'))
def _http_request(self, url, method, **kwargs):
"""Send an http request with the specified characteristics.
Wrapper around httplib.HTTP(S)Connection.request to handle tasks such
as setting headers and error handling.
"""
# Copy the kwargs so we can reuse the original in case of redirects
kwargs['headers'] = copy.deepcopy(kwargs.get('headers', {}))
kwargs['headers'].setdefault('User-Agent', USER_AGENT)
if self.auth_token:
kwargs['headers'].setdefault('X-Auth-Token', self.auth_token)
self.log_curl_request(method, url, kwargs)
conn = self.get_connection()
try:
conn_url = self._make_connection_url(url)
conn.request(method, conn_url, **kwargs)
resp = conn.getresponse()
except socket.gaierror as e:
message = ("Error finding address for %(url)s: %(e)s"
% dict(url=url, e=e))
raise exceptions.EndpointNotFound(message)
except (socket.error, socket.timeout) as e:
endpoint = self.endpoint
message = ("Error communicating with %(endpoint)s %(e)s"
% dict(endpoint=endpoint, e=e))
raise exceptions.ConnectionRefused(message)
body_iter = ResponseBodyIterator(resp)
# Read body into string if it isn't obviously image data
body_str = None
if resp.getheader('content-type', None) != 'application/octet-stream':
# decoding byte to string is necessary for Python 3.4 compatibility
# this issues has not been found with Python 3.4 unit tests
# because the test creates a fake http response of type str
# the if statement satisfies test (str) and real (bytes) behavior
body_list = [
chunk.decode("utf-8") if isinstance(chunk, bytes)
else chunk for chunk in body_iter
]
body_str = ''.join(body_list)
self.log_http_response(resp, body_str)
body_iter = six.StringIO(body_str)
else:
self.log_http_response(resp)
if 400 <= resp.status < 600:
LOG.warning("Request returned failure status.")
error_json = _extract_error_json(body_str)
raise exceptions.from_response(
resp, error_json.get('faultstring'),
error_json.get('debuginfo'), method, url)
elif resp.status in (301, 302, 305):
# Redirected. Reissue the request to the new location.
return self._http_request(resp['location'], method, **kwargs)
elif resp.status == 300:
raise exceptions.from_response(resp, method=method, url=url)
return resp, body_iter
def json_request(self, method, url, **kwargs):
kwargs.setdefault('headers', {})
kwargs['headers'].setdefault('Content-Type', 'application/json')
kwargs['headers'].setdefault('Accept', 'application/json')
if 'body' in kwargs:
kwargs['body'] = json.dumps(kwargs['body'])
resp, body_iter = self._http_request(url, method, **kwargs)
content_type = resp.getheader('content-type', None)
if resp.status == 204 or resp.status == 205 or content_type is None:
return resp, list()
if 'application/json' in content_type:
body = ''.join([chunk for chunk in body_iter])
try:
body = json.loads(body)
except ValueError:
LOG.error('Could not decode response body as JSON')
else:
body = None
return resp, body
def raw_request(self, method, url, **kwargs):
kwargs.setdefault('headers', {})
kwargs['headers'].setdefault('Content-Type',
'application/octet-stream')
return self._http_request(url, method, **kwargs)
class VerifiedHTTPSConnection(six.moves.http_client.HTTPSConnection):
"""httplib-compatibile connection using client-side SSL authentication
:see http://code.activestate.com/recipes/
577548-https-httplib-client-connection-with-certificate-v/
"""
def __init__(self, host, port, key_file=None, cert_file=None,
ca_file=None, timeout=None, insecure=False):
six.moves.http_client.HTTPSConnection.__init__(self, host, port,
key_file=key_file,
cert_file=cert_file)
self.key_file = key_file
self.cert_file = cert_file
if ca_file is not None:
self.ca_file = ca_file
else:
self.ca_file = self.get_system_ca_file()
self.timeout = timeout
self.insecure = insecure
def connect(self):
"""Connect to a host on a given (SSL) port.
If ca_file is pointing somewhere, use it to check Server Certificate.
Redefined/copied and extended from httplib.py:1105 (Python 2.6.x).
This is needed to pass cert_reqs=ssl.CERT_REQUIRED as parameter to
ssl.wrap_socket(), which forces SSL to check server certificate against
our client certificate.
"""
sock = socket.create_connection((self.host, self.port), self.timeout)
if self._tunnel_host:
self.sock = sock
self._tunnel()
if self.insecure is True:
kwargs = {'cert_reqs': ssl.CERT_NONE}
else:
kwargs = {'cert_reqs': ssl.CERT_REQUIRED, 'ca_certs': self.ca_file}
if self.cert_file:
kwargs['certfile'] = self.cert_file
if self.key_file:
kwargs['keyfile'] = self.key_file
self.sock = ssl.wrap_socket(sock, **kwargs)
@staticmethod
def get_system_ca_file():
"""Return path to system default CA file."""
# Standard CA file locations for Debian/Ubuntu, RedHat/Fedora,
# Suse, FreeBSD/OpenBSD
ca_path = ['/etc/ssl/certs/ca-certificates.crt',
'/etc/pki/tls/certs/ca-bundle.crt',
'/etc/ssl/ca-bundle.pem',
'/etc/ssl/cert.pem']
for ca in ca_path:
if os.path.exists(ca):
return ca
return None
class SessionClient(adapter.LegacyJsonAdapter):
"""HTTP client based on Keystone client session."""
def __init__(self, user_agent=USER_AGENT, logger=LOG, *args, **kwargs):
super(SessionClient, self).__init__(*args, **kwargs)
def _http_request(self, url, method, **kwargs):
if url.startswith(API_VERSION):
url = url[len(API_VERSION):]
kwargs.setdefault('user_agent', self.user_agent)
kwargs.setdefault('auth', self.auth)
kwargs.setdefault('endpoint_override', self.endpoint_override)
endpoint_filter = kwargs.setdefault('endpoint_filter', {})
endpoint_filter.setdefault('interface', self.interface)
endpoint_filter.setdefault('service_type', self.service_type)
endpoint_filter.setdefault('region_name', self.region_name)
resp = self.session.request(url, method,
raise_exc=False, **kwargs)
if 400 <= resp.status_code < 600:
error_json = _extract_error_json(resp.content)
raise exceptions.from_response(
resp, error_json.get('faultstring'),
error_json.get('debuginfo'), method, url)
elif resp.status_code in (301, 302, 305):
# Redirected. Reissue the request to the new location.
location = resp.headers.get('location')
resp = self._http_request(location, method, **kwargs)
elif resp.status_code == 300:
raise exceptions.from_response(resp, method=method, url=url)
return resp
def json_request(self, method, url, **kwargs):
kwargs.setdefault('headers', {})
kwargs['headers'].setdefault('Content-Type', 'application/json')
kwargs['headers'].setdefault('Accept', 'application/json')
if 'body' in kwargs:
kwargs['data'] = json.dumps(kwargs.pop('body'))
resp = self._http_request(url, method, **kwargs)
body = resp.content
content_type = resp.headers.get('content-type', None)
status = resp.status_code
if status == 204 or status == 205 or content_type is None:
return resp, list()
if 'application/json' in content_type:
try:
body = resp.json()
except ValueError:
LOG.error('Could not decode response body as JSON')
else:
body = None
return resp, body
def raw_request(self, method, url, **kwargs):
kwargs.setdefault('headers', {})
kwargs['headers'].setdefault('Content-Type',
'application/octet-stream')
return self._http_request(url, method, **kwargs)
class ResponseBodyIterator(object):
"""A class that acts as an iterator over an HTTP response."""
def __init__(self, resp):
self.resp = resp
def __iter__(self):
while True:
yield self.next()
def next(self):
chunk = self.resp.read(CHUNKSIZE)
if chunk:
return chunk
else:
raise StopIteration()
def _construct_http_client(*args, **kwargs):
session = kwargs.pop('session', None)
auth = kwargs.pop('auth', None)
if session:
service_type = kwargs.pop('service_type', 'baremetal')
interface = kwargs.pop('endpoint_type', None)
region_name = kwargs.pop('region_name', None)
return SessionClient(session=session,
auth=auth,
interface=interface,
service_type=service_type,
region_name=region_name,
service_name=None,
user_agent='python-zunclient')
else:
return HTTPClient(*args, **kwargs)

113
zunclient/common/utils.py Normal file
View File

@ -0,0 +1,113 @@
# -*- coding: utf-8 -*-
#
# Copyright 2012 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
from zunclient import exceptions as exc
from zunclient.i18n import _
def common_filters(marker=None, limit=None, sort_key=None, sort_dir=None):
"""Generate common filters for any list request.
:param marker: entity ID from which to start returning entities.
:param limit: maximum number of entities to return.
:param sort_key: field to use for sorting.
:param sort_dir: direction of sorting: 'asc' or 'desc'.
:returns: list of string filters.
"""
filters = []
if isinstance(limit, int):
filters.append('limit=%s' % limit)
if marker is not None:
filters.append('marker=%s' % marker)
if sort_key is not None:
filters.append('sort_key=%s' % sort_key)
if sort_dir is not None:
filters.append('sort_dir=%s' % sort_dir)
return filters
def split_and_deserialize(string):
"""Split and try to JSON deserialize a string.
Gets a string with the KEY=VALUE format, split it (using '=' as the
separator) and try to JSON deserialize the VALUE.
:returns: A tuple of (key, value).
"""
try:
key, value = string.split("=", 1)
except ValueError:
raise exc.CommandError(_('Attributes must be a list of '
'PATH=VALUE not "%s"') % string)
try:
value = json.loads(value)
except ValueError:
pass
return (key, value)
def args_array_to_patch(op, attributes):
patch = []
for attr in attributes:
# Sanitize
if not attr.startswith('/'):
attr = '/' + attr
if op in ['add', 'replace']:
path, value = split_and_deserialize(attr)
patch.append({'op': op, 'path': path, 'value': value})
elif op == "remove":
# For remove only the key is needed
patch.append({'op': op, 'path': attr})
else:
raise exc.CommandError(_('Unknown PATCH operation: %s') % op)
return patch
def format_labels(lbls, parse_comma=True):
'''Reformat labels into dict of format expected by the API.'''
if not lbls:
return {}
if parse_comma:
# expect multiple invocations of --labels but fall back
# to either , or ; delimited if only one --labels is specified
if len(lbls) == 1:
lbls = lbls[0].replace(';', ',').split(',')
labels = {}
for l in lbls:
try:
(k, v) = l.split(('='), 1)
except ValueError:
raise exc.CommandError(_('labels must be a list of KEY=VALUE '
'not %s') % l)
if k not in labels:
labels[k] = v
else:
if not isinstance(labels[k], list):
labels[k] = [labels[k]]
labels[k].append(v)
return labels
def print_list_field(field):
return lambda obj: ', '.join(getattr(obj, field))

74
zunclient/exceptions.py Normal file
View File

@ -0,0 +1,74 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from zunclient.common.apiclient import exceptions
from zunclient.common.apiclient.exceptions import * # noqa
# NOTE(akurilin): This alias is left here since v.0.1.3 to support backwards
# compatibility.
InvalidEndpoint = EndpointException
CommunicationError = ConnectionRefused
HTTPBadRequest = BadRequest
HTTPInternalServerError = InternalServerError
HTTPNotFound = NotFound
HTTPServiceUnavailable = ServiceUnavailable
class AmbiguousAuthSystem(ClientException):
"""Could not obtain token and endpoint using provided credentials."""
pass
# Alias for backwards compatibility
AmbigiousAuthSystem = AmbiguousAuthSystem
class InvalidAttribute(ClientException):
pass
def from_response(response, message=None, traceback=None, method=None,
url=None):
"""Return an HttpError instance based on response from httplib/requests."""
error_body = {}
if message:
error_body['message'] = message
if traceback:
error_body['details'] = traceback
if hasattr(response, 'status') and not hasattr(response, 'status_code'):
# NOTE(akurilin): These modifications around response object give
# ability to get all necessary information in method `from_response`
# from common code, which expecting response object from `requests`
# library instead of object from `httplib/httplib2` library.
response.status_code = response.status
response.headers = {
'Content-Type': response.getheader('content-type', "")}
if hasattr(response, 'status_code'):
# NOTE(hongbin): This allows SessionClient to handle faultstring.
response.json = lambda: {'error': error_body}
if (response.headers.get('Content-Type', '').startswith('text/') and
not hasattr(response, 'text')):
# NOTE(clif_h): There seems to be a case in the
# common.apiclient.exceptions module where if the
# content-type of the response is text/* then it expects
# the response to have a 'text' attribute, but that
# doesn't always seem to necessarily be the case.
# This is to work around that problem.
response.text = ''
return exceptions.from_response(response, method, url)

35
zunclient/i18n.py Normal file
View File

@ -0,0 +1,35 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""oslo_i18n integration module for zunclient.
See http://docs.openstack.org/developer/oslo.i18n/usage.html .
"""
import oslo_i18n
_translators = oslo_i18n.TranslatorFactory(domain='zunclient')
# The primary translation function using the well-known name "_"
_ = _translators.primary
# Translators for log levels.
#
# The abbreviated names are meant to reflect the usual use of a short
# name like '_'. The "L" is for "log" and the other letter comes from
# the level.
_LI = _translators.log_info
_LW = _translators.log_warning
_LE = _translators.log_error
_LC = _translators.log_critical

636
zunclient/shell.py Normal file
View File

@ -0,0 +1,636 @@
# Copyright 2014
# The Cloudscaling Group, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
# This code is taken from python-novaclient. Goal is minimal modification.
###
"""
Command-line interface to the OpenStack Zun API.
"""
from __future__ import print_function
import argparse
import getpass
import logging
import os
import sys
from oslo_utils import encodeutils
from oslo_utils import strutils
import six
HAS_KEYRING = False
all_errors = ValueError
try:
import keyring
HAS_KEYRING = True
try:
if isinstance(keyring.get_keyring(), keyring.backend.GnomeKeyring):
import gnomekeyring
all_errors = (ValueError,
gnomekeyring.IOError,
gnomekeyring.NoKeyringDaemonError)
except Exception:
pass
except ImportError:
pass
from zunclient.common.apiclient import auth
from zunclient.common import cliutils
from zunclient import exceptions as exc
from zunclient.v1 import client as client_v1
from zunclient.v1 import shell as shell_v1
from zunclient import version
DEFAULT_API_VERSION = '1'
DEFAULT_ENDPOINT_TYPE = 'publicURL'
DEFAULT_SERVICE_TYPE = 'container'
logger = logging.getLogger(__name__)
def positive_non_zero_float(text):
if text is None:
return None
try:
value = float(text)
except ValueError:
msg = "%s must be a float" % text
raise argparse.ArgumentTypeError(msg)
if value <= 0:
msg = "%s must be greater than 0" % text
raise argparse.ArgumentTypeError(msg)
return value
class SecretsHelper(object):
def __init__(self, args, client):
self.args = args
self.client = client
self.key = None
def _validate_string(self, text):
if text is None or len(text) == 0:
return False
return True
def _make_key(self):
if self.key is not None:
return self.key
keys = [
self.client.auth_url,
self.client.projectid,
self.client.user,
self.client.region_name,
self.client.endpoint_type,
self.client.service_type,
self.client.service_name,
self.client.volume_service_name,
]
for (index, key) in enumerate(keys):
if key is None:
keys[index] = '?'
else:
keys[index] = str(keys[index])
self.key = "/".join(keys)
return self.key
def _prompt_password(self, verify=True):
pw = None
if hasattr(sys.stdin, 'isatty') and sys.stdin.isatty():
# Check for Ctl-D
try:
while True:
pw1 = getpass.getpass('OS Password: ')
if verify:
pw2 = getpass.getpass('Please verify: ')
else:
pw2 = pw1
if pw1 == pw2 and self._validate_string(pw1):
pw = pw1
break
except EOFError:
pass
return pw
def save(self, auth_token, management_url, tenant_id):
if not HAS_KEYRING or not self.args.os_cache:
return
if (auth_token == self.auth_token and
management_url == self.management_url):
# Nothing changed....
return
if not all([management_url, auth_token, tenant_id]):
raise ValueError("Unable to save empty management url/auth token")
value = "|".join([str(auth_token),
str(management_url),
str(tenant_id)])
keyring.set_password("zunclient_auth", self._make_key(), value)
@property
def password(self):
if self._validate_string(self.args.os_password):
return self.args.os_password
verify_pass = (
strutils.bool_from_string(cliutils.env("OS_VERIFY_PASSWORD"))
)
return self._prompt_password(verify_pass)
@property
def management_url(self):
if not HAS_KEYRING or not self.args.os_cache:
return None
management_url = None
try:
block = keyring.get_password('zunclient_auth',
self._make_key())
if block:
_token, management_url, _tenant_id = block.split('|', 2)
except all_errors:
pass
return management_url
@property
def auth_token(self):
# Now is where it gets complicated since we
# want to look into the keyring module, if it
# exists and see if anything was provided in that
# file that we can use.
if not HAS_KEYRING or not self.args.os_cache:
return None
token = None
try:
block = keyring.get_password('zunclient_auth',
self._make_key())
if block:
token, _management_url, _tenant_id = block.split('|', 2)
except all_errors:
pass
return token
@property
def tenant_id(self):
if not HAS_KEYRING or not self.args.os_cache:
return None
tenant_id = None
try:
block = keyring.get_password('zunclient_auth',
self._make_key())
if block:
_token, _management_url, tenant_id = block.split('|', 2)
except all_errors:
pass
return tenant_id
class ZunClientArgumentParser(argparse.ArgumentParser):
def __init__(self, *args, **kwargs):
super(ZunClientArgumentParser, self).__init__(*args, **kwargs)
def error(self, message):
"""error(message: string)
Prints a usage message incorporating the message to stderr and
exits.
"""
self.print_usage(sys.stderr)
# FIXME(lzyeval): if changes occur in argparse.ArgParser._check_value
choose_from = ' (choose from'
progparts = self.prog.partition(' ')
self.exit(2, "error: %(errmsg)s\nTry '%(mainp)s help %(subp)s'"
" for more information.\n" %
{'errmsg': message.split(choose_from)[0],
'mainp': progparts[0],
'subp': progparts[2]})
class OpenStackZunShell(object):
def get_base_parser(self):
parser = ZunClientArgumentParser(
prog='zun',
description=__doc__.strip(),
epilog='See "zun help COMMAND" '
'for help on a specific command.',
add_help=False,
formatter_class=OpenStackHelpFormatter,
)
# Global arguments
parser.add_argument('-h', '--help',
action='store_true',
help=argparse.SUPPRESS)
parser.add_argument('--version',
action='version',
version=version.version_info.version_string())
parser.add_argument('--debug',
default=False,
action='store_true',
help="Print debugging output.")
parser.add_argument('--os-cache',
default=strutils.bool_from_string(
cliutils.env('OS_CACHE', default=False)),
action='store_true',
help="Use the auth token cache. Defaults to False "
"if env[OS_CACHE] is not set.")
parser.add_argument('--os-region-name',
metavar='<region-name>',
default=os.environ.get('OS_REGION_NAME'),
help='Region name. Default=env[OS_REGION_NAME].')
# TODO(mattf) - add get_timings support to Client
# parser.add_argument('--timings',
# default=False,
# action='store_true',
# help="Print call timing info")
# TODO(mattf) - use timeout
# parser.add_argument('--timeout',
# default=600,
# metavar='<seconds>',
# type=positive_non_zero_float,
# help="Set HTTP call timeout (in seconds)")
parser.add_argument('--os-tenant-id',
metavar='<auth-tenant-id>',
default=cliutils.env('OS_TENANT_ID'),
help='Defaults to env[OS_TENANT_ID].')
parser.add_argument('--os-user-domain-id',
metavar='<auth-user-domain-id>',
default=cliutils.env('OS_USER_DOMAIN_ID'),
help='Defaults to env[OS_USER_DOMAIN_ID].')
parser.add_argument('--os-user-domain-name',
metavar='<auth-user-domain-name>',
default=cliutils.env('OS_USER_DOMAIN_NAME'),
help='Defaults to env[OS_USER_DOMAIN_NAME].')
parser.add_argument('--os-project-domain-id',
metavar='<auth-project-domain-id>',
default=cliutils.env('OS_PROJECT_DOMAIN_ID'),
help='Defaults to env[OS_PROJECT_DOMAIN_ID].')
parser.add_argument('--os-project-domain-name',
metavar='<auth-project-domain-name>',
default=cliutils.env('OS_PROJECT_DOMAIN_NAME'),
help='Defaults to env[OS_PROJECT_DOMAIN_NAME].')
parser.add_argument('--service-type',
metavar='<service-type>',
help='Defaults to container for all '
'actions.')
parser.add_argument('--service_type',
help=argparse.SUPPRESS)
parser.add_argument('--endpoint-type',
metavar='<endpoint-type>',
default=cliutils.env(
'OS_ENDPOINT_TYPE',
default=DEFAULT_ENDPOINT_TYPE),
help='Defaults to env[OS_ENDPOINT_TYPE] or '
+ DEFAULT_ENDPOINT_TYPE + '.')
# NOTE(dtroyer): We can't add --endpoint_type here due to argparse
# thinking usage-list --end is ambiguous; but it
# works fine with only --endpoint-type present
# Go figure. I'm leaving this here for doc purposes.
# parser.add_argument('--endpoint_type',
# help=argparse.SUPPRESS)
parser.add_argument('--zun-api-version',
metavar='<zun-api-ver>',
default=cliutils.env(
'MAGNUM_API_VERSION',
default=DEFAULT_API_VERSION),
help='Accepts "api", '
'defaults to env[MAGNUM_API_VERSION].')
parser.add_argument('--zun_api_version',
help=argparse.SUPPRESS)
parser.add_argument('--os-cacert',
metavar='<ca-certificate>',
default=cliutils.env('OS_CACERT', default=None),
help='Specify a CA bundle file to use in '
'verifying a TLS (https) server certificate. '
'Defaults to env[OS_CACERT].')
parser.add_argument('--bypass-url',
metavar='<bypass-url>',
default=cliutils.env('BYPASS_URL', default=None),
dest='bypass_url',
help="Use this API endpoint instead of the "
"Service Catalog.")
parser.add_argument('--bypass_url',
help=argparse.SUPPRESS)
parser.add_argument('--insecure',
default=cliutils.env('MAGNUMCLIENT_INSECURE',
default=False),
action='store_true',
help="Do not verify https connections")
# The auth-system-plugins might require some extra options
auth.load_auth_system_opts(parser)
return parser
def get_subcommand_parser(self, version):
parser = self.get_base_parser()
self.subcommands = {}
subparsers = parser.add_subparsers(metavar='<subcommand>')
try:
actions_modules = {
'1': shell_v1.COMMAND_MODULES,
}[version]
except KeyError:
actions_modules = shell_v1.COMMAND_MODULES
for actions_module in actions_modules:
self._find_actions(subparsers, actions_module)
self._find_actions(subparsers, self)
self._add_bash_completion_subparser(subparsers)
return parser
def _add_bash_completion_subparser(self, subparsers):
subparser = (
subparsers.add_parser('bash_completion',
add_help=False,
formatter_class=OpenStackHelpFormatter)
)
self.subcommands['bash_completion'] = subparser
subparser.set_defaults(func=self.do_bash_completion)
def _find_actions(self, subparsers, actions_module):
for attr in (a for a in dir(actions_module) if a.startswith('do_')):
# I prefer to be hyphen-separated instead of underscores.
command = attr[3:].replace('_', '-')
callback = getattr(actions_module, attr)
desc = callback.__doc__ or ''
action_help = desc.strip()
arguments = getattr(callback, 'arguments', [])
subparser = (
subparsers.add_parser(command,
help=action_help,
description=desc,
add_help=False,
formatter_class=OpenStackHelpFormatter)
)
subparser.add_argument('-h', '--help',
action='help',
help=argparse.SUPPRESS,)
self.subcommands[command] = subparser
for (args, kwargs) in arguments:
subparser.add_argument(*args, **kwargs)
subparser.set_defaults(func=callback)
def setup_debugging(self, debug):
if debug:
streamformat = "%(levelname)s (%(module)s:%(lineno)d) %(message)s"
# Set up the root logger to debug so that the submodules can
# print debug messages
logging.basicConfig(level=logging.DEBUG,
format=streamformat)
else:
streamformat = "%(levelname)s %(message)s"
logging.basicConfig(level=logging.CRITICAL,
format=streamformat)
def main(self, argv):
# NOTE(Christoph Jansen): With Python 3.4 argv somehow becomes a Map.
# This hack fixes it.
argv = list(argv)
# Parse args once to find version and debug settings
parser = self.get_base_parser()
(options, args) = parser.parse_known_args(argv)
self.setup_debugging(options.debug)
# NOTE(dtroyer): Hackery to handle --endpoint_type due to argparse
# thinking usage-list --end is ambiguous; but it
# works fine with only --endpoint-type present
# Go figure.
if '--endpoint_type' in argv:
spot = argv.index('--endpoint_type')
argv[spot] = '--endpoint-type'
subcommand_parser = (
self.get_subcommand_parser(options.zun_api_version)
)
self.parser = subcommand_parser
if options.help or not argv:
subcommand_parser.print_help()
return 0
args = subcommand_parser.parse_args(argv)
# Short-circuit and deal with help right away.
# NOTE(jamespage): args.func is not guaranteed with python >= 3.4
if not hasattr(args, 'func') or args.func == self.do_help:
self.do_help(args)
return 0
elif args.func == self.do_bash_completion:
self.do_bash_completion(args)
return 0
(os_username, os_tenant_name, os_tenant_id,
os_user_domain_id, os_user_domain_name,
os_project_domain_id, os_project_domain_name,
os_auth_url, os_auth_system, endpoint_type,
service_type, bypass_url, insecure) = (
(args.os_username, args.os_tenant_name, args.os_tenant_id,
args.os_user_domain_id, args.os_user_domain_name,
args.os_project_domain_id, args.os_project_domain_name,
args.os_auth_url, args.os_auth_system, args.endpoint_type,
args.service_type, args.bypass_url, args.insecure)
)
if os_auth_system and os_auth_system != "keystone":
auth_plugin = auth.load_plugin(os_auth_system)
else:
auth_plugin = None
# Fetched and set later as needed
os_password = None
if not endpoint_type:
endpoint_type = DEFAULT_ENDPOINT_TYPE
if not service_type:
service_type = DEFAULT_SERVICE_TYPE
# NA - there is only one service this CLI accesses
# service_type = utils.get_service_type(args.func) or service_type
# FIXME(usrleon): Here should be restrict for project id same as
# for os_username or os_password but for compatibility it is not.
if not cliutils.isunauthenticated(args.func):
if auth_plugin:
auth_plugin.parse_opts(args)
if not auth_plugin or not auth_plugin.opts:
if not os_username:
raise exc.CommandError("You must provide a username "
"via either --os-username or "
"env[OS_USERNAME]")
if not os_tenant_name and not os_tenant_id:
raise exc.CommandError("You must provide a tenant name "
"or tenant id via --os-tenant-name, "
"--os-tenant-id, env[OS_TENANT_NAME] "
"or env[OS_TENANT_ID]")
if not os_auth_url:
if os_auth_system and os_auth_system != 'keystone':
os_auth_url = auth_plugin.get_auth_url()
if not os_auth_url:
raise exc.CommandError("You must provide an auth url "
"via either --os-auth-url or "
"env[OS_AUTH_URL] or specify an "
"auth_system which defines a "
"default url with --os-auth-system "
"or env[OS_AUTH_SYSTEM]")
# NOTE: The Zun client authenticates when you create it. So instead of
# creating here and authenticating later, which is what the novaclient
# does, we just create the client later.
# Now check for the password/token of which pieces of the
# identifying keyring key can come from the underlying client
if not cliutils.isunauthenticated(args.func):
# NA - Client can't be used with SecretsHelper
if (auth_plugin and auth_plugin.opts and
"os_password" not in auth_plugin.opts):
use_pw = False
else:
use_pw = True
if use_pw:
# Auth using token must have failed or not happened
# at all, so now switch to password mode and save
# the token when its gotten... using our keyring
# saver
os_password = args.os_password
if not os_password:
raise exc.CommandError(
'Expecting a password provided via either '
'--os-password, env[OS_PASSWORD], or '
'prompted response')
try:
client = {
'1': client_v1,
}[options.zun_api_version]
except KeyError:
client = client_v1
self.cs = client.Client(username=os_username,
api_key=os_password,
project_id=os_tenant_id,
project_name=os_tenant_name,
user_domain_id=os_user_domain_id,
user_domain_name=os_user_domain_name,
project_domain_id=os_project_domain_id,
project_domain_name=os_project_domain_name,
auth_url=os_auth_url,
service_type=service_type,
region_name=args.os_region_name,
zun_url=bypass_url,
endpoint_type=endpoint_type,
insecure=insecure)
args.func(self.cs, args)
def _dump_timings(self, timings):
class Tyme(object):
def __init__(self, url, seconds):
self.url = url
self.seconds = seconds
results = [Tyme(url, end - start) for url, start, end in timings]
total = 0.0
for tyme in results:
total += tyme.seconds
results.append(Tyme("Total", total))
cliutils.print_list(results, ["url", "seconds"], sortby_index=None)
def do_bash_completion(self, _args):
"""Prints arguments for bash-completion.
Prints all of the commands and options to stdout so that the
zun.bash_completion script doesn't have to hard code them.
"""
commands = set()
options = set()
for sc_str, sc in self.subcommands.items():
commands.add(sc_str)
for option in sc._optionals._option_string_actions.keys():
options.add(option)
commands.remove('bash-completion')
commands.remove('bash_completion')
print(' '.join(commands | options))
@cliutils.arg('command', metavar='<subcommand>', nargs='?',
help='Display help for <subcommand>.')
def do_help(self, args):
"""Display help about this program or one of its subcommands."""
# NOTE(jamespage): args.command is not guaranteed with python >= 3.4
command = getattr(args, 'command', '')
if command:
if args.command in self.subcommands:
self.subcommands[args.command].print_help()
else:
raise exc.CommandError("'%s' is not a valid subcommand" %
args.command)
else:
self.parser.print_help()
# I'm picky about my shell help.
class OpenStackHelpFormatter(argparse.HelpFormatter):
def start_section(self, heading):
# Title-case the headings
heading = '%s%s' % (heading[0].upper(), heading[1:])
super(OpenStackHelpFormatter, self).start_section(heading)
def main():
try:
OpenStackZunShell().main(map(encodeutils.safe_decode, sys.argv[1:]))
except Exception as e:
logger.debug(e, exc_info=1)
print("ERROR: %s" % encodeutils.safe_encode(six.text_type(e)),
file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@ -15,9 +15,39 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslotest import base
import os
import fixtures
import testtools
_TRUE_VALUES = ('true', '1', 'yes')
class TestCase(base.BaseTestCase):
class TestCase(testtools.TestCase):
"""Test case base class for all unit tests."""
def setUp(self):
"""Run before each test method to initialize test environment."""
super(TestCase, self).setUp()
test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
try:
test_timeout = int(test_timeout)
except ValueError:
# If timeout value is invalid do not set a timeout.
test_timeout = 0
if test_timeout > 0:
self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
self.useFixture(fixtures.NestedTempfile())
self.useFixture(fixtures.TempHomeDir())
if os.environ.get('OS_STDOUT_CAPTURE') in _TRUE_VALUES:
stdout = self.useFixture(fixtures.StringStream('stdout')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
if os.environ.get('OS_STDERR_CAPTURE') in _TRUE_VALUES:
stderr = self.useFixture(fixtures.StringStream('stderr')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
self.log_fixture = self.useFixture(fixtures.FakeLogger())

View File

@ -0,0 +1,38 @@
# Copyright (c) 2015 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import testtools
from zunclient import client
class ClientTest(testtools.TestCase):
@mock.patch('zunclient.v1.client.Client')
def test_no_version_argument(self, mock_zun_client):
client.Client(input_auth_token='mytoken', zun_url='http://myurl/')
mock_zun_client.assert_called_with(
input_auth_token='mytoken', zun_url='http://myurl/')
@mock.patch('zunclient.v1.client.Client')
def test_valid_version_argument(self, mock_zun_client):
client.Client(version='1', zun_url='http://myurl/')
mock_zun_client.assert_called_with(zun_url='http://myurl/')
@mock.patch('zunclient.v1.client.Client')
def test_invalid_version_argument(self, mock_zun_client):
self.assertRaises(
ValueError,
client.Client, version='2', zun_url='http://myurl/')

View File

@ -0,0 +1,299 @@
# Copyright 2015 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import mock
import six
from zunclient.common.apiclient.exceptions import GatewayTimeout
from zunclient.common import httpclient as http
from zunclient import exceptions as exc
from zunclient.tests import utils
def _get_error_body(faultstring=None, debuginfo=None):
error_body = {
'faultstring': faultstring,
'debuginfo': debuginfo
}
raw_error_body = json.dumps(error_body)
body = {'error_message': raw_error_body}
raw_body = json.dumps(body)
return raw_body
HTTP_CLASS = six.moves.http_client.HTTPConnection
HTTPS_CLASS = http.VerifiedHTTPSConnection
DEFAULT_TIMEOUT = 600
class HttpClientTest(utils.BaseTestCase):
def test_url_generation_trailing_slash_in_base(self):
client = http.HTTPClient('http://localhost/')
url = client._make_connection_url('/v1/resources')
self.assertEqual('/v1/resources', url)
def test_url_generation_without_trailing_slash_in_base(self):
client = http.HTTPClient('http://localhost')
url = client._make_connection_url('/v1/resources')
self.assertEqual('/v1/resources', url)
def test_url_generation_prefix_slash_in_path(self):
client = http.HTTPClient('http://localhost/')
url = client._make_connection_url('/v1/resources')
self.assertEqual('/v1/resources', url)
def test_url_generation_without_prefix_slash_in_path(self):
client = http.HTTPClient('http://localhost')
url = client._make_connection_url('v1/resources')
self.assertEqual('/v1/resources', url)
def test_server_exception_empty_body(self):
error_body = _get_error_body()
fake_resp = utils.FakeResponse({'content-type': 'application/json'},
six.StringIO(error_body),
version=1,
status=500)
client = http.HTTPClient('http://localhost/')
client.get_connection = (
lambda *a, **kw: utils.FakeConnection(fake_resp))
error = self.assertRaises(exc.InternalServerError,
client.json_request,
'GET', '/v1/resources')
self.assertEqual('Internal Server Error (HTTP 500)', str(error))
def test_server_exception_msg_only(self):
error_msg = 'test error msg'
error_body = _get_error_body(error_msg)
fake_resp = utils.FakeResponse({'content-type': 'application/json'},
six.StringIO(error_body),
version=1,
status=500)
client = http.HTTPClient('http://localhost/')
client.get_connection = (
lambda *a, **kw: utils.FakeConnection(fake_resp))
error = self.assertRaises(exc.InternalServerError,
client.json_request,
'GET', '/v1/resources')
self.assertEqual(error_msg + ' (HTTP 500)', str(error))
def test_server_exception_msg_and_traceback(self):
error_msg = 'another test error'
error_trace = ("\"Traceback (most recent call last):\\n\\n "
"File \\\"/usr/local/lib/python2.7/...")
error_body = _get_error_body(error_msg, error_trace)
fake_resp = utils.FakeResponse({'content-type': 'application/json'},
six.StringIO(error_body),
version=1,
status=500)
client = http.HTTPClient('http://localhost/')
client.get_connection = (
lambda *a, **kw: utils.FakeConnection(fake_resp))
error = self.assertRaises(exc.InternalServerError,
client.json_request,
'GET', '/v1/resources')
self.assertEqual(
'%(error)s (HTTP 500)\n%(trace)s' % {'error': error_msg,
'trace': error_trace},
"%(error)s\n%(details)s" % {'error': str(error),
'details': str(error.details)})
def test_get_connection_params(self):
endpoint = 'http://zun-host:6385'
expected = (HTTP_CLASS,
('zun-host', 6385, ''),
{'timeout': DEFAULT_TIMEOUT})
params = http.HTTPClient.get_connection_params(endpoint)
self.assertEqual(expected, params)
def test_get_connection_params_with_trailing_slash(self):
endpoint = 'http://zun-host:6385/'
expected = (HTTP_CLASS,
('zun-host', 6385, ''),
{'timeout': DEFAULT_TIMEOUT})
params = http.HTTPClient.get_connection_params(endpoint)
self.assertEqual(expected, params)
def test_get_connection_params_with_ssl(self):
endpoint = 'https://zun-host:6385'
expected = (HTTPS_CLASS,
('zun-host', 6385, ''),
{
'timeout': DEFAULT_TIMEOUT,
'ca_file': None,
'cert_file': None,
'key_file': None,
'insecure': False,
})
params = http.HTTPClient.get_connection_params(endpoint)
self.assertEqual(expected, params)
def test_get_connection_params_with_ssl_params(self):
endpoint = 'https://zun-host:6385'
ssl_args = {
'ca_file': '/path/to/ca_file',
'cert_file': '/path/to/cert_file',
'key_file': '/path/to/key_file',
'insecure': True,
}
expected_kwargs = {'timeout': DEFAULT_TIMEOUT}
expected_kwargs.update(ssl_args)
expected = (HTTPS_CLASS,
('zun-host', 6385, ''),
expected_kwargs)
params = http.HTTPClient.get_connection_params(endpoint, **ssl_args)
self.assertEqual(expected, params)
def test_get_connection_params_with_timeout(self):
endpoint = 'http://zun-host:6385'
expected = (HTTP_CLASS,
('zun-host', 6385, ''),
{'timeout': 300.0})
params = http.HTTPClient.get_connection_params(endpoint, timeout=300)
self.assertEqual(expected, params)
def test_get_connection_params_with_version(self):
endpoint = 'http://zun-host:6385/v1'
expected = (HTTP_CLASS,
('zun-host', 6385, ''),
{'timeout': DEFAULT_TIMEOUT})
params = http.HTTPClient.get_connection_params(endpoint)
self.assertEqual(expected, params)
def test_get_connection_params_with_version_trailing_slash(self):
endpoint = 'http://zun-host:6385/v1/'
expected = (HTTP_CLASS,
('zun-host', 6385, ''),
{'timeout': DEFAULT_TIMEOUT})
params = http.HTTPClient.get_connection_params(endpoint)
self.assertEqual(expected, params)
def test_get_connection_params_with_subpath(self):
endpoint = 'http://zun-host:6385/zun'
expected = (HTTP_CLASS,
('zun-host', 6385, '/zun'),
{'timeout': DEFAULT_TIMEOUT})
params = http.HTTPClient.get_connection_params(endpoint)
self.assertEqual(expected, params)
def test_get_connection_params_with_subpath_trailing_slash(self):
endpoint = 'http://zun-host:6385/zun/'
expected = (HTTP_CLASS,
('zun-host', 6385, '/zun'),
{'timeout': DEFAULT_TIMEOUT})
params = http.HTTPClient.get_connection_params(endpoint)
self.assertEqual(expected, params)
def test_get_connection_params_with_subpath_version(self):
endpoint = 'http://zun-host:6385/zun/v1'
expected = (HTTP_CLASS,
('zun-host', 6385, '/zun'),
{'timeout': DEFAULT_TIMEOUT})
params = http.HTTPClient.get_connection_params(endpoint)
self.assertEqual(expected, params)
def test_get_connection_params_with_subpath_version_trailing_slash(self):
endpoint = 'http://zun-host:6385/zun/v1/'
expected = (HTTP_CLASS,
('zun-host', 6385, '/zun'),
{'timeout': DEFAULT_TIMEOUT})
params = http.HTTPClient.get_connection_params(endpoint)
self.assertEqual(expected, params)
def test_401_unauthorized_exception(self):
error_body = _get_error_body()
fake_resp = utils.FakeResponse({'content-type': 'text/plain'},
six.StringIO(error_body),
version=1,
status=401)
client = http.HTTPClient('http://localhost/')
client.get_connection = (lambda *a,
**kw: utils.FakeConnection(fake_resp))
self.assertRaises(exc.Unauthorized, client.json_request,
'GET', '/v1/resources')
class SessionClientTest(utils.BaseTestCase):
def test_server_exception_msg_and_traceback(self):
error_msg = 'another test error'
error_trace = ("\"Traceback (most recent call last):\\n\\n "
"File \\\"/usr/local/lib/python2.7/...")
error_body = _get_error_body(error_msg, error_trace)
fake_session = utils.FakeSession({'Content-Type': 'application/json'},
error_body,
500)
client = http.SessionClient(session=fake_session)
error = self.assertRaises(exc.InternalServerError,
client.json_request,
'GET', '/v1/resources')
self.assertEqual(
'%(error)s (HTTP 500)\n%(trace)s' % {'error': error_msg,
'trace': error_trace},
"%(error)s\n%(details)s" % {'error': str(error),
'details': str(error.details)})
def test_server_exception_empty_body(self):
error_body = _get_error_body()
fake_session = utils.FakeSession({'Content-Type': 'application/json'},
error_body,
500)
client = http.SessionClient(session=fake_session)
error = self.assertRaises(exc.InternalServerError,
client.json_request,
'GET', '/v1/resources')
self.assertEqual('Internal Server Error (HTTP 500)', str(error))
def test_bypass_url(self):
fake_response = utils.FakeSessionResponse(
{}, content="", status_code=201)
fake_session = mock.MagicMock()
fake_session.request.side_effect = [fake_response]
client = http.SessionClient(
session=fake_session, endpoint_override='http://zun')
client.json_request('GET', '/v1/services')
self.assertEqual(
fake_session.request.call_args[1]['endpoint_override'],
'http://zun'
)
def test_exception(self):
fake_response = utils.FakeSessionResponse(
{}, content="", status_code=504)
fake_session = mock.MagicMock()
fake_session.request.side_effect = [fake_response]
client = http.SessionClient(
session=fake_session, endpoint_override='http://zun')
self.assertRaises(GatewayTimeout,
client.json_request,
'GET', '/v1/resources')

View File

@ -0,0 +1,314 @@
# Copyright 2015 NEC Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import sys
import fixtures
from keystoneauth1 import fixture
import mock
import six
from testtools import matchers
from zunclient import exceptions
import zunclient.shell
from zunclient.tests import utils
FAKE_ENV = {'OS_USERNAME': 'username',
'OS_PASSWORD': 'password',
'OS_TENANT_NAME': 'tenant_name',
'OS_AUTH_URL': 'http://no.where/v2.0'}
FAKE_ENV2 = {'OS_USER_ID': 'user_id',
'OS_PASSWORD': 'password',
'OS_TENANT_ID': 'tenant_id',
'OS_AUTH_URL': 'http://no.where/v2.0'}
FAKE_ENV3 = {'OS_USERNAME': 'username',
'OS_PASSWORD': 'password',
'OS_TENANT_ID': 'tenant_id',
'OS_AUTH_URL': 'http://no.where/v2.0'}
FAKE_ENV4 = {'OS_USERNAME': 'username',
'OS_PASSWORD': 'password',
'OS_TENANT_ID': 'tenant_id',
'OS_USER_DOMAIN_NAME': 'Default',
'OS_PROJECT_DOMAIN_NAME': 'Default',
'OS_AUTH_URL': 'http://no.where/v3'}
def _create_ver_list(versions):
return {'versions': {'values': versions}}
class ParserTest(utils.TestCase):
def setUp(self):
super(ParserTest, self).setUp()
self.parser = zunclient.shell.ZunClientArgumentParser()
def test_ambiguous_option(self):
self.parser.add_argument('--tic')
self.parser.add_argument('--tac')
try:
self.parser.parse_args(['--t'])
except SystemExit as err:
self.assertEqual(2, err.code)
else:
self.fail('SystemExit not raised')
class ShellTest(utils.TestCase):
AUTH_URL = utils.FAKE_ENV['OS_AUTH_URL']
_msg_no_tenant_project = ("You must provide a tenant name or tenant id"
" via --os-tenant-name, --os-tenant-id,"
" env[OS_TENANT_NAME] or env[OS_TENANT_ID]")
def setUp(self):
super(ShellTest, self).setUp()
self.nc_util = mock.patch(
'zunclient.common.cliutils.isunauthenticated').start()
self.nc_util.return_value = False
def test_help_unknown_command(self):
self.assertRaises(exceptions.CommandError, self.shell, 'help foofoo')
def test_help(self):
required = [
'.*?^usage: ',
'.*?^\s+container-stop\s+Stop specified container.',
'.*?^See "zun help COMMAND" for help on a specific command',
]
stdout, stderr = self.shell('help')
for r in required:
self.assertThat((stdout + stderr),
matchers.MatchesRegex(r, re.DOTALL | re.MULTILINE))
def test_help_on_subcommand(self):
required = [
'.*?^usage: zun container-create',
'.*?^Create a container.',
'.*?^Optional arguments:',
]
stdout, stderr = self.shell('help container-create')
for r in required:
self.assertThat((stdout + stderr),
matchers.MatchesRegex(r, re.DOTALL | re.MULTILINE))
def test_help_no_options(self):
required = [
'.*?^usage: ',
'.*?^\s+container-stop\s+Stop specified container.',
'.*?^See "zun help COMMAND" for help on a specific command',
]
stdout, stderr = self.shell('')
for r in required:
self.assertThat((stdout + stderr),
matchers.MatchesRegex(r, re.DOTALL | re.MULTILINE))
def test_bash_completion(self):
stdout, stderr = self.shell('bash-completion')
# just check we have some output
required = [
'.*--json',
'.*help',
'.*container-show',
'.*--container']
for r in required:
self.assertThat((stdout + stderr),
matchers.MatchesRegex(r, re.DOTALL | re.MULTILINE))
def test_no_username(self):
required = ('You must provide a username via either'
' --os-username or env[OS_USERNAME]')
self.make_env(exclude='OS_USERNAME')
try:
self.shell('service-list')
except exceptions.CommandError as message:
self.assertEqual(required, message.args[0])
else:
self.fail('CommandError not raised')
def test_no_user_id(self):
required = ('You must provide a username via'
' either --os-username or env[OS_USERNAME]')
self.make_env(exclude='OS_USER_ID', fake_env=FAKE_ENV2)
try:
self.shell('service-list')
except exceptions.CommandError as message:
self.assertEqual(required, message.args[0])
else:
self.fail('CommandError not raised')
def test_no_tenant_name(self):
required = self._msg_no_tenant_project
self.make_env(exclude='OS_TENANT_NAME')
try:
self.shell('service-list')
except exceptions.CommandError as message:
self.assertEqual(required, message.args[0])
else:
self.fail('CommandError not raised')
def test_no_tenant_id(self):
required = self._msg_no_tenant_project
self.make_env(exclude='OS_TENANT_ID', fake_env=FAKE_ENV3)
try:
self.shell('service-list')
except exceptions.CommandError as message:
self.assertEqual(required, message.args[0])
else:
self.fail('CommandError not raised')
def test_no_auth_url(self):
required = ('You must provide an auth url'
' via either --os-auth-url or env[OS_AUTH_URL] or'
' specify an auth_system which defines a default url'
' with --os-auth-system or env[OS_AUTH_SYSTEM]',)
self.make_env(exclude='OS_AUTH_URL')
try:
self.shell('service-list')
except exceptions.CommandError as message:
self.assertEqual(required, message.args)
else:
self.fail('CommandError not raised')
# FIXME(madhuri) Remove this harcoded v1 Client class.
# In future, when a new version of API will
# introduce, this needs to be dynamic then.
@mock.patch('zunclient.v1.client.Client')
def test_service_type(self, mock_client):
self.make_env()
self.shell('service-list')
_, client_kwargs = mock_client.call_args_list[0]
self.assertEqual('container', client_kwargs['service_type'])
@mock.patch('zunclient.v1.services_shell.do_services_list')
@mock.patch('zunclient.v1.client.ksa_session')
def test_insecure(self, mock_session, mock_services_list):
self.make_env()
self.shell('--insecure service-list')
_, session_kwargs = mock_session.Session.call_args_list[0]
self.assertEqual(False, session_kwargs['verify'])
@mock.patch('sys.stdin', side_effect=mock.MagicMock)
@mock.patch('getpass.getpass', side_effect=EOFError)
def test_no_password(self, mock_getpass, mock_stdin):
required = ('Expecting a password provided'
' via either --os-password, env[OS_PASSWORD],'
' or prompted response',)
self.make_env(exclude='OS_PASSWORD')
try:
self.shell('service-list')
except exceptions.CommandError as message:
self.assertEqual(required, message.args)
else:
self.fail('CommandError not raised')
@mock.patch('sys.argv', ['zun'])
@mock.patch('sys.stdout', six.StringIO())
@mock.patch('sys.stderr', six.StringIO())
def test_main_noargs(self):
# Ensure that main works with no command-line arguments
try:
zunclient.shell.main()
except SystemExit:
self.fail('Unexpected SystemExit')
# We expect the normal usage as a result
self.assertIn('Command-line interface to the OpenStack Zun API',
sys.stdout.getvalue())
@mock.patch('zunclient.v1.client.Client')
def _test_main_region(self, command, expected_region_name, mock_client):
self.shell(command)
mock_client.assert_called_once_with(
username='username', api_key='password',
endpoint_type='publicURL', project_id='',
project_name='tenant_name', auth_url=self.AUTH_URL,
service_type='container', region_name=expected_region_name,
project_domain_id='', project_domain_name='',
user_domain_id='', user_domain_name='',
zun_url=None, insecure=False)
def test_main_option_region(self):
self.make_env()
self._test_main_region('--os-region-name=myregion service-list',
'myregion')
def test_main_env_region(self):
fake_env = dict(utils.FAKE_ENV, OS_REGION_NAME='myregion')
self.make_env(fake_env=fake_env)
self._test_main_region('service-list', 'myregion')
def test_main_no_region(self):
self.make_env()
self._test_main_region('service-list', None)
@mock.patch('zunclient.v1.client.Client')
def test_main_endpoint_public(self, mock_client):
self.make_env()
self.shell('--endpoint-type publicURL service-list')
mock_client.assert_called_once_with(
username='username', api_key='password',
endpoint_type='publicURL', project_id='',
project_name='tenant_name', auth_url=self.AUTH_URL,
service_type='container', region_name=None,
project_domain_id='', project_domain_name='',
user_domain_id='', user_domain_name='',
zun_url=None, insecure=False)
@mock.patch('zunclient.v1.client.Client')
def test_main_endpoint_internal(self, mock_client):
self.make_env()
self.shell('--endpoint-type internalURL service-list')
mock_client.assert_called_once_with(
username='username', api_key='password',
endpoint_type='internalURL', project_id='',
project_name='tenant_name', auth_url=self.AUTH_URL,
service_type='container', region_name=None,
project_domain_id='', project_domain_name='',
user_domain_id='', user_domain_name='',
zun_url=None, insecure=False)
class ShellTestKeystoneV3(ShellTest):
AUTH_URL = 'http://no.where/v3'
def make_env(self, exclude=None, fake_env=FAKE_ENV):
if 'OS_AUTH_URL' in fake_env:
fake_env.update({'OS_AUTH_URL': self.AUTH_URL})
env = dict((k, v) for k, v in fake_env.items() if k != exclude)
self.useFixture(fixtures.MonkeyPatch('os.environ', env))
def register_keystone_discovery_fixture(self, mreq):
v3_url = "http://no.where/v3"
v3_version = fixture.V3Discovery(v3_url)
mreq.register_uri(
'GET', v3_url, json=_create_ver_list([v3_version]),
status_code=200)
@mock.patch('zunclient.v1.client.Client')
def test_main_endpoint_public(self, mock_client):
self.make_env(fake_env=FAKE_ENV4)
self.shell('--endpoint-type publicURL service-list')
mock_client.assert_called_once_with(
username='username', api_key='password',
endpoint_type='publicURL', project_id='tenant_id',
project_name='', auth_url=self.AUTH_URL,
service_type='container', region_name=None,
project_domain_id='', project_domain_name='Default',
user_domain_id='', user_domain_name='Default',
zun_url=None, insecure=False)

View File

@ -0,0 +1,232 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import six
from zunclient.common import cliutils
from zunclient.common import utils
from zunclient import exceptions as exc
from zunclient.tests import utils as test_utils
class CommonFiltersTest(test_utils.BaseTestCase):
def test_limit(self):
result = utils.common_filters(limit=42)
self.assertEqual(['limit=42'], result)
def test_limit_0(self):
result = utils.common_filters(limit=0)
self.assertEqual(['limit=0'], result)
def test_limit_negative_number(self):
result = utils.common_filters(limit=-2)
self.assertEqual(['limit=-2'], result)
def test_other(self):
for key in ('marker', 'sort_key', 'sort_dir'):
result = utils.common_filters(**{key: 'test'})
self.assertEqual(['%s=test' % key], result)
class SplitAndDeserializeTest(test_utils.BaseTestCase):
def test_split_and_deserialize(self):
ret = utils.split_and_deserialize('str=foo')
self.assertEqual(('str', 'foo'), ret)
ret = utils.split_and_deserialize('int=1')
self.assertEqual(('int', 1), ret)
ret = utils.split_and_deserialize('bool=false')
self.assertEqual(('bool', False), ret)
ret = utils.split_and_deserialize('list=[1, "foo", 2]')
self.assertEqual(('list', [1, "foo", 2]), ret)
ret = utils.split_and_deserialize('dict={"foo": 1}')
self.assertEqual(('dict', {"foo": 1}), ret)
ret = utils.split_and_deserialize('str_int="1"')
self.assertEqual(('str_int', "1"), ret)
def test_split_and_deserialize_fail(self):
self.assertRaises(exc.CommandError,
utils.split_and_deserialize, 'foo:bar')
class ArgsArrayToPatchTest(test_utils.BaseTestCase):
def test_args_array_to_patch(self):
my_args = {
'attributes': ['str=foo', 'int=1', 'bool=true',
'list=[1, 2, 3]', 'dict={"foo": "bar"}'],
'op': 'add',
}
patch = utils.args_array_to_patch(my_args['op'],
my_args['attributes'])
self.assertEqual([{'op': 'add', 'value': 'foo', 'path': '/str'},
{'op': 'add', 'value': 1, 'path': '/int'},
{'op': 'add', 'value': True, 'path': '/bool'},
{'op': 'add', 'value': [1, 2, 3], 'path': '/list'},
{'op': 'add', 'value': {"foo": "bar"},
'path': '/dict'}], patch)
def test_args_array_to_patch_format_error(self):
my_args = {
'attributes': ['foobar'],
'op': 'add',
}
self.assertRaises(exc.CommandError, utils.args_array_to_patch,
my_args['op'], my_args['attributes'])
def test_args_array_to_patch_remove(self):
my_args = {
'attributes': ['/foo', 'extra/bar'],
'op': 'remove',
}
patch = utils.args_array_to_patch(my_args['op'],
my_args['attributes'])
self.assertEqual([{'op': 'remove', 'path': '/foo'},
{'op': 'remove', 'path': '/extra/bar'}], patch)
class FormatLabelsTest(test_utils.BaseTestCase):
def test_format_label_none(self):
self.assertEqual({}, utils.format_labels(None))
def test_format_labels(self):
l = utils.format_labels([
'K1=V1,K2=V2,'
'K3=V3,K4=V4,'
'K5=V5'])
self.assertEqual({'K1': 'V1',
'K2': 'V2',
'K3': 'V3',
'K4': 'V4',
'K5': 'V5'
}, l)
def test_format_labels_semicolon(self):
l = utils.format_labels([
'K1=V1;K2=V2;'
'K3=V3;K4=V4;'
'K5=V5'])
self.assertEqual({'K1': 'V1',
'K2': 'V2',
'K3': 'V3',
'K4': 'V4',
'K5': 'V5'
}, l)
def test_format_labels_mix_commas_semicolon(self):
l = utils.format_labels([
'K1=V1,K2=V2,'
'K3=V3;K4=V4,'
'K5=V5'])
self.assertEqual({'K1': 'V1',
'K2': 'V2',
'K3': 'V3',
'K4': 'V4',
'K5': 'V5'
}, l)
def test_format_labels_split(self):
l = utils.format_labels([
'K1=V1,'
'K2=V22222222222222222222222222222'
'222222222222222222222222222,'
'K3=3.3.3.3'])
self.assertEqual({'K1': 'V1',
'K2': 'V22222222222222222222222222222'
'222222222222222222222222222',
'K3': '3.3.3.3'}, l)
def test_format_labels_multiple(self):
l = utils.format_labels([
'K1=V1',
'K2=V22222222222222222222222222222'
'222222222222222222222222222',
'K3=3.3.3.3'])
self.assertEqual({'K1': 'V1',
'K2': 'V22222222222222222222222222222'
'222222222222222222222222222',
'K3': '3.3.3.3'}, l)
def test_format_labels_multiple_colon_values(self):
l = utils.format_labels([
'K1=V1',
'K2=V2,V22,V222,V2222',
'K3=3.3.3.3'])
self.assertEqual({'K1': 'V1',
'K2': 'V2,V22,V222,V2222',
'K3': '3.3.3.3'}, l)
def test_format_labels_parse_comma_false(self):
l = utils.format_labels(
['K1=V1,K2=2.2.2.2,K=V'],
parse_comma=False)
self.assertEqual({'K1': 'V1,K2=2.2.2.2,K=V'}, l)
def test_format_labels_multiple_values_per_labels(self):
l = utils.format_labels([
'K1=V1',
'K1=V2'])
self.assertIn('K1', l)
self.assertIn('V1', l['K1'])
self.assertIn('V2', l['K1'])
def test_format_label_bad_label(self):
labels = ['K1=V1,K22.2.2.2']
ex = self.assertRaises(exc.CommandError,
utils.format_labels, labels)
self.assertEqual('labels must be a list of KEY=VALUE '
'not K22.2.2.2', str(ex))
def test_format_multiple_bad_label(self):
labels = ['K1=V1', 'K22.2.2.2']
ex = self.assertRaises(exc.CommandError,
utils.format_labels, labels)
self.assertEqual('labels must be a list of KEY=VALUE '
'not K22.2.2.2', str(ex))
class CliUtilsTest(test_utils.BaseTestCase):
def test_keys_and_vals_to_strs(self):
dict_in = {six.u('a'): six.u('1'),
six.u('b'): {six.u('x'): 1,
'y': six.u('2'),
six.u('z'): six.u('3')},
'c': 7}
dict_exp = collections.OrderedDict([
('a', '1'),
('b', collections.OrderedDict([
('x', 1),
('y', '2'),
('z', '3')])),
('c', 7)])
dict_out = cliutils.keys_and_vals_to_strs(dict_in)
dict_act = collections.OrderedDict([
('a', dict_out['a']),
('b', collections.OrderedDict(sorted(dict_out['b'].items()))),
('c', dict_out['c'])])
self.assertEqual(six.text_type(dict_exp), six.text_type(dict_act))

View File

@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
test_zunclient
----------------------------------
Tests for `zunclient` module.
"""
from zunclient.tests import base
class TestZunclient(base.TestCase):
def test_something(self):
pass

180
zunclient/tests/utils.py Normal file
View File

@ -0,0 +1,180 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import datetime
import os
import sys
import fixtures
import six
import testtools
from zunclient.common import httpclient as http
from zunclient import shell
FAKE_ENV = {'OS_USERNAME': 'username',
'OS_PASSWORD': 'password',
'OS_TENANT_NAME': 'tenant_name',
'OS_AUTH_URL': 'http://no.where/v2.0'}
class BaseTestCase(testtools.TestCase):
def setUp(self):
super(BaseTestCase, self).setUp()
self.useFixture(fixtures.FakeLogger())
class FakeAPI(object):
def __init__(self, responses):
self.responses = responses
self.calls = []
def _request(self, method, url, headers=None, body=None):
call = (method, url, headers or {}, body)
self.calls.append(call)
return self.responses[url][method]
def raw_request(self, *args, **kwargs):
response = self._request(*args, **kwargs)
body_iter = http.ResponseBodyIterator(six.StringIO(response[1]))
return FakeResponse(response[0]), body_iter
def json_request(self, *args, **kwargs):
response = self._request(*args, **kwargs)
return FakeResponse(response[0]), response[1]
class FakeConnection(object):
def __init__(self, response=None):
self._response = response
self._last_request = None
def request(self, method, conn_url, **kwargs):
self._last_request = (method, conn_url, kwargs)
def setresponse(self, response):
self._response = response
def getresponse(self):
return self._response
class FakeResponse(object):
def __init__(self, headers, body=None, version=None, status=None,
reason=None):
"""Fake object to help testing.
:param headers: dict representing HTTP response headers
:param body: file-like object
"""
self.headers = headers
self.body = body
self.version = version
self.status = status
self.reason = reason
def getheaders(self):
return copy.deepcopy(self.headers).items()
def getheader(self, key, default):
return self.headers.get(key, default)
def read(self, amt):
return self.body.read(amt)
class FakeServiceCatalog(object):
def url_for(self, endpoint_type, service_type, attr=None,
filter_value=None):
if attr == 'region' and filter_value:
return 'http://regionhost:6385/v1/f14b41234'
else:
return 'http://localhost:6385/v1/f14b41234'
class FakeKeystone(object):
service_catalog = FakeServiceCatalog()
timestamp = datetime.datetime.utcnow() + datetime.timedelta(days=5)
def __init__(self, auth_token):
self.auth_token = auth_token
self.auth_ref = {
'token': {'expires': FakeKeystone.timestamp.strftime(
'%Y-%m-%dT%H:%M:%S.%f'),
'id': 'd1a541311782870742235'}
}
class TestCase(testtools.TestCase):
TEST_REQUEST_BASE = {
'verify': True,
}
def setUp(self):
super(TestCase, self).setUp()
if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
os.environ.get('OS_STDOUT_CAPTURE') == '1'):
stdout = self.useFixture(fixtures.StringStream('stdout')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
os.environ.get('OS_STDERR_CAPTURE') == '1'):
stderr = self.useFixture(fixtures.StringStream('stderr')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
def make_env(self, exclude=None, fake_env=FAKE_ENV):
env = dict((k, v) for k, v in fake_env.items() if k != exclude)
self.useFixture(fixtures.MonkeyPatch('os.environ', env))
def shell(self, argstr, exitcodes=(0,)):
orig = sys.stdout
orig_stderr = sys.stderr
try:
sys.stdout = six.StringIO()
sys.stderr = six.StringIO()
_shell = shell.OpenStackZunShell()
_shell.main(argstr.split())
except SystemExit:
exc_type, exc_value, exc_traceback = sys.exc_info()
self.assertIn(exc_value.code, exitcodes)
finally:
stdout = sys.stdout.getvalue()
sys.stdout.close()
sys.stdout = orig
stderr = sys.stderr.getvalue()
sys.stderr.close()
sys.stderr = orig_stderr
return (stdout, stderr)
class FakeSessionResponse(object):
def __init__(self, headers, content=None, status_code=None):
self.headers = headers
self.content = content
self.status_code = status_code
class FakeSession(object):
def __init__(self, headers, content=None, status_code=None):
self.headers = headers
self.content = content
self.status_code = status_code
def request(self, url, method, **kwargs):
return FakeSessionResponse(self.headers, self.content,
self.status_code)

View File

View File

@ -0,0 +1,77 @@
# Copyright 2015 NEC Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import mock
from testtools import matchers
from zunclient.tests import utils
FAKE_ENV = {'OS_USERNAME': 'username',
'OS_PASSWORD': 'password',
'OS_TENANT_NAME': 'tenant_name',
'OS_AUTH_URL': 'http://no.where/v2.0',
'BYPASS_URL': 'http://zun'}
class TestCommandLineArgument(utils.TestCase):
_unrecognized_arg_error = [
'.*?^usage: ',
'.*?^error: unrecognized arguments:',
".*?^Try 'zun help ' for more information.",
]
_mandatory_arg_error = [
'.*?^usage: ',
'.*?^error: (the following arguments|argument)',
".*?^Try 'zun help ",
]
_few_argument_error = [
'.*?^usage: zun ',
'.*?^error: (the following arguments|too few arguments)',
".*?^Try 'zun help ",
]
_invalid_value_error = [
'.*?^usage: ',
'.*?^error: argument .*: invalid .* value:',
".*?^Try 'zun help ",
]
def setUp(self):
super(TestCommandLineArgument, self).setUp()
self.make_env(fake_env=FAKE_ENV)
session_client = mock.patch(
'zunclient.common.httpclient.SessionClient')
session_client.start()
loader = mock.patch('keystoneauth1.loading.get_plugin_loader')
loader.start()
session = mock.patch('keystoneauth1.session.Session')
session.start()
self.addCleanup(session_client.stop)
self.addCleanup(loader.stop)
self.addCleanup(session.stop)
def _test_arg_success(self, command):
stdout, stderr = self.shell(command)
def _test_arg_failure(self, command, error_msg):
stdout, stderr = self.shell(command, (2,))
for line in error_msg:
self.assertThat((stdout + stderr),
matchers.MatchesRegex(line,
re.DOTALL | re.MULTILINE))

View File

@ -0,0 +1,161 @@
# Copyright (c) 2015 Thales Services SAS
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import testtools
from zunclient.v1 import client
class ClientTest(testtools.TestCase):
@mock.patch('zunclient.common.httpclient.SessionClient')
@mock.patch('keystoneauth1.session.Session')
def test_init_with_session(self, mock_session, http_client):
session = mock.Mock()
client.Client(session=session)
mock_session.assert_not_called()
http_client.assert_called_once_with(
interface='public',
region_name=None,
service_name=None,
service_type='container',
session=session)
@mock.patch('zunclient.common.httpclient.SessionClient')
@mock.patch('keystoneauth1.token_endpoint.Token')
@mock.patch('keystoneauth1.session.Session')
def test_init_with_token_and_url(
self, mock_session, mock_token, http_client):
mock_auth_plugin = mock.Mock()
mock_token.return_value = mock_auth_plugin
session = mock.Mock()
mock_session.return_value = session
client.Client(input_auth_token='mytoken', zun_url='http://myurl/')
mock_session.assert_called_once_with(
auth=mock_auth_plugin, verify=True)
http_client.assert_called_once_with(
endpoint_override='http://myurl/',
interface='public',
region_name=None,
service_name=None,
service_type='container',
session=session)
@mock.patch('zunclient.common.httpclient.SessionClient')
@mock.patch('keystoneauth1.loading.get_plugin_loader')
@mock.patch('keystoneauth1.session.Session')
def test_init_with_token(
self, mock_session, mock_loader, http_client):
mock_plugin = mock.Mock()
mock_loader.return_value = mock_plugin
client.Client(input_auth_token='mytoken', auth_url='authurl')
mock_loader.assert_called_once_with('token')
mock_plugin.load_from_options.assert_called_once_with(
auth_url='authurl',
project_id=None,
project_name=None,
project_domain_id=None,
project_domain_name=None,
user_domain_id=None,
user_domain_name=None,
token='mytoken')
http_client.assert_called_once_with(
interface='public',
region_name=None,
service_name=None,
service_type='container',
session=mock.ANY)
@mock.patch('zunclient.common.httpclient.SessionClient')
@mock.patch('keystoneauth1.loading.get_plugin_loader')
@mock.patch('keystoneauth1.session.Session')
def test_init_with_user(
self, mock_session, mock_loader, http_client):
mock_plugin = mock.Mock()
mock_loader.return_value = mock_plugin
client.Client(username='myuser', auth_url='authurl')
mock_loader.assert_called_once_with('password')
mock_plugin.load_from_options.assert_called_once_with(
auth_url='authurl',
username='myuser',
password=None,
project_domain_id=None,
project_domain_name=None,
user_domain_id=None,
user_domain_name=None,
project_id=None,
project_name=None)
http_client.assert_called_once_with(
interface='public',
region_name=None,
service_name=None,
service_type='container',
session=mock.ANY)
@mock.patch('zunclient.common.httpclient.SessionClient')
@mock.patch('keystoneauth1.loading.get_plugin_loader')
@mock.patch('keystoneauth1.session.Session')
def test_init_unauthorized(
self, mock_session, mock_loader, http_client):
mock_plugin = mock.Mock()
mock_loader.return_value = mock_plugin
mock_session_obj = mock.Mock()
mock_session.return_value = mock_session_obj
mock_session_obj.get_endpoint.side_effect = Exception()
self.assertRaises(
RuntimeError,
client.Client, username='myuser', auth_url='authurl')
mock_loader.assert_called_once_with('password')
mock_plugin.load_from_options.assert_called_once_with(
auth_url='authurl',
username='myuser',
password=None,
project_domain_id=None,
project_domain_name=None,
user_domain_id=None,
user_domain_name=None,
project_id=None,
project_name=None)
http_client.assert_not_called()
@mock.patch('zunclient.common.httpclient.SessionClient')
@mock.patch('keystoneauth1.session.Session')
def test_init_with_endpoint_override(self, mock_session, http_client):
session = mock.Mock()
client.Client(session=session, endpoint_override='zunurl')
mock_session.assert_not_called()
http_client.assert_called_once_with(
interface='public',
region_name=None,
service_name=None,
service_type='container',
session=session,
endpoint_override='zunurl')
@mock.patch('zunclient.common.httpclient.SessionClient')
@mock.patch('keystoneauth1.session.Session')
def test_init_with_zun_url_and_endpoint_override(self, mock_session,
http_client):
session = mock.Mock()
client.Client(session=session, zun_url='zunurl',
endpoint_override='zunurl_override')
mock_session.assert_not_called()
http_client.assert_called_once_with(
interface='public',
region_name=None,
service_name=None,
service_type='container',
session=session,
endpoint_override='zunurl')

View File

@ -0,0 +1,161 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from testtools import matchers
from zunclient.tests import utils
from zunclient.v1 import services
SERVICE1 = {'id': 123,
'host': 'fake-host1',
'binary': 'fake-bin1',
'state': 'up',
}
SERVICE2 = {'id': 124,
'host': 'fake-host2',
'binary': 'fake-bin2',
'state': 'down',
}
fake_responses = {
'/v1/services':
{
'GET': (
{},
{'services': [SERVICE1, SERVICE2]},
),
},
'/v1/services/?limit=2':
{
'GET': (
{},
{'services': [SERVICE1, SERVICE2]},
),
},
'/v1/services/?marker=%s' % SERVICE2['id']:
{
'GET': (
{},
{'services': [SERVICE1, SERVICE2]},
),
},
'/v1/services/?limit=2&marker=%s' % SERVICE2['id']:
{
'GET': (
{},
{'services': [SERVICE2, SERVICE1]},
),
},
'/v1/services/?sort_dir=asc':
{
'GET': (
{},
{'services': [SERVICE1, SERVICE2]},
),
},
'/v1/services/?sort_key=id':
{
'GET': (
{},
{'services': [SERVICE1, SERVICE2]},
),
},
'/v1/services/?sort_key=id&sort_dir=desc':
{
'GET': (
{},
{'services': [SERVICE2, SERVICE1]},
),
},
}
class ServiceManagerTest(testtools.TestCase):
def setUp(self):
super(ServiceManagerTest, self).setUp()
self.api = utils.FakeAPI(fake_responses)
self.mgr = services.MServiceManager(self.api)
def test_service_list(self):
services = self.mgr.list()
expect = [
('GET', '/v1/services', {}, None),
]
self.assertEqual(expect, self.api.calls)
self.assertThat(services, matchers.HasLength(2))
def _test_service_list_with_filters(
self, limit=None, marker=None,
sort_key=None, sort_dir=None,
detail=False, expect=[]):
services_filter = self.mgr.list(limit=limit, marker=marker,
sort_key=sort_key,
sort_dir=sort_dir,
detail=detail)
self.assertEqual(expect, self.api.calls)
self.assertThat(services_filter, matchers.HasLength(2))
def test_service_list_with_limit(self):
expect = [
('GET', '/v1/services/?limit=2', {}, None),
]
self._test_service_list_with_filters(
limit=2,
expect=expect)
def test_service_list_with_marker(self):
expect = [
('GET', '/v1/services/?marker=%s' % SERVICE2['id'],
{}, None),
]
self._test_service_list_with_filters(
marker=SERVICE2['id'],
expect=expect)
def test_service_list_with_marker_limit(self):
expect = [
('GET', '/v1/services/?limit=2&marker=%s' % SERVICE2['id'],
{}, None),
]
self._test_service_list_with_filters(
limit=2, marker=SERVICE2['id'],
expect=expect)
def test_service_list_with_sort_dir(self):
expect = [
('GET', '/v1/services/?sort_dir=asc',
{}, None),
]
self._test_service_list_with_filters(
sort_dir='asc',
expect=expect)
def test_service_list_with_sort_key(self):
expect = [
('GET', '/v1/services/?sort_key=id',
{}, None),
]
self._test_service_list_with_filters(
sort_key='id',
expect=expect)
def test_service_list_with_sort_key_dir(self):
expect = [
('GET', '/v1/services/?sort_key=id&sort_dir=desc',
{}, None),
]
self._test_service_list_with_filters(
sort_key='id', sort_dir='desc',
expect=expect)

View File

@ -0,0 +1,31 @@
# Copyright 2015 NEC Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from zunclient.tests.v1 import shell_test_base
class ShellTest(shell_test_base.TestCommandLineArgument):
@mock.patch('zunclient.v1.services.ServiceManager.list')
def test_zun_service_list_success(self, mock_list):
self._test_arg_success('service-list')
self.assertTrue(mock_list.called)
@mock.patch('zunclient.v1.services.ServiceManager.list')
def test_zun_service_list_failure(self, mock_list):
self._test_arg_failure('service-list --wrong',
self._unrecognized_arg_error)
self.assertFalse(mock_list.called)

0
zunclient/v1/__init__.py Normal file
View File

109
zunclient/v1/client.py Normal file
View File

@ -0,0 +1,109 @@
# Copyright 2014
# The Cloudscaling Group, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from keystoneauth1 import loading
from keystoneauth1 import session as ksa_session
from zunclient.common import httpclient
from zunclient.v1 import services
class Client(object):
def __init__(self, username=None, api_key=None, project_id=None,
project_name=None, auth_url=None, zun_url=None,
endpoint_type=None, endpoint_override=None,
service_type='container',
region_name=None, input_auth_token=None,
session=None, password=None, auth_type='password',
interface='public', service_name=None, insecure=False,
user_domain_id=None, user_domain_name=None,
project_domain_id=None, project_domain_name=None):
# We have to keep the api_key are for backwards compat, but let's
# remove it from the rest of our code since it's not a keystone
# concept
if not password:
password = api_key
# Backwards compat for people assing in endpoint_type
if endpoint_type:
interface = endpoint_type
# fix (yolanda): os-cloud-config is using endpoint_override
# instead of zun_url
if endpoint_override and not zun_url:
zun_url = endpoint_override
if zun_url and input_auth_token:
auth_type = 'admin_token'
session = None
loader_kwargs = dict(
token=input_auth_token,
endpoint=zun_url)
elif input_auth_token and not session:
auth_type = 'token'
loader_kwargs = dict(
token=input_auth_token,
auth_url=auth_url,
project_id=project_id,
project_name=project_name,
user_domain_id=user_domain_id,
user_domain_name=user_domain_name,
project_domain_id=project_domain_id,
project_domain_name=project_domain_name)
else:
loader_kwargs = dict(
username=username,
password=password,
auth_url=auth_url,
project_id=project_id,
project_name=project_name,
user_domain_id=user_domain_id,
user_domain_name=user_domain_name,
project_domain_id=project_domain_id,
project_domain_name=project_domain_name)
# Backwards compatibility for people not passing in Session
if session is None:
loader = loading.get_plugin_loader(auth_type)
# This should be able to handle v2 and v3 Keystone Auth
auth_plugin = loader.load_from_options(**loader_kwargs)
session = ksa_session.Session(
auth=auth_plugin, verify=(not insecure))
client_kwargs = {}
if zun_url:
client_kwargs['endpoint_override'] = zun_url
if not zun_url:
try:
# Trigger an auth error so that we can throw the exception
# we always have
session.get_endpoint(
service_type=service_type,
service_name=service_name,
interface=interface,
region_name=region_name)
except Exception:
raise RuntimeError("Not Authorized")
self.http_client = httpclient.SessionClient(
service_type=service_type,
service_name=service_name,
interface=interface,
region_name=region_name,
session=session,
**client_kwargs)
self.services = services.ServiceManager(self.http_client)

71
zunclient/v1/services.py Normal file
View File

@ -0,0 +1,71 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from zunclient.common import base
from zunclient.common import utils
class Service(base.Resource):
def __repr__(self):
return "<Service %s>" % self._info
class ServiceManager(base.Manager):
resource_class = Service
@staticmethod
def _path(id=None):
return '/v1/services/%s' % id if id else '/v1/services'
def list(self, marker=None, limit=None, sort_key=None,
sort_dir=None, detail=False):
"""Retrieve list of zun services.
:param marker: Optional, the ID of a zun service, eg the last
services from a previous result set. Return
the next result set.
:param limit: The maximum number of results to return per
request, if:
1) limit > 0, the maximum number of services to return.
2) limit == 0, return the entire list of services.
3) limit param is NOT specified (None), the number of items
returned respect the maximum imposed by the Zun API
(see Zun's api.max_limit option).
:param sort_key: Optional, field used for sorting.
:param sort_dir: Optional, direction of sorting, either 'asc' (the
default) or 'desc'.
:param detail: Optional, boolean whether to return detailed information
about services.
:returns: A list of services.
"""
if limit is not None:
limit = int(limit)
filters = utils.common_filters(marker, limit, sort_key, sort_dir)
path = ''
if detail:
path += 'detail'
if filters:
path += '?' + '&'.join(filters)
if limit is None:
return self._list(self._path(path), "services")
else:
return self._list_pagination(self._path(path), "services",
limit=limit)

View File

@ -0,0 +1,25 @@
# Copyright 2015 NEC Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from zunclient.common import cliutils as utils
from zunclient.common import utils as zun_utils
def do_service_list(cs, args):
"""Print a list of zun services."""
services = cs.services.list()
columns = ('id', 'host', 'binary', 'state')
utils.print_list(services, columns,
{'versions': zun_utils.print_list_field('versions')})

20
zunclient/v1/shell.py Normal file
View File

@ -0,0 +1,20 @@
# Copyright 2014
# The Cloudscaling Group, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from zunclient.v1 import services_shell
COMMAND_MODULES = [
services_shell,
]

18
zunclient/version.py Normal file
View File

@ -0,0 +1,18 @@
# Copyright 2014
# The Cloudscaling Group, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pbr import version
version_info = version.VersionInfo('python-zunclient')