Unify assertEqual for empty usages

Update previous assertEqual(observed, *empty) usages to
assertEqual(*empty*, observed).

This patch aslo update hacking check for assertEqual with
empty types.

Change-Id: I981277618f92254a5beb9d3308a317d8c14e125c
This commit is contained in:
lzklibj 2015-12-21 17:11:56 +08:00 committed by Kevin Benton
parent e3468ed8a2
commit 3491cbc0c5
17 changed files with 61 additions and 25 deletions

View File

@ -208,6 +208,17 @@ def check_assertfalse(logical_line, filename):
yield (0, msg)
def check_assertempty(logical_line, filename):
if 'neutron/tests/' in filename:
msg = ("N330: Use assertEqual(*empty*, observed) instead of "
"assertEqual(observed, *empty*). *empty* contains "
"{}, [], (), set(), '', \"\"")
empties = r"(\[\s*\]|\{\s*\}|\(\s*\)|set\(\s*\)|'\s*'|\"\s*\")"
reg = r"assertEqual\(([^,]*,\s*)+?%s\)\s*$" % empties
if re.search(reg, logical_line):
yield (0, msg)
def factory(register):
register(validate_log_translations)
register(use_jsonutils)
@ -221,3 +232,4 @@ def factory(register):
register(check_asserttrue)
register(no_mutable_default_args)
register(check_assertfalse)
register(check_assertempty)

View File

@ -92,8 +92,8 @@ class OVSBridgeTestCase(OVSBridgeTestBase):
self.assertEqual(tag, self.ovs.db_get_val('Port', port_name, 'tag'))
self.assertEqual(tag, self.br.get_port_tag_dict()[port_name])
self.ovs.clear_db_attribute('Port', port_name, 'tag')
self.assertEqual(self.ovs.db_get_val('Port', port_name, 'tag'), [])
self.assertEqual(self.br.get_port_tag_dict()[port_name], [])
self.assertEqual([], self.ovs.db_get_val('Port', port_name, 'tag'))
self.assertEqual([], self.br.get_port_tag_dict()[port_name])
def test_get_bridge_external_bridge_id(self):
self.ovs.set_db_attribute('Bridge', self.br.br_name,

View File

@ -432,7 +432,7 @@ class TestDhcpAgent(base.BaseTestCase):
mock.patch.object(dhcp, "run"):
report_state.return_value = const.AGENT_ALIVE
dhcp._report_state()
self.assertEqual(dhcp.needs_resync_reasons, {})
self.assertEqual({}, dhcp.needs_resync_reasons)
report_state.return_value = const.AGENT_REVIVED
dhcp._report_state()

View File

@ -147,7 +147,7 @@ class TestAsyncProcess(base.BaseTestCase):
def test__iter_queue_returns_empty_list_for_empty_queue(self):
result = list(self.proc._iter_queue(eventlet.queue.LightQueue(),
False))
self.assertEqual(result, [])
self.assertEqual([], result)
def test__iter_queue_returns_queued_data(self):
queue = eventlet.queue.LightQueue()

View File

@ -410,7 +410,7 @@ class TestIpWrapper(base.BaseTestCase):
ip_ns_cmd_cls.assert_has_calls([mock.call().exists('ns')])
self.assertNotIn(mock.call().delete('ns'),
ip_ns_cmd_cls.return_value.mock_calls)
self.assertEqual(mock_is_empty.mock_calls, [])
self.assertEqual([], mock_is_empty.mock_calls)
def test_garbage_collect_namespace_existing_empty_ns(self):
with mock.patch.object(ip_lib, 'IpNetnsCommand') as ip_ns_cmd_cls:
@ -485,7 +485,7 @@ class TestIpWrapper(base.BaseTestCase):
def test_add_device_to_namespace_is_none(self):
dev = mock.Mock()
ip_lib.IPWrapper().add_device_to_namespace(dev)
self.assertEqual(dev.mock_calls, [])
self.assertEqual([], dev.mock_calls)
class TestIPDevice(base.BaseTestCase):

View File

@ -61,7 +61,7 @@ class AgentUtilsExecuteTest(base.BaseTestCase):
self.mock_popen.return_value = ["", ""]
stdout = utils.execute(["ls", self.test_file[:-1]],
check_exit_code=False)
self.assertEqual(stdout, "")
self.assertEqual("", stdout)
def test_execute_raises(self):
self.mock_popen.side_effect = RuntimeError
@ -216,11 +216,11 @@ class TestFindChildPids(base.BaseTestCase):
def test_returns_empty_list_for_exit_code_1(self):
with mock.patch.object(utils, 'execute',
side_effect=RuntimeError('Exit code: 1')):
self.assertEqual(utils.find_child_pids(-1), [])
self.assertEqual([], utils.find_child_pids(-1))
def test_returns_empty_list_for_no_output(self):
with mock.patch.object(utils, 'execute', return_value=''):
self.assertEqual(utils.find_child_pids(-1), [])
self.assertEqual([], utils.find_child_pids(-1))
def test_returns_list_of_child_process_ids_for_good_ouput(self):
with mock.patch.object(utils, 'execute', return_value=' 123 \n 185\n'):

View File

@ -665,7 +665,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
res = self.api.get(_get_path('networks'),
params=params).json
self.assertEqual(res['networks'], [])
self.assertEqual([], res['networks'])
previous_links = []
if 'networks_links' in res:
@ -728,7 +728,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
'page_reverse': ['True']}
res = self.api.get(_get_path('networks'),
params=params).json
self.assertEqual(res['networks'], [])
self.assertEqual([], res['networks'])
next_links = []
if 'networks_links' in res:

