diff --git a/glance/cmd/manage.py b/glance/cmd/manage.py
index 79620db93d..420b4f5a1b 100755
--- a/glance/cmd/manage.py
+++ b/glance/cmd/manage.py
@@ -29,6 +29,7 @@ from __future__ import print_function
 
 import os
 import sys
+import time
 
 # If ../glance/__init__.py exists, add ../ to Python search path, so that
 # it will override what happens to be installed in /usr/(local/)lib/python...
@@ -46,6 +47,7 @@ import six
 
 from glance.common import config
 from glance.common import exception
+from glance import context
 from glance.db import migration as db_migration
 from glance.db.sqlalchemy import api as db_api
 from glance.db.sqlalchemy import metadata
@@ -145,6 +147,26 @@ class DbCommands(object):
         metadata.db_export_metadefs(db_api.get_engine(),
                                     path)
 
+    @args('--age_in_days', type=int,
+          help='Purge deleted rows older than age in days')
+    @args('--max_rows', type=int,
+          help='Limit number of records to delete')
+    def purge(self, age_in_days=30, max_rows=100):
+        """Purge deleted rows older than a given age from glance tables."""
+        age_in_days = int(age_in_days)
+        max_rows = int(max_rows)
+        if age_in_days <= 0:
+            print(_("Must supply a positive, non-zero value for age."))
+            exit(1)
+        if age_in_days >= (int(time.time()) / 86400):
+            print(_("Maximal age is count of days since epoch."))
+            exit(1)
+        if max_rows < 1:
+            print(_("Minimal rows limit is 1."))
+            exit(1)
+        ctx = context.get_admin_context(show_deleted=True)
+        db_api.purge_deleted_rows(ctx, age_in_days, max_rows)
+
 
 class DbLegacyCommands(object):
     """Class for managing the db using legacy commands"""
diff --git a/glance/context.py b/glance/context.py
index b289d85206..2491f39360 100644
--- a/glance/context.py
+++ b/glance/context.py
@@ -58,3 +58,12 @@ class RequestContext(context.RequestContext):
     def can_see_deleted(self):
         """Admins can see deleted by default"""
         return self.show_deleted or self.is_admin
+
+
+def get_admin_context(show_deleted=False):
+    """Create an administrator context."""
+    return RequestContext(auth_token=None,
+                          tenant=None,
+                          is_admin=True,
+                          show_deleted=show_deleted,
+                          overwrite=False)
diff --git a/glance/db/sqlalchemy/api.py b/glance/db/sqlalchemy/api.py
index 2092dcf11c..30c5758232 100644
--- a/glance/db/sqlalchemy/api.py
+++ b/glance/db/sqlalchemy/api.py
@@ -21,6 +21,7 @@
 
 """Defines interface for DB access."""
 
+import datetime
 import threading
 
 from oslo_config import cfg
@@ -33,6 +34,7 @@ import six
 # NOTE(jokke): simplified transition to py3, behaves like py2 xrange
 from six.moves import range
 import sqlalchemy
+from sqlalchemy import MetaData, Table, select
 import sqlalchemy.orm as sa_orm
 import sqlalchemy.sql as sa_sql
 
@@ -50,7 +52,7 @@ from glance.db.sqlalchemy.metadef_api import object as metadef_object_api
 from glance.db.sqlalchemy.metadef_api import property as metadef_property_api
 from glance.db.sqlalchemy.metadef_api import tag as metadef_tag_api
 from glance.db.sqlalchemy import models
-from glance.i18n import _, _LW
+from glance.i18n import _, _LW, _LE, _LI
 
 BASE = models.BASE
 sa_logger = None
@@ -1226,6 +1228,69 @@ def image_tag_get_all(context, image_id, session=None):
     return [tag[0] for tag in tags]
 
 
+def purge_deleted_rows(context, age_in_days, max_rows, session=None):
+    """Purges soft deleted rows
+
+    Deletes rows of table images, table tasks and all dependent tables
+    according to given age for relevant models.
+    """
+    try:
+        age_in_days = int(age_in_days)
+    except ValueError:
+        LOG.exception(_LE('Invalid value for age, %(age)d'),
+                      {'age': age_in_days})
+        raise exception.InvalidParameterValue(value=age_in_days,
+                                              param='age_in_days')
+    try:
+        max_rows = int(max_rows)
+    except ValueError:
+        LOG.exception(_LE('Invalid value for max_rows, %(max_rows)d'),
+                      {'max_rows': max_rows})
+        raise exception.InvalidParameterValue(value=max_rows,
+                                              param='max_rows')
+
+    session = session or get_session()
+    metadata = MetaData(get_engine())
+    deleted_age = timeutils.utcnow() - datetime.timedelta(days=age_in_days)
+
+    tables = []
+    for model_class in models.__dict__.values():
+        if not hasattr(model_class, '__tablename__'):
+            continue
+        if hasattr(model_class, 'deleted'):
+            tables.append(model_class.__tablename__)
+    # get rid of FX constraints
+    for tbl in ('images', 'tasks'):
+        try:
+            tables.remove(tbl)
+        except ValueError:
+            LOG.warning(_LW('Expected table %(tbl)s was not found in DB.'),
+                        **locals())
+        else:
+            tables.append(tbl)
+
+    for tbl in tables:
+        tab = Table(tbl, metadata, autoload=True)
+        LOG.info(
+            _LI('Purging deleted rows older than %(age_in_days)d day(s) '
+                'from table %(tbl)s'),
+            **locals()
+        )
+        with session.begin():
+            result = session.execute(
+                tab.delete().where(
+                    tab.columns.id.in_(
+                        select([tab.columns.id]).where(
+                            tab.columns.deleted_at < deleted_age
+                        ).limit(max_rows)
+                    )
+                )
+            )
+        rows = result.rowcount
+        LOG.info(_LI('Deleted %(rows)d row(s) from table %(tbl)s'),
+                 **locals())
+
+
 def user_get_storage_usage(context, owner_id, image_id=None, session=None):
     _check_image_id(image_id)
     session = session or get_session()
