diff --git a/heat/engine/worker.py b/heat/engine/worker.py index 509208e30f..d8d2645918 100644 --- a/heat/engine/worker.py +++ b/heat/engine/worker.py @@ -40,7 +40,7 @@ class WorkerService(service.Service): or expect replies from these messages. """ - RPC_API_VERSION = '1.2' + RPC_API_VERSION = '1.3' def __init__(self, host, @@ -60,7 +60,7 @@ class WorkerService(service.Service): def start(self): target = oslo_messaging.Target( version=self.RPC_API_VERSION, - server=self.host, + server=self.engine_id, topic=self.topic) self.target = target LOG.info(_LI("Starting %(topic)s (%(version)s) in engine %(engine)s."), @@ -112,3 +112,13 @@ class WorkerService(service.Service): cr.check(cnxt, resource_id, current_traversal, resource_data, is_update, adopt_stack_data, rsrc, stack) + + @context.request_context + def cancel_check_resource(self, cnxt, stack_id): + """Cancel check_resource for given stack. + + All the workers running for the given stack will be + cancelled. + """ + # TODO(ananta): Implement cancel check-resource + LOG.debug('Cancelling workers for stack [%s]', stack_id) diff --git a/heat/rpc/worker_client.py b/heat/rpc/worker_client.py index d1224de591..d0256492e8 100644 --- a/heat/rpc/worker_client.py +++ b/heat/rpc/worker_client.py @@ -27,6 +27,7 @@ class WorkerClient(object): 1.0 - Initial version. 1.1 - Added check_resource. 1.2 - Add adopt data argument to check_resource. + 1.3 - Added cancel_check_resource API. """ BASE_RPC_API_VERSION = '1.0' @@ -56,3 +57,19 @@ class WorkerClient(object): current_traversal=current_traversal, data=data, is_update=is_update, adopt_stack_data=adopt_stack_data), version='1.2') + + def cancel_check_resource(self, ctxt, stack_id, engine_id): + """Send check-resource cancel message. + + Sends a cancel message to given heat engine worker. + """ + + _client = messaging.get_rpc_client( + topic=worker_api.TOPIC, + version=self.BASE_RPC_API_VERSION, + server=engine_id) + + method, kwargs = self.make_msg('cancel_check_resource', + stack_id=stack_id) + cl = _client.prepare(version='1.3') + cl.cast(ctxt, method, **kwargs) diff --git a/heat/tests/engine/test_engine_worker.py b/heat/tests/engine/test_engine_worker.py index a93b45102a..684285131e 100644 --- a/heat/tests/engine/test_engine_worker.py +++ b/heat/tests/engine/test_engine_worker.py @@ -25,7 +25,7 @@ class WorkerServiceTest(common.HeatTestCase): def test_make_sure_rpc_version(self): self.assertEqual( - '1.2', + '1.3', worker.WorkerService.RPC_API_VERSION, ('RPC version is changed, please update this test to new version ' 'and make sure additional test cases are added for RPC APIs ' @@ -52,7 +52,7 @@ class WorkerServiceTest(common.HeatTestCase): # Make sure target is called with proper parameters target_class.assert_called_once_with( version=worker.WorkerService.RPC_API_VERSION, - server=self.worker.host, + server=self.worker.engine_id, topic=self.worker.topic) # Make sure rpc server creation with proper target diff --git a/heat/tests/test_rpc_worker_client.py b/heat/tests/test_rpc_worker_client.py index 182917bff9..730e189a20 100644 --- a/heat/tests/test_rpc_worker_client.py +++ b/heat/tests/test_rpc_worker_client.py @@ -24,6 +24,7 @@ class WorkerClientTest(common.HeatTestCase): def setUp(self): super(WorkerClientTest, self).setUp() + self.fake_engine_id = 'fake-engine-id' def test_make_msg(self): method = 'sample_method' @@ -44,8 +45,8 @@ class WorkerClientTest(common.HeatTestCase): worker_client = rpc_client.WorkerClient() rpc_client_method.assert_called_once_with( version=rpc_client.WorkerClient.BASE_RPC_API_VERSION, - topic=rpc_api.TOPIC - ) + topic=rpc_api.TOPIC) + self.assertEqual(mock_rpc_client, worker_client._client, "Failed to create RPC client") @@ -60,15 +61,39 @@ class WorkerClientTest(common.HeatTestCase): # go with default version return_value = worker_client.cast(mock_cnxt, msg) self.assertIsNone(return_value) - mock_rpc_client.cast.assert_called_once_with(mock_cnxt, - method, - **kwargs) + mock_rpc_client.cast.assert_called_with(mock_cnxt, + method, + **kwargs) # Check cast in given version - version = '1.2' + version = '1.3' return_value = worker_client.cast(mock_cnxt, msg, version) self.assertIsNone(return_value) mock_rpc_client.prepare.assert_called_once_with(version=version) mock_rpc_client.cast.assert_called_once_with(mock_cnxt, method, **kwargs) + + def test_cancel_check_resource(self): + mock_stack_id = 'dummy-stack-id' + mock_cnxt = mock.Mock() + method = 'cancel_check_resource' + kwargs = {'stack_id': mock_stack_id} + mock_rpc_client = mock.MagicMock() + mock_cast = mock.MagicMock() + with mock.patch('heat.common.messaging.get_rpc_client') as mock_grc: + mock_grc.return_value = mock_rpc_client + mock_rpc_client.prepare.return_value = mock_cast + wc = rpc_client.WorkerClient() + ret_val = wc.cancel_check_resource(mock_cnxt, mock_stack_id, + self.fake_engine_id) + # ensure called with fanout=True + mock_grc.assert_called_with( + version=wc.BASE_RPC_API_VERSION, + topic=rpc_api.TOPIC, + server=self.fake_engine_id) + self.assertIsNone(ret_val) + mock_rpc_client.prepare.assert_called_with( + version='1.3') + # ensure correct rpc method is called + mock_cast.cast.assert_called_with(mock_cnxt, method, **kwargs)