Merge "Claim resources when updating cpu/memory"
commit 32036c56d4
@@ -21,6 +21,8 @@ from oslo_log import log as logging
 
 from zun.common import exception
 from zun.common.i18n import _
+from zun import objects
+
 
 LOG = logging.getLogger(__name__)
@@ -179,3 +181,47 @@ class Claim(NopClaim):
                   '%(unit)s < requested %(requested)s %(unit)s') %
                 {'type': type_, 'free': free, 'unit': unit,
                  'requested': requested})
+
+
+class UpdateClaim(Claim):
+    """A declaration that a compute host operation will require free resources.
+
+    Claims serve as marker objects that resources are being held until the
+    update_available_resource audit process runs to do a full reconciliation
+    of resource usage.
+
+    This information will be used to help keep the local compute host's
+    ComputeNode model in sync to aid the scheduler in making efficient / more
+    correct decisions with respect to host selection.
+    """
+
+    def __init__(self, context, new_container, old_container, tracker,
+                 resources, limits=None):
+        # Stash a copy of the container at the current point in time
+        self.new_container = new_container.obj_clone()
+        self.old_container = old_container.obj_clone()
+        super(UpdateClaim, self).__init__(
+            context, new_container, tracker, resources,
+            objects.ContainerPCIRequests(requests=[]), limits)
+
+    @property
+    def memory(self):
+        new_mem_str = "0"
+        if self.new_container.memory:
+            new_mem_str = self.new_container.memory[:-1]
+        old_mem_str = "0"
+        if self.old_container.memory:
+            old_mem_str = self.old_container.memory[:-1]
+        return int(new_mem_str) - int(old_mem_str)
+
+    @property
+    def cpu(self):
+        new_cpu = self.new_container.cpu or 0
+        old_cpu = self.old_container.cpu or 0
+        return new_cpu - old_cpu
+
+    def abort(self):
+        """The operation requiring claimed resources has failed or aborted."""
+        LOG.debug("Aborting claim: %s", self)
+        self.tracker.abort_container_update_claim(
+            self.context, self.new_container, self.old_container)
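Reviewer note: the memory and cpu properties above report deltas, not absolute amounts, so the base Claim's resource test only has to verify the additional capacity. A toy sketch of the arithmetic, with hypothetical values (the [:-1] slice assumes memory strings carry a one-character unit suffix such as "512M"):

    # Hypothetical container stand-ins, just to show the delta math.
    class FakeContainer(object):
        def __init__(self, cpu, memory):
            self.cpu = cpu          # e.g. 2.0, or None
            self.memory = memory    # e.g. "512M", or None

    old = FakeContainer(cpu=1.0, memory="256M")
    new = FakeContainer(cpu=2.0, memory="512M")

    cpu_delta = (new.cpu or 0) - (old.cpu or 0)                # 1.0
    mem_delta = int(new.memory[:-1]) - int(old.memory[:-1])    # 256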
@@ -115,6 +115,42 @@ class ComputeNodeTracker(object):
 
         return claim
 
+    @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
+    def container_update_claim(self, context, new_container, old_container,
+                               limits=None):
+        """Indicate resources are needed for an upcoming container update.
+
+        This should be called before the compute node is about to perform
+        a container update operation that will consume additional resources.
+
+        :param context: security context
+        :param new_container: container to be updated to.
+        :type new_container: zun.objects.container.Container object
+        :param old_container: container to be updated from.
+        :type old_container: zun.objects.container.Container object
+        :param limits: Dict of oversubscription limits for memory, disk,
+                       and CPUs.
+        :returns: A Claim ticket representing the reserved resources. It can
+                  be used to revert the resource usage if an error occurs
+                  during the container update.
+        """
+        if (new_container.cpu == old_container.cpu and
+                new_container.memory == old_container.memory):
+            return claims.NopClaim()
+
+        # We should have the compute node created here, just get it.
+        self.compute_node = self._get_compute_node(context)
+
+        claim = claims.UpdateClaim(context, new_container, old_container,
+                                   self, self.compute_node, limits=limits)
+
+        self._update_usage_from_container_update(context, new_container,
+                                                 old_container)
+        # persist changes to the compute node:
+        self._update(self.compute_node)
+
+        return claim
+
     def disabled(self, hostname):
         return not self.container_driver.node_is_available(hostname)
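Usage sketch for the method above (assuming the claim keeps the Nova-style behavior implied by the abort() method in the previous hunk, i.e. the caller aborts it when the update fails; the with-statement form used in manager.py below is the equivalent shorthand):

    limits = {'cpu': 8, 'memory': 4096}    # hypothetical oversubscription caps
    claim = tracker.container_update_claim(ctxt, new_container,
                                           old_container, limits=limits)
    try:
        apply_update()                     # hypothetical driver call
    except Exception:
        claim.abort()                      # puts the usage delta back
        raise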
@@ -152,6 +188,16 @@ class ComputeNodeTracker(object):
         # new container, update compute node resource usage:
         self._update_usage(self._get_usage_dict(container), sign=sign)
 
+    def _update_usage_from_container_update(self, context, new_container,
+                                            old_container):
+        """Update usage for a container update."""
+        uuid = new_container.uuid
+        self.tracked_containers[uuid] = obj_base.obj_to_primitive(
+            new_container)
+        # update compute node resource usage
+        self._update_usage(self._get_usage_dict(old_container), sign=-1)
+        self._update_usage(self._get_usage_dict(new_container))
+
     def _update_usage_from_containers(self, context, containers):
         """Calculate resource usage based on container utilization.
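The two _update_usage calls implement subtract-old-then-add-new; with hypothetical numbers, this is equivalent to applying the delta that UpdateClaim computes:

    usage = {'cpu': 4.0, 'memory': 1024}    # node usage before the update
    old = {'cpu': 1.0, 'memory': 256}       # usage dict of old_container
    new = {'cpu': 2.0, 'memory': 512}       # usage dict of new_container

    for key in usage:
        usage[key] -= old[key]              # _update_usage(..., sign=-1)
        usage[key] += new[key]              # _update_usage(...)

    assert usage == {'cpu': 5.0, 'memory': 1280}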
@@ -261,6 +307,14 @@ class ComputeNodeTracker(object):
 
         self._update(self.compute_node)
 
+    @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
+    def abort_container_update_claim(self, context, new_container,
+                                     old_container):
+        """Remove usage from the given container."""
+        self._update_usage_from_container_update(context, old_container,
+                                                 new_container)
+        self._update(self.compute_node)
+
     @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
     def remove_usage_from_container(self, context, container,
                                     is_removed=True):
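Note the argument order in abort_container_update_claim: it reuses _update_usage_from_container_update with old and new swapped, which applies the reverse delta (and restores the tracked_containers entry to the old container). In toy numbers:

    usage = 1280
    old_mem, new_mem = 256, 512
    usage += new_mem - old_mem    # container_update_claim applied: 1536
    usage += old_mem - new_mem    # abort: swapped args cancel the delta
    assert usage == 1280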
@@ -629,15 +629,25 @@ class Manager(periodic_task.PeriodicTasks):
     @translate_exception
     def container_update(self, context, container, patch):
         LOG.debug('Updating a container: %s', container.uuid)
+        old_container = container.obj_clone()
         # Update only the fields that have changed
         for field, patch_val in patch.items():
             if getattr(container, field) != patch_val:
                 setattr(container, field, patch_val)
 
         try:
-            self.driver.update(context, container)
-            container.save(context)
+            rt = self._get_resource_tracker()
+            # TODO(hongbin): limits should be populated by scheduler
+            # FIXME(hongbin): rt.compute_node could be None
+            limits = {'cpu': rt.compute_node.cpus,
+                      'memory': rt.compute_node.mem_total}
+            with rt.container_update_claim(context, container, old_container,
+                                           limits):
+                self.driver.update(context, container)
+                container.save(context)
             return container
+        except exception.ResourcesUnavailable as e:
+            raise exception.Conflict(six.text_type(e))
         except exception.DockerError as e:
             LOG.error("Error occurred while calling docker API: %s",
                       six.text_type(e))
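Reviewer note: moving driver.update and container.save inside the with block means a failed update can release the claimed resources before the exception is translated. A condensed sketch of the assumed control flow (FakeClaim is a hypothetical stand-in for the claim object's presumed __exit__ behavior):

    class FakeClaim(object):
        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            if exc_type is not None:
                self.abort()          # roll usage back on failure

        def abort(self):
            print("usage rolled back")

    try:
        with FakeClaim():
            raise RuntimeError("docker update failed")
    except RuntimeError:
        pass    # the manager translates such failures, e.g.
                # ResourcesUnavailable -> Conflict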
@@ -936,13 +936,20 @@ class TestContainerController(api_base.FunctionalTest):
         self.assertEqual(test_container['uuid'],
                          response.json['uuid'])
 
+    @patch('zun.objects.ComputeNode.get_by_name')
     @patch('zun.compute.api.API.container_update')
     @patch('zun.objects.Container.get_by_uuid')
-    def test_patch_by_uuid(self, mock_container_get_by_uuid, mock_update):
+    def test_patch_by_uuid(self, mock_container_get_by_uuid, mock_update,
+                           mock_computenode):
         test_container = utils.get_test_container()
         test_container_obj = objects.Container(self.context, **test_container)
         mock_container_get_by_uuid.return_value = test_container_obj
         mock_update.return_value = test_container_obj
+        test_host = utils.get_test_compute_node()
+        numa = objects.numa.NUMATopology._from_dict(test_host['numa_topology'])
+        test_host['numa_topology'] = numa
+        test_host_obj = objects.ComputeNode(self.context, **test_host)
+        mock_computenode.return_value = test_host_obj
 
         params = {'cpu': 1}
         container_uuid = test_container.get('uuid')
@@ -32,9 +32,16 @@ from zun.tests.unit.db import utils
 
 class FakeResourceTracker(object):
 
+    def __init__(self, *args, **kwargs):
+        self.compute_node = mock.MagicMock()
+
     def container_claim(self, context, container, pci_requests, limits):
         return claims.NopClaim()
 
+    def container_update_claim(self, context, container, old_container,
+                               limits):
+        return claims.NopClaim()
+
     def remove_usage_from_container(self, context, container, is_removed=True):
         pass
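The new FakeResourceTracker.__init__ matters because container_update now reads rt.compute_node.cpus and rt.compute_node.mem_total to build the limits dict; a MagicMock absorbs those attribute lookups without needing a real ComputeNode. Roughly:

    import mock

    compute_node = mock.MagicMock()
    limits = {'cpu': compute_node.cpus,           # a MagicMock, not an error
              'memory': compute_node.mem_total}   # attribute access never raises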