From 85075f4b116e485fa0bc9a9d28a6c1901bf7c703 Mon Sep 17 00:00:00 2001 From: Zhi Yan Liu Date: Tue, 30 Jul 2013 23:05:10 +0800 Subject: [PATCH] Scrubber refactoring * Adding multiple locations image support. * Adding lock protection to prevent race condition between glance-api and glance-scrubber service. * Refactoring scrub queue code. Implement bp: glance-scrubber-refactoring docImpact Change-Id: I050ff212d73ace8e84dcd800245b608210d6b29a Signed-off-by: Zhi Yan Liu --- etc/glance-api.conf | 4 + etc/glance-scrubber.conf | 5 + glance/api/v1/images.py | 2 - glance/api/v1/upload_utils.py | 5 +- glance/cmd/scrubber.py | 2 +- glance/openstack/common/fileutils.py | 110 +++++ glance/openstack/common/lockutils.py | 276 +++++++++++ glance/store/__init__.py | 38 +- glance/store/scrubber.py | 533 +++++++++++++++++----- glance/tests/functional/__init__.py | 6 +- glance/tests/unit/base.py | 3 +- glance/tests/unit/test_http_store.py | 2 +- glance/tests/unit/test_scrubber.py | 11 +- glance/tests/unit/utils.py | 10 +- glance/tests/unit/v1/test_upload_utils.py | 5 +- openstack-common.conf | 2 + 16 files changed, 862 insertions(+), 152 deletions(-) create mode 100644 glance/openstack/common/fileutils.py create mode 100644 glance/openstack/common/lockutils.py diff --git a/etc/glance-api.conf b/etc/glance-api.conf index d205a7e912..bff598272e 100644 --- a/etc/glance-api.conf +++ b/etc/glance-api.conf @@ -103,6 +103,10 @@ workers = 1 # Supported values for the 'disk_format' image attribute #disk_formats=ami,ari,aki,vhd,vmdk,raw,qcow2,vdi,iso +# Directory to use for lock files. Default to a temp directory +# (string value). This setting needs to be the same for both +# glance-scrubber and glance-api. +#lock_path= # Set a system wide quota for every user. This value is the total number # of bytes that a user can use across all storage systems. A value of diff --git a/etc/glance-scrubber.conf b/etc/glance-scrubber.conf index 4906910c8c..d02095054f 100644 --- a/etc/glance-scrubber.conf +++ b/etc/glance-scrubber.conf @@ -40,6 +40,11 @@ registry_port = 9191 # admin_user = %SERVICE_USER% # admin_password = %SERVICE_PASSWORD% +# Directory to use for lock files. Default to a temp directory +# (string value). This setting needs to be the same for both +# glance-scrubber and glance-api. 
+#lock_path= + # ================= Security Options ========================== # AES key for encrypting store 'location' metadata, including diff --git a/glance/api/v1/images.py b/glance/api/v1/images.py index 5eb47cd690..269b0906ef 100644 --- a/glance/api/v1/images.py +++ b/glance/api/v1/images.py @@ -48,8 +48,6 @@ from glance.openstack.common import strutils import glance.registry.client.v1.api as registry from glance.store import (get_from_backend, get_size_from_backend, - safe_delete_from_backend, - schedule_delayed_delete_from_backend, get_store_from_location, get_store_from_scheme) diff --git a/glance/api/v1/upload_utils.py b/glance/api/v1/upload_utils.py index 47124e3550..b718c39955 100644 --- a/glance/api/v1/upload_utils.py +++ b/glance/api/v1/upload_utils.py @@ -41,9 +41,10 @@ def initiate_deletion(req, location, id, delayed_delete=False): :param delayed_delete: whether data deletion will be delayed """ if delayed_delete: - glance.store.schedule_delayed_delete_from_backend(location, id) + glance.store.schedule_delayed_delete_from_backend(req.context, + location, id) else: - glance.store.safe_delete_from_backend(location, req.context, id) + glance.store.safe_delete_from_backend(req.context, location, id) def _kill(req, image_id): diff --git a/glance/cmd/scrubber.py b/glance/cmd/scrubber.py index 593a5bc7bb..3aec00d65d 100755 --- a/glance/cmd/scrubber.py +++ b/glance/cmd/scrubber.py @@ -61,7 +61,7 @@ def main(): glance.store.create_stores() glance.store.verify_default_store() - app = glance.store.scrubber.Scrubber() + app = glance.store.scrubber.Scrubber(glance.store) if CONF.daemon: server = glance.store.scrubber.Daemon(CONF.wakeup_time) diff --git a/glance/openstack/common/fileutils.py b/glance/openstack/common/fileutils.py new file mode 100644 index 0000000000..72ae50c37e --- /dev/null +++ b/glance/openstack/common/fileutils.py @@ -0,0 +1,110 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import contextlib +import errno +import os + +from glance.openstack.common import excutils +from glance.openstack.common.gettextutils import _ # noqa +from glance.openstack.common import log as logging + +LOG = logging.getLogger(__name__) + +_FILE_CACHE = {} + + +def ensure_tree(path): + """Create a directory (and any ancestor directories required) + + :param path: Directory to create + """ + try: + os.makedirs(path) + except OSError as exc: + if exc.errno == errno.EEXIST: + if not os.path.isdir(path): + raise + else: + raise + + +def read_cached_file(filename, force_reload=False): + """Read from a file if it has been modified. + + :param force_reload: Whether to reload the file. + :returns: A tuple with a boolean specifying if the data is fresh + or not. 
+ """ + global _FILE_CACHE + + if force_reload and filename in _FILE_CACHE: + del _FILE_CACHE[filename] + + reloaded = False + mtime = os.path.getmtime(filename) + cache_info = _FILE_CACHE.setdefault(filename, {}) + + if not cache_info or mtime > cache_info.get('mtime', 0): + LOG.debug(_("Reloading cached file %s") % filename) + with open(filename) as fap: + cache_info['data'] = fap.read() + cache_info['mtime'] = mtime + reloaded = True + return (reloaded, cache_info['data']) + + +def delete_if_exists(path): + """Delete a file, but ignore file not found error. + + :param path: File to delete + """ + + try: + os.unlink(path) + except OSError as e: + if e.errno == errno.ENOENT: + return + else: + raise + + +@contextlib.contextmanager +def remove_path_on_error(path): + """Protect code that wants to operate on PATH atomically. + Any exception will cause PATH to be removed. + + :param path: File to work with + """ + try: + yield + except Exception: + with excutils.save_and_reraise_exception(): + delete_if_exists(path) + + +def file_open(*args, **kwargs): + """Open file + + see built-in file() documentation for more details + + Note: The reason this is kept in a separate module is to easily + be able to provide a stub module that doesn't alter system + state at all (for unit tests) + """ + return file(*args, **kwargs) diff --git a/glance/openstack/common/lockutils.py b/glance/openstack/common/lockutils.py new file mode 100644 index 0000000000..a8b1abe251 --- /dev/null +++ b/glance/openstack/common/lockutils.py @@ -0,0 +1,276 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import contextlib +import errno +import functools +import os +import time +import weakref + +from eventlet import semaphore +from oslo.config import cfg + +from glance.openstack.common import fileutils +from glance.openstack.common.gettextutils import _ # noqa +from glance.openstack.common import local +from glance.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +util_opts = [ + cfg.BoolOpt('disable_process_locking', default=False, + help='Whether to disable inter-process locks'), + cfg.StrOpt('lock_path', + help=('Directory to use for lock files.')) +] + + +CONF = cfg.CONF +CONF.register_opts(util_opts) + + +def set_defaults(lock_path): + cfg.set_defaults(util_opts, lock_path=lock_path) + + +class _InterProcessLock(object): + """Lock implementation which allows multiple locks, working around + issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does + not require any cleanup. Since the lock is always held on a file + descriptor rather than outside of the process, the lock gets dropped + automatically if the process crashes, even if __exit__ is not executed. + + There are no guarantees regarding usage by multiple green threads in a + single process here. This lock works only between processes. 
Exclusive + access between local threads should be achieved using the semaphores + in the @synchronized decorator. + + Note these locks are released when the descriptor is closed, so it's not + safe to close the file descriptor while another green thread holds the + lock. Just opening and closing the lock file can break synchronisation, + so lock files must be accessed only using this abstraction. + """ + + def __init__(self, name): + self.lockfile = None + self.fname = name + + def __enter__(self): + self.lockfile = open(self.fname, 'w') + + while True: + try: + # Using non-blocking locks since green threads are not + # patched to deal with blocking locking calls. + # Also upon reading the MSDN docs for locking(), it seems + # to have a laughable 10 attempts "blocking" mechanism. + self.trylock() + return self + except IOError as e: + if e.errno in (errno.EACCES, errno.EAGAIN): + # external locks synchronise things like iptables + # updates - give it some time to prevent busy spinning + time.sleep(0.01) + else: + raise + + def __exit__(self, exc_type, exc_val, exc_tb): + try: + self.unlock() + self.lockfile.close() + except IOError: + LOG.exception(_("Could not release the acquired lock `%s`"), + self.fname) + + def trylock(self): + raise NotImplementedError() + + def unlock(self): + raise NotImplementedError() + + +class _WindowsLock(_InterProcessLock): + def trylock(self): + msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1) + + def unlock(self): + msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1) + + +class _PosixLock(_InterProcessLock): + def trylock(self): + fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB) + + def unlock(self): + fcntl.lockf(self.lockfile, fcntl.LOCK_UN) + + +if os.name == 'nt': + import msvcrt + InterProcessLock = _WindowsLock +else: + import fcntl + InterProcessLock = _PosixLock + +_semaphores = weakref.WeakValueDictionary() + + +@contextlib.contextmanager +def lock(name, lock_file_prefix=None, external=False, lock_path=None): + """Context based lock + + This function yields a `semaphore.Semaphore` instance unless external is + True, in which case, it'll yield an InterProcessLock instance. + + :param lock_file_prefix: The lock_file_prefix argument is used to provide + lock files on disk with a meaningful prefix. + + :param external: The external keyword argument denotes whether this lock + should work across multiple processes. This means that if two different + workers both run a a method decorated with @synchronized('mylock', + external=True), only one of them will execute at a time. + + :param lock_path: The lock_path keyword argument is used to specify a + special location for external lock files to live. If nothing is set, then + CONF.lock_path is used as a default. + """ + # NOTE(soren): If we ever go natively threaded, this will be racy. 
+ # See http://stackoverflow.com/questions/5390569/dyn + # amically-allocating-and-destroying-mutexes + sem = _semaphores.get(name, semaphore.Semaphore()) + if name not in _semaphores: + # this check is not racy - we're already holding ref locally + # so GC won't remove the item and there was no IO switch + # (only valid in greenthreads) + _semaphores[name] = sem + + with sem: + LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name}) + + # NOTE(mikal): I know this looks odd + if not hasattr(local.strong_store, 'locks_held'): + local.strong_store.locks_held = [] + local.strong_store.locks_held.append(name) + + try: + if external and not CONF.disable_process_locking: + LOG.debug(_('Attempting to grab file lock "%(lock)s"'), + {'lock': name}) + + # We need a copy of lock_path because it is non-local + local_lock_path = lock_path or CONF.lock_path + if not local_lock_path: + raise cfg.RequiredOptError('lock_path') + + if not os.path.exists(local_lock_path): + fileutils.ensure_tree(local_lock_path) + LOG.info(_('Created lock path: %s'), local_lock_path) + + def add_prefix(name, prefix): + if not prefix: + return name + sep = '' if prefix.endswith('-') else '-' + return '%s%s%s' % (prefix, sep, name) + + # NOTE(mikal): the lock name cannot contain directory + # separators + lock_file_name = add_prefix(name.replace(os.sep, '_'), + lock_file_prefix) + + lock_file_path = os.path.join(local_lock_path, lock_file_name) + + try: + lock = InterProcessLock(lock_file_path) + with lock as lock: + LOG.debug(_('Got file lock "%(lock)s" at %(path)s'), + {'lock': name, 'path': lock_file_path}) + yield lock + finally: + LOG.debug(_('Released file lock "%(lock)s" at %(path)s'), + {'lock': name, 'path': lock_file_path}) + else: + yield sem + + finally: + local.strong_store.locks_held.remove(name) + + +def synchronized(name, lock_file_prefix=None, external=False, lock_path=None): + """Synchronization decorator. + + Decorating a method like so:: + + @synchronized('mylock') + def foo(self, *args): + ... + + ensures that only one thread will execute the foo method at a time. + + Different methods can share the same lock:: + + @synchronized('mylock') + def foo(self, *args): + ... + + @synchronized('mylock') + def bar(self, *args): + ... + + This way only one of either foo or bar can be executing at a time. + """ + + def wrap(f): + @functools.wraps(f) + def inner(*args, **kwargs): + with lock(name, lock_file_prefix, external, lock_path): + LOG.debug(_('Got semaphore / lock "%(function)s"'), + {'function': f.__name__}) + return f(*args, **kwargs) + + LOG.debug(_('Semaphore / lock released "%(function)s"'), + {'function': f.__name__}) + return inner + return wrap + + +def synchronized_with_prefix(lock_file_prefix): + """Partial object generator for the synchronization decorator. + + Redefine @synchronized in each project like so:: + + (in nova/utils.py) + from nova.openstack.common import lockutils + + synchronized = lockutils.synchronized_with_prefix('nova-') + + + (in nova/foo.py) + from nova import utils + + @utils.synchronized('mylock') + def bar(self, *args): + ... + + The lock_file_prefix argument is used to provide lock files on disk with a + meaningful prefix. 
+ """ + + return functools.partial(synchronized, lock_file_prefix=lock_file_prefix) diff --git a/glance/store/__init__.py b/glance/store/__init__.py index 328eba7171..3f11a567a2 100644 --- a/glance/store/__init__.py +++ b/glance/store/__init__.py @@ -30,6 +30,7 @@ import glance.domain.proxy from glance.openstack.common import importutils import glance.openstack.common.log as logging from glance.store import location +from glance.store import scrubber LOG = logging.getLogger(__name__) @@ -53,8 +54,9 @@ store_opts = [ cfg.StrOpt('scrubber_datadir', default='/var/lib/glance/scrubber', help=_('Directory that the scrubber will use to track ' - 'information about what to delete. Make sure this is ' - 'also set in glance-api.conf')), + 'information about what to delete. ' + 'Make sure this is set in glance-api.conf and ' + 'glance-scrubber.conf')), cfg.BoolOpt('delayed_delete', default=False, help=_('Turn on/off delayed delete.')), cfg.IntOpt('scrub_time', default=0, @@ -265,7 +267,7 @@ def get_store_from_location(uri): return loc.store_name -def safe_delete_from_backend(uri, context, image_id, **kwargs): +def safe_delete_from_backend(context, uri, image_id, **kwargs): """Given a uri, delete an image from the store.""" try: return delete_from_backend(context, uri, **kwargs) @@ -281,31 +283,21 @@ def safe_delete_from_backend(uri, context, image_id, **kwargs): LOG.error(msg) -def schedule_delayed_delete_from_backend(uri, image_id, **kwargs): - """Given a uri, schedule the deletion of an image.""" - datadir = CONF.scrubber_datadir - delete_time = time.time() + CONF.scrub_time - file_path = os.path.join(datadir, str(image_id)) - utils.safe_mkdirs(datadir) - - if os.path.exists(file_path): - msg = _("Image id %(image_id)s already queued for delete") % { - 'image_id': image_id} - raise exception.Duplicate(msg) - - if CONF.metadata_encryption_key is not None: - uri = crypt.urlsafe_encrypt(CONF.metadata_encryption_key, uri, 64) - with open(file_path, 'w') as f: - f.write('\n'.join([uri, str(int(delete_time))])) - os.chmod(file_path, 0o600) - os.utime(file_path, (delete_time, delete_time)) +def schedule_delayed_delete_from_backend(context, uri, image_id, **kwargs): + """Given a uri, schedule the deletion of an image location.""" + (file_queue, _db_queue) = scrubber.get_scrub_queues() + # NOTE(zhiyan): Defautly ask glance-api store using file based queue. + # In future we can change it using DB based queued instead, + # such as using image location's status to saving pending delete flag + # when that property be added. + file_queue.add_location(image_id, uri) def delete_image_from_backend(context, store_api, image_id, uri): if CONF.delayed_delete: - store_api.schedule_delayed_delete_from_backend(uri, image_id) + store_api.schedule_delayed_delete_from_backend(context, uri, image_id) else: - store_api.safe_delete_from_backend(uri, context, image_id) + store_api.safe_delete_from_backend(context, uri, image_id) def check_location_metadata(val, key=''): diff --git a/glance/store/scrubber.py b/glance/store/scrubber.py index 171598e9e6..afc4009b89 100644 --- a/glance/store/scrubber.py +++ b/glance/store/scrubber.py @@ -15,6 +15,7 @@ # License for the specific language governing permissions and limitations # under the License. 
+import abc import calendar import eventlet import os @@ -26,13 +27,23 @@ from glance.common import crypt from glance.common import exception from glance.common import utils from glance import context +from glance.openstack.common import lockutils import glance.openstack.common.log as logging +import glance.openstack.common.uuidutils as uuidutils import glance.registry.client.v1.api as registry -from glance import store LOG = logging.getLogger(__name__) scrubber_opts = [ + cfg.StrOpt('scrubber_datadir', + default='/var/lib/glance/scrubber', + help=_('Directory that the scrubber will use to track ' + 'information about what to delete. ' + 'Make sure this is set in glance-api.conf and ' + 'glance-scrubber.conf')), + cfg.IntOpt('scrub_time', default=0, + help=_('The amount of time in seconds to delay before ' + 'performing a delete.')), cfg.BoolOpt('cleanup_scrubber', default=False, help=_('A boolean that determines if the scrubber should ' 'clean up the files it uses for taking data. Only ' @@ -45,6 +56,300 @@ scrubber_opts = [ CONF = cfg.CONF CONF.register_opts(scrubber_opts) +CONF.import_opt('metadata_encryption_key', 'glance.common.config') + + +class ScrubQueue(object): + """Image scrub queue base class. + + The queue contains image's location which need to delete from backend. + """ + def __init__(self): + registry.configure_registry_client() + registry.configure_registry_admin_creds() + self.registry = registry.get_registry_client(context.RequestContext()) + + @abc.abstractmethod + def add_location(self, image_id, uri): + """Adding image location to scrub queue. + + :param image_id: The opaque image identifier + :param uri: The opaque image location uri + """ + pass + + @abc.abstractmethod + def get_all_locations(self): + """Returns a list of image id and location tuple from scrub queue. + + :retval a list of image id and location tuple from scrub queue + """ + pass + + @abc.abstractmethod + def pop_all_locations(self): + """Pop out a list of image id and location tuple from scrub queue. + + :retval a list of image id and location tuple from scrub queue + """ + pass + + @abc.abstractmethod + def has_image(self, image_id): + """Returns whether the queue contains an image or not. + :param image_id: The opaque image identifier + + :retval a boolean value to inform including or not + """ + pass + + +class ScrubFileQueue(ScrubQueue): + """File-based image scrub queue class.""" + def __init__(self): + super(ScrubFileQueue, self).__init__() + self.scrubber_datadir = CONF.scrubber_datadir + utils.safe_mkdirs(self.scrubber_datadir) + self.scrub_time = CONF.scrub_time + self.metadata_encryption_key = CONF.metadata_encryption_key + + def _read_queue_file(self, file_path): + """Reading queue file to loading deleted location and timestamp out. + + :param file_path: Queue file full path + + :retval a list of image location timestamp tuple from queue file + """ + uris = [] + delete_times = [] + + try: + with open(file_path, 'r') as f: + while True: + uri = f.readline().strip() + if uri: + uris.append(uri) + delete_times.append(int(f.readline().strip())) + else: + break + except Exception: + LOG.error(_("%s file can not be read.") % file_path) + + return uris, delete_times + + def _update_queue_file(self, file_path, remove_record_idxs): + """Updating queue file to remove such queue records. 
+ + :param file_path: Queue file full path + :param remove_record_idxs: A list of record index those want to remove + """ + try: + with open(file_path, 'r') as f: + lines = f.readlines() + # NOTE(zhiyan) we need bottom up removing to + # keep record index be valid. + remove_record_idxs.sort(reverse=True) + for record_idx in remove_record_idxs: + # Each record has two lines + line_no = (record_idx + 1) * 2 - 1 + del lines[line_no:line_no + 2] + with open(file_path, 'w') as f: + f.write(''.join(lines)) + os.chmod(file_path, 0o600) + except Exception: + LOG.error(_("%s file can not be wrote.") % file_path) + + def add_location(self, image_id, uri): + """Adding image location to scrub queue. + + :param image_id: The opaque image identifier + :param uri: The opaque image location uri + """ + with lockutils.lock("scrubber-%s" % image_id, + lock_file_prefix='glance-', external=True): + + # NOTE(zhiyan): make sure scrubber does not cleanup + # 'pending_delete' images concurrently before the code + # get lock and reach here. + try: + image = self.registry.get_image(image_id) + if image['status'] == 'deleted': + return + except exception.NotFound as e: + LOG.error(_("Failed to find image to delete: " + "%(e)s") % locals()) + return + + delete_time = time.time() + self.scrub_time + file_path = os.path.join(self.scrubber_datadir, str(image_id)) + + if self.metadata_encryption_key is not None: + uri = crypt.urlsafe_encrypt(self.metadata_encryption_key, + uri, 64) + + if os.path.exists(file_path): + # Append the uri of location to the queue file + with open(file_path, 'a') as f: + f.write('\n') + f.write('\n'.join([uri, str(int(delete_time))])) + else: + # NOTE(zhiyan): Protect the file before we write any data. + open(file_path, 'w').close() + os.chmod(file_path, 0o600) + with open(file_path, 'w') as f: + f.write('\n'.join([uri, str(int(delete_time))])) + os.utime(file_path, (delete_time, delete_time)) + + def _walk_all_locations(self, remove=False): + """Returns a list of image id and location tuple from scrub queue. + + :param remove: Whether remove location from queue or not after walk + + :retval a list of image image_id and location tuple from scrub queue + """ + if not os.path.exists(self.scrubber_datadir): + LOG.info(_("%s directory does not exist.") % self.scrubber_datadir) + return [] + + ret = [] + for root, dirs, files in os.walk(self.scrubber_datadir): + for image_id in files: + if not uuidutils.is_uuid_like(image_id): + continue + with lockutils.lock("scrubber-%s" % image_id, + lock_file_prefix='glance-', external=True): + file_path = os.path.join(self.scrubber_datadir, image_id) + uris, delete_times = self._read_queue_file(file_path) + + remove_record_idxs = [] + skipped = False + for (record_idx, delete_time) in enumerate(delete_times): + if delete_time > time.time(): + skipped = True + continue + else: + ret.append((image_id, uris[record_idx])) + remove_record_idxs.append(record_idx) + if remove: + if skipped: + # NOTE(zhiyan): remove location records from + # the queue file. + self._update_queue_file(file_path, + remove_record_idxs) + else: + utils.safe_remove(file_path) + return ret + + def get_all_locations(self): + """Returns a list of image id and location tuple from scrub queue. + + :retval a list of image id and location tuple from scrub queue + """ + return self._walk_all_locations() + + def pop_all_locations(self): + """Pop out a list of image id and location tuple from scrub queue. 
+ + :retval a list of image id and location tuple from scrub queue + """ + return self._walk_all_locations(remove=True) + + def has_image(self, image_id): + """Returns whether the queue contains an image or not. + + :param image_id: The opaque image identifier + + :retval a boolean value to inform including or not + """ + return os.path.exists(os.path.join(self.scrubber_datadir, + str(image_id))) + + +class ScrubDBQueue(ScrubQueue): + """Database-based image scrub queue class.""" + def __init__(self): + super(ScrubDBQueue, self).__init__() + self.cleanup_scrubber_time = CONF.cleanup_scrubber_time + + def add_location(self, image_id, uri): + """Adding image location to scrub queue. + + :param image_id: The opaque image identifier + :param uri: The opaque image location uri + """ + raise NotImplementedError + + def _walk_all_locations(self, remove=False): + """Returns a list of image id and location tuple from scrub queue. + + :param remove: Whether remove location from queue or not after walk + + :retval a list of image id and location tuple from scrub queue + """ + filters = {'deleted': True, + 'is_public': 'none', + 'status': 'pending_delete'} + ret = [] + for image in self.registry.get_images_detailed(filters=filters): + deleted_at = image.get('deleted_at') + if not deleted_at: + continue + + # NOTE: Strip off microseconds which may occur after the last '.,' + # Example: 2012-07-07T19:14:34.974216 + date_str = deleted_at.rsplit('.', 1)[0].rsplit(',', 1)[0] + delete_time = calendar.timegm(time.strptime(date_str, + "%Y-%m-%dT%H:%M:%S")) + + if delete_time + self.cleanup_scrubber_time > time.time(): + continue + + ret.extend([(image['id'], location['uri']) + for location in image['location_data']]) + + if remove: + self.registry.update_image(image['id'], {'status': 'deleted'}) + return ret + + def get_all_locations(self): + """Returns a list of image id and location tuple from scrub queue. + + :retval a list of image id and location tuple from scrub queue + """ + return self._walk_all_locations() + + def pop_all_locations(self): + """Pop out a list of image id and location tuple from scrub queue. + + :retval a list of image id and location tuple from scrub queue + """ + return self._walk_all_locations(remove=True) + + def has_image(self, image_id): + """Returns whether the queue contains an image or not. 
+ + :param image_id: The opaque image identifier + + :retval a boolean value to inform including or not + """ + try: + image = self.registry.get_image(image_id) + return image['status'] == 'pending_delete' + except exception.NotFound as e: + return False + + +_file_queue = None +_db_queue = None + + +def get_scrub_queues(): + global _file_queue, _db_queue + if not _file_queue: + _file_queue = ScrubFileQueue() + if not _db_queue: + _db_queue = ScrubDBQueue() + return (_file_queue, _db_queue) class Daemon(object): @@ -73,140 +378,148 @@ class Daemon(object): class Scrubber(object): - CLEANUP_FILE = ".cleanup" + def __init__(self, store_api): + LOG.info(_("Initializing scrubber with configuration: %s") % + unicode({'scrubber_datadir': CONF.scrubber_datadir, + 'cleanup': CONF.cleanup_scrubber, + 'cleanup_time': CONF.cleanup_scrubber_time, + 'registry_host': CONF.registry_host, + 'registry_port': CONF.registry_port})) - def __init__(self): - self.datadir = CONF.scrubber_datadir - self.cleanup = CONF.cleanup_scrubber - self.cleanup_time = CONF.cleanup_scrubber_time - # configs for registry API store auth - self.admin_user = CONF.admin_user - self.admin_tenant = CONF.admin_tenant_name + utils.safe_mkdirs(CONF.scrubber_datadir) - host, port = CONF.registry_host, CONF.registry_port - - LOG.info(_("Initializing scrubber with conf: %s") % - {'datadir': self.datadir, 'cleanup': self.cleanup, - 'cleanup_time': self.cleanup_time, - 'registry_host': host, 'registry_port': port}) + self.store_api = store_api registry.configure_registry_client() registry.configure_registry_admin_creds() - ctx = context.RequestContext() - self.registry = registry.get_registry_client(ctx) + self.registry = registry.get_registry_client(context.RequestContext()) - utils.safe_mkdirs(self.datadir) + (self.file_queue, self.db_queue) = get_scrub_queues() + + def _get_delete_jobs(self, queue, pop): + try: + if pop: + image_id_uri_list = queue.pop_all_locations() + else: + image_id_uri_list = queue.get_all_locations() + except: + LOG.error(_("Can not %s scrub jobs from queue.") % + 'pop' if pop else 'get') + return None + + delete_jobs = {} + for image_id, image_uri in image_id_uri_list: + if not image_id in delete_jobs: + delete_jobs[image_id] = [] + delete_jobs[image_id].append((image_id, image_uri)) + return delete_jobs def run(self, pool, event=None): - now = time.time() + delete_jobs = self._get_delete_jobs(self.file_queue, True) + if delete_jobs: + for image_id, jobs in delete_jobs.iteritems(): + self._scrub_image(pool, image_id, jobs) - if not os.path.exists(self.datadir): - LOG.info(_("%s does not exist") % self.datadir) - return - - delete_work = [] - for root, dirs, files in os.walk(self.datadir): - for id in files: - if id == self.CLEANUP_FILE: - continue - - file_name = os.path.join(root, id) - delete_time = os.stat(file_name).st_mtime - - if delete_time > now: - continue - - uri, delete_time = read_queue_file(file_name) - - if delete_time > now: - continue - - delete_work.append((id, uri, now)) - - LOG.info(_("Deleting %s images") % len(delete_work)) - # NOTE(bourke): The starmap must be iterated to do work - for job in pool.starmap(self._delete, delete_work): - pass - - if self.cleanup: + if CONF.cleanup_scrubber: self._cleanup(pool) - def _delete(self, id, uri, now): - file_path = os.path.join(self.datadir, str(id)) + def _scrub_image(self, pool, image_id, delete_jobs): + if len(delete_jobs) == 0: + return + + LOG.info(_("Scrubbing image %(id)s from %(count)d locations.") % + {'id': image_id, 'count': 
len(delete_jobs)}) + # NOTE(bourke): The starmap must be iterated to do work + list(pool.starmap(self._delete_image_from_backend, delete_jobs)) + + image = self.registry.get_image(image_id) + if (image['status'] == 'pending_delete' and + not self.file_queue.has_image(image_id)): + self.registry.update_image(image_id, {'status': 'deleted'}) + + def _delete_image_from_backend(self, image_id, uri): if CONF.metadata_encryption_key is not None: uri = crypt.urlsafe_decrypt(CONF.metadata_encryption_key, uri) + try: - LOG.debug(_("Deleting %(id)s") % {'id': id}) + LOG.debug(_("Deleting %(uri)s from image %(image_id)s.") % + {'image_id': image_id, 'uri': uri}) + # Here we create a request context with credentials to support # delayed delete when using multi-tenant backend storage - ctx = context.RequestContext(auth_tok=self.registry.auth_tok, - user=self.admin_user, - tenant=self.admin_tenant) - store.delete_from_backend(ctx, uri) - except store.UnsupportedBackend: - msg = _("Failed to delete image from store (%(id)s).") - LOG.error(msg % {'id': id}) - except exception.NotFound: - msg = _("Image not found in store (%(id)s).") - LOG.error(msg % {'id': id}) + admin_tenant = CONF.admin_tenant_name + auth_token = self.registry.auth_tok + admin_context = context.RequestContext(user=CONF.admin_user, + tenant=admin_tenant, + auth_tok=auth_token) - self.registry.update_image(id, {'status': 'deleted'}) - utils.safe_remove(file_path) + self.store_api.delete_from_backend(admin_context, uri) + except Exception: + msg = _("Failed to delete image %(image_id)s from %(uri)s.") + LOG.error(msg % {'image_id': image_id, 'uri': uri}) + + def _read_cleanup_file(self, file_path): + """Reading cleanup to get latest cleanup timestamp. + + :param file_path: Cleanup status file full path + + :retval latest cleanup timestamp + """ + try: + if not os.path.exists(file_path): + msg = _("%s file is not exists.") % unicode(file_path) + raise Exception(msg) + atime = int(os.path.getatime(file_path)) + mtime = int(os.path.getmtime(file_path)) + if atime != mtime: + msg = _("%s file contains conflicting cleanup " + "timestamp.") % unicode(file_path) + raise Exception(msg) + return atime + except Exception as e: + LOG.error(e) + return None + + def _update_cleanup_file(self, file_path, cleanup_time): + """Update latest cleanup timestamp to cleanup file. 
+ + :param file_path: Cleanup status file full path + :param cleanup_time: The Latest cleanup timestamp + """ + try: + open(file_path, 'w').close() + os.chmod(file_path, 0o600) + os.utime(file_path, (cleanup_time, cleanup_time)) + except Exception: + LOG.error(_("%s file can not be created.") % unicode(file_path)) def _cleanup(self, pool): now = time.time() - cleanup_file = os.path.join(self.datadir, self.CLEANUP_FILE) + cleanup_file = os.path.join(CONF.scrubber_datadir, ".cleanup") if not os.path.exists(cleanup_file): - write_queue_file(cleanup_file, 'cleanup', now) + self._update_cleanup_file(cleanup_file, now) return - _uri, last_run_time = read_queue_file(cleanup_file) - cleanup_time = last_run_time + self.cleanup_time + last_cleanup_time = self._read_cleanup_file(cleanup_file) + cleanup_time = last_cleanup_time + CONF.cleanup_scrubber_time if cleanup_time > now: return - LOG.info(_("Getting images deleted before %s") % self.cleanup_time) - write_queue_file(cleanup_file, 'cleanup', now) + LOG.info(_("Getting images deleted before " + "%s") % CONF.cleanup_scrubber_time) + self._update_cleanup_file(cleanup_file, now) - filters = {'deleted': True, 'is_public': 'none', - 'status': 'pending_delete'} - pending_deletes = self.registry.get_images_detailed(filters=filters) + delete_jobs = self._get_delete_jobs(self.db_queue, False) + if not delete_jobs: + return - delete_work = [] - for pending_delete in pending_deletes: - deleted_at = pending_delete.get('deleted_at') - if not deleted_at: - continue - - time_fmt = "%Y-%m-%dT%H:%M:%S" - # NOTE: Strip off microseconds which may occur after the last '.,' - # Example: 2012-07-07T19:14:34.974216 - date_str = deleted_at.rsplit('.', 1)[0].rsplit(',', 1)[0] - delete_time = calendar.timegm(time.strptime(date_str, - time_fmt)) - - if delete_time + self.cleanup_time > now: - continue - - delete_work.append((pending_delete['id'], - pending_delete['location'], - now)) - - LOG.info(_("Deleting %s images") % len(delete_work)) - # NOTE(bourke): The starmap must be iterated to do work - for job in pool.starmap(self._delete, delete_work): - pass - - -def read_queue_file(file_path): - with open(file_path) as f: - uri = f.readline().strip() - delete_time = int(f.readline().strip()) - return uri, delete_time - - -def write_queue_file(file_path, uri, delete_time): - with open(file_path, 'w') as f: - f.write('\n'.join([uri, str(int(delete_time))])) - os.chmod(file_path, 0o600) - os.utime(file_path, (delete_time, delete_time)) + for image_id, jobs in delete_jobs.iteritems(): + with lockutils.lock("scrubber-%s" % image_id, + lock_file_prefix='glance-', external=True): + if not self.file_queue.has_image(image_id): + # NOTE(zhiyan): scrubber should not cleanup this image + # since a queue file be created for this 'pending_delete' + # image concurrently before the code get lock and + # reach here. The checking only be worth if glance-api and + # glance-scrubber service be deployed on a same host. 
+ self._scrub_image(pool, image_id, jobs) diff --git a/glance/tests/functional/__init__.py b/glance/tests/functional/__init__.py index 4acc7c242d..6e6a637d67 100644 --- a/glance/tests/functional/__init__.py +++ b/glance/tests/functional/__init__.py @@ -315,6 +315,7 @@ class ApiServer(Server): self.sql_connection = os.environ.get('GLANCE_TEST_SQL_CONNECTION', default_sql_connection) self.user_storage_quota = 0 + self.lock_path = self.test_dir self.conf_base = """[DEFAULT] verbose = %(verbose)s @@ -363,7 +364,8 @@ show_image_direct_url = %(show_image_direct_url)s show_multiple_locations = %(show_multiple_locations)s user_storage_quota = %(user_storage_quota)s enable_v1_api = %(enable_v1_api)s -enable_v2_api= %(enable_v2_api)s +enable_v2_api = %(enable_v2_api)s +lock_path = %(lock_path)s [paste_deploy] flavor = %(deployment_flavor)s """ @@ -515,6 +517,7 @@ class ScrubberDaemon(Server): self.swift_store_auth_version = kwargs.get("swift_store_auth_version", "2") self.metadata_encryption_key = "012345678901234567890123456789ab" + self.lock_path = self.test_dir self.conf_base = """[DEFAULT] verbose = %(verbose)s debug = %(debug)s @@ -531,6 +534,7 @@ swift_store_user = %(swift_store_user)s swift_store_key = %(swift_store_key)s swift_store_container = %(swift_store_container)s swift_store_auth_version = %(swift_store_auth_version)s +lock_path = %(lock_path)s """ def start(self, expect_exit=True, expected_exitcode=0, **kwargs): diff --git a/glance/tests/unit/base.py b/glance/tests/unit/base.py index f7054f855e..4a19730e1e 100644 --- a/glance/tests/unit/base.py +++ b/glance/tests/unit/base.py @@ -69,7 +69,8 @@ class IsolatedUnitTest(StoreClearingUnitTest): debug=False, default_store='filesystem', filesystem_store_datadir=os.path.join(self.test_dir), - policy_file=policy_file) + policy_file=policy_file, + lock_path=os.path.join(self.test_dir)) stubs.stub_out_registry_and_store_server(self.stubs, self.test_dir, registry=self.registry) diff --git a/glance/tests/unit/test_http_store.py b/glance/tests/unit/test_http_store.py index 5b84d405cd..becc0f9190 100644 --- a/glance/tests/unit/test_http_store.py +++ b/glance/tests/unit/test_http_store.py @@ -184,6 +184,6 @@ class TestHttpStore(base.StoreClearingUnitTest): ctx = context.RequestContext() stub_out_registry_image_update(self.stubs) try: - safe_delete_from_backend(uri, ctx, 'image_id') + safe_delete_from_backend(ctx, uri, 'image_id') except exception.StoreDeleteNotSupported: self.fail('StoreDeleteNotSupported should be swallowed') diff --git a/glance/tests/unit/test_scrubber.py b/glance/tests/unit/test_scrubber.py index f775ef4b2f..7ed89b3112 100644 --- a/glance/tests/unit/test_scrubber.py +++ b/glance/tests/unit/test_scrubber.py @@ -20,6 +20,7 @@ import shutil import time import tempfile +import eventlet import mox from glance.common import exception @@ -50,17 +51,17 @@ class TestScrubber(test_utils.BaseTestCase): uri = 'file://some/path/%s' % (fname) id = 'helloworldid' now = time.time() - scrub = glance.store.scrubber.Scrubber() + scrub = glance.store.scrubber.Scrubber(glance.store) scrub.registry = self.mox.CreateMockAnything() - self.mox.StubOutWithMock(glance.store, "delete_from_backend") - + scrub.registry.get_image(id).AndReturn({'status': 'pending_delete'}) scrub.registry.update_image(id, {'status': 'deleted'}) + self.mox.StubOutWithMock(glance.store, "delete_from_backend") glance.store.delete_from_backend( mox.IgnoreArg(), uri).AndRaise(ex) - self.mox.ReplayAll() - scrub._delete(id, uri, now) + 
scrub._scrub_image(eventlet.greenpool.GreenPool(1), + id, [(id, uri)]) self.mox.VerifyAll() q_path = os.path.join(self.data_dir, id) diff --git a/glance/tests/unit/utils.py b/glance/tests/unit/utils.py index 500c964a70..773716b8a5 100644 --- a/glance/tests/unit/utils.py +++ b/glance/tests/unit/utils.py @@ -124,18 +124,20 @@ class FakeStoreAPI(object): except KeyError: raise exception.NotFound() - def safe_delete_from_backend(self, uri, context, id, **kwargs): + def safe_delete_from_backend(self, context, uri, id, **kwargs): try: del self.data[uri] except KeyError: pass - def schedule_delayed_delete_from_backend(self, uri, id, **kwargs): + def schedule_delayed_delete_from_backend(self, context, uri, id, **kwargs): pass def delete_image_from_backend(self, context, store_api, image_id, uri): - glance.store.delete_image_from_backend(context, - store_api, image_id, uri) + if CONF.delayed_delete: + self.schedule_delayed_delete_from_backend(context, uri, image_id) + else: + self.safe_delete_from_backend(context, uri, image_id) def get_size_from_backend(self, context, location): return self.get_from_backend(context, location)[1] diff --git a/glance/tests/unit/v1/test_upload_utils.py b/glance/tests/unit/v1/test_upload_utils.py index afbe44cce3..739ce00146 100644 --- a/glance/tests/unit/v1/test_upload_utils.py +++ b/glance/tests/unit/v1/test_upload_utils.py @@ -43,7 +43,7 @@ class TestUploadUtils(base.StoreClearingUnitTest): id = unit_test_utils.UUID1 self.mox.StubOutWithMock(glance.store, "safe_delete_from_backend") - glance.store.safe_delete_from_backend(location, req.context, id) + glance.store.safe_delete_from_backend(req.context, location, id) self.mox.ReplayAll() upload_utils.initiate_deletion(req, location, id) @@ -57,7 +57,8 @@ class TestUploadUtils(base.StoreClearingUnitTest): self.mox.StubOutWithMock(glance.store, "schedule_delayed_delete_from_backend") - glance.store.schedule_delayed_delete_from_backend(location, + glance.store.schedule_delayed_delete_from_backend(req.context, + location, id) self.mox.ReplayAll() diff --git a/openstack-common.conf b/openstack-common.conf index 8dd9a5a08a..48aaf7d895 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -1,11 +1,13 @@ [DEFAULT] # The list of modules to copy from openstack-common +module=fileutils module=gettextutils module=importutils module=install_venv_common module=jsonutils module=local +module=lockutils module=log module=notifier module=policy
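
Illustrative sketch (not part of the applied patch): the race-condition protection this change adds works because glance-api (when ScrubFileQueue.add_location queues a delayed delete) and glance-scrubber (when it walks or cleans the queue) both take the same named inter-process lock before touching an image's queue file under scrubber_datadir. The helper name and image id below are hypothetical; the lock name, prefix, and external=True flag mirror what the patch itself uses, and lock_path must point to the same directory in glance-api.conf and glance-scrubber.conf (otherwise lockutils raises RequiredOptError).

    from glance.openstack.common import lockutils

    def _with_scrub_lock(image_id, critical_section):
        # Both services serialize on a per-image file lock.  With the
        # 'glance-' prefix the on-disk lock file ends up at
        # <lock_path>/glance-scrubber-<image_id>, so glance-api and
        # glance-scrubber contend on the same file even though they
        # run as separate processes.
        with lockutils.lock("scrubber-%s" % image_id,
                            lock_file_prefix='glance-', external=True):
            return critical_section()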