Merge "Ensure no "agent" functional tests are skipped in the gate"
This commit is contained in:
commit
87cb6cb0ad
@ -11,8 +11,14 @@
|
||||
# under the License.
|
||||
#
|
||||
|
||||
import functools
|
||||
import unittest.case
|
||||
|
||||
import testtools.testcase
|
||||
|
||||
from neutron.common import constants as n_const
|
||||
from neutron.tests import base
|
||||
from neutron.tests import tools
|
||||
|
||||
|
||||
def create_resource(prefix, creation_func, *args, **kwargs):
|
||||
@ -40,3 +46,24 @@ def create_resource(prefix, creation_func, *args, **kwargs):
|
||||
return creation_func(name, *args, **kwargs)
|
||||
except RuntimeError:
|
||||
pass
|
||||
|
||||
|
||||
def no_skip_on_missing_deps(wrapped):
|
||||
"""Do not allow a method/test to skip on missing dependencies.
|
||||
|
||||
This decorator raises an error if a skip is raised by wrapped method when
|
||||
OS_FAIL_ON_MISSING_DEPS is evaluated to True. This decorator should be used
|
||||
only for missing dependencies (including missing system requirements).
|
||||
"""
|
||||
|
||||
@functools.wraps(wrapped)
|
||||
def wrapper(*args, **kwargs):
|
||||
try:
|
||||
return wrapped(*args, **kwargs)
|
||||
except (testtools.TestCase.skipException, unittest.case.SkipTest) as e:
|
||||
if base.bool_from_env('OS_FAIL_ON_MISSING_DEPS'):
|
||||
tools.fail(
|
||||
'%s cannot be skipped because OS_FAIL_ON_MISSING_DEPS '
|
||||
'is enabled, skip reason: %s' % (wrapped.__name__, e))
|
||||
raise
|
||||
return wrapper
|
||||
|
@ -23,6 +23,7 @@ from neutron.agent.linux import ip_lib
|
||||
from neutron.cmd.sanity import checks
|
||||
from neutron.plugins.openvswitch.agent import ovs_neutron_agent as ovsagt
|
||||
from neutron.plugins.openvswitch.common import constants
|
||||
from neutron.tests.common import base as common_base
|
||||
from neutron.tests.common import net_helpers
|
||||
from neutron.tests.functional.agent import test_ovs_lib
|
||||
from neutron.tests.functional import base
|
||||
@ -85,12 +86,11 @@ class _OVSAgentOFCtlTestBase(_OVSAgentTestBase):
|
||||
|
||||
class _ARPSpoofTestCase(object):
|
||||
def setUp(self):
|
||||
if not checks.arp_header_match_supported():
|
||||
self.skipTest("ARP header matching not supported")
|
||||
# NOTE(kevinbenton): it would be way cooler to use scapy for
|
||||
# these but scapy requires the python process to be running as
|
||||
# root to bind to the ports.
|
||||
super(_ARPSpoofTestCase, self).setUp()
|
||||
self.skip_without_arp_support()
|
||||
self.src_addr = '192.168.0.1'
|
||||
self.dst_addr = '192.168.0.2'
|
||||
self.src_namespace = self.useFixture(
|
||||
@ -104,6 +104,11 @@ class _ARPSpoofTestCase(object):
|
||||
# wait to add IPs until after anti-spoof rules to ensure ARP doesn't
|
||||
# happen before
|
||||
|
||||
@common_base.no_skip_on_missing_deps
|
||||
def skip_without_arp_support(self):
|
||||
if not checks.arp_header_match_supported():
|
||||
self.skipTest("ARP header matching not supported")
|
||||
|
||||
def test_arp_spoof_doesnt_block_normal_traffic(self):
|
||||
self._setup_arp_spoof_for_port(self.src_p.name, [self.src_addr])
|
||||
self._setup_arp_spoof_for_port(self.dst_p.name, [self.dst_addr])
|
||||
|
@ -20,6 +20,7 @@ from oslo_config import cfg
|
||||
from neutron.agent.common import config
|
||||
from neutron.agent.linux import utils
|
||||
from neutron.tests import base
|
||||
from neutron.tests.common import base as common_base
|
||||
|
||||
SUDO_CMD = 'sudo -n'
|
||||
|
||||
@ -51,9 +52,6 @@ class BaseSudoTestCase(base.BaseTestCase):
|
||||
if not base.bool_from_env('OS_SUDO_TESTING'):
|
||||
self.skipTest('Testing with sudo is not enabled')
|
||||
|
||||
self.fail_on_missing_deps = (
|
||||
base.bool_from_env('OS_FAIL_ON_MISSING_DEPS'))
|
||||
|
||||
config.register_root_helper(cfg.CONF)
|
||||
self.config(group='AGENT',
|
||||
root_helper=os.environ.get('OS_ROOTWRAP_CMD', SUDO_CMD))
|
||||
@ -61,10 +59,11 @@ class BaseSudoTestCase(base.BaseTestCase):
|
||||
root_helper_daemon=os.environ.get(
|
||||
'OS_ROOTWRAP_DAEMON_CMD'))
|
||||
|
||||
@common_base.no_skip_on_missing_deps
|
||||
def check_command(self, cmd, error_text, skip_msg, run_as_root=False):
|
||||
try:
|
||||
utils.execute(cmd, run_as_root=run_as_root)
|
||||
except RuntimeError as e:
|
||||
if error_text in str(e) and not self.fail_on_missing_deps:
|
||||
if error_text in str(e):
|
||||
self.skipTest(skip_msg)
|
||||
raise
|
||||
|
Loading…
Reference in New Issue
Block a user