updated changeset tests. whole package is finally PEP8. fixed mysql tests&bugs. updated docs where apropriate. changeset test coverage almost at 100%

This commit is contained in:
iElectric 2009-06-20 22:33:03 +00:00
parent 7eafe744c2
commit cde0f9b52d
16 changed files with 580 additions and 406 deletions

View File

@ -1,9 +1,13 @@
0.5.5
-----
- code coverage is up to 99%
- server_defaults passed to column.create are now issued correctly
- constraints passed to column.create are correctly interpreted (ALTER TABLE ADD CONSTRAINT is issued after ADD COLUMN)
- column.create accepts `primary_key_name`, `unique_name` and `index_name` as string value which is used as contraint name when adding a column
- Constraint classes have cascade=True keyword argument to issue CASCADE drop where supported
- added UniqueConstraint/CheckConstraint and corresponding create/drop methods
- use SQLAlchemy quoting system to avoid name conflicts (for issue 32)
- code coverage is up to 99% with more than 100 tests
- partial refactoring of changeset package
- majoy update to documentation
- dialect support table was added to documentation

View File

@ -31,6 +31,8 @@ Given a standard SQLAlchemy table::
)
table.create()
.. _column-create:
Create a column::
col = Column('col1', String)
@ -39,12 +41,16 @@ Create a column::
# Column is added to table based on its name
assert col is table.c.col1
Drop a column (Not supported by SQLite_)::
.. _column-drop:
Drop a column::
col.drop()
Alter a column (Not supported by SQLite_)::
.. _column-alter:
Alter a column::
col.alter(name='col2')
@ -52,17 +58,19 @@ Alter a column (Not supported by SQLite_)::
assert col is table.c.col2
# Other properties can be modified as well
col.alter(type=String(42),
default="life, the universe, and everything",
nullable=False,
)
col.alter(type=String(42), default="life, the universe, and everything", nullable=False)
# Given another column object, col1.alter(col2), col1 will be changed to match col2
col.alter(Column('col3',String(77),nullable=True))
col.alter(Column('col3', String(77), nullable=True))
assert col.nullable
assert table.c.col3 is col
.. _sqlite: http://www.sqlite.org/lang_altertable.html
.. note::
Since version ``0.5.5`` you can pass primary_key_name, index_name and unique_name to column.create method to issue ALTER TABLE ADD CONSTRAINT after changing the column. Note for multi columns constraints and other advanced configuration, check :ref:`constraint tutorial <constraint-tutorial>`.
.. _table-rename:
Table
=====
@ -76,6 +84,9 @@ Rename a table::
.. _`table create/drop`: http://www.sqlalchemy.org/docs/05/metadata.html#creating-and-dropping-database-tables
.. currentmodule:: migrate.changeset.constraint
.. _index-rename:
Index
=====
@ -87,6 +98,9 @@ Rename an index, given an SQLAlchemy ``Index`` object::
.. _`index create/drop`: http://www.sqlalchemy.org/docs/05/metadata.html#indexes
.. _constraint-tutorial:
Constraint
==========
@ -106,7 +120,9 @@ The following rundowns are true for all constraints classes:
# Drop the constraint
cons.drop()
or you can pass column objects (and table argument can be left out).
or you can pass in column objects (and table argument can be left out)::
cons = PrimaryKeyConstraint(col1, col2)
3. Some dialects support CASCADE option when dropping constraints::

View File