View File

@ -136,7 +136,7 @@ class TestNetnsCleanup(base.BaseTestCase):
util.unplug_device(conf, device)
mock_get_bridge_for_iface.assert_called_once_with('tap1')
self.assertEqual(ovs_br_cls.mock_calls, [])
self.assertEqual([], ovs_br_cls.mock_calls)
self.assertTrue(debug.called)
def _test_destroy_namespace_helper(self, force, num_devices):

View File

@ -72,7 +72,7 @@ class TestParseMappings(base.BaseTestCase):
{'key1': 'val', 'key2': 'val'})
def test_parse_mappings_succeeds_for_no_mappings(self):
self.assertEqual(self.parse(['']), {})
self.assertEqual({}, self.parse(['']))
class TestParseTunnelRangesMixin(object):

View File

@ -317,5 +317,5 @@ class TestAllowedAddressPairs(AllowedAddressPairDBTestCase):
req = self.new_update_request('ports', update_port,
port['port']['id'])
port = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(port['port'][addr_pair.ADDRESS_PAIRS], [])
self.assertEqual([], port['port'][addr_pair.ADDRESS_PAIRS])
self._delete('ports', port['port']['id'])

View File

@ -98,7 +98,7 @@ class ExtNetDBTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
plugin = manager.NeutronManager.get_plugin()
ctx = context.Context(None, None, is_admin=True)
result = plugin.get_networks(ctx, filters=None)
self.assertEqual(result, [])
self.assertEqual([], result)
def test_update_network_set_external_non_admin_fails(self):
# Assert that a non-admin user cannot update the
@ -117,7 +117,7 @@ class ExtNetDBTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
ctx = context.Context(None, None, is_admin=True)
model = models_v2.Network
conditions = plugin._network_filter_hook(ctx, model, [])
self.assertEqual(conditions, [])
self.assertEqual([], conditions)
def test_network_filter_hook_nonadmin_context(self):
plugin = manager.NeutronManager.get_plugin()

View File

@ -95,7 +95,7 @@ class ExtraRouteDBTestCaseBase(object):
None, p['port']['id'], routes)
body = self._update('routers', r['router']['id'],
{'router': {'routes': None}})
self.assertEqual(body['router']['routes'], [])
self.assertEqual([], body['router']['routes'])
self._routes_update_cleanup(p['port']['id'],
None, r['router']['id'], [])

View File

@ -1248,8 +1248,8 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
port['port']['id'])
res = self.deserialize(self.fmt,
req.get_response(self.api))
self.assertEqual(res['port'].get(ext_sg.SECURITYGROUPS),
[])
self.assertEqual([],
res['port'].get(ext_sg.SECURITYGROUPS))
self._delete('ports', port['port']['id'])
def test_update_port_remove_security_group_none(self):
@ -1269,8 +1269,8 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
port['port']['id'])
res = self.deserialize(self.fmt,
req.get_response(self.api))
self.assertEqual(res['port'].get(ext_sg.SECURITYGROUPS),
[])
self.assertEqual([],
res['port'].get(ext_sg.SECURITYGROUPS))
self._delete('ports', port['port']['id'])
def test_create_port_with_bad_security_group(self):

View File

@ -216,3 +216,27 @@ class HackingTestCase(base.BaseTestCase):
self.assertEqual(
0, len(list(checks.check_assertfalse(pass_code,
"neutron/tests/test_assert.py"))))
def test_assertempty(self):
fail_code = """
test_empty = %s
self.assertEqual(test_empty, %s)
"""
pass_code1 = """
test_empty = %s
self.assertEqual(%s, test_empty)
"""
pass_code2 = """
self.assertEqual(123, foo(abc, %s))
"""
empty_cases = ['{}', '[]', '""', "''", '()', 'set()']
for ec in empty_cases:
self.assertEqual(
1, len(list(checks.check_assertempty(fail_code % (ec, ec),
"neutron/tests/test_assert.py"))))
self.assertEqual(
0, len(list(checks.check_assertfalse(pass_code1 % (ec, ec),
"neutron/tests/test_assert.py"))))
self.assertEqual(
0, len(list(checks.check_assertfalse(pass_code2 % ec,
"neutron/tests/test_assert.py"))))

View File

@ -63,4 +63,4 @@ class PSExtDriverTestCase(test_plugin.Ml2PluginV2TestCase,
self.assertEqual(res.status_int, 201)
port = self.deserialize('json', res)
self.assertFalse(port['port'][psec.PORTSECURITY])
self.assertEqual(port['port']['security_groups'], [])
self.assertEqual([], port['port']['security_groups'])

View File

@ -1303,8 +1303,8 @@ class TestMultiSegmentNetworks(Ml2PluginV2TestCase):
req = self.new_delete_request('networks', network_id)
res = req.get_response(self.api)
self.assertEqual(2, rs.call_count)
self.assertEqual(ml2_db.get_network_segments(
self.context.session, network_id), [])
self.assertEqual([], ml2_db.get_network_segments(
self.context.session, network_id))
self.assertIsNone(ml2_db.get_dynamic_segment(
self.context.session, network_id, 'physnet2'))

View File

@ -35,7 +35,7 @@ class ParseServiceProviderConfigurationTestCase(base.BaseTestCase):
def test_default_service_provider_configuration(self):
providers = cfg.CONF.service_providers.service_provider
self.assertEqual(providers, [])
self.assertEqual([], providers)
def test_parse_single_service_provider_opt(self):
self._set_override([constants.LOADBALANCER +