Removing code related to functional tests
This commit is contained in:
parent
67ee9c1fd4
commit
71ceb8fb84
@ -1,294 +0,0 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 Somebody PLC
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Base test class for running non-stubbed tests (functional tests)
|
||||
|
||||
The FunctionalTest class contains helper methods for starting the API
|
||||
and Registry server, grabbing the logs of each, cleaning up pidfiles,
|
||||
and spinning down the servers.
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import functools
|
||||
import os
|
||||
import random
|
||||
import shutil
|
||||
import signal
|
||||
import socket
|
||||
import tempfile
|
||||
import time
|
||||
import unittest
|
||||
import urlparse
|
||||
|
||||
from tests.utils import execute, get_unused_port
|
||||
|
||||
from sqlalchemy import create_engine
|
||||
|
||||
|
||||
class Server(object):
|
||||
"""
|
||||
Class used to easily manage starting and stopping
|
||||
a server during functional test runs.
|
||||
"""
|
||||
def __init__(self, test_dir, port):
|
||||
"""
|
||||
Creates a new Server object.
|
||||
|
||||
:param test_dir: The directory where all test stuff is kept. This is
|
||||
passed from the FunctionalTestCase.
|
||||
:param port: The port to start a server up on.
|
||||
"""
|
||||
self.verbose = True
|
||||
self.debug = True
|
||||
self.test_dir = test_dir
|
||||
self.bind_port = port
|
||||
self.conf_file = None
|
||||
self.conf_base = None
|
||||
|
||||
def start(self, **kwargs):
|
||||
"""
|
||||
Starts the server.
|
||||
|
||||
Any kwargs passed to this method will override the configuration
|
||||
value in the conf file used in starting the servers.
|
||||
"""
|
||||
if self.conf_file:
|
||||
raise RuntimeError("Server configuration file already exists!")
|
||||
if not self.conf_base:
|
||||
raise RuntimeError("Subclass did not populate config_base!")
|
||||
|
||||
conf_override = self.__dict__.copy()
|
||||
if kwargs:
|
||||
conf_override.update(**kwargs)
|
||||
|
||||
# Create temporary configuration file for Quantum Unit tests.
|
||||
|
||||
conf_file = tempfile.NamedTemporaryFile()
|
||||
conf_file.write(self.conf_base % conf_override)
|
||||
conf_file.flush()
|
||||
self.conf_file = conf_file
|
||||
self.conf_file_name = conf_file.name
|
||||
|
||||
cmd = ("./bin/quantum %(conf_file_name)s" % self.__dict__)
|
||||
return execute(cmd)
|
||||
|
||||
def stop(self):
|
||||
"""
|
||||
Spin down the server.
|
||||
"""
|
||||
# The only way we can do that at the moment is by killing quantum
|
||||
# TODO - find quantum PID and do a sudo kill
|
||||
|
||||
|
||||
class ApiServer(Server):
|
||||
|
||||
"""
|
||||
Server object that starts/stops/manages the API server
|
||||
"""
|
||||
|
||||
def __init__(self, test_dir, port, registry_port):
|
||||
super(ApiServer, self).__init__(test_dir, port)
|
||||
self.server_name = 'api'
|
||||
self.default_store = 'file'
|
||||
self.image_dir = os.path.join(self.test_dir,
|
||||
"images")
|
||||
self.pid_file = os.path.join(self.test_dir,
|
||||
"api.pid")
|
||||
self.log_file = os.path.join(self.test_dir, "api.log")
|
||||
self.registry_port = registry_port
|
||||
self.conf_base = """[DEFAULT]
|
||||
verbose = %(verbose)s
|
||||
debug = %(debug)s
|
||||
filesystem_store_datadir=%(image_dir)s
|
||||
default_store = %(default_store)s
|
||||
bind_host = 0.0.0.0
|
||||
bind_port = %(bind_port)s
|
||||
registry_host = 0.0.0.0
|
||||
registry_port = %(registry_port)s
|
||||
log_file = %(log_file)s
|
||||
|
||||
[pipeline:glance-api]
|
||||
pipeline = versionnegotiation apiv1app
|
||||
|
||||
[pipeline:versions]
|
||||
pipeline = versionsapp
|
||||
|
||||
[app:versionsapp]
|
||||
paste.app_factory = glance.api.versions:app_factory
|
||||
|
||||
[app:apiv1app]
|
||||
paste.app_factory = glance.api.v1:app_factory
|
||||
|
||||
[filter:versionnegotiation]
|
||||
paste.filter_factory = glance.api.middleware.version_negotiation:filter_factory
|
||||
"""
|
||||
|
||||
|
||||
class QuantumAPIServer(Server):
|
||||
|
||||
"""
|
||||
Server object that starts/stops/manages the Quantum API Server
|
||||
"""
|
||||
|
||||
def __init__(self, test_dir, port):
|
||||
super(QuantumAPIServer, self).__init__(test_dir, port)
|
||||
|
||||
self.db_file = os.path.join(self.test_dir, ':memory:')
|
||||
self.sql_connection = 'sqlite:///%s' % self.db_file
|
||||
self.conf_base = """[DEFAULT]
|
||||
# Show more verbose log output (sets INFO log level output)
|
||||
verbose = %(verbose)s
|
||||
# Show debugging output in logs (sets DEBUG log level output)
|
||||
debug = %(debug)s
|
||||
# Address to bind the API server
|
||||
bind_host = 0.0.0.0
|
||||
# Port for test API server
|
||||
bind_port = %(bind_port)s
|
||||
|
||||
[composite:quantum]
|
||||
use = egg:Paste#urlmap
|
||||
/: quantumversions
|
||||
/v0.1: quantumapi
|
||||
|
||||
[app:quantumversions]
|
||||
paste.app_factory = quantum.api.versions:Versions.factory
|
||||
|
||||
[app:quantumapi]
|
||||
paste.app_factory = quantum.api:APIRouterV01.factory
|
||||
"""
|
||||
|
||||
|
||||
class FunctionalTest(unittest.TestCase):
|
||||
|
||||
"""
|
||||
Base test class for any test that wants to test the actual
|
||||
servers and clients and not just the stubbed out interfaces
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
|
||||
self.test_id = random.randint(0, 100000)
|
||||
self.test_port = get_unused_port()
|
||||
|
||||
self.quantum_server = QuantumAPIServer(self.test_dir,
|
||||
self.test_port)
|
||||
|
||||
def tearDown(self):
|
||||
self.cleanup()
|
||||
# We destroy the test data store between each test case,
|
||||
# and recreate it, which ensures that we have no side-effects
|
||||
# from the tests
|
||||
self._reset_database()
|
||||
|
||||
def _reset_database(self):
|
||||
conn_string = self.registry_server.sql_connection
|
||||
conn_pieces = urlparse.urlparse(conn_string)
|
||||
if conn_string.startswith('sqlite'):
|
||||
# We can just delete the SQLite database, which is
|
||||
# the easiest and cleanest solution
|
||||
db_path = conn_pieces.path.strip('/')
|
||||
if db_path and os.path.exists(db_path):
|
||||
os.unlink(db_path)
|
||||
# No need to recreate the SQLite DB. SQLite will
|
||||
# create it for us if it's not there...
|
||||
elif conn_string.startswith('mysql'):
|
||||
# We can execute the MySQL client to destroy and re-create
|
||||
# the MYSQL database, which is easier and less error-prone
|
||||
# than using SQLAlchemy to do this via MetaData...trust me.
|
||||
database = conn_pieces.path.strip('/')
|
||||
loc_pieces = conn_pieces.netloc.split('@')
|
||||
host = loc_pieces[1]
|
||||
auth_pieces = loc_pieces[0].split(':')
|
||||
user = auth_pieces[0]
|
||||
password = ""
|
||||
if len(auth_pieces) > 1:
|
||||
if auth_pieces[1].strip():
|
||||
password = "-p%s" % auth_pieces[1]
|
||||
sql = ("drop database if exists %(database)s; "
|
||||
"create database %(database)s;") % locals()
|
||||
cmd = ("mysql -u%(user)s %(password)s -h%(host)s "
|
||||
"-e\"%(sql)s\"") % locals()
|
||||
exitcode, out, err = execute(cmd)
|
||||
self.assertEqual(0, exitcode)
|
||||
|
||||
def start_servers(self, **kwargs):
|
||||
"""
|
||||
Starts the Quantum API server on an unused port.
|
||||
|
||||
Any kwargs passed to this method will override the configuration
|
||||
value in the conf file used in starting the server.
|
||||
"""
|
||||
|
||||
exitcode, out, err = self.quantum_server.start(**kwargs)
|
||||
|
||||
self.assertEqual(0, exitcode,
|
||||
"Failed to spin up the Quantum server. "
|
||||
"Got: %s" % err)
|
||||
#self.assertTrue("Starting quantum with" in out)
|
||||
#TODO: replace with appropriate assert
|
||||
|
||||
self.wait_for_servers()
|
||||
|
||||
def ping_server(self, port):
|
||||
"""
|
||||
Simple ping on the port. If responsive, return True, else
|
||||
return False.
|
||||
|
||||
:note We use raw sockets, not ping here, since ping uses ICMP and
|
||||
has no concept of ports...
|
||||
"""
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
try:
|
||||
s.connect(("127.0.0.1", port))
|
||||
s.close()
|
||||
return True
|
||||
except socket.error, e:
|
||||
return False
|
||||
|
||||
def wait_for_servers(self, timeout=3):
|
||||
"""
|
||||
Tight loop, waiting for both API and registry server to be
|
||||
available on the ports. Returns when both are pingable. There
|
||||
is a timeout on waiting for the servers to come up.
|
||||
|
||||
:param timeout: Optional, defaults to 3 seconds
|
||||
"""
|
||||
now = datetime.datetime.now()
|
||||
timeout_time = now + datetime.timedelta(seconds=timeout)
|
||||
while (timeout_time > now):
|
||||
if self.ping_server(self.api_port) and\
|
||||
self.ping_server(self.registry_port):
|
||||
return
|
||||
now = datetime.datetime.now()
|
||||
time.sleep(0.05)
|
||||
self.assertFalse(True, "Failed to start servers.")
|
||||
|
||||
def stop_servers(self):
|
||||
"""
|
||||
Called to stop the started servers in a normal fashion. Note
|
||||
that cleanup() will stop the servers using a fairly draconian
|
||||
method of sending a SIGTERM signal to the servers. Here, we use
|
||||
the glance-control stop method to gracefully shut the server down.
|
||||
This method also asserts that the shutdown was clean, and so it
|
||||
is meant to be called during a normal test case sequence.
|
||||
"""
|
||||
|
||||
exitcode, out, err = self.quantum_server.stop()
|
||||
self.assertEqual(0, exitcode,
|
||||
"Failed to spin down the Quantum server. "
|
||||
"Got: %s" % err)
|
@ -1,99 +0,0 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 Citrix Systems
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import httplib
|
||||
import socket
|
||||
import urllib
|
||||
|
||||
|
||||
class MiniClient(object):
|
||||
|
||||
"""A base client class - derived from Glance.BaseClient"""
|
||||
|
||||
action_prefix = '/v0.1/tenants/{tenant_id}'
|
||||
|
||||
def __init__(self, host, port, use_ssl):
|
||||
"""
|
||||
Creates a new client to some service.
|
||||
|
||||
:param host: The host where service resides
|
||||
:param port: The port where service resides
|
||||
:param use_ssl: Should we use HTTPS?
|
||||
"""
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.use_ssl = use_ssl
|
||||
self.connection = None
|
||||
|
||||
def get_connection_type(self):
|
||||
"""
|
||||
Returns the proper connection type
|
||||
"""
|
||||
if self.use_ssl:
|
||||
return httplib.HTTPSConnection
|
||||
else:
|
||||
return httplib.HTTPConnection
|
||||
|
||||
def do_request(self, tenant, method, action, body=None,
|
||||
headers=None, params=None):
|
||||
"""
|
||||
Connects to the server and issues a request.
|
||||
Returns the result data, or raises an appropriate exception if
|
||||
HTTP status code is not 2xx
|
||||
|
||||
:param method: HTTP method ("GET", "POST", "PUT", etc...)
|
||||
:param body: string of data to send, or None (default)
|
||||
:param headers: mapping of key/value pairs to add as headers
|
||||
:param params: dictionary of key/value pairs to add to append
|
||||
to action
|
||||
|
||||
"""
|
||||
action = MiniClient.action_prefix + action
|
||||
action = action.replace('{tenant_id}', tenant)
|
||||
if type(params) is dict:
|
||||
action += '?' + urllib.urlencode(params)
|
||||
|
||||
try:
|
||||
connection_type = self.get_connection_type()
|
||||
headers = headers or {}
|
||||
|
||||
# Open connection and send request
|
||||
c = connection_type(self.host, self.port)
|
||||
c.request(method, action, body, headers)
|
||||
res = c.getresponse()
|
||||
status_code = self.get_status_code(res)
|
||||
if status_code in (httplib.OK,
|
||||
httplib.CREATED,
|
||||
httplib.ACCEPTED,
|
||||
httplib.NO_CONTENT):
|
||||
return res
|
||||
else:
|
||||
raise Exception("Server returned error: %s" % res.read())
|
||||
|
||||
except (socket.error, IOError), e:
|
||||
raise Exception("Unable to connect to "
|
||||
"server. Got error: %s" % e)
|
||||
|
||||
def get_status_code(self, response):
|
||||
"""
|
||||
Returns the integer status code from the response, which
|
||||
can be either a Webob.Response (used in testing) or httplib.Response
|
||||
"""
|
||||
if hasattr(response, 'status_int'):
|
||||
return response.status_int
|
||||
else:
|
||||
return response.status
|
@ -1,221 +0,0 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 Citrix Systems
|
||||
# Copyright 2011 Nicira Networks
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import gettext
|
||||
import unittest
|
||||
|
||||
gettext.install('quantum', unicode=1)
|
||||
|
||||
from miniclient import MiniClient
|
||||
from quantum.common.wsgi import Serializer
|
||||
|
||||
HOST = '127.0.0.1'
|
||||
PORT = 9696
|
||||
USE_SSL = False
|
||||
|
||||
TENANT_ID = 'totore'
|
||||
FORMAT = "json"
|
||||
|
||||
test_network1_data = \
|
||||
{'network': {'network-name': 'test1'}}
|
||||
test_network2_data = \
|
||||
{'network': {'network-name': 'test2'}}
|
||||
|
||||
|
||||
def print_response(res):
|
||||
content = res.read()
|
||||
print "Status: %s" % res.status
|
||||
print "Content: %s" % content
|
||||
return content
|
||||
|
||||
|
||||
class QuantumTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.client = MiniClient(HOST, PORT, USE_SSL)
|
||||
|
||||
#def create_network(self, data, tenant_id=TENANT_ID):
|
||||
# content_type = "application/%s" % FORMAT
|
||||
# body = Serializer().serialize(data, content_type)
|
||||
# res = self.client.do_request(tenant_id, 'POST', "/networks." + FORMAT,
|
||||
# body=body)
|
||||
# self.assertEqual(res.status, 200, "bad response: %s" % res.read())
|
||||
|
||||
#def test_listNetworks(self):
|
||||
# self.create_network(test_network1_data)
|
||||
# self.create_network(test_network2_data)
|
||||
# res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
|
||||
# self.assertEqual(res.status, 200, "bad response: %s" % res.read())
|
||||
|
||||
#def test_getNonexistentNetwork(self):
|
||||
# TODO(bgh): parse exception and make sure it is NetworkNotFound
|
||||
#try:
|
||||
# res = self.client.do_request(TENANT_ID, 'GET',
|
||||
# "/networks/%s.%s" % ("8675309", "xml"))
|
||||
# self.assertEqual(res.status, 400)
|
||||
#except Exception, e:
|
||||
# print "Caught exception: %s" % (str(e))
|
||||
|
||||
#def test_deleteNonexistentNetwork(self):
|
||||
# TODO(bgh): parse exception and make sure it is NetworkNotFound
|
||||
#try:
|
||||
# res = self.client.do_request(TENANT_ID, 'DELETE',
|
||||
# "/networks/%s.%s" % ("8675309", "xml"))
|
||||
# self.assertEqual(res.status, 400)
|
||||
#except Exception, e:
|
||||
# print "Caught exception: %s" % (str(e))
|
||||
|
||||
#def test_createNetwork(self):
|
||||
#self.create_network(test_network1_data)
|
||||
|
||||
#def test_createPort(self):
|
||||
#self.create_network(test_network1_data)
|
||||
#res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
|
||||
#resdict = json.loads(res.read())
|
||||
#for n in resdict["networks"]:
|
||||
# net_id = n["id"]
|
||||
|
||||
# Step 1 - List Ports for network (should not find any)
|
||||
#res = self.client.do_request(TENANT_ID, 'GET',
|
||||
# "/networks/%s/ports.%s" % (net_id, FORMAT))
|
||||
#output = res.read()
|
||||
#self.assertEqual(res.status, 200, "Bad response: %s" % output)
|
||||
#if len(output) > 0:
|
||||
# resdict = json.loads(output)
|
||||
# self.assertTrue(len(resdict["ports"]) == 0,
|
||||
# "Found unexpected ports: %s" % output)
|
||||
#else:
|
||||
# self.assertTrue(len(output) == 0,
|
||||
# "Found unexpected ports: %s" % output)
|
||||
|
||||
# Step 2 - Create Port for network
|
||||
#res = self.client.do_request(TENANT_ID, 'POST',
|
||||
# "/networks/%s/ports.%s" % (net_id, FORMAT))
|
||||
#self.assertEqual(res.status, 200, "Bad response: %s" % output)
|
||||
|
||||
# Step 3 - List Ports for network (again); should find one
|
||||
#res = self.client.do_request(TENANT_ID, 'GET',
|
||||
# "/networks/%s/ports.%s" % (net_id, FORMAT))
|
||||
#output = res.read()
|
||||
#self.assertEqual(res.status, 200, "Bad response: %s" % output)
|
||||
#resdict = json.loads(output)
|
||||
#ids = []
|
||||
#for p in resdict["ports"]:
|
||||
# ids.append(p["id"])
|
||||
#self.assertTrue(len(ids) == 1,
|
||||
# "Didn't find expected # of ports (1): %s" % ids)
|
||||
|
||||
#def test_getAttachment(self):
|
||||
#self.create_network(test_network1_data)
|
||||
#res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
|
||||
#resdict = json.loads(res.read())
|
||||
#for n in resdict["networks"]:
|
||||
# net_id = n["id"]
|
||||
|
||||
# Step 1 - Create Port for network and attempt to get the
|
||||
# attachment (even though there isn't one)
|
||||
#res = self.client.do_request(TENANT_ID, 'POST',
|
||||
# "/networks/%s/ports.%s" % (net_id, FORMAT))
|
||||
#output = res.read()
|
||||
#self.assertEqual(res.status, 200, "Bad response: %s" % output)
|
||||
#resdict = json.loads(output)
|
||||
#port_id = resdict["ports"]["port"]["id"]
|
||||
|
||||
#res = self.client.do_request(TENANT_ID, 'GET',
|
||||
# "/networks/%s/ports/%s/attachment.%s" % (net_id, port_id,
|
||||
# FORMAT))
|
||||
#output = res.read()
|
||||
#self.assertEqual(res.status, 200, "Bad response: %s" % output)
|
||||
|
||||
# Step 2 - Add an attachment
|
||||
#data = {'port': {'attachment-id': 'fudd'}}
|
||||
#content_type = "application/" + FORMAT
|
||||
#body = Serializer().serialize(data, content_type)
|
||||
#res = self.client.do_request(TENANT_ID, 'PUT',
|
||||
# "/networks/%s/ports/%s/attachment.%s" % (net_id, port_id,
|
||||
# FORMAT), body=body)
|
||||
#output = res.read()
|
||||
#self.assertEqual(res.status, 202, "Bad response: %s" % output)
|
||||
|
||||
# Step 3 - Fetch the attachment
|
||||
#res = self.client.do_request(TENANT_ID, 'GET',
|
||||
# "/networks/%s/ports/%s/attachment.%s" % (net_id, port_id,
|
||||
# FORMAT))
|
||||
#output = res.read()
|
||||
#self.assertEqual(res.status, 200, "Bad response: %s" % output)
|
||||
#resdict = json.loads(output)
|
||||
#attachment = resdict["attachment"]
|
||||
#self.assertEqual(attachment, "fudd", "Attachment: %s"
|
||||
#% attachment)
|
||||
|
||||
#def test_renameNetwork(self):
|
||||
#self.create_network(test_network1_data)
|
||||
#res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
|
||||
#resdict = json.loads(res.read())
|
||||
#net_id = resdict["networks"][0]["id"]
|
||||
|
||||
#data = test_network1_data.copy()
|
||||
#data['network']['network-name'] = 'test_renamed'
|
||||
#content_type = "application/" + FORMAT
|
||||
#body = Serializer().serialize(data, content_type)
|
||||
#res = self.client.do_request(TENANT_ID, 'PUT',
|
||||
#"/networks/%s.%s" % (net_id, FORMAT), body=body)
|
||||
#resdict = json.loads(res.read())
|
||||
#self.assertTrue(resdict["networks"]["network"]["id"] == net_id,
|
||||
#"Network_rename: renamed network has a different uuid")
|
||||
#self.assertTrue(
|
||||
#resdict["networks"]["network"]["name"] == "test_renamed",
|
||||
#"Network rename didn't take effect")
|
||||
|
||||
#def test_createNetworkOnMultipleTenants(self):
|
||||
# Create the same network on multiple tenants
|
||||
#self.create_network(test_network1_data, "tenant1")
|
||||
#self.create_network(test_network1_data, "tenant2")
|
||||
|
||||
#def delete_networks(self, tenant_id=TENANT_ID):
|
||||
# Remove all the networks created on the tenant (including ports and
|
||||
# attachments)
|
||||
#res = self.client.do_request(tenant_id, 'GET',
|
||||
# "/networks." + FORMAT)
|
||||
#resdict = json.loads(res.read())
|
||||
#for n in resdict["networks"]:
|
||||
# net_id = n["id"]
|
||||
# # Delete all the ports
|
||||
# res = self.client.do_request(tenant_id, 'GET',
|
||||
# "/networks/%s/ports.%s" % (net_id, FORMAT))
|
||||
# output = res.read()
|
||||
# self.assertEqual(res.status, 200, "Bad response: %s" % output)
|
||||
# resdict = json.loads(output)
|
||||
# ids = []
|
||||
# for p in resdict["ports"]:
|
||||
# res = self.client.do_request(tenant_id, 'DELETE',
|
||||
# "/networks/%s/ports/%s/attachment.%s" % (net_id, p["id"],
|
||||
# FORMAT))
|
||||
# res = self.client.do_request(tenant_id, 'DELETE',
|
||||
# "/networks/%s/ports/%s.%s" % (net_id, p["id"], FORMAT))
|
||||
# Now, remove the network
|
||||
# res = self.client.do_request(tenant_id, 'DELETE',
|
||||
# "/networks/" + net_id + "." + FORMAT)
|
||||
# self.assertEqual(res.status, 202)
|
||||
|
||||
def tearDown(self):
|
||||
self.delete_networks()
|
||||
|
||||
# Standard boilerplate to call the main() function.
|
||||
if __name__ == '__main__':
|
||||
suite = unittest.TestLoader().loadTestsFromTestCase(QuantumTest)
|
||||
unittest.TextTestRunner(verbosity=2).run(suite)
|
Loading…
Reference in New Issue
Block a user