* Scrubber now uses registry client to communicate with registry

* glance-api writes out to a scrubber "queue" dir on delete
* Scrubber determines images to delete from "queue" dir not db

Change-Id: Ia5574fc75f1a9c763bdef0f5773c2c182932b68a
This commit is contained in:
Jason Kölker 2011-08-16 12:38:12 -05:00
parent d5099cca70
commit 27b3df281f
10 changed files with 266 additions and 109 deletions

@ -131,6 +131,13 @@ image_cache_stall_timeout = 86400
# Turn on/off delayed delete
delayed_delete = False
# Delayed delete time in seconds
scrub_time = 43200
# Directory that the scrubber will use to remind itself of what to delete
# Make sure this is also set in glance-scrubber.conf
scrubber_datadir = /var/lib/glance/scrubber
[pipeline:glance-api]
pipeline = versionnegotiation context apiv1app
# NOTE: use the following pipeline for keystone

@ -12,28 +12,27 @@ log_file = /var/log/glance/scrubber.log
# Send logs to syslog (/dev/log) instead of to file specified by `log_file`
use_syslog = False
# Delayed delete time in seconds
scrub_time = 43200
# Should we run our own loop or rely on cron/scheduler to run us
daemon = False
# Loop time between checking the db for new items to schedule for delete
# Loop time between checking for new items to schedule for delete
wakeup_time = 300
# SQLAlchemy connection string for the reference implementation
# registry server. Any valid SQLAlchemy connection string is fine.
# See: http://www.sqlalchemy.org/docs/05/reference/sqlalchemy/connections.html#sqlalchemy.create_engine
sql_connection = sqlite:///glance.sqlite
# Directory that the scrubber will use to remind itself of what to delete
# Make sure this is also set in glance-api.conf
scrubber_datadir = /var/lib/glance/scrubber
# Period in seconds after which SQLAlchemy should reestablish its connection
# to the database.
#
# MySQL uses a default `wait_timeout` of 8 hours, after which it will drop
# idle connections. This can result in 'MySQL Gone Away' exceptions. If you
# notice this, you can lower this value to ensure that SQLAlchemy reconnects
# before MySQL can drop the connection.
sql_idle_timeout = 3600
# Only one server in your deployment should be designated the cleanup host
cleanup_scrubber = False
# pending_delete items older than this time are candidates for cleanup
cleanup_scrubber_time = 86400
# Address to find the registry server for cleanups
registry_host = 0.0.0.0
# Port the registry server is listening on
registry_port = 9191
[app:glance-scrubber]
paste.app_factory = glance.store.scrubber:app_factory

@ -36,7 +36,7 @@ class RequestContext(object):
self.roles = roles or []
self.is_admin = is_admin
self.read_only = read_only
self.show_deleted = show_deleted
self._show_deleted = show_deleted
self.owner_is_tenant = owner_is_tenant
@property
@ -44,6 +44,13 @@ class RequestContext(object):
"""Return the owner to correlate with an image."""
return self.tenant if self.owner_is_tenant else self.user
@property
def show_deleted(self):
    """Whether deleted records are visible; admins always see them."""
    return bool(self._show_deleted or self.is_admin)
class ContextMiddleware(wsgi.Middleware):
def __init__(self, app, options):

