Add resource claim for zun.
Add resource claim when create container and resource tracker to update periodically. Now only track and claim memory and cpu resource. The following work need to do: 1, Remove claim resource when delete container. 2, Track the numa of compute node. 3, Filter scheduler set the Claim limits. 4, Handle the inventory and allocation. Change-Id: I54199742921ce71b58f23d968a966d4b269f51ef Close-Bug: #1688175 Partially-Implements: blueprint resource-claim
This commit is contained in:
parent
a44b05edae
commit
67eedb8072
@ -32,3 +32,4 @@ WSME>=0.8 # MIT
|
||||
SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT
|
||||
stevedore>=1.20.0 # Apache-2.0
|
||||
docker-py>=1.8.1 # Apache-2.0
|
||||
netaddr!=0.7.16,>=0.7.13 # BSD
|
||||
|
@ -495,3 +495,7 @@ class InvalidWebsocketToken(ZunException):
|
||||
|
||||
class ValidationError(ZunException):
|
||||
msg_fmt = _("Validation error")
|
||||
|
||||
|
||||
class ResourcesUnavailable(ZunException):
|
||||
message = _("Insufficient compute resources: %(reason)s.")
|
||||
|
@ -20,6 +20,7 @@ import functools
|
||||
import mimetypes
|
||||
import time
|
||||
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_context import context as common_context
|
||||
from oslo_log import log as logging
|
||||
from oslo_service import loopingcall
|
||||
@ -34,6 +35,7 @@ import zun.conf
|
||||
CONF = zun.conf.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
synchronized = lockutils.synchronized_with_prefix('zun-')
|
||||
|
||||
VALID_STATES = {
|
||||
'commit': [consts.RUNNING, consts.STOPPED, consts.PAUSED],
|
||||
|
170
zun/compute/claims.py
Normal file
170
zun/compute/claims.py
Normal file
@ -0,0 +1,170 @@
|
||||
# Copyright (c) 2017 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Claim objects for use with resource tracking.
|
||||
"""
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from zun.common import exception
|
||||
from zun.common.i18n import _
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NopClaim(object):
|
||||
"""For use with compute drivers that do not support resource tracking."""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.claimed_numa_topology = None
|
||||
|
||||
@property
|
||||
def memory(self):
|
||||
return 0
|
||||
|
||||
@property
|
||||
def cpu(self):
|
||||
return 0
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
if exc_type is not None:
|
||||
self.abort()
|
||||
|
||||
def abort(self):
|
||||
pass
|
||||
|
||||
def __str__(self):
|
||||
return "[Claim: %s memory, %.2f VCPU]" % (self.memory,
|
||||
self.cpu)
|
||||
|
||||
|
||||
class Claim(NopClaim):
|
||||
"""A declaration that a compute host operation will require free resources.
|
||||
|
||||
Claims serve as marker objects that resources are being held until the
|
||||
update_available_resource audit process runs to do a full reconciliation
|
||||
of resource usage.
|
||||
|
||||
This information will be used to help keep the local compute hosts's
|
||||
ComputeNode model in sync to aid the scheduler in making efficient / more
|
||||
correct decisions with respect to host selection.
|
||||
"""
|
||||
|
||||
def __init__(self, context, container, hostname, tracker, resources,
|
||||
limits=None):
|
||||
super(Claim, self).__init__()
|
||||
# Stash a copy of the container at the current point of time
|
||||
self.container = container.obj_clone()
|
||||
self.hostname = hostname
|
||||
self._numa_topology_loaded = False
|
||||
self.tracker = tracker
|
||||
self.context = context
|
||||
|
||||
# Check claim at constructor to avoid mess code
|
||||
# Raise exception ComputeResourcesUnavailable if claim failed
|
||||
self._claim_test(resources, limits)
|
||||
|
||||
@property
|
||||
def memory(self):
|
||||
return self.container.memory or 0
|
||||
|
||||
@property
|
||||
def cpu(self):
|
||||
return self.container.cpu or 0
|
||||
|
||||
def abort(self):
|
||||
"""Requiring claimed resources has failed or been aborted."""
|
||||
LOG.debug("Aborting claim: %s", self)
|
||||
self.tracker.abort_container_claim(self.container, self.hostname)
|
||||
|
||||
def _claim_test(self, resources, limits=None):
|
||||
"""Test if this claim can be satisfied.
|
||||
|
||||
With given available resources and optional oversubscription limits
|
||||
|
||||
This should be called before the compute node actually consumes the
|
||||
resources required to execute the claim.
|
||||
|
||||
:param resources: available local compute node resources
|
||||
:returns: Return true if resources are available to claim.
|
||||
"""
|
||||
if not limits:
|
||||
limits = {}
|
||||
|
||||
# If an individual limit is None, the resource will be considered
|
||||
# unlimited:
|
||||
memory_limit = limits.get('memory')
|
||||
cpu_limit = limits.get('cpu')
|
||||
|
||||
LOG.info(_("Attempting claim: memory %(memory)s, "
|
||||
"cpu %(cpu).02f CPU"),
|
||||
{'memory': self.memory, 'cpu': self.cpu})
|
||||
|
||||
reasons = [self._test_memory(resources, memory_limit),
|
||||
self._test_cpu(resources, cpu_limit)]
|
||||
# TODO(Shunli): test numa here
|
||||
reasons = [r for r in reasons if r is not None]
|
||||
if len(reasons) > 0:
|
||||
raise exception.ResourcesUnavailable(reason="; ".join(reasons))
|
||||
|
||||
LOG.info('Claim successful')
|
||||
|
||||
def _test_memory(self, resources, limit):
|
||||
type_ = _("memory")
|
||||
unit = "MB"
|
||||
total = resources.mem_total
|
||||
used = resources.mem_used
|
||||
requested = self.memory
|
||||
|
||||
return self._test(type_, unit, total, used, requested, limit)
|
||||
|
||||
def _test_cpu(self, resources, limit):
|
||||
type_ = _("vcpu")
|
||||
unit = "VCPU"
|
||||
total = resources.cpus
|
||||
used = resources.cpu_used
|
||||
requested = self.cpu
|
||||
|
||||
return self._test(type_, unit, total, used, requested, limit)
|
||||
|
||||
def _test(self, type_, unit, total, used, requested, limit):
|
||||
"""Test if the type resource needed for a claim can be allocated."""
|
||||
|
||||
LOG.info(_('Total %(type)s: %(total)d %(unit)s, used: %(used).02f '
|
||||
'%(unit)s'),
|
||||
{'type': type_, 'total': total, 'unit': unit, 'used': used})
|
||||
|
||||
if limit is None:
|
||||
# treat resource as unlimited:
|
||||
LOG.info(_('%(type)s limit not specified, defaulting to '
|
||||
'unlimited'), {'type': type_})
|
||||
return
|
||||
|
||||
free = limit - used
|
||||
|
||||
# Oversubscribed resource policy info:
|
||||
LOG.info(_('%(type)s limit: %(limit).02f %(unit)s, '
|
||||
'free: %(free).02f %(unit)s'),
|
||||
{'type': type_, 'limit': limit, 'free': free, 'unit': unit})
|
||||
|
||||
if requested > free:
|
||||
return (_('Free %(type)s %(free).02f '
|
||||
'%(unit)s < requested %(requested)s %(unit)s') %
|
||||
{'type': type_, 'free': free, 'unit': unit,
|
||||
'requested': requested})
|
@ -12,18 +12,30 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import copy
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from zun.common import exception
|
||||
from zun.common import utils
|
||||
from zun.compute import claims
|
||||
from zun import objects
|
||||
from zun.objects import base as obj_base
|
||||
from zun.scheduler import client as scheduler_client
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
COMPUTE_RESOURCE_SEMAPHORE = "compute_resources"
|
||||
|
||||
|
||||
class ComputeNodeTracker(object):
|
||||
def __init__(self, host, container_driver):
|
||||
self.host = host
|
||||
self.container_driver = container_driver
|
||||
self.compute_node = None
|
||||
self.tracked_containers = {}
|
||||
self.old_resources = collections.defaultdict(objects.ComputeNode)
|
||||
self.scheduler_client = scheduler_client.SchedulerClient()
|
||||
|
||||
def update_available_resources(self, context):
|
||||
# Check if the compute_node is already registered
|
||||
@ -37,7 +49,8 @@ class ComputeNodeTracker(object):
|
||||
node.create(context)
|
||||
LOG.info('Node created for :%(host)s', {'host': self.host})
|
||||
self.container_driver.get_available_resources(node)
|
||||
node.save()
|
||||
self.compute_node = node
|
||||
self._update_available_resource(context)
|
||||
# NOTE(sbiswas7): Consider removing the return statement if not needed
|
||||
return node
|
||||
|
||||
@ -48,3 +61,184 @@ class ComputeNodeTracker(object):
|
||||
except exception.ComputeNodeNotFound:
|
||||
LOG.warning("No compute node record for: %(host)s",
|
||||
{'host': self.host})
|
||||
|
||||
@utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
|
||||
def container_claim(self, context, container, hostname, limits=None):
|
||||
"""Indicate resources are needed for an upcoming container build.
|
||||
|
||||
This should be called before the compute node is about to perform
|
||||
an container build operation that will consume additional resources.
|
||||
|
||||
:param context: security context
|
||||
:param container: container to reserve resources for.
|
||||
:type container: zun.objects.container.Container object
|
||||
:param hostname: The zun hostname selected by the scheduler
|
||||
:param limits: Dict of oversubscription limits for memory, disk,
|
||||
and CPUs.
|
||||
:returns: A Claim ticket representing the reserved resources. It can
|
||||
be used to revert the resource usage if an error occurs
|
||||
during the container build.
|
||||
"""
|
||||
# No memory and cpu specified, no need to claim resource now.
|
||||
if not (container.memory or container.cpu):
|
||||
return claims.NopClaim()
|
||||
|
||||
# We should have the compute node created here, just get it.
|
||||
self.compute_node = self._get_compute_node(context)
|
||||
if self.disabled(hostname):
|
||||
self._set_container_host(container)
|
||||
return claims.NopClaim()
|
||||
|
||||
claim = claims.Claim(context, container, hostname, self,
|
||||
self.compute_node, limits=limits)
|
||||
|
||||
self._set_container_host(container)
|
||||
self._update_usage_from_container(container, hostname)
|
||||
# persist changes to the compute node:
|
||||
self._update(self.compute_node)
|
||||
|
||||
return claim
|
||||
|
||||
def disabled(self, hostname):
|
||||
return not self.container_driver.node_is_available(hostname)
|
||||
|
||||
def _set_container_host(self, container):
|
||||
"""Tag the container as belonging to this host.
|
||||
|
||||
This should be done while the COMPUTE_RESOURCES_SEMAPHORE is held so
|
||||
the resource claim will not be lost if the audit process starts.
|
||||
"""
|
||||
container.host = self.host
|
||||
container.save()
|
||||
|
||||
def _update_usage_from_container(self, container, hostname,
|
||||
is_removed=False):
|
||||
"""Update usage for a single container."""
|
||||
|
||||
uuid = container.uuid
|
||||
is_new_container = uuid not in self.tracked_containers
|
||||
is_removed_container = not is_new_container and is_removed
|
||||
|
||||
if is_new_container:
|
||||
self.tracked_containers[uuid] = \
|
||||
obj_base.obj_to_primitive(container)
|
||||
sign = 1
|
||||
|
||||
if is_removed_container:
|
||||
self.tracked_containers.pop(uuid)
|
||||
sign = -1
|
||||
|
||||
if is_new_container or is_removed_container:
|
||||
# TODO(Shunli): Handle pci, scheduler allocation here.
|
||||
|
||||
# new container, update compute node resource usage:
|
||||
self._update_usage(self._get_usage_dict(container),
|
||||
hostname, sign=sign)
|
||||
|
||||
def _update_usage_from_containers(self, context, containers, hostname):
|
||||
"""Calculate resource usage based on container utilization.
|
||||
|
||||
This is different than the conatiner daemon view as it will account
|
||||
for all containers assigned to the local compute host, even if they
|
||||
are not currently powered on.
|
||||
"""
|
||||
self.tracked_containers.clear()
|
||||
|
||||
cn = self.compute_node
|
||||
# set some initial values, reserve room for host
|
||||
cn.cpu_used = 0
|
||||
cn.mem_free = cn.mem_total
|
||||
cn.mem_used = 0
|
||||
cn.running_containers = 0
|
||||
|
||||
for cnt in containers:
|
||||
self._update_usage_from_container(cnt, hostname)
|
||||
|
||||
cn.mem_free = max(0, cn.mem_free)
|
||||
|
||||
def _update_usage(self, usage, hostname, sign=1):
|
||||
mem_usage = usage['memory']
|
||||
cpus_usage = usage.get('cpu', 0)
|
||||
|
||||
cn = self.compute_node
|
||||
cn.mem_used += sign * mem_usage
|
||||
cn.cpu_used += sign * cpus_usage
|
||||
|
||||
# free ram may be negative, depending on policy:
|
||||
cn.mem_free = cn.mem_total - cn.mem_used
|
||||
|
||||
cn.running_containers += sign * 1
|
||||
|
||||
# TODO(Shunli): Calculate the numa usage here
|
||||
|
||||
def _update(self, compute_node):
|
||||
if not self._resource_change(compute_node):
|
||||
return
|
||||
# Persist the stats to the Scheduler
|
||||
self.scheduler_client.update_resource(compute_node)
|
||||
# Update pci tracker here
|
||||
|
||||
def _resource_change(self, compute_node):
|
||||
"""Check to see if any resources have changed."""
|
||||
hostname = compute_node.hostname
|
||||
old_compute = self.old_resources[hostname]
|
||||
if not obj_base.obj_equal_prims(
|
||||
compute_node, old_compute, ['updated_at']):
|
||||
self.old_resources[hostname] = copy.deepcopy(compute_node)
|
||||
return True
|
||||
return False
|
||||
|
||||
@utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
|
||||
def _update_available_resource(self, context):
|
||||
|
||||
# if we could not init the compute node the tracker will be
|
||||
# disabled and we should quit now
|
||||
if self.disabled(self.host):
|
||||
return
|
||||
|
||||
# Grab all containers assigned to this node:
|
||||
containers = objects.Container.list_by_host(context, self.host)
|
||||
|
||||
# Now calculate usage based on container utilization:
|
||||
self._update_usage_from_containers(context, containers, self.host)
|
||||
|
||||
# No migration for docker, is there will be orphan container? Nova has.
|
||||
|
||||
cn = self.compute_node
|
||||
|
||||
# update the compute_node
|
||||
self._update(cn)
|
||||
LOG.debug('Compute_service record updated for %(host)s',
|
||||
{'host': self.host})
|
||||
|
||||
def _get_usage_dict(self, container, **updates):
|
||||
"""Make a usage dict _update methods expect.
|
||||
|
||||
Accepts an Container, and a set of updates.
|
||||
Converts the object to a dict and applies the updates.
|
||||
|
||||
:param container: container as an object
|
||||
:param updates: key-value pairs to update the passed object.
|
||||
|
||||
:returns: a dict with all the information from container updated
|
||||
with updates
|
||||
"""
|
||||
usage = {}
|
||||
# (Fixme): The Container.memory is string.
|
||||
memory = 0
|
||||
if container.memory:
|
||||
memory = int(container.memory[:-1])
|
||||
usage = {'memory': memory,
|
||||
'cpu': container.cpu or 0}
|
||||
|
||||
# update numa usage here
|
||||
|
||||
return usage
|
||||
|
||||
@utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
|
||||
def abort_container_claim(self, container, hostname):
|
||||
"""Remove usage from the given container."""
|
||||
self._update_usage_from_container(container, hostname,
|
||||
is_removed=True)
|
||||
|
||||
self._update(self.compute_node)
|
||||
|
@ -22,6 +22,7 @@ from zun.common import consts
|
||||
from zun.common import exception
|
||||
from zun.common import utils
|
||||
from zun.common.utils import translate_exception
|
||||
from zun.compute import compute_node_tracker
|
||||
import zun.conf
|
||||
from zun.container import driver
|
||||
from zun.image import driver as image_driver
|
||||
@ -37,6 +38,8 @@ class Manager(object):
|
||||
def __init__(self, container_driver=None):
|
||||
super(Manager, self).__init__()
|
||||
self.driver = driver.load_container_driver(container_driver)
|
||||
self.host = CONF.host
|
||||
self._resource_tracker = None
|
||||
|
||||
def _fail_container(self, context, container, error):
|
||||
container.status = consts.ERROR
|
||||
@ -134,11 +137,16 @@ class Manager(object):
|
||||
container.image_driver = image.get('driver')
|
||||
container.save(context)
|
||||
try:
|
||||
container = self.driver.create(context, container,
|
||||
sandbox_id, image)
|
||||
container.task_state = None
|
||||
container.save(context)
|
||||
return container
|
||||
# TODO(Shunli): No limits now, claim just update compute usage.
|
||||
limits = None
|
||||
rt = self._get_resource_tracker()
|
||||
with rt.container_claim(context, container, container.host,
|
||||
limits):
|
||||
container = self.driver.create(context, container,
|
||||
sandbox_id, image)
|
||||
container.task_state = None
|
||||
container.save(context)
|
||||
return container
|
||||
except exception.DockerError as e:
|
||||
with excutils.save_and_reraise_exception(reraise=reraise):
|
||||
LOG.error("Error occurred while calling Docker create API: %s",
|
||||
@ -579,3 +587,10 @@ class Manager(object):
|
||||
LOG.exception("Unexpected exception while searching image: %s",
|
||||
six.text_type(e))
|
||||
raise
|
||||
|
||||
def _get_resource_tracker(self):
|
||||
if not self._resource_tracker:
|
||||
rt = compute_node_tracker.ComputeNodeTracker(self.host,
|
||||
self.driver)
|
||||
self._resource_tracker = rt
|
||||
return self._resource_tracker
|
||||
|
@ -28,6 +28,7 @@ from zun.common import nova
|
||||
from zun.common import utils
|
||||
from zun.common.utils import check_container_id
|
||||
import zun.conf
|
||||
from zun.container.docker import host
|
||||
from zun.container.docker import utils as docker_utils
|
||||
from zun.container import driver
|
||||
from zun.network import network as zun_network
|
||||
@ -44,6 +45,7 @@ class DockerDriver(driver.ContainerDriver):
|
||||
|
||||
def __init__(self):
|
||||
super(DockerDriver, self).__init__()
|
||||
self._host = host.Host()
|
||||
|
||||
def load_image(self, image_path=None):
|
||||
with docker_utils.docker_client() as docker:
|
||||
@ -689,6 +691,9 @@ class DockerDriver(driver.ContainerDriver):
|
||||
network_api.add_security_groups_to_ports(
|
||||
sandbox, network, security_group_ids)
|
||||
|
||||
def get_available_nodes(self):
|
||||
return [self._host.get_hostname()]
|
||||
|
||||
|
||||
class NovaDockerDriver(DockerDriver):
|
||||
def add_security_group(self, context, sandbox_id, security_group):
|
||||
|
43
zun/container/docker/host.py
Normal file
43
zun/container/docker/host.py
Normal file
@ -0,0 +1,43 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Manages information about the host.
|
||||
"""
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from zun.container.docker import utils as docker_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Host(object):
|
||||
|
||||
def __init__(self):
|
||||
self._hostname = None
|
||||
|
||||
def get_hostname(self):
|
||||
"""Returns the hostname of the host."""
|
||||
with docker_utils.docker_client() as docker:
|
||||
hostname = docker.info()['Name']
|
||||
if self._hostname is None:
|
||||
self._hostname = hostname
|
||||
elif hostname != self._hostname:
|
||||
self._hostname = hostname
|
||||
LOG.warning(_('Hostname has changed from %(old)s '
|
||||
'to %(new)s. A restart is required '
|
||||
'to take effect.'),
|
||||
{'old': self._hostname,
|
||||
'new': hostname})
|
||||
return self._hostname
|
@ -207,10 +207,11 @@ class ContainerDriver(object):
|
||||
numa_topo_obj = self.get_host_numa_topology()
|
||||
node.numa_topology = numa_topo_obj
|
||||
meminfo = self.get_host_mem()
|
||||
(mem_total, mem_free, mem_ava) = meminfo
|
||||
(mem_total, mem_free, mem_ava, mem_used) = meminfo
|
||||
node.mem_total = mem_total // units.Ki
|
||||
node.mem_free = mem_free // units.Ki
|
||||
node.mem_available = mem_ava // units.Ki
|
||||
node.mem_used = mem_used // units.Ki
|
||||
info = self.get_host_info()
|
||||
(total, running, paused, stopped, cpus,
|
||||
architecture, os_type, os, kernel_version, labels) = info
|
||||
@ -226,3 +227,9 @@ class ContainerDriver(object):
|
||||
cpu_used = self.get_cpu_used()
|
||||
node.cpu_used = cpu_used
|
||||
node.labels = labels
|
||||
|
||||
def node_is_available(self, nodename):
|
||||
"""Return whether this compute service manages a particular node."""
|
||||
if nodename in self.get_available_nodes():
|
||||
return True
|
||||
return False
|
||||
|
@ -65,4 +65,5 @@ class Host(object):
|
||||
idx32 = m.index('Cached:')
|
||||
cached = m[idx32+1]
|
||||
mem_ava = int(mem_free) + int(buffers) + int(cached)
|
||||
return int(mem_total), int(mem_free), int(mem_ava)
|
||||
mem_used = int(mem_total) - int(mem_ava)
|
||||
return int(mem_total), int(mem_free), int(mem_ava), int(mem_used)
|
||||
|
@ -0,0 +1,33 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""add mem used to compute node
|
||||
|
||||
Revision ID: 648c25faa0be
|
||||
Revises: 174cafda0857
|
||||
Create Date: 2017-05-24 09:39:42.021441
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '648c25faa0be'
|
||||
down_revision = '174cafda0857'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.add_column('compute_node',
|
||||
sa.Column('mem_used', sa.Integer(), nullable=False))
|
@ -276,6 +276,7 @@ class ComputeNode(Base):
|
||||
mem_total = Column(Integer, nullable=False, default=0)
|
||||
mem_free = Column(Integer, nullable=False, default=0)
|
||||
mem_available = Column(Integer, nullable=False, default=0)
|
||||
mem_used = Column(Integer, nullable=False, default=0)
|
||||
total_containers = Column(Integer, nullable=False, default=0)
|
||||
running_containers = Column(Integer, nullable=False, default=0)
|
||||
paused_containers = Column(Integer, nullable=False, default=0)
|
||||
|
@ -14,9 +14,12 @@
|
||||
|
||||
"""Zun common internal object model"""
|
||||
|
||||
import netaddr
|
||||
|
||||
from oslo_versionedobjects import base as ovoo_base
|
||||
from oslo_versionedobjects import fields as ovoo_fields
|
||||
|
||||
from zun.objects import fields as obj_fields
|
||||
|
||||
remotable_classmethod = ovoo_base.remotable_classmethod
|
||||
remotable = ovoo_base.remotable
|
||||
@ -57,3 +60,77 @@ class ZunPersistentObject(object):
|
||||
class ZunObjectSerializer(ovoo_base.VersionedObjectSerializer):
|
||||
# Base class to use for object hydration
|
||||
OBJ_BASE_CLASS = ZunObject
|
||||
|
||||
|
||||
class ObjectListBase(ovoo_base.ObjectListBase):
|
||||
# NOTE: These are for transition to using the oslo
|
||||
# base object and can be removed when we move to it.
|
||||
@classmethod
|
||||
def _obj_primitive_key(cls, field):
|
||||
return 'zun_object.%s' % field
|
||||
|
||||
@classmethod
|
||||
def _obj_primitive_field(cls, primitive, field,
|
||||
default=obj_fields.UnspecifiedDefault):
|
||||
key = cls._obj_primitive_key(field)
|
||||
if default == obj_fields.UnspecifiedDefault:
|
||||
return primitive[key]
|
||||
else:
|
||||
return primitive.get(key, default)
|
||||
|
||||
|
||||
def obj_to_primitive(obj):
|
||||
"""Recursively turn an object into a python primitive.
|
||||
|
||||
A ZunObject becomes a dict, and anything that implements ObjectListBase
|
||||
becomes a list.
|
||||
"""
|
||||
if isinstance(obj, ObjectListBase):
|
||||
return [obj_to_primitive(x) for x in obj]
|
||||
elif isinstance(obj, ZunObject):
|
||||
result = {}
|
||||
for key in obj.obj_fields:
|
||||
if obj.obj_attr_is_set(key) or key in obj.obj_extra_fields:
|
||||
result[key] = obj_to_primitive(getattr(obj, key))
|
||||
return result
|
||||
elif isinstance(obj, netaddr.IPAddress):
|
||||
return str(obj)
|
||||
elif isinstance(obj, netaddr.IPNetwork):
|
||||
return str(obj)
|
||||
else:
|
||||
return obj
|
||||
|
||||
|
||||
def obj_equal_prims(obj_1, obj_2, ignore=None):
|
||||
"""Compare two primitives for equivalence ignoring some keys.
|
||||
|
||||
This operation tests the primitives of two objects for equivalence.
|
||||
Object primitives may contain a list identifying fields that have been
|
||||
changed - this is ignored in the comparison. The ignore parameter lists
|
||||
any other keys to be ignored.
|
||||
|
||||
:param:obj1: The first object in the comparison
|
||||
:param:obj2: The second object in the comparison
|
||||
:param:ignore: A list of fields to ignore
|
||||
:returns: True if the primitives are equal ignoring changes
|
||||
and specified fields, otherwise False.
|
||||
"""
|
||||
|
||||
def _strip(prim, keys):
|
||||
if isinstance(prim, dict):
|
||||
for k in keys:
|
||||
prim.pop(k, None)
|
||||
for v in prim.values():
|
||||
_strip(v, keys)
|
||||
if isinstance(prim, list):
|
||||
for v in prim:
|
||||
_strip(v, keys)
|
||||
return prim
|
||||
|
||||
if ignore is not None:
|
||||
keys = ['zun_object.changes'] + ignore
|
||||
else:
|
||||
keys = ['zun_object.changes']
|
||||
prim_1 = _strip(obj_1.obj_to_primitive(), keys)
|
||||
prim_2 = _strip(obj_2.obj_to_primitive(), keys)
|
||||
return prim_1 == prim_2
|
||||
|
@ -25,7 +25,8 @@ class ComputeNode(base.ZunPersistentObject, base.ZunObject):
|
||||
# Version 1.3: Add cpus, cpu_used
|
||||
# Version 1.4: Add host operating system info
|
||||
# Version 1.5: Add host labels info
|
||||
VERSION = '1.5'
|
||||
# Version 1.6: Add mem_used to compute node
|
||||
VERSION = '1.6'
|
||||
|
||||
fields = {
|
||||
'uuid': fields.UUIDField(read_only=True, nullable=False),
|
||||
@ -34,6 +35,7 @@ class ComputeNode(base.ZunPersistentObject, base.ZunObject):
|
||||
'mem_total': fields.IntegerField(nullable=False),
|
||||
'mem_free': fields.IntegerField(nullable=False),
|
||||
'mem_available': fields.IntegerField(nullable=False),
|
||||
'mem_used': fields.IntegerField(nullable=False),
|
||||
'total_containers': fields.IntegerField(nullable=False),
|
||||
'running_containers': fields.IntegerField(nullable=False),
|
||||
'paused_containers': fields.IntegerField(nullable=False),
|
||||
|
@ -17,6 +17,8 @@ from oslo_versionedobjects import fields
|
||||
|
||||
from zun.common import consts
|
||||
|
||||
UnspecifiedDefault = fields.UnspecifiedDefault
|
||||
|
||||
|
||||
class ContainerStatus(fields.Enum):
|
||||
ALL = consts.CONTAINER_STATUSES
|
||||
|
@ -31,3 +31,7 @@ class SchedulerClient(object):
|
||||
|
||||
def select_destinations(self, context, containers):
|
||||
return self.driver.select_destinations(context, containers)
|
||||
|
||||
def update_resource(self, node):
|
||||
node.save()
|
||||
# TODO(Shunli): Update the inventory here
|
||||
|
@ -17,6 +17,7 @@ import mock
|
||||
from io import StringIO
|
||||
from zun.common import consts
|
||||
from zun.common import exception
|
||||
from zun.compute import claims
|
||||
from zun.compute import manager
|
||||
import zun.conf
|
||||
from zun.objects.container import Container
|
||||
@ -26,6 +27,12 @@ from zun.tests.unit.container.fake_driver import FakeDriver as fake_driver
|
||||
from zun.tests.unit.db import utils
|
||||
|
||||
|
||||
class FakeResourceTracker(object):
|
||||
|
||||
def container_claim(self, context, container, host, limits):
|
||||
return claims.NopClaim()
|
||||
|
||||
|
||||
class TestManager(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
@ -54,6 +61,7 @@ class TestManager(base.TestCase):
|
||||
image = {'image': 'repo', 'path': 'out_path', 'driver': 'glance'}
|
||||
mock_pull.return_value = image, False
|
||||
mock_create_sandbox.return_value = 'fake_id'
|
||||
self.compute_manager._resource_tracker = FakeResourceTracker()
|
||||
self.compute_manager._do_container_create(self.context, container)
|
||||
mock_save.assert_called_with(self.context)
|
||||
mock_pull.assert_any_call(self.context, container.image, 'latest',
|
||||
@ -115,6 +123,7 @@ class TestManager(base.TestCase):
|
||||
mock_pull.return_value = image, False
|
||||
mock_create.side_effect = exception.DockerError("Creation Failed")
|
||||
mock_create_sandbox.return_value = mock.MagicMock()
|
||||
self.compute_manager._resource_tracker = FakeResourceTracker()
|
||||
self.compute_manager._do_container_create(self.context, container)
|
||||
mock_fail.assert_called_once_with(self.context,
|
||||
container, "Creation Failed")
|
||||
@ -130,6 +139,7 @@ class TestManager(base.TestCase):
|
||||
mock_create.return_value = container
|
||||
mock_pull.return_value = image, False
|
||||
container.status = 'Stopped'
|
||||
self.compute_manager._resource_tracker = FakeResourceTracker()
|
||||
self.compute_manager._do_container_run(self.context, container)
|
||||
mock_save.assert_called_with(self.context)
|
||||
mock_pull.assert_any_call(self.context, container.image, 'latest',
|
||||
@ -197,6 +207,7 @@ class TestManager(base.TestCase):
|
||||
mock_pull.return_value = {'name': 'nginx', 'path': None}, True
|
||||
mock_create.side_effect = exception.DockerError(
|
||||
message="Docker Error occurred")
|
||||
self.compute_manager._resource_tracker = FakeResourceTracker()
|
||||
self.compute_manager._do_container_run(self.context,
|
||||
container)
|
||||
mock_save.assert_called_with(self.context)
|
||||
|
@ -606,7 +606,8 @@ class TestNovaDockerDriver(base.DriverTestCase):
|
||||
self.driver = DockerDriver()
|
||||
mock_output.return_value = LSCPU_ON
|
||||
conf.CONF.set_override('floating_cpu_set', "0")
|
||||
mock_mem.return_value = (100 * units.Ki, 50 * units.Ki, 50 * units.Ki)
|
||||
mock_mem.return_value = (100 * units.Ki, 50 * units.Ki, 50 * units.Ki,
|
||||
50 * units.Ki)
|
||||
mock_info.return_value = (10, 8, 0, 2, 48, 'x86_64', 'linux',
|
||||
'CentOS', '3.10.0-123',
|
||||
{'dev.type': 'product'})
|
||||
|
@ -71,4 +71,5 @@ class TestOSCapability(base.BaseTestCase):
|
||||
with mock.patch.object(six.moves.builtins, "open", m_open,
|
||||
create=True):
|
||||
output = os_capability_linux.LinuxHost().get_host_mem()
|
||||
self.assertEqual((3882464, 3514608, 3556372), output)
|
||||
used = (3882464 - 3556372)
|
||||
self.assertEqual((3882464, 3514608, 3556372, used), output)
|
||||
|
@ -265,9 +265,10 @@ def get_test_compute_node(**kw):
|
||||
'uuid': kw.get('uuid', '24a5b17a-f2eb-4556-89db-5f4169d13982'),
|
||||
'hostname': kw.get('hostname', 'localhost'),
|
||||
'numa_topology': kw.get('numa_topology', get_test_numa_topology()),
|
||||
'mem_total': kw.get('mem_total', 123),
|
||||
'mem_free': kw.get('mem_free', 456),
|
||||
'mem_available': kw.get('mem_available', 789),
|
||||
'mem_total': kw.get('mem_total', 1024),
|
||||
'mem_free': kw.get('mem_free', 512),
|
||||
'mem_available': kw.get('mem_available', 512),
|
||||
'mem_used': kw.get('mem_used', 512),
|
||||
'total_containers': kw.get('total_containers', 10),
|
||||
'running_containers': kw.get('running_containers', 8),
|
||||
'paused_containers': kw.get('paused_containers', 0),
|
||||
|
@ -362,7 +362,7 @@ object_data = {
|
||||
'ResourceClass': '1.1-d661c7675b3cd5b8c3618b68ba64324e',
|
||||
'ResourceProvider': '1.0-92b427359d5a4cf9ec6c72cbe630ee24',
|
||||
'ZunService': '1.1-b1549134bfd5271daec417ca8cabc77e',
|
||||
'ComputeNode': '1.5-0d6ff86fbdc03859b77f1092e785ce10',
|
||||
'ComputeNode': '1.6-33a173b969781644a95ea2925eb5cca2',
|
||||
}
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user