Merge "Add resource claim for zun."
This commit is contained in:
commit
950e4b65f3
@@ -32,3 +32,4 @@ WSME>=0.8 # MIT
SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT
stevedore>=1.20.0 # Apache-2.0
docker-py>=1.8.1 # Apache-2.0
netaddr!=0.7.16,>=0.7.13 # BSD
@@ -495,3 +495,7 @@ class InvalidWebsocketToken(ZunException):


class ValidationError(ZunException):
    msg_fmt = _("Validation error")


class ResourcesUnavailable(ZunException):
    message = _("Insufficient compute resources: %(reason)s.")
@@ -20,6 +20,7 @@ import functools
import mimetypes
import time

from oslo_concurrency import lockutils
from oslo_context import context as common_context
from oslo_log import log as logging
from oslo_service import loopingcall
@@ -34,6 +35,7 @@ import zun.conf
CONF = zun.conf.CONF
LOG = logging.getLogger(__name__)

synchronized = lockutils.synchronized_with_prefix('zun-')

VALID_STATES = {
    'commit': [consts.RUNNING, consts.STOPPED, consts.PAUSED],
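
Reviewer note: the new `synchronized` helper is a decorator factory from oslo.concurrency; a minimal sketch of how it is used (the lock name 'compute_resources' matches the semaphore the resource tracker introduces below; `do_claim` is a hypothetical function):

from oslo_concurrency import lockutils

synchronized = lockutils.synchronized_with_prefix('zun-')


@synchronized('compute_resources')
def do_claim():
    # Callers serialize here; when external locking is enabled, the lock
    # file name carries the 'zun-' prefix.
    pass
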

zun/compute/claims.py (new file, 170 lines)
@@ -0,0 +1,170 @@
# Copyright (c) 2017 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""
Claim objects for use with resource tracking.
"""

from oslo_log import log as logging

from zun.common import exception
from zun.common.i18n import _

LOG = logging.getLogger(__name__)


class NopClaim(object):
    """For use with compute drivers that do not support resource tracking."""

    def __init__(self, *args, **kwargs):
        self.claimed_numa_topology = None

    @property
    def memory(self):
        return 0

    @property
    def cpu(self):
        return 0

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            self.abort()

    def abort(self):
        pass

    def __str__(self):
        return "[Claim: %s memory, %.2f VCPU]" % (self.memory,
                                                  self.cpu)


class Claim(NopClaim):
    """A declaration that a compute host operation will require free resources.

    Claims serve as marker objects that resources are being held until the
    update_available_resource audit process runs to do a full reconciliation
    of resource usage.

    This information will be used to help keep the local compute host's
    ComputeNode model in sync to aid the scheduler in making efficient / more
    correct decisions with respect to host selection.
    """

    def __init__(self, context, container, hostname, tracker, resources,
                 limits=None):
        super(Claim, self).__init__()
        # Stash a copy of the container at the current point of time
        self.container = container.obj_clone()
        self.hostname = hostname
        self._numa_topology_loaded = False
        self.tracker = tracker
        self.context = context

        # Test the claim in the constructor to avoid messy error handling
        # later; raise ResourcesUnavailable if the claim fails.
        self._claim_test(resources, limits)

    @property
    def memory(self):
        return self.container.memory or 0

    @property
    def cpu(self):
        return self.container.cpu or 0

    def abort(self):
        """Release the claimed resources.

        Called when the operation requiring the claimed resources has
        failed or been aborted.
        """
        LOG.debug("Aborting claim: %s", self)
        self.tracker.abort_container_claim(self.container, self.hostname)

    def _claim_test(self, resources, limits=None):
        """Test if this claim can be satisfied.

        Given available resources and optional oversubscription limits,
        this should be called before the compute node actually consumes the
        resources required to execute the claim.

        :param resources: available local compute node resources
        :param limits: optional dict of oversubscription limits
        :raises: exception.ResourcesUnavailable if the claim cannot be
                 satisfied
        """
        if not limits:
            limits = {}

        # If an individual limit is None, the resource will be considered
        # unlimited:
        memory_limit = limits.get('memory')
        cpu_limit = limits.get('cpu')

        LOG.info(_("Attempting claim: memory %(memory)s, "
                   "cpu %(cpu).02f CPU"),
                 {'memory': self.memory, 'cpu': self.cpu})

        reasons = [self._test_memory(resources, memory_limit),
                   self._test_cpu(resources, cpu_limit)]
        # TODO(Shunli): test numa here
        reasons = [r for r in reasons if r is not None]
        if len(reasons) > 0:
            raise exception.ResourcesUnavailable(reason="; ".join(reasons))

        LOG.info('Claim successful')

    def _test_memory(self, resources, limit):
        type_ = _("memory")
        unit = "MB"
        total = resources.mem_total
        used = resources.mem_used
        requested = self.memory

        return self._test(type_, unit, total, used, requested, limit)

    def _test_cpu(self, resources, limit):
        type_ = _("vcpu")
        unit = "VCPU"
        total = resources.cpus
        used = resources.cpu_used
        requested = self.cpu

        return self._test(type_, unit, total, used, requested, limit)

    def _test(self, type_, unit, total, used, requested, limit):
        """Test if the requested amount of a resource can be allocated."""
        LOG.info(_('Total %(type)s: %(total)d %(unit)s, used: %(used).02f '
                   '%(unit)s'),
                 {'type': type_, 'total': total, 'unit': unit, 'used': used})

        if limit is None:
            # treat resource as unlimited:
            LOG.info(_('%(type)s limit not specified, defaulting to '
                       'unlimited'), {'type': type_})
            return

        free = limit - used

        # Oversubscribed resource policy info:
        LOG.info(_('%(type)s limit: %(limit).02f %(unit)s, '
                   'free: %(free).02f %(unit)s'),
                 {'type': type_, 'limit': limit, 'free': free, 'unit': unit})

        if requested > free:
            return (_('Free %(type)s %(free).02f '
                      '%(unit)s < requested %(requested)s %(unit)s') %
                    {'type': type_, 'free': free, 'unit': unit,
                     'requested': requested})
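
Reviewer note: a minimal, standalone sketch (not zun code) of the claim pattern implemented above. The constructor tests the claim eagerly and raises if resources are short, and __exit__ rolls the claim back if the guarded operation fails:

class InsufficientResources(Exception):
    pass


class SketchClaim(object):
    def __init__(self, requested, free):
        if requested > free:  # mirrors Claim._claim_test()
            raise InsufficientResources(
                "requested %s > free %s" % (requested, free))
        self.requested = requested

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:  # mirrors NopClaim.__exit__()
            self.abort()

    def abort(self):
        print("rolling back claim for %s" % self.requested)


try:
    with SketchClaim(requested=512, free=1024):
        raise RuntimeError("container build failed")  # triggers abort()
except RuntimeError:
    pass
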
@@ -12,18 +12,30 @@
# License for the specific language governing permissions and limitations
# under the License.

import collections
import copy

from oslo_log import log as logging

from zun.common import exception
from zun.common import utils
from zun.compute import claims
from zun import objects
from zun.objects import base as obj_base
from zun.scheduler import client as scheduler_client

LOG = logging.getLogger(__name__)
COMPUTE_RESOURCE_SEMAPHORE = "compute_resources"


class ComputeNodeTracker(object):
    def __init__(self, host, container_driver):
        self.host = host
        self.container_driver = container_driver
        self.compute_node = None
        self.tracked_containers = {}
        self.old_resources = collections.defaultdict(objects.ComputeNode)
        self.scheduler_client = scheduler_client.SchedulerClient()

    def update_available_resources(self, context):
        # Check if the compute_node is already registered
@@ -37,7 +49,8 @@ class ComputeNodeTracker(object):
            node.create(context)
            LOG.info('Node created for: %(host)s', {'host': self.host})
        self.container_driver.get_available_resources(node)
        node.save()
        self.compute_node = node
        self._update_available_resource(context)
        # NOTE(sbiswas7): Consider removing the return statement if not needed
        return node
@@ -48,3 +61,184 @@ class ComputeNodeTracker(object):
        except exception.ComputeNodeNotFound:
            LOG.warning("No compute node record for: %(host)s",
                        {'host': self.host})

    @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
    def container_claim(self, context, container, hostname, limits=None):
        """Indicate that resources are needed for an upcoming container build.

        This should be called before the compute node is about to perform
        a container build operation that will consume additional resources.

        :param context: security context
        :param container: container to reserve resources for.
        :type container: zun.objects.container.Container object
        :param hostname: The zun hostname selected by the scheduler
        :param limits: Dict of oversubscription limits for memory, disk,
                       and CPUs.
        :returns: A Claim ticket representing the reserved resources. It can
                  be used to revert the resource usage if an error occurs
                  during the container build.
        """
        # No memory or cpu specified; no need to claim resources now.
        if not (container.memory or container.cpu):
            return claims.NopClaim()

        # We should have the compute node created here, just get it.
        self.compute_node = self._get_compute_node(context)
        if self.disabled(hostname):
            self._set_container_host(container)
            return claims.NopClaim()

        claim = claims.Claim(context, container, hostname, self,
                             self.compute_node, limits=limits)

        self._set_container_host(container)
        self._update_usage_from_container(container, hostname)
        # persist changes to the compute node:
        self._update(self.compute_node)

        return claim

    def disabled(self, hostname):
        return not self.container_driver.node_is_available(hostname)

    def _set_container_host(self, container):
        """Tag the container as belonging to this host.

        This should be done while the COMPUTE_RESOURCE_SEMAPHORE is held so
        the resource claim will not be lost if the audit process starts.
        """
        container.host = self.host
        container.save()

    def _update_usage_from_container(self, container, hostname,
                                     is_removed=False):
        """Update usage for a single container."""
        uuid = container.uuid
        is_new_container = uuid not in self.tracked_containers
        is_removed_container = not is_new_container and is_removed

        if is_new_container:
            self.tracked_containers[uuid] = \
                obj_base.obj_to_primitive(container)
            sign = 1

        if is_removed_container:
            self.tracked_containers.pop(uuid)
            sign = -1

        if is_new_container or is_removed_container:
            # TODO(Shunli): Handle pci, scheduler allocation here.

            # new container, update compute node resource usage:
            self._update_usage(self._get_usage_dict(container),
                               hostname, sign=sign)

    def _update_usage_from_containers(self, context, containers, hostname):
        """Calculate resource usage based on container utilization.

        This is different from the container daemon view as it will account
        for all containers assigned to the local compute host, even if they
        are not currently powered on.
        """
        self.tracked_containers.clear()

        cn = self.compute_node
        # set some initial values, reserve room for host
        cn.cpu_used = 0
        cn.mem_free = cn.mem_total
        cn.mem_used = 0
        cn.running_containers = 0

        for cnt in containers:
            self._update_usage_from_container(cnt, hostname)

        cn.mem_free = max(0, cn.mem_free)

    def _update_usage(self, usage, hostname, sign=1):
        mem_usage = usage['memory']
        cpus_usage = usage.get('cpu', 0)

        cn = self.compute_node
        cn.mem_used += sign * mem_usage
        cn.cpu_used += sign * cpus_usage

        # free ram may be negative, depending on policy:
        cn.mem_free = cn.mem_total - cn.mem_used

        cn.running_containers += sign * 1

        # TODO(Shunli): Calculate the numa usage here

    def _update(self, compute_node):
        if not self._resource_change(compute_node):
            return
        # Persist the stats to the Scheduler
        self.scheduler_client.update_resource(compute_node)
        # Update pci tracker here

    def _resource_change(self, compute_node):
        """Check to see if any resources have changed."""
        hostname = compute_node.hostname
        old_compute = self.old_resources[hostname]
        if not obj_base.obj_equal_prims(
                compute_node, old_compute, ['updated_at']):
            self.old_resources[hostname] = copy.deepcopy(compute_node)
            return True
        return False

    @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
    def _update_available_resource(self, context):

        # if we could not init the compute node the tracker will be
        # disabled and we should quit now
        if self.disabled(self.host):
            return

        # Grab all containers assigned to this node:
        containers = objects.Container.list_by_host(context, self.host)

        # Now calculate usage based on container utilization:
        self._update_usage_from_containers(context, containers, self.host)

        # Docker has no migrations, but could there be orphan containers?
        # Nova handles orphans at this point.

        cn = self.compute_node

        # update the compute_node
        self._update(cn)
        LOG.debug('Compute_service record updated for %(host)s',
                  {'host': self.host})

    def _get_usage_dict(self, container, **updates):
        """Make a usage dict that the _update methods expect.

        Accepts a Container object and a set of updates.
        Converts the object to a dict and applies the updates.

        :param container: container as an object
        :param updates: key-value pairs to update the passed object.
        :returns: a dict with all the information from container updated
                  with updates
        """
        # FIXME: Container.memory is a string like '512M'; strip the unit
        # suffix before converting.
        memory = 0
        if container.memory:
            memory = int(container.memory[:-1])
        usage = {'memory': memory,
                 'cpu': container.cpu or 0}

        # update numa usage here

        return usage

    @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
    def abort_container_claim(self, container, hostname):
        """Remove usage from the given container."""
        self._update_usage_from_container(container, hostname,
                                          is_removed=True)

        self._update(self.compute_node)
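
Reviewer note: a standalone sketch of the sign-based accounting in _update_usage above; the same code path adds usage on claim (sign=+1) and subtracts it on abort/removal (sign=-1). Node here is a stand-in for objects.ComputeNode:

class Node(object):
    def __init__(self):
        self.mem_total = 1024  # MB
        self.mem_used = 0
        self.mem_free = 1024
        self.cpu_used = 0.0
        self.running_containers = 0


def update_usage(cn, usage, sign=1):
    cn.mem_used += sign * usage['memory']
    cn.cpu_used += sign * usage.get('cpu', 0)
    # mem_free may go negative under an oversubscription policy
    cn.mem_free = cn.mem_total - cn.mem_used
    cn.running_containers += sign * 1


cn = Node()
update_usage(cn, {'memory': 512, 'cpu': 1.0})            # claim
update_usage(cn, {'memory': 512, 'cpu': 1.0}, sign=-1)   # abort
print(cn.mem_used, cn.running_containers)                # 0 0
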
@@ -22,6 +22,7 @@ from zun.common import consts
from zun.common import exception
from zun.common import utils
from zun.common.utils import translate_exception
from zun.compute import compute_node_tracker
import zun.conf
from zun.container import driver
from zun.image import driver as image_driver
@@ -37,6 +38,8 @@ class Manager(object):
    def __init__(self, container_driver=None):
        super(Manager, self).__init__()
        self.driver = driver.load_container_driver(container_driver)
        self.host = CONF.host
        self._resource_tracker = None

    def _fail_container(self, context, container, error):
        container.status = consts.ERROR
@@ -134,11 +137,16 @@ class Manager(object):
         container.image_driver = image.get('driver')
         container.save(context)
         try:
-            container = self.driver.create(context, container,
-                                           sandbox_id, image)
-            container.task_state = None
-            container.save(context)
-            return container
+            # TODO(Shunli): No limits yet; the claim just updates usage.
+            limits = None
+            rt = self._get_resource_tracker()
+            with rt.container_claim(context, container, container.host,
+                                    limits):
+                container = self.driver.create(context, container,
+                                               sandbox_id, image)
+                container.task_state = None
+                container.save(context)
+                return container
         except exception.DockerError as e:
             with excutils.save_and_reraise_exception(reraise=reraise):
                 LOG.error("Error occurred while calling Docker create API: %s",
@@ -579,3 +587,10 @@ class Manager(object):
            LOG.exception("Unexpected exception while searching image: %s",
                          six.text_type(e))
            raise

    def _get_resource_tracker(self):
        if not self._resource_tracker:
            rt = compute_node_tracker.ComputeNodeTracker(self.host,
                                                         self.driver)
            self._resource_tracker = rt
        return self._resource_tracker
@@ -28,6 +28,7 @@ from zun.common import nova
from zun.common import utils
from zun.common.utils import check_container_id
import zun.conf
from zun.container.docker import host
from zun.container.docker import utils as docker_utils
from zun.container import driver
from zun.network import network as zun_network
@@ -44,6 +45,7 @@ class DockerDriver(driver.ContainerDriver):

    def __init__(self):
        super(DockerDriver, self).__init__()
        self._host = host.Host()

    def load_image(self, image_path=None):
        with docker_utils.docker_client() as docker:
@@ -689,6 +691,9 @@ class DockerDriver(driver.ContainerDriver):
            network_api.add_security_groups_to_ports(
                sandbox, network, security_group_ids)

    def get_available_nodes(self):
        return [self._host.get_hostname()]


class NovaDockerDriver(DockerDriver):
    def add_security_group(self, context, sandbox_id, security_group):
zun/container/docker/host.py (new file, 43 lines)
@@ -0,0 +1,43 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Manages information about the host.
"""

from oslo_log import log as logging

from zun.common.i18n import _
from zun.container.docker import utils as docker_utils

LOG = logging.getLogger(__name__)


class Host(object):

    def __init__(self):
        self._hostname = None

    def get_hostname(self):
        """Returns the hostname of the host."""
        with docker_utils.docker_client() as docker:
            hostname = docker.info()['Name']
            if self._hostname is None:
                self._hostname = hostname
            elif hostname != self._hostname:
                # Log before updating so the old name is reported.
                LOG.warning(_('Hostname has changed from %(old)s '
                              'to %(new)s. A restart is required '
                              'to take effect.'),
                            {'old': self._hostname,
                             'new': hostname})
                self._hostname = hostname
        return self._hostname
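
For context, docker.info() above comes from docker-py (pinned at docker-py>=1.8.1 in requirements). A quick standalone check of the daemon-reported name, assuming a local docker socket at the usual path:

from docker import Client  # docker-py 1.x API

client = Client(base_url='unix://var/run/docker.sock')
print(client.info()['Name'])  # hostname as reported by the docker daemon
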
@@ -207,10 +207,11 @@ class ContainerDriver(object):
         numa_topo_obj = self.get_host_numa_topology()
         node.numa_topology = numa_topo_obj
         meminfo = self.get_host_mem()
-        (mem_total, mem_free, mem_ava) = meminfo
+        (mem_total, mem_free, mem_ava, mem_used) = meminfo
         node.mem_total = mem_total // units.Ki
         node.mem_free = mem_free // units.Ki
         node.mem_available = mem_ava // units.Ki
+        node.mem_used = mem_used // units.Ki
         info = self.get_host_info()
         (total, running, paused, stopped, cpus,
          architecture, os_type, os, kernel_version, labels) = info
@@ -226,3 +227,9 @@ class ContainerDriver(object):
         cpu_used = self.get_cpu_used()
         node.cpu_used = cpu_used
         node.labels = labels
 
+    def node_is_available(self, nodename):
+        """Return whether this compute service manages a particular node."""
+        if nodename in self.get_available_nodes():
+            return True
+        return False
@@ -65,4 +65,5 @@ class Host(object):
         idx32 = m.index('Cached:')
         cached = m[idx32+1]
         mem_ava = int(mem_free) + int(buffers) + int(cached)
-        return int(mem_total), int(mem_free), int(mem_ava)
+        mem_used = int(mem_total) - int(mem_ava)
+        return int(mem_total), int(mem_free), int(mem_ava), int(mem_used)
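
Reviewer note: a standalone sketch of the /proc/meminfo math above, using the same token-scanning approach and the figures from the unit test further down (values in kB; the sample text is fabricated for illustration):

sample = ("MemTotal: 3882464 kB\n"
          "MemFree: 3514608 kB\n"
          "Buffers: 4096 kB\n"
          "Cached: 37668 kB\n")

m = sample.split()
mem_total = int(m[m.index('MemTotal:') + 1])
mem_free = int(m[m.index('MemFree:') + 1])
buffers = int(m[m.index('Buffers:') + 1])
cached = int(m[m.index('Cached:') + 1])
mem_ava = mem_free + buffers + cached        # free + reclaimable
mem_used = mem_total - mem_ava               # mirrors the new return value
print(mem_total, mem_free, mem_ava, mem_used)
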
New Alembic migration (33 lines)
@@ -0,0 +1,33 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""add mem used to compute node

Revision ID: 648c25faa0be
Revises: 174cafda0857
Create Date: 2017-05-24 09:39:42.021441

"""

# revision identifiers, used by Alembic.
revision = '648c25faa0be'
down_revision = '174cafda0857'
branch_labels = None
depends_on = None

from alembic import op
import sqlalchemy as sa


def upgrade():
    op.add_column('compute_node',
                  sa.Column('mem_used', sa.Integer(), nullable=False))
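
Reviewer note: on a deployment where compute_node already has rows, some backends (PostgreSQL, MySQL in strict mode) will reject a NOT NULL column added without a default. A safer variant of upgrade() is sketched below; the server_default is a suggestion, not part of the original migration:

from alembic import op
import sqlalchemy as sa


def upgrade():
    # server_default='0' backfills existing rows instead of violating the
    # NOT NULL constraint on backends that enforce it.
    op.add_column('compute_node',
                  sa.Column('mem_used', sa.Integer(), nullable=False,
                            server_default='0'))
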
@@ -276,6 +276,7 @@ class ComputeNode(Base):
    mem_total = Column(Integer, nullable=False, default=0)
    mem_free = Column(Integer, nullable=False, default=0)
    mem_available = Column(Integer, nullable=False, default=0)
    mem_used = Column(Integer, nullable=False, default=0)
    total_containers = Column(Integer, nullable=False, default=0)
    running_containers = Column(Integer, nullable=False, default=0)
    paused_containers = Column(Integer, nullable=False, default=0)
@@ -14,9 +14,12 @@

"""Zun common internal object model"""

import netaddr

from oslo_versionedobjects import base as ovoo_base
from oslo_versionedobjects import fields as ovoo_fields

from zun.objects import fields as obj_fields

remotable_classmethod = ovoo_base.remotable_classmethod
remotable = ovoo_base.remotable
@@ -57,3 +60,77 @@ class ZunPersistentObject(object):
class ZunObjectSerializer(ovoo_base.VersionedObjectSerializer):
    # Base class to use for object hydration
    OBJ_BASE_CLASS = ZunObject


class ObjectListBase(ovoo_base.ObjectListBase):
    # NOTE: These are for transition to using the oslo
    # base object and can be removed when we move to it.
    @classmethod
    def _obj_primitive_key(cls, field):
        return 'zun_object.%s' % field

    @classmethod
    def _obj_primitive_field(cls, primitive, field,
                             default=obj_fields.UnspecifiedDefault):
        key = cls._obj_primitive_key(field)
        if default == obj_fields.UnspecifiedDefault:
            return primitive[key]
        else:
            return primitive.get(key, default)


def obj_to_primitive(obj):
    """Recursively turn an object into a python primitive.

    A ZunObject becomes a dict, and anything that implements ObjectListBase
    becomes a list.
    """
    if isinstance(obj, ObjectListBase):
        return [obj_to_primitive(x) for x in obj]
    elif isinstance(obj, ZunObject):
        result = {}
        for key in obj.obj_fields:
            if obj.obj_attr_is_set(key) or key in obj.obj_extra_fields:
                result[key] = obj_to_primitive(getattr(obj, key))
        return result
    elif isinstance(obj, netaddr.IPAddress):
        return str(obj)
    elif isinstance(obj, netaddr.IPNetwork):
        return str(obj)
    else:
        return obj


def obj_equal_prims(obj_1, obj_2, ignore=None):
    """Compare two primitives for equivalence ignoring some keys.

    This operation tests the primitives of two objects for equivalence.
    Object primitives may contain a list identifying fields that have been
    changed - this is ignored in the comparison. The ignore parameter lists
    any other keys to be ignored.

    :param obj_1: The first object in the comparison
    :param obj_2: The second object in the comparison
    :param ignore: A list of fields to ignore
    :returns: True if the primitives are equal ignoring changes
              and specified fields, otherwise False.
    """
    def _strip(prim, keys):
        if isinstance(prim, dict):
            for k in keys:
                prim.pop(k, None)
            for v in prim.values():
                _strip(v, keys)
        if isinstance(prim, list):
            for v in prim:
                _strip(v, keys)
        return prim

    if ignore is not None:
        keys = ['zun_object.changes'] + ignore
    else:
        keys = ['zun_object.changes']
    prim_1 = _strip(obj_1.obj_to_primitive(), keys)
    prim_2 = _strip(obj_2.obj_to_primitive(), keys)
    return prim_1 == prim_2
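
Reviewer note: a standalone illustration of obj_equal_prims' semantics on plain primitives (no oslo objects needed). With 'zun_object.changes' and 'updated_at' stripped, these two primitives compare equal:

def _strip(prim, keys):
    if isinstance(prim, dict):
        for k in keys:
            prim.pop(k, None)
        for v in prim.values():
            _strip(v, keys)
    if isinstance(prim, list):
        for v in prim:
            _strip(v, keys)
    return prim


prim_1 = {'zun_object.data': {'mem_used': 512, 'updated_at': 't1'},
          'zun_object.changes': ['mem_used']}
prim_2 = {'zun_object.data': {'mem_used': 512, 'updated_at': 't2'},
          'zun_object.changes': []}

keys = ['zun_object.changes', 'updated_at']
print(_strip(prim_1, keys) == _strip(prim_2, keys))  # True
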
@@ -25,7 +25,8 @@ class ComputeNode(base.ZunPersistentObject, base.ZunObject):
     # Version 1.3: Add cpus, cpu_used
     # Version 1.4: Add host operating system info
     # Version 1.5: Add host labels info
-    VERSION = '1.5'
+    # Version 1.6: Add mem_used to compute node
+    VERSION = '1.6'
 
     fields = {
         'uuid': fields.UUIDField(read_only=True, nullable=False),
@@ -34,6 +35,7 @@ class ComputeNode(base.ZunPersistentObject, base.ZunObject):
         'mem_total': fields.IntegerField(nullable=False),
         'mem_free': fields.IntegerField(nullable=False),
         'mem_available': fields.IntegerField(nullable=False),
+        'mem_used': fields.IntegerField(nullable=False),
         'total_containers': fields.IntegerField(nullable=False),
         'running_containers': fields.IntegerField(nullable=False),
         'paused_containers': fields.IntegerField(nullable=False),
@@ -17,6 +17,8 @@ from oslo_versionedobjects import fields

from zun.common import consts

UnspecifiedDefault = fields.UnspecifiedDefault


class ContainerStatus(fields.Enum):
    ALL = consts.CONTAINER_STATUSES
@@ -31,3 +31,7 @@ class SchedulerClient(object):

    def select_destinations(self, context, containers):
        return self.driver.select_destinations(context, containers)

    def update_resource(self, node):
        node.save()
        # TODO(Shunli): Update the inventory here
@@ -17,6 +17,7 @@ import mock
from io import StringIO
from zun.common import consts
from zun.common import exception
from zun.compute import claims
from zun.compute import manager
import zun.conf
from zun.objects.container import Container
@@ -26,6 +27,12 @@ from zun.tests.unit.container.fake_driver import FakeDriver as fake_driver
from zun.tests.unit.db import utils


class FakeResourceTracker(object):

    def container_claim(self, context, container, host, limits):
        return claims.NopClaim()


class TestManager(base.TestCase):

    def setUp(self):
@@ -54,6 +61,7 @@ class TestManager(base.TestCase):
        image = {'image': 'repo', 'path': 'out_path', 'driver': 'glance'}
        mock_pull.return_value = image, False
        mock_create_sandbox.return_value = 'fake_id'
        self.compute_manager._resource_tracker = FakeResourceTracker()
        self.compute_manager._do_container_create(self.context, container)
        mock_save.assert_called_with(self.context)
        mock_pull.assert_any_call(self.context, container.image, 'latest',
@@ -115,6 +123,7 @@ class TestManager(base.TestCase):
        mock_pull.return_value = image, False
        mock_create.side_effect = exception.DockerError("Creation Failed")
        mock_create_sandbox.return_value = mock.MagicMock()
        self.compute_manager._resource_tracker = FakeResourceTracker()
        self.compute_manager._do_container_create(self.context, container)
        mock_fail.assert_called_once_with(self.context,
                                          container, "Creation Failed")
@@ -130,6 +139,7 @@ class TestManager(base.TestCase):
        mock_create.return_value = container
        mock_pull.return_value = image, False
        container.status = 'Stopped'
        self.compute_manager._resource_tracker = FakeResourceTracker()
        self.compute_manager._do_container_run(self.context, container)
        mock_save.assert_called_with(self.context)
        mock_pull.assert_any_call(self.context, container.image, 'latest',
@@ -197,6 +207,7 @@ class TestManager(base.TestCase):
        mock_pull.return_value = {'name': 'nginx', 'path': None}, True
        mock_create.side_effect = exception.DockerError(
            message="Docker Error occurred")
        self.compute_manager._resource_tracker = FakeResourceTracker()
        self.compute_manager._do_container_run(self.context,
                                               container)
        mock_save.assert_called_with(self.context)
@@ -606,7 +606,8 @@ class TestNovaDockerDriver(base.DriverTestCase):
         self.driver = DockerDriver()
         mock_output.return_value = LSCPU_ON
         conf.CONF.set_override('floating_cpu_set', "0")
-        mock_mem.return_value = (100 * units.Ki, 50 * units.Ki, 50 * units.Ki)
+        mock_mem.return_value = (100 * units.Ki, 50 * units.Ki, 50 * units.Ki,
+                                 50 * units.Ki)
         mock_info.return_value = (10, 8, 0, 2, 48, 'x86_64', 'linux',
                                   'CentOS', '3.10.0-123',
                                   {'dev.type': 'product'})
@@ -71,4 +71,5 @@ class TestOSCapability(base.BaseTestCase):
         with mock.patch.object(six.moves.builtins, "open", m_open,
                                create=True):
             output = os_capability_linux.LinuxHost().get_host_mem()
-            self.assertEqual((3882464, 3514608, 3556372), output)
+            used = (3882464 - 3556372)
+            self.assertEqual((3882464, 3514608, 3556372, used), output)
@@ -265,9 +265,10 @@ def get_test_compute_node(**kw):
         'uuid': kw.get('uuid', '24a5b17a-f2eb-4556-89db-5f4169d13982'),
         'hostname': kw.get('hostname', 'localhost'),
         'numa_topology': kw.get('numa_topology', get_test_numa_topology()),
-        'mem_total': kw.get('mem_total', 123),
-        'mem_free': kw.get('mem_free', 456),
-        'mem_available': kw.get('mem_available', 789),
+        'mem_total': kw.get('mem_total', 1024),
+        'mem_free': kw.get('mem_free', 512),
+        'mem_available': kw.get('mem_available', 512),
+        'mem_used': kw.get('mem_used', 512),
         'total_containers': kw.get('total_containers', 10),
         'running_containers': kw.get('running_containers', 8),
         'paused_containers': kw.get('paused_containers', 0),
@@ -362,7 +362,7 @@ object_data = {
     'ResourceClass': '1.1-d661c7675b3cd5b8c3618b68ba64324e',
     'ResourceProvider': '1.0-92b427359d5a4cf9ec6c72cbe630ee24',
     'ZunService': '1.1-b1549134bfd5271daec417ca8cabc77e',
-    'ComputeNode': '1.5-0d6ff86fbdc03859b77f1092e785ce10',
+    'ComputeNode': '1.6-33a173b969781644a95ea2925eb5cca2',
 }
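
Reviewer note: the '1.6-33a1...' fingerprint pairs the object VERSION with a hash of its field definitions, so the hash test fails whenever fields change without a version bump. Fingerprints like this are typically regenerated with oslo.versionedobjects' test fixture; a sketch, assuming importing zun.objects registers the classes:

from oslo_versionedobjects import fixture

import zun.objects  # noqa: importing registers the versioned objects

checker = fixture.ObjectVersionChecker()
print(checker.get_hashes().get('ComputeNode'))  # e.g. '1.6-33a173b9...'
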