RBAC Patch 5: Nova tests

This patch chain aims to suggest a set of default policies for user
management on stx-openstack. We suggest the creation of the project_admin
and project_readonly roles and provide some policies to fine tune the
access control over the Openstack services to those roles, as described
on README.md.

Also, we provide a set of tests to ensure the policies and permissions
are all working as expected on site for the cloud administrators.

This commit includes Nova related tests and functions

Story: 2008910
Task: 42501

Signed-off-by: Heitor Matsui <heitorvieira.matsui@windriver.com>
Signed-off-by: Thiago Brito <thiago.brito@windriver.com>
Co-authored-by: Miriam Yumi Peixoto <miriam.yumipeixoto@windriver.com>
Co-authored-by: Leonardo Zaccarias <leonardo.zaccarias@windriver.com>
Co-authored-by: Rogerio Oliveira Ferraz <rogeriooliveira.ferraz@windriver.com>

Change-Id: Ic1b10381e789751d655e35560c119876029f8fd7
This commit is contained in:
Heitor Matsui 2021-05-17 18:09:35 -03:00
parent 319c274e09
commit 82d50c6f9c
4 changed files with 1695 additions and 1 deletions

View File

@ -1648,4 +1648,244 @@ class OpenStackBasicTesting():
# ResourceNotFound when no resource can be found.
"""
sg = self._find_security_group(sg_name_or_id, ignore_missing=False)
return self.os_sdk_conn.network.get_security_group(sg.id)
return self.os_sdk_conn.network.get_security_group(sg.id)
# -------------------------------------------------------------------------
# Server methods - Nova
# -------------------------------------------------------------------------
def _create_server(self, server_name, image_name_or_id, flavor_name_or_id,
network_name=None, autoclear=True, **attrs):
"""
# Create a new server from attributes
# Parameters
# autoclear Used in the teardown mechanism (keep default value)
# attrs (dict) Keyword arguments which will be used to create a
# Server, comprised of the properties on the Server class.
# Returns
# The results of server creation
# Return type
# Server
"""
image = self._find_image(image_name_or_id, ignore_missing=False)
flavor = self._find_flavor(flavor_name_or_id, ignore_missing=False)
if network_name:
network = self.os_sdk_conn.network.find_network(network_name)
instance = self.os_sdk_conn.compute.create_server(
name=server_name,
image_id=image.id,
flavor_id=flavor.id,
networks=[{"uuid": network.id}],
**attrs
)
else:
# instance = self.os_sdk_conn.compute.create_server(
# name=server_name,
# image_id=image.id,
# flavor_id=flavor.id,
# networks=[],
# autoip=False,
# **attrs
# )
instance = self.os_sdk_conn.compute.create_server(
name=server_name,
image_id=image.id,
flavor_id=flavor.id,
networks=[],
**attrs
)
if debug1: print(
"created instance: " + instance.name + " id: " + instance.id)
if autoclear:
self.instances_clearing.append(instance.id)
return self._wait_for_server(instance)
def _delete_server(self, instance_name_or_id, autoclear=True):
"""
# Delete a server
# Parameters
# name_or_id The name or ID of a server.
# ignore_missing (bool) When set to False ResourceNotFound will be
# raised when the server does not exist. When set to True, no
# exception will be set when attempting to delete a nonexistent
# server
# force (bool) When set to True, the server deletion will be forced
# immediately.
# autoclear Used in the teardown mechanism (keep default value)
# Returns
# None
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
if instance:
self.os_sdk_conn.compute.delete_server(instance.id,
ignore_missing=False)
if debug1: print(
"deleted instance: " + instance.name + " id: " + instance.id)
if autoclear:
self.instances_clearing.remove(instance.id)
self.os_sdk_conn.compute.wait_for_delete(instance)
elif debug1:
print("error: not found instance: " + instance_name_or_id)
def _update_server(self, instance_name_or_id, **kwargs):
"""
# Update a server
# Parameters
# instance_name_or_id The name or ID of a server.
# Attrs kwargs
# The attributes to update on the server represented by server.
# Returns
# The updated server
# Return type
# Server
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
update = None
if instance:
update = self.os_sdk_conn.compute.update_server(instance.id,
**kwargs)
if debug1: print(
"updated instance: " + instance.name + " id: " + instance.id)
elif debug1:
print("error: not found instance: " + instance_name_or_id)
return update
def _wait_for_server(self, server, status='ACTIVE', failures=None,
interval=2, wait=120):
"""
# Wait for a server to be in a particular status.
# Parameters
# server (Server:) The Server to wait on to reach the specified
# status.
# status Desired status.
# failures (list) Statuses that would be interpreted as failures.
# interval (int) Number of seconds to wait before to consecutive
# checks. Default to 2.
# wait (int) Maximum number of seconds to wait before the change.
# Default to 120.
# Returns
# The resource is returned on success.
# Raises
# ResourceTimeout if transition to the desired status failed to occur
# in specified seconds.
# ResourceFailure if the resource has transited to one of the failure
# statuses.
# AttributeError if the resource does not have a status attribute.
"""
return self.os_sdk_conn.compute.wait_for_server(server, status=status,
failures=failures,
interval=interval,
wait=wait)
def _list_servers(self, details=True, all_projects=False, **query):
"""
# Retrieve a generator of servers
# Parameters
# details (bool) When set to False instances with only basic data
# will be returned. The default, True, will cause instances with full
# data to be returned.
# query (kwargs) Optional query parameters to be sent to limit the
# servers being returned. Available parameters can be seen under
# https://docs.openstack.org/api-ref/compute/#list-servers
# Returns
# A generator of server instances.
##
# def _servers(self, details=True, all_projects=False, **query):
# return self.os_sdk_conn.compute.servers(details, all_projects,
# **query)
"""
return self.os_sdk_conn.compute.servers(details=details,
all_projects=all_projects,
**query)
def _find_server(self, instance_name_or_id, ignore_missing=True):
"""
# Find a single server
# Parameters
# instance_name_or_id The name or ID of a server.
# ignore_missing (bool) When set to False ResourceNotFound will be
# raised when the resource does not exist. When set to True, None
# will be returned when attempting to find a nonexistent resource.
# Returns
# One Server or None
"""
return self.os_sdk_conn.compute.find_server(
instance_name_or_id,
ignore_missing=ignore_missing
)
def _get_server(self, instance_name_or_id):
"""
# Get a single server
# Parameters:
# instance_name_or_id The name or ID of a server.
# Returns:
# One Server
# Raises:
# ResourceNotFound when no resource can be found.
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
return self.os_sdk_conn.compute.get_server(instance.id)
def _get_flavor(self, flavor_name_or_id, get_extra_specs=False):
"""
# Get a single flavor
# Parameters
# flavor_name_or_id The name or ID of a flavor.
# get_extra_specs (bool) When set to True and extra_specs not
# present in the response will invoke additional API call to fetch
# extra_specs.
# Returns
# One Flavor
# Raises
# ResourceNotFound when no resource can be found.
"""
flavor = self._find_flavor(flavor_name_or_id, ignore_missing=False)
return self.os_sdk_conn.compute.get_flavor(flavor.id,
get_extra_specs=get_extra_specs)
def _find_flavor(self, flavor_name_or_id, ignore_missing=True,
get_extra_specs=False, **query):
"""
# Find a single flavor
# Parameters
# flavor_name_or_id The name or ID of a flavor.
# ignore_missing (bool) When set to False ResourceNotFound will be
# raised when the resource does not exist. When set to True, None
# will be returned when attempting to find a nonexistent resource.
# get_extra_specs (bool) When set to True and extra_specs not
# present in the response will invoke additional API call to fetch
# extra_specs.
# query (kwargs) Optional query parameters to be sent to limit the
# flavors being returned.
# Returns
# One Flavor or None
"""
return self.os_sdk_conn.compute.find_flavor(flavor_name_or_id,
ignore_missing=ignore_missing,
get_extra_specs=get_extra_specs,
**query)
def _list_flavors(self, details=True, get_extra_specs=False, **query):
"""
# Return a generator of flavors
# Parameters
# details (bool) When True, returns Flavor objects, with additional
# attributes filled.
# get_extra_specs (bool) When set to True and extra_specs not
# present in the response will invoke additional API call to fetch
# extra_specs.
# query (kwargs) Optional query parameters to be sent to limit the
# flavors being returned.
# Returns
# A generator of flavor objects
##
# def _flavors(self):
# return self.os_sdk_conn.compute.flavors(details=True,
# get_extra_specs=False, **query)
"""
return self.os_sdk_conn.compute.flavors(
details=details,
get_extra_specs=get_extra_specs,
**query
)

