Add housekeeping module and staging cleaner

As noted in previous discussions, glance should clean its staging
directory on startup. This is important for scenarios where we
started an import operation, but failed in the middle. If, when we
recover, the image has already been deleted from the database, then
we will never remove the (potentially very large) residue from disk
in our staging directory.

This is currently a problem with web-download, but will also occur
with glance-direct once we have the non-shared distributed import
functionality merged.

Closes-Bug: #1913625
Change-Id: Ib80e9cfb58680f9e8ead5993dc206f4da882dd09
This commit is contained in:
Dan Smith 2021-02-22 12:52:03 -08:00
parent e9852fb625
commit 232177e68c
6 changed files with 515 additions and 1 deletions

View File

@ -54,6 +54,8 @@ from glance.common import config
from glance.common import exception from glance.common import exception
from glance.common import store_utils from glance.common import store_utils
from glance.common import utils from glance.common import utils
import glance.db
from glance import housekeeping
from glance import i18n from glance import i18n
from glance.i18n import _, _LE, _LI, _LW from glance.i18n import _, _LE, _LI, _LW
@ -501,14 +503,18 @@ class BaseServer(object):
self.default_port = default_port self.default_port = default_port
self.configure() self.configure()
self.start_wsgi() self.start_wsgi()
cleaner = housekeeping.StagingStoreCleaner(glance.db.get_api())
self.pool.spawn_n(cleaner.clean_orphaned_staging_residue)
if self.initialize_prefetcher: if self.initialize_prefetcher:
self.cache_images() self.cache_images()
def start_wsgi(self): def start_wsgi(self):
workers = get_num_workers() workers = get_num_workers()
self.pool = self.create_pool()
if workers == 0: if workers == 0:
# Useful for profiling, test, debug etc. # Useful for profiling, test, debug etc.
self.pool = self.create_pool()
self.pool.spawn_n(self._single_run, self.application, self.sock) self.pool.spawn_n(self._single_run, self.application, self.sock)
return return
else: else:

View File

@ -12,6 +12,7 @@
import atexit import atexit
import os import os
import threading
import glance_store import glance_store
from oslo_config import cfg from oslo_config import cfg
@ -23,6 +24,7 @@ import glance.async_
from glance.common import config from glance.common import config
from glance.common import exception from glance.common import exception
from glance.common import store_utils from glance.common import store_utils
from glance import housekeeping
from glance.i18n import _ from glance.i18n import _
from glance import notifier from glance import notifier
@ -90,6 +92,17 @@ def drain_threadpools():
pool_model.pool.shutdown() pool_model.pool.shutdown()
def run_staging_cleanup():
cleaner = housekeeping.StagingStoreCleaner(glance.db.get_api())
# NOTE(danms): Start thread as a daemon. It is still a
# single-shot, but this will not block our shutdown if it is
# running.
cleanup_thread = threading.Thread(
target=cleaner.clean_orphaned_staging_residue,
daemon=True)
cleanup_thread.start()
def init_app(): def init_app():
config.set_config_defaults() config.set_config_defaults()
config_files = _get_config_files() config_files = _get_config_files()
@ -122,6 +135,8 @@ def init_app():
glance_store.create_stores(CONF) glance_store.create_stores(CONF)
glance_store.verify_default_store() glance_store.verify_default_store()
run_staging_cleanup()
_setup_os_profiler() _setup_os_profiler()
_validate_policy_enforcement_configuration() _validate_policy_enforcement_configuration()
return config.load_paste_app('glance-api') return config.load_paste_app('glance-api')

126
glance/housekeeping.py Normal file
View File

