Merge "Cancelling thread start while unit tests running"
This commit is contained in:
commit
1e7fa51b60
@ -177,7 +177,7 @@ class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver):
|
||||
plugin)
|
||||
self.workflow_templates_exists = False
|
||||
self.completion_handler.setDaemon(True)
|
||||
self.completion_handler.start()
|
||||
self.completion_handler_started = False
|
||||
|
||||
def create_vip(self, context, vip):
|
||||
LOG.debug(_('create_vip. vip: %s'), str(vip))
|
||||
@ -340,6 +340,12 @@ class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver):
|
||||
context, extended_vip['subnet_id'])
|
||||
return subnet['network_id']
|
||||
|
||||
def _start_completion_handling_thread(self):
|
||||
if not self.completion_handler_started:
|
||||
LOG.info(_('Starting operation completion handling thread'))
|
||||
self.completion_handler.start()
|
||||
self.completion_handler_started = True
|
||||
|
||||
@call_log.log
|
||||
def _update_workflow(self, wf_name, action,
|
||||
wf_params, context,
|
||||
@ -371,6 +377,8 @@ class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver):
|
||||
entity_id,
|
||||
delete=delete)
|
||||
LOG.debug(_('Pushing operation %s to the queue'), oper)
|
||||
|
||||
self._start_completion_handling_thread()
|
||||
self.queue.put_nowait(oper)
|
||||
|
||||
def _remove_workflow(self, ids, context):
|
||||
@ -391,6 +399,8 @@ class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver):
|
||||
ids['vip'],
|
||||
delete=True)
|
||||
LOG.debug(_('Pushing operation %s to the queue'), oper)
|
||||
|
||||
self._start_completion_handling_thread()
|
||||
self.queue.put_nowait(oper)
|
||||
|
||||
def _remove_service(self, service_name):
|
||||
@ -619,24 +629,52 @@ class OperationCompletionHandler(threading.Thread):
|
||||
self.stoprequest = threading.Event()
|
||||
self.opers_to_handle_before_rest = 0
|
||||
|
||||
def _get_db_status(self, operation, success, messages=None):
|
||||
"""Get the db_status based on the status of the vdirect operation."""
|
||||
if not success:
|
||||
# we have a failure - log it and set the return ERROR as DB state
|
||||
msg = ', '.join(messages) if messages else "unknown"
|
||||
error_params = {"operation": operation, "msg": msg}
|
||||
LOG.error(_('Operation %(operation)s failed. Reason: %(msg)s'),
|
||||
error_params)
|
||||
return constants.ERROR
|
||||
if operation.delete:
|
||||
return None
|
||||
else:
|
||||
return constants.ACTIVE
|
||||
|
||||
def join(self, timeout=None):
|
||||
self.stoprequest.set()
|
||||
super(OperationCompletionHandler, self).join(timeout)
|
||||
|
||||
def handle_operation_completion(self, oper):
|
||||
result = self.rest_client.call('GET',
|
||||
oper.operation_url,
|
||||
None,
|
||||
None)
|
||||
completed = result[RESP_DATA]['complete']
|
||||
reason = result[RESP_REASON],
|
||||
description = result[RESP_STR]
|
||||
if completed:
|
||||
# operation is done - update the DB with the status
|
||||
# or delete the entire graph from DB
|
||||
success = result[RESP_DATA]['success']
|
||||
sec_to_completion = time.time() - oper.creation_time
|
||||
debug_data = {'oper': oper,
|
||||
'sec_to_completion': sec_to_completion,
|
||||
'success': success}
|
||||
LOG.debug(_('Operation %(oper)s is completed after '
|
||||
'%(sec_to_completion)d sec '
|
||||
'with success status: %(success)s :'),
|
||||
debug_data)
|
||||
db_status = None
|
||||
if not success:
|
||||
# failure - log it and set the return ERROR as DB state
|
||||
if reason or description:
|
||||
msg = 'Reason:%s. Description:%s' % (reason, description)
|
||||
else:
|
||||
msg = "unknown"
|
||||
error_params = {"operation": oper, "msg": msg}
|
||||
LOG.error(_('Operation %(operation)s failed. Reason: %(msg)s'),
|
||||
error_params)
|
||||
db_status = constants.ERROR
|
||||
else:
|
||||
if oper.delete:
|
||||
_remove_object_from_db(self.plugin, oper)
|
||||
else:
|
||||
db_status = constants.ACTIVE
|
||||
|
||||
if db_status:
|
||||
_update_vip_graph_status(self.plugin, oper, db_status)
|
||||
|
||||
return completed
|
||||
|
||||
def run(self):
|
||||
oper = None
|
||||
while not self.stoprequest.isSet():
|
||||
@ -653,31 +691,7 @@ class OperationCompletionHandler(threading.Thread):
|
||||
str(oper))
|
||||
# check the status - if oper is done: update the db ,
|
||||
# else push the oper again to the queue
|
||||
result = self.rest_client.call('GET',
|
||||
oper.operation_url,
|
||||
None,
|
||||
None)
|
||||
completed = result[RESP_DATA]['complete']
|
||||
if completed:
|
||||
# operation is done - update the DB with the status
|
||||
# or delete the entire graph from DB
|
||||
success = result[RESP_DATA]['success']
|
||||
sec_to_completion = time.time() - oper.creation_time
|
||||
debug_data = {'oper': oper,
|
||||
'sec_to_completion': sec_to_completion,
|
||||
'success': success}
|
||||
LOG.debug(_('Operation %(oper)s is completed after '
|
||||
'%(sec_to_completion)d sec '
|
||||
'with success status: %(success)s :'),
|
||||
debug_data)
|
||||
db_status = self._get_db_status(oper, success)
|
||||
if db_status:
|
||||
_update_vip_graph_status(
|
||||
self.plugin, oper, db_status)
|
||||
else:
|
||||
_remove_object_from_db(
|
||||
self.plugin, oper)
|
||||
else:
|
||||
if not self.handle_operation_completion(oper):
|
||||
LOG.debug(_('Operation %s is not completed yet..') % oper)
|
||||
# Not completed - push to the queue again
|
||||
self.queue.put_nowait(oper)
|
||||
|
@ -16,10 +16,10 @@
|
||||
#
|
||||
# @author: Avishay Balderman, Radware
|
||||
|
||||
import Queue
|
||||
import re
|
||||
|
||||
import contextlib
|
||||
import eventlet
|
||||
import mock
|
||||
|
||||
from neutron import context
|
||||
@ -34,8 +34,16 @@ from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer
|
||||
GET_200 = ('/api/workflow/', '/api/service/', '/api/workflowTemplate')
|
||||
|
||||
|
||||
def rest_call_function_mock(action, resource, data, headers, binary=False):
|
||||
class QueueMock(Queue.Queue):
|
||||
def __init__(self, completion_handler):
|
||||
self.completion_handler = completion_handler
|
||||
super(QueueMock, self).__init__()
|
||||
|
||||
def put_nowait(self, oper):
|
||||
self.completion_handler(oper)
|
||||
|
||||
|
||||
def rest_call_function_mock(action, resource, data, headers, binary=False):
|
||||
if rest_call_function_mock.RESPOND_WITH_ERROR:
|
||||
return 400, 'error_status', 'error_description', None
|
||||
|
||||
@ -107,13 +115,24 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
||||
rest_call_function_mock.__dict__.update(
|
||||
{'TEMPLATES_MISSING': False})
|
||||
|
||||
self.rest_call_mock = mock.Mock(name='rest_call_mock',
|
||||
side_effect=rest_call_function_mock,
|
||||
spec=self.plugin_instance.
|
||||
drivers['radware'].
|
||||
rest_client.call)
|
||||
self.operation_completer_start_mock = mock.Mock(
|
||||
return_value=None)
|
||||
self.operation_completer_join_mock = mock.Mock(
|
||||
return_value=None)
|
||||
self.driver_rest_call_mock = mock.Mock(
|
||||
side_effect=rest_call_function_mock)
|
||||
|
||||
radware_driver = self.plugin_instance.drivers['radware']
|
||||
radware_driver.rest_client.call = self.rest_call_mock
|
||||
radware_driver.completion_handler.start = (
|
||||
self.operation_completer_start_mock)
|
||||
radware_driver.completion_handler.join = (
|
||||
self.operation_completer_join_mock)
|
||||
radware_driver.rest_client.call = self.driver_rest_call_mock
|
||||
radware_driver.completion_handler.rest_client.call = (
|
||||
self.driver_rest_call_mock)
|
||||
|
||||
radware_driver.queue = QueueMock(
|
||||
radware_driver.completion_handler.handle_operation_completion)
|
||||
|
||||
self.addCleanup(radware_driver.completion_handler.join)
|
||||
|
||||
@ -128,7 +147,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
||||
|
||||
def test_create_vip_failure(self):
|
||||
"""Test the rest call failure handling by Exception raising."""
|
||||
self.rest_call_mock.reset_mock()
|
||||
with self.network(do_delete=False) as network:
|
||||
with self.subnet(network=network, do_delete=False) as subnet:
|
||||
with self.pool(no_delete=True, provider='radware') as pool:
|
||||
@ -155,9 +173,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
||||
{'vip': vip_data})
|
||||
|
||||
def test_create_vip(self):
|
||||
self.skipTest("Skipping test till bug 1288312 is fixed")
|
||||
|
||||
self.rest_call_mock.reset_mock()
|
||||
with self.subnet() as subnet:
|
||||
with self.pool(provider='radware') as pool:
|
||||
vip_data = {
|
||||
@ -211,10 +226,8 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
||||
mock.call('GET', '/api/workflow/' +
|
||||
pool['pool']['id'], None, None)
|
||||
]
|
||||
self.rest_call_mock.assert_has_calls(calls, any_order=True)
|
||||
|
||||
# sleep to wait for the operation completion
|
||||
eventlet.greenthread.sleep(0)
|
||||
self.driver_rest_call_mock.assert_has_calls(calls,
|
||||
any_order=True)
|
||||
|
||||
#Test DB
|
||||
new_vip = self.plugin_instance.get_vip(
|
||||
@ -232,12 +245,10 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
||||
mock.call('DELETE', u'/api/workflow/' + pool['pool']['id'],
|
||||
None, None)
|
||||
]
|
||||
self.rest_call_mock.assert_has_calls(calls, any_order=True)
|
||||
# need to wait some time to allow driver to delete vip
|
||||
eventlet.greenthread.sleep(1)
|
||||
self.driver_rest_call_mock.assert_has_calls(
|
||||
calls, any_order=True)
|
||||
|
||||
def test_update_vip(self):
|
||||
self.rest_call_mock.reset_mock()
|
||||
with self.subnet() as subnet:
|
||||
with self.pool(provider='radware', no_delete=True) as pool:
|
||||
vip_data = {
|
||||
@ -268,15 +279,9 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
||||
'/action/BaseCreate',
|
||||
mock.ANY, driver.TEMPLATE_HEADER),
|
||||
]
|
||||
self.rest_call_mock.assert_has_calls(calls, any_order=True)
|
||||
self.driver_rest_call_mock.assert_has_calls(
|
||||
calls, any_order=True)
|
||||
|
||||
updated_vip = self.plugin_instance.get_vip(
|
||||
context.get_admin_context(), vip['id'])
|
||||
self.assertEqual(updated_vip['status'],
|
||||
constants.PENDING_UPDATE)
|
||||
|
||||
# sleep to wait for the operation completion
|
||||
eventlet.greenthread.sleep(1)
|
||||
updated_vip = self.plugin_instance.get_vip(
|
||||
context.get_admin_context(), vip['id'])
|
||||
self.assertEqual(updated_vip['status'], constants.ACTIVE)
|
||||
@ -286,7 +291,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
||||
context.get_admin_context(), vip['id'])
|
||||
|
||||
def test_delete_vip_failure(self):
|
||||
self.rest_call_mock.reset_mock()
|
||||
plugin = self.plugin_instance
|
||||
|
||||
with self.network(do_delete=False) as network:
|
||||
@ -306,8 +310,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
||||
context.get_admin_context(), hm, pool['pool']['id']
|
||||
)
|
||||
|
||||
eventlet.greenthread.sleep(1)
|
||||
|
||||
rest_call_function_mock.__dict__.update(
|
||||
{'RESPOND_WITH_ERROR': True})
|
||||
|
||||
@ -333,7 +335,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
||||
self.assertEqual(u_phm['status'], constants.ACTIVE)
|
||||
|
||||
def test_delete_vip(self):
|
||||
self.rest_call_mock.reset_mock()
|
||||
with self.subnet() as subnet:
|
||||
with self.pool(provider='radware', no_delete=True) as pool:
|
||||
vip_data = {
|
||||
@ -360,14 +361,14 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
||||
mock.call('DELETE', '/api/workflow/' + pool['pool']['id'],
|
||||
None, None)
|
||||
]
|
||||
self.rest_call_mock.assert_has_calls(calls, any_order=True)
|
||||
self.driver_rest_call_mock.assert_has_calls(
|
||||
calls, any_order=True)
|
||||
|
||||
self.assertRaises(loadbalancer.VipNotFound,
|
||||
self.plugin_instance.get_vip,
|
||||
context.get_admin_context(), vip['id'])
|
||||
|
||||
def test_update_pool(self):
|
||||
self.rest_call_mock.reset_mock()
|
||||
with self.subnet():
|
||||
with self.pool() as pool:
|
||||
del pool['pool']['provider']
|
||||
@ -380,7 +381,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
||||
self.assertEqual(pool_db['status'], constants.PENDING_UPDATE)
|
||||
|
||||
def test_delete_pool_with_vip(self):
|
||||
self.rest_call_mock.reset_mock()
|
||||
with self.subnet() as subnet:
|
||||
with self.pool(provider='radware', no_delete=True) as pool:
|
||||
with self.vip(pool=pool, subnet=subnet):
|
||||
@ -390,7 +390,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
||||
pool['pool']['id'])
|
||||
|
||||
def test_create_member_with_vip(self):
|
||||
self.rest_call_mock.reset_mock()
|
||||
with self.subnet() as subnet:
|
||||
with self.pool(provider='radware') as p:
|
||||
with self.vip(pool=p, subnet=subnet):
|
||||
@ -407,11 +406,10 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
||||
mock.ANY, driver.TEMPLATE_HEADER
|
||||
)
|
||||
]
|
||||
self.rest_call_mock.assert_has_calls(calls,
|
||||
any_order=True)
|
||||
self.driver_rest_call_mock.assert_has_calls(
|
||||
calls, any_order=True)
|
||||
|
||||
def test_update_member_with_vip(self):
|
||||
self.rest_call_mock.reset_mock()
|
||||
with self.subnet() as subnet:
|
||||
with self.pool(provider='radware') as p:
|
||||
with self.member(pool_id=p['pool']['id']) as member:
|
||||
@ -432,16 +430,14 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
||||
mock.ANY, driver.TEMPLATE_HEADER
|
||||
)
|
||||
]
|
||||
self.rest_call_mock.assert_has_calls(calls,
|
||||
any_order=True)
|
||||
self.driver_rest_call_mock.assert_has_calls(
|
||||
calls, any_order=True)
|
||||
|
||||
updated_member = self.plugin_instance.get_member(
|
||||
context.get_admin_context(),
|
||||
member['member']['id']
|
||||
)
|
||||
|
||||
# sleep to wait for the operation completion
|
||||
eventlet.greenthread.sleep(0)
|
||||
updated_member = self.plugin_instance.get_member(
|
||||
context.get_admin_context(),
|
||||
member['member']['id']
|
||||
@ -450,7 +446,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
||||
constants.ACTIVE)
|
||||
|
||||
def test_update_member_without_vip(self):
|
||||
self.rest_call_mock.reset_mock()
|
||||
with self.subnet():
|
||||
with self.pool(provider='radware') as pool:
|
||||
with self.member(pool_id=pool['pool']['id']) as member:
|
||||
@ -463,7 +458,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
||||
constants.PENDING_UPDATE)
|
||||
|
||||
def test_delete_member_with_vip(self):
|
||||
self.rest_call_mock.reset_mock()
|
||||
with self.subnet() as subnet:
|
||||
with self.pool(provider='radware') as p:
|
||||
with self.member(pool_id=p['pool']['id'],
|
||||
@ -474,21 +468,22 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
||||
# wait for being sure the member
|
||||
# Changed status from PENDING-CREATE
|
||||
# to ACTIVE
|
||||
self.rest_call_mock.reset_mock()
|
||||
eventlet.greenthread.sleep(1)
|
||||
|
||||
self.plugin_instance.delete_member(
|
||||
context.get_admin_context(),
|
||||
m['member']['id']
|
||||
)
|
||||
|
||||
args, kwargs = self.rest_call_mock.call_args
|
||||
name, args, kwargs = (
|
||||
self.driver_rest_call_mock.mock_calls[-2]
|
||||
)
|
||||
deletion_post_graph = str(args[2])
|
||||
|
||||
self.assertTrue(re.search(
|
||||
r'.*\'member_address_array\': \[\].*',
|
||||
deletion_post_graph
|
||||
))
|
||||
|
||||
calls = [
|
||||
mock.call(
|
||||
'POST', '/api/workflow/' + p['pool']['id'] +
|
||||
@ -496,17 +491,15 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
||||
mock.ANY, driver.TEMPLATE_HEADER
|
||||
)
|
||||
]
|
||||
self.rest_call_mock.assert_has_calls(
|
||||
self.driver_rest_call_mock.assert_has_calls(
|
||||
calls, any_order=True)
|
||||
|
||||
eventlet.greenthread.sleep(1)
|
||||
self.assertRaises(loadbalancer.MemberNotFound,
|
||||
self.plugin_instance.get_member,
|
||||
context.get_admin_context(),
|
||||
m['member']['id'])
|
||||
|
||||
def test_delete_member_without_vip(self):
|
||||
self.rest_call_mock.reset_mock()
|
||||
with self.subnet():
|
||||
with self.pool(provider='radware') as p:
|
||||
with self.member(pool_id=p['pool']['id'], no_delete=True) as m:
|
||||
@ -519,7 +512,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
||||
m['member']['id'])
|
||||
|
||||
def test_create_hm_with_vip(self):
|
||||
self.rest_call_mock.reset_mock()
|
||||
with self.subnet() as subnet:
|
||||
with self.health_monitor() as hm:
|
||||
with self.pool(provider='radware') as pool:
|
||||
@ -543,11 +535,9 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
||||
mock.ANY, driver.TEMPLATE_HEADER
|
||||
)
|
||||
]
|
||||
self.rest_call_mock.assert_has_calls(
|
||||
self.driver_rest_call_mock.assert_has_calls(
|
||||
calls, any_order=True)
|
||||
|
||||
eventlet.greenthread.sleep(1)
|
||||
|
||||
phm = self.plugin_instance.get_pool_health_monitor(
|
||||
context.get_admin_context(),
|
||||
hm['health_monitor']['id'], pool['pool']['id']
|
||||
@ -555,7 +545,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
||||
self.assertEqual(phm['status'], constants.ACTIVE)
|
||||
|
||||
def test_delete_pool_hm_with_vip(self):
|
||||
self.rest_call_mock.reset_mock()
|
||||
with self.subnet() as subnet:
|
||||
with self.health_monitor(no_delete=True) as hm:
|
||||
with self.pool(provider='radware') as pool:
|
||||
@ -565,21 +554,15 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
||||
hm, pool['pool']['id']
|
||||
)
|
||||
|
||||
# Reset mock and
|
||||
# wait for being sure that status
|
||||
# changed from PENDING-CREATE
|
||||
# to ACTIVE
|
||||
self.rest_call_mock.reset_mock()
|
||||
eventlet.greenthread.sleep(1)
|
||||
|
||||
self.plugin_instance.delete_pool_health_monitor(
|
||||
context.get_admin_context(),
|
||||
hm['health_monitor']['id'],
|
||||
pool['pool']['id']
|
||||
)
|
||||
|
||||
eventlet.greenthread.sleep(1)
|
||||
name, args, kwargs = self.rest_call_mock.mock_calls[-2]
|
||||
name, args, kwargs = (
|
||||
self.driver_rest_call_mock.mock_calls[-2]
|
||||
)
|
||||
deletion_post_graph = str(args[2])
|
||||
|
||||
self.assertTrue(re.search(
|
||||
@ -594,7 +577,7 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
||||
mock.ANY, driver.TEMPLATE_HEADER
|
||||
)
|
||||
]
|
||||
self.rest_call_mock.assert_has_calls(
|
||||
self.driver_rest_call_mock.assert_has_calls(
|
||||
calls, any_order=True)
|
||||
|
||||
self.assertRaises(
|
||||
|
Loading…
x
Reference in New Issue
Block a user