@ -21,6 +21,7 @@ System-level utilities and helper functions.
"""
import datetime
import errno
import inspect
import logging
import os
@ -133,6 +134,22 @@ def parse_isotime(timestr):
return datetime.datetime.strptime(timestr, TIME_FORMAT)
def safe_mkdirs(path):
    """Create a directory tree, tolerating its pre-existence.

    :param path: directory path to create (missing parents are created too)
    :raises OSError: for any failure other than the path already existing
    """
    try:
        os.makedirs(path)
    except OSError as e:
        # EEXIST means the directory is already there — the desired end
        # state — so swallow it; re-raise anything else (EACCES, ENOSPC, ...).
        if e.errno != errno.EEXIST:
            raise
def safe_remove(path):
    """Remove a file, tolerating its absence.

    :param path: file path to remove
    :raises OSError: for any failure other than the file not existing
    """
    try:
        os.remove(path)
    except OSError as e:
        # ENOENT means the file is already gone — the desired end state —
        # so swallow it; re-raise anything else (EACCES, EISDIR, ...).
        if e.errno != errno.ENOENT:
            raise
class LazyPluggable(object):
"""A pluggable backend loaded lazily based on some value."""

@ -145,12 +145,15 @@ def image_get(context, image_id, session=None):
raise exception.NotFound("No image found")
try:
image = session.query(models.Image).\
options(joinedload(models.Image.properties)).\
options(joinedload(models.Image.members)).\
filter_by(deleted=_deleted(context)).\
filter_by(id=image_id).\
one()
query = session.query(models.Image).\
options(joinedload(models.Image.properties)).\
options(joinedload(models.Image.members)).\
filter_by(id=image_id)
if not can_show_deleted(context):
query = query.filter_by(deleted=False)
image = query.one()
except exc.NoResultFound:
raise exception.NotFound("No image found with ID %s" % image_id)
@ -161,30 +164,6 @@ def image_get(context, image_id, session=None):
return image
def image_get_all_pending_delete(context, delete_time=None, limit=None):
"""Get all images that are pending deletion
:param limit: maximum number of images to return
"""
session = get_session()
query = session.query(models.Image).\
options(joinedload(models.Image.properties)).\
options(joinedload(models.Image.members)).\
filter_by(deleted=True).\
filter(models.Image.status == 'pending_delete')
if delete_time:
query = query.filter(models.Image.deleted_at <= delete_time)
query = query.order_by(desc(models.Image.deleted_at)).\
order_by(desc(models.Image.id))
if limit:
query = query.limit(limit)
return query.all()
def image_get_all(context, filters=None, marker=None, limit=None,
sort_key='created_at', sort_dir='desc'):
"""
@ -204,9 +183,16 @@ def image_get_all(context, filters=None, marker=None, limit=None,
query = session.query(models.Image).\
options(joinedload(models.Image.properties)).\
options(joinedload(models.Image.members)).\
filter_by(deleted=_deleted(context)).\
filter(models.Image.status != 'killed')
if not can_show_deleted(context) or 'deleted' not in filters:
query = query.filter_by(deleted=False)
else:
query = query.filter_by(deleted=filters['deleted'])
if 'deleted' in filters:
del filters['deleted']
sort_dir_func = {
'asc': asc,
'desc': desc,
@ -469,11 +455,15 @@ def image_member_get(context, member_id, session=None):
"""Get an image member or raise if it does not exist."""
session = session or get_session()
try:
member = session.query(models.ImageMember).\
query = session.query(models.ImageMember).\
options(joinedload(models.ImageMember.image)).\
filter_by(deleted=_deleted(context)).\
filter_by(id=member_id).\
one()
filter_by(id=member_id)
if not can_show_deleted(context):
query = query.filter_by(deleted=False)
member = query.one()
except exc.NoResultFound:
raise exception.NotFound("No membership found with ID %s" % member_id)
@ -490,11 +480,16 @@ def image_member_find(context, image_id, member, session=None):
try:
# Note lack of permissions check; this function is called from
# RequestContext.is_image_visible(), so avoid recursive calls
return session.query(models.ImageMember).\
query = session.query(models.ImageMember).\
options(joinedload(models.ImageMember.image)).\
filter_by(image_id=image_id).\
filter_by(member=member).\
one()
filter_by(member=member)
if not can_show_deleted(context):
query = query.filter_by(deleted=False)
return query.one()
except exc.NoResultFound:
raise exception.NotFound("No membership found for image %s member %s" %
(image_id, member))
@ -515,9 +510,11 @@ def image_member_get_memberships(context, member, marker=None, limit=None,
session = get_session()
query = session.query(models.ImageMember).\
options(joinedload(models.ImageMember.image)).\
filter_by(deleted=_deleted(context)).\
filter_by(member=member)
if not can_show_deleted(context):
query = query.filter_by(deleted=False)
sort_dir_func = {
'asc': asc,
'desc': desc,
@ -551,7 +548,7 @@ def image_member_get_memberships(context, member, marker=None, limit=None,
# pylint: disable-msg=C0111
def _deleted(context):
def can_show_deleted(context):
"""
Calculates whether to include deleted objects based on context.
Currently just looks for a flag called deleted in the context dict.

@ -25,8 +25,9 @@ import logging
import routes
from webob import exc
from glance.common import wsgi
from glance.common import exception
from glance.common import utils
from glance.common import wsgi
from glance.registry.db import api as db_api
@ -147,8 +148,14 @@ class Controller(object):
if req.context.is_admin:
# Only admin gets to look for non-public images
filters['is_public'] = self._get_is_public(req)
# The same for deleted
filters['deleted'] = self._parse_deleted_filter(req)
else:
filters['is_public'] = True
# NOTE(jkoelker): This is technically unnecessary since the db
# api will force deleted=False if its not an
# admin context. But explicit > implicit.
filters['deleted'] = False
for param in req.str_params:
if param in SUPPORTED_FILTERS:
filters[param] = req.str_params.get(param)
@ -240,6 +247,13 @@ class Controller(object):
raise exc.HTTPBadRequest(_("is_public must be None, True, "
"or False"))
def _parse_deleted_filter(self, req):
    """Parse deleted into something usable.

    A missing or falsy 'deleted' query parameter yields False;
    otherwise the raw value is interpreted by utils.bool_from_string.
    """
    raw = req.str_params.get('deleted', False)
    return utils.bool_from_string(raw) if raw else False
def show(self, req, id):
"""Return data about the given image id."""
try:

@ -18,6 +18,7 @@
import logging
import optparse
import os
import time
import urlparse
from glance import registry
@ -168,5 +169,22 @@ def schedule_delete_from_backend(uri, options, context, image_id, **kwargs):
msg = _("Failed to delete image from store (%(uri)s).") % locals()
logger.error(msg)
datadir = config.get_option(options, 'scrubber_datadir')
scrub_time = config.get_option(options, 'scrub_time', type='int',
default=0)
delete_time = time.time() + scrub_time
file_path = os.path.join(datadir, str(image_id))
utils.safe_mkdirs(datadir)
if os.path.exists(file_path):
msg = _("Image id %(image_id)s already queued for delete") % {
'image_id': image_id}
raise exception.Duplicate(msg)
with open(file_path, 'w') as f:
f.write('\n'.join([uri, str(int(delete_time))]))
os.chmod(file_path, 0600)
os.utime(file_path, (delete_time, delete_time))
registry.update_image_metadata(options, context, image_id,
{'status': 'pending_delete'})

@ -15,20 +15,23 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import calendar
import eventlet
import logging
import time
import os
from glance import registry
from glance import store
import glance.store.filesystem
import glance.store.http
import glance.store.s3
import glance.store.swift
from glance import registry
from glance import store
from glance.common import config
from glance.registry import context
from glance.common import utils
from glance.common import exception
from glance.registry.db import api as db_api
from glance.registry import context
from glance.registry import client
logger = logging.getLogger('glance.store.scrubber')
@ -60,35 +63,123 @@ class Daemon(object):
class Scrubber(object):
CLEANUP_FILE = ".cleanup"
def __init__(self, options):
logger.info(_("Initializing scrubber with options: %s") % options)
self.options = options
scrub_time = config.get_option(options, 'scrub_time', type='int',
default=0)
logger.info(_("Scrub interval set to %s seconds") % scrub_time)
self.scrub_time = datetime.timedelta(seconds=scrub_time)
db_api.configure_db(options)
self.datadir = config.get_option(options, 'scrubber_datadir')
self.cleanup = config.get_option(options, 'cleanup_scrubber',
type='bool', default=False)
host = config.get_option(options, 'registry_host')
port = config.get_option(options, 'registry_port', type='int')
self.registry = client.RegistryClient(host, port)
utils.safe_mkdirs(self.datadir)
if self.cleanup:
self.cleanup_time = config.get_option(options,
'cleanup_scrubber_time',
type='int', default=86400)
store.create_stores(options)
def run(self, pool, event=None):
delete_time = datetime.datetime.utcnow() - self.scrub_time
logger.info(_("Getting images deleted before %s") % delete_time)
pending = db_api.image_get_all_pending_delete(None, delete_time)
num_pending = len(pending)
logger.info(_("Deleting %(num_pending)s images") % locals())
delete_work = [(p['id'], p['location']) for p in pending]
now = time.time()
if not os.path.exists(self.datadir):
logger.info(_("%s does not exist") % self.datadir)
return
delete_work = []
for root, dirs, files in os.walk(self.datadir):
for id in files:
if id == self.CLEANUP_FILE:
continue
file_name = os.path.join(root, id)
delete_time = os.stat(file_name).st_mtime
if delete_time > now:
continue
uri, delete_time = read_queue_file(file_name)
if delete_time > now:
continue
delete_work.append((int(id), uri, now))
logger.info(_("Deleting %s images") % len(delete_work))
pool.starmap(self._delete, delete_work)
def _delete(self, image_id, location):
try:
logger.debug(_("Deleting %(location)s") % locals())
store.delete_from_backend(location)
except (store.UnsupportedBackend, exception.NotFound):
msg = _("Failed to delete image from store (%(uri)s).") % locals()
logger.error(msg)
if self.cleanup:
self._cleanup()
ctx = context.RequestContext(is_admin=True, show_deleted=True)
db_api.image_update(ctx, image_id, {'status': 'deleted'})
def _delete(self, id, uri, now):
file_path = os.path.join(self.datadir, str(id))
try:
logger.debug(_("Deleting %(uri)s") % {'uri': uri})
store.delete_from_backend(uri)
except store.UnsupportedBackend:
msg = _("Failed to delete image from store (%(uri)s).")
logger.error(msg % {'uri': uri})
write_queue_file(file_path, uri, now)
self.registry.update_image(id, {'status': 'deleted'})
utils.safe_remove(file_path)
def _cleanup(self):
now = time.time()
cleanup_file = os.path.join(self.datadir, self.CLEANUP_FILE)
if not os.path.exists(cleanup_file):
write_queue_file(cleanup_file, 'cleanup', now)
return
_uri, last_run_time = read_queue_file(cleanup_file)
cleanup_time = last_run_time + self.cleanup_time
if cleanup_time > now:
return
logger.info(_("Getting images deleted before %s") % self.cleanup_time)
write_queue_file(cleanup_file, 'cleanup', now)
filters = {'deleted': True, 'is_public': 'none',
'status': 'pending_delete'}
pending_deletes = self.registry.get_images_detailed(filters=filters)
delete_work = []
for pending_delete in pending_deletes:
deleted_at = pending_delete.get('deleted_at')
if not deleted_at:
continue
time_fmt = "%Y-%m-%dT%H:%M:%S"
delete_time = calendar.timegm(time.strptime(deleted_at,
time_fmt))
if delete_time + self.cleanup_time > now:
continue
delete_work.append((int(pending_delete['id']),
pending_delete['location'],
now))
logger.info(_("Deleting %s images") % len(delete_work))
pool.starmap(self._delete, delete_work)
def read_queue_file(file_path):
    """Return (uri, delete_time) parsed from a scrubber queue file.

    The queue file stores the image URI on its first line and the epoch
    delete time (an integer) on its second line.
    """
    with open(file_path) as queue_file:
        first, second = queue_file.readline(), queue_file.readline()
    return first.strip(), int(second.strip())
def write_queue_file(file_path, uri, delete_time):
    """Persist a scrubber queue entry for later deletion.

    Writes the image URI and the integer delete time (one per line),
    restricts the file to owner read/write, and stamps its atime/mtime
    with the delete time so candidates can be skimmed via os.stat.

    :param file_path: queue file to (over)write
    :param uri: backend location of the image data
    :param delete_time: epoch seconds after which deletion may occur
    """
    with open(file_path, 'w') as f:
        f.write('\n'.join([uri, str(int(delete_time))]))
    # 0o600 (valid Python 2.6+ and 3, unlike the 2.x-only 0600 form):
    # the URI may embed store credentials, so keep the entry private
    # to the scrubber's owner.
    os.chmod(file_path, 0o600)
    os.utime(file_path, (delete_time, delete_time))
def app_factory(global_config, **local_conf):

@ -150,6 +150,8 @@ class ApiServer(Server):
"images")
self.pid_file = os.path.join(self.test_dir,
"api.pid")
self.scrubber_datadir = os.path.join(self.test_dir,
"scrubber")
self.log_file = os.path.join(self.test_dir, "api.log")
self.registry_port = registry_port
self.s3_store_host = "s3.amazonaws.com"
@ -186,6 +188,8 @@ swift_store_large_object_size = %(swift_store_large_object_size)s
swift_store_large_object_chunk_size = %(swift_store_large_object_chunk_size)s
delayed_delete = %(delayed_delete)s
owner_is_tenant = %(owner_is_tenant)s
scrub_time = 5
scrubber_datadir = %(scrubber_datadir)s
[pipeline:glance-api]
pipeline = versionnegotiation context apiv1app
@ -254,25 +258,26 @@ class ScrubberDaemon(Server):
Server object that starts/stops/manages the Scrubber server
"""
def __init__(self, test_dir, sql_connection, daemon=False):
def __init__(self, test_dir, registry_port, daemon=False):
# NOTE(jkoelker): Set the port to 0 since we actually don't listen
super(ScrubberDaemon, self).__init__(test_dir, 0)
self.server_name = 'scrubber'
self.daemon = daemon
self.sql_connection = sql_connection
self.registry_port = registry_port
self.scrubber_datadir = os.path.join(self.test_dir,
"scrubber")
self.pid_file = os.path.join(self.test_dir, "scrubber.pid")
self.log_file = os.path.join(self.test_dir, "scrubber.log")
self.conf_base = """[DEFAULT]
verbose = %(verbose)s
debug = %(debug)s
log_file = %(log_file)s
scrub_time = 5
daemon = %(daemon)s
wakeup_time = 2
sql_connection = %(sql_connection)s
sql_idle_timeout = 3600
scrubber_datadir = %(scrubber_datadir)s
registry_host = 0.0.0.0
registry_port = %(registry_port)s
[app:glance-scrubber]
paste.app_factory = glance.store.scrubber:app_factory
@ -302,9 +307,8 @@ class FunctionalTest(unittest.TestCase):
self.registry_server = RegistryServer(self.test_dir,
self.registry_port)
registry_db = self.registry_server.sql_connection
self.scrubber_daemon = ScrubberDaemon(self.test_dir,
sql_connection=registry_db)
self.registry_port)
self.pid_files = [self.api_server.pid_file,
self.registry_server.pid_file,

@ -19,12 +19,11 @@ import os
import time
import unittest
from sqlalchemy import create_engine
from glance.tests import functional
from glance.tests.utils import execute
from glance import client
from glance.registry import client as registry_client
TEST_IMAGE_DATA = '*' * 5 * 1024
@ -41,7 +40,10 @@ class TestScrubber(functional.FunctionalTest):
def _get_client(self):
return client.Client("localhost", self.api_port)
@functional.runs_sql
def _get_registry_client(self):
return registry_client.RegistryClient('localhost',
self.registry_port)
def test_immediate_delete(self):
"""
test that images get deleted immediately by default
@ -51,26 +53,27 @@ class TestScrubber(functional.FunctionalTest):
self.start_servers()
client = self._get_client()
registry = self._get_registry_client()
meta = client.add_image(TEST_IMAGE_META, TEST_IMAGE_DATA)
id = meta['id']
sql = "SELECT * FROM images WHERE status = 'pending_delete'"
recs = list(self.run_sql_cmd(sql))
filters = {'deleted': True, 'is_public': 'none',
'status': 'pending_delete'}
recs = registry.get_images_detailed(filters=filters)
self.assertFalse(recs)
client.delete_image(id)
recs = list(self.run_sql_cmd(sql))
recs = registry.get_images_detailed(filters=filters)
self.assertFalse(recs)
sql = "SELECT * FROM images WHERE id = '%s'" % id
recs = list(self.run_sql_cmd(sql))
filters = {'deleted': True, 'is_public': 'none', 'status': 'deleted'}
recs = registry.get_images_detailed(filters=filters)
self.assertTrue(recs)
for rec in recs:
self.assertEqual(rec['status'], 'deleted')
self.stop_servers()
@functional.runs_sql
def test_delayed_delete(self):
"""
test that images don't get deleted immediatly and that the scrubber
@ -78,24 +81,24 @@ class TestScrubber(functional.FunctionalTest):
"""
self.cleanup()
registry_db = self.registry_server.sql_connection
self.start_servers(delayed_delete=True, sql_connection=registry_db,
daemon=True)
self.start_servers(delayed_delete=True, daemon=True)
client = self._get_client()
registry = self._get_registry_client()
meta = client.add_image(TEST_IMAGE_META, TEST_IMAGE_DATA)
id = meta['id']
sql = "SELECT * FROM images WHERE status = 'pending_delete'"
recs = list(self.run_sql_cmd(sql))
filters = {'deleted': True, 'is_public': 'none',
'status': 'pending_delete'}
recs = registry.get_images_detailed(filters=filters)
self.assertFalse(recs)
client.delete_image(id)
recs = self.run_sql_cmd(sql)
recs = registry.get_images_detailed(filters=filters)
self.assertTrue(recs)
sql = "SELECT * FROM images WHERE id = '%s'" % id
recs = list(self.run_sql_cmd(sql))
filters = {'deleted': True, 'is_public': 'none'}
recs = registry.get_images_detailed(filters=filters)
self.assertTrue(recs)
for rec in recs:
self.assertEqual(rec['status'], 'pending_delete')
@ -109,7 +112,7 @@ class TestScrubber(functional.FunctionalTest):
for _ in xrange(3):
time.sleep(5)
recs = list(self.run_sql_cmd(sql))
recs = registry.get_images_detailed(filters=filters)
self.assertTrue(recs)
# NOTE(jkoelker) Reset the deleted set for this loop