Scrub images in parallel
Prior to this change, images were being scrubbed serially while the image locations, if multiple, were being scrubbed parallely. For the general case, this may not achieve much performance gain as the number of images is likely to be more than the number of image locations per image. Hence, this change attempts to parallelize image scrubbing while leaving image locations to be scrubbed serially. Also, though eventlet greenpool was being used, greening the world wasn't done. As is, it's unlikely to achieve the intended gains. So, this change also monkey patches essential python modules. Finally, this change also makes the pool size configurable. This offers the flexibility to choose between serial or parallel scrubbing. Also, parallel scrubbing can be regulated to a desired level by altering the pool size. DocImpact Implements: blueprint scrub-images-in-parallel Change-Id: I5f18a70cd427e2c1e19a6bddeff317a46396eecc
This commit is contained in:
@@ -21,6 +21,11 @@ daemon = False
|
||||
# Loop time between checking for new items to schedule for delete
|
||||
wakeup_time = 300
|
||||
|
||||
# The size of thread pool to be used for scrubbing images. The default is one,
|
||||
# which signifies serial scrubbing. Any value above one indicates the max number
|
||||
# of images that may be scrubbed in parallel.
|
||||
scrub_pool_size = 1
|
||||
|
||||
# Directory that the scrubber will use to remind itself of what to delete
|
||||
# Make sure this is also set in glance-api.conf
|
||||
scrubber_datadir = /var/lib/glance/scrubber
|
||||
|
@@ -29,6 +29,7 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
|
||||
os.pardir))
|
||||
if os.path.exists(os.path.join(possible_topdir, 'glance', '__init__.py')):
|
||||
sys.path.insert(0, possible_topdir)
|
||||
import eventlet
|
||||
|
||||
import glance_store
|
||||
from oslo_config import cfg
|
||||
@@ -37,6 +38,8 @@ from oslo_log import log as logging
|
||||
from glance.common import config
|
||||
from glance import scrubber
|
||||
|
||||
eventlet.patcher.monkey_patch(all=False, socket=True, time=True, select=True,
|
||||
thread=True, os=True)
|
||||
|
||||
CONF = cfg.CONF
|
||||
logging.register_options(CONF)
|
||||
@@ -61,9 +64,7 @@ def main():
|
||||
server.start(app)
|
||||
server.wait()
|
||||
else:
|
||||
import eventlet
|
||||
pool = eventlet.greenpool.GreenPool(1000)
|
||||
app.run(pool)
|
||||
app.run()
|
||||
except RuntimeError as e:
|
||||
sys.exit("ERROR: %s" % e)
|
||||
|
||||
|
@@ -40,6 +40,12 @@ scrubber_opts = [
|
||||
cfg.IntOpt('scrub_time', default=0,
|
||||
help=_('The amount of time in seconds to delay before '
|
||||
'performing a delete.')),
|
||||
cfg.IntOpt('scrub_pool_size', default=1,
|
||||
help=_('The size of thread pool to be used for '
|
||||
'scrubbing images. The default is one, which '
|
||||
'signifies serial scrubbing. Any value above '
|
||||
'one indicates the max number of images that '
|
||||
'may be scrubbed in parallel.')),
|
||||
cfg.BoolOpt('delayed_delete', default=False,
|
||||
help=_('Turn on/off delayed delete.')),
|
||||
cfg.StrOpt('admin_role', default='admin',
|
||||
@@ -205,13 +211,14 @@ def get_scrub_queue():
|
||||
|
||||
|
||||
class Daemon(object):
|
||||
def __init__(self, wakeup_time=300, threads=1000):
|
||||
def __init__(self, wakeup_time=300, threads=100):
|
||||
LOG.info(_LI("Starting Daemon: wakeup_time=%(wakeup_time)s "
|
||||
"threads=%(threads)s"),
|
||||
{'wakeup_time': wakeup_time, 'threads': threads})
|
||||
self.wakeup_time = wakeup_time
|
||||
self.event = eventlet.event.Event()
|
||||
self.pool = eventlet.greenpool.GreenPool(threads)
|
||||
# This pool is used for periodic instantiation of scrubber
|
||||
self.daemon_pool = eventlet.greenpool.GreenPool(threads)
|
||||
|
||||
def start(self, application):
|
||||
self._run(application)
|
||||
@@ -225,7 +232,7 @@ class Daemon(object):
|
||||
|
||||
def _run(self, application):
|
||||
LOG.debug("Running application")
|
||||
self.pool.spawn_n(application.run, self.pool, self.event)
|
||||
self.daemon_pool.spawn_n(application.run, self.event)
|
||||
eventlet.spawn_after(self.wakeup_time, self._run, application)
|
||||
LOG.debug("Next run scheduled in %s seconds" % self.wakeup_time)
|
||||
|
||||
@@ -263,6 +270,7 @@ class Scrubber(object):
|
||||
auth_token=auth_token)
|
||||
|
||||
self.db_queue = get_scrub_queue()
|
||||
self.pool = eventlet.greenpool.GreenPool(CONF.scrub_pool_size)
|
||||
|
||||
def _get_delete_jobs(self):
|
||||
try:
|
||||
@@ -279,22 +287,21 @@ class Scrubber(object):
|
||||
delete_jobs[image_id].append((image_id, loc_id, loc_uri))
|
||||
return delete_jobs
|
||||
|
||||
def run(self, pool, event=None):
|
||||
def run(self, event=None):
|
||||
delete_jobs = self._get_delete_jobs()
|
||||
|
||||
if delete_jobs:
|
||||
for image_id, jobs in six.iteritems(delete_jobs):
|
||||
self._scrub_image(pool, image_id, jobs)
|
||||
list(self.pool.starmap(self._scrub_image, delete_jobs.items()))
|
||||
|
||||
def _scrub_image(self, pool, image_id, delete_jobs):
|
||||
def _scrub_image(self, image_id, delete_jobs):
|
||||
if len(delete_jobs) == 0:
|
||||
return
|
||||
|
||||
LOG.info(_LI("Scrubbing image %(id)s from %(count)d locations.") %
|
||||
{'id': image_id, 'count': len(delete_jobs)})
|
||||
# NOTE(bourke): The starmap must be iterated to do work
|
||||
list(pool.starmap(self._delete_image_location_from_backend,
|
||||
delete_jobs))
|
||||
|
||||
for img_id, loc_id, uri in delete_jobs:
|
||||
self._delete_image_location_from_backend(img_id, loc_id, uri)
|
||||
|
||||
image = self.registry.get_image(image_id)
|
||||
if image['status'] == 'pending_delete':
|
||||
|
@@ -15,7 +15,6 @@
|
||||
|
||||
import uuid
|
||||
|
||||
import eventlet
|
||||
import glance_store
|
||||
from mock import patch
|
||||
from mox3 import mox
|
||||
@@ -59,8 +58,7 @@ class TestScrubber(test_utils.BaseTestCase):
|
||||
uri,
|
||||
mox.IgnoreArg()).AndRaise(ex)
|
||||
self.mox.ReplayAll()
|
||||
scrub._scrub_image(eventlet.greenpool.GreenPool(1),
|
||||
id, [(id, '-', uri)])
|
||||
scrub._scrub_image(id, [(id, '-', uri)])
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_store_delete_unsupported_backend_exception(self):
|
||||
|
Reference in New Issue
Block a user