From a99bfe784bafbc0a34dc971840c06d80d3afc228 Mon Sep 17 00:00:00 2001
From: Feng Shengqin <feng.shengqin@zte.com.cn>
Date: Mon, 8 Jan 2018 13:34:30 +0800
Subject: [PATCH] Claim resources when updating cpu/memory

Currently, we update a container's cpu/memory without claiming the
resources. This leads to an issue where a container can be updated to
consume more resources than the host has available. This patch adds a
step to claim cpu/memory when updating the container.
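
The claim works on cpu/memory deltas rather than absolute values, so
shrinking a container releases resources instead of claiming them. A
minimal illustrative sketch (not part of the patch) of the delta
computation that UpdateClaim.memory performs:

    def memory_delta(new_mem, old_mem):
        # memory is a string with a unit suffix, e.g. "512M"; strip
        # the suffix and diff the integer values
        new = int(new_mem[:-1]) if new_mem else 0
        old = int(old_mem[:-1]) if old_mem else 0
        return new - old

    assert memory_delta("1024M", "512M") == 512   # claims 512 more
    assert memory_delta("256M", "512M") == -256   # releases 256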

Co-Authored-By: Hongbin Lu <hongbin034@gmail.com>

Partial-Implements: blueprint resize-via-scheduler
Change-Id: I35646123757cf9b7e0be7dcb4ea6b3d18cd00ecc
---
 zun/compute/claims.py                         | 46 ++++++++++++++++
 zun/compute/compute_node_tracker.py           | 54 +++++++++++++++++++
 zun/compute/manager.py                        | 14 ++++-
 .../api/controllers/v1/test_containers.py     |  9 +++-
 .../unit/compute/test_compute_manager.py      |  7 +++
 5 files changed, 127 insertions(+), 3 deletions(-)

diff --git a/zun/compute/claims.py b/zun/compute/claims.py
index e848f7ee6..d3c08e054 100644
--- a/zun/compute/claims.py
+++ b/zun/compute/claims.py
@@ -21,6 +21,8 @@ from oslo_log import log as logging
 
 from zun.common import exception
 from zun.common.i18n import _
+from zun import objects
+
 
 LOG = logging.getLogger(__name__)
 
@@ -179,3 +181,47 @@ class Claim(NopClaim):
                       '%(unit)s < requested %(requested)s %(unit)s') %
                     {'type': type_, 'free': free, 'unit': unit,
                      'requested': requested})
+
+
+class UpdateClaim(Claim):
+    """A declaration that a compute host operation will require free resources.
+
+    Claims serve as marker objects that resources are being held until the
+    update_available_resource audit process runs to do a full reconciliation
+    of resource usage.
+
+    This information will be used to help keep the local compute hosts's
+    ComputeNode model in sync to aid the scheduler in making efficient / more
+    correct decisions with respect to host selection.
+    """
+
+    def __init__(self, context, new_container, old_container, tracker,
+                 resources, limits=None):
+        # Stash copies of the containers at the current point in time
+        self.new_container = new_container.obj_clone()
+        self.old_container = old_container.obj_clone()
+        super(UpdateClaim, self).__init__(
+            context, new_container, tracker, resources,
+            objects.ContainerPCIRequests(requests=[]), limits)
+
+    @property
+    def memory(self):
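+        # Memory is stored as a string with a unit suffix (e.g. "512M");
+        # strip the suffix and return the new-minus-old delta.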
+        new_mem_str = "0"
+        if self.new_container.memory:
+            new_mem_str = self.new_container.memory[:-1]
+        old_mem_str = "0"
+        if self.old_container.memory:
+            old_mem_str = self.old_container.memory[:-1]
+        return int(new_mem_str) - int(old_mem_str)
+
+    @property
+    def cpu(self):
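+        # Return the cpu delta; a negative value means the update releases
+        # cpu rather than claiming it.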
+        new_cpu = self.new_container.cpu or 0
+        old_cpu = self.old_container.cpu or 0
+        return new_cpu - old_cpu
+
+    def abort(self):
+        """Requiring claimed resources has failed or been aborted."""
+        LOG.debug("Aborting claim: %s", self)
+        self.tracker.abort_container_update_claim(
+            self.context, self.new_container, self.old_container)
diff --git a/zun/compute/compute_node_tracker.py b/zun/compute/compute_node_tracker.py
index 9df8bae53..ff7037bb7 100644
--- a/zun/compute/compute_node_tracker.py
+++ b/zun/compute/compute_node_tracker.py
@@ -115,6 +115,42 @@ class ComputeNodeTracker(object):
 
         return claim
 
+    @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
+    def container_update_claim(self, context, new_container, old_container,
+                               limits=None):
+        """Indicate resources are needed for an upcoming container update.
+
+        This should be called before the compute node performs a container
+        update operation that may consume additional resources.
+
+        :param context: security context
+        :param new_container: the container in its updated (target) state.
+        :type new_container: zun.objects.container.Container object
+        :param old_container: the container in its current (original) state.
+        :type old_container: zun.objects.container.Container object
+        :param limits: Dict of oversubscription limits for memory, disk,
+                       and CPUs.
+        :returns: A Claim ticket representing the reserved resources.  It can
+                  be used to revert the resource usage if an error occurs
+                  during the container update.
+        """
+        if (new_container.cpu == old_container.cpu and
+                new_container.memory == old_container.memory):
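+            # Nothing to claim or release when cpu and memory are unchanged.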
+            return claims.NopClaim()
+
+        # We should have the compute node created here, just get it.
+        self.compute_node = self._get_compute_node(context)
+
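+        # Constructing the claim tests the cpu/memory deltas against the
+        # limits and raises ResourcesUnavailable if they do not fit.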
+        claim = claims.UpdateClaim(context, new_container, old_container,
+                                   self, self.compute_node, limits=limits)
+
+        self._update_usage_from_container_update(context, new_container,
+                                                 old_container)
+        # persist changes to the compute node:
+        self._update(self.compute_node)
+
+        return claim
+
     def disabled(self, hostname):
         return not self.container_driver.node_is_available(hostname)
 
@@ -152,6 +188,16 @@ class ComputeNodeTracker(object):
             # new container, update compute node resource usage:
             self._update_usage(self._get_usage_dict(container), sign=sign)
 
+    def _update_usage_from_container_update(self, context, new_container,
+                                            old_container):
+        """Update usage for a container update."""
+        uuid = new_container.uuid
+        self.tracked_containers[uuid] = obj_base.obj_to_primitive(
+            new_container)
+        # update compute node resource usage: remove the old container's
+        # usage, then add the new container's
+        self._update_usage(self._get_usage_dict(old_container), sign=-1)
+        self._update_usage(self._get_usage_dict(new_container))
+
     def _update_usage_from_containers(self, context, containers):
         """Calculate resource usage based on container utilization.
 
@@ -261,6 +307,14 @@ class ComputeNodeTracker(object):
 
         self._update(self.compute_node)
 
+    @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
+    def abort_container_update_claim(self, context, new_container,
+                                     old_container):
+        """Remove usage from the given container."""
+        self._update_usage_from_container_update(context, old_container,
+                                                 new_container)
+        self._update(self.compute_node)
+
     @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
     def remove_usage_from_container(self, context, container,
                                     is_removed=True):
diff --git a/zun/compute/manager.py b/zun/compute/manager.py
index 90738cf5c..27263ed4a 100644
--- a/zun/compute/manager.py
+++ b/zun/compute/manager.py
@@ -652,15 +652,25 @@ class Manager(periodic_task.PeriodicTasks):
     @translate_exception
     def container_update(self, context, container, patch):
         LOG.debug('Updating a container: %s', container.uuid)
+        old_container = container.obj_clone()
         # Update only the fields that have changed
         for field, patch_val in patch.items():
             if getattr(container, field) != patch_val:
                 setattr(container, field, patch_val)
 
         try:
-            self.driver.update(context, container)
-            container.save(context)
+            rt = self._get_resource_tracker()
+            # TODO(hongbin): limits should be populated by scheduler
+            # FIXME(hongbin): rt.compute_node could be None
+            limits = {'cpu': rt.compute_node.cpus,
+                      'memory': rt.compute_node.mem_total}
+            with rt.container_update_claim(context, container, old_container,
+                                           limits):
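+                # On failure, exiting the claim context via an exception
+                # triggers UpdateClaim.abort(), which reverts the usage.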
+                self.driver.update(context, container)
+                container.save(context)
             return container
+        except exception.ResourcesUnavailable as e:
+            raise exception.Conflict(six.text_type(e))
         except exception.DockerError as e:
             LOG.error("Error occurred while calling docker API: %s",
                       six.text_type(e))
diff --git a/zun/tests/unit/api/controllers/v1/test_containers.py b/zun/tests/unit/api/controllers/v1/test_containers.py
index 88a92f572..0ac506d39 100644
--- a/zun/tests/unit/api/controllers/v1/test_containers.py
+++ b/zun/tests/unit/api/controllers/v1/test_containers.py
@@ -936,13 +936,20 @@ class TestContainerController(api_base.FunctionalTest):
         self.assertEqual(test_container['uuid'],
                          response.json['uuid'])
 
+    @patch('zun.objects.ComputeNode.get_by_name')
     @patch('zun.compute.api.API.container_update')
     @patch('zun.objects.Container.get_by_uuid')
-    def test_patch_by_uuid(self, mock_container_get_by_uuid, mock_update):
+    def test_patch_by_uuid(self, mock_container_get_by_uuid, mock_update,
+                           mock_computenode):
         test_container = utils.get_test_container()
         test_container_obj = objects.Container(self.context, **test_container)
         mock_container_get_by_uuid.return_value = test_container_obj
         mock_update.return_value = test_container_obj
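+        # The update path now loads a compute node, so return a fake
+        # host from the ComputeNode.get_by_name lookup.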
+        test_host = utils.get_test_compute_node()
+        numa = objects.numa.NUMATopology._from_dict(test_host['numa_topology'])
+        test_host['numa_topology'] = numa
+        test_host_obj = objects.ComputeNode(self.context, **test_host)
+        mock_computenode.return_value = test_host_obj
 
         params = {'cpu': 1}
         container_uuid = test_container.get('uuid')
diff --git a/zun/tests/unit/compute/test_compute_manager.py b/zun/tests/unit/compute/test_compute_manager.py
index b1f0008cf..2fadee1c4 100644
--- a/zun/tests/unit/compute/test_compute_manager.py
+++ b/zun/tests/unit/compute/test_compute_manager.py
@@ -32,9 +32,16 @@ from zun.tests.unit.db import utils
 
 class FakeResourceTracker(object):
 
+    def __init__(self, *args, **kwargs):
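+        # container_update builds claim limits from compute_node.cpus
+        # and compute_node.mem_total, so expose a mock compute node.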
+        self.compute_node = mock.MagicMock()
+
     def container_claim(self, context, container, pci_requests, limits):
         return claims.NopClaim()
 
+    def container_update_claim(self, context, container, old_container,
+                               limits):
+        return claims.NopClaim()
+
     def remove_usage_from_container(self, context, container, is_removed=True):
         pass