Merge "Convergence: Cancel message"

This commit is contained in:
Jenkins 2016-07-06 17:36:59 +00:00 committed by Gerrit Code Review
commit 6f96e75bb4
4 changed files with 62 additions and 10 deletions

View File

@ -40,7 +40,7 @@ class WorkerService(service.Service):
or expect replies from these messages.
"""
RPC_API_VERSION = '1.2'
RPC_API_VERSION = '1.3'
def __init__(self,
host,
@ -60,7 +60,7 @@ class WorkerService(service.Service):
def start(self):
target = oslo_messaging.Target(
version=self.RPC_API_VERSION,
server=self.host,
server=self.engine_id,
topic=self.topic)
self.target = target
LOG.info(_LI("Starting %(topic)s (%(version)s) in engine %(engine)s."),
@ -112,3 +112,13 @@ class WorkerService(service.Service):
cr.check(cnxt, resource_id, current_traversal, resource_data,
is_update, adopt_stack_data, rsrc, stack)
@context.request_context
def cancel_check_resource(self, cnxt, stack_id):
"""Cancel check_resource for given stack.
All the workers running for the given stack will be
cancelled.
"""
# TODO(ananta): Implement cancel check-resource
LOG.debug('Cancelling workers for stack [%s]', stack_id)

View File

@ -27,6 +27,7 @@ class WorkerClient(object):
1.0 - Initial version.
1.1 - Added check_resource.
1.2 - Add adopt data argument to check_resource.
1.3 - Added cancel_check_resource API.
"""
BASE_RPC_API_VERSION = '1.0'
@ -56,3 +57,19 @@ class WorkerClient(object):
current_traversal=current_traversal, data=data,
is_update=is_update, adopt_stack_data=adopt_stack_data),
version='1.2')
def cancel_check_resource(self, ctxt, stack_id, engine_id):
"""Send check-resource cancel message.
Sends a cancel message to given heat engine worker.
"""
_client = messaging.get_rpc_client(
topic=worker_api.TOPIC,
version=self.BASE_RPC_API_VERSION,
server=engine_id)
method, kwargs = self.make_msg('cancel_check_resource',
stack_id=stack_id)
cl = _client.prepare(version='1.3')
cl.cast(ctxt, method, **kwargs)

View File

@ -25,7 +25,7 @@ class WorkerServiceTest(common.HeatTestCase):
def test_make_sure_rpc_version(self):
self.assertEqual(
'1.2',
'1.3',
worker.WorkerService.RPC_API_VERSION,
('RPC version is changed, please update this test to new version '
'and make sure additional test cases are added for RPC APIs '
@ -52,7 +52,7 @@ class WorkerServiceTest(common.HeatTestCase):
# Make sure target is called with proper parameters
target_class.assert_called_once_with(
version=worker.WorkerService.RPC_API_VERSION,
server=self.worker.host,
server=self.worker.engine_id,
topic=self.worker.topic)
# Make sure rpc server creation with proper target

View File

@ -24,6 +24,7 @@ class WorkerClientTest(common.HeatTestCase):
def setUp(self):
super(WorkerClientTest, self).setUp()
self.fake_engine_id = 'fake-engine-id'
def test_make_msg(self):
method = 'sample_method'
@ -44,8 +45,8 @@ class WorkerClientTest(common.HeatTestCase):
worker_client = rpc_client.WorkerClient()
rpc_client_method.assert_called_once_with(
version=rpc_client.WorkerClient.BASE_RPC_API_VERSION,
topic=rpc_api.TOPIC
)
topic=rpc_api.TOPIC)
self.assertEqual(mock_rpc_client,
worker_client._client,
"Failed to create RPC client")
@ -60,15 +61,39 @@ class WorkerClientTest(common.HeatTestCase):
# go with default version
return_value = worker_client.cast(mock_cnxt, msg)
self.assertIsNone(return_value)
mock_rpc_client.cast.assert_called_once_with(mock_cnxt,
method,
**kwargs)
mock_rpc_client.cast.assert_called_with(mock_cnxt,
method,
**kwargs)
# Check cast in given version
version = '1.2'
version = '1.3'
return_value = worker_client.cast(mock_cnxt, msg, version)
self.assertIsNone(return_value)
mock_rpc_client.prepare.assert_called_once_with(version=version)
mock_rpc_client.cast.assert_called_once_with(mock_cnxt,
method,
**kwargs)
def test_cancel_check_resource(self):
mock_stack_id = 'dummy-stack-id'
mock_cnxt = mock.Mock()
method = 'cancel_check_resource'
kwargs = {'stack_id': mock_stack_id}
mock_rpc_client = mock.MagicMock()
mock_cast = mock.MagicMock()
with mock.patch('heat.common.messaging.get_rpc_client') as mock_grc:
mock_grc.return_value = mock_rpc_client
mock_rpc_client.prepare.return_value = mock_cast
wc = rpc_client.WorkerClient()
ret_val = wc.cancel_check_resource(mock_cnxt, mock_stack_id,
self.fake_engine_id)
# ensure called with fanout=True
mock_grc.assert_called_with(
version=wc.BASE_RPC_API_VERSION,
topic=rpc_api.TOPIC,
server=self.fake_engine_id)
self.assertIsNone(ret_val)
mock_rpc_client.prepare.assert_called_with(
version='1.3')
# ensure correct rpc method is called
mock_cast.cast.assert_called_with(mock_cnxt, method, **kwargs)