Merge "Fixed bug in InsertFromSelect columns order"

This commit is contained in:
Jenkins 2015-04-01 11:07:36 +00:00 committed by Gerrit Code Review
commit f749ad4362
2 changed files with 105 additions and 13 deletions

View File

@ -30,14 +30,12 @@ from sqlalchemy import Column
from sqlalchemy.engine import Connectable
from sqlalchemy.engine import reflection
from sqlalchemy.engine import url as sa_url
from sqlalchemy.ext.compiler import compiles
from sqlalchemy import func
from sqlalchemy import Index
from sqlalchemy import inspect
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy.sql.expression import literal_column
from sqlalchemy.sql.expression import UpdateBase
from sqlalchemy.sql import text
from sqlalchemy import String
from sqlalchemy import Table
@ -358,19 +356,40 @@ def get_table(engine, name):
return Table(name, metadata, autoload=True)
class InsertFromSelect(UpdateBase):
"""Form the base for `INSERT INTO table (SELECT ... )` statement."""
def __init__(self, table, select):
self.table = table
self.select = select
class InsertFromSelect(object):
"""Form the base for `INSERT INTO table (SELECT ... )` statement.
DEPRECATED: this class is deprecated and will be removed from oslo.db
in a few releases. Use default SQLAlchemy insert from select implementation
instead
@compiles(InsertFromSelect)
def visit_insert_from_select(element, compiler, **kw):
"""Form the `INSERT INTO table (SELECT ... )` statement."""
return "INSERT INTO %s %s" % (
compiler.process(element.table, asfrom=True),
compiler.process(element.select))
:param table: table to insert records
:param select: select query
:param cols: list of columns to specify in insert clause
:return: SQLAlchemy :class:`Insert` object instance
Usage:
.. code-block:: python
select = sql.select(table_from)
insert = InsertFromSelect(table_to, select,
['id', 'name', 'insert_date'])
engine.execute(insert)
"""
# NOTE(tdurakov): Insert from select implementation added to SQLAlchemy
# starting from version 0.8.7. Default SQLAlchemy implementation should be
# used instead of this. Deprecated.
def __new__(cls, table, select, cols=None):
if not cols:
cols = [c.name for c in table.c]
return table.insert(inline=True).from_select(cols, select)
def __init__(self, table, select, cols=None):
pass
def _get_not_supported_column(col_name_col_instance, column_name):

View File

@ -509,6 +509,79 @@ class TestMigrationUtils(db_test_base.DbTestCase):
# Verify we really have 4 rows in insert_table
self.assertEqual(len(rows), 4)
def test_insert_from_select_with_specified_columns(self):
insert_table_name = "__test_insert_to_table__"
select_table_name = "__test_select_from_table__"
uuidstrs = []
for unused in range(10):
uuidstrs.append(uuid.uuid4().hex)
insert_table = Table(
insert_table_name, self.meta,
Column('id', Integer, primary_key=True,
nullable=False, autoincrement=True),
Column('uuid', String(36), nullable=False))
select_table = Table(
select_table_name, self.meta,
Column('id', Integer, primary_key=True,
nullable=False, autoincrement=True),
Column('uuid', String(36), nullable=False))
insert_table.create()
select_table.create()
# Add 10 rows to select_table
for uuidstr in uuidstrs:
ins_stmt = select_table.insert().values(uuid=uuidstr)
self.conn.execute(ins_stmt)
# Select 4 rows in one chunk from select_table
column = select_table.c.id
query_insert = select([select_table],
select_table.c.id < 5).order_by(column)
insert_statement = utils.InsertFromSelect(insert_table,
query_insert, ['id', 'uuid'])
result_insert = self.conn.execute(insert_statement)
# Verify we insert 4 rows
self.assertEqual(result_insert.rowcount, 4)
query_all = select([insert_table]).where(
insert_table.c.uuid.in_(uuidstrs))
rows = self.conn.execute(query_all).fetchall()
# Verify we really have 4 rows in insert_table
self.assertEqual(len(rows), 4)
def test_insert_from_select_with_specified_columns_negative(self):
insert_table_name = "__test_insert_to_table__"
select_table_name = "__test_select_from_table__"
uuidstrs = []
for unused in range(10):
uuidstrs.append(uuid.uuid4().hex)
insert_table = Table(
insert_table_name, self.meta,
Column('id', Integer, primary_key=True,
nullable=False, autoincrement=True),
Column('uuid', String(36), nullable=False))
select_table = Table(
select_table_name, self.meta,
Column('id', Integer, primary_key=True,
nullable=False, autoincrement=True),
Column('uuid', String(36), nullable=False))
insert_table.create()
select_table.create()
# Add 10 rows to select_table
for uuidstr in uuidstrs:
ins_stmt = select_table.insert().values(uuid=uuidstr)
self.conn.execute(ins_stmt)
# Select 4 rows in one chunk from select_table
column = select_table.c.id
query_insert = select([select_table],
select_table.c.id < 5).order_by(column)
insert_statement = utils.InsertFromSelect(insert_table,
query_insert, ['uuid', 'id'])
self.assertRaises(exception.DBError, self.conn.execute,
insert_statement)
class PostgesqlTestMigrations(TestMigrationUtils,
db_test_base.PostgreSQLOpportunisticTestCase):