@ -0,0 +1,126 @@
# Copyright 2021 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import uuidutils
from glance.common import exception
from glance.common import store_utils
from glance import context
from glance.i18n import _LE
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
def staging_store_path():
"""Return the local path to the staging store.
:raises: GlanceException if staging store is not configured to be
a file:// URI
"""
if CONF.enabled_backends:
separator, staging_dir = store_utils.get_dir_separator()
else:
staging_dir = CONF.node_staging_uri
expected_prefix = 'file://'
if not staging_dir.startswith(expected_prefix):
raise exception.GlanceException(
'Unexpected scheme in staging store; '
'unable to scan for residue')
return staging_dir[len(expected_prefix):]
class StagingStoreCleaner:
def __init__(self, db):
self.db = db
self.context = context.get_admin_context()
@staticmethod
def get_image_id(filename):
if '.' in filename:
filename, ext = filename.split('.', 1)
if uuidutils.is_uuid_like(filename):
return filename
def is_valid_image(self, image_id):
try:
image = self.db.image_get(self.context, image_id)
# FIXME(danms): Maybe check that it's not deleted or
# something else like state, size, etc
return not image['deleted']
except exception.ImageNotFound:
return False
@staticmethod
def delete_file(path):
try:
os.remove(path)
except FileNotFoundError:
# NOTE(danms): We must have raced with something else, so this
# is not a problem
pass
except Exception as e:
LOG.error(_LE('Failed to delete stale staging '
'path %(path)r: %(err)s'),
{'path': path, 'err': str(e)})
return False
return True
def clean_orphaned_staging_residue(self):
try:
files = os.listdir(staging_store_path())
except FileNotFoundError:
# NOTE(danms): If we cannot list the staging dir, there is
# clearly nothing left from a previous run, so nothing to
# clean up.
files = []
if not files:
return
LOG.debug('Found %i files in staging directory for potential cleanup',
len(files))
cleaned = ignored = error = 0
for filename in files:
image_id = self.get_image_id(filename)
if not image_id:
# NOTE(danms): We should probably either have a config option
# that decides what to do here (i.e. reap or ignore), or decide
# that this is not okay and just nuke anything we find.
LOG.debug('Staging directory contains unexpected non-image '
'file %r; ignoring',
filename)
ignored += 1
continue
if self.is_valid_image(image_id):
# NOTE(danms): We found a non-deleted image for this
# file, so leave it in place.
ignored += 1
continue
path = os.path.join(staging_store_path(), filename)
LOG.debug('Stale staging residue found for image '
'%(uuid)s: %(file)r; deleting now.',
{'uuid': image_id, 'file': path})
if self.delete_file(path):
cleaned += 1
else:
error += 1
LOG.debug('Cleaned %(cleaned)i stale staging files, '
'%(ignored)i ignored (%(error)i errors)',
{'cleaned': cleaned, 'ignored': ignored, 'error': error})

View File

