Merge "db: Re-use a single connection"

This commit is contained in:
Zuul 2023-09-25 14:21:11 +00:00 committed by Gerrit Code Review
commit db05d6c27c

View File

@ -65,35 +65,41 @@ CONF = cfg.CONF
CONF.register_opts(metadata_opts) CONF.register_opts(metadata_opts)
def get_metadef_namespaces_table(meta, engine): def get_metadef_namespaces_table(meta, conn):
return sqlalchemy.Table('metadef_namespaces', meta, autoload_with=engine) with conn.begin():
return sqlalchemy.Table('metadef_namespaces', meta, autoload_with=conn)
def get_metadef_resource_types_table(meta, engine): def get_metadef_resource_types_table(meta, conn):
return sqlalchemy.Table('metadef_resource_types', meta, with conn.begin():
autoload_with=engine) return sqlalchemy.Table('metadef_resource_types', meta,
autoload_with=conn)
def get_metadef_namespace_resource_types_table(meta, engine): def get_metadef_namespace_resource_types_table(meta, conn):
return sqlalchemy.Table('metadef_namespace_resource_types', meta, with conn.begin():
autoload_with=engine) return sqlalchemy.Table('metadef_namespace_resource_types', meta,
autoload_with=conn)
def get_metadef_properties_table(meta, engine): def get_metadef_properties_table(meta, conn):
return sqlalchemy.Table('metadef_properties', meta, autoload_with=engine) with conn.begin():
return sqlalchemy.Table('metadef_properties', meta, autoload_with=conn)
def get_metadef_objects_table(meta, engine): def get_metadef_objects_table(meta, conn):
return sqlalchemy.Table('metadef_objects', meta, autoload_with=engine) with conn.begin():
return sqlalchemy.Table('metadef_objects', meta, autoload_with=conn)
def get_metadef_tags_table(meta, engine): def get_metadef_tags_table(meta, conn):
return sqlalchemy.Table('metadef_tags', meta, autoload_with=engine) with conn.begin():
return sqlalchemy.Table('metadef_tags', meta, autoload_with=conn)
def _get_resource_type_id(meta, engine, name): def _get_resource_type_id(meta, conn, name):
rt_table = get_metadef_resource_types_table(meta, engine) rt_table = get_metadef_resource_types_table(meta, conn)
with engine.connect() as conn, conn.begin(): with conn.begin():
resource_type = conn.execute( resource_type = conn.execute(
select(rt_table.c.id).where( select(rt_table.c.id).where(
rt_table.c.name == name rt_table.c.name == name
@ -104,9 +110,9 @@ def _get_resource_type_id(meta, engine, name):
return None return None
def _get_resource_type(meta, engine, resource_type_id): def _get_resource_type(meta, conn, resource_type_id):
rt_table = get_metadef_resource_types_table(meta, engine) rt_table = get_metadef_resource_types_table(meta, conn)
with engine.connect() as conn, conn.begin(): with conn.begin():
return conn.execute( return conn.execute(
rt_table.select().where( rt_table.select().where(
rt_table.c.id == resource_type_id rt_table.c.id == resource_type_id
@ -114,10 +120,10 @@ def _get_resource_type(meta, engine, resource_type_id):
).fetchone() ).fetchone()
def _get_namespace_resource_types(meta, engine, namespace_id): def _get_namespace_resource_types(meta, conn, namespace_id):
namespace_resource_types_table = ( namespace_resource_types_table = (
get_metadef_namespace_resource_types_table(meta, engine)) get_metadef_namespace_resource_types_table(meta, conn))
with engine.connect() as conn, conn.begin(): with conn.begin():
return conn.execute( return conn.execute(
namespace_resource_types_table.select().where( namespace_resource_types_table.select().where(
namespace_resource_types_table.c.namespace_id == namespace_id namespace_resource_types_table.c.namespace_id == namespace_id
@ -125,10 +131,10 @@ def _get_namespace_resource_types(meta, engine, namespace_id):
).fetchall() ).fetchall()
def _get_namespace_resource_type_by_ids(meta, engine, namespace_id, rt_id): def _get_namespace_resource_type_by_ids(meta, conn, namespace_id, rt_id):
namespace_resource_types_table = ( namespace_resource_types_table = (
get_metadef_namespace_resource_types_table(meta, engine)) get_metadef_namespace_resource_types_table(meta, conn))
with engine.connect() as conn, conn.begin(): with conn.begin():
return conn.execute( return conn.execute(
namespace_resource_types_table.select().where(and_( namespace_resource_types_table.select().where(and_(
namespace_resource_types_table.c.namespace_id == namespace_id, namespace_resource_types_table.c.namespace_id == namespace_id,
@ -137,9 +143,9 @@ def _get_namespace_resource_type_by_ids(meta, engine, namespace_id, rt_id):
).fetchone() ).fetchone()
def _get_properties(meta, engine, namespace_id): def _get_properties(meta, conn, namespace_id):
properties_table = get_metadef_properties_table(meta, engine) properties_table = get_metadef_properties_table(meta, conn)
with engine.connect() as conn, conn.begin(): with conn.begin():
return conn.execute( return conn.execute(
properties_table.select().where( properties_table.select().where(
properties_table.c.namespace_id == namespace_id properties_table.c.namespace_id == namespace_id
@ -147,18 +153,18 @@ def _get_properties(meta, engine, namespace_id):
).fetchall() ).fetchall()
def _get_objects(meta, engine, namespace_id): def _get_objects(meta, conn, namespace_id):
objects_table = get_metadef_objects_table(meta, engine) objects_table = get_metadef_objects_table(meta, conn)
with engine.connect() as conn, conn.begin(): with conn.begin():
return conn.execute( return conn.execute(
objects_table.select().where( objects_table.select().where(
objects_table.c.namespace_id == namespace_id) objects_table.c.namespace_id == namespace_id)
).fetchall() ).fetchall()
def _get_tags(meta, engine, namespace_id): def _get_tags(meta, conn, namespace_id):
tags_table = get_metadef_tags_table(meta, engine) tags_table = get_metadef_tags_table(meta, conn)
with engine.connect() as conn, conn.begin(): with conn.begin():
return conn.execute( return conn.execute(
tags_table.select().where( tags_table.select().where(
tags_table.c.namespace_id == namespace_id tags_table.c.namespace_id == namespace_id
@ -166,8 +172,8 @@ def _get_tags(meta, engine, namespace_id):
).fetchall() ).fetchall()
def _get_resource_id(table, engine, namespace_id, resource_name): def _get_resource_id(table, conn, namespace_id, resource_name):
with engine.connect() as conn, conn.begin(): with conn.begin():
resource = conn.execute( resource = conn.execute(
select(table.c.id).where( select(table.c.id).where(
and_( and_(
@ -181,28 +187,28 @@ def _get_resource_id(table, engine, namespace_id, resource_name):
return None return None
def _clear_metadata(meta, engine): def _clear_metadata(meta, conn):
metadef_tables = [get_metadef_properties_table(meta, engine), metadef_tables = [get_metadef_properties_table(meta, conn),
get_metadef_objects_table(meta, engine), get_metadef_objects_table(meta, conn),
get_metadef_tags_table(meta, engine), get_metadef_tags_table(meta, conn),
get_metadef_namespace_resource_types_table(meta, engine), get_metadef_namespace_resource_types_table(meta, conn),
get_metadef_namespaces_table(meta, engine), get_metadef_namespaces_table(meta, conn),
get_metadef_resource_types_table(meta, engine)] get_metadef_resource_types_table(meta, conn)]
with engine.connect() as conn, conn.begin(): with conn.begin():
for table in metadef_tables: for table in metadef_tables:
conn.execute(table.delete()) conn.execute(table.delete())
LOG.info(_LI("Table %s has been cleared"), table) LOG.info(_LI("Table %s has been cleared"), table)
def _clear_namespace_metadata(meta, engine, namespace_id): def _clear_namespace_metadata(meta, conn, namespace_id):
metadef_tables = [get_metadef_properties_table(meta, engine), metadef_tables = [get_metadef_properties_table(meta, conn),
get_metadef_objects_table(meta, engine), get_metadef_objects_table(meta, conn),
get_metadef_tags_table(meta, engine), get_metadef_tags_table(meta, conn),
get_metadef_namespace_resource_types_table(meta, engine)] get_metadef_namespace_resource_types_table(meta, conn)]
namespaces_table = get_metadef_namespaces_table(meta, engine) namespaces_table = get_metadef_namespaces_table(meta, conn)
with engine.connect() as conn, conn.begin(): with conn.begin():
for table in metadef_tables: for table in metadef_tables:
conn.execute( conn.execute(
table.delete().where(table.c.namespace_id == namespace_id)) table.delete().where(table.c.namespace_id == namespace_id))
@ -212,7 +218,7 @@ def _clear_namespace_metadata(meta, engine, namespace_id):
namespaces_table.c.id == namespace_id)) namespaces_table.c.id == namespace_id))
def _populate_metadata(meta, engine, metadata_path=None, merge=False, def _populate_metadata(meta, conn, metadata_path=None, merge=False,
prefer_new=False, overwrite=False): prefer_new=False, overwrite=False):
if not metadata_path: if not metadata_path:
metadata_path = CONF.metadata_source_path metadata_path = CONF.metadata_source_path
@ -233,13 +239,12 @@ def _populate_metadata(meta, engine, metadata_path=None, merge=False,
metadata_path) metadata_path)
return return
namespaces_table = get_metadef_namespaces_table(meta, engine) namespaces_table = get_metadef_namespaces_table(meta, conn)
namespace_rt_table = get_metadef_namespace_resource_types_table( namespace_rt_table = get_metadef_namespace_resource_types_table(meta, conn)
meta, engine) objects_table = get_metadef_objects_table(meta, conn)
objects_table = get_metadef_objects_table(meta, engine) tags_table = get_metadef_tags_table(meta, conn)
tags_table = get_metadef_tags_table(meta, engine) properties_table = get_metadef_properties_table(meta, conn)
properties_table = get_metadef_properties_table(meta, engine) resource_types_table = get_metadef_resource_types_table(meta, conn)
resource_types_table = get_metadef_resource_types_table(meta, engine)
for json_schema_file in json_schema_files: for json_schema_file in json_schema_files:
try: try:
@ -262,7 +267,7 @@ def _populate_metadata(meta, engine, metadata_path=None, merge=False,
'owner': metadata.get('owner', 'admin') 'owner': metadata.get('owner', 'admin')
} }
with engine.connect() as conn, conn.begin(): with conn.begin():
db_namespace = conn.execute( db_namespace = conn.execute(
select( select(
namespaces_table.c.id namespaces_table.c.id
@ -280,9 +285,9 @@ def _populate_metadata(meta, engine, metadata_path=None, merge=False,
if not db_namespace: if not db_namespace:
values.update({'created_at': timeutils.utcnow()}) values.update({'created_at': timeutils.utcnow()})
_insert_data_to_db(engine, namespaces_table, values) _insert_data_to_db(conn, namespaces_table, values)
with engine.connect() as conn, conn.begin(): with conn.begin():
db_namespace = conn.execute( db_namespace = conn.execute(
select( select(
namespaces_table.c.id namespaces_table.c.id
@ -304,16 +309,16 @@ def _populate_metadata(meta, engine, metadata_path=None, merge=False,
namespace_id = db_namespace[0] namespace_id = db_namespace[0]
for resource_type in metadata.get('resource_type_associations', []): for resource_type in metadata.get('resource_type_associations', []):
rt_id = _get_resource_type_id(meta, engine, resource_type['name']) rt_id = _get_resource_type_id(meta, conn, resource_type['name'])
if not rt_id: if not rt_id:
val = { val = {
'name': resource_type['name'], 'name': resource_type['name'],
'created_at': timeutils.utcnow(), 'created_at': timeutils.utcnow(),
'protected': True 'protected': True
} }
_insert_data_to_db(engine, resource_types_table, val) _insert_data_to_db(conn, resource_types_table, val)
rt_id = _get_resource_type_id( rt_id = _get_resource_type_id(
meta, engine, resource_type['name']) meta, conn, resource_type['name'])
elif prefer_new: elif prefer_new:
val = {'updated_at': timeutils.utcnow()} val = {'updated_at': timeutils.utcnow()}
_update_data_in_db(resource_types_table, val, _update_data_in_db(resource_types_table, val,
@ -327,10 +332,10 @@ def _populate_metadata(meta, engine, metadata_path=None, merge=False,
'prefix': resource_type.get('prefix') 'prefix': resource_type.get('prefix')
} }
namespace_resource_type = _get_namespace_resource_type_by_ids( namespace_resource_type = _get_namespace_resource_type_by_ids(
meta, engine, namespace_id, rt_id) meta, conn, namespace_id, rt_id)
if not namespace_resource_type: if not namespace_resource_type:
values.update({'created_at': timeutils.utcnow()}) values.update({'created_at': timeutils.utcnow()})
_insert_data_to_db(engine, namespace_rt_table, values) _insert_data_to_db(conn, namespace_rt_table, values)
elif prefer_new: elif prefer_new:
values.update({'updated_at': timeutils.utcnow()}) values.update({'updated_at': timeutils.utcnow()})
_update_rt_association(namespace_rt_table, values, _update_rt_association(namespace_rt_table, values,
@ -343,11 +348,11 @@ def _populate_metadata(meta, engine, metadata_path=None, merge=False,
'json_schema': json.dumps(schema) 'json_schema': json.dumps(schema)
} }
property_id = _get_resource_id( property_id = _get_resource_id(
properties_table, engine, namespace_id, name, properties_table, conn, namespace_id, name,
) )
if not property_id: if not property_id:
values.update({'created_at': timeutils.utcnow()}) values.update({'created_at': timeutils.utcnow()})
_insert_data_to_db(engine, properties_table, values) _insert_data_to_db(conn, properties_table, values)
elif prefer_new: elif prefer_new:
values.update({'updated_at': timeutils.utcnow()}) values.update({'updated_at': timeutils.utcnow()})
_update_data_in_db(properties_table, values, _update_data_in_db(properties_table, values,
@ -361,11 +366,11 @@ def _populate_metadata(meta, engine, metadata_path=None, merge=False,
'json_schema': json.dumps( 'json_schema': json.dumps(
object.get('properties')) object.get('properties'))
} }
object_id = _get_resource_id(objects_table, engine, namespace_id, object_id = _get_resource_id(objects_table, conn, namespace_id,
object['name']) object['name'])
if not object_id: if not object_id:
values.update({'created_at': timeutils.utcnow()}) values.update({'created_at': timeutils.utcnow()})
_insert_data_to_db(engine, objects_table, values) _insert_data_to_db(conn, objects_table, values)
elif prefer_new: elif prefer_new:
values.update({'updated_at': timeutils.utcnow()}) values.update({'updated_at': timeutils.utcnow()})
_update_data_in_db(objects_table, values, _update_data_in_db(objects_table, values,
@ -377,10 +382,10 @@ def _populate_metadata(meta, engine, metadata_path=None, merge=False,
'namespace_id': namespace_id, 'namespace_id': namespace_id,
} }
tag_id = _get_resource_id( tag_id = _get_resource_id(
tags_table, engine, namespace_id, tag['name']) tags_table, conn, namespace_id, tag['name'])
if not tag_id: if not tag_id:
values.update({'created_at': timeutils.utcnow()}) values.update({'created_at': timeutils.utcnow()})
_insert_data_to_db(engine, tags_table, values) _insert_data_to_db(conn, tags_table, values)
elif prefer_new: elif prefer_new:
values.update({'updated_at': timeutils.utcnow()}) values.update({'updated_at': timeutils.utcnow()})
_update_data_in_db(tags_table, values, _update_data_in_db(tags_table, values,
@ -391,18 +396,18 @@ def _populate_metadata(meta, engine, metadata_path=None, merge=False,
LOG.info(_LI("Metadata loading finished")) LOG.info(_LI("Metadata loading finished"))
def _insert_data_to_db(engine, table, values, log_exception=True): def _insert_data_to_db(conn, table, values, log_exception=True):
try: try:
with engine.connect() as conn, conn.begin(): with conn.begin():
conn.execute(table.insert().values(values)) conn.execute(table.insert().values(values))
except sqlalchemy.exc.IntegrityError: except sqlalchemy.exc.IntegrityError:
if log_exception: if log_exception:
LOG.warning(_LW("Duplicate entry for values: %s"), values) LOG.warning(_LW("Duplicate entry for values: %s"), values)
def _update_data_in_db(engine, table, values, column, value): def _update_data_in_db(conn, table, values, column, value):
try: try:
with engine.connect() as conn, conn.begin(): with conn.begin():
conn.execute( conn.execute(
table.update().values(values).where(column == value) table.update().values(values).where(column == value)
) )
@ -410,9 +415,9 @@ def _update_data_in_db(engine, table, values, column, value):
LOG.warning(_LW("Duplicate entry for values: %s"), values) LOG.warning(_LW("Duplicate entry for values: %s"), values)
def _update_rt_association(engine, table, values, rt_id, namespace_id): def _update_rt_association(conn, table, values, rt_id, namespace_id):
try: try:
with engine.connect() as conn, conn.begin(): with conn.begin():
conn.execute( conn.execute(
table.update().values(values).where( table.update().values(values).where(
and_( and_(
@ -425,12 +430,12 @@ def _update_rt_association(engine, table, values, rt_id, namespace_id):
LOG.warning(_LW("Duplicate entry for values: %s"), values) LOG.warning(_LW("Duplicate entry for values: %s"), values)
def _export_data_to_file(meta, engine, path): def _export_data_to_file(meta, conn, path):
if not path: if not path:
path = CONF.metadata_source_path path = CONF.metadata_source_path
namespace_table = get_metadef_namespaces_table(meta) namespace_table = get_metadef_namespaces_table(meta)
with engine.connect() as conn, conn.begin(): with conn.begin():
namespaces = conn.execute(namespace_table.select()).fetchall() namespaces = conn.execute(namespace_table.select()).fetchall()
pattern = re.compile(r'[\W_]+', re.UNICODE) pattern = re.compile(r'[\W_]+', re.UNICODE)
@ -452,15 +457,15 @@ def _export_data_to_file(meta, engine, path):
} }
namespace_resource_types = _get_namespace_resource_types( namespace_resource_types = _get_namespace_resource_types(
meta, engine, namespace_id) meta, conn, namespace_id)
db_objects = _get_objects(meta, engine, namespace_id) db_objects = _get_objects(meta, conn, namespace_id)
db_properties = _get_properties(meta, engine, namespace_id) db_properties = _get_properties(meta, conn, namespace_id)
db_tags = _get_tags(meta, engine, namespace_id) db_tags = _get_tags(meta, conn, namespace_id)
resource_types = [] resource_types = []
for namespace_resource_type in namespace_resource_types: for namespace_resource_type in namespace_resource_types:
resource_type = _get_resource_type( resource_type = _get_resource_type(
meta, engine, namespace_resource_type['resource_type_id']) meta, conn, namespace_resource_type['resource_type_id'])
resource_types.append({ resource_types.append({
'name': resource_type['name'], 'name': resource_type['name'],
'prefix': namespace_resource_type['prefix'], 'prefix': namespace_resource_type['prefix'],
@ -526,17 +531,20 @@ def db_load_metadefs(engine, metadata_path=None, merge=False,
"--prefer_new, --overwrite")) "--prefer_new, --overwrite"))
return return
_populate_metadata( with engine.connect() as conn:
meta, engine, metadata_path, merge, prefer_new, overwrite) _populate_metadata(
meta, conn, metadata_path, merge, prefer_new, overwrite)
def db_unload_metadefs(engine): def db_unload_metadefs(engine):
meta = MetaData() meta = MetaData()
_clear_metadata(meta, engine) with engine.connect() as conn:
_clear_metadata(meta, conn)
def db_export_metadefs(engine, metadata_path=None): def db_export_metadefs(engine, metadata_path=None):
meta = MetaData() meta = MetaData()
_export_data_to_file(meta, engine, metadata_path) with engine.connect() as conn:
_export_data_to_file(meta, conn, metadata_path)