@ -41,36 +41,36 @@ Download and Development
Dialect support
----------------------------------
---------------
+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
| Operation / Dialect | :ref:`sqlite <sqlite-d>` | :ref:`postgres <postgres-d>` | :ref:`mysql <mysql-d>` | :ref:`oracle <oracle-d>` | firebird | mssql |
| | | | | | | |
+==========================+==========================+==============================+========================+===========================+==========+=======+
| ALTER TABLE | yes | yes | | | | |
| RENAME TABLE | | | | | | |
+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
| ALTER TABLE | yes | yes | | | | |
| RENAME COLUMN | (workaround) [#1]_ | | | | | |
+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
| ALTER TABLE | yes | yes | | | | |
| DROP COLUMN | (workaround) [#1]_ | | | | | |
+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
| ALTER TABLE | yes | yes | | | | |
| ADD COLUMN | (with limitations) [#2]_ | | | | | |
+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
| ALTER TABLE | no | yes | | | | |
| ADD CONSTRAINT | | | | | | |
+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
| ALTER TABLE | no | yes | | | | |
| DROP CONSTRAINT | | | | | | |
+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
| ALTER TABLE | no | yes | | | | |
| ALTER COLUMN | | | | | | |
+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
| RENAME INDEX | no | yes | | | | |
| | | | | | | |
+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
+---------------------------------------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
| Operation / Dialect | :ref:`sqlite <sqlite-d>` | :ref:`postgres <postgres-d>` | :ref:`mysql <mysql-d>` | :ref:`oracle <oracle-d>` | firebird | mssql |
| | | | | | | |
+=========================================================+==========================+==============================+========================+===========================+==========+=======+
| :ref:`ALTER TABLE RENAME TABLE <table-rename>` | yes | yes | yes | | | |
| | | | | | | |
+---------------------------------------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
| :ref:`ALTER TABLE RENAME COLUMN <column-alter>` | yes | yes | yes | | | |
| | (workaround) [#1]_ | | | | | |
+---------------------------------------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
| :ref:`ALTER TABLE ADD COLUMN <column-create>` | yes | yes | yes | | | |
| | (with limitations) [#2]_ | | | | | |
+---------------------------------------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
| :ref:`ALTER TABLE DROP COLUMN <column-drop` | yes | yes | yes | | | |
| | (workaround) [#1]_ | | | | | |
+---------------------------------------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
| :ref:`ALTER TABLE ALTER COLUMN <column-alter>` | no | yes | yes | | | |
| | | | | | | |
+---------------------------------------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
| :ref:`ALTER TABLE ADD CONSTRAINT <constraint-tutorial>` | no | yes | yes | | | |
| | | | | | | |
+---------------------------------------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
| :ref:`ALTER TABLE DROP CONSTRAINT <constraint-tutorial>`| no | yes | yes | | | |
| | | | | | | |
+---------------------------------------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
| :ref:`RENAME INDEX <index-rename>` | no | yes | no | | | |
| | | | | | | |
+---------------------------------------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+
.. [#1] Table is renamed to temporary table, new table is created followed by INSERT statements.

View File

@ -6,11 +6,12 @@
"""
import sqlalchemy as sa
from sqlalchemy.engine.default import DefaultDialect
from sqlalchemy.sql.compiler import SchemaGenerator, SchemaDropper
from sqlalchemy.schema import (ForeignKeyConstraint,
PrimaryKeyConstraint,
CheckConstraint,
UniqueConstraint)
UniqueConstraint,
Index)
from sqlalchemy.sql.compiler import SchemaGenerator, SchemaDropper
from migrate.changeset import exceptions, constraint
@ -44,28 +45,29 @@ class AlterTableVisitor(SchemaIterator):
self.append('\nALTER TABLE %s ' % self.preparer.format_table(table))
return table
def _pk_constraint(self, table, column, status):
"""Create a primary key constraint from a table, column.
# DEPRECATED: use plain constraints instead
#def _pk_constraint(self, table, column, status):
# """Create a primary key constraint from a table, column.
Status: true if the constraint is being added; false if being dropped
"""
if isinstance(column, basestring):
column = getattr(table.c, name)
# Status: true if the constraint is being added; false if being dropped
# """
# if isinstance(column, basestring):
# column = getattr(table.c, name)
ret = constraint.PrimaryKeyConstraint(*table.primary_key)
if status:
# Created PK
ret.c.append(column)
else:
# Dropped PK
names = [c.name for c in cons.c]
index = names.index(col.name)
del ret.c[index]
# ret = constraint.PrimaryKeyConstraint(*table.primary_key)
# if status:
# # Created PK
# ret.c.append(column)
# else:
# # Dropped PK
# names = [c.name for c in cons.c]
# index = names.index(col.name)
# del ret.c[index]
# Allow explicit PK name assignment
if isinstance(pk, basestring):
ret.name = pk
return ret
# # Allow explicit PK name assignment
# if isinstance(pk, basestring):
# ret.name = pk
# return ret
class ANSIColumnGenerator(AlterTableVisitor, SchemaGenerator):
@ -77,28 +79,36 @@ class ANSIColumnGenerator(AlterTableVisitor, SchemaGenerator):
:param column: column object
:type column: :class:`sqlalchemy.Column` instance
"""
if column.default is not None:
self.traverse_single(column.default)
table = self.start_alter_table(column)
self.append("ADD ")
colspec = self.get_column_specification(column)
self.append(colspec)
self.append(self.get_column_specification(column))
for cons in column.constraints:
self.traverse_single(cons)
self.execute()
# add in foreign keys
if column.foreign_keys:
self.visit_alter_foriegn_keys(column)
# ALTER TABLE STATEMENTS
def visit_alter_foriegn_keys(self, column):
# add indexes and unique constraints
if column.index_name:
ix = Index(column.index_name,
column,
unique=bool(column.index_name or column.index))
ix.create()
elif column.unique_name:
constraint.UniqueConstraint(column, name=column.unique_name).create()
# SA bounds FK constraints to table, add manually
for fk in column.foreign_keys:
self.define_foreign_key(fk.constraint)
def visit_table(self, table):
"""Default table visitor, does nothing.
:param table: table object
:type table: :class:`sqlalchemy.Table` instance
"""
pass
self.add_foreignkey(fk.constraint)
# add primary key constraint if needed
if column.primary_key_name:
cons = constraint.PrimaryKeyConstraint(column, name=column.primary_key_name)
cons.create()
class ANSIColumnDropper(AlterTableVisitor, SchemaDropper):
@ -113,7 +123,7 @@ class ANSIColumnDropper(AlterTableVisitor, SchemaDropper):
:type column: :class:`sqlalchemy.Column`
"""
table = self.start_alter_table(column)
self.append(' DROP COLUMN %s' % self.preparer.format_column(column))
self.append('DROP COLUMN %s' % self.preparer.format_column(column))
self.execute()
@ -159,43 +169,25 @@ class ANSISchemaChanger(AlterTableVisitor, SchemaGenerator):
# are managed by the app, not the db.
self._run_subvisit(delta, self._visit_column_default)
if 'name' in keys:
self._run_subvisit(delta, self._visit_column_name)
self._run_subvisit(delta, self._visit_column_name, start_alter=False)
def _run_subvisit(self, delta, func):
def _run_subvisit(self, delta, func, start_alter=True):
"""Runs visit method based on what needs to be changed on column"""
table = self._to_table(delta.table)
col_name = delta.current_name
if start_alter:
self.start_alter_column(table, col_name)
ret = func(table, col_name, delta)
self.execute()
def _visit_column_foreign_key(self, delta):
table = delta.table
column = getattr(table.c, delta.current_name)
cons = constraint.ForeignKeyConstraint(column, autoload=True)
fk = delta['foreign_key']
if fk:
# For now, cons.columns is limited to one column:
# no multicolumn FKs
column.foreign_key = ForeignKey(*cons.columns)
else:
column_foreign_key = None
cons.drop()
cons.create()
def _visit_column_primary_key(self, delta):
table = delta.table
col = getattr(table.c, delta.current_name)
pk = delta['primary_key']
cons = self._pk_constraint(table, col, pk)
cons.drop()
cons.create()
def _visit_column_nullable(self, table, col_name, delta):
nullable = delta['nullable']
table = self._to_table(table)
def start_alter_column(self, table, col_name):
"""Starts ALTER COLUMN"""
self.start_alter_table(table)
# TODO: use preparer.format_column
self.append("ALTER COLUMN %s " % self.preparer.quote_identifier(col_name))
def _visit_column_nullable(self, table, col_name, delta):
nullable = delta['nullable']
if nullable:
self.append("DROP NOT NULL")
else:
@ -207,9 +199,6 @@ class ANSISchemaChanger(AlterTableVisitor, SchemaGenerator):
# reason
dummy = sa.Column(None, None, server_default=server_default)
default_text = self.get_column_default_string(dummy)
self.start_alter_table(table)
# TODO: use preparer.format_column
self.append("ALTER COLUMN %s " % self.preparer.quote_identifier(col_name))
if default_text is not None:
self.append("SET DEFAULT %s" % default_text)
else:
@ -218,15 +207,10 @@ class ANSISchemaChanger(AlterTableVisitor, SchemaGenerator):
def _visit_column_type(self, table, col_name, delta):
type_ = delta['type']
if not isinstance(type_, sa.types.AbstractType):
# It's the class itself, not an instance... make an
# instance
# It's the class itself, not an instance... make an instance
type_ = type_()
type_text = type_.dialect_impl(self.dialect).get_col_spec()
self.start_alter_table(table)
# TODO: does type need formating?
# TODO: use preparer.format_column
self.append("ALTER COLUMN %s TYPE %s" %
(self.preparer.quote_identifier(col_name), type_text))
self.append("TYPE %s" % type_text)
def _visit_column_name(self, table, col_name, delta):
new_name = delta['name']
@ -292,13 +276,13 @@ class ANSIConstraintGenerator(ANSIConstraintCommon, SchemaGenerator):
if cons.name is not None:
self.append("CONSTRAINT %s " %
self.preparer.format_constraint(cons))
self.append(" CHECK (%s)" % cons.sqltext)
self.append("CHECK (%s)" % cons.sqltext)
self.define_constraint_deferrability(cons)
elif isinstance(cons, UniqueConstraint):
if cons.name is not None:
self.append("CONSTRAINT %s " %
self.preparer.format_constraint(cons))
self.append(" UNIQUE (%s)" % \
self.append("UNIQUE (%s)" % \
(', '.join(self.preparer.quote(c.name, c.quote) for c in cons)))
self.define_constraint_deferrability(cons)
else:
@ -317,7 +301,8 @@ class ANSIConstraintDropper(ANSIConstraintCommon, SchemaDropper):
def _visit_constraint(self, constraint):
self.start_alter_table(constraint)
self.append("DROP CONSTRAINT ")
self.append(self.get_constraint_name(constraint))
constraint.name = self.get_constraint_name(constraint)
self.append(self.preparer.format_constraint(constraint))
if constraint.cascade:
self.append(" CASCADE")
self.execute()

View File

@ -1,7 +1,6 @@
"""
This module defines standalone schema constraint classes.
"""
import sqlalchemy
from sqlalchemy import schema
from migrate.changeset.exceptions import *
@ -142,7 +141,7 @@ class CheckConstraint(ConstraintChangeset, schema.CheckConstraint):
__visit_name__ = 'migrate_check_constraint'
def __init__(self, sqltext, *args, **kwargs):
cols = kwargs.pop('columns', False)
cols = kwargs.pop('columns', [])
if not cols and not kwargs.get('name', False):
raise InvalidConstraintError('You must either set "name"'
'parameter or "columns" to autogenarate it.')
@ -169,6 +168,8 @@ class UniqueConstraint(ConstraintChangeset, schema.UniqueConstraint):
:param table: If columns are passed as strings, this kw is required
:type table: Table instance
:type cols: strings or Column instances
.. versionadded:: 0.5.5
"""
__visit_name__ = 'migrate_unique_constraint'

View File

@ -20,39 +20,28 @@ class MySQLColumnDropper(ansisql.ANSIColumnDropper):
class MySQLSchemaChanger(MySQLSchemaGenerator, ansisql.ANSISchemaChanger):
def visit_column(self, delta):
keys = delta.keys()
if 'type' in keys or 'nullable' in keys or 'name' in keys:
self._run_subvisit(delta, self._visit_column_change)
if 'server_default' in keys:
# Column name might have changed above
col_name = delta.get('name', delta.current_name)
self._run_subvisit(delta, self._visit_column_default,
col_name=col_name)
def _visit_column_change(self, table_name, col_name, delta):
if not hasattr(delta, 'result_column'):
# Mysql needs the whole column definition, not just a lone
# name/type
raise exceptions.NotSupportedError(
"A column object is required to do this")
column = delta.result_column
# needed by get_column_specification
if not column.table:
column.table = delta.table
def visit_column(self, column):
delta = column.delta
table = column.table
colspec = self.get_column_specification(column)
# TODO: we need table formating here
self.start_alter_table(self.preparer.quote(table_name, True))
self.append("CHANGE COLUMN ")
self.append(self.preparer.quote(col_name, True))
self.append(' ')
if not hasattr(delta, 'result_column'):
# Mysql needs the whole column definition, not just a lone name/type
raise exceptions.NotSupportedError(
"A column object must be present in table to alter it")
self.start_alter_table(table)
old_col_name = self.preparer.quote(delta.current_name, column.quote)
self.append("CHANGE COLUMN %s " % old_col_name)
self.append(colspec)
self.execute()
def visit_index(self, param):
# If MySQL can do this, I can't find how
raise exceptions.NotSupportedError("MySQL cannot rename indexes")
class MySQLConstraintGenerator(ansisql.ANSIConstraintGenerator):
pass
@ -67,9 +56,22 @@ class MySQLConstraintDropper(ansisql.ANSIConstraintDropper):
def visit_migrate_foreign_key_constraint(self, constraint):
self.start_alter_table(constraint)
self.append("DROP FOREIGN KEY ")
constraint.name = self.get_constraint_name(constraint)
self.append(self.preparer.format_constraint(constraint))
self.execute()
def visit_migrate_check_constraint(self, *p, **k):
raise exceptions.NotSupportedError("MySQL does not support CHECK"
" constraints, use triggers instead.")
def visit_migrate_unique_constraint(self, constraint, *p, **k):
self.start_alter_table(constraint)
self.append('DROP INDEX ')
constraint.name = self.get_constraint_name(constraint)
self.append(self.preparer.format_constraint(constraint))
self.execute()
class MySQLDialect(ansisql.ANSIDialect):
columngenerator = MySQLColumnGenerator
columndropper = MySQLColumnDropper

View File

@ -1,10 +1,11 @@
"""
Oracle database specific implementations of changeset classes.
"""
import sqlalchemy as sa
from sqlalchemy.databases import oracle as sa_base
from migrate.changeset import ansisql, exceptions
from sqlalchemy.databases import oracle as sa_base
import sqlalchemy as sa
OracleSchemaGenerator = sa_base.OracleSchemaGenerator

View File

@ -3,10 +3,10 @@
.. _`SQLite`: http://www.sqlite.org/
"""
from migrate.changeset import ansisql, exceptions, constraint
from sqlalchemy.databases import sqlite as sa_base
from sqlalchemy import Table, MetaData
#import sqlalchemy as sa
from migrate.changeset import ansisql, exceptions
SQLiteSchemaGenerator = sa_base.SQLiteSchemaGenerator
@ -20,12 +20,13 @@ class SQLiteCommon(object):
class SQLiteHelper(SQLiteCommon):
def visit_column(self, column):
try:
table = self._to_table(column.table)
except:
table = self._to_table(column)
raise
table = self._to_table(column.table)
table_name = self.preparer.format_table(table)
# we remove all constraints, indexes so it doesnt recreate them
table.indexes = set()
table.constraints = set()
self.append('ALTER TABLE %s RENAME TO migration_tmp' % table_name)
self.execute()
@ -42,7 +43,7 @@ class SQLiteColumnGenerator(SQLiteSchemaGenerator, SQLiteCommon,
ansisql.ANSIColumnGenerator):
"""SQLite ColumnGenerator"""
def visit_alter_foriegn_keys(self, column):
def add_foreignkey(self, constraint):
"""Does not support ALTER TABLE ADD FOREIGN KEY"""
self._not_supported("ALTER TABLE ADD CONSTRAINT")
@ -51,7 +52,6 @@ class SQLiteColumnDropper(SQLiteHelper, ansisql.ANSIColumnDropper):
"""SQLite ColumnDropper"""
def _modify_table(self, table, column):
del table.columns[column.name]
columns = ' ,'.join(map(self.preparer.format_column, table.columns))
return 'INSERT INTO %(table_name)s SELECT ' + columns + \
' from migration_tmp'

View File

@ -2,8 +2,10 @@
Module for visitor class mapping.
"""
import sqlalchemy as sa
from migrate.changeset.databases import sqlite, postgres, mysql, oracle
from migrate.changeset import ansisql
from migrate.changeset.databases import sqlite, postgres, mysql, oracle
# Map SA dialects to the corresponding Migrate extensions
dialects = {

View File

@ -1,8 +1,6 @@
"""
Schema module providing common schema operations.
"""
import re
import sqlalchemy
from migrate.changeset.databases.visitor import (get_engine_visitor,
@ -101,13 +99,12 @@ def alter_column(*p, **k):
engine = k['engine']
delta = _ColumnDelta(*p, **k)
visitorcallable = get_engine_visitor(engine, 'schemachanger')
column = sqlalchemy.Column(delta.current_name)
column.delta = delta
column.table = delta.table
engine._run_visitor(visitorcallable, column)
#_engine_run_visitor(engine, visitorcallable, delta)
delta.result_column.delta = delta
delta.result_column.table = delta.table
visitorcallable = get_engine_visitor(engine, 'schemachanger')
engine._run_visitor(visitorcallable, delta.result_column)
# Update column
if col is not None:
@ -155,18 +152,6 @@ def _to_index(index, table=None, engine=None):
return ret
def _normalize_table(column, table):
if table is not None:
if table is not column.table:
# This is a bit of a hack: we end up with dupe PK columns here
pk_names = map(lambda c: c.name, table.primary_key)
if column.primary_key and pk_names.count(column.name):
index = pk_names.index(column_name)
del table.primary_key[index]
table.append_column(column)
return column.table
class _ColumnDelta(dict):
"""Extracts the differences between two columns/column-parameters"""
@ -223,6 +208,7 @@ class _ColumnDelta(dict):
table = k.pop('table')
self.current_name = current_name
self._table = table
self.result_column = table.c.get(current_name, None)
return k
def _init_1col(self, col, *p, **k):
@ -277,9 +263,6 @@ class _ColumnDelta(dict):
getattr(that, 'length', None))
return ret
def accept_schema_visitor(self, visitor):
return visitor.visit_column(self)
class ChangesetTable(object):
"""Changeset extensions to SQLAlchemy tables."""
@ -300,7 +283,7 @@ class ChangesetTable(object):
if not isinstance(column, sqlalchemy.Column):
# It's a column name
try:
column = getattr(self.c, str(column), None)
column = getattr(self.c, str(column))
except AttributeError:
# That column isn't part of the table. We don't need
# its entire definition to drop the column, just its
@ -362,17 +345,23 @@ class ChangesetColumn(object):
k['engine'] = k['table'].bind
return alter_column(self, *p, **k)
def create(self, table=None, *args, **kwargs):
def create(self, table=None, index_name=None, unique_name=None,
primary_key_name=None, *args, **kwargs):
"""Create this column in the database.
Assumes the given table exists. ``ALTER TABLE ADD COLUMN``,
for most databases.
"""
table = _normalize_table(self, table)
engine = table.bind
self.index_name = index_name
self.unique_name = unique_name
self.primary_key_name = primary_key_name
for cons in ('index_name', 'unique_name', 'primary_key_name'):
self._check_sanity_constraints(cons)
self.add_to_table(table)
engine = self.table.bind
visitorcallable = get_engine_visitor(engine, 'columngenerator')
engine._run_visitor(visitorcallable, self, *args, **kwargs)
return self
def drop(self, table=None, *args, **kwargs):
@ -380,12 +369,32 @@ class ChangesetColumn(object):
``ALTER TABLE DROP COLUMN``, for most databases.
"""
table = _normalize_table(self, table)
engine = table.bind
if table is not None:
self.table = table
self.remove_from_table(self.table)
engine = self.table.bind
visitorcallable = get_engine_visitor(engine, 'columndropper')
engine._run_visitor(visitorcallable, self, *args, **kwargs)
return self
def add_to_table(self, table):
if table and not self.table:
self._set_parent(table)
def remove_from_table(self, table):
# TODO: remove indexes, primary keys, constraints, etc
if table.c.contains_column(self):
table.c.remove(self)
def _check_sanity_constraints(self, name):
obj = getattr(self, name)
if (getattr(self, name[:-5]) and not obj):
raise InvalidConstraintError("Column.create() accepts index_name,"
" primary_key_name and unique_name to generate constraints")
if not isinstance(obj, basestring) and obj is not None:
raise InvalidConstraintError(
"%s argument for column must be constraint name" % name)
class ChangesetIndex(object):
"""Changeset extensions to SQLAlchemy Indexes."""

View File

@ -140,7 +140,8 @@ class PythonScript(base.BaseScript):
try:
func(engine)
except TypeError:
print "upgrade/downgrade functions must accept one parameter (migrate_engine)"
print "upgrade/downgrade functions must accept engine parameter (since ver 0.5.5)"
raise
@property
def module(self):

View File

@ -9,6 +9,7 @@ tag_build = .dev
[nosetests]
pdb = true
pdb-failures = true
stop = true
[aliases]
release = egg_info -RDb ''

View File

@ -1,32 +1,29 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sqlalchemy
from sqlalchemy import *
from sqlalchemy.databases import information_schema
import migrate
from migrate import changeset
from migrate.changeset import *
from migrate.changeset.schema import _ColumnDelta
from test import fixture
# TODO: test quoting
# TODO: test all other constraints on create column, test defaults
class TestAddDropColumn(fixture.DB):
"""Test add/drop column through all possible interfaces
also test for constraints"""
level = fixture.DB.CONNECT
meta = MetaData()
# We'll be adding the 'data' column
table_name = 'tmp_adddropcol'
table_int = 0
def _setup(self, url):
super(TestAddDropColumn, self)._setup(url)
self.meta.clear()
self.meta = MetaData()
self.table = Table(self.table_name, self.meta,
Column('id', Integer, primary_key=True),
Column('id', Integer, unique=True),
)
self.meta.bind = self.engine
if self.engine.has_table(self.table.name):
@ -35,72 +32,57 @@ class TestAddDropColumn(fixture.DB):
def _teardown(self):
if self.engine.has_table(self.table.name):
try:
self.table.drop()
except:
pass
self.table.drop()
self.meta.clear()
super(TestAddDropColumn,self)._teardown()
def run_(self,create_column_func,drop_column_func,*col_p,**col_k):
def run_(self, create_column_func, drop_column_func, *col_p, **col_k):
col_name = 'data'
def _assert_numcols(expected,type_):
def assert_numcols(num_of_expected_cols):
# number of cols should be correct in table object and in database
self.refresh_table(self.table_name)
result = len(self.table.c)
self.assertEquals(result,expected,
"# %s cols incorrect: %s != %s"%(type_,result,expected))
if not col_k.get('primary_key',None):
return
# new primary key: check its length too
result = len(self.table.primary_key)
self.assertEquals(result,expected,
"# %s pks incorrect: %s != %s"%(type_,result,expected))
def assert_numcols(expected):
# number of cols should be correct in table object and in database
# Changed: create/drop shouldn't mess with the objects
#_assert_numcols(expected,'object')
# Detect # database cols via autoload
#self.meta.clear()
del self.meta.tables[self.table_name]
self.table=Table(self.table_name,self.meta,autoload=True)
_assert_numcols(expected,'database')
self.assertEquals(result, num_of_expected_cols),
if col_k.get('primary_key', None):
# new primary key: check its length too
result = len(self.table.primary_key)
self.assertEquals(result, num_of_expected_cols)
assert_numcols(1)
if len(col_p) == 0:
col_p = [String(40)]
col = Column(col_name,*col_p,**col_k)
col = Column(col_name, *col_p, **col_k)
create_column_func(col)
#create_column(col,self.table)
assert_numcols(2)
self.assertEquals(getattr(self.table.c,col_name),col)
#drop_column(col,self.table)
col = getattr(self.table.c,col_name)
drop_column_func(col)
col2 = getattr(self.table.c, col_name)
self.assertEquals(col2, col)
drop_column_func(col2)
assert_numcols(1)
@fixture.usedb()
def test_undefined(self):
"""Add/drop columns not yet defined in the table"""
def add_func(col):
return create_column(col,self.table)
return create_column(col, self.table)
def drop_func(col):
return drop_column(col,self.table)
return self.run_(add_func,drop_func)
return drop_column(col, self.table)
return self.run_(add_func, drop_func)
@fixture.usedb()
def test_defined(self):
"""Add/drop columns already defined in the table"""
def add_func(col):
self.meta.clear()
self.table = Table(self.table_name,self.meta,
Column('id',Integer,primary_key=True),
self.table = Table(self.table_name, self.meta,
Column('id', Integer, primary_key=True),
col,
)
return create_column(col,self.table)
return create_column(col)
def drop_func(col):
return drop_column(col,self.table)
return self.run_(add_func,drop_func)
return drop_column(col)
return self.run_(add_func, drop_func)
@fixture.usedb()
def test_method_bound(self):
@ -108,14 +90,14 @@ class TestAddDropColumn(fixture.DB):
ie. no table parameter passed to function
"""
def add_func(col):
self.assert_(col.table is None,col.table)
self.assert_(col.table is None, col.table)
self.table.append_column(col)
return col.create()
def drop_func(col):
#self.assert_(col.table is None,col.table)
#self.table.append_column(col)
return col.drop()
return self.run_(add_func,drop_func)
return self.run_(add_func, drop_func)
@fixture.usedb()
def test_method_notbound(self):
@ -124,7 +106,7 @@ class TestAddDropColumn(fixture.DB):
return col.create(self.table)
def drop_func(col):
return col.drop(self.table)
return self.run_(add_func,drop_func)
return self.run_(add_func, drop_func)
@fixture.usedb()
def test_tablemethod_obj(self):
@ -133,7 +115,7 @@ class TestAddDropColumn(fixture.DB):
return self.table.create_column(col)
def drop_func(col):
return self.table.drop_column(col)
return self.run_(add_func,drop_func)
return self.run_(add_func, drop_func)
@fixture.usedb()
def test_tablemethod_name(self):
@ -145,69 +127,181 @@ class TestAddDropColumn(fixture.DB):
def drop_func(col):
# Not necessarily bound to table
return self.table.drop_column(col.name)
return self.run_(add_func,drop_func)
return self.run_(add_func, drop_func)
@fixture.usedb()
def test_byname(self):
"""Add/drop columns via functions; by table object and column name"""
def add_func(col):
self.table.append_column(col)
return create_column(col.name,self.table)
return create_column(col.name, self.table)
def drop_func(col):
return drop_column(col.name,self.table)
return self.run_(add_func,drop_func)
return drop_column(col.name, self.table)
return self.run_(add_func, drop_func)
@fixture.usedb()
def test_drop_column_not_in_table(self):
"""Drop column by name"""
def add_func(col):
return self.table.create_column(col)
def drop_func(col):
self.table.c.remove(col)
return self.table.drop_column(col.name)
self.run_(add_func, drop_func)
@fixture.usedb()
def test_fk(self):
"""Can create columns with foreign keys"""
reftable = Table('tmp_ref',self.meta,
Column('id',Integer,primary_key=True),
)
# create FK's target
reftable = Table('tmp_ref', self.meta,
Column('id', Integer, primary_key=True),
)
if self.engine.has_table(reftable.name):
reftable.drop()
reftable.create()
def add_func(col):
self.table.append_column(col)
return create_column(col.name, self.table)
def drop_func(col):
ret = drop_column(col.name,self.table)
if self.engine.has_table(reftable.name):
reftable.drop()
return ret
# create column with fk
col = Column('data', Integer, ForeignKey(reftable.c.id))
if self.url.startswith('sqlite'):
self.assertRaises(changeset.exceptions.NotSupportedError,
self.run_, add_func, drop_func, Integer,
ForeignKey(reftable.c.id))
col.create, self.table)
else:
return self.run_(add_func, drop_func, Integer,
ForeignKey(reftable.c.id))
col.create(self.table)
# check if constraint is added
for cons in self.table.constraints:
if isinstance(cons, sqlalchemy.schema.ForeignKeyConstraint):
break
else:
self.fail('No constraint found')
# TODO: test on db level if constraints work
self.assertEqual(reftable.c.id.name, col.foreign_keys[0].column.name)
col.drop(self.table)
if self.engine.has_table(reftable.name):
reftable.drop()
@fixture.usedb(not_supported='sqlite')
def test_pk(self):
"""Can create columns with primary key"""
col = Column('data', Integer)
self.assertRaises(changeset.exceptions.InvalidConstraintError,
col.create, self.table, primary_key_name=True)
col.create(self.table, primary_key_name='data_pkey')
# check if constraint was added (cannot test on objects)
self.table.insert(values={'data': 4}).execute()
try:
self.table.insert(values={'data': 4}).execute()
except sqlalchemy.exc.IntegrityError:
pass
else:
self.fail()
col.drop()
@fixture.usedb(not_supported='mysql')
def test_check(self):
"""Can create columns with check constraint"""
col = Column('data',
Integer,
sqlalchemy.schema.CheckConstraint('data > 4'))
col.create(self.table)
# check if constraint was added (cannot test on objects)
self.table.insert(values={'data': 5}).execute()
try:
self.table.insert(values={'data': 3}).execute()
except sqlalchemy.exc.IntegrityError:
pass
else:
self.fail()
col.drop()
@fixture.usedb(not_supported='sqlite')
def test_unique(self):
"""Can create columns with unique constraint"""
self.assertRaises(changeset.exceptions.InvalidConstraintError,
Column('data', Integer, unique=True).create, self.table)
col = Column('data', Integer)
col.create(self.table, unique_name='data_unique')
# check if constraint was added (cannot test on objects)
self.table.insert(values={'data': 5}).execute()
try:
self.table.insert(values={'data': 5}).execute()
except sqlalchemy.exc.IntegrityError:
pass
else:
self.fail()
col.drop(self.table)
@fixture.usedb()
def test_index(self):
"""Can create columns with indexes"""
self.assertRaises(changeset.exceptions.InvalidConstraintError,
Column('data', Integer).create, self.table, index_name=True)
col = Column('data', Integer)
col.create(self.table, index_name='ix_data')
# check if index was added
self.table.insert(values={'data': 5}).execute()
try:
self.table.insert(values={'data': 5}).execute()
except sqlalchemy.exc.IntegrityError:
pass
else:
self.fail()
col.drop()
@fixture.usedb()
def test_server_defaults(self):
"""Can create columns with server_default values"""
col = Column('data', String(244), server_default='foobar')
col.create(self.table)
self.table.insert().execute()
row = self.table.select(autocommit=True).execute().fetchone()
self.assertEqual(u'foobar', row['data'])
col.drop()
# TODO: test sequence
# TODO: test that if column is appended on creation and removed on deletion
class TestRename(fixture.DB):
"""Tests for table and index rename methods"""
level = fixture.DB.CONNECT
meta = MetaData()
def _setup(self, url):
super(TestRename, self)._setup(url)
self.meta.bind = self.engine #self.meta.connect(self.engine)
self.meta.bind = self.engine
@fixture.usedb()
def test_rename_table(self):
"""Tables can be renamed"""
c_name = 'col_1'
name1 = 'name_one'
name2 = 'name_two'
xname1 = 'x' + name1
xname2 = 'x' + name2
self.column = Column(c_name, Integer)
table_name1 = 'name_one'
table_name2 = 'name_two'
index_name1 = 'x' + table_name1
index_name2 = 'x' + table_name2
self.meta.clear()
self.table = Table(name1, self.meta, self.column)
self.index = Index(xname1, self.column, unique=False)
self.column = Column(c_name, Integer)
self.table = Table(table_name1, self.meta, self.column)
self.index = Index(index_name1, self.column, unique=False)
if self.engine.has_table(self.table.name):
self.table.drop()
if self.engine.has_table(name2):
tmp = Table(name2, self.meta, autoload=True)
if self.engine.has_table(table_name2):
tmp = Table(table_name2, self.meta, autoload=True)
tmp.drop()
tmp.deregister()
del tmp
@ -228,69 +322,72 @@ class TestRename(fixture.DB):
# we know the object's name isn't consistent: just assign it
newname = expected
# Table DB check
#table = self.refresh_table(self.table,newname)
self.meta.clear()
self.table = Table(newname, self.meta, autoload=True)
self.assertEquals(self.table.name,expected)
def assert_index_name(expected,skip_object_check=False):
self.assertEquals(self.table.name, expected)
def assert_index_name(expected, skip_object_check=False):
if not skip_object_check:
# Index object check
self.assertEquals(self.index.name,expected)
self.assertEquals(self.index.name, expected)
else:
# object is inconsistent
self.index.name = expected
# Index DB check
#TODO
# TODO: Index DB check
try:
# Table renames
assert_table_name(name1)
rename_table(self.table, name2)
assert_table_name(name2)
self.table.rename(name1)
assert_table_name(name1)
# ..by just the string
rename_table(name1, name2, engine=self.engine)
assert_table_name(name2, True) # object not updated
assert_table_name(table_name1)
rename_table(self.table, table_name2)
assert_table_name(table_name2)
self.table.rename(table_name1)
assert_table_name(table_name1)
# test by just the string
rename_table(table_name1, table_name2, engine=self.engine)
assert_table_name(table_name2, True) # object not updated
# Index renames
if self.url.startswith('sqlite') or self.url.startswith('mysql'):
self.assertRaises(changeset.exceptions.NotSupportedError,
self.index.rename, xname2)
self.index.rename, index_name2)
else:
assert_index_name(xname1)
rename_index(self.index,xname2,engine=self.engine)
assert_index_name(xname2)
self.index.rename(xname1)
assert_index_name(xname1)
# ..by just the string
rename_index(xname1,xname2,engine=self.engine)
assert_index_name(xname2,True)
assert_index_name(index_name1)
rename_index(self.index, index_name2, engine=self.engine)
assert_index_name(index_name2)
self.index.rename(index_name1)
assert_index_name(index_name1)
# test by just the string
rename_index(index_name1, index_name2, engine=self.engine)
assert_index_name(index_name2, True)
finally:
#self.index.drop()
if self.table.exists():
self.table.drop()
class TestColumnChange(fixture.DB):
level=fixture.DB.CONNECT
level = fixture.DB.CONNECT
table_name = 'tmp_colchange'
def _setup(self, url):
super(TestColumnChange, self)._setup(url)
self.meta = MetaData(self.engine)
self.table = Table(self.table_name,self.meta,
Column('id',Integer,primary_key=True),
Column('data',String(40),server_default=DefaultClause("tluafed"),nullable=True),
self.table = Table(self.table_name, self.meta,
Column('id', Integer, primary_key=True),
Column('data', String(40), server_default=DefaultClause("tluafed"),
nullable=True),
)
if self.table.exists():
self.table.drop()
try:
self.table.create()
except sqlalchemy.exceptions.SQLError,e:
except sqlalchemy.exceptions.SQLError, e:
# SQLite: database schema has changed
if not self.url.startswith('sqlite://'):
raise
def _teardown(self):
if self.table.exists():
try:
@ -299,90 +396,91 @@ class TestColumnChange(fixture.DB):
# SQLite: database schema has changed
if not self.url.startswith('sqlite://'):
raise
#self.engine.echo=False
super(TestColumnChange, self)._teardown()
@fixture.usedb()
def test_rename(self):
"""Can rename a column"""
def num_rows(col,content):
return len(list(self.table.select(col==content).execute()))
def num_rows(col, content):
return len(list(self.table.select(col == content).execute()))
# Table content should be preserved in changed columns
content = "fgsfds"
self.engine.execute(self.table.insert(),data=content,id=42)
self.assertEquals(num_rows(self.table.c.data,content),1)
self.engine.execute(self.table.insert(), data=content, id=42)
self.assertEquals(num_rows(self.table.c.data, content), 1)
# ...as a function, given a column object and the new name
alter_column(self.table.c.data, name='atad')
alter_column('data', name='data2', table=self.table)
self.refresh_table()
alter_column(self.table.c.data2, name='atad')
self.refresh_table(self.table.name)
self.assert_('data' not in self.table.c.keys())
self.assert_('atad' in self.table.c.keys())
#self.assertRaises(AttributeError,getattr,self.table.c,'data')
self.table.c.atad # Should not raise exception
self.assertEquals(num_rows(self.table.c.atad,content),1)
self.assertEquals(num_rows(self.table.c.atad, content), 1)
# ...as a method, given a new name
self.table.c.atad.alter(name='data')
self.refresh_table(self.table.name)
self.assert_('atad' not in self.table.c.keys())
self.table.c.data # Should not raise exception
self.assertEquals(num_rows(self.table.c.data,content),1)
self.assertEquals(num_rows(self.table.c.data, content), 1)
# ...as a function, given a new object
col = Column('atad',String(40),server_default=self.table.c.data.server_default)
col = Column('atad', String(40), server_default=self.table.c.data.server_default)
alter_column(self.table.c.data, col)
self.refresh_table(self.table.name)
self.assert_('data' not in self.table.c.keys())
self.table.c.atad # Should not raise exception
self.assertEquals(num_rows(self.table.c.atad,content),1)
self.assertEquals(num_rows(self.table.c.atad, content), 1)
# ...as a method, given a new object
col = Column('data',String(40),server_default=self.table.c.atad.server_default)
col = Column('data', String(40), server_default=self.table.c.atad.server_default)
self.table.c.atad.alter(col)
self.refresh_table(self.table.name)
self.assert_('atad' not in self.table.c.keys())
self.table.c.data # Should not raise exception
self.assertEquals(num_rows(self.table.c.data,content),1)
self.assertEquals(num_rows(self.table.c.data,content), 1)
@fixture.usedb()
def xtest_fk(self):
"""Can add/drop foreign key constraints to/from a column
Not supported
"""
self.assert_(self.table.c.data.foreign_key is None)
#@fixture.usedb()
#def test_fk(self):
# """Can add/drop foreign key constraints to/from a column
# Not supported
# """
# self.assert_(self.table.c.data.foreign_key is None)
# add
self.table.c.data.alter(foreign_key=ForeignKey(self.table.c.id))
self.refresh_table(self.table.name)
self.assert_(self.table.c.data.foreign_key is not None)
# # add
# self.table.c.data.alter(foreign_key=ForeignKey(self.table.c.id))
# self.refresh_table(self.table.name)
# self.assert_(self.table.c.data.foreign_key is not None)
# drop
self.table.c.data.alter(foreign_key=None)
self.refresh_table(self.table.name)
self.assert_(self.table.c.data.foreign_key is None)
# # drop
# self.table.c.data.alter(foreign_key=None)
# self.refresh_table(self.table.name)
# self.assert_(self.table.c.data.foreign_key is None)
@fixture.usedb()
def test_type(self):
"""Can change a column's type"""
# Entire column definition given
self.table.c.data.alter(Column('data',String(42)))
self.table.c.data.alter(Column('data', String(42)))
self.refresh_table(self.table.name)
self.assert_(isinstance(self.table.c.data.type,String))
self.assertEquals(self.table.c.data.type.length,42)
self.assert_(isinstance(self.table.c.data.type, String))
self.assertEquals(self.table.c.data.type.length, 42)
# Just the new type
self.table.c.data.alter(type=String(21))
self.refresh_table(self.table.name)
self.assert_(isinstance(self.table.c.data.type,String))
self.assertEquals(self.table.c.data.type.length,21)
self.assert_(isinstance(self.table.c.data.type, String))
self.assertEquals(self.table.c.data.type.length, 21)
# Different type
self.assert_(isinstance(self.table.c.id.type,Integer))
self.assertEquals(self.table.c.id.nullable,False)
self.assert_(isinstance(self.table.c.id.type, Integer))
self.assertEquals(self.table.c.id.nullable, False)
self.table.c.id.alter(type=String(20))
self.assertEquals(self.table.c.id.nullable,False)
self.assertEquals(self.table.c.id.nullable, False)
self.refresh_table(self.table.name)
self.assert_(isinstance(self.table.c.id.type,String))
self.assert_(isinstance(self.table.c.id.type, String))
@fixture.usedb(not_supported='mysql')
def test_default(self):
@ -391,7 +489,7 @@ class TestColumnChange(fixture.DB):
application / by SA
"""
#self.engine.echo=True
self.assertEquals(self.table.c.data.server_default.arg,'tluafed')
self.assertEquals(self.table.c.data.server_default.arg, 'tluafed')
# Just the new default
default = 'my_default'
@ -403,7 +501,7 @@ class TestColumnChange(fixture.DB):
# Column object
default = 'your_default'
self.table.c.data.alter(Column('data',String(40),server_default=DefaultClause(default)))
self.table.c.data.alter(Column('data', String(40), server_default=DefaultClause(default)))
self.refresh_table(self.table.name)
self.assert_(default in str(self.table.c.data.server_default.arg))
@ -412,90 +510,101 @@ class TestColumnChange(fixture.DB):
self.refresh_table(self.table.name)
# server_default isn't necessarily None for Oracle
#self.assert_(self.table.c.data.server_default is None,self.table.c.data.server_default)
self.engine.execute(self.table.insert(),id=11)
self.engine.execute(self.table.insert(), id=11)
row = self.table.select().execute().fetchone()
self.assert_(row['data'] is None,row['data'])
self.assert_(row['data'] is None, row['data'])
@fixture.usedb()
def test_null(self):
"""Can change a column's null constraint"""
self.assertEquals(self.table.c.data.nullable,True)
self.assertEquals(self.table.c.data.nullable, True)
# Column object
self.table.c.data.alter(Column('data',String(40),nullable=False))
self.table.c.data.alter(Column('data', String(40), nullable=False))
self.table.nullable=None
self.refresh_table(self.table.name)
self.assertEquals(self.table.c.data.nullable,False)
self.assertEquals(self.table.c.data.nullable, False)
# Just the new status
self.table.c.data.alter(nullable=True)
self.refresh_table(self.table.name)
self.assertEquals(self.table.c.data.nullable,True)
self.assertEquals(self.table.c.data.nullable, True)
@fixture.usedb()
def xtest_pk(self):
"""Can add/drop a column to/from its table's primary key
Not supported
"""
self.engine.echo = True
self.assertEquals(len(self.table.primary_key),1)
#@fixture.usedb()
#def test_pk(self):
# """Can add/drop a column to/from its table's primary key
# Not supported
# """
# self.engine.echo = True
# self.assertEquals(len(self.table.primary_key), 1)
# Entire column definition
self.table.c.data.alter(Column('data',String,primary_key=True))
self.refresh_table(self.table.name)
self.assertEquals(len(self.table.primary_key),2)
# # Entire column definition
# self.table.c.data.alter(Column('data', String, primary_key=True))
# self.refresh_table(self.table.name)
# self.assertEquals(len(self.table.primary_key), 2)
# # Just the new status
# self.table.c.data.alter(primary_key=False)
# self.refresh_table(self.table.name)
# self.assertEquals(len(self.table.primary_key), 1)
# Just the new status
self.table.c.data.alter(primary_key=False)
self.refresh_table(self.table.name)
self.assertEquals(len(self.table.primary_key),1)
class TestColumnDelta(fixture.Base):
def test_deltas(self):
def mkcol(name='id',type=String,*p,**k):
return Column(name,type,*p,**k)
col_orig = mkcol(primary_key=True)
def mkcol(name='id', type=String, *p, **k):
return Column(name, type, *p, **k)
def verify(expected,original,*p,**k):
delta = _ColumnDelta(original,*p,**k)
def verify(expected, original, *p, **k):
delta = _ColumnDelta(original, *p, **k)
result = delta.keys()
result.sort()
self.assertEquals(expected,result)
self.assertEquals(expected, result)
return delta
verify([],col_orig)
verify(['name'],col_orig,'ids')
col_orig = mkcol(primary_key=True)
verify([], col_orig)
verify(['name'], col_orig, 'ids')
# Parameters are always executed, even if they're 'unchanged'
# (We can't assume given column is up-to-date)
verify(['name','primary_key','type'],col_orig,'id',Integer,primary_key=True)
verify(['name','primary_key','type'],col_orig,name='id',type=Integer,primary_key=True)
verify(['name', 'primary_key', 'type'],
col_orig, 'id', Integer, primary_key=True)
verify(['name', 'primary_key', 'type'],
col_orig, name='id', type=Integer, primary_key=True)
# Can compare two columns and find differences
col_new = mkcol(name='ids',primary_key=True)
verify([],col_orig,col_orig)
verify(['name'],col_orig,col_orig,'ids')
verify(['name'],col_orig,col_orig,name='ids')
verify(['name'],col_orig,col_new)
verify(['name','type'],col_orig,col_new,type=String)
col_new = mkcol(name='ids', primary_key=True)
verify([], col_orig, col_orig)
verify(['name'], 'ids', table=Table('test', MetaData()), name='hey')
verify(['name'], col_orig, col_orig, 'ids')
verify(['name'], col_orig, col_orig, name='ids')
verify(['name'], col_orig, col_new)
verify(['name','type'], col_orig, col_new, type=String)
# Change name, given an up-to-date definition and the current name
delta = verify(['name'],col_new,current_name='id')
self.assertEquals(delta.get('name'),'ids')
delta = verify(['name'], col_new, current_name='id')
self.assertEquals(delta.get('name'), 'ids')
# Change other params at the same time
verify(['name','type'],col_new,current_name='id',type=String)
verify(['name', 'type'], col_new, current_name='id', type=String)
# Type comparisons
verify([],mkcol(type=String),mkcol(type=String))
verify(['type'],mkcol(type=String),mkcol(type=Integer))
verify(['type'],mkcol(type=String),mkcol(type=String(42)))
verify([],mkcol(type=String(42)),mkcol(type=String(42)))
verify(['type'],mkcol(type=String(24)),mkcol(type=String(42)))
verify([], mkcol(type=String), mkcol(type=String))
verify(['type'], mkcol(type=String), mkcol(type=Integer))
verify(['type'], mkcol(type=String), mkcol(type=String(42)))
verify([], mkcol(type=String(42)), mkcol(type=String(42)))
verify(['type'], mkcol(type=String(24)), mkcol(type=String(42)))
# Other comparisons
verify(['primary_key'],mkcol(nullable=False),mkcol(primary_key=True))
verify(['primary_key'], mkcol(nullable=False), mkcol(primary_key=True))
# PK implies nullable=False
verify(['nullable','primary_key'],mkcol(nullable=True),mkcol(primary_key=True))
verify([],mkcol(primary_key=True),mkcol(primary_key=True))
verify(['nullable'],mkcol(nullable=True),mkcol(nullable=False))
verify([],mkcol(nullable=True),mkcol(nullable=True))
verify(['default'],mkcol(default=None),mkcol(default='42'))
verify([],mkcol(default=None),mkcol(default=None))
verify([],mkcol(default='42'),mkcol(default='42'))
verify(['nullable', 'primary_key'],
mkcol(nullable=True), mkcol(primary_key=True))
verify([], mkcol(primary_key=True), mkcol(primary_key=True))
verify(['nullable'], mkcol(nullable=True), mkcol(nullable=False))
verify([], mkcol(nullable=True), mkcol(nullable=True))
verify(['default'], mkcol(default=None), mkcol(default='42'))
verify([], mkcol(default=None), mkcol(default=None))
verify([], mkcol(default='42'), mkcol(default='42'))

View File

@ -78,7 +78,11 @@ class TestConstraint(CommonTestConstraint):
# Add a FK by creating a FK constraint
self.assertEquals(self.table.c.fkey.foreign_keys._list, [])
fk = ForeignKeyConstraint([self.table.c.fkey], [self.table.c.id], name="fk_id_fkey")
fk = ForeignKeyConstraint([self.table.c.fkey],
[self.table.c.id],
name="fk_id_fkey",
onupdate="CASCADE",
ondelete="CASCADE")
self.assert_(self.table.c.fkey.foreign_keys._list is not [])
self.assertEquals(list(fk.columns), [self.table.c.fkey])
self.assertEquals([e.column for e in fk.elements], [self.table.c.id])
@ -89,6 +93,13 @@ class TestConstraint(CommonTestConstraint):
index = Index('index_name', self.table.c.fkey)
index.create()
fk.create()
# test for ondelete/onupdate
fkey = self.table.c.fkey.foreign_keys._list[0]
self.assertEquals(fkey.onupdate, "CASCADE")
self.assertEquals(fkey.ondelete, "CASCADE")
# TODO: test on real db if it was set
self.refresh_table()
self.assert_(self.table.c.fkey.foreign_keys._list is not [])
@ -109,12 +120,51 @@ class TestConstraint(CommonTestConstraint):
@fixture.usedb()
def test_drop_cascade(self):
"""Drop constraint cascaded"""
pk = PrimaryKeyConstraint('id', table=self.table, name="id_pkey")
pk.create()
self.refresh_table()
# Drop the PK constraint forcing cascade
pk.drop(cascade=True)
# TODO: add real assertion if it was added
@fixture.usedb(supported=['mysql'])
def test_fail_mysql_check_constraints(self):
"""Check constraints raise NotSupported for mysql on drop"""
cons = CheckConstraint('id > 3', name="id_check", table=self.table)
cons.create()
self.refresh_table()
try:
cons.drop()
except NotSupportedError:
pass
else:
self.fail()
@fixture.usedb(not_supported=['sqlite', 'mysql'])
def test_named_check_constraints(self):
"""Check constraints can be defined, created, and dropped"""
self.assertRaises(InvalidConstraintError,
CheckConstraint, 'id > 3')
cons = CheckConstraint('id > 3', name="id_check", table=self.table)
cons.create()
self.refresh_table()
self.table.insert(values={'id': 4}).execute()
try:
self.table.insert(values={'id': 1}).execute()
except IntegrityError:
pass
else:
self.fail()
# Remove the name, drop the constraint; it should succeed
cons.drop()
self.refresh_table()
self.table.insert(values={'id': 2}).execute()
self.table.insert(values={'id': 1}).execute()
class TestAutoname(CommonTestConstraint):
@ -154,10 +204,6 @@ class TestAutoname(CommonTestConstraint):
def test_autoname_fk(self):
"""ForeignKeyConstraints can guess their name if None is given"""
cons = ForeignKeyConstraint([self.table.c.fkey], [self.table.c.id])
if self.url.startswith('mysql'):
# MySQL FKs need an index
index = Index('index_name', self.table.c.fkey)
index.create()
cons.create()
self.refresh_table()
self.table.c.fkey.foreign_keys[0].column is self.table.c.id
@ -170,10 +216,6 @@ class TestAutoname(CommonTestConstraint):
# test string names
cons = ForeignKeyConstraint(['fkey'], ['%s.id' % self.tablename], table=self.table)
if self.url.startswith('mysql'):
# MySQL FKs need an index
index = Index('index_name', self.table.c.fkey)
index.create()
cons.create()
self.refresh_table()
self.table.c.fkey.foreign_keys[0].column is self.table.c.id
@ -182,7 +224,7 @@ class TestAutoname(CommonTestConstraint):
cons.name = None
cons.drop()
@fixture.usedb(not_supported=['oracle', 'sqlite'])
@fixture.usedb(not_supported=['oracle', 'sqlite', 'mysql'])
def test_autoname_check(self):
"""CheckConstraints can guess their name if None is given"""
cons = CheckConstraint('id > 3', columns=[self.table.c.id])
@ -190,20 +232,21 @@ class TestAutoname(CommonTestConstraint):
self.refresh_table()
self.table.insert(values={'id': 4}).execute()
try:
self.table.insert(values={'id': 1}).execute()
except IntegrityError:
pass
else:
self.fail()
if not self.engine.name == 'mysql':
self.table.insert(values={'id': 4}).execute()
try:
self.table.insert(values={'id': 1}).execute()
except IntegrityError:
pass
else:
self.fail()
# Remove the name, drop the constraint; it should succeed
cons.name = None
cons.drop()
self.refresh_table()
self.table.insert(values={'id': 2}).execute()
self.table.insert(values={'id': 5}).execute()
self.table.insert(values={'id': 1}).execute()
@fixture.usedb(not_supported=['oracle', 'sqlite'])
def test_autoname_unique(self):

View File

@ -130,7 +130,7 @@ class DB(Base):
def _not_supported(self, url):
return not self._supported(url)
def refresh_table(self,name=None):
def refresh_table(self, name=None):
"""Reload the table from the database
Assumes we're working with only a single table, self.table, and
metadata self.meta

View File

@ -196,7 +196,7 @@ class TestControlledSchema(fixture.Pathed, fixture.DB):
def construct_model(self):
meta = MetaData()
user = Table('temp_model_schema', meta, Column('id', Integer), Column('user', String))
user = Table('temp_model_schema', meta, Column('id', Integer), Column('user', String(245)))
return meta