Remove unit tests that are no longer run.

A long time ago, quantum/tests/unit became the home for all unit
tests, but these ones in the Cisco plugin directory got left
behind. They have suffered bit-rot and need to be removed.

Also:
- Move the fake Nexus driver to new home.
- Filed new bugs to track the task of improving unit test coverage of
  Cisco plugin code.

Fixes: bug #1174311
Change-Id: I372e24aebbe1804e5b6ce62984bfd76b030a44b1
This commit is contained in:
HenryVIII 2013-06-13 20:03:20 -04:00
parent b1cbfb1a13
commit 81e199fc0b
17 changed files with 3 additions and 1606 deletions

View File

@ -11,7 +11,7 @@
#max_networks=65568
#model_class=quantum.plugins.cisco.models.virt_phy_sw_v2.VirtualPhysicalSwitchModelV2
#manager_class=quantum.plugins.cisco.segmentation.l2network_vlan_mgr_v2.L2NetworkVLANMgr
#nexus_driver=quantum.plugins.cisco.tests.unit.v2.nexus.fake_nexus_driver.CiscoNEXUSFakeDriver
#nexus_driver=quantum.plugins.cisco.test.nexus.fake_nexus_driver.CiscoNEXUSFakeDriver
#svi_round_robin=False
# IMPORTANT: Comment out the following two lines for production deployments

0
quantum/plugins/cisco/README Executable file → Normal file
View File

View File

@ -55,7 +55,7 @@ cisco_opts = [
'l2network_vlan_mgr_v2.L2NetworkVLANMgr',
help=_("Manager Class")),
cfg.StrOpt('nexus_driver',
default='quantum.plugins.cisco.tests.unit.v2.nexus.'
default='quantum.plugins.cisco.test.nexus.'
'fake_nexus_driver.CiscoNEXUSFakeDriver',
help=_("Nexus Driver Name")),
]

View File

