From cde0f9b52d900288d5ff85fe0e742f9d73c37e6f Mon Sep 17 00:00:00 2001 From: iElectric Date: Sat, 20 Jun 2009 22:33:03 +0000 Subject: [PATCH] updated changeset tests. whole package is finally PEP8. fixed mysql tests&bugs. updated docs where apropriate. changeset test coverage almost at 100% --- docs/changelog.rst | 6 +- docs/changeset.rst | 34 +- docs/index.rst | 58 +-- migrate/changeset/ansisql.py | 139 +++---- migrate/changeset/constraint.py | 5 +- migrate/changeset/databases/mysql.py | 54 +-- migrate/changeset/databases/oracle.py | 5 +- migrate/changeset/databases/sqlite.py | 20 +- migrate/changeset/databases/visitor.py | 4 +- migrate/changeset/schema.py | 69 ++-- migrate/versioning/script/py.py | 3 +- setup.cfg | 1 + test/changeset/test_changeset.py | 505 +++++++++++++++---------- test/changeset/test_constraint.py | 79 +++- test/fixture/database.py | 2 +- test/versioning/test_schema.py | 2 +- 16 files changed, 580 insertions(+), 406 deletions(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index f4e12d5..11efbda 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -1,9 +1,13 @@ 0.5.5 ----- -- code coverage is up to 99% +- server_defaults passed to column.create are now issued correctly +- constraints passed to column.create are correctly interpreted (ALTER TABLE ADD CONSTRAINT is issued after ADD COLUMN) +- column.create accepts `primary_key_name`, `unique_name` and `index_name` as string value which is used as contraint name when adding a column - Constraint classes have cascade=True keyword argument to issue CASCADE drop where supported - added UniqueConstraint/CheckConstraint and corresponding create/drop methods +- use SQLAlchemy quoting system to avoid name conflicts (for issue 32) +- code coverage is up to 99% with more than 100 tests - partial refactoring of changeset package - majoy update to documentation - dialect support table was added to documentation diff --git a/docs/changeset.rst b/docs/changeset.rst index 133a0b1..f2dbd68 100644 --- a/docs/changeset.rst +++ b/docs/changeset.rst @@ -31,6 +31,8 @@ Given a standard SQLAlchemy table:: ) table.create() +.. _column-create: + Create a column:: col = Column('col1', String) @@ -39,12 +41,16 @@ Create a column:: # Column is added to table based on its name assert col is table.c.col1 -Drop a column (Not supported by SQLite_):: +.. _column-drop: + +Drop a column:: col.drop() -Alter a column (Not supported by SQLite_):: +.. _column-alter: + +Alter a column:: col.alter(name='col2') @@ -52,17 +58,19 @@ Alter a column (Not supported by SQLite_):: assert col is table.c.col2 # Other properties can be modified as well - col.alter(type=String(42), - default="life, the universe, and everything", - nullable=False, - ) + col.alter(type=String(42), default="life, the universe, and everything", nullable=False) # Given another column object, col1.alter(col2), col1 will be changed to match col2 - col.alter(Column('col3',String(77),nullable=True)) + col.alter(Column('col3', String(77), nullable=True)) assert col.nullable assert table.c.col3 is col -.. _sqlite: http://www.sqlite.org/lang_altertable.html + +.. note:: + + Since version ``0.5.5`` you can pass primary_key_name, index_name and unique_name to column.create method to issue ALTER TABLE ADD CONSTRAINT after changing the column. Note for multi columns constraints and other advanced configuration, check :ref:`constraint tutorial `. + +.. _table-rename: Table ===== @@ -76,6 +84,9 @@ Rename a table:: .. _`table create/drop`: http://www.sqlalchemy.org/docs/05/metadata.html#creating-and-dropping-database-tables .. currentmodule:: migrate.changeset.constraint + +.. _index-rename: + Index ===== @@ -87,6 +98,9 @@ Rename an index, given an SQLAlchemy ``Index`` object:: .. _`index create/drop`: http://www.sqlalchemy.org/docs/05/metadata.html#indexes + +.. _constraint-tutorial: + Constraint ========== @@ -106,7 +120,9 @@ The following rundowns are true for all constraints classes: # Drop the constraint cons.drop() -or you can pass column objects (and table argument can be left out). +or you can pass in column objects (and table argument can be left out):: + + cons = PrimaryKeyConstraint(col1, col2) 3. Some dialects support CASCADE option when dropping constraints:: diff --git a/docs/index.rst b/docs/index.rst index 9e1d96c..aba488c 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -41,36 +41,36 @@ Download and Development Dialect support ----------------------------------- +--------------- -+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+ -| Operation / Dialect | :ref:`sqlite ` | :ref:`postgres ` | :ref:`mysql ` | :ref:`oracle ` | firebird | mssql | -| | | | | | | | -+==========================+==========================+==============================+========================+===========================+==========+=======+ -| ALTER TABLE | yes | yes | | | | | -| RENAME TABLE | | | | | | | -+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+ -| ALTER TABLE | yes | yes | | | | | -| RENAME COLUMN | (workaround) [#1]_ | | | | | | -+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+ -| ALTER TABLE | yes | yes | | | | | -| DROP COLUMN | (workaround) [#1]_ | | | | | | -+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+ -| ALTER TABLE | yes | yes | | | | | -| ADD COLUMN | (with limitations) [#2]_ | | | | | | -+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+ -| ALTER TABLE | no | yes | | | | | -| ADD CONSTRAINT | | | | | | | -+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+ -| ALTER TABLE | no | yes | | | | | -| DROP CONSTRAINT | | | | | | | -+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+ -| ALTER TABLE | no | yes | | | | | -| ALTER COLUMN | | | | | | | -+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+ -| RENAME INDEX | no | yes | | | | | -| | | | | | | | -+--------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+ ++---------------------------------------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+ +| Operation / Dialect | :ref:`sqlite ` | :ref:`postgres ` | :ref:`mysql ` | :ref:`oracle ` | firebird | mssql | +| | | | | | | | ++=========================================================+==========================+==============================+========================+===========================+==========+=======+ +| :ref:`ALTER TABLE RENAME TABLE ` | yes | yes | yes | | | | +| | | | | | | | ++---------------------------------------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+ +| :ref:`ALTER TABLE RENAME COLUMN ` | yes | yes | yes | | | | +| | (workaround) [#1]_ | | | | | | ++---------------------------------------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+ +| :ref:`ALTER TABLE ADD COLUMN ` | yes | yes | yes | | | | +| | (with limitations) [#2]_ | | | | | | ++---------------------------------------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+ +| :ref:`ALTER TABLE DROP COLUMN ` | no | yes | yes | | | | +| | | | | | | | ++---------------------------------------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+ +| :ref:`ALTER TABLE ADD CONSTRAINT ` | no | yes | yes | | | | +| | | | | | | | ++---------------------------------------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+ +| :ref:`ALTER TABLE DROP CONSTRAINT `| no | yes | yes | | | | +| | | | | | | | ++---------------------------------------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+ +| :ref:`RENAME INDEX ` | no | yes | no | | | | +| | | | | | | | ++---------------------------------------------------------+--------------------------+------------------------------+------------------------+---------------------------+----------+-------+ .. [#1] Table is renamed to temporary table, new table is created followed by INSERT statements. diff --git a/migrate/changeset/ansisql.py b/migrate/changeset/ansisql.py index 7762031..f80c38a 100644 --- a/migrate/changeset/ansisql.py +++ b/migrate/changeset/ansisql.py @@ -6,11 +6,12 @@ """ import sqlalchemy as sa from sqlalchemy.engine.default import DefaultDialect -from sqlalchemy.sql.compiler import SchemaGenerator, SchemaDropper from sqlalchemy.schema import (ForeignKeyConstraint, PrimaryKeyConstraint, CheckConstraint, - UniqueConstraint) + UniqueConstraint, + Index) +from sqlalchemy.sql.compiler import SchemaGenerator, SchemaDropper from migrate.changeset import exceptions, constraint @@ -44,28 +45,29 @@ class AlterTableVisitor(SchemaIterator): self.append('\nALTER TABLE %s ' % self.preparer.format_table(table)) return table - def _pk_constraint(self, table, column, status): - """Create a primary key constraint from a table, column. + # DEPRECATED: use plain constraints instead + #def _pk_constraint(self, table, column, status): + # """Create a primary key constraint from a table, column. - Status: true if the constraint is being added; false if being dropped - """ - if isinstance(column, basestring): - column = getattr(table.c, name) + # Status: true if the constraint is being added; false if being dropped + # """ + # if isinstance(column, basestring): + # column = getattr(table.c, name) - ret = constraint.PrimaryKeyConstraint(*table.primary_key) - if status: - # Created PK - ret.c.append(column) - else: - # Dropped PK - names = [c.name for c in cons.c] - index = names.index(col.name) - del ret.c[index] + # ret = constraint.PrimaryKeyConstraint(*table.primary_key) + # if status: + # # Created PK + # ret.c.append(column) + # else: + # # Dropped PK + # names = [c.name for c in cons.c] + # index = names.index(col.name) + # del ret.c[index] - # Allow explicit PK name assignment - if isinstance(pk, basestring): - ret.name = pk - return ret + # # Allow explicit PK name assignment + # if isinstance(pk, basestring): + # ret.name = pk + # return ret class ANSIColumnGenerator(AlterTableVisitor, SchemaGenerator): @@ -77,28 +79,36 @@ class ANSIColumnGenerator(AlterTableVisitor, SchemaGenerator): :param column: column object :type column: :class:`sqlalchemy.Column` instance """ + if column.default is not None: + self.traverse_single(column.default) + table = self.start_alter_table(column) self.append("ADD ") - colspec = self.get_column_specification(column) - self.append(colspec) + self.append(self.get_column_specification(column)) + + for cons in column.constraints: + self.traverse_single(cons) self.execute() - # add in foreign keys - if column.foreign_keys: - self.visit_alter_foriegn_keys(column) + # ALTER TABLE STATEMENTS - def visit_alter_foriegn_keys(self, column): + # add indexes and unique constraints + if column.index_name: + ix = Index(column.index_name, + column, + unique=bool(column.index_name or column.index)) + ix.create() + elif column.unique_name: + constraint.UniqueConstraint(column, name=column.unique_name).create() + + # SA bounds FK constraints to table, add manually for fk in column.foreign_keys: - self.define_foreign_key(fk.constraint) - - def visit_table(self, table): - """Default table visitor, does nothing. - - :param table: table object - :type table: :class:`sqlalchemy.Table` instance - """ - pass + self.add_foreignkey(fk.constraint) + # add primary key constraint if needed + if column.primary_key_name: + cons = constraint.PrimaryKeyConstraint(column, name=column.primary_key_name) + cons.create() class ANSIColumnDropper(AlterTableVisitor, SchemaDropper): @@ -113,7 +123,7 @@ class ANSIColumnDropper(AlterTableVisitor, SchemaDropper): :type column: :class:`sqlalchemy.Column` """ table = self.start_alter_table(column) - self.append(' DROP COLUMN %s' % self.preparer.format_column(column)) + self.append('DROP COLUMN %s' % self.preparer.format_column(column)) self.execute() @@ -159,43 +169,25 @@ class ANSISchemaChanger(AlterTableVisitor, SchemaGenerator): # are managed by the app, not the db. self._run_subvisit(delta, self._visit_column_default) if 'name' in keys: - self._run_subvisit(delta, self._visit_column_name) + self._run_subvisit(delta, self._visit_column_name, start_alter=False) - def _run_subvisit(self, delta, func): + def _run_subvisit(self, delta, func, start_alter=True): """Runs visit method based on what needs to be changed on column""" table = self._to_table(delta.table) col_name = delta.current_name + if start_alter: + self.start_alter_column(table, col_name) ret = func(table, col_name, delta) self.execute() - def _visit_column_foreign_key(self, delta): - table = delta.table - column = getattr(table.c, delta.current_name) - cons = constraint.ForeignKeyConstraint(column, autoload=True) - fk = delta['foreign_key'] - if fk: - # For now, cons.columns is limited to one column: - # no multicolumn FKs - column.foreign_key = ForeignKey(*cons.columns) - else: - column_foreign_key = None - cons.drop() - cons.create() - - def _visit_column_primary_key(self, delta): - table = delta.table - col = getattr(table.c, delta.current_name) - pk = delta['primary_key'] - cons = self._pk_constraint(table, col, pk) - cons.drop() - cons.create() - - def _visit_column_nullable(self, table, col_name, delta): - nullable = delta['nullable'] - table = self._to_table(table) + def start_alter_column(self, table, col_name): + """Starts ALTER COLUMN""" self.start_alter_table(table) # TODO: use preparer.format_column self.append("ALTER COLUMN %s " % self.preparer.quote_identifier(col_name)) + + def _visit_column_nullable(self, table, col_name, delta): + nullable = delta['nullable'] if nullable: self.append("DROP NOT NULL") else: @@ -207,9 +199,6 @@ class ANSISchemaChanger(AlterTableVisitor, SchemaGenerator): # reason dummy = sa.Column(None, None, server_default=server_default) default_text = self.get_column_default_string(dummy) - self.start_alter_table(table) - # TODO: use preparer.format_column - self.append("ALTER COLUMN %s " % self.preparer.quote_identifier(col_name)) if default_text is not None: self.append("SET DEFAULT %s" % default_text) else: @@ -218,15 +207,10 @@ class ANSISchemaChanger(AlterTableVisitor, SchemaGenerator): def _visit_column_type(self, table, col_name, delta): type_ = delta['type'] if not isinstance(type_, sa.types.AbstractType): - # It's the class itself, not an instance... make an - # instance + # It's the class itself, not an instance... make an instance type_ = type_() type_text = type_.dialect_impl(self.dialect).get_col_spec() - self.start_alter_table(table) - # TODO: does type need formating? - # TODO: use preparer.format_column - self.append("ALTER COLUMN %s TYPE %s" % - (self.preparer.quote_identifier(col_name), type_text)) + self.append("TYPE %s" % type_text) def _visit_column_name(self, table, col_name, delta): new_name = delta['name'] @@ -292,13 +276,13 @@ class ANSIConstraintGenerator(ANSIConstraintCommon, SchemaGenerator): if cons.name is not None: self.append("CONSTRAINT %s " % self.preparer.format_constraint(cons)) - self.append(" CHECK (%s)" % cons.sqltext) + self.append("CHECK (%s)" % cons.sqltext) self.define_constraint_deferrability(cons) elif isinstance(cons, UniqueConstraint): if cons.name is not None: self.append("CONSTRAINT %s " % self.preparer.format_constraint(cons)) - self.append(" UNIQUE (%s)" % \ + self.append("UNIQUE (%s)" % \ (', '.join(self.preparer.quote(c.name, c.quote) for c in cons))) self.define_constraint_deferrability(cons) else: @@ -317,7 +301,8 @@ class ANSIConstraintDropper(ANSIConstraintCommon, SchemaDropper): def _visit_constraint(self, constraint): self.start_alter_table(constraint) self.append("DROP CONSTRAINT ") - self.append(self.get_constraint_name(constraint)) + constraint.name = self.get_constraint_name(constraint) + self.append(self.preparer.format_constraint(constraint)) if constraint.cascade: self.append(" CASCADE") self.execute() diff --git a/migrate/changeset/constraint.py b/migrate/changeset/constraint.py index f8e6871..4bdbba8 100644 --- a/migrate/changeset/constraint.py +++ b/migrate/changeset/constraint.py @@ -1,7 +1,6 @@ """ This module defines standalone schema constraint classes. """ -import sqlalchemy from sqlalchemy import schema from migrate.changeset.exceptions import * @@ -142,7 +141,7 @@ class CheckConstraint(ConstraintChangeset, schema.CheckConstraint): __visit_name__ = 'migrate_check_constraint' def __init__(self, sqltext, *args, **kwargs): - cols = kwargs.pop('columns', False) + cols = kwargs.pop('columns', []) if not cols and not kwargs.get('name', False): raise InvalidConstraintError('You must either set "name"' 'parameter or "columns" to autogenarate it.') @@ -169,6 +168,8 @@ class UniqueConstraint(ConstraintChangeset, schema.UniqueConstraint): :param table: If columns are passed as strings, this kw is required :type table: Table instance :type cols: strings or Column instances + + .. versionadded:: 0.5.5 """ __visit_name__ = 'migrate_unique_constraint' diff --git a/migrate/changeset/databases/mysql.py b/migrate/changeset/databases/mysql.py index fc65569..ea83d2a 100644 --- a/migrate/changeset/databases/mysql.py +++ b/migrate/changeset/databases/mysql.py @@ -20,39 +20,28 @@ class MySQLColumnDropper(ansisql.ANSIColumnDropper): class MySQLSchemaChanger(MySQLSchemaGenerator, ansisql.ANSISchemaChanger): - def visit_column(self, delta): - keys = delta.keys() - if 'type' in keys or 'nullable' in keys or 'name' in keys: - self._run_subvisit(delta, self._visit_column_change) - if 'server_default' in keys: - # Column name might have changed above - col_name = delta.get('name', delta.current_name) - self._run_subvisit(delta, self._visit_column_default, - col_name=col_name) - - def _visit_column_change(self, table_name, col_name, delta): - if not hasattr(delta, 'result_column'): - # Mysql needs the whole column definition, not just a lone - # name/type - raise exceptions.NotSupportedError( - "A column object is required to do this") - - column = delta.result_column - # needed by get_column_specification - if not column.table: - column.table = delta.table + def visit_column(self, column): + delta = column.delta + table = column.table colspec = self.get_column_specification(column) - # TODO: we need table formating here - self.start_alter_table(self.preparer.quote(table_name, True)) - self.append("CHANGE COLUMN ") - self.append(self.preparer.quote(col_name, True)) - self.append(' ') + + if not hasattr(delta, 'result_column'): + # Mysql needs the whole column definition, not just a lone name/type + raise exceptions.NotSupportedError( + "A column object must be present in table to alter it") + + self.start_alter_table(table) + + old_col_name = self.preparer.quote(delta.current_name, column.quote) + self.append("CHANGE COLUMN %s " % old_col_name) self.append(colspec) + self.execute() def visit_index(self, param): # If MySQL can do this, I can't find how raise exceptions.NotSupportedError("MySQL cannot rename indexes") + class MySQLConstraintGenerator(ansisql.ANSIConstraintGenerator): pass @@ -67,9 +56,22 @@ class MySQLConstraintDropper(ansisql.ANSIConstraintDropper): def visit_migrate_foreign_key_constraint(self, constraint): self.start_alter_table(constraint) self.append("DROP FOREIGN KEY ") + constraint.name = self.get_constraint_name(constraint) self.append(self.preparer.format_constraint(constraint)) self.execute() + def visit_migrate_check_constraint(self, *p, **k): + raise exceptions.NotSupportedError("MySQL does not support CHECK" + " constraints, use triggers instead.") + + def visit_migrate_unique_constraint(self, constraint, *p, **k): + self.start_alter_table(constraint) + self.append('DROP INDEX ') + constraint.name = self.get_constraint_name(constraint) + self.append(self.preparer.format_constraint(constraint)) + self.execute() + + class MySQLDialect(ansisql.ANSIDialect): columngenerator = MySQLColumnGenerator columndropper = MySQLColumnDropper diff --git a/migrate/changeset/databases/oracle.py b/migrate/changeset/databases/oracle.py index fc45a0f..abdaad8 100644 --- a/migrate/changeset/databases/oracle.py +++ b/migrate/changeset/databases/oracle.py @@ -1,10 +1,11 @@ """ Oracle database specific implementations of changeset classes. """ +import sqlalchemy as sa +from sqlalchemy.databases import oracle as sa_base from migrate.changeset import ansisql, exceptions -from sqlalchemy.databases import oracle as sa_base -import sqlalchemy as sa + OracleSchemaGenerator = sa_base.OracleSchemaGenerator diff --git a/migrate/changeset/databases/sqlite.py b/migrate/changeset/databases/sqlite.py index 94ac940..fa9f381 100644 --- a/migrate/changeset/databases/sqlite.py +++ b/migrate/changeset/databases/sqlite.py @@ -3,10 +3,10 @@ .. _`SQLite`: http://www.sqlite.org/ """ -from migrate.changeset import ansisql, exceptions, constraint from sqlalchemy.databases import sqlite as sa_base -from sqlalchemy import Table, MetaData -#import sqlalchemy as sa + +from migrate.changeset import ansisql, exceptions + SQLiteSchemaGenerator = sa_base.SQLiteSchemaGenerator @@ -20,12 +20,13 @@ class SQLiteCommon(object): class SQLiteHelper(SQLiteCommon): def visit_column(self, column): - try: - table = self._to_table(column.table) - except: - table = self._to_table(column) - raise + table = self._to_table(column.table) table_name = self.preparer.format_table(table) + + # we remove all constraints, indexes so it doesnt recreate them + table.indexes = set() + table.constraints = set() + self.append('ALTER TABLE %s RENAME TO migration_tmp' % table_name) self.execute() @@ -42,7 +43,7 @@ class SQLiteColumnGenerator(SQLiteSchemaGenerator, SQLiteCommon, ansisql.ANSIColumnGenerator): """SQLite ColumnGenerator""" - def visit_alter_foriegn_keys(self, column): + def add_foreignkey(self, constraint): """Does not support ALTER TABLE ADD FOREIGN KEY""" self._not_supported("ALTER TABLE ADD CONSTRAINT") @@ -51,7 +52,6 @@ class SQLiteColumnDropper(SQLiteHelper, ansisql.ANSIColumnDropper): """SQLite ColumnDropper""" def _modify_table(self, table, column): - del table.columns[column.name] columns = ' ,'.join(map(self.preparer.format_column, table.columns)) return 'INSERT INTO %(table_name)s SELECT ' + columns + \ ' from migration_tmp' diff --git a/migrate/changeset/databases/visitor.py b/migrate/changeset/databases/visitor.py index f0fa973..2f282c7 100644 --- a/migrate/changeset/databases/visitor.py +++ b/migrate/changeset/databases/visitor.py @@ -2,8 +2,10 @@ Module for visitor class mapping. """ import sqlalchemy as sa -from migrate.changeset.databases import sqlite, postgres, mysql, oracle + from migrate.changeset import ansisql +from migrate.changeset.databases import sqlite, postgres, mysql, oracle + # Map SA dialects to the corresponding Migrate extensions dialects = { diff --git a/migrate/changeset/schema.py b/migrate/changeset/schema.py index ea066aa..dae6be0 100644 --- a/migrate/changeset/schema.py +++ b/migrate/changeset/schema.py @@ -1,8 +1,6 @@ """ Schema module providing common schema operations. """ -import re - import sqlalchemy from migrate.changeset.databases.visitor import (get_engine_visitor, @@ -101,13 +99,12 @@ def alter_column(*p, **k): engine = k['engine'] delta = _ColumnDelta(*p, **k) - visitorcallable = get_engine_visitor(engine, 'schemachanger') - column = sqlalchemy.Column(delta.current_name) - column.delta = delta - column.table = delta.table - engine._run_visitor(visitorcallable, column) - #_engine_run_visitor(engine, visitorcallable, delta) + delta.result_column.delta = delta + delta.result_column.table = delta.table + + visitorcallable = get_engine_visitor(engine, 'schemachanger') + engine._run_visitor(visitorcallable, delta.result_column) # Update column if col is not None: @@ -155,18 +152,6 @@ def _to_index(index, table=None, engine=None): return ret -def _normalize_table(column, table): - if table is not None: - if table is not column.table: - # This is a bit of a hack: we end up with dupe PK columns here - pk_names = map(lambda c: c.name, table.primary_key) - if column.primary_key and pk_names.count(column.name): - index = pk_names.index(column_name) - del table.primary_key[index] - table.append_column(column) - return column.table - - class _ColumnDelta(dict): """Extracts the differences between two columns/column-parameters""" @@ -223,6 +208,7 @@ class _ColumnDelta(dict): table = k.pop('table') self.current_name = current_name self._table = table + self.result_column = table.c.get(current_name, None) return k def _init_1col(self, col, *p, **k): @@ -277,9 +263,6 @@ class _ColumnDelta(dict): getattr(that, 'length', None)) return ret - def accept_schema_visitor(self, visitor): - return visitor.visit_column(self) - class ChangesetTable(object): """Changeset extensions to SQLAlchemy tables.""" @@ -300,7 +283,7 @@ class ChangesetTable(object): if not isinstance(column, sqlalchemy.Column): # It's a column name try: - column = getattr(self.c, str(column), None) + column = getattr(self.c, str(column)) except AttributeError: # That column isn't part of the table. We don't need # its entire definition to drop the column, just its @@ -362,17 +345,23 @@ class ChangesetColumn(object): k['engine'] = k['table'].bind return alter_column(self, *p, **k) - def create(self, table=None, *args, **kwargs): + def create(self, table=None, index_name=None, unique_name=None, + primary_key_name=None, *args, **kwargs): """Create this column in the database. Assumes the given table exists. ``ALTER TABLE ADD COLUMN``, for most databases. """ - table = _normalize_table(self, table) - engine = table.bind + self.index_name = index_name + self.unique_name = unique_name + self.primary_key_name = primary_key_name + for cons in ('index_name', 'unique_name', 'primary_key_name'): + self._check_sanity_constraints(cons) + + self.add_to_table(table) + engine = self.table.bind visitorcallable = get_engine_visitor(engine, 'columngenerator') engine._run_visitor(visitorcallable, self, *args, **kwargs) - return self def drop(self, table=None, *args, **kwargs): @@ -380,12 +369,32 @@ class ChangesetColumn(object): ``ALTER TABLE DROP COLUMN``, for most databases. """ - table = _normalize_table(self, table) - engine = table.bind + if table is not None: + self.table = table + self.remove_from_table(self.table) + engine = self.table.bind visitorcallable = get_engine_visitor(engine, 'columndropper') engine._run_visitor(visitorcallable, self, *args, **kwargs) return self + def add_to_table(self, table): + if table and not self.table: + self._set_parent(table) + + def remove_from_table(self, table): + # TODO: remove indexes, primary keys, constraints, etc + if table.c.contains_column(self): + table.c.remove(self) + + def _check_sanity_constraints(self, name): + obj = getattr(self, name) + if (getattr(self, name[:-5]) and not obj): + raise InvalidConstraintError("Column.create() accepts index_name," + " primary_key_name and unique_name to generate constraints") + if not isinstance(obj, basestring) and obj is not None: + raise InvalidConstraintError( + "%s argument for column must be constraint name" % name) + class ChangesetIndex(object): """Changeset extensions to SQLAlchemy Indexes.""" diff --git a/migrate/versioning/script/py.py b/migrate/versioning/script/py.py index 1b0f9a4..c08d438 100644 --- a/migrate/versioning/script/py.py +++ b/migrate/versioning/script/py.py @@ -140,7 +140,8 @@ class PythonScript(base.BaseScript): try: func(engine) except TypeError: - print "upgrade/downgrade functions must accept one parameter (migrate_engine)" + print "upgrade/downgrade functions must accept engine parameter (since ver 0.5.5)" + raise @property def module(self): diff --git a/setup.cfg b/setup.cfg index dd70583..0445ba9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -9,6 +9,7 @@ tag_build = .dev [nosetests] pdb = true pdb-failures = true +stop = true [aliases] release = egg_info -RDb '' diff --git a/test/changeset/test_changeset.py b/test/changeset/test_changeset.py index 416a008..b0b6c96 100644 --- a/test/changeset/test_changeset.py +++ b/test/changeset/test_changeset.py @@ -1,32 +1,29 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- - import sqlalchemy from sqlalchemy import * -from sqlalchemy.databases import information_schema -import migrate from migrate import changeset from migrate.changeset import * from migrate.changeset.schema import _ColumnDelta - from test import fixture # TODO: test quoting +# TODO: test all other constraints on create column, test defaults class TestAddDropColumn(fixture.DB): + """Test add/drop column through all possible interfaces + also test for constraints""" level = fixture.DB.CONNECT - meta = MetaData() - # We'll be adding the 'data' column table_name = 'tmp_adddropcol' table_int = 0 def _setup(self, url): super(TestAddDropColumn, self)._setup(url) - self.meta.clear() + self.meta = MetaData() self.table = Table(self.table_name, self.meta, - Column('id', Integer, primary_key=True), + Column('id', Integer, unique=True), ) self.meta.bind = self.engine if self.engine.has_table(self.table.name): @@ -35,72 +32,57 @@ class TestAddDropColumn(fixture.DB): def _teardown(self): if self.engine.has_table(self.table.name): - try: - self.table.drop() - except: - pass + self.table.drop() self.meta.clear() super(TestAddDropColumn,self)._teardown() - def run_(self,create_column_func,drop_column_func,*col_p,**col_k): + def run_(self, create_column_func, drop_column_func, *col_p, **col_k): col_name = 'data' - def _assert_numcols(expected,type_): + def assert_numcols(num_of_expected_cols): + # number of cols should be correct in table object and in database + self.refresh_table(self.table_name) result = len(self.table.c) - self.assertEquals(result,expected, - "# %s cols incorrect: %s != %s"%(type_,result,expected)) - if not col_k.get('primary_key',None): - return - # new primary key: check its length too - result = len(self.table.primary_key) - self.assertEquals(result,expected, - "# %s pks incorrect: %s != %s"%(type_,result,expected)) - def assert_numcols(expected): - # number of cols should be correct in table object and in database - # Changed: create/drop shouldn't mess with the objects - #_assert_numcols(expected,'object') - # Detect # database cols via autoload - #self.meta.clear() - del self.meta.tables[self.table_name] - self.table=Table(self.table_name,self.meta,autoload=True) - _assert_numcols(expected,'database') + self.assertEquals(result, num_of_expected_cols), + if col_k.get('primary_key', None): + # new primary key: check its length too + result = len(self.table.primary_key) + self.assertEquals(result, num_of_expected_cols) assert_numcols(1) if len(col_p) == 0: col_p = [String(40)] - col = Column(col_name,*col_p,**col_k) + col = Column(col_name, *col_p, **col_k) create_column_func(col) - #create_column(col,self.table) assert_numcols(2) - self.assertEquals(getattr(self.table.c,col_name),col) - #drop_column(col,self.table) - col = getattr(self.table.c,col_name) - drop_column_func(col) + col2 = getattr(self.table.c, col_name) + self.assertEquals(col2, col) + drop_column_func(col2) assert_numcols(1) @fixture.usedb() def test_undefined(self): """Add/drop columns not yet defined in the table""" def add_func(col): - return create_column(col,self.table) + return create_column(col, self.table) def drop_func(col): - return drop_column(col,self.table) - return self.run_(add_func,drop_func) + return drop_column(col, self.table) + return self.run_(add_func, drop_func) @fixture.usedb() def test_defined(self): """Add/drop columns already defined in the table""" def add_func(col): self.meta.clear() - self.table = Table(self.table_name,self.meta, - Column('id',Integer,primary_key=True), + self.table = Table(self.table_name, self.meta, + Column('id', Integer, primary_key=True), col, ) - return create_column(col,self.table) + return create_column(col) def drop_func(col): - return drop_column(col,self.table) - return self.run_(add_func,drop_func) + return drop_column(col) + return self.run_(add_func, drop_func) @fixture.usedb() def test_method_bound(self): @@ -108,14 +90,14 @@ class TestAddDropColumn(fixture.DB): ie. no table parameter passed to function """ def add_func(col): - self.assert_(col.table is None,col.table) + self.assert_(col.table is None, col.table) self.table.append_column(col) return col.create() def drop_func(col): #self.assert_(col.table is None,col.table) #self.table.append_column(col) return col.drop() - return self.run_(add_func,drop_func) + return self.run_(add_func, drop_func) @fixture.usedb() def test_method_notbound(self): @@ -124,7 +106,7 @@ class TestAddDropColumn(fixture.DB): return col.create(self.table) def drop_func(col): return col.drop(self.table) - return self.run_(add_func,drop_func) + return self.run_(add_func, drop_func) @fixture.usedb() def test_tablemethod_obj(self): @@ -133,7 +115,7 @@ class TestAddDropColumn(fixture.DB): return self.table.create_column(col) def drop_func(col): return self.table.drop_column(col) - return self.run_(add_func,drop_func) + return self.run_(add_func, drop_func) @fixture.usedb() def test_tablemethod_name(self): @@ -145,69 +127,181 @@ class TestAddDropColumn(fixture.DB): def drop_func(col): # Not necessarily bound to table return self.table.drop_column(col.name) - return self.run_(add_func,drop_func) + return self.run_(add_func, drop_func) @fixture.usedb() def test_byname(self): """Add/drop columns via functions; by table object and column name""" def add_func(col): self.table.append_column(col) - return create_column(col.name,self.table) + return create_column(col.name, self.table) def drop_func(col): - return drop_column(col.name,self.table) - return self.run_(add_func,drop_func) + return drop_column(col.name, self.table) + return self.run_(add_func, drop_func) + + @fixture.usedb() + def test_drop_column_not_in_table(self): + """Drop column by name""" + def add_func(col): + return self.table.create_column(col) + def drop_func(col): + self.table.c.remove(col) + return self.table.drop_column(col.name) + self.run_(add_func, drop_func) @fixture.usedb() def test_fk(self): """Can create columns with foreign keys""" - reftable = Table('tmp_ref',self.meta, - Column('id',Integer,primary_key=True), - ) # create FK's target + reftable = Table('tmp_ref', self.meta, + Column('id', Integer, primary_key=True), + ) if self.engine.has_table(reftable.name): reftable.drop() reftable.create() - def add_func(col): - self.table.append_column(col) - return create_column(col.name, self.table) - def drop_func(col): - ret = drop_column(col.name,self.table) - if self.engine.has_table(reftable.name): - reftable.drop() - return ret + + # create column with fk + col = Column('data', Integer, ForeignKey(reftable.c.id)) if self.url.startswith('sqlite'): self.assertRaises(changeset.exceptions.NotSupportedError, - self.run_, add_func, drop_func, Integer, - ForeignKey(reftable.c.id)) + col.create, self.table) else: - return self.run_(add_func, drop_func, Integer, - ForeignKey(reftable.c.id)) + col.create(self.table) + + # check if constraint is added + for cons in self.table.constraints: + if isinstance(cons, sqlalchemy.schema.ForeignKeyConstraint): + break + else: + self.fail('No constraint found') + + # TODO: test on db level if constraints work + + self.assertEqual(reftable.c.id.name, col.foreign_keys[0].column.name) + col.drop(self.table) + + if self.engine.has_table(reftable.name): + reftable.drop() + + @fixture.usedb(not_supported='sqlite') + def test_pk(self): + """Can create columns with primary key""" + col = Column('data', Integer) + self.assertRaises(changeset.exceptions.InvalidConstraintError, + col.create, self.table, primary_key_name=True) + col.create(self.table, primary_key_name='data_pkey') + + # check if constraint was added (cannot test on objects) + self.table.insert(values={'data': 4}).execute() + try: + self.table.insert(values={'data': 4}).execute() + except sqlalchemy.exc.IntegrityError: + pass + else: + self.fail() + + col.drop() + + @fixture.usedb(not_supported='mysql') + def test_check(self): + """Can create columns with check constraint""" + col = Column('data', + Integer, + sqlalchemy.schema.CheckConstraint('data > 4')) + col.create(self.table) + + # check if constraint was added (cannot test on objects) + self.table.insert(values={'data': 5}).execute() + try: + self.table.insert(values={'data': 3}).execute() + except sqlalchemy.exc.IntegrityError: + pass + else: + self.fail() + + col.drop() + + @fixture.usedb(not_supported='sqlite') + def test_unique(self): + """Can create columns with unique constraint""" + self.assertRaises(changeset.exceptions.InvalidConstraintError, + Column('data', Integer, unique=True).create, self.table) + col = Column('data', Integer) + col.create(self.table, unique_name='data_unique') + + # check if constraint was added (cannot test on objects) + self.table.insert(values={'data': 5}).execute() + try: + self.table.insert(values={'data': 5}).execute() + except sqlalchemy.exc.IntegrityError: + pass + else: + self.fail() + + col.drop(self.table) + + @fixture.usedb() + def test_index(self): + """Can create columns with indexes""" + self.assertRaises(changeset.exceptions.InvalidConstraintError, + Column('data', Integer).create, self.table, index_name=True) + col = Column('data', Integer) + col.create(self.table, index_name='ix_data') + + # check if index was added + self.table.insert(values={'data': 5}).execute() + try: + self.table.insert(values={'data': 5}).execute() + except sqlalchemy.exc.IntegrityError: + pass + else: + self.fail() + + col.drop() + + @fixture.usedb() + def test_server_defaults(self): + """Can create columns with server_default values""" + col = Column('data', String(244), server_default='foobar') + col.create(self.table) + + self.table.insert().execute() + row = self.table.select(autocommit=True).execute().fetchone() + self.assertEqual(u'foobar', row['data']) + + col.drop() + + # TODO: test sequence + # TODO: test that if column is appended on creation and removed on deletion class TestRename(fixture.DB): + """Tests for table and index rename methods""" level = fixture.DB.CONNECT meta = MetaData() def _setup(self, url): super(TestRename, self)._setup(url) - self.meta.bind = self.engine #self.meta.connect(self.engine) + self.meta.bind = self.engine @fixture.usedb() def test_rename_table(self): """Tables can be renamed""" c_name = 'col_1' - name1 = 'name_one' - name2 = 'name_two' - xname1 = 'x' + name1 - xname2 = 'x' + name2 - self.column = Column(c_name, Integer) + table_name1 = 'name_one' + table_name2 = 'name_two' + index_name1 = 'x' + table_name1 + index_name2 = 'x' + table_name2 + self.meta.clear() - self.table = Table(name1, self.meta, self.column) - self.index = Index(xname1, self.column, unique=False) + self.column = Column(c_name, Integer) + self.table = Table(table_name1, self.meta, self.column) + self.index = Index(index_name1, self.column, unique=False) + if self.engine.has_table(self.table.name): self.table.drop() - if self.engine.has_table(name2): - tmp = Table(name2, self.meta, autoload=True) + if self.engine.has_table(table_name2): + tmp = Table(table_name2, self.meta, autoload=True) tmp.drop() tmp.deregister() del tmp @@ -228,69 +322,72 @@ class TestRename(fixture.DB): # we know the object's name isn't consistent: just assign it newname = expected # Table DB check - #table = self.refresh_table(self.table,newname) self.meta.clear() self.table = Table(newname, self.meta, autoload=True) - self.assertEquals(self.table.name,expected) - def assert_index_name(expected,skip_object_check=False): + self.assertEquals(self.table.name, expected) + + def assert_index_name(expected, skip_object_check=False): if not skip_object_check: # Index object check - self.assertEquals(self.index.name,expected) + self.assertEquals(self.index.name, expected) else: # object is inconsistent self.index.name = expected - # Index DB check - #TODO - + # TODO: Index DB check + try: # Table renames - assert_table_name(name1) - rename_table(self.table, name2) - assert_table_name(name2) - self.table.rename(name1) - assert_table_name(name1) - # ..by just the string - rename_table(name1, name2, engine=self.engine) - assert_table_name(name2, True) # object not updated + assert_table_name(table_name1) + rename_table(self.table, table_name2) + assert_table_name(table_name2) + self.table.rename(table_name1) + assert_table_name(table_name1) + + # test by just the string + rename_table(table_name1, table_name2, engine=self.engine) + assert_table_name(table_name2, True) # object not updated # Index renames if self.url.startswith('sqlite') or self.url.startswith('mysql'): self.assertRaises(changeset.exceptions.NotSupportedError, - self.index.rename, xname2) + self.index.rename, index_name2) else: - assert_index_name(xname1) - rename_index(self.index,xname2,engine=self.engine) - assert_index_name(xname2) - self.index.rename(xname1) - assert_index_name(xname1) - # ..by just the string - rename_index(xname1,xname2,engine=self.engine) - assert_index_name(xname2,True) + assert_index_name(index_name1) + rename_index(self.index, index_name2, engine=self.engine) + assert_index_name(index_name2) + self.index.rename(index_name1) + assert_index_name(index_name1) + + # test by just the string + rename_index(index_name1, index_name2, engine=self.engine) + assert_index_name(index_name2, True) finally: - #self.index.drop() if self.table.exists(): self.table.drop() + class TestColumnChange(fixture.DB): - level=fixture.DB.CONNECT + level = fixture.DB.CONNECT table_name = 'tmp_colchange' def _setup(self, url): super(TestColumnChange, self)._setup(url) self.meta = MetaData(self.engine) - self.table = Table(self.table_name,self.meta, - Column('id',Integer,primary_key=True), - Column('data',String(40),server_default=DefaultClause("tluafed"),nullable=True), + self.table = Table(self.table_name, self.meta, + Column('id', Integer, primary_key=True), + Column('data', String(40), server_default=DefaultClause("tluafed"), + nullable=True), ) if self.table.exists(): self.table.drop() try: self.table.create() - except sqlalchemy.exceptions.SQLError,e: + except sqlalchemy.exceptions.SQLError, e: # SQLite: database schema has changed if not self.url.startswith('sqlite://'): raise + def _teardown(self): if self.table.exists(): try: @@ -299,90 +396,91 @@ class TestColumnChange(fixture.DB): # SQLite: database schema has changed if not self.url.startswith('sqlite://'): raise - #self.engine.echo=False super(TestColumnChange, self)._teardown() @fixture.usedb() def test_rename(self): """Can rename a column""" - def num_rows(col,content): - return len(list(self.table.select(col==content).execute())) + def num_rows(col, content): + return len(list(self.table.select(col == content).execute())) # Table content should be preserved in changed columns content = "fgsfds" - self.engine.execute(self.table.insert(),data=content,id=42) - self.assertEquals(num_rows(self.table.c.data,content),1) + self.engine.execute(self.table.insert(), data=content, id=42) + self.assertEquals(num_rows(self.table.c.data, content), 1) # ...as a function, given a column object and the new name - alter_column(self.table.c.data, name='atad') + alter_column('data', name='data2', table=self.table) + self.refresh_table() + alter_column(self.table.c.data2, name='atad') self.refresh_table(self.table.name) self.assert_('data' not in self.table.c.keys()) self.assert_('atad' in self.table.c.keys()) #self.assertRaises(AttributeError,getattr,self.table.c,'data') self.table.c.atad # Should not raise exception - self.assertEquals(num_rows(self.table.c.atad,content),1) + self.assertEquals(num_rows(self.table.c.atad, content), 1) # ...as a method, given a new name self.table.c.atad.alter(name='data') self.refresh_table(self.table.name) self.assert_('atad' not in self.table.c.keys()) self.table.c.data # Should not raise exception - self.assertEquals(num_rows(self.table.c.data,content),1) + self.assertEquals(num_rows(self.table.c.data, content), 1) # ...as a function, given a new object - col = Column('atad',String(40),server_default=self.table.c.data.server_default) + col = Column('atad', String(40), server_default=self.table.c.data.server_default) alter_column(self.table.c.data, col) self.refresh_table(self.table.name) self.assert_('data' not in self.table.c.keys()) self.table.c.atad # Should not raise exception - self.assertEquals(num_rows(self.table.c.atad,content),1) + self.assertEquals(num_rows(self.table.c.atad, content), 1) # ...as a method, given a new object - col = Column('data',String(40),server_default=self.table.c.atad.server_default) + col = Column('data', String(40), server_default=self.table.c.atad.server_default) self.table.c.atad.alter(col) self.refresh_table(self.table.name) self.assert_('atad' not in self.table.c.keys()) self.table.c.data # Should not raise exception - self.assertEquals(num_rows(self.table.c.data,content),1) + self.assertEquals(num_rows(self.table.c.data,content), 1) - @fixture.usedb() - def xtest_fk(self): - """Can add/drop foreign key constraints to/from a column - Not supported - """ - self.assert_(self.table.c.data.foreign_key is None) + #@fixture.usedb() + #def test_fk(self): + # """Can add/drop foreign key constraints to/from a column + # Not supported + # """ + # self.assert_(self.table.c.data.foreign_key is None) - # add - self.table.c.data.alter(foreign_key=ForeignKey(self.table.c.id)) - self.refresh_table(self.table.name) - self.assert_(self.table.c.data.foreign_key is not None) + # # add + # self.table.c.data.alter(foreign_key=ForeignKey(self.table.c.id)) + # self.refresh_table(self.table.name) + # self.assert_(self.table.c.data.foreign_key is not None) - # drop - self.table.c.data.alter(foreign_key=None) - self.refresh_table(self.table.name) - self.assert_(self.table.c.data.foreign_key is None) + # # drop + # self.table.c.data.alter(foreign_key=None) + # self.refresh_table(self.table.name) + # self.assert_(self.table.c.data.foreign_key is None) @fixture.usedb() def test_type(self): """Can change a column's type""" # Entire column definition given - self.table.c.data.alter(Column('data',String(42))) + self.table.c.data.alter(Column('data', String(42))) self.refresh_table(self.table.name) - self.assert_(isinstance(self.table.c.data.type,String)) - self.assertEquals(self.table.c.data.type.length,42) + self.assert_(isinstance(self.table.c.data.type, String)) + self.assertEquals(self.table.c.data.type.length, 42) # Just the new type self.table.c.data.alter(type=String(21)) self.refresh_table(self.table.name) - self.assert_(isinstance(self.table.c.data.type,String)) - self.assertEquals(self.table.c.data.type.length,21) + self.assert_(isinstance(self.table.c.data.type, String)) + self.assertEquals(self.table.c.data.type.length, 21) # Different type - self.assert_(isinstance(self.table.c.id.type,Integer)) - self.assertEquals(self.table.c.id.nullable,False) + self.assert_(isinstance(self.table.c.id.type, Integer)) + self.assertEquals(self.table.c.id.nullable, False) self.table.c.id.alter(type=String(20)) - self.assertEquals(self.table.c.id.nullable,False) + self.assertEquals(self.table.c.id.nullable, False) self.refresh_table(self.table.name) - self.assert_(isinstance(self.table.c.id.type,String)) + self.assert_(isinstance(self.table.c.id.type, String)) @fixture.usedb(not_supported='mysql') def test_default(self): @@ -391,7 +489,7 @@ class TestColumnChange(fixture.DB): application / by SA """ #self.engine.echo=True - self.assertEquals(self.table.c.data.server_default.arg,'tluafed') + self.assertEquals(self.table.c.data.server_default.arg, 'tluafed') # Just the new default default = 'my_default' @@ -403,7 +501,7 @@ class TestColumnChange(fixture.DB): # Column object default = 'your_default' - self.table.c.data.alter(Column('data',String(40),server_default=DefaultClause(default))) + self.table.c.data.alter(Column('data', String(40), server_default=DefaultClause(default))) self.refresh_table(self.table.name) self.assert_(default in str(self.table.c.data.server_default.arg)) @@ -412,90 +510,101 @@ class TestColumnChange(fixture.DB): self.refresh_table(self.table.name) # server_default isn't necessarily None for Oracle #self.assert_(self.table.c.data.server_default is None,self.table.c.data.server_default) - self.engine.execute(self.table.insert(),id=11) + self.engine.execute(self.table.insert(), id=11) row = self.table.select().execute().fetchone() - self.assert_(row['data'] is None,row['data']) + self.assert_(row['data'] is None, row['data']) @fixture.usedb() def test_null(self): """Can change a column's null constraint""" - self.assertEquals(self.table.c.data.nullable,True) + self.assertEquals(self.table.c.data.nullable, True) # Column object - self.table.c.data.alter(Column('data',String(40),nullable=False)) + self.table.c.data.alter(Column('data', String(40), nullable=False)) self.table.nullable=None self.refresh_table(self.table.name) - self.assertEquals(self.table.c.data.nullable,False) + self.assertEquals(self.table.c.data.nullable, False) # Just the new status self.table.c.data.alter(nullable=True) self.refresh_table(self.table.name) - self.assertEquals(self.table.c.data.nullable,True) + self.assertEquals(self.table.c.data.nullable, True) - @fixture.usedb() - def xtest_pk(self): - """Can add/drop a column to/from its table's primary key - Not supported - """ - self.engine.echo = True - self.assertEquals(len(self.table.primary_key),1) + #@fixture.usedb() + #def test_pk(self): + # """Can add/drop a column to/from its table's primary key + # Not supported + # """ + # self.engine.echo = True + # self.assertEquals(len(self.table.primary_key), 1) - # Entire column definition - self.table.c.data.alter(Column('data',String,primary_key=True)) - self.refresh_table(self.table.name) - self.assertEquals(len(self.table.primary_key),2) + # # Entire column definition + # self.table.c.data.alter(Column('data', String, primary_key=True)) + # self.refresh_table(self.table.name) + # self.assertEquals(len(self.table.primary_key), 2) + + # # Just the new status + # self.table.c.data.alter(primary_key=False) + # self.refresh_table(self.table.name) + # self.assertEquals(len(self.table.primary_key), 1) - # Just the new status - self.table.c.data.alter(primary_key=False) - self.refresh_table(self.table.name) - self.assertEquals(len(self.table.primary_key),1) class TestColumnDelta(fixture.Base): def test_deltas(self): - def mkcol(name='id',type=String,*p,**k): - return Column(name,type,*p,**k) - col_orig = mkcol(primary_key=True) + def mkcol(name='id', type=String, *p, **k): + return Column(name, type, *p, **k) - def verify(expected,original,*p,**k): - delta = _ColumnDelta(original,*p,**k) + def verify(expected, original, *p, **k): + delta = _ColumnDelta(original, *p, **k) result = delta.keys() result.sort() - self.assertEquals(expected,result) + self.assertEquals(expected, result) return delta - verify([],col_orig) - verify(['name'],col_orig,'ids') + col_orig = mkcol(primary_key=True) + + verify([], col_orig) + verify(['name'], col_orig, 'ids') # Parameters are always executed, even if they're 'unchanged' # (We can't assume given column is up-to-date) - verify(['name','primary_key','type'],col_orig,'id',Integer,primary_key=True) - verify(['name','primary_key','type'],col_orig,name='id',type=Integer,primary_key=True) + verify(['name', 'primary_key', 'type'], + col_orig, 'id', Integer, primary_key=True) + verify(['name', 'primary_key', 'type'], + col_orig, name='id', type=Integer, primary_key=True) # Can compare two columns and find differences - col_new = mkcol(name='ids',primary_key=True) - verify([],col_orig,col_orig) - verify(['name'],col_orig,col_orig,'ids') - verify(['name'],col_orig,col_orig,name='ids') - verify(['name'],col_orig,col_new) - verify(['name','type'],col_orig,col_new,type=String) + col_new = mkcol(name='ids', primary_key=True) + verify([], col_orig, col_orig) + verify(['name'], 'ids', table=Table('test', MetaData()), name='hey') + verify(['name'], col_orig, col_orig, 'ids') + verify(['name'], col_orig, col_orig, name='ids') + verify(['name'], col_orig, col_new) + verify(['name','type'], col_orig, col_new, type=String) + # Change name, given an up-to-date definition and the current name - delta = verify(['name'],col_new,current_name='id') - self.assertEquals(delta.get('name'),'ids') + delta = verify(['name'], col_new, current_name='id') + self.assertEquals(delta.get('name'), 'ids') + # Change other params at the same time - verify(['name','type'],col_new,current_name='id',type=String) + verify(['name', 'type'], col_new, current_name='id', type=String) + # Type comparisons - verify([],mkcol(type=String),mkcol(type=String)) - verify(['type'],mkcol(type=String),mkcol(type=Integer)) - verify(['type'],mkcol(type=String),mkcol(type=String(42))) - verify([],mkcol(type=String(42)),mkcol(type=String(42))) - verify(['type'],mkcol(type=String(24)),mkcol(type=String(42))) + verify([], mkcol(type=String), mkcol(type=String)) + verify(['type'], mkcol(type=String), mkcol(type=Integer)) + verify(['type'], mkcol(type=String), mkcol(type=String(42))) + verify([], mkcol(type=String(42)), mkcol(type=String(42))) + verify(['type'], mkcol(type=String(24)), mkcol(type=String(42))) + # Other comparisons - verify(['primary_key'],mkcol(nullable=False),mkcol(primary_key=True)) + verify(['primary_key'], mkcol(nullable=False), mkcol(primary_key=True)) + # PK implies nullable=False - verify(['nullable','primary_key'],mkcol(nullable=True),mkcol(primary_key=True)) - verify([],mkcol(primary_key=True),mkcol(primary_key=True)) - verify(['nullable'],mkcol(nullable=True),mkcol(nullable=False)) - verify([],mkcol(nullable=True),mkcol(nullable=True)) - verify(['default'],mkcol(default=None),mkcol(default='42')) - verify([],mkcol(default=None),mkcol(default=None)) - verify([],mkcol(default='42'),mkcol(default='42')) + verify(['nullable', 'primary_key'], + mkcol(nullable=True), mkcol(primary_key=True)) + verify([], mkcol(primary_key=True), mkcol(primary_key=True)) + verify(['nullable'], mkcol(nullable=True), mkcol(nullable=False)) + verify([], mkcol(nullable=True), mkcol(nullable=True)) + verify(['default'], mkcol(default=None), mkcol(default='42')) + verify([], mkcol(default=None), mkcol(default=None)) + verify([], mkcol(default='42'), mkcol(default='42')) diff --git a/test/changeset/test_constraint.py b/test/changeset/test_constraint.py index 7e0d04d..ee77aa9 100644 --- a/test/changeset/test_constraint.py +++ b/test/changeset/test_constraint.py @@ -78,7 +78,11 @@ class TestConstraint(CommonTestConstraint): # Add a FK by creating a FK constraint self.assertEquals(self.table.c.fkey.foreign_keys._list, []) - fk = ForeignKeyConstraint([self.table.c.fkey], [self.table.c.id], name="fk_id_fkey") + fk = ForeignKeyConstraint([self.table.c.fkey], + [self.table.c.id], + name="fk_id_fkey", + onupdate="CASCADE", + ondelete="CASCADE") self.assert_(self.table.c.fkey.foreign_keys._list is not []) self.assertEquals(list(fk.columns), [self.table.c.fkey]) self.assertEquals([e.column for e in fk.elements], [self.table.c.id]) @@ -89,6 +93,13 @@ class TestConstraint(CommonTestConstraint): index = Index('index_name', self.table.c.fkey) index.create() fk.create() + + # test for ondelete/onupdate + fkey = self.table.c.fkey.foreign_keys._list[0] + self.assertEquals(fkey.onupdate, "CASCADE") + self.assertEquals(fkey.ondelete, "CASCADE") + # TODO: test on real db if it was set + self.refresh_table() self.assert_(self.table.c.fkey.foreign_keys._list is not []) @@ -109,12 +120,51 @@ class TestConstraint(CommonTestConstraint): @fixture.usedb() def test_drop_cascade(self): + """Drop constraint cascaded""" pk = PrimaryKeyConstraint('id', table=self.table, name="id_pkey") pk.create() self.refresh_table() # Drop the PK constraint forcing cascade pk.drop(cascade=True) + # TODO: add real assertion if it was added + + @fixture.usedb(supported=['mysql']) + def test_fail_mysql_check_constraints(self): + """Check constraints raise NotSupported for mysql on drop""" + cons = CheckConstraint('id > 3', name="id_check", table=self.table) + cons.create() + self.refresh_table() + + try: + cons.drop() + except NotSupportedError: + pass + else: + self.fail() + + @fixture.usedb(not_supported=['sqlite', 'mysql']) + def test_named_check_constraints(self): + """Check constraints can be defined, created, and dropped""" + self.assertRaises(InvalidConstraintError, + CheckConstraint, 'id > 3') + cons = CheckConstraint('id > 3', name="id_check", table=self.table) + cons.create() + self.refresh_table() + + self.table.insert(values={'id': 4}).execute() + try: + self.table.insert(values={'id': 1}).execute() + except IntegrityError: + pass + else: + self.fail() + + # Remove the name, drop the constraint; it should succeed + cons.drop() + self.refresh_table() + self.table.insert(values={'id': 2}).execute() + self.table.insert(values={'id': 1}).execute() class TestAutoname(CommonTestConstraint): @@ -154,10 +204,6 @@ class TestAutoname(CommonTestConstraint): def test_autoname_fk(self): """ForeignKeyConstraints can guess their name if None is given""" cons = ForeignKeyConstraint([self.table.c.fkey], [self.table.c.id]) - if self.url.startswith('mysql'): - # MySQL FKs need an index - index = Index('index_name', self.table.c.fkey) - index.create() cons.create() self.refresh_table() self.table.c.fkey.foreign_keys[0].column is self.table.c.id @@ -170,10 +216,6 @@ class TestAutoname(CommonTestConstraint): # test string names cons = ForeignKeyConstraint(['fkey'], ['%s.id' % self.tablename], table=self.table) - if self.url.startswith('mysql'): - # MySQL FKs need an index - index = Index('index_name', self.table.c.fkey) - index.create() cons.create() self.refresh_table() self.table.c.fkey.foreign_keys[0].column is self.table.c.id @@ -182,7 +224,7 @@ class TestAutoname(CommonTestConstraint): cons.name = None cons.drop() - @fixture.usedb(not_supported=['oracle', 'sqlite']) + @fixture.usedb(not_supported=['oracle', 'sqlite', 'mysql']) def test_autoname_check(self): """CheckConstraints can guess their name if None is given""" cons = CheckConstraint('id > 3', columns=[self.table.c.id]) @@ -190,20 +232,21 @@ class TestAutoname(CommonTestConstraint): self.refresh_table() - self.table.insert(values={'id': 4}).execute() - try: - self.table.insert(values={'id': 1}).execute() - except IntegrityError: - pass - else: - self.fail() + if not self.engine.name == 'mysql': + self.table.insert(values={'id': 4}).execute() + try: + self.table.insert(values={'id': 1}).execute() + except IntegrityError: + pass + else: + self.fail() # Remove the name, drop the constraint; it should succeed cons.name = None cons.drop() self.refresh_table() self.table.insert(values={'id': 2}).execute() - self.table.insert(values={'id': 5}).execute() + self.table.insert(values={'id': 1}).execute() @fixture.usedb(not_supported=['oracle', 'sqlite']) def test_autoname_unique(self): diff --git a/test/fixture/database.py b/test/fixture/database.py index b948278..c6360eb 100644 --- a/test/fixture/database.py +++ b/test/fixture/database.py @@ -130,7 +130,7 @@ class DB(Base): def _not_supported(self, url): return not self._supported(url) - def refresh_table(self,name=None): + def refresh_table(self, name=None): """Reload the table from the database Assumes we're working with only a single table, self.table, and metadata self.meta diff --git a/test/versioning/test_schema.py b/test/versioning/test_schema.py index 58e1632..9f84217 100644 --- a/test/versioning/test_schema.py +++ b/test/versioning/test_schema.py @@ -196,7 +196,7 @@ class TestControlledSchema(fixture.Pathed, fixture.DB): def construct_model(self): meta = MetaData() - user = Table('temp_model_schema', meta, Column('id', Integer), Column('user', String)) + user = Table('temp_model_schema', meta, Column('id', Integer), Column('user', String(245))) return meta