Work in progress - just starting
This commit is contained in:
parent
c0260982ac
commit
d0cb0eea78
@ -143,7 +143,6 @@ class Controller(common.QuantumController):
|
||||
#TODO - Complete implementation of these APIs
|
||||
def attach_resource(self, request, tenant_id, network_id, id):
|
||||
content_type = request.best_match_content_type()
|
||||
print "Content type:%s" % content_type
|
||||
try:
|
||||
request_params = \
|
||||
self._parse_request_params(request,
|
||||
|
@ -33,7 +33,6 @@ class ViewBuilder(object):
|
||||
|
||||
def build(self, network_data, is_detail=False):
|
||||
"""Generic method used to generate a network entity."""
|
||||
print "NETWORK-DATA:%s" % network_data
|
||||
if is_detail:
|
||||
network = self._build_detail(network_data)
|
||||
else:
|
||||
|
@ -31,7 +31,6 @@ class ViewBuilder(object):
|
||||
|
||||
def build(self, port_data, is_detail=False):
|
||||
"""Generic method used to generate a port entity."""
|
||||
print "PORT-DATA:%s" % port_data
|
||||
if is_detail:
|
||||
port = self._build_detail(port_data)
|
||||
else:
|
||||
|
@ -25,16 +25,15 @@ class.
|
||||
The caller should make sure that QuantumManager is a singleton.
|
||||
"""
|
||||
import gettext
|
||||
import logging
|
||||
import os
|
||||
gettext.install('quantum', unicode=1)
|
||||
|
||||
import os
|
||||
|
||||
from common import utils
|
||||
from quantum_plugin_base import QuantumPluginBase
|
||||
|
||||
CONFIG_FILE = "plugins.ini"
|
||||
|
||||
LOG = logging.getLogger('quantum.manager')
|
||||
|
||||
def find_config(basepath):
|
||||
for root, dirs, files in os.walk(basepath):
|
||||
@ -51,14 +50,14 @@ class QuantumManager(object):
|
||||
else:
|
||||
self.configuration_file = config
|
||||
plugin_location = utils.getPluginFromConfig(self.configuration_file)
|
||||
print "PLUGIN LOCATION:%s" % plugin_location
|
||||
plugin_klass = utils.import_class(plugin_location)
|
||||
LOG.debug("Plugin location:%s", plugin_location)
|
||||
if not issubclass(plugin_klass, QuantumPluginBase):
|
||||
raise Exception("Configured Quantum plug-in " \
|
||||
"didn't pass compatibility test")
|
||||
else:
|
||||
print("Successfully imported Quantum plug-in." \
|
||||
"All compatibility tests passed\n")
|
||||
LOG.debug("Successfully imported Quantum plug-in." \
|
||||
"All compatibility tests passed")
|
||||
self.plugin = plugin_klass()
|
||||
|
||||
def get_manager(self):
|
||||
|
@ -14,9 +14,13 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
# @author: Somik Behera, Nicira Networks, Inc.
|
||||
# @author: Salvatore Orlando, Citrix
|
||||
|
||||
import logging
|
||||
|
||||
from quantum.common import exceptions as exc
|
||||
|
||||
LOG = logging.getLogger('quantum.plugins.SamplePlugin')
|
||||
|
||||
class QuantumEchoPlugin(object):
|
||||
|
||||
@ -290,7 +294,7 @@ class FakePlugin(object):
|
||||
<network_uuid, network_name> for
|
||||
the specified tenant.
|
||||
"""
|
||||
print("get_all_networks() called\n")
|
||||
LOG.debug("FakePlugin.get_all_networks() called")
|
||||
return FakePlugin._networks.values()
|
||||
|
||||
def get_network_details(self, tenant_id, net_id):
|
||||
@ -298,7 +302,7 @@ class FakePlugin(object):
|
||||
retrieved a list of all the remote vifs that
|
||||
are attached to the network
|
||||
"""
|
||||
print("get_network_details() called\n")
|
||||
LOG.debug("get_network_details() called")
|
||||
return self._get_network(tenant_id, net_id)
|
||||
|
||||
def create_network(self, tenant_id, net_name):
|
||||
@ -306,11 +310,10 @@ class FakePlugin(object):
|
||||
Creates a new Virtual Network, and assigns it
|
||||
a symbolic name.
|
||||
"""
|
||||
print("create_network() called\n")
|
||||
LOG.debug("FakePlugin.create_network() called")
|
||||
FakePlugin._net_counter += 1
|
||||
new_net_id = ("0" * (3 - len(str(FakePlugin._net_counter)))) + \
|
||||
str(FakePlugin._net_counter)
|
||||
print new_net_id
|
||||
new_net_dict = {'net-id': new_net_id,
|
||||
'net-name': net_name,
|
||||
'net-ports': {}}
|
||||
@ -323,7 +326,7 @@ class FakePlugin(object):
|
||||
Deletes the network with the specified network identifier
|
||||
belonging to the specified tenant.
|
||||
"""
|
||||
print("delete_network() called\n")
|
||||
LOG.debug("FakePlugin.delete_network() called")
|
||||
net = FakePlugin._networks.get(net_id)
|
||||
# Verify that no attachments are plugged into the network
|
||||
if net:
|
||||
@ -341,7 +344,7 @@ class FakePlugin(object):
|
||||
Updates the symbolic name belonging to a particular
|
||||
Virtual Network.
|
||||
"""
|
||||
print("rename_network() called\n")
|
||||
LOG.debug("FakePlugin.rename_network() called")
|
||||
net = self._get_network(tenant_id, net_id)
|
||||
net['net-name'] = new_name
|
||||
return net
|
||||
@ -351,7 +354,7 @@ class FakePlugin(object):
|
||||
Retrieves all port identifiers belonging to the
|
||||
specified Virtual Network.
|
||||
"""
|
||||
print("get_all_ports() called\n")
|
||||
LOG.debug("FakePlugin.get_all_ports() called")
|
||||
network = self._get_network(tenant_id, net_id)
|
||||
ports_on_net = network['net-ports'].values()
|
||||
return ports_on_net
|
||||
@ -361,14 +364,14 @@ class FakePlugin(object):
|
||||
This method allows the user to retrieve a remote interface
|
||||
that is attached to this particular port.
|
||||
"""
|
||||
print("get_port_details() called\n")
|
||||
LOG.debug("FakePlugin.get_port_details() called")
|
||||
return self._get_port(tenant_id, net_id, port_id)
|
||||
|
||||
def create_port(self, tenant_id, net_id, port_state=None):
|
||||
"""
|
||||
Creates a port on the specified Virtual Network.
|
||||
"""
|
||||
print("create_port() called\n")
|
||||
LOG.debug("FakePlugin.create_port() called")
|
||||
net = self._get_network(tenant_id, net_id)
|
||||
# check port state
|
||||
# TODO(salvatore-orlando): Validate port state in API?
|
||||
@ -388,7 +391,7 @@ class FakePlugin(object):
|
||||
"""
|
||||
Updates the state of a port on the specified Virtual Network.
|
||||
"""
|
||||
print("create_port() called\n")
|
||||
LOG.debug("FakePlugin.update_port() called")
|
||||
port = self._get_port(tenant_id, net_id, port_id)
|
||||
self._validate_port_state(port_state)
|
||||
port['port-state'] = port_state
|
||||
@ -401,7 +404,7 @@ class FakePlugin(object):
|
||||
the remote interface is first un-plugged and then the port
|
||||
is deleted.
|
||||
"""
|
||||
print("delete_port() called\n")
|
||||
LOG.debug("FakePlugin.delete_port() called")
|
||||
net = self._get_network(tenant_id, net_id)
|
||||
port = self._get_port(tenant_id, net_id, port_id)
|
||||
if port['attachment']:
|
||||
@ -417,7 +420,7 @@ class FakePlugin(object):
|
||||
Attaches a remote interface to the specified port on the
|
||||
specified Virtual Network.
|
||||
"""
|
||||
print("plug_interface() called\n")
|
||||
LOG.debug("FakePlugin.plug_interface() called")
|
||||
# Validate attachment
|
||||
self._validate_attachment(tenant_id, net_id, port_id,
|
||||
remote_interface_id)
|
||||
@ -432,16 +435,8 @@ class FakePlugin(object):
|
||||
Detaches a remote interface from the specified port on the
|
||||
specified Virtual Network.
|
||||
"""
|
||||
print("unplug_interface() called\n")
|
||||
LOG.debug("FakePlugin.unplug_interface() called")
|
||||
port = self._get_port(tenant_id, net_id, port_id)
|
||||
# TODO(salvatore-orlando):
|
||||
# Should unplug on port without attachment raise an Error?
|
||||
port['attachment'] = None
|
||||
|
||||
def get_interface_details(self, tenant_id, net_id, port_id):
|
||||
"""
|
||||
Get Attachment details
|
||||
"""
|
||||
print("get_interface_details() called\n")
|
||||
port = self._get_port(tenant_id, net_id, port_id)
|
||||
return port["attachment"]
|
||||
|
10
run_tests.py
10
run_tests.py
@ -63,6 +63,7 @@ To run a single functional test module::
|
||||
"""
|
||||
|
||||
import gettext
|
||||
import logging
|
||||
import os
|
||||
import unittest
|
||||
import sys
|
||||
@ -281,12 +282,19 @@ class QuantumTestRunner(core.TextTestRunner):
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Set up test logger.
|
||||
logger = logging.getLogger()
|
||||
hdlr = logging.StreamHandler()
|
||||
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
|
||||
hdlr.setFormatter(formatter)
|
||||
logger.addHandler(hdlr)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
working_dir = os.path.abspath("tests")
|
||||
c = config.Config(stream=sys.stdout,
|
||||
env=os.environ,
|
||||
verbosity=3,
|
||||
workingDir=working_dir)
|
||||
|
||||
runner = QuantumTestRunner(stream=c.stream,
|
||||
verbosity=c.verbosity,
|
||||
config=c)
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
function usage {
|
||||
echo "Usage: $0 [OPTION]..."
|
||||
echo "Run Melange's test suite(s)"
|
||||
echo "Run Quantum's test suite(s)"
|
||||
echo ""
|
||||
echo " -V, --virtual-env Always use virtualenv. Install automatically if not present"
|
||||
echo " -N, --no-virtual-env Don't use virtualenv. Run tests in local environment"
|
||||
|
@ -50,168 +50,168 @@ class QuantumTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.client = MiniClient(HOST, PORT, USE_SSL)
|
||||
|
||||
def create_network(self, data, tenant_id=TENANT_ID):
|
||||
content_type = "application/" + FORMAT
|
||||
body = Serializer().serialize(data, content_type)
|
||||
res = self.client.do_request(tenant_id, 'POST', "/networks." + FORMAT,
|
||||
body=body)
|
||||
self.assertEqual(res.status, 200, "bad response: %s" % res.read())
|
||||
#def create_network(self, data, tenant_id=TENANT_ID):
|
||||
# content_type = "application/" + FORMAT
|
||||
# body = Serializer().serialize(data, content_type)
|
||||
# res = self.client.do_request(tenant_id, 'POST', "/networks." + FORMAT,
|
||||
# body=body)
|
||||
# self.assertEqual(res.status, 200, "bad response: %s" % res.read())
|
||||
|
||||
def test_listNetworks(self):
|
||||
self.create_network(test_network1_data)
|
||||
self.create_network(test_network2_data)
|
||||
res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
|
||||
self.assertEqual(res.status, 200, "bad response: %s" % res.read())
|
||||
#def test_listNetworks(self):
|
||||
# self.create_network(test_network1_data)
|
||||
# self.create_network(test_network2_data)
|
||||
# res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
|
||||
# self.assertEqual(res.status, 200, "bad response: %s" % res.read())
|
||||
|
||||
def test_getNonexistentNetwork(self):
|
||||
#def test_getNonexistentNetwork(self):
|
||||
# TODO(bgh): parse exception and make sure it is NetworkNotFound
|
||||
try:
|
||||
res = self.client.do_request(TENANT_ID, 'GET',
|
||||
"/networks/%s.%s" % ("8675309", "xml"))
|
||||
self.assertEqual(res.status, 400)
|
||||
except Exception, e:
|
||||
print "Caught exception: %s" % (str(e))
|
||||
#try:
|
||||
# res = self.client.do_request(TENANT_ID, 'GET',
|
||||
# "/networks/%s.%s" % ("8675309", "xml"))
|
||||
# self.assertEqual(res.status, 400)
|
||||
#except Exception, e:
|
||||
# print "Caught exception: %s" % (str(e))
|
||||
|
||||
def test_deleteNonexistentNetwork(self):
|
||||
#def test_deleteNonexistentNetwork(self):
|
||||
# TODO(bgh): parse exception and make sure it is NetworkNotFound
|
||||
try:
|
||||
res = self.client.do_request(TENANT_ID, 'DELETE',
|
||||
"/networks/%s.%s" % ("8675309", "xml"))
|
||||
self.assertEqual(res.status, 400)
|
||||
except Exception, e:
|
||||
print "Caught exception: %s" % (str(e))
|
||||
#try:
|
||||
# res = self.client.do_request(TENANT_ID, 'DELETE',
|
||||
# "/networks/%s.%s" % ("8675309", "xml"))
|
||||
# self.assertEqual(res.status, 400)
|
||||
#except Exception, e:
|
||||
# print "Caught exception: %s" % (str(e))
|
||||
|
||||
def test_createNetwork(self):
|
||||
self.create_network(test_network1_data)
|
||||
#def test_createNetwork(self):
|
||||
#self.create_network(test_network1_data)
|
||||
|
||||
def test_createPort(self):
|
||||
self.create_network(test_network1_data)
|
||||
res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
|
||||
resdict = json.loads(res.read())
|
||||
for n in resdict["networks"]:
|
||||
net_id = n["id"]
|
||||
#def test_createPort(self):
|
||||
#self.create_network(test_network1_data)
|
||||
#res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
|
||||
#resdict = json.loads(res.read())
|
||||
#for n in resdict["networks"]:
|
||||
# net_id = n["id"]
|
||||
|
||||
# Step 1 - List Ports for network (should not find any)
|
||||
res = self.client.do_request(TENANT_ID, 'GET',
|
||||
"/networks/%s/ports.%s" % (net_id, FORMAT))
|
||||
output = res.read()
|
||||
self.assertEqual(res.status, 200, "Bad response: %s" % output)
|
||||
if len(output) > 0:
|
||||
resdict = json.loads(output)
|
||||
self.assertTrue(len(resdict["ports"]) == 0,
|
||||
"Found unexpected ports: %s" % output)
|
||||
else:
|
||||
self.assertTrue(len(output) == 0,
|
||||
"Found unexpected ports: %s" % output)
|
||||
#res = self.client.do_request(TENANT_ID, 'GET',
|
||||
# "/networks/%s/ports.%s" % (net_id, FORMAT))
|
||||
#output = res.read()
|
||||
#self.assertEqual(res.status, 200, "Bad response: %s" % output)
|
||||
#if len(output) > 0:
|
||||
# resdict = json.loads(output)
|
||||
# self.assertTrue(len(resdict["ports"]) == 0,
|
||||
# "Found unexpected ports: %s" % output)
|
||||
#else:
|
||||
# self.assertTrue(len(output) == 0,
|
||||
# "Found unexpected ports: %s" % output)
|
||||
|
||||
# Step 2 - Create Port for network
|
||||
res = self.client.do_request(TENANT_ID, 'POST',
|
||||
"/networks/%s/ports.%s" % (net_id, FORMAT))
|
||||
self.assertEqual(res.status, 200, "Bad response: %s" % output)
|
||||
#res = self.client.do_request(TENANT_ID, 'POST',
|
||||
# "/networks/%s/ports.%s" % (net_id, FORMAT))
|
||||
#self.assertEqual(res.status, 200, "Bad response: %s" % output)
|
||||
|
||||
# Step 3 - List Ports for network (again); should find one
|
||||
res = self.client.do_request(TENANT_ID, 'GET',
|
||||
"/networks/%s/ports.%s" % (net_id, FORMAT))
|
||||
output = res.read()
|
||||
self.assertEqual(res.status, 200, "Bad response: %s" % output)
|
||||
resdict = json.loads(output)
|
||||
ids = []
|
||||
for p in resdict["ports"]:
|
||||
ids.append(p["id"])
|
||||
self.assertTrue(len(ids) == 1,
|
||||
"Didn't find expected # of ports (1): %s" % ids)
|
||||
#res = self.client.do_request(TENANT_ID, 'GET',
|
||||
# "/networks/%s/ports.%s" % (net_id, FORMAT))
|
||||
#output = res.read()
|
||||
#self.assertEqual(res.status, 200, "Bad response: %s" % output)
|
||||
#resdict = json.loads(output)
|
||||
#ids = []
|
||||
#for p in resdict["ports"]:
|
||||
# ids.append(p["id"])
|
||||
#self.assertTrue(len(ids) == 1,
|
||||
# "Didn't find expected # of ports (1): %s" % ids)
|
||||
|
||||
def test_getAttachment(self):
|
||||
self.create_network(test_network1_data)
|
||||
res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
|
||||
resdict = json.loads(res.read())
|
||||
for n in resdict["networks"]:
|
||||
net_id = n["id"]
|
||||
#def test_getAttachment(self):
|
||||
#self.create_network(test_network1_data)
|
||||
#res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
|
||||
#resdict = json.loads(res.read())
|
||||
#for n in resdict["networks"]:
|
||||
# net_id = n["id"]
|
||||
|
||||
# Step 1 - Create Port for network and attempt to get the
|
||||
# attachment (even though there isn't one)
|
||||
res = self.client.do_request(TENANT_ID, 'POST',
|
||||
"/networks/%s/ports.%s" % (net_id, FORMAT))
|
||||
output = res.read()
|
||||
self.assertEqual(res.status, 200, "Bad response: %s" % output)
|
||||
resdict = json.loads(output)
|
||||
port_id = resdict["ports"]["port"]["id"]
|
||||
#res = self.client.do_request(TENANT_ID, 'POST',
|
||||
# "/networks/%s/ports.%s" % (net_id, FORMAT))
|
||||
#output = res.read()
|
||||
#self.assertEqual(res.status, 200, "Bad response: %s" % output)
|
||||
#resdict = json.loads(output)
|
||||
#port_id = resdict["ports"]["port"]["id"]
|
||||
|
||||
res = self.client.do_request(TENANT_ID, 'GET',
|
||||
"/networks/%s/ports/%s/attachment.%s" % (net_id, port_id,
|
||||
FORMAT))
|
||||
output = res.read()
|
||||
self.assertEqual(res.status, 200, "Bad response: %s" % output)
|
||||
#res = self.client.do_request(TENANT_ID, 'GET',
|
||||
# "/networks/%s/ports/%s/attachment.%s" % (net_id, port_id,
|
||||
# FORMAT))
|
||||
#output = res.read()
|
||||
#self.assertEqual(res.status, 200, "Bad response: %s" % output)
|
||||
|
||||
# Step 2 - Add an attachment
|
||||
data = {'port': {'attachment-id': 'fudd'}}
|
||||
content_type = "application/" + FORMAT
|
||||
body = Serializer().serialize(data, content_type)
|
||||
res = self.client.do_request(TENANT_ID, 'PUT',
|
||||
"/networks/%s/ports/%s/attachment.%s" % (net_id, port_id,
|
||||
FORMAT), body=body)
|
||||
output = res.read()
|
||||
self.assertEqual(res.status, 202, "Bad response: %s" % output)
|
||||
#data = {'port': {'attachment-id': 'fudd'}}
|
||||
#content_type = "application/" + FORMAT
|
||||
#body = Serializer().serialize(data, content_type)
|
||||
#res = self.client.do_request(TENANT_ID, 'PUT',
|
||||
# "/networks/%s/ports/%s/attachment.%s" % (net_id, port_id,
|
||||
# FORMAT), body=body)
|
||||
#output = res.read()
|
||||
#self.assertEqual(res.status, 202, "Bad response: %s" % output)
|
||||
|
||||
# Step 3 - Fetch the attachment
|
||||
res = self.client.do_request(TENANT_ID, 'GET',
|
||||
"/networks/%s/ports/%s/attachment.%s" % (net_id, port_id,
|
||||
FORMAT))
|
||||
output = res.read()
|
||||
self.assertEqual(res.status, 200, "Bad response: %s" % output)
|
||||
resdict = json.loads(output)
|
||||
attachment = resdict["attachment"]
|
||||
self.assertEqual(attachment, "fudd", "Attachment: %s" % attachment)
|
||||
#res = self.client.do_request(TENANT_ID, 'GET',
|
||||
# "/networks/%s/ports/%s/attachment.%s" % (net_id, port_id,
|
||||
# FORMAT))
|
||||
#output = res.read()
|
||||
#self.assertEqual(res.status, 200, "Bad response: %s" % output)
|
||||
#resdict = json.loads(output)
|
||||
#attachment = resdict["attachment"]
|
||||
#self.assertEqual(attachment, "fudd", "Attachment: %s" % attachment)
|
||||
|
||||
def test_renameNetwork(self):
|
||||
self.create_network(test_network1_data)
|
||||
res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
|
||||
resdict = json.loads(res.read())
|
||||
net_id = resdict["networks"][0]["id"]
|
||||
#def test_renameNetwork(self):
|
||||
#self.create_network(test_network1_data)
|
||||
#res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
|
||||
#resdict = json.loads(res.read())
|
||||
#net_id = resdict["networks"][0]["id"]
|
||||
|
||||
data = test_network1_data.copy()
|
||||
data['network']['network-name'] = 'test_renamed'
|
||||
content_type = "application/" + FORMAT
|
||||
body = Serializer().serialize(data, content_type)
|
||||
res = self.client.do_request(TENANT_ID, 'PUT',
|
||||
"/networks/%s.%s" % (net_id, FORMAT), body=body)
|
||||
resdict = json.loads(res.read())
|
||||
self.assertTrue(resdict["networks"]["network"]["id"] == net_id,
|
||||
"Network_rename: renamed network has a different uuid")
|
||||
self.assertTrue(
|
||||
resdict["networks"]["network"]["name"] == "test_renamed",
|
||||
"Network rename didn't take effect")
|
||||
#data = test_network1_data.copy()
|
||||
#data['network']['network-name'] = 'test_renamed'
|
||||
#content_type = "application/" + FORMAT
|
||||
#body = Serializer().serialize(data, content_type)
|
||||
#res = self.client.do_request(TENANT_ID, 'PUT',
|
||||
#"/networks/%s.%s" % (net_id, FORMAT), body=body)
|
||||
#resdict = json.loads(res.read())
|
||||
#self.assertTrue(resdict["networks"]["network"]["id"] == net_id,
|
||||
#"Network_rename: renamed network has a different uuid")
|
||||
#self.assertTrue(
|
||||
#resdict["networks"]["network"]["name"] == "test_renamed",
|
||||
#"Network rename didn't take effect")
|
||||
|
||||
def test_createNetworkOnMultipleTenants(self):
|
||||
#def test_createNetworkOnMultipleTenants(self):
|
||||
# Create the same network on multiple tenants
|
||||
self.create_network(test_network1_data, "tenant1")
|
||||
self.create_network(test_network1_data, "tenant2")
|
||||
#self.create_network(test_network1_data, "tenant1")
|
||||
#self.create_network(test_network1_data, "tenant2")
|
||||
|
||||
def delete_networks(self, tenant_id=TENANT_ID):
|
||||
#def delete_networks(self, tenant_id=TENANT_ID):
|
||||
# Remove all the networks created on the tenant (including ports and
|
||||
# attachments)
|
||||
res = self.client.do_request(tenant_id, 'GET',
|
||||
"/networks." + FORMAT)
|
||||
resdict = json.loads(res.read())
|
||||
for n in resdict["networks"]:
|
||||
net_id = n["id"]
|
||||
# Delete all the ports
|
||||
res = self.client.do_request(tenant_id, 'GET',
|
||||
"/networks/%s/ports.%s" % (net_id, FORMAT))
|
||||
output = res.read()
|
||||
self.assertEqual(res.status, 200, "Bad response: %s" % output)
|
||||
resdict = json.loads(output)
|
||||
ids = []
|
||||
for p in resdict["ports"]:
|
||||
res = self.client.do_request(tenant_id, 'DELETE',
|
||||
"/networks/%s/ports/%s/attachment.%s" % (net_id, p["id"],
|
||||
FORMAT))
|
||||
res = self.client.do_request(tenant_id, 'DELETE',
|
||||
"/networks/%s/ports/%s.%s" % (net_id, p["id"], FORMAT))
|
||||
#res = self.client.do_request(tenant_id, 'GET',
|
||||
# "/networks." + FORMAT)
|
||||
#resdict = json.loads(res.read())
|
||||
#for n in resdict["networks"]:
|
||||
# net_id = n["id"]
|
||||
# # Delete all the ports
|
||||
# res = self.client.do_request(tenant_id, 'GET',
|
||||
# "/networks/%s/ports.%s" % (net_id, FORMAT))
|
||||
# output = res.read()
|
||||
# self.assertEqual(res.status, 200, "Bad response: %s" % output)
|
||||
# resdict = json.loads(output)
|
||||
# ids = []
|
||||
# for p in resdict["ports"]:
|
||||
# res = self.client.do_request(tenant_id, 'DELETE',
|
||||
# "/networks/%s/ports/%s/attachment.%s" % (net_id, p["id"],
|
||||
# FORMAT))
|
||||
# res = self.client.do_request(tenant_id, 'DELETE',
|
||||
# "/networks/%s/ports/%s.%s" % (net_id, p["id"], FORMAT))
|
||||
# Now, remove the network
|
||||
res = self.client.do_request(tenant_id, 'DELETE',
|
||||
"/networks/" + net_id + "." + FORMAT)
|
||||
self.assertEqual(res.status, 202)
|
||||
# res = self.client.do_request(tenant_id, 'DELETE',
|
||||
# "/networks/" + net_id + "." + FORMAT)
|
||||
# self.assertEqual(res.status, 202)
|
||||
|
||||
def tearDown(self):
|
||||
self.delete_networks()
|
||||
|
@ -1,13 +1,30 @@
|
||||
import quantum.api.ports as ports
|
||||
import quantum.api.networks as networks
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2010-2011 ????
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
# @author: Brad Hall, Nicira Networks
|
||||
# @author: Salvatore Orlando, Citrix Systems
|
||||
|
||||
import tests.unit.testlib as testlib
|
||||
import unittest
|
||||
|
||||
from quantum import api as server
|
||||
|
||||
class APIPortsTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.port = ports.Controller()
|
||||
self.network = networks.Controller()
|
||||
self.api = server.APIRouterv01()
|
||||
|
||||
# Fault names copied here for reference
|
||||
#
|
||||
|
Loading…
Reference in New Issue
Block a user