@ -1,51 +0,0 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Unittest runner for quantum Cisco plugin
export PLUGIN_DIR=quantum/plugins/cisco
./run_tests.sh -N
"""
import os
import sys
from nose import config
sys.path.append(os.getcwd())
sys.path.append(os.path.dirname(__file__))
from quantum.common.test_lib import run_tests, test_config
def main():
test_config['plugin_name'] = "l2network_plugin.L2Network"
cwd = os.getcwd()
os.chdir(cwd)
working_dir = os.path.abspath("quantum/plugins/cisco")
c = config.Config(stream=sys.stdout,
env=os.environ,
verbosity=3,
workingDir=working_dir)
sys.exit(run_tests(c))
if __name__ == '__main__':
main()

View File

@ -1,22 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# See http://code.google.com/p/python-nose/issues/detail?id=373
# The code below enables nosetests to work with i18n _() blocks
import __builtin__
setattr(__builtin__, '_', lambda x: x)

View File

@ -1,8 +0,0 @@
[pipeline:extensions_app_with_filter]
pipeline = extensions extensions_test_app
[filter:extensions]
paste.filter_factory = quantum.common.extensions:plugin_aware_extension_middleware_factory
[app:extensions_test_app]
paste.app_factory = quantum.plugins.cisco.tests.unit.test_cisco_extension:app_factory

View File

@ -1,20 +0,0 @@
[DEFAULT]
# Show more verbose log output (sets INFO log level output)
verbose = True
# Show debugging output in logs (sets DEBUG log level output)
debug = False
# Address to bind the API server
bind_host = 0.0.0.0
# Port the bind the API server to
bind_port = 9696
# Path to the extensions
api_extensions_path = ../../../../extensions
# Paste configuration file
api_paste_config = api-paste.ini.cisco.test
core_plugin = quantum.plugins.cisco.l2network_plugin.L2Network

View File

@ -1,673 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @authors: Shweta Padubidri, Cisco Systems, Inc.
# Peter Strunk , Cisco Systems, Inc.
# Shubhangi Satras , Cisco Systems, Inc.
import logging
import os.path
import routes
import webob
from webtest import TestApp
from quantum.api import extensions
from quantum.api.extensions import (
ExtensionMiddleware,
PluginAwareExtensionManager,
)
from quantum.common import config
from quantum.extensions import (
credential,
qos,
)
from quantum.manager import QuantumManager
from quantum.openstack.common import jsonutils
from quantum.plugins.cisco.db import api as db
from quantum.plugins.cisco import l2network_plugin
from quantum.plugins.cisco.l2network_plugin import L2Network
from quantum.tests import base
from quantum.tests.unit.extension_stubs import StubBaseAppController
from quantum import wsgi
LOG = logging.getLogger('quantum.plugins.cisco.tests.test_cisco_extensions')
EXTENSIONS_PATH = os.path.join(os.path.dirname(__file__), os.pardir, os.pardir,
os.pardir, os.pardir, "extensions")
ROOTDIR = os.path.dirname(os.path.dirname(__file__))
UNITDIR = os.path.join(ROOTDIR, 'unit')
def testsdir(*p):
return os.path.join(UNITDIR, *p)
config_file = 'quantum.conf.cisco.test'
args = ['--config-file', testsdir(config_file)]
config.parse(args=args)
class ExtensionsTestApp(wsgi.Router):
def __init__(self, options=None):
options = options or {}
mapper = routes.Mapper()
controller = StubBaseAppController()
mapper.resource("dummy_resource", "/dummy_resources",
controller=controller)
super(ExtensionsTestApp, self).__init__(mapper)
def create_request(self, path, body, content_type, method='GET'):
"""Test create request."""
LOG.debug("test_create_request - START")
req = webob.Request.blank(path)
req.method = method
req.headers = {}
req.headers['Accept'] = content_type
req.body = body
LOG.debug("test_create_request - END")
return req
def _create_network(self, name=None):
"""Test create network."""
LOG.debug("Creating network - START")
if name:
net_name = name
else:
net_name = self.network_name
net_path = "/tenants/tt/networks"
net_data = {'network': {'name': '%s' % net_name}}
req_body = wsgi.Serializer().serialize(net_data, self.contenttype)
network_req = self.create_request(net_path, req_body,
self.contenttype, 'POST')
network_res = network_req.get_response(self.api)
network_data = wsgi.Serializer().deserialize(network_res.body,
self.contenttype)
LOG.debug("Creating network - END")
return network_data['network']['id']
def _create_port(self, network_id, port_state):
"""Test create port."""
LOG.debug("Creating port for network %s - START", network_id)
port_path = "/tenants/tt/networks/%s/ports" % network_id
port_req_data = {'port': {'state': '%s' % port_state}}
req_body = wsgi.Serializer().serialize(port_req_data,
self.contenttype)
port_req = self.create_request(port_path, req_body,
self.contenttype, 'POST')
port_res = port_req.get_response(self.api)
port_data = wsgi.Serializer().deserialize(port_res.body,
self.contenttype)
LOG.debug("Creating port for network - END")
return port_data['port']['id']
def _delete_port(self, network_id, port_id):
"""Delete port."""
LOG.debug("Deleting port for network %s - START", network_id)
data = {'network_id': network_id, 'port_id': port_id}
port_path = ("/tenants/tt/networks/%(network_id)s/ports/%(port_id)s" %
data)
port_req = self.create_request(port_path, None,
self.contenttype, 'DELETE')
port_req.get_response(self.api)
LOG.debug("Deleting port for network - END")
def _delete_network(self, network_id):
"""Delete network."""
LOG.debug("Deleting network %s - START", network_id)
network_path = "/tenants/tt/networks/%s" % network_id
network_req = self.create_request(network_path, None,
self.contenttype, 'DELETE')
network_req.get_response(self.api)
LOG.debug("Deleting network - END")
def tear_down_port_network(self, net_id, port_id):
"""Tear down port and network."""
self._delete_port(net_id, port_id)
self._delete_network(net_id)
class QosExtensionTest(base.BaseTestCase):
def setUp(self):
"""Set up function."""
super(QosExtensionTest, self).setUp()
parent_resource = dict(member_name="tenant",
collection_name="extensions/csco/tenants")
controller = qos.QosController(QuantumManager.get_plugin())
res_ext = extensions.ResourceExtension('qos', controller,
parent=parent_resource)
self.test_app = setup_extensions_test_app(
SimpleExtensionManager(res_ext))
self.contenttype = 'application/json'
self.qos_path = '/extensions/csco/tenants/tt/qos'
self.qos_second_path = '/extensions/csco/tenants/tt/qos/'
self.test_qos_data = {
'qos': {
'qos_name': 'cisco_test_qos',
'qos_desc': {
'PPS': 50,
'TTL': 5,
},
},
}
self._l2network_plugin = l2network_plugin.L2Network()
def test_create_qos(self):
"""Test create qos."""
LOG.debug("test_create_qos - START")
req_body = jsonutils.dumps(self.test_qos_data)
index_response = self.test_app.post(self.qos_path,
req_body,
content_type=self.contenttype)
self.assertEqual(200, index_response.status_int)
# Clean Up - Delete the qos
resp_body = wsgi.Serializer().deserialize(index_response.body,
self.contenttype)
qos_path_temp = self.qos_second_path + resp_body['qoss']['qos']['id']
qos_path = str(qos_path_temp)
self.tearDownQos(qos_path)
LOG.debug("test_create_qos - END")
def test_create_qosBADRequest(self):
"""Test create qos bad request."""
LOG.debug("test_create_qosBADRequest - START")
index_response = self.test_app.post(self.qos_path,
'BAD_REQUEST',
content_type=self.contenttype,
status='*')
self.assertEqual(400, index_response.status_int)
LOG.debug("test_create_qosBADRequest - END")
def test_list_qoss(self):
"""Test list qoss."""
LOG.debug("test_list_qoss - START")
req_body1 = jsonutils.dumps(self.test_qos_data)
create_resp1 = self.test_app.post(self.qos_path, req_body1,
content_type=self.contenttype)
req_body2 = jsonutils.dumps({
'qos': {
'qos_name': 'cisco_test_qos2',
'qos_desc': {
'PPS': 50,
'TTL': 5,
},
},
})
create_resp2 = self.test_app.post(self.qos_path, req_body2,
content_type=self.contenttype)
index_response = self.test_app.get(self.qos_path)
index_resp_body = wsgi.Serializer().deserialize(index_response.body,
self.contenttype)
self.assertEqual(200, index_response.status_int)
# Clean Up - Delete the qos's
resp_body1 = wsgi.Serializer().deserialize(create_resp1.body,
self.contenttype)
qos_path1_temp = self.qos_second_path + resp_body1['qoss']['qos']['id']
qos_path1 = str(qos_path1_temp)
resp_body2 = wsgi.Serializer().deserialize(create_resp2.body,
self.contenttype)
list_all_qos = [resp_body1['qoss']['qos'], resp_body2['qoss']['qos']]
self.assertTrue(index_resp_body['qoss'][0] in list_all_qos)
self.assertTrue(index_resp_body['qoss'][1] in list_all_qos)
qos_path2_temp = self.qos_second_path + resp_body2['qoss']['qos']['id']
qos_path2 = str(qos_path2_temp)
self.tearDownQos(qos_path1)
self.tearDownQos(qos_path2)
LOG.debug("test_list_qoss - END")
def test_show_qos(self):
"""Test show qos."""
LOG.debug("test_show_qos - START")
req_body = jsonutils.dumps(self.test_qos_data)
index_response = self.test_app.post(self.qos_path, req_body,
content_type=self.contenttype)
resp_body = wsgi.Serializer().deserialize(index_response.body,
self.contenttype)
show_path_temp = self.qos_second_path + resp_body['qoss']['qos']['id']
show_qos_path = str(show_path_temp)
show_response = self.test_app.get(show_qos_path)
show_resp_dict = wsgi.Serializer().deserialize(show_response.body,
self.contenttype)
self.assertEqual(show_resp_dict['qoss']['qos']['name'],
self.test_qos_data['qos']['qos_name'])
self.assertEqual(200, show_response.status_int)
# Clean Up - Delete the qos
self.tearDownQos(show_qos_path)
LOG.debug("test_show_qos - END")
def test_show_qosDNE(self, qos_id='100'):
"""Test show qos does not exist."""
LOG.debug("test_show_qosDNE - START")
show_path_temp = self.qos_second_path + qos_id
show_qos_path = str(show_path_temp)
show_response = self.test_app.get(show_qos_path, status='*')
self.assertEqual(452, show_response.status_int)
LOG.debug("test_show_qosDNE - END")
def test_update_qos(self):
"""Test update qos."""
LOG.debug("test_update_qos - START")
req_body = jsonutils.dumps(self.test_qos_data)
index_response = self.test_app.post(self.qos_path, req_body,
content_type=self.contenttype)
resp_body = wsgi.Serializer().deserialize(index_response.body,
self.contenttype)
rename_req_body = jsonutils.dumps({
'qos': {
'qos_name': 'cisco_rename_qos',
'qos_desc': {
'PPS': 50,
'TTL': 5,
},
},
})
rename_path_temp = (self.qos_second_path +
resp_body['qoss']['qos']['id'])
rename_path = str(rename_path_temp)
rename_response = self.test_app.put(rename_path, rename_req_body,
content_type=self.contenttype)
self.assertEqual(200, rename_response.status_int)
rename_resp_dict = wsgi.Serializer().deserialize(rename_response.body,
self.contenttype)
self.assertEqual(rename_resp_dict['qoss']['qos']['name'],
'cisco_rename_qos')
self.tearDownQos(rename_path)
LOG.debug("test_update_qos - END")
def test_update_qosDNE(self, qos_id='100'):
"""Test update qos does not exist."""
LOG.debug("test_update_qosDNE - START")
rename_req_body = jsonutils.dumps({
'qos': {
'qos_name': 'cisco_rename_qos',
'qos_desc': {
'PPS': 50,
'TTL': 5,
},
},
})
rename_path_temp = self.qos_second_path + qos_id
rename_path = str(rename_path_temp)
rename_response = self.test_app.put(rename_path, rename_req_body,
content_type=self.contenttype,
status='*')
self.assertEqual(452, rename_response.status_int)
LOG.debug("test_update_qosDNE - END")
def test_update_qosBADRequest(self):
"""Test update qos bad request."""
LOG.debug("test_update_qosBADRequest - START")
req_body = jsonutils.dumps(self.test_qos_data)
index_response = self.test_app.post(self.qos_path, req_body,
content_type=self.contenttype)
resp_body = wsgi.Serializer().deserialize(index_response.body,
self.contenttype)
rename_path_temp = (self.qos_second_path +
resp_body['qoss']['qos']['id'])
rename_path = str(rename_path_temp)
rename_response = self.test_app.put(rename_path, 'BAD_REQUEST',
status="*")
self.assertEqual(400, rename_response.status_int)
# Clean Up - Delete the Port Profile
self.tearDownQos(rename_path)
LOG.debug("test_update_qosBADRequest - END")
def test_delete_qos(self):
"""Test delte qos."""
LOG.debug("test_delete_qos - START")
req_body = jsonutils.dumps({
'qos': {
'qos_name': 'cisco_test_qos',
'qos_desc': {
'PPS': 50,
'TTL': 5,
},
},
})
index_response = self.test_app.post(self.qos_path, req_body,
content_type=self.contenttype)
resp_body = wsgi.Serializer().deserialize(index_response.body,
self.contenttype)
delete_path_temp = (self.qos_second_path +
resp_body['qoss']['qos']['id'])
delete_path = str(delete_path_temp)
delete_response = self.test_app.delete(delete_path)
self.assertEqual(200, delete_response.status_int)
LOG.debug("test_delete_qos - END")
def test_delete_qosDNE(self, qos_id='100'):
"""Test delte qos does not exist."""
LOG.debug("test_delete_qosDNE - START")
delete_path_temp = self.qos_second_path + qos_id
delete_path = str(delete_path_temp)
delete_response = self.test_app.delete(delete_path, status='*')
self.assertEqual(452, delete_response.status_int)
LOG.debug("test_delete_qosDNE - END")
def tearDownQos(self, delete_profile_path):
"""Tear Down Qos."""
self.test_app.delete(delete_profile_path)
def tearDown(self):
db.clear_db()
class CredentialExtensionTest(base.BaseTestCase):
def setUp(self):
"""Set up function."""
super(CredentialExtensionTest, self).setUp()
parent_resource = dict(member_name="tenant",
collection_name="extensions/csco/tenants")
controller = credential.CredentialController(QuantumManager.
get_plugin())
res_ext = extensions.ResourceExtension('credentials', controller,
parent=parent_resource)
self.test_app = setup_extensions_test_app(SimpleExtensionManager(
res_ext))
self.contenttype = 'application/json'
self.credential_path = '/extensions/csco/tenants/tt/credentials'
self.cred_second_path = '/extensions/csco/tenants/tt/credentials/'
self.test_credential_data = {
'credential': {
'credential_name': 'cred8',
'user_name': 'newUser2',
'password': 'newPasswd1',
},
}
self._l2network_plugin = l2network_plugin.L2Network()
def test_list_credentials(self):
"""Test list credentials."""
#Create Credential before listing
LOG.debug("test_list_credentials - START")
req_body1 = jsonutils.dumps(self.test_credential_data)
create_response1 = self.test_app.post(
self.credential_path, req_body1,
content_type=self.contenttype)
req_body2 = jsonutils.dumps({
'credential': {
'credential_name': 'cred9',
'user_name': 'newUser2',
'password': 'newPasswd2',
},
})
create_response2 = self.test_app.post(
self.credential_path, req_body2,
content_type=self.contenttype)
index_response = self.test_app.get(self.credential_path)
index_resp_body = wsgi.Serializer().deserialize(index_response.body,
self.contenttype)
self.assertEqual(200, index_response.status_int)
#CLean Up - Deletion of the Credentials
resp_body1 = wsgi.Serializer().deserialize(create_response1.body,
self.contenttype)
delete_path1_temp = (self.cred_second_path +
resp_body1['credentials']['credential']['id'])
delete_path1 = str(delete_path1_temp)
resp_body2 = wsgi.Serializer().deserialize(create_response2.body,
self.contenttype)
list_all_credential = [resp_body1['credentials']['credential'],
resp_body2['credentials']['credential']]
self.assertTrue(
index_resp_body['credentials'][0] in list_all_credential)
self.assertTrue(
index_resp_body['credentials'][1] in list_all_credential)
delete_path2_temp = (self.cred_second_path +
resp_body2['credentials']['credential']['id'])
delete_path2 = str(delete_path2_temp)
self.tearDownCredential(delete_path1)
self.tearDownCredential(delete_path2)
LOG.debug("test_list_credentials - END")
def test_create_credential(self):
"""Test create credential."""
LOG.debug("test_create_credential - START")
req_body = jsonutils.dumps(self.test_credential_data)
index_response = self.test_app.post(
self.credential_path, req_body,
content_type=self.contenttype)
self.assertEqual(200, index_response.status_int)
#CLean Up - Deletion of the Credentials
resp_body = wsgi.Serializer().deserialize(
index_response.body, self.contenttype)
delete_path_temp = (self.cred_second_path +
resp_body['credentials']['credential']['id'])
delete_path = str(delete_path_temp)
self.tearDownCredential(delete_path)
LOG.debug("test_create_credential - END")
def test_create_credentialBADRequest(self):
"""Test create credential bad request."""
LOG.debug("test_create_credentialBADRequest - START")
index_response = self.test_app.post(
self.credential_path, 'BAD_REQUEST',
content_type=self.contenttype, status='*')
self.assertEqual(400, index_response.status_int)
LOG.debug("test_create_credentialBADRequest - END")
def test_show_credential(self):
"""Test show credential."""
LOG.debug("test_show_credential - START")
req_body = jsonutils.dumps(self.test_credential_data)
index_response = self.test_app.post(
self.credential_path, req_body,
content_type=self.contenttype)
resp_body = wsgi.Serializer().deserialize(index_response.body,
self.contenttype)
show_path_temp = (self.cred_second_path +
resp_body['credentials']['credential']['id'])
show_cred_path = str(show_path_temp)
show_response = self.test_app.get(show_cred_path)
show_resp_dict = wsgi.Serializer().deserialize(show_response.body,
self.contenttype)
self.assertEqual(show_resp_dict['credentials']['credential']['name'],
self.test_credential_data['credential']['user_name'])
self.assertEqual(
show_resp_dict['credentials']['credential']['password'],
self.test_credential_data['credential']['password'])
self.assertEqual(200, show_response.status_int)
LOG.debug("test_show_credential - END")
def test_show_credentialDNE(self, credential_id='100'):
"""Test show credential does not exist."""
LOG.debug("test_show_credentialDNE - START")
show_path_temp = self.cred_second_path + credential_id
show_cred_path = str(show_path_temp)
show_response = self.test_app.get(show_cred_path, status='*')
self.assertEqual(451, show_response.status_int)
LOG.debug("test_show_credentialDNE - END")
def test_update_credential(self):
"""Test update credential."""
LOG.debug("test_update_credential - START")
req_body = jsonutils.dumps(self.test_credential_data)
index_response = self.test_app.post(
self.credential_path, req_body,
content_type=self.contenttype)
resp_body = wsgi.Serializer().deserialize(
index_response.body, self.contenttype)
rename_req_body = jsonutils.dumps({
'credential': {
'credential_name': 'cred3',
'user_name': 'RenamedUser',
'password': 'Renamedpassword',
},
})
rename_path_temp = (self.cred_second_path +
resp_body['credentials']['credential']['id'])
rename_path = str(rename_path_temp)
rename_response = self.test_app.put(rename_path, rename_req_body,
content_type=self.contenttype)
rename_resp_dict = wsgi.Serializer().deserialize(rename_response.body,
self.contenttype)
self.assertEqual(rename_resp_dict['credentials']['credential']['name'],
'cred3')
self.assertEqual(
rename_resp_dict['credentials']['credential']['password'],
self.test_credential_data['credential']['password'])
self.assertEqual(200, rename_response.status_int)
# Clean Up - Delete the Credentials
self.tearDownCredential(rename_path)
LOG.debug("test_update_credential - END")
def test_update_credBADReq(self):
"""Test update credential bad request."""
LOG.debug("test_update_credBADReq - START")
req_body = jsonutils.dumps(self.test_credential_data)
index_response = self.test_app.post(
self.credential_path, req_body,
content_type=self.contenttype)
resp_body = wsgi.Serializer().deserialize(
index_response.body, self.contenttype)
rename_path_temp = (self.cred_second_path +
resp_body['credentials']['credential']['id'])
rename_path = str(rename_path_temp)
rename_response = self.test_app.put(rename_path, 'BAD_REQUEST',
status='*')
self.assertEqual(400, rename_response.status_int)
LOG.debug("test_update_credBADReq - END")
def test_update_credentialDNE(self, credential_id='100'):
"""Test update credential does not exist."""
LOG.debug("test_update_credentialDNE - START")
rename_req_body = jsonutils.dumps({
'credential': {
'credential_name': 'cred3',
'user_name': 'RenamedUser',
'password': 'Renamedpassword',
},
})
rename_path_temp = self.cred_second_path + credential_id
rename_path = str(rename_path_temp)
rename_response = self.test_app.put(rename_path, rename_req_body,
content_type=self.contenttype,
status='*')
self.assertEqual(451, rename_response.status_int)
LOG.debug("test_update_credentialDNE - END")
def test_delete_credential(self):
"""Test delete credential."""
LOG.debug("test_delete_credential - START")
req_body = jsonutils.dumps(self.test_credential_data)
index_response = self.test_app.post(
self.credential_path, req_body,
content_type=self.contenttype)
resp_body = wsgi.Serializer().deserialize(
index_response.body, self.contenttype)
delete_path_temp = (self.cred_second_path +
resp_body['credentials']['credential']['id'])
delete_path = str(delete_path_temp)
delete_response = self.test_app.delete(delete_path)
self.assertEqual(200, delete_response.status_int)
LOG.debug("test_delete_credential - END")
def test_delete_credentialDNE(self, credential_id='100'):
"""Test delete credential does not exist."""
LOG.debug("test_delete_credentialDNE - START")
delete_path_temp = self.cred_second_path + credential_id
delete_path = str(delete_path_temp)
delete_response = self.test_app.delete(delete_path, status='*')
self.assertEqual(451, delete_response.status_int)
LOG.debug("test_delete_credentialDNE - END")
def tearDownCredential(self, delete_path):
self.test_app.delete(delete_path)
def tearDown(self):
db.clear_db()
def app_factory(global_conf, **local_conf):
conf = global_conf.copy()
conf.update(local_conf)
return ExtensionsTestApp(conf)
def setup_extensions_middleware(extension_manager=None):
extension_manager = (extension_manager or
PluginAwareExtensionManager(EXTENSIONS_PATH,
L2Network()))
app = config.load_paste_app('extensions_test_app')
return ExtensionMiddleware(app, ext_mgr=extension_manager)
def setup_extensions_test_app(extension_manager=None):
return TestApp(setup_extensions_middleware(extension_manager))
class SimpleExtensionManager(object):
def __init__(self, resource_ext=None, action_ext=None, request_ext=None):
self.resource_ext = resource_ext
self.action_ext = action_ext
self.request_ext = request_ext
def get_resources(self):
resource_exts = []
if self.resource_ext:
resource_exts.append(self.resource_ext)
return resource_exts
def get_actions(self):
action_exts = []
if self.action_ext:
action_exts.append(self.action_ext)
return action_exts
def get_request_extensions(self):
request_extensions = []
if self.request_ext:
request_extensions.append(self.request_ext)
return request_extensions

View File

@ -1,692 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011, Cisco Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# @author: Rohit Agarwalla, Cisco Systems, Inc.
"""
test_database.py is an independent test suite
that tests the database api method calls
"""
import mock
import sqlalchemy
from quantum.openstack.common import log as logging
import quantum.plugins.cisco.common.cisco_exceptions as c_exc
import quantum.plugins.cisco.db.api as db
import quantum.plugins.cisco.db.l2network_db as l2network_db
import quantum.plugins.cisco.db.nexus_db_v2 as nexus_db
from quantum.tests import base
LOG = logging.getLogger(__name__)
class NexusDB(object):
"""Class consisting of methods to call nexus db methods."""
def get_all_nexusportbindings(self):
"""Get all nexus port bindings."""
bindings = []
try:
for bind in nexus_db.get_all_nexusport_bindings():
LOG.debug("Getting nexus port binding : %s" % bind.port_id)
bind_dict = {}
bind_dict["port-id"] = str(bind.port_id)
bind_dict["vlan-id"] = str(bind.vlan_id)
bindings.append(bind_dict)
except Exception as exc:
LOG.error("Failed to get all bindings: %s" % str(exc))
return bindings
def get_nexusportbinding(self, vlan_id):
"""Get nexus port binding."""
binding = []
try:
for bind in nexus_db.get_nexusport_binding(vlan_id):
LOG.debug("Getting nexus port binding : %s" % bind.port_id)
bind_dict = {}
bind_dict["port-id"] = str(bind.port_id)
bind_dict["vlan-id"] = str(bind.vlan_id)
binding.append(bind_dict)
except Exception as exc:
LOG.error("Failed to get all bindings: %s" % str(exc))
return binding
def create_nexusportbinding(self, port_id, vlan_id):
"""Create nexus port binding."""
bind_dict = {}
try:
res = nexus_db.add_nexusport_binding(port_id, vlan_id)
LOG.debug("Created nexus port binding : %s" % res.port_id)
bind_dict["port-id"] = str(res.port_id)
bind_dict["vlan-id"] = str(res.vlan_id)
return bind_dict
except Exception as exc:
LOG.error("Failed to create nexus binding: %s" % str(exc))
def delete_nexusportbinding(self, vlan_id):
"""Delete nexus port binding."""
bindings = []
try:
bind = nexus_db.remove_nexusport_binding(vlan_id)
for res in bind:
LOG.debug("Deleted nexus port binding: %s" % res.vlan_id)
bind_dict = {}
bind_dict["port-id"] = res.port_id
bindings.append(bind_dict)
return bindings
except Exception as exc:
raise Exception("Failed to delete nexus port binding: %s"
% str(exc))
def update_nexusport_binding(self, port_id, new_vlan_id):
"""Update nexus port binding."""
try:
res = nexus_db.update_nexusport_binding(port_id, new_vlan_id)
LOG.debug("Updating nexus port binding : %s" % res.port_id)
bind_dict = {}
bind_dict["port-id"] = str(res.port_id)
bind_dict["vlan-id"] = str(res.vlan_id)
return bind_dict
except Exception as exc:
raise Exception("Failed to update nexus port binding vnic: %s"
% str(exc))
class L2networkDB(object):
"""Class conisting of methods to call L2network db methods."""
def get_all_vlan_bindings(self):
"""Get all vlan binding into a list of dict."""
vlans = []
try:
for vlan_bind in l2network_db.get_all_vlan_bindings():
LOG.debug("Getting vlan bindings for vlan: %s" %
vlan_bind.vlan_id)
vlan_dict = {}
vlan_dict["vlan-id"] = str(vlan_bind.vlan_id)
vlan_dict["vlan-name"] = vlan_bind.vlan_name
vlan_dict["net-id"] = str(vlan_bind.network_id)
vlans.append(vlan_dict)
except Exception as exc:
LOG.error("Failed to get all vlan bindings: %s" % str(exc))
return vlans
def get_vlan_binding(self, network_id):
"""Get a vlan binding."""
vlan = []
try:
for vlan_bind in l2network_db.get_vlan_binding(network_id):
LOG.debug("Getting vlan binding for vlan: %s" %
vlan_bind.vlan_id)
vlan_dict = {}
vlan_dict["vlan-id"] = str(vlan_bind.vlan_id)
vlan_dict["vlan-name"] = vlan_bind.vlan_name
vlan_dict["net-id"] = str(vlan_bind.network_id)
vlan.append(vlan_dict)
except Exception as exc:
LOG.error("Failed to get vlan binding: %s" % str(exc))
return vlan
def create_vlan_binding(self, vlan_id, vlan_name, network_id):
"""Create a vlan binding."""
vlan_dict = {}
try:
res = l2network_db.add_vlan_binding(vlan_id, vlan_name, network_id)
LOG.debug("Created vlan binding for vlan: %s" % res.vlan_id)
vlan_dict["vlan-id"] = str(res.vlan_id)
vlan_dict["vlan-name"] = res.vlan_name
vlan_dict["net-id"] = str(res.network_id)
return vlan_dict
except Exception as exc:
LOG.error("Failed to create vlan binding: %s" % str(exc))
def delete_vlan_binding(self, network_id):
"""Delete a vlan binding."""
try:
res = l2network_db.remove_vlan_binding(network_id)
LOG.debug("Deleted vlan binding for vlan: %s" % res.vlan_id)
vlan_dict = {}
vlan_dict["vlan-id"] = str(res.vlan_id)
return vlan_dict
except Exception as exc:
raise Exception("Failed to delete vlan binding: %s" % str(exc))
def update_vlan_binding(self, network_id, vlan_id, vlan_name):
"""Update a vlan binding."""
try:
res = l2network_db.update_vlan_binding(network_id, vlan_id,
vlan_name)
LOG.debug("Updating vlan binding for vlan: %s" % res.vlan_id)
vlan_dict = {}
vlan_dict["vlan-id"] = str(res.vlan_id)
vlan_dict["vlan-name"] = res.vlan_name
vlan_dict["net-id"] = str(res.network_id)
return vlan_dict
except Exception as exc:
raise Exception("Failed to update vlan binding: %s" % str(exc))
class QuantumDB(object):
"""Class conisting of methods to call Quantum db methods."""
def get_all_networks(self, tenant_id):
"""Get all networks."""
nets = []
try:
for net in db.network_list(tenant_id):
LOG.debug("Getting network: %s" % net.uuid)
net_dict = {}
net_dict["tenant-id"] = net.tenant_id
net_dict["net-id"] = str(net.uuid)
net_dict["net-name"] = net.name
nets.append(net_dict)
except Exception as exc:
LOG.error("Failed to get all networks: %s" % str(exc))
return nets
def get_network(self, network_id):
"""Get a network."""
net = []
try:
for net in db.network_get(network_id):
LOG.debug("Getting network: %s" % net.uuid)
net_dict = {}
net_dict["tenant-id"] = net.tenant_id
net_dict["net-id"] = str(net.uuid)
net_dict["net-name"] = net.name
net.append(net_dict)
except Exception as exc:
LOG.error("Failed to get network: %s" % str(exc))
return net
def create_network(self, tenant_id, net_name):
"""Create a network."""
net_dict = {}
try:
res = db.network_create(tenant_id, net_name)
LOG.debug("Created network: %s" % res.uuid)
net_dict["tenant-id"] = res.tenant_id
net_dict["net-id"] = str(res.uuid)
net_dict["net-name"] = res.name
return net_dict
except Exception as exc:
LOG.error("Failed to create network: %s" % str(exc))
def delete_network(self, net_id):
"""Delete a network."""
try:
net = db.network_destroy(net_id)
LOG.debug("Deleted network: %s" % net.uuid)
net_dict = {}
net_dict["net-id"] = str(net.uuid)
return net_dict
except Exception as exc:
raise Exception("Failed to delete port: %s" % str(exc))
def update_network(self, tenant_id, net_id, **kwargs):
"""Update a network."""
try:
net = db.network_update(net_id, tenant_id, **kwargs)
LOG.debug("Updated network: %s" % net.uuid)
net_dict = {}
net_dict["net-id"] = str(net.uuid)
net_dict["net-name"] = net.name
return net_dict
except Exception as exc:
raise Exception("Failed to update network: %s" % str(exc))
def get_all_ports(self, net_id):
"""Get all ports."""
ports = []
try:
for port in db.port_list(net_id):
LOG.debug("Getting port: %s" % port.uuid)
port_dict = {}
port_dict["port-id"] = str(port.uuid)
port_dict["net-id"] = str(port.network_id)
port_dict["int-id"] = port.interface_id
port_dict["state"] = port.state
port_dict["net"] = port.network
ports.append(port_dict)
return ports
except Exception as exc:
LOG.error("Failed to get all ports: %s" % str(exc))
def get_port(self, net_id, port_id):
"""Get a port."""
port_list = []
port = db.port_get(net_id, port_id)
try:
LOG.debug("Getting port: %s" % port.uuid)
port_dict = {}
port_dict["port-id"] = str(port.uuid)
port_dict["net-id"] = str(port.network_id)
port_dict["int-id"] = port.interface_id
port_dict["state"] = port.state
port_list.append(port_dict)
return port_list
except Exception as exc:
LOG.error("Failed to get port: %s" % str(exc))
def create_port(self, net_id):
"""Add a port."""
port_dict = {}
try:
port = db.port_create(net_id)
LOG.debug("Creating port %s" % port.uuid)
port_dict["port-id"] = str(port.uuid)
port_dict["net-id"] = str(port.network_id)
port_dict["int-id"] = port.interface_id
port_dict["state"] = port.state
return port_dict
except Exception as exc:
LOG.error("Failed to create port: %s" % str(exc))
def delete_port(self, net_id, port_id):
"""Delete a port."""
try:
port = db.port_destroy(net_id, port_id)
LOG.debug("Deleted port %s" % port.uuid)
port_dict = {}
port_dict["port-id"] = str(port.uuid)
return port_dict
except Exception as exc:
raise Exception("Failed to delete port: %s" % str(exc))
def update_port(self, net_id, port_id, port_state):
"""Update a port."""
try:
port = db.port_set_state(net_id, port_id, port_state)
LOG.debug("Updated port %s" % port.uuid)
port_dict = {}
port_dict["port-id"] = str(port.uuid)
port_dict["net-id"] = str(port.network_id)
port_dict["int-id"] = port.interface_id
port_dict["state"] = port.state
return port_dict
except Exception as exc:
raise Exception("Failed to update port state: %s" % str(exc))
def plug_interface(self, net_id, port_id, int_id):
"""Plug interface to a port."""
try:
port = db.port_set_attachment(net_id, port_id, int_id)
LOG.debug("Attached interface to port %s" % port.uuid)
port_dict = {}
port_dict["port-id"] = str(port.uuid)
port_dict["net-id"] = str(port.network_id)
port_dict["int-id"] = port.interface_id
port_dict["state"] = port.state
return port_dict
except Exception as exc:
raise Exception("Failed to plug interface: %s" % str(exc))
def unplug_interface(self, net_id, port_id):
"""Unplug interface to a port."""
try:
port = db.port_unset_attachment(net_id, port_id)
LOG.debug("Detached interface from port %s" % port.uuid)
port_dict = {}
port_dict["port-id"] = str(port.uuid)
port_dict["net-id"] = str(port.network_id)
port_dict["int-id"] = port.interface_id
port_dict["state"] = port.state
return port_dict
except Exception as exc:
raise Exception("Failed to unplug interface: %s" % str(exc))
class NexusDBTest(base.BaseTestCase):
"""Class conisting of nexus DB unit tests."""
def setUp(self):
super(NexusDBTest, self).setUp()
"""Setup for nexus db tests."""
l2network_db.initialize()
self.addCleanup(db.clear_db)
self.dbtest = NexusDB()
LOG.debug("Setup")
def testa_create_nexusportbinding(self):
"""Create nexus port binding."""
binding1 = self.dbtest.create_nexusportbinding("port1", 10)
self.assertTrue(binding1["port-id"] == "port1")
self.tearDown_nexusportbinding()
def testb_getall_nexusportbindings(self):
"""Get all nexus port bindings."""
self.dbtest.create_nexusportbinding("port1", 10)
self.dbtest.create_nexusportbinding("port2", 10)
bindings = self.dbtest.get_all_nexusportbindings()
count = 0
for bind in bindings:
if "port" in bind["port-id"]:
count += 1
self.assertTrue(count == 2)
self.tearDown_nexusportbinding()
def testc_delete_nexusportbinding(self):
"""Delete nexus port binding."""
self.dbtest.create_nexusportbinding("port1", 10)
self.dbtest.delete_nexusportbinding(10)
bindings = self.dbtest.get_all_nexusportbindings()
count = 0
for bind in bindings:
if "port " in bind["port-id"]:
count += 1
self.assertTrue(count == 0)
self.tearDown_nexusportbinding()
def testd_update_nexusportbinding(self):
"""Update nexus port binding."""
binding1 = self.dbtest.create_nexusportbinding("port1", 10)
binding1 = self.dbtest.update_nexusport_binding(binding1["port-id"],
20)
bindings = self.dbtest.get_all_nexusportbindings()
count = 0
for bind in bindings:
if "20" in str(bind["vlan-id"]):
count += 1
self.assertTrue(count == 1)
self.tearDown_nexusportbinding()
def test_get_nexusport_binding_no_result_found_handling(self):
with mock.patch('sqlalchemy.orm.Query.all') as mock_all:
mock_all.return_value = []
with self.assertRaises(c_exc.NexusPortBindingNotFound):
nexus_db.get_nexusport_binding(port_id=10,
vlan_id=20,
switch_ip='10.0.0.1',
instance_id=1)
def test_get_nexusvlan_binding_no_result_found_handling(self):
with mock.patch('sqlalchemy.orm.Query.all') as mock_all:
mock_all.return_value = []
with self.assertRaises(c_exc.NexusPortBindingNotFound):
nexus_db.get_nexusvlan_binding(vlan_id=10,
switch_ip='10.0.0.1')
def test_update_nexusport_binding_no_result_found_handling(self):
with mock.patch('sqlalchemy.orm.Query.one') as mock_one:
mock_one.side_effect = sqlalchemy.orm.exc.NoResultFound
with self.assertRaises(c_exc.NexusPortBindingNotFound):
nexus_db.update_nexusport_binding(port_id=10,
vlan_id=20,
switch_ip='10.0.0.1',
instance_id=1)
def test_get_nexusvm_binding_no_result_found_handling(self):
with mock.patch('sqlalchemy.orm.Query.first') as mock_first:
mock_first.return_value = None
with self.assertRaises(c_exc.NexusPortBindingNotFound):
nexus_db.get_nexusvm_binding(port_id=10,
vlan_id=20,
switch_ip='10.0.0.1')
def test_nexusport_binding_not_found_exception_message_formatting(self):
try:
raise c_exc.NexusPortBindingNotFound(a=1, b='test')
except c_exc.NexusPortBindingNotFound as e:
self.assertIn('(a=1,b=test)', str(e))
def tearDown_nexusportbinding(self):
"""Tear down nexus port binding table."""
LOG.debug("Tearing Down Nexus port Bindings")
binds = self.dbtest.get_all_nexusportbindings()
for bind in binds:
vlan_id = bind["vlan-id"]
self.dbtest.delete_nexusportbinding(vlan_id)
class L2networkDBTest(base.BaseTestCase):
"""Class conisting of L2network DB unit tests."""
def setUp(self):
"""Setup for tests."""
super(L2networkDBTest, self).setUp()
l2network_db.initialize()
self.dbtest = L2networkDB()
self.quantum = QuantumDB()
self.addCleanup(db.clear_db)
LOG.debug("Setup")
def testa_create_vlanbinding(self):
"""Test add vlan binding."""
net1 = self.quantum.create_network("t1", "netid1")
vlan1 = self.dbtest.create_vlan_binding(10, "vlan1", net1["net-id"])
self.assertTrue(vlan1["vlan-id"] == "10")
self.teardown_vlanbinding()
self.teardown_network()
def testb_getall_vlanbindings(self):
"""Test get all vlan bindings."""
net1 = self.quantum.create_network("t1", "netid1")
net2 = self.quantum.create_network("t1", "netid2")
vlan1 = self.dbtest.create_vlan_binding(10, "vlan1", net1["net-id"])
self.assertTrue(vlan1["vlan-id"] == "10")
vlan2 = self.dbtest.create_vlan_binding(20, "vlan2", net2["net-id"])
self.assertTrue(vlan2["vlan-id"] == "20")
vlans = self.dbtest.get_all_vlan_bindings()
count = 0
for vlan in vlans:
if "vlan" in vlan["vlan-name"]:
count += 1
self.assertTrue(count == 2)
self.teardown_vlanbinding()
self.teardown_network()
def testc_delete_vlanbinding(self):
"""Test delete vlan binding."""
net1 = self.quantum.create_network("t1", "netid1")
vlan1 = self.dbtest.create_vlan_binding(10, "vlan1", net1["net-id"])
self.assertTrue(vlan1["vlan-id"] == "10")
self.dbtest.delete_vlan_binding(net1["net-id"])
vlans = self.dbtest.get_all_vlan_bindings()
count = 0
for vlan in vlans:
if "vlan " in vlan["vlan-name"]:
count += 1
self.assertTrue(count == 0)
self.teardown_vlanbinding()
self.teardown_network()
def testd_update_vlanbinding(self):
"""Test update vlan binding."""
net1 = self.quantum.create_network("t1", "netid1")
vlan1 = self.dbtest.create_vlan_binding(10, "vlan1", net1["net-id"])
self.assertTrue(vlan1["vlan-id"] == "10")
vlan1 = self.dbtest.update_vlan_binding(net1["net-id"], 11, "newvlan1")
vlans = self.dbtest.get_all_vlan_bindings()
count = 0
for vlan in vlans:
if "new" in vlan["vlan-name"]:
count += 1
self.assertTrue(count == 1)
self.teardown_vlanbinding()
self.teardown_network()
def testm_test_vlanids(self):
"""Test vlanid methods."""
l2network_db.create_vlanids()
vlanids = l2network_db.get_all_vlanids()
self.assertTrue(len(vlanids) > 0)
vlanid = l2network_db.reserve_vlanid()
used = l2network_db.is_vlanid_used(vlanid)
self.assertTrue(used)
used = l2network_db.release_vlanid(vlanid)
self.assertFalse(used)
#counting on default teardown here to clear db
def teardown_network(self):
"""tearDown Network table."""
LOG.debug("Tearing Down Network")
nets = self.quantum.get_all_networks("t1")
for net in nets:
netid = net["net-id"]
self.quantum.delete_network(netid)
def teardown_port(self):
"""tearDown Port table."""
LOG.debug("Tearing Down Port")
nets = self.quantum.get_all_networks("t1")
for net in nets:
netid = net["net-id"]
ports = self.quantum.get_all_ports(netid)
for port in ports:
portid = port["port-id"]
self.quantum.delete_port(netid, portid)
def teardown_vlanbinding(self):
"""tearDown VlanBinding table."""
LOG.debug("Tearing Down Vlan Binding")
vlans = self.dbtest.get_all_vlan_bindings()
for vlan in vlans:
netid = vlan["net-id"]
self.dbtest.delete_vlan_binding(netid)
class QuantumDBTest(base.BaseTestCase):
"""Class conisting of Quantum DB unit tests."""
def setUp(self):
"""Setup for tests."""
super(QuantumDBTest, self).setUp()
l2network_db.initialize()
self.addCleanup(db.clear_db)
self.dbtest = QuantumDB()
self.tenant_id = "t1"
LOG.debug("Setup")
def testa_create_network(self):
"""Test to create network."""
net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
self.assertTrue(net1["net-name"] == "plugin_test1")
self.teardown_network_port()
def testb_get_networks(self):
"""Test to get all networks."""
net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
self.assertTrue(net1["net-name"] == "plugin_test1")
net2 = self.dbtest.create_network(self.tenant_id, "plugin_test2")
self.assertTrue(net2["net-name"] == "plugin_test2")
nets = self.dbtest.get_all_networks(self.tenant_id)
count = 0
for net in nets:
if "plugin_test" in net["net-name"]:
count += 1
self.assertTrue(count == 2)
self.teardown_network_port()
def testc_delete_network(self):
"""Test to delete network."""
net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
self.assertTrue(net1["net-name"] == "plugin_test1")
self.dbtest.delete_network(net1["net-id"])
nets = self.dbtest.get_all_networks(self.tenant_id)
count = 0
for net in nets:
if "plugin_test1" in net["net-name"]:
count += 1
self.assertTrue(count == 0)
self.teardown_network_port()
def testd_update_network(self):
"""Test to update (rename) network."""
net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
self.assertTrue(net1["net-name"] == "plugin_test1")
net = self.dbtest.update_network(self.tenant_id, net1["net-id"],
name="plugin_test1_renamed")
self.assertTrue(net["net-name"] == "plugin_test1_renamed")
self.teardown_network_port()
def teste_create_port(self):
"""Test to create port."""
net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
port = self.dbtest.create_port(net1["net-id"])
self.assertTrue(port["net-id"] == net1["net-id"])
ports = self.dbtest.get_all_ports(net1["net-id"])
count = 0
for por in ports:
count += 1
self.assertTrue(count == 1)
self.teardown_network_port()
def testf_delete_port(self):
"""Test to delete port."""
net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
port = self.dbtest.create_port(net1["net-id"])
self.assertTrue(port["net-id"] == net1["net-id"])
ports = self.dbtest.get_all_ports(net1["net-id"])
count = 0
for por in ports:
count += 1
self.assertTrue(count == 1)
for por in ports:
self.dbtest.delete_port(net1["net-id"], por["port-id"])
ports = self.dbtest.get_all_ports(net1["net-id"])
count = 0
for por in ports:
count += 1
self.assertTrue(count == 0)
self.teardown_network_port()
def testg_plug_unplug_interface(self):
"""Test to plug/unplug interface."""
net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
port1 = self.dbtest.create_port(net1["net-id"])
self.dbtest.plug_interface(net1["net-id"], port1["port-id"], "vif1.1")
port = self.dbtest.get_port(net1["net-id"], port1["port-id"])
self.assertTrue(port[0]["int-id"] == "vif1.1")
self.dbtest.unplug_interface(net1["net-id"], port1["port-id"])
port = self.dbtest.get_port(net1["net-id"], port1["port-id"])
self.assertTrue(port[0]["int-id"] is None)
self.teardown_network_port()
def testh_joined_test(self):
"""Test to get network and port."""
net1 = self.dbtest.create_network("t1", "net1")
port1 = self.dbtest.create_port(net1["net-id"])
self.assertTrue(port1["net-id"] == net1["net-id"])
port2 = self.dbtest.create_port(net1["net-id"])
self.assertTrue(port2["net-id"] == net1["net-id"])
ports = self.dbtest.get_all_ports(net1["net-id"])
for port in ports:
net = port["net"]
LOG.debug("Port id %s Net id %s" % (port["port-id"], net.uuid))
self.teardown_joined_test()
def teardown_network_port(self):
"""tearDown for Network and Port table."""
networks = self.dbtest.get_all_networks(self.tenant_id)
for net in networks:
netid = net["net-id"]
name = net["net-name"]
if "plugin_test" in name:
ports = self.dbtest.get_all_ports(netid)
for por in ports:
self.dbtest.delete_port(netid, por["port-id"])
self.dbtest.delete_network(netid)
def teardown_joined_test(self):
"""tearDown for joined Network and Port test."""
LOG.debug("Tearing Down Network and Ports")
nets = self.dbtest.get_all_networks("t1")
for net in nets:
netid = net["net-id"]
ports = self.dbtest.get_all_ports(netid)
for port in ports:
self.dbtest.delete_port(port["net-id"], port["port-id"])
self.dbtest.delete_network(netid)

View File

@ -1,8 +0,0 @@
[pipeline:extensions_app_with_filter]
pipeline = extensions extensions_test_app
[filter:extensions]
paste.filter_factory = quantum.common.extensions:plugin_aware_extension_middleware_factory
[app:extensions_test_app]
paste.app_factory = quantum.plugins.cisco.tests.unit.test_cisco_extension:app_factory

View File

@ -1,19 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import __builtin__
setattr(__builtin__, '_', lambda x: x)

View File

@ -1,43 +0,0 @@
[DEFAULT]
# Show more verbose log output (sets INFO log level output)
verbose = True
# Show debugging output in logs (sets DEBUG log level output)
debug = False
# Address to bind the API server
bind_host = 0.0.0.0
# Port the bind the API server to
bind_port = 9696
# Path to the extensions
api_extensions_path = ../../../../../extensions
# Paste configuration file
api_paste_config = api-paste.ini.cisco.test
core_plugin = quantum.plugins.cisco.network_plugin.PluginV2
# The messaging module to use, defaults to kombu.
rpc_backend = quantum.openstack.common.rpc.impl_fake
[QUOTAS]
# resource name(s) that are supported in quota features
quota_items = network,subnet,port
# default number of resource allowed per tenant, minus for unlimited
default_quota = -1
# number of networks allowed per tenant, and minus means unlimited
# quota_network = 10
# number of subnets allowed per tenant, and minus means unlimited
# quota_subnet = 10
# number of ports allowed per tenant, and minus means unlimited
# quota_port = 50
# default driver to use for quota checks
# quota_driver = quantum.quota.ConfDriver
quota_driver = quantum.db.quota_db.DbQuotaDriver

View File

@ -1,66 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import inspect
import logging
import os
import mock
from oslo.config import cfg
import webtest
from quantum.api.extensions import PluginAwareExtensionManager
from quantum.api.v2 import router
from quantum.common import config
from quantum.manager import QuantumManager
from quantum.tests.unit import test_api_v2
LOG = logging.getLogger(__name__)
def curdir(*p):
return os.path.join(os.path.dirname(__file__), *p)
class APIv2TestCase(test_api_v2.APIv2TestCase):
def setUp(self):
super(APIv2TestCase, self).setUp()
plugin = 'quantum.plugins.cisco.network_plugin.PluginV2'
# Ensure 'stale' patched copies of the plugin are never returned
QuantumManager._instance = None
# Ensure existing ExtensionManager is not used
PluginAwareExtensionManager._instance = None
# Create the default configurations
args = ['--config-file', curdir('quantumv2.conf.cisco.test')]
config.parse(args=args)
# Update the plugin
cfg.CONF.set_override('core_plugin', plugin)
self._plugin_patcher = mock.patch(plugin, autospec=True)
self.plugin = self._plugin_patcher.start()
api = router.APIRouter()
self.api = webtest.TestApp(api)
LOG.debug("%s.%s.%s done" % (__name__, self.__class__.__name__,
inspect.stack()[0][3]))
class JSONV2TestCase(APIv2TestCase, test_api_v2.JSONV2TestCase):
pass

View File

@ -92,7 +92,6 @@ msg_format_checkers = [
]
file_black_list = ["./quantum/plugins/cisco/tests/unit",
"./quantum/tests/unit",
file_black_list = ["./quantum/tests/unit",
"./quantum/openstack",
"./quantum/plugins/bigswitch/tests"]