5f1fbbee3c
Updates Deckhand to use alembic to manage database upgrades. Moves from creating tables at startup of Deckhand to the db-sync job. Change-Id: I6f4cb237fadc46fbee81d1c33096f48a720f589f
209 lines
8.9 KiB
Python
209 lines
8.9 KiB
Python
"""initial deckhand base
|
|
|
|
Revision ID: 918bbfd28185
|
|
Revises:
|
|
Create Date: 2018-04-04 17:19:24.222703
|
|
|
|
"""
|
|
import logging
|
|
|
|
from alembic import op
|
|
import sqlalchemy as sa
|
|
from sqlalchemy.dialects import postgresql
|
|
from sqlalchemy.sql import text
|
|
|
|
# revision identifiers, used by Alembic.
|
|
revision = '918bbfd28185'
|
|
down_revision = None
|
|
branch_labels = None
|
|
depends_on = None
|
|
|
|
LOG = logging.getLogger('alembic.runtime.migration')
|
|
|
|
tables_select = text("""
|
|
select table_name from information_schema.tables where table_schema = 'public'
|
|
and table_name in ('buckets', 'revisions','documents', 'revision_tags',
|
|
'validations')
|
|
""")
|
|
|
|
check_documents_columns = text("""
|
|
select column_name from information_schema.columns
|
|
where table_name = 'documents' and column_name in ('_metadata', 'layer')
|
|
""")
|
|
|
|
get_constraints = text("""
|
|
select conname from pg_constraint
|
|
""")
|
|
|
|
convert_layer = text("""
|
|
update documents d1 set layer = (
|
|
select meta->'layeringDefinition'->>'layer' from documents d2
|
|
where d2.id = d1.id)
|
|
""")
|
|
|
|
|
|
def upgrade():
|
|
|
|
# Need to check if the tables exist first.
|
|
# If they do, then we can't create them, and rather need to:
|
|
# check if documents has _metadata or meta column
|
|
# rename to meta if it does.
|
|
# check if documents.layer exists
|
|
# if not, add it and populate it from
|
|
# metadata.layeringDefinition.layer in the associated document
|
|
# If the tables don't exist it is a new environment; create tables.
|
|
#
|
|
# Note that this is not fool-proof, if we have environments that are
|
|
# not in a state accounted for in this migration, the migration will fail
|
|
# This is easist and best if this first migration is starting from an
|
|
# empty database.
|
|
#
|
|
# IMPORTANT Note:
|
|
# It is irregular for migrations to conditionally apply changes.
|
|
# Migraitons are generally straightforward applicaiton of changes -- e.g.
|
|
# crate tables, drop columns, etc...
|
|
# Do not model future migrations after this migration, which is specially
|
|
# crafted to coerce non-Alembic manageed databases into an Alembic-managed
|
|
# form.
|
|
|
|
conn = op.get_bind()
|
|
LOG.info("Finding tables with query: %s", tables_select)
|
|
rs = conn.execute(tables_select)
|
|
existing_tables = [row[0] for row in rs]
|
|
LOG.info("Existing tables: %s", str(existing_tables))
|
|
|
|
if 'buckets' not in existing_tables:
|
|
LOG.info("'buckets' not present. Creating table")
|
|
op.create_table('buckets',
|
|
sa.Column('created_at', sa.DateTime(), nullable=False),
|
|
sa.Column('updated_at', sa.DateTime(), nullable=True),
|
|
sa.Column('deleted_at', sa.DateTime(), nullable=True),
|
|
sa.Column('deleted', sa.Boolean(), nullable=False),
|
|
sa.Column('id', sa.Integer(), nullable=False),
|
|
sa.Column('name', sa.String(length=36), nullable=True),
|
|
sa.PrimaryKeyConstraint('id'),
|
|
sa.UniqueConstraint('name'),
|
|
mysql_charset='utf8',
|
|
mysql_engine='Postgre'
|
|
)
|
|
|
|
if 'revisions' not in existing_tables:
|
|
LOG.info("'revisions' not present. Creating table")
|
|
op.create_table('revisions',
|
|
sa.Column('created_at', sa.DateTime(), nullable=False),
|
|
sa.Column('updated_at', sa.DateTime(), nullable=True),
|
|
sa.Column('deleted_at', sa.DateTime(), nullable=True),
|
|
sa.Column('deleted', sa.Boolean(), nullable=False),
|
|
sa.Column('id', sa.Integer(), nullable=False),
|
|
sa.PrimaryKeyConstraint('id'),
|
|
mysql_charset='utf8',
|
|
mysql_engine='Postgre'
|
|
)
|
|
|
|
if 'documents' not in existing_tables:
|
|
LOG.info("'documents' not present. Creating table")
|
|
op.create_table('documents',
|
|
sa.Column('created_at', sa.DateTime(), nullable=False),
|
|
sa.Column('updated_at', sa.DateTime(), nullable=True),
|
|
sa.Column('deleted_at', sa.DateTime(), nullable=True),
|
|
sa.Column('deleted', sa.Boolean(), nullable=False),
|
|
sa.Column('id', sa.Integer(), nullable=False),
|
|
sa.Column('name', sa.String(length=64), nullable=False),
|
|
sa.Column('schema', sa.String(length=64), nullable=False),
|
|
sa.Column('layer', sa.String(length=64), nullable=True),
|
|
sa.Column('meta', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
|
|
sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
|
|
sa.Column('data_hash', sa.String(), nullable=False),
|
|
sa.Column('metadata_hash', sa.String(), nullable=False),
|
|
sa.Column('bucket_id', sa.Integer(), nullable=False),
|
|
sa.Column('revision_id', sa.Integer(), nullable=False),
|
|
sa.Column('orig_revision_id', sa.Integer(), nullable=True),
|
|
sa.ForeignKeyConstraint(['bucket_id'], ['buckets.id'], ondelete='CASCADE'),
|
|
sa.ForeignKeyConstraint(['orig_revision_id'], ['revisions.id'], ondelete='CASCADE'),
|
|
sa.ForeignKeyConstraint(['revision_id'], ['revisions.id'], ondelete='CASCADE'),
|
|
sa.PrimaryKeyConstraint('id'),
|
|
sa.UniqueConstraint('schema', 'layer', 'name', 'revision_id', name='duplicate_document_constraint')
|
|
)
|
|
else:
|
|
# documents has undergone some changes that need to be accounted for
|
|
# in this migration to ensure a common base.
|
|
LOG.info("Finding columns in 'documents' table with query: %s",
|
|
check_documents_columns)
|
|
rs = conn.execute(check_documents_columns)
|
|
columns = [row[0] for row in rs]
|
|
LOG.info("Columns are: %s", str(columns))
|
|
|
|
if '_metadata' in columns:
|
|
LOG.info("Found '_metadata' column; will rename to 'meta'")
|
|
op.alter_column('documents', '_metadata', nullable=False,
|
|
new_column_name='meta')
|
|
LOG.info("'_metadata' renamed to 'meta'")
|
|
if 'layer' not in columns:
|
|
LOG.info("'layer' column is not present. Adding column and"
|
|
" extracting data from meta column")
|
|
|
|
# remove the constraint that is being modified
|
|
rs = conn.execute(get_constraints)
|
|
constraints = [row[0] for row in rs]
|
|
|
|
if 'duplicate_document_constraint' in constraints:
|
|
op.drop_constraint('duplicate_document_constraint',
|
|
'documents')
|
|
|
|
# add the layer column to documents
|
|
op.add_column('documents',
|
|
sa.Column('layer', sa.String(length=64), nullable=True)
|
|
)
|
|
|
|
# convert the data from meta to here.
|
|
conn.execute(convert_layer)
|
|
|
|
# add the constraint back in with the wole set of columns
|
|
op.create_unique_constraint('duplicate_document_constraint',
|
|
'documents', ['schema', 'layer', 'name', 'revision_id']
|
|
)
|
|
LOG.info("'layer' column added and initialized")
|
|
|
|
if 'revision_tags' not in existing_tables:
|
|
LOG.info("'revision_tags' not present. Creating table")
|
|
op.create_table('revision_tags',
|
|
sa.Column('created_at', sa.DateTime(), nullable=False),
|
|
sa.Column('updated_at', sa.DateTime(), nullable=True),
|
|
sa.Column('deleted_at', sa.DateTime(), nullable=True),
|
|
sa.Column('deleted', sa.Boolean(), nullable=False),
|
|
sa.Column('id', sa.Integer(), nullable=False),
|
|
sa.Column('tag', sa.String(length=64), nullable=False),
|
|
sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
|
|
sa.Column('revision_id', sa.Integer(), nullable=False),
|
|
sa.ForeignKeyConstraint(['revision_id'], ['revisions.id'], ondelete='CASCADE'),
|
|
sa.PrimaryKeyConstraint('id'),
|
|
mysql_charset='utf8',
|
|
mysql_engine='Postgre'
|
|
)
|
|
|
|
if 'validations' not in existing_tables:
|
|
LOG.info("'validations' not present. Creating table")
|
|
op.create_table('validations',
|
|
sa.Column('created_at', sa.DateTime(), nullable=False),
|
|
sa.Column('updated_at', sa.DateTime(), nullable=True),
|
|
sa.Column('deleted_at', sa.DateTime(), nullable=True),
|
|
sa.Column('deleted', sa.Boolean(), nullable=False),
|
|
sa.Column('id', sa.Integer(), nullable=False),
|
|
sa.Column('name', sa.String(length=64), nullable=False),
|
|
sa.Column('status', sa.String(length=8), nullable=False),
|
|
sa.Column('validator', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
|
|
sa.Column('errors', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
|
|
sa.Column('revision_id', sa.Integer(), nullable=False),
|
|
sa.ForeignKeyConstraint(['revision_id'], ['revisions.id'], ondelete='CASCADE'),
|
|
sa.PrimaryKeyConstraint('id'),
|
|
mysql_charset='utf8',
|
|
mysql_engine='Postgre'
|
|
)
|
|
|
|
def downgrade():
|
|
op.drop_table('validations')
|
|
op.drop_table('revision_tags')
|
|
op.drop_table('documents')
|
|
op.drop_table('revisions')
|
|
op.drop_table('buckets')
|