Fixed bug in InsertFromSelect columns order
Previous realization generates query string like: `INSERT INTO tbl1 SELECT tbl2.ID, tbl2.name, tbl2.currdate FROM tbl2;`. It doesn't specify columns in INSERT clause. If tbl1 columns differs from tbl2, execution of insert fails, or silently insert data in incorrect columns. To fix this implementation of class changed to default SQLAlchemy insert from select. added extra parameter 'columns', which allows to determine column order in INSERT clause. Closes-Bug:#1430884 Change-Id: Ica5cdea10d9aa253f395ec553559dba54e062028
This commit is contained in:
parent
700a01bc77
commit
74b539baba
@ -30,14 +30,12 @@ from sqlalchemy import Column
|
||||
from sqlalchemy.engine import Connectable
|
||||
from sqlalchemy.engine import reflection
|
||||
from sqlalchemy.engine import url as sa_url
|
||||
from sqlalchemy.ext.compiler import compiles
|
||||
from sqlalchemy import func
|
||||
from sqlalchemy import Index
|
||||
from sqlalchemy import inspect
|
||||
from sqlalchemy import Integer
|
||||
from sqlalchemy import MetaData
|
||||
from sqlalchemy.sql.expression import literal_column
|
||||
from sqlalchemy.sql.expression import UpdateBase
|
||||
from sqlalchemy.sql import text
|
||||
from sqlalchemy import String
|
||||
from sqlalchemy import Table
|
||||
@ -346,19 +344,40 @@ def get_table(engine, name):
|
||||
return Table(name, metadata, autoload=True)
|
||||
|
||||
|
||||
class InsertFromSelect(UpdateBase):
|
||||
"""Form the base for `INSERT INTO table (SELECT ... )` statement."""
|
||||
def __init__(self, table, select):
|
||||
self.table = table
|
||||
self.select = select
|
||||
class InsertFromSelect(object):
|
||||
"""Form the base for `INSERT INTO table (SELECT ... )` statement.
|
||||
|
||||
DEPRECATED: this class is deprecated and will be removed from oslo.db
|
||||
in a few releases. Use default SQLAlchemy insert from select implementation
|
||||
instead
|
||||
|
||||
@compiles(InsertFromSelect)
|
||||
def visit_insert_from_select(element, compiler, **kw):
|
||||
"""Form the `INSERT INTO table (SELECT ... )` statement."""
|
||||
return "INSERT INTO %s %s" % (
|
||||
compiler.process(element.table, asfrom=True),
|
||||
compiler.process(element.select))
|
||||
:param table: table to insert records
|
||||
:param select: select query
|
||||
:param cols: list of columns to specify in insert clause
|
||||
:return: SQLAlchemy :class:`Insert` object instance
|
||||
|
||||
Usage:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
select = sql.select(table_from)
|
||||
insert = InsertFromSelect(table_to, select,
|
||||
['id', 'name', 'insert_date'])
|
||||
engine.execute(insert)
|
||||
|
||||
"""
|
||||
# NOTE(tdurakov): Insert from select implementation added to SQLAlchemy
|
||||
# starting from version 0.8.7. Default SQLAlchemy implementation should be
|
||||
# used instead of this. Deprecated.
|
||||
|
||||
def __new__(cls, table, select, cols=None):
|
||||
if not cols:
|
||||
cols = [c.name for c in table.c]
|
||||
|
||||
return table.insert(inline=True).from_select(cols, select)
|
||||
|
||||
def __init__(self, table, select, cols=None):
|
||||
pass
|
||||
|
||||
|
||||
def _get_not_supported_column(col_name_col_instance, column_name):
|
||||
|
@ -508,6 +508,79 @@ class TestMigrationUtils(db_test_base.DbTestCase):
|
||||
# Verify we really have 4 rows in insert_table
|
||||
self.assertEqual(len(rows), 4)
|
||||
|
||||
def test_insert_from_select_with_specified_columns(self):
|
||||
insert_table_name = "__test_insert_to_table__"
|
||||
select_table_name = "__test_select_from_table__"
|
||||
uuidstrs = []
|
||||
for unused in range(10):
|
||||
uuidstrs.append(uuid.uuid4().hex)
|
||||
insert_table = Table(
|
||||
insert_table_name, self.meta,
|
||||
Column('id', Integer, primary_key=True,
|
||||
nullable=False, autoincrement=True),
|
||||
Column('uuid', String(36), nullable=False))
|
||||
select_table = Table(
|
||||
select_table_name, self.meta,
|
||||
Column('id', Integer, primary_key=True,
|
||||
nullable=False, autoincrement=True),
|
||||
Column('uuid', String(36), nullable=False))
|
||||
|
||||
insert_table.create()
|
||||
select_table.create()
|
||||
# Add 10 rows to select_table
|
||||
for uuidstr in uuidstrs:
|
||||
ins_stmt = select_table.insert().values(uuid=uuidstr)
|
||||
self.conn.execute(ins_stmt)
|
||||
|
||||
# Select 4 rows in one chunk from select_table
|
||||
column = select_table.c.id
|
||||
query_insert = select([select_table],
|
||||
select_table.c.id < 5).order_by(column)
|
||||
insert_statement = utils.InsertFromSelect(insert_table,
|
||||
query_insert, ['id', 'uuid'])
|
||||
result_insert = self.conn.execute(insert_statement)
|
||||
# Verify we insert 4 rows
|
||||
self.assertEqual(result_insert.rowcount, 4)
|
||||
|
||||
query_all = select([insert_table]).where(
|
||||
insert_table.c.uuid.in_(uuidstrs))
|
||||
rows = self.conn.execute(query_all).fetchall()
|
||||
# Verify we really have 4 rows in insert_table
|
||||
self.assertEqual(len(rows), 4)
|
||||
|
||||
def test_insert_from_select_with_specified_columns_negative(self):
|
||||
insert_table_name = "__test_insert_to_table__"
|
||||
select_table_name = "__test_select_from_table__"
|
||||
uuidstrs = []
|
||||
for unused in range(10):
|
||||
uuidstrs.append(uuid.uuid4().hex)
|
||||
insert_table = Table(
|
||||
insert_table_name, self.meta,
|
||||
Column('id', Integer, primary_key=True,
|
||||
nullable=False, autoincrement=True),
|
||||
Column('uuid', String(36), nullable=False))
|
||||
select_table = Table(
|
||||
select_table_name, self.meta,
|
||||
Column('id', Integer, primary_key=True,
|
||||
nullable=False, autoincrement=True),
|
||||
Column('uuid', String(36), nullable=False))
|
||||
|
||||
insert_table.create()
|
||||
select_table.create()
|
||||
# Add 10 rows to select_table
|
||||
for uuidstr in uuidstrs:
|
||||
ins_stmt = select_table.insert().values(uuid=uuidstr)
|
||||
self.conn.execute(ins_stmt)
|
||||
|
||||
# Select 4 rows in one chunk from select_table
|
||||
column = select_table.c.id
|
||||
query_insert = select([select_table],
|
||||
select_table.c.id < 5).order_by(column)
|
||||
insert_statement = utils.InsertFromSelect(insert_table,
|
||||
query_insert, ['uuid', 'id'])
|
||||
self.assertRaises(exception.DBError, self.conn.execute,
|
||||
insert_statement)
|
||||
|
||||
|
||||
class PostgesqlTestMigrations(TestMigrationUtils,
|
||||
db_test_base.PostgreSQLOpportunisticTestCase):
|
||||
|
Loading…
Reference in New Issue
Block a user