@ -16,9 +16,14 @@
"""Tests for `glance.wsgi`.""" """Tests for `glance.wsgi`."""
import os import os
from six.moves import http_client as http
import socket import socket
import time import time
from oslo_serialization import jsonutils
from oslo_utils.fixture import uuidsentinel as uuids
import requests
from glance.common import wsgi from glance.common import wsgi
from glance.tests import functional from glance.tests import functional
@ -57,3 +62,116 @@ class TestWSGIServer(functional.FunctionalTest):
self.assertIn(greetings, get_request()) self.assertIn(greetings, get_request())
# Should fail - connection timed out so we get nothing from the server # Should fail - connection timed out so we get nothing from the server
self.assertFalse(get_request(delay=1.1)) self.assertFalse(get_request(delay=1.1))
class StagingCleanupBase:
def _url(self, path):
return 'http://127.0.0.1:%d%s' % (self.api_port, path)
def _headers(self, custom_headers=None):
base_headers = {
'X-Identity-Status': 'Confirmed',
'X-Auth-Token': '932c5c84-02ac-4fe5-a9ba-620af0e2bb96',
'X-User-Id': 'f9a41d13-0c13-47e9-bee2-ce4e8bfe958e',
'X-Tenant-Id': uuids.tenant1,
'X-Roles': 'member',
}
base_headers.update(custom_headers or {})
return base_headers
def test_clean_on_start(self):
staging = os.path.join(self.test_dir, 'staging')
# Start the server
self.start_servers(**self.__dict__.copy())
# Create an image
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json'})
data = jsonutils.dumps({'name': 'image-1', 'type': 'kernel',
'disk_format': 'aki',
'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(http.CREATED, response.status_code)
image = jsonutils.loads(response.text)
image_id = image['id']
# Stage data for the image
path = self._url('/v2/images/%s/stage' % image_id)
headers = self._headers({'Content-Type': 'application/octet-stream'})
image_data = b'ZZZZZ'
response = requests.put(path, headers=headers, data=image_data)
self.assertEqual(http.NO_CONTENT, response.status_code)
# Stop the server
self.my_api_server.stop()
# Create more files in staging, one unrecognized one, and one
# uuid that matches nothing in the database, and some residue
# like we would see from failed conversions and decompressions
# for the image we created above.
open(os.path.join(staging, 'foo'), 'w')
open(os.path.join(staging, uuids.stale), 'w')
open(os.path.join(staging, uuids.converting), 'w')
converting_fn = os.path.join(staging, '%s.qcow2' % uuids.converting)
decompressing_fn = os.path.join(staging, '%s.uc' % uuids.decompressing)
open(converting_fn, 'w')
open(decompressing_fn, 'w')
# Restart the server. We set "needs_database" to False here to avoid
# recreating the database during startup, thus causing the server to
# think there are no valid images and deleting everything.
self.my_api_server.needs_database = False
self.start_with_retry(self.my_api_server,
'api_port', 3, **self.__dict__.copy())
# Poll to give it time to come up and do the work. Use the presence
# of the extra files to determine if the cleanup has run yet.
for i in range(0, 10):
try:
requests.get(self._url('/v2/images'))
except Exception:
# Not even answering queries yet
pass
else:
files = os.listdir(staging)
if len(files) == 2:
break
time.sleep(1)
# We should still find the not-an-image file...
self.assertTrue(os.path.exists(os.path.join(staging, 'foo')))
# ...and make sure the actually-staged image file is still present....
self.assertTrue(os.path.exists(os.path.join(staging, image_id)))
# ... but the stale image should be gone,
self.assertFalse(os.path.exists(os.path.join(staging,
uuids.stale)))
# ... along with the residue of the conversion ...
self.assertFalse(os.path.exists(converting_fn))
# ... and the residue of the decompression.
self.assertFalse(os.path.exists(decompressing_fn))
self.stop_servers()
class TestStagingCleanupMultistore(functional.MultipleBackendFunctionalTest,
StagingCleanupBase):
"""Test for staging store cleanup on API server startup.
This tests the multistore case.
"""
def setUp(self):
super(TestStagingCleanupMultistore, self).setUp()
self.my_api_server = self.api_server_multiple_backend
class TestStagingCleanupSingleStore(functional.FunctionalTest,
StagingCleanupBase):
"""Test for staging store cleanup on API server startup.
This tests the single store case.
"""
def setUp(self):
super(TestStagingCleanupSingleStore, self).setUp()
self.my_api_server = self.api_server

View File

@ -98,3 +98,17 @@ class TestWsgiAppInit(test_utils.BaseTestCase):
self.config(enforce_new_defaults=False, group='oslo_policy') self.config(enforce_new_defaults=False, group='oslo_policy')
self.config(enforce_secure_rbac=False) self.config(enforce_secure_rbac=False)
self.assertTrue(wsgi_app.init_app()) self.assertTrue(wsgi_app.init_app())
@mock.patch('glance.async_._THREADPOOL_MODEL', new=None)
@mock.patch('glance.common.config.load_paste_app')
@mock.patch('glance.common.wsgi_app._get_config_files')
@mock.patch('threading.Thread')
@mock.patch('glance.housekeeping.StagingStoreCleaner')
def test_runs_staging_cleanup(self, mock_cleaner, mock_Thread, mock_conf,
mock_load):
mock_conf.return_value = []
wsgi_app.init_app()
mock_Thread.assert_called_once_with(
target=mock_cleaner().clean_orphaned_staging_residue,
daemon=True)
mock_Thread.return_value.start.assert_called_once_with()

View File

@ -0,0 +1,235 @@
# Copyright 2021 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from unittest import mock
import glance_store
from oslo_config import cfg
from oslo_utils.fixture import uuidsentinel as uuids
from glance.common import exception
from glance import context
from glance import housekeeping
import glance.tests.unit.utils as unit_test_utils
import glance.tests.utils as test_utils
CONF = cfg.CONF
class TestStagingStoreHousekeeping(test_utils.BaseTestCase):
def _store_dir(self, store):
return os.path.join(self.test_dir, store)
def setUp(self):
super(TestStagingStoreHousekeeping, self).setUp()
self.config(enabled_backends={'store1': 'file'})
glance_store.register_store_opts(
CONF,
reserved_stores={'os_glance_staging_store': 'file'})
self.config(default_backend='store1',
group='glance_store')
self.config(filesystem_store_datadir=self._store_dir('store1'),
group='store1')
self.config(filesystem_store_datadir=self._store_dir('staging'),
group='os_glance_staging_store')
glance_store.create_multi_stores(
CONF,
reserved_stores={'os_glance_staging_store': 'file'})
self.db = unit_test_utils.FakeDB(initialize=False)
self.cleaner = housekeeping.StagingStoreCleaner(self.db)
self.context = context.get_admin_context()
def test_get_staging_path(self):
expected = os.path.join(self.test_dir, 'staging')
self.assertEqual(expected, housekeeping.staging_store_path())
def test_get_staging_path_single_store(self):
self.config(enabled_backends={})
expected = '/tmp/staging/'
self.assertEqual(expected, housekeeping.staging_store_path())
@mock.patch('glance.common.store_utils.get_dir_separator')
def test_assert_staging_scheme(self, mock_get_dir_separator):
# NOTE(danms): This cannot happen now, but since we need to be
# opinionated about the fact that the URL is a file path, better
# to check for it, in case it changes in the future.
mock_get_dir_separator.return_value = ('/', 'http://foo')
self.assertRaises(exception.GlanceException,
lambda: housekeeping.staging_store_path())
def test_assert_staging_scheme_on_init(self):
# NOTE(danms): Make this a single-store scenario, which will cover
# our assertion about node_staging_uri while we test for the
# assert-on-init behavior.
self.config(enabled_backends={},
node_staging_uri='http://good.luck')
self.assertRaises(exception.GlanceException,
housekeeping.staging_store_path)
def test_get_image_id(self):
self.assertEqual(uuids.some_random_uuid,
self.cleaner.get_image_id(uuids.some_random_uuid))
self.assertEqual(uuids.some_random_uuid,
self.cleaner.get_image_id(
'%s.qcow2' % uuids.some_random_uuid))
self.assertEqual(uuids.some_random_uuid,
self.cleaner.get_image_id(
'%s.uc' % uuids.some_random_uuid))
self.assertEqual(uuids.some_random_uuid,
self.cleaner.get_image_id(
'%s.blah' % uuids.some_random_uuid))
self.assertIsNone(self.cleaner.get_image_id('foo'))
self.assertIsNone(self.cleaner.get_image_id('foo.bar'))
def test_is_valid_image(self):
image = self.db.image_create(self.context, {'status': 'queued'})
self.assertTrue(self.cleaner.is_valid_image(image['id']))
self.assertFalse(self.cleaner.is_valid_image('foo'))
def test_is_valid_image_deleted(self):
image = self.db.image_create(self.context, {'status': 'queued'})
self.db.image_destroy(self.context, image['id'])
self.assertFalse(self.cleaner.is_valid_image(image['id']))
@mock.patch('os.remove')
def test_delete_file(self, mock_remove):
self.assertTrue(self.cleaner.delete_file('foo'))
os.remove.assert_called_once_with('foo')
@mock.patch('os.remove')
@mock.patch.object(housekeeping, 'LOG')
def test_delete_file_not_found(self, mock_LOG, mock_remove):
os.remove.side_effect = FileNotFoundError('foo is gone')
# We should ignore a file-not-found error
self.assertTrue(self.cleaner.delete_file('foo'))
os.remove.assert_called_once_with('foo')
mock_LOG.error.assert_not_called()
@mock.patch('os.remove')
@mock.patch.object(housekeeping, 'LOG')
def test_delete_file_failed(self, mock_LOG, mock_remove):
# Any other error should report failure and log
os.remove.side_effect = Exception('insufficient plutonium')
self.assertFalse(self.cleaner.delete_file('foo'))
os.remove.assert_called_once_with('foo')
mock_LOG.error.assert_called_once_with(
'Failed to delete stale staging path %(path)r: %(err)s',
{'path': 'foo', 'err': 'insufficient plutonium'})
@mock.patch('os.listdir')
@mock.patch('os.remove')
@mock.patch.object(housekeeping, 'LOG')
def test_clean_orphaned_staging_residue_empty(self, mock_LOG, mock_remove,
mock_listdir):
mock_listdir.return_value = []
self.cleaner.clean_orphaned_staging_residue()
mock_listdir.assert_called_once_with(housekeeping.staging_store_path())
mock_remove.assert_not_called()
mock_LOG.assert_not_called()
@mock.patch('os.remove')
@mock.patch('os.listdir')
@mock.patch.object(housekeeping, 'LOG')
def test_clean_orphaned_staging_residue(self, mock_LOG, mock_listdir,
mock_remove):
staging = housekeeping.staging_store_path()
image = self.db.image_create(self.context, {'status': 'queued'})
mock_listdir.return_value = ['notanimageid', image['id'], uuids.stale,
uuids.midconvert,
'%s.qcow2' % uuids.midconvert]
self.cleaner.clean_orphaned_staging_residue()
# NOTE(danms): We should have deleted the stale image file
expected_stale = os.path.join(staging, uuids.stale)
# NOTE(danms): We should have deleted the mid-convert base image and
# the target file
expected_mc = os.path.join(staging, uuids.midconvert)
expected_mc_target = os.path.join(staging,
'%s.qcow2' % uuids.midconvert)
mock_remove.assert_has_calls([
mock.call(expected_stale),
mock.call(expected_mc),
mock.call(expected_mc_target),
])
# NOTE(danms): We should have cleaned the one (which we os.remove()'d)
# above, and ignore the invalid and active ones. No errors this time.
mock_LOG.debug.assert_has_calls([
mock.call('Found %i files in staging directory for potential '
'cleanup', 5),
mock.call('Staging directory contains unexpected non-image file '
'%r; ignoring',
'notanimageid'),
mock.call('Stale staging residue found for image %(uuid)s: '
'%(file)r; deleting now.',
{'uuid': uuids.stale, 'file': expected_stale}),
mock.call('Stale staging residue found for image %(uuid)s: '
'%(file)r; deleting now.',
{'uuid': uuids.midconvert, 'file': expected_mc}),
mock.call('Stale staging residue found for image %(uuid)s: '
'%(file)r; deleting now.',
{'uuid': uuids.midconvert, 'file': expected_mc_target}),
mock.call('Cleaned %(cleaned)i stale staging files, '
'%(ignored)i ignored (%(error)i errors)',
{'cleaned': 3, 'ignored': 2, 'error': 0}),
])
@mock.patch('os.listdir')
@mock.patch('os.remove')
@mock.patch.object(housekeeping, 'LOG')
def test_clean_orphaned_staging_residue_handles_errors(self, mock_LOG,
mock_remove,
mock_listdir):
staging = housekeeping.staging_store_path()
mock_listdir.return_value = [uuids.gone, uuids.error]
mock_remove.side_effect = [FileNotFoundError('gone'),
PermissionError('not yours')]
self.cleaner.clean_orphaned_staging_residue()
# NOTE(danms): We should only have logged an error for the
# permission failure
mock_LOG.error.assert_called_once_with(
'Failed to delete stale staging path %(path)r: %(err)s',
{'path': os.path.join(staging, uuids.error),
'err': 'not yours'})
# NOTE(danms): We should report the permission failure as an error,
# but not the already-gone or invalid ones.
mock_LOG.debug.assert_has_calls([
mock.call('Found %i files in staging directory for potential '
'cleanup', 2),
mock.call('Stale staging residue found for image %(uuid)s: '
'%(file)r; deleting now.',
{'uuid': uuids.gone,
'file': os.path.join(staging, uuids.gone)}),
mock.call('Stale staging residue found for image %(uuid)s: '
'%(file)r; deleting now.',
{'uuid': uuids.error,
'file': os.path.join(staging, uuids.error)}),
mock.call('Cleaned %(cleaned)i stale staging files, '
'%(ignored)i ignored (%(error)i errors)',
{'cleaned': 1, 'ignored': 0, 'error': 1}),
])