@@ -1452,7 +1517,8 @@ def _task_get(context, task_id, session=None, force_show_deleted=False):
 
 def _task_update(context, task_ref, values, session=None):
     """Apply supplied dictionary of values to a task object."""
-    values["deleted"] = False
+    if 'deleted' not in values:
+        values["deleted"] = False
     task_ref.update(values)
     task_ref.save(session=session)
     return task_ref
diff --git a/glance/tests/functional/db/base.py b/glance/tests/functional/db/base.py
index 04ee1a322a..2ebeedb9f9 100644
--- a/glance/tests/functional/db/base.py
+++ b/glance/tests/functional/db/base.py
@@ -75,7 +75,7 @@ def build_task_fixture(**kwargs):
         'message': None,
         'expires_at': None,
         'created_at': default_datetime,
-        'updated_at': default_datetime
+        'updated_at': default_datetime,
     }
     task.update(kwargs)
     return task
@@ -1795,6 +1795,62 @@ class TaskTests(test_utils.BaseTestCase):
         self.assertIsNotNone(del_task['deleted_at'])
 
 
+class DBPurgeTests(test_utils.BaseTestCase):
+
+    def setUp(self):
+        super(DBPurgeTests, self).setUp()
+        self.adm_context = context.get_admin_context(show_deleted=True)
+        self.db_api = db_tests.get_db(self.config)
+        db_tests.reset_db(self.db_api)
+        self.image_fixtures, self.task_fixtures = self.build_fixtures()
+        self.create_tasks(self.task_fixtures)
+        self.create_images(self.image_fixtures)
+
+    def build_fixtures(self):
+        dt1 = timeutils.utcnow() - datetime.timedelta(days=5)
+        dt2 = dt1 + datetime.timedelta(days=1)
+        dt3 = dt2 + datetime.timedelta(days=1)
+        fixtures = [
+            {
+                'created_at': dt1,
+                'updated_at': dt1,
+                'deleted_at': dt3,
+                'deleted': True,
+            },
+            {
+                'created_at': dt1,
+                'updated_at': dt2,
+                'deleted_at': timeutils.utcnow(),
+                'deleted': True,
+            },
+            {
+                'created_at': dt2,
+                'updated_at': dt2,
+                'deleted_at': None,
+                'deleted': False,
+            },
+        ]
+        return (
+            [build_image_fixture(**fixture) for fixture in fixtures],
+            [build_task_fixture(**fixture) for fixture in fixtures],
+        )
+
+    def create_images(self, images):
+        for fixture in images:
+            self.db_api.image_create(self.adm_context, fixture)
+
+    def create_tasks(self, tasks):
+        for fixture in tasks:
+            self.db_api.task_create(self.adm_context, fixture)
+
+    def test_db_purge(self):
+        self.db_api.purge_deleted_rows(self.adm_context, 1, 5)
+        images = self.db_api.image_get_all(self.adm_context)
+        self.assertEqual(len(images), 2)
+        tasks = self.db_api.task_get_all(self.adm_context)
+        self.assertEqual(len(tasks), 2)
+
+
 class TestVisibility(test_utils.BaseTestCase):
     def setUp(self):
         super(TestVisibility, self).setUp()
diff --git a/glance/tests/functional/db/test_sqlalchemy.py b/glance/tests/functional/db/test_sqlalchemy.py
index 2a134ac8f8..cbb057e3f6 100644
--- a/glance/tests/functional/db/test_sqlalchemy.py
+++ b/glance/tests/functional/db/test_sqlalchemy.py
@@ -160,6 +160,15 @@ class TestSqlAlchemyQuota(base.DriverQuotaTests,
         self.addCleanup(db_tests.reset)
 
 
+class TestDBPurge(base.DBPurgeTests,
+                  base.FunctionalInitWrapper):
+
+    def setUp(self):
+        db_tests.load(get_db, reset_db)
+        super(TestDBPurge, self).setUp()
+        self.addCleanup(db_tests.reset)
+
+
 class TestArtifacts(base_artifacts.ArtifactsTestDriver,
                     base_artifacts.ArtifactTests):
     def setUp(self):