Andreas Jaeger c609b98c40 Update api-ref location
The api documentation is now published on docs.openstack.org instead
of developer.openstack.org. Update all links that are changed to the
new location.

Note that redirects will be set up as well but let's point now to the
new location.

For details, see:
http://lists.openstack.org/pipermail/openstack-discuss/2019-July/007828.html

Change-Id: I1572a21632740b4d9a233a6a31c49e3bac5394ef
2019-07-22 20:55:43 +02:00

657 lines
16 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Compute v2 API Library"""
from keystoneauth1 import exceptions as ksa_exceptions
from osc_lib.api import api
from osc_lib import exceptions
from osc_lib.i18n import _
# TODO(dtroyer): Migrate this to osc-lib
class InvalidValue(Exception):
    """Raised when a supplied argument value cannot be used

    Typical causes are a value of the wrong type or one that is out of
    range.
    """

    # Default description attached to the exception class
    message = "Supplied value is not valid"
class APIv2(api.BaseAPI):
"""Compute v2 API"""
def __init__(self, **kwargs):
    """Set up the API object

    All keyword arguments are forwarded unchanged to the base class
    initializer.
    """
    super(APIv2, self).__init__(**kwargs)
# Overrides
def _check_integer(self, value, msg=None):
"""Attempt to convert value to an integer
Raises InvalidValue on failure
:param value:
Convert this to an integer. None is converted to 0 (zero).
:param msg:
An alternate message for the exception, must include exactly
one substitution to receive the attempted value.
"""
if value is None:
return 0
try:
value = int(value)
except (TypeError, ValueError):
if not msg:
msg = _("%s is not an integer") % value
raise InvalidValue(msg)
return value
# TODO(dtroyer): Override find() until these fixes get into an osc-lib
# minimum release
def find(
    self,
    path,
    value=None,
    attr=None,
):
    """Find a single resource by name or ID

    The resource is first requested directly as ``/<path>/<value>``. If
    that request fails with NotFound or BadRequest, a secondary search
    is made with ``find_one()`` using ``attr`` as the filter name.

    :param string path:
        The API-specific portion of the URL path
    :param string value:
        search expression (required, really)
    :param string attr:
        name of attribute for secondary search; when omitted no
        secondary search is attempted
    :returns:
        the resource; a dict response from the direct lookup has its
        single enclosing key stripped off
    :raises osc_lib.exceptions.NotFound:
        if neither lookup locates the resource
    """
    try:
        ret = self._request('GET', "/%s/%s" % (path, value)).json()
    except (
        ksa_exceptions.NotFound,
        ksa_exceptions.BadRequest,
    ):
        if attr is None:
            # Without an attribute name there is nothing to search on;
            # find_one(**{None: value}) would raise TypeError because
            # keyword names must be strings.
            raise exceptions.NotFound(_("%s not found") % value)
        try:
            ret = self.find_one(path, **{attr: value})
        except ksa_exceptions.NotFound:
            msg = _("%s not found") % value
            raise exceptions.NotFound(msg)
    else:
        # strip off the enclosing dict; an empty dict has no enclosing
        # key and is returned untouched rather than raising IndexError
        if isinstance(ret, dict) and ret:
            ret = next(iter(ret.values()))
    return ret
# Floating IPs
def floating_ip_add(
    self,
    server,
    address,
    fixed_address=None,
):
    """Associate a floating IP with a server

    :param server:
        The server to add the IP to; resolved with find() as an ID
        first and then by name
    :param address:
        The floating address, as a string or as an object exposing
        the address in an ``ip`` attribute
    :param fixed_address:
        The fixed address (string or object with an ``ip`` attribute)
        the floating IP should be associated with (optional)
    :returns: the result of the server action request
    """
    servers_path = '/servers'
    found = self.find(servers_path, value=server, attr='name')

    # Accept either a bare address or an object carrying it as .ip
    action = {'address': getattr(address, 'ip', address)}
    if fixed_address:
        action['fixed_address'] = getattr(
            fixed_address, 'ip', fixed_address
        )

    action_path = "/%s/%s/action" % (servers_path, found['id'])
    return self._request(
        "POST",
        action_path,
        json={'addFloatingIp': action},
    )
def floating_ip_create(self, pool=None):
    """Allocate a new floating IP

    https://docs.openstack.org/api-ref/compute/#create-allocate-floating-ip-address

    :param pool: Name of floating IP pool
    :returns: the ``floating_ip`` entry of the create response
    :raises osc_lib.exceptions.NotFound:
        if the request fails with NotFound or BadRequest
    """
    rejected = (ksa_exceptions.NotFound, ksa_exceptions.BadRequest)
    try:
        created = self.create("/os-floating-ips", json={'pool': pool})
        return created['floating_ip']
    except rejected:
        # Both failures are reported to the caller as a missing pool
        raise exceptions.NotFound(_("%s not found") % pool)
def floating_ip_delete(self, floating_ip_id=None):
    """Deallocate a floating IP

    https://docs.openstack.org/api-ref/compute/#delete-deallocate-floating-ip-address

    :param string floating_ip_id:
        Floating IP ID
    :returns:
        the result of the delete request, or None when no ID is given
    """
    if floating_ip_id is None:
        # Nothing to delete, so no request is made
        return None

    base_path = "/os-floating-ips"
    return self.delete('/%s/%s' % (base_path, floating_ip_id))
def floating_ip_find(self, floating_ip=None):
    """Return a floating IP given its ID or address

    https://docs.openstack.org/api-ref/compute/#show-floating-ip-address-details

    :param string floating_ip:
        Floating IP ID or address
    :returns: A dict of the floating IP attributes
    """
    # find() tries the value as an ID, then searches on the 'ip' attribute
    return self.find("/os-floating-ips", value=floating_ip, attr='ip')
def floating_ip_list(self):
    """List floating IPs

    https://docs.openstack.org/api-ref/compute/#list-floating-ip-addresses

    :returns:
        list of floating IPs
    """
    listing = self.list("/os-floating-ips")
    return listing["floating_ips"]
def floating_ip_remove(
    self,
    server,
    address,
):
    """Disassociate a floating IP from a server

    :param server:
        The server to remove the IP from; resolved with find() as an
        ID first and then by name
    :param address:
        The floating address to remove, as a string or as an object
        exposing the address in an ``ip`` attribute
    :returns: the result of the server action request
    """
    servers_path = '/servers'
    found = self.find(servers_path, value=server, attr='name')

    # Accept either a bare address or an object carrying it as .ip
    action = {'address': getattr(address, 'ip', address)}

    action_path = "/%s/%s/action" % (servers_path, found['id'])
    return self._request(
        "POST",
        action_path,
        json={'removeFloatingIp': action},
    )
# Floating IP Pools
def floating_ip_pool_list(self):
    """List floating IP pools

    https://docs.openstack.org/api-ref/compute/?expanded=#list-floating-ip-pools

    :returns:
        list of floating IP pools
    """
    listing = self.list("/os-floating-ip-pools")
    return listing["floating_ip_pools"]
# Hosts
def host_list(self, zone=None):
    """List hypervisor hosts

    https://docs.openstack.org/api-ref/compute/#list-hosts

    Valid for Compute 2.0 - 2.42

    :param string zone:
        Availability zone; when given it is added to the request as
        the ``zone`` query argument
    :returns: the ``hosts`` entry of the list response
    """
    path = ('/os-hosts?zone=%s' % zone) if zone else "/os-hosts"
    return self.list(path)["hosts"]
def host_set(
    self,
    host=None,
    status=None,
    maintenance_mode=None,
    **params
):
    """Modify host properties

    https://docs.openstack.org/api-ref/compute/#update-host-status

    Valid for Compute 2.0 - 2.42

    :param host:
        Host identifier used in the request path
    :param status:
        New status value; sent only when truthy
    :param maintenance_mode:
        New maintenance mode value; sent only when truthy
    :returns:
        the decoded JSON response, or None when there is nothing to set
    """
    # NOTE: additional keyword arguments are accepted but are not sent;
    # the request body is built only from the two named properties.
    requested = (
        ('status', status),
        ('maintenance_mode', maintenance_mode),
    )
    body = {key: val for key, val in requested if val}

    if not body:
        # Don't bother calling if nothing given
        return None

    response = self._request(
        "PUT",
        "/%s/%s" % ("/os-hosts", host),
        json=body,
    )
    return response.json()
def host_show(self, host=None):
    """Show host

    https://docs.openstack.org/api-ref/compute/#show-host-details

    Valid for Compute 2.0 - 2.42

    :param host:
        Host to look up, resolved with find() using the ``host_name``
        attribute for the secondary search
    :returns:
        list holding the ``resource`` entry of each item found
    """
    found = self.find("/os-hosts", value=host, attr='host_name')
    return [entry['resource'] for entry in found]
# Networks
def network_create(self, name=None, subnet=None, share_subnet=None):
    """Create a new network

    https://docs.openstack.org/api-ref/compute/#create-network

    :param string name:
        Network label (required)
    :param string subnet:
        Subnet for IPv4 fixed addresses in CIDR notation (required)
    :param bool share_subnet:
        Shared subnet between projects, True or False; left out of
        the request when None
    :returns: A dict of the network attributes
    """
    network = {'label': name, 'cidr': subnet}
    if share_subnet is not None:
        network['share_address'] = share_subnet

    response = self.create("/os-networks", json={'network': network})
    return response['network']
def network_delete(self, network=None):
    """Delete a network

    https://docs.openstack.org/api-ref/compute/#delete-network

    :param string network:
        Network name or ID
    :returns:
        the result of the delete request, or None when the network
        found carries an ``id`` of None
    """
    base_path = "/os-networks"
    # Resolve the name or ID to the network's ID
    network_id = self.find(base_path, value=network, attr='label')['id']
    if network_id is None:
        return None
    return self.delete('/%s/%s' % (base_path, network_id))
def network_find(self, network=None):
    """Look up a network by name or ID

    https://docs.openstack.org/api-ref/compute/#show-network-details

    :param string network:
        Network name or ID
    :returns: A dict of the network attributes
    """
    # Networks are named through their 'label' attribute
    return self.find("/os-networks", value=network, attr='label')
def network_list(self):
    """List networks

    https://docs.openstack.org/api-ref/compute/#list-networks

    :returns:
        list of networks
    """
    listing = self.list("/os-networks")
    return listing["networks"]
# Security Groups
def security_group_create(self, name=None, description=None):
    """Create a new security group

    https://docs.openstack.org/api-ref/compute/#create-security-group

    :param string name:
        Security group name
    :param string description:
        Security group description
    :returns: the ``security_group`` entry of the create response
    """
    group = {'name': name, 'description': description}
    response = self.create(
        "/os-security-groups",
        json={'security_group': group},
    )
    return response['security_group']
def security_group_delete(self, security_group=None):
    """Delete a security group

    https://docs.openstack.org/api-ref/compute/#delete-security-group

    :param string security_group:
        Security group name or ID
    :returns:
        the result of the delete request, or None when the group
        found carries an ``id`` of None
    """
    base_path = "/os-security-groups"
    # Resolve the name or ID to the group's ID
    found = self.find(base_path, value=security_group, attr='name')
    group_id = found['id']
    if group_id is None:
        return None
    return self.delete('/%s/%s' % (base_path, group_id))
def security_group_find(self, security_group=None):
    """Look up a security group by name or ID

    https://docs.openstack.org/api-ref/compute/#show-security-group-details

    :param string security_group:
        Security group name or ID
    :returns: A dict of the security group attributes
    """
    return self.find(
        "/os-security-groups",
        value=security_group,
        attr='name',
    )
def security_group_list(self, limit=None, marker=None, search_opts=None):
    """List security groups

    https://docs.openstack.org/api-ref/compute/#list-security-groups

    :param integer limit:
        query return count limit
    :param string marker:
        query marker; sent as the ``offset`` query argument
    :param search_opts:
        (undocumented) Search filter dict; entries with a false
        value are dropped
        all_tenants: True|False - return all projects
    :returns:
        the ``security_groups`` entry of the list response
    """
    query = {}
    if search_opts is not None:
        # Keep only the filters that were actually set
        query = {key: val for key, val in search_opts.items() if val}
    if limit:
        query['limit'] = limit
    if marker:
        query['offset'] = marker

    listing = self.list("/os-security-groups", **query)
    return listing["security_groups"]
def security_group_set(
    self,
    security_group=None,
    **params
):
    """Update a security group

    https://docs.openstack.org/api-ref/compute/#update-security-group

    :param string security_group:
        Security group name or ID
    :param params:
        Attribute values to change, given as keyword arguments. Only
        attributes already present on the existing group are updated;
        unknown names are ignored.
    :returns:
        the ``security_group`` entry of the update response, or None
        when no attributes were supplied

    TODO(dtroyer): Create an update method in osc-lib
    """
    # Short-circuit no-op. **params is always a dict (never None), so
    # the check has to be for emptiness.
    if not params:
        return None

    url = "/os-security-groups"
    security_group = self.find(
        url,
        attr='name',
        value=security_group,
    )
    if security_group is None:
        return None

    for (k, v) in params.items():
        # Only set a value if it is already present
        if k in security_group:
            security_group[k] = v

    return self._request(
        "PUT",
        "/%s/%s" % (url, security_group['id']),
        json={'security_group': security_group},
    ).json()['security_group']
# Security Group Rules
def security_group_rule_create(
    self,
    security_group_id=None,
    ip_protocol=None,
    from_port=None,
    to_port=None,
    remote_ip=None,
    remote_group=None,
):
    """Create a new security group rule

    https://docs.openstack.org/api-ref/compute/#create-security-group-rule

    :param string security_group_id:
        Security group ID
    :param ip_protocol:
        IP protocol, 'tcp', 'udp' or 'icmp' (compared case-insensitively)
    :param from_port:
        Source port; converted to an integer, None becomes 0
    :param to_port:
        Destination port; converted to an integer, None becomes 0
    :param remote_ip:
        Source IP address in CIDR notation
    :param remote_group:
        Remote security group
    :returns: the ``security_group_rule`` entry of the create response
    :raises InvalidValue:
        if ip_protocol is missing or not a supported protocol, or if a
        port cannot be converted to an integer
    """
    url = "/os-security-group-rules"

    valid_protocols = ('icmp', 'tcp', 'udp')
    # None (the default) has no .lower(); report it as an invalid value
    # rather than failing with AttributeError.
    if ip_protocol is None or ip_protocol.lower() not in valid_protocols:
        raise InvalidValue(
            "%s is not one of 'icmp', 'tcp', or 'udp'" % (ip_protocol,)
        )

    params = {
        'parent_group_id': security_group_id,
        'ip_protocol': ip_protocol,
        'from_port': self._check_integer(from_port),
        'to_port': self._check_integer(to_port),
        'cidr': remote_ip,
        'group_id': remote_group,
    }

    return self.create(
        url,
        json={'security_group_rule': params},
    )['security_group_rule']
def security_group_rule_delete(self, security_group_rule_id=None):
    """Delete a security group rule

    https://docs.openstack.org/api-ref/compute/#delete-security-group-rule

    :param string security_group_rule_id:
        Security group rule ID
    :returns:
        the result of the delete request, or None when no ID is given
    """
    if security_group_rule_id is None:
        # Nothing to delete, so no request is made
        return None

    base_path = "/os-security-group-rules"
    return self.delete('/%s/%s' % (base_path, security_group_rule_id))