(fix) Guard against DB connection leaks
- Use connection contexts to ensure database connections are released back to the pool. - Make database connection pool size configurable, defaults to 15 Change-Id: Id8011fbf45a1b3c87835ff5f47ebfa9334488319
This commit is contained in:
parent
fec226fc66
commit
7e7739ce2c
@ -91,6 +91,10 @@ class DrydockConfig(object):
|
||||
cfg.StrOpt(
|
||||
'database_connect_string',
|
||||
help='The URI database connect string.'),
|
||||
cfg.IntOpt(
|
||||
'pool_size',
|
||||
default=15,
|
||||
help='The SQLalchemy database connection pool size.'),
|
||||
]
|
||||
|
||||
# Options for the boot action framework
|
||||
|
@ -42,7 +42,8 @@ class DrydockState(object):
|
||||
def connect_db(self):
|
||||
"""Connect the state manager to the persistent DB."""
|
||||
self.db_engine = create_engine(
|
||||
config.config_mgr.conf.database.database_connect_string)
|
||||
config.config_mgr.conf.database.database_connect_string,
|
||||
pool_size=config.config_mgr.conf.database.pool_size)
|
||||
self.db_metadata = MetaData(bind=self.db_engine)
|
||||
|
||||
self.tasks_tbl = tables.Tasks(self.db_metadata)
|
||||
@ -67,12 +68,11 @@ class DrydockState(object):
|
||||
'build_data',
|
||||
]
|
||||
|
||||
conn = self.db_engine.connect()
|
||||
for t in table_names:
|
||||
query_text = sql.text(
|
||||
"TRUNCATE TABLE %s" % t).execution_options(autocommit=True)
|
||||
conn.execute(query_text)
|
||||
conn.close()
|
||||
with self.db_engine.connect() as conn:
|
||||
for t in table_names:
|
||||
query_text = sql.text(
|
||||
"TRUNCATE TABLE %s" % t).execution_options(autocommit=True)
|
||||
conn.execute(query_text)
|
||||
|
||||
def get_design_documents(self, design_ref):
|
||||
return ReferenceResolver.resolve_reference(design_ref)
|
||||
@ -80,19 +80,17 @@ class DrydockState(object):
|
||||
def get_tasks(self):
|
||||
"""Get all tasks in the database."""
|
||||
try:
|
||||
conn = self.db_engine.connect()
|
||||
query = sql.select([self.tasks_tbl])
|
||||
rs = conn.execute(query)
|
||||
with self.db_engine.connect() as conn:
|
||||
query = sql.select([self.tasks_tbl])
|
||||
rs = conn.execute(query)
|
||||
|
||||
task_list = [objects.Task.from_db(dict(r)) for r in rs]
|
||||
task_list = [objects.Task.from_db(dict(r)) for r in rs]
|
||||
|
||||
self._assemble_tasks(task_list=task_list)
|
||||
self._assemble_tasks(task_list=task_list)
|
||||
|
||||
# add reference to this state manager to each task
|
||||
for t in task_list:
|
||||
t.statemgr = self
|
||||
|
||||
conn.close()
|
||||
# add reference to this state manager to each task
|
||||
for t in task_list:
|
||||
t.statemgr = self
|
||||
|
||||
return task_list
|
||||
except Exception as ex:
|
||||
@ -164,25 +162,24 @@ class DrydockState(object):
|
||||
:param allowed_actions: list of string action names
|
||||
"""
|
||||
try:
|
||||
conn = self.db_engine.connect()
|
||||
if allowed_actions is None:
|
||||
query = self.tasks_tbl.select().where(
|
||||
self.tasks_tbl.c.status ==
|
||||
hd_fields.TaskStatus.Queued).order_by(
|
||||
self.tasks_tbl.c.created.asc())
|
||||
rs = conn.execute(query)
|
||||
else:
|
||||
query = sql.text("SELECT * FROM tasks WHERE "
|
||||
"status = :queued_status AND "
|
||||
"action = ANY(:actions) "
|
||||
"ORDER BY created ASC")
|
||||
rs = conn.execute(
|
||||
query,
|
||||
queued_status=hd_fields.TaskStatus.Queued,
|
||||
actions=allowed_actions)
|
||||
with self.db_engine.connect() as conn:
|
||||
if allowed_actions is None:
|
||||
query = self.tasks_tbl.select().where(
|
||||
self.tasks_tbl.c.status ==
|
||||
hd_fields.TaskStatus.Queued).order_by(
|
||||
self.tasks_tbl.c.created.asc())
|
||||
rs = conn.execute(query)
|
||||
else:
|
||||
query = sql.text("SELECT * FROM tasks WHERE "
|
||||
"status = :queued_status AND "
|
||||
"action = ANY(:actions) "
|
||||
"ORDER BY created ASC")
|
||||
rs = conn.execute(
|
||||
query,
|
||||
queued_status=hd_fields.TaskStatus.Queued,
|
||||
actions=allowed_actions)
|
||||
|
||||
r = rs.first()
|
||||
conn.close()
|
||||
r = rs.first()
|
||||
|
||||
if r is not None:
|
||||
task = objects.Task.from_db(dict(r))
|
||||
@ -203,12 +200,11 @@ class DrydockState(object):
|
||||
:param task_id: uuid.UUID of a task_id to query against
|
||||
"""
|
||||
try:
|
||||
conn = self.db_engine.connect()
|
||||
query = self.tasks_tbl.select().where(
|
||||
self.tasks_tbl.c.task_id == task_id.bytes)
|
||||
rs = conn.execute(query)
|
||||
|
||||
r = rs.fetchone()
|
||||
with self.db_engine.connect() as conn:
|
||||
query = self.tasks_tbl.select().where(
|
||||
self.tasks_tbl.c.task_id == task_id.bytes)
|
||||
rs = conn.execute(query)
|
||||
r = rs.fetchone()
|
||||
|
||||
task = objects.Task.from_db(dict(r))
|
||||
|
||||
@ -217,8 +213,6 @@ class DrydockState(object):
|
||||
self._assemble_tasks(task_list=[task])
|
||||
task.statemgr = self
|
||||
|
||||
conn.close()
|
||||
|
||||
return task
|
||||
|
||||
except Exception as ex:
|
||||
@ -234,11 +228,10 @@ class DrydockState(object):
|
||||
:param msg: instance of objects.TaskStatusMessage
|
||||
"""
|
||||
try:
|
||||
conn = self.db_engine.connect()
|
||||
query = self.result_message_tbl.insert().values(
|
||||
task_id=task_id.bytes, **(msg.to_db()))
|
||||
conn.execute(query)
|
||||
conn.close()
|
||||
with self.db_engine.connect() as conn:
|
||||
query = self.result_message_tbl.insert().values(
|
||||
task_id=task_id.bytes, **(msg.to_db()))
|
||||
conn.execute(query)
|
||||
return True
|
||||
except Exception as ex:
|
||||
self.logger.error("Error inserting result message for task %s: %s"
|
||||
@ -253,24 +246,22 @@ class DrydockState(object):
|
||||
if task_list is None:
|
||||
return None
|
||||
|
||||
conn = self.db_engine.connect()
|
||||
query = sql.select([
|
||||
self.result_message_tbl
|
||||
]).where(self.result_message_tbl.c.task_id == sql.bindparam(
|
||||
'task_id')).order_by(self.result_message_tbl.c.sequence.asc())
|
||||
query.compile(self.db_engine)
|
||||
with self.db_engine.connect() as conn:
|
||||
query = sql.select([
|
||||
self.result_message_tbl
|
||||
]).where(self.result_message_tbl.c.task_id == sql.bindparam(
|
||||
'task_id')).order_by(self.result_message_tbl.c.sequence.asc())
|
||||
query.compile(self.db_engine)
|
||||
|
||||
for t in task_list:
|
||||
rs = conn.execute(query, task_id=t.task_id.bytes)
|
||||
error_count = 0
|
||||
for r in rs:
|
||||
msg = objects.TaskStatusMessage.from_db(dict(r))
|
||||
if msg.error:
|
||||
error_count = error_count + 1
|
||||
t.result.message_list.append(msg)
|
||||
t.result.error_count = error_count
|
||||
|
||||
conn.close()
|
||||
for t in task_list:
|
||||
rs = conn.execute(query, task_id=t.task_id.bytes)
|
||||
error_count = 0
|
||||
for r in rs:
|
||||
msg = objects.TaskStatusMessage.from_db(dict(r))
|
||||
if msg.error:
|
||||
error_count = error_count + 1
|
||||
t.result.message_list.append(msg)
|
||||
t.result.error_count = error_count
|
||||
|
||||
def post_task(self, task):
|
||||
"""Insert a task into the database.
|
||||
@ -280,11 +271,10 @@ class DrydockState(object):
|
||||
:param task: instance of objects.Task to insert into the database.
|
||||
"""
|
||||
try:
|
||||
conn = self.db_engine.connect()
|
||||
query = self.tasks_tbl.insert().values(**(
|
||||
task.to_db(include_id=True)))
|
||||
conn.execute(query)
|
||||
conn.close()
|
||||
with self.db_engine.connect() as conn:
|
||||
query = self.tasks_tbl.insert().values(**(
|
||||
task.to_db(include_id=True)))
|
||||
conn.execute(query)
|
||||
return True
|
||||
except Exception as ex:
|
||||
self.logger.error(
|
||||
@ -297,17 +287,15 @@ class DrydockState(object):
|
||||
:param task: objects.Task instance to reference for update values
|
||||
"""
|
||||
try:
|
||||
conn = self.db_engine.connect()
|
||||
query = self.tasks_tbl.update().where(
|
||||
self.tasks_tbl.c.task_id == task.task_id.bytes).values(**(
|
||||
task.to_db(include_id=False)))
|
||||
rs = conn.execute(query)
|
||||
if rs.rowcount == 1:
|
||||
conn.close()
|
||||
return True
|
||||
else:
|
||||
conn.close()
|
||||
return False
|
||||
with self.db_engine.connect() as conn:
|
||||
query = self.tasks_tbl.update().where(
|
||||
self.tasks_tbl.c.task_id == task.task_id.bytes).values(**(
|
||||
task.to_db(include_id=False)))
|
||||
rs = conn.execute(query)
|
||||
if rs.rowcount == 1:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
except Exception as ex:
|
||||
self.logger.error(
|
||||
"Error updating task %s: %s" % (str(task.task_id), str(ex)))
|
||||
@ -325,17 +313,16 @@ class DrydockState(object):
|
||||
"WHERE task_id = :task_id").execution_options(autocommit=True)
|
||||
|
||||
try:
|
||||
conn = self.db_engine.connect()
|
||||
rs = conn.execute(
|
||||
query_string,
|
||||
new_subtask=subtask_id.bytes,
|
||||
task_id=task_id.bytes)
|
||||
rc = rs.rowcount
|
||||
conn.close()
|
||||
if rc == 1:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
with self.db_engine.connect() as conn:
|
||||
rs = conn.execute(
|
||||
query_string,
|
||||
new_subtask=subtask_id.bytes,
|
||||
task_id=task_id.bytes)
|
||||
rc = rs.rowcount
|
||||
if rc == 1:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
except Exception as ex:
|
||||
self.logger.error("Error appending subtask %s to task %s: %s" %
|
||||
(str(subtask_id), str(task_id), str(ex)))
|
||||
@ -347,18 +334,17 @@ class DrydockState(object):
|
||||
:param leader_id: uuid.UUID ID of the leader
|
||||
"""
|
||||
try:
|
||||
conn = self.db_engine.connect()
|
||||
query = self.active_instance_tbl.update().where(
|
||||
self.active_instance_tbl.c.identity == leader_id.bytes).values(
|
||||
last_ping=datetime.utcnow())
|
||||
rs = conn.execute(query)
|
||||
rc = rs.rowcount
|
||||
conn.close()
|
||||
with self.db_engine.connect() as conn:
|
||||
query = self.active_instance_tbl.update().where(
|
||||
self.active_instance_tbl.c.identity == leader_id.bytes).values(
|
||||
last_ping=datetime.utcnow())
|
||||
rs = conn.execute(query)
|
||||
rc = rs.rowcount
|
||||
|
||||
if rc == 1:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
if rc == 1:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
except Exception as ex:
|
||||
self.logger.error("Error maintaining leadership: %s" % str(ex))
|
||||
|
||||
@ -385,13 +371,12 @@ class DrydockState(object):
|
||||
autocommit=True)
|
||||
|
||||
try:
|
||||
conn = self.db_engine.connect()
|
||||
conn.execute(query_string, instance_id=leader_id.bytes)
|
||||
check_query = self.active_instance_tbl.select().where(
|
||||
self.active_instance_tbl.c.identity == leader_id.bytes)
|
||||
rs = conn.execute(check_query)
|
||||
r = rs.fetchone()
|
||||
conn.close()
|
||||
with self.db_engine.connect() as conn:
|
||||
conn.execute(query_string, instance_id=leader_id.bytes)
|
||||
check_query = self.active_instance_tbl.select().where(
|
||||
self.active_instance_tbl.c.identity == leader_id.bytes)
|
||||
rs = conn.execute(check_query)
|
||||
r = rs.fetchone()
|
||||
if r is not None:
|
||||
return True
|
||||
else:
|
||||
@ -406,12 +391,11 @@ class DrydockState(object):
|
||||
:param leader_id: a uuid.UUID instance identifying the instance giving up leadership
|
||||
"""
|
||||
try:
|
||||
conn = self.db_engine.connect()
|
||||
query = self.active_instance_tbl.delete().where(
|
||||
self.active_instance_tbl.c.identity == leader_id.bytes)
|
||||
rs = conn.execute(query)
|
||||
rc = rs.rowcount
|
||||
conn.close()
|
||||
with self.db_engine.connect() as conn:
|
||||
query = self.active_instance_tbl.delete().where(
|
||||
self.active_instance_tbl.c.identity == leader_id.bytes)
|
||||
rs = conn.execute(query)
|
||||
rc = rs.rowcount
|
||||
|
||||
if rc == 1:
|
||||
return True
|
||||
|
Loading…
Reference in New Issue
Block a user