View File

@ -0,0 +1,363 @@
#
# Copyright (c) 2021 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
# All Rights Reserved.
#
from tests.fv_rbac import OpenStackBasicTesting
class OpenStackComputeTesting(OpenStackBasicTesting):
def _get_server_console_output(self, instance_name_or_id, length=10):
"""
Return the console output for a server.
Parameters
instance_name_or_id The name or ID of a server.
length Optional number of line to fetch from the end of console log. All lines will be returned if this is not specified.
Returns
The console output as a dict. Control characters will be escaped to create a valid JSON string.
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
return self.os_sdk_conn.compute.get_server_console_output(instance.id, length)
def _get_vnc_console(self, instance_name_or_id, console_type):
"""
Get a vnc console for an instance
Parameters
instance_name_or_id The name or ID of a server.
console_type Type of vnc console to get (novnc or xvpvnc)
Returns
An instance of novaclient.base.DictWithMeta
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
return self.nc.servers.get_vnc_console(instance.id, console_type)
def _create_server_image(self, server, name, metadata=None, wait=False, timeout=120):
"""
Create an image from a server
Parameters
server Either the ID of a server or a Server instance.
name (str) The name of the image to be created.
metadata (dict) A dictionary of metadata to be set on the image.
Returns
Image object
"""
return self.os_sdk_conn.compute.create_server_image(server.id, name, metadata=metadata, wait=wait, timeout=timeout)
def _rebuild_server(self, instance_name_or_id, new_instance_name, user_password, image_name_or_id, **attrs):
"""
Rebuild a server
Parameters
instance_name_or_id The name or ID of a server.
new_instance_name (str) Name of the new server instance
user_password (str) The user password
preserve_ephemeral (bool) Indicates whether the server is rebuilt with the preservation of the ephemeral partition. Default: False
image_name_or_id The name or ID of an image to rebuild with
access_ipv4 (str) The IPv4 address to rebuild with. Default: None
access_ipv6 (str) The IPv6 address to rebuild with. Default: None
metadata (dict) A dictionary of metadata to rebuild with. Default: None
personality A list of dictionaries, each including a path and contents key, to be injected into the rebuilt server at launch. Default: None
Returns
The rebuilt Server instance.
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
image = self._find_image(image_name_or_id, ignore_missing=False)
self.os_sdk_conn.compute.rebuild_server(instance.id, new_instance_name, user_password, image=image.id, **attrs)
instance = self._wait_for_server(instance, status='REBUILD', wait=60)
return self._wait_for_server(instance, wait=60)
def _start_server(self, instance_name_or_id):
"""
Starts a stopped server and changes its state to ACTIVE.
Parameters
instance_name_or_id The name or ID of a server.
Returns
Server instance
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
self.os_sdk_conn.compute.start_server(instance.id)
return self._wait_for_server(instance, wait=60)
def _stop_server(self, instance_name_or_id):
"""
Stops a running server and changes its state to SHUTOFF.
Parameters
instance_name_or_id The name or ID of a server.
Returns
Server instance
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
self.os_sdk_conn.compute.stop_server(instance.id)
return self._wait_for_server(instance, status='SHUTOFF', wait=60)
def _shelve_server(self, instance_name_or_id):
"""
Shelves a server.
All associated data and resources are kept but anything still in memory is not retained. Policy defaults enable only users with administrative role or the owner of the server to perform this operation. Cloud provides could change this permission though.
Parameters
instance_name_or_id The name or ID of a server.
Returns
Server instance
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
self.os_sdk_conn.compute.shelve_server(instance.id)
return self._wait_for_server(instance, status='SHELVED_OFFLOADED', wait=60)
def _unshelve_server(self, instance_name_or_id):
"""
Unselves or restores a shelved server.
Policy defaults enable only users with administrative role or the owner of the server to perform this operation. Cloud provides could change this permission though.
Parameters
instance_name_or_id The name or ID of a server.
Returns
Server instance
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
self.os_sdk_conn.compute.unshelve_server(instance.id)
return self._wait_for_server(instance, status='ACTIVE', wait=60)
def _lock_server(self, instance_name_or_id):
"""
Locks a server.
Parameters
instance_name_or_id The name or ID of a server.
Returns
Server instance
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
self.os_sdk_conn.compute.lock_server(instance.id)
return self._get_server(instance.id)
def _unlock_server(self, instance_name_or_id):
"""
Unlocks a locked server.
Parameters
instance_name_or_id The name or ID of a server.
Returns
Server instance
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
self.os_sdk_conn.compute.unlock_server(instance.id)
return self._get_server(instance.id)
def _reboot_server(self, instance_name_or_id, reboot_type):
"""
Reboot a server
Parameters
instance_name_or_id The name or ID of a server.
reboot_type (str) The type of reboot to perform. HARD and SOFT are the current options.
Returns
Server instance
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
self.os_sdk_conn.compute.reboot_server(instance.id, reboot_type)
if reboot_type is 'soft':
instance = self._wait_for_server(instance,status='REBOOT', wait=60)
elif reboot_type is 'hard':
instance = self._wait_for_server(instance,status='HARD_REBOOT', wait=60)
return self._wait_for_server(instance, wait=60)
def _pause_server(self, instance_name_or_id):
"""
Pauses a server and changes its status to PAUSED.
Parameters
instance_name_or_id The name or ID of a server.
Returns
Server instance
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
self.os_sdk_conn.compute.pause_server(instance.id)
return self._wait_for_server(instance,status='PAUSED', wait=60)
def _unpause_server(self, instance_name_or_id):
"""
Unpauses a paused server and changes its status to ACTIVE.
Parameters
instance_name_or_id The name or ID of a server.
Returns
Server instance
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
self.os_sdk_conn.compute.unpause_server(instance.id)
return self._wait_for_server(instance, wait=60)
def _suspend_server(self, instance_name_or_id):
"""
Suspends a server and changes its status to SUSPENDED.
Parameters
instance_name_or_id The name or ID of a server.
Returns
Server instance
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
self.os_sdk_conn.compute.suspend_server(instance.id)
return self._wait_for_server(instance,status='SUSPENDED', wait=60)
def _resume_server(self, instance_name_or_id):
"""
Resumes a suspended server and changes its status to ACTIVE.
Parameters
instance_name_or_id The name or ID of a server.
Returns
Server instance
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
self.os_sdk_conn.compute.resume_server(instance.id)
return self._wait_for_server(instance, wait=60)
def _resize_server(self, instance_name_or_id, flavor_name_or_id):
"""
Resize a server
Parameters
instance_name_or_id The name or ID of a server.
flavor_name_or_id The name or ID of a flavor.
Returns
Server instance.
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
flavor = self._find_flavor(flavor_name_or_id, ignore_missing=False)
self.os_sdk_conn.compute.resize_server(instance.id, flavor.id)
instance = self._wait_for_server(instance, status='RESIZE', wait=120)
return self._wait_for_server(instance, status='VERIFY_RESIZE', wait=300)
def _confirm_server_resize(self, instance_name_or_id):
"""
Confirm a server resize
Parameters
instance_name_or_id The name or ID of a server.
Returns
Server instance.
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
self.os_sdk_conn.compute.confirm_server_resize(instance.id)
return self._wait_for_server(instance, wait=60)
def _revert_server_resize(self, instance_name_or_id):
"""
Revert a server resize
Parameters
instance_name_or_id The name or ID of a server.
Returns
Server instance.
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
self.os_sdk_conn.compute.revert_server_resize(instance.id)
return self._wait_for_server(instance, wait=60)
def _set_server_metadata(self, instance_name_or_id, **metadata):
"""
Update metadata for a server
Parameters
instance_name_or_id The name or ID of a server.
metadata (kwargs) Key/value pairs to be updated in the servers metadata. No other metadata is modified by this call. All keys and values are stored as Unicode.
Returns
A Server with only the servers metadata. All keys and values are Unicode text.
Return type
Server
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
return self.os_sdk_conn.compute.set_server_metadata(instance.id, **metadata)
def _delete_server_metadata(self, instance_name_or_id, keys):
"""
Delete metadata for a server
Note: This method will do a HTTP DELETE request for every key in keys.
Parameters
instance_name_or_id The name or ID of a server.
keys The keys to delete
Return type
None
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
self.os_sdk_conn.compute.delete_server_metadata(instance.id, keys)
def _get_server_metadata(self, instance_name_or_id):
"""
Return a dictionary of metadata for a server
Parameters
instance_name_or_id The name or ID of a server.
Returns
A Server with only the servers metadata. All keys and values are Unicode text.
Return type
Server
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
return self.os_sdk_conn.compute.get_server_metadata(instance.id)
def _add_floating_ip_to_server(self, instance_name_or_id, fip, fixed_address=None):
"""
Adds a floating IP address to a server instance.
Parameters
instance_name_or_id The name or ID of a server.
fip The floating IP address to be added to the server.
fixed_address The fixed IP address to be associated with the floating IP address. Used when the server is connected to multiple networks.
Returns
None
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
self.os_sdk_conn.compute.add_floating_ip_to_server(instance.id, fip.floating_ip_address)
def _remove_floating_ip_from_server(self, instance_name_or_id, fip):
"""
Removes a floating IP address from a server instance.
Parameters
instance_name_or_id The name or ID of a server.
address The floating IP address to be disassociated from the server.
Returns
None
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
self.os_sdk_conn.compute.remove_floating_ip_from_server(instance.id, fip.floating_ip_address)
def _list_server_ips(self, instance_name_or_id, network_label=None):
"""
Return a generator of server IPs
Parameters
instance_name_or_id The name or ID of a server.
network_label The name of a particular network to list IP addresses from.
Returns
A generator of ServerIP objects
Return type
ServerIP
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
return self.os_sdk_conn.compute.server_ips(instance.id, network_label=network_label)
def _fetch_server_security_groups(self, instance_name_or_id):
"""
Fetch security groups with details for a server.
Parameters
instance_name_or_id The name or ID of a server.
Returns
updated Server instance
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
return self.os_sdk_conn.compute.fetch_server_security_groups(instance.id)
def _add_security_group_to_server(self, instance_name_or_id, sg_name_or_id):
"""
Add a security group to a server
Parameters
instance_name_or_id The name or ID of a server.
sg_name_or_id The name or ID of a security group.
Returns
None
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
sg = self._find_security_group(sg_name_or_id, ignore_missing=False)
self.os_sdk_conn.compute.add_security_group_to_server(instance.id, sg)
def _remove_security_group_from_server(self, instance_name_or_id, sg_name_or_id):
"""
Remove a security group from a server
Parameters
instance_name_or_id The name or ID of a server.
sg_name_or_id The name or ID of a security group.
Returns
None
"""
instance = self._find_server(instance_name_or_id, ignore_missing=False)
sg = self._find_security_group(sg_name_or_id, ignore_missing=False)
self.os_sdk_conn.compute.remove_security_group_from_server(instance.id, sg)

File diff suppressed because it is too large Load Diff