From 9f7ab96881415ec6bd7fc7729f5196d75d01b8ab Mon Sep 17 00:00:00 2001
From: iElectric <unknown>
Date: Sat, 27 Jun 2009 14:13:27 +0000
Subject: [PATCH] - completely refactored ColumnDelta to extract differences
 between columns/parameters (also fixes issue #23) - fixed some bugs (passing
 server_default) on column.alter - updated tests, specially ColumnDelta and
 column.alter - introduced alter_metadata which can preserve altering existing
 objects if False (defaults to True) - updated documentation

---
 TODO                                    |   9 -
 docs/changelog.rst                      |   4 +
 docs/index.rst                          |   4 +-
 migrate/changeset/__init__.py           |   2 +
 migrate/changeset/ansisql.py            |  50 +--
 migrate/changeset/databases/firebird.py |  10 +-
 migrate/changeset/databases/mysql.py    |  14 +-
 migrate/changeset/databases/oracle.py   |  19 +-
 migrate/changeset/databases/sqlite.py   |  28 +-
 migrate/changeset/schema.py             | 408 ++++++++++++++++--------
 test/changeset/test_changeset.py        | 304 +++++++++++++-----
 11 files changed, 544 insertions(+), 308 deletions(-)

diff --git a/TODO b/TODO
index 73c3f83..0c9f726 100644
--- a/TODO
+++ b/TODO
@@ -1,7 +1,3 @@
-- better MySQL support
-- fix unit tests for other databases than PostgreSQL (MySQL and SQLite
-  fail at test_changeset.test_fk(..))
-
 - better SQL scripts support (testing, source viewing)
 
 make_update_script_for_model:
@@ -9,9 +5,4 @@ make_update_script_for_model:
 - columns are not compared?
 - even if two "models" are equal, it doesn't yield so
 
-
-- refactor test_shell to test_api and use TestScript for cmd line testing
 - controlledschema.drop() drops whole migrate table, maybe there are some other repositories bound to it!
-
-- document sqlite hacks (unique index for pk constraint)
-- document constraints usage, document all ways then can be used, document cascade,table,columns options
diff --git a/docs/changelog.rst b/docs/changelog.rst
index 25853b8..4fe9c82 100644
--- a/docs/changelog.rst
+++ b/docs/changelog.rst
@@ -1,7 +1,10 @@
 0.5.5
 -----
 
+- alter column constructs now accept `alter_metadata` parameter. If True, it will modify Column/Table objects according to changes. Otherwise, everything will be untouched.
+- complete refactoring of :class:`~migrate.changeset.schema.ColumnDelta` (fixes issue 23)
 - added support for :ref:`firebird <firebird-d>`
+- fixed bug when column.alter(server_default='string') was not properly set
 - server_defaults passed to column.create are now issued correctly
 - constraints passed to column.create are correctly interpreted (ALTER TABLE ADD CONSTRAINT is issued after ADD COLUMN)
 - column.create accepts `primary_key_name`, `unique_name` and `index_name` as string value which is used as contraint name when adding a column
@@ -18,6 +21,7 @@
 **Backward incompatible changes**:
 
 - python upgrade/downgrade scripts do not import migrate_engine magically, but recieve engine as the only parameter to function
+- alter column does not accept `current_name` anymore, it extracts name from the old column.
 
 0.5.4
 -----
diff --git a/docs/index.rst b/docs/index.rst
index 1cbf67d..e3c471d 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -59,8 +59,8 @@ Dialect support
 | :ref:`ALTER TABLE DROP COLUMN <column-drop>`            | yes                      | yes                          | yes                    | yes                       | yes                           |       |
 |                                                         | (workaround) [#1]_       |                              |                        |                           |                               |       |
 +---------------------------------------------------------+--------------------------+------------------------------+------------------------+---------------------------+-------------------------------+-------+
-| :ref:`ALTER TABLE ALTER COLUMN <column-alter>`          | no                       | yes                          | yes                    | yes                       | yes [#4]_                     |       |
-|                                                         |                          |                              |                        | (with limitations) [#3]_  |                               |       |
+| :ref:`ALTER TABLE ALTER COLUMN <column-alter>`          | yes                      | yes                          | yes                    | yes                       | yes [#4]_                     |       |
+|                                                         | (workaround) [#1]_       |                              |                        | (with limitations) [#3]_  |                               |       |
 +---------------------------------------------------------+--------------------------+------------------------------+------------------------+---------------------------+-------------------------------+-------+
 | :ref:`ALTER TABLE ADD CONSTRAINT <constraint-tutorial>` | no                       | yes                          | yes                    | yes                       | yes                           |       |
 |                                                         |                          |                              |                        |                           |                               |       |
diff --git a/migrate/changeset/__init__.py b/migrate/changeset/__init__.py
index d93282f..940c23f 100644
--- a/migrate/changeset/__init__.py
+++ b/migrate/changeset/__init__.py
@@ -12,3 +12,5 @@ from migrate.changeset.constraint import *
 sqlalchemy.schema.Table.__bases__ += (ChangesetTable, )
 sqlalchemy.schema.Column.__bases__ += (ChangesetColumn, )
 sqlalchemy.schema.Index.__bases__ += (ChangesetIndex, )
+
+sqlalchemy.schema.DefaultClause.__bases__ += (ChangesetDefaultClause, )
diff --git a/migrate/changeset/ansisql.py b/migrate/changeset/ansisql.py
index 7fb5e13..f0dfed5 100644
--- a/migrate/changeset/ansisql.py
+++ b/migrate/changeset/ansisql.py
@@ -45,30 +45,6 @@ class AlterTableVisitor(SchemaIterator):
         self.append('\nALTER TABLE %s ' % self.preparer.format_table(table))
         return table
 
-    # DEPRECATED: use plain constraints instead
-    #def _pk_constraint(self, table, column, status):
-    #    """Create a primary key constraint from a table, column.
-
-    #    Status: true if the constraint is being added; false if being dropped
-    #    """
-    #    if isinstance(column, basestring):
-    #        column = getattr(table.c, name)
-
-    #    ret = constraint.PrimaryKeyConstraint(*table.primary_key)
-    #    if status:
-    #        # Created PK
-    #        ret.c.append(column)
-    #    else:
-    #        # Dropped PK
-    #        names = [c.name for c in cons.c]
-    #        index = names.index(col.name)
-    #        del ret.c[index]
-
-    #    # Allow explicit PK name assignment
-    #    if isinstance(pk, basestring):
-    #        ret.name = pk
-    #    return ret
-
 
 class ANSIColumnGenerator(AlterTableVisitor, SchemaGenerator):
     """Extends ansisql generator for column creation (alter table add col)"""
@@ -160,10 +136,9 @@ class ANSISchemaChanger(AlterTableVisitor, SchemaGenerator):
                                                            True), index.quote)))
         self.execute()
 
-    def visit_column(self, column):
+    def visit_column(self, delta):
         """Rename/change a column."""
         # ALTER COLUMN is implemented as several ALTER statements
-        delta = column.delta
         keys = delta.keys()
         if 'type' in keys:
             self._run_subvisit(delta, self._visit_column_type)
@@ -182,44 +157,37 @@ class ANSISchemaChanger(AlterTableVisitor, SchemaGenerator):
         col_name = delta.current_name
         if start_alter:
             self.start_alter_column(table, col_name)
-        ret = func(table, col_name, delta)
+        ret = func(table, delta.result_column, delta)
         self.execute()
 
     def start_alter_column(self, table, col_name):
         """Starts ALTER COLUMN"""
         self.start_alter_table(table)
-        # TODO: use preparer.format_column
         self.append("ALTER COLUMN %s " % self.preparer.quote(col_name, table.quote))
 
-    def _visit_column_nullable(self, table, col_name, delta):
+    def _visit_column_nullable(self, table, column, delta):
         nullable = delta['nullable']
         if nullable:
             self.append("DROP NOT NULL")
         else:
             self.append("SET NOT NULL")
 
-    def _visit_column_default(self, table, col_name, delta):
-        server_default = delta['server_default']
-        # Dummy column: get_col_default_string needs a column for some
-        # reason
-        dummy = sa.Column(None, None, server_default=server_default)
-        default_text = self.get_column_default_string(dummy)
+    def _visit_column_default(self, table, column, delta):
+        default_text = self.get_column_default_string(column)
         if default_text is not None:
             self.append("SET DEFAULT %s" % default_text)
         else:
             self.append("DROP DEFAULT")
 
-    def _visit_column_type(self, table, col_name, delta):
+    def _visit_column_type(self, table, column, delta):
         type_ = delta['type']
-        if not isinstance(type_, sa.types.AbstractType):
-            # It's the class itself, not an instance... make an instance
-            type_ = type_()
         type_text = type_.dialect_impl(self.dialect).get_col_spec()
         self.append("TYPE %s" % type_text)
 
-    def _visit_column_name(self, table, col_name, delta):
-        new_name = delta['name']
+    def _visit_column_name(self, table, column, delta):
         self.start_alter_table(table)
+        col_name = self.preparer.quote(delta.current_name, table.quote)
+        new_name = self.preparer.format_column(delta.result_column)
         self.append('RENAME COLUMN %s TO %s' % (col_name, new_name))
 
 
diff --git a/migrate/changeset/databases/firebird.py b/migrate/changeset/databases/firebird.py
index 215762c..d60cf00 100644
--- a/migrate/changeset/databases/firebird.py
+++ b/migrate/changeset/databases/firebird.py
@@ -30,12 +30,13 @@ class FBSchemaChanger(ansisql.ANSISchemaChanger):
         raise exceptions.NotSupportedError(
             "Firebird does not support renaming tables.")
 
-    def _visit_column_name(self, table, col_name, delta):
-        new_name = delta['name']
+    def _visit_column_name(self, table, column, delta):
         self.start_alter_table(table)
-        self.append('ALTER COLUMN %s TO %s' % ((col_name), (new_name)))
+        col_name = self.preparer.quote(delta.current_name, table.quote)
+        new_name = self.preparer.format_column(delta.result_column)
+        self.append('ALTER COLUMN %s TO %s' % (col_name, new_name))
 
-    def _visit_column_nullable(self, table, col_name, delta):
+    def _visit_column_nullable(self, table, column, delta):
         """Changing NULL is not supported"""
         # TODO: http://www.firebirdfaq.org/faq103/
         raise exceptions.NotSupportedError(
@@ -50,6 +51,7 @@ class FBConstraintDropper(ansisql.ANSIConstraintDropper):
     """Firebird constaint dropper implementation."""
 
     def cascade_constraint(self, constraint):
+        """Cascading constraints is not supported"""
         raise exceptions.NotSupportedError(
             "Firebird does not support cascading constraints")
 
diff --git a/migrate/changeset/databases/mysql.py b/migrate/changeset/databases/mysql.py
index ea83d2a..5b5a16e 100644
--- a/migrate/changeset/databases/mysql.py
+++ b/migrate/changeset/databases/mysql.py
@@ -20,19 +20,13 @@ class MySQLColumnDropper(ansisql.ANSIColumnDropper):
 
 class MySQLSchemaChanger(MySQLSchemaGenerator, ansisql.ANSISchemaChanger):
 
-    def visit_column(self, column):
-        delta = column.delta
-        table = column.table
-        colspec = self.get_column_specification(column)
-
-        if not hasattr(delta, 'result_column'):
-            # Mysql needs the whole column definition, not just a lone name/type
-            raise exceptions.NotSupportedError(
-                "A column object must be present in table to alter it")
+    def visit_column(self, delta):
+        table = delta.table
+        colspec = self.get_column_specification(delta.result_column)
+        old_col_name = self.preparer.quote(delta.current_name, table.quote)
 
         self.start_alter_table(table)
 
-        old_col_name = self.preparer.quote(delta.current_name, column.quote)
         self.append("CHANGE COLUMN %s " % old_col_name)
         self.append(colspec)
         self.execute()
diff --git a/migrate/changeset/databases/oracle.py b/migrate/changeset/databases/oracle.py
index 80ec81d..93c9f8f 100644
--- a/migrate/changeset/databases/oracle.py
+++ b/migrate/changeset/databases/oracle.py
@@ -32,27 +32,20 @@ class OracleSchemaChanger(OracleSchemaGenerator, ansisql.ANSISchemaChanger):
             column.nullable = orig
         return ret
 
-    def visit_column(self, column):
-        delta = column.delta
+    def visit_column(self, delta):
         keys = delta.keys()
 
-        if len(set(('type', 'nullable', 'server_default')).intersection(keys)):
-            self._run_subvisit(delta,
-                               self._visit_column_change,
-                               start_alter=False)
-        # change name as the last action to avoid conflicts
         if 'name' in keys:
             self._run_subvisit(delta,
                                self._visit_column_name,
                                start_alter=False)
 
-    def _visit_column_change(self, table, col_name, delta):
-        if not hasattr(delta, 'result_column'):
-            # Oracle needs the whole column definition, not just a lone name/type
-            raise exceptions.NotSupportedError(
-                "A column object must be present in table to alter it")
+        if len(set(('type', 'nullable', 'server_default')).intersection(keys)):
+            self._run_subvisit(delta,
+                               self._visit_column_change,
+                               start_alter=False)
 
-        column = delta.result_column
+    def _visit_column_change(self, table, column, delta):
         # Oracle cannot drop a default once created, but it can set it
         # to null.  We'll do that if default=None
         # http://forums.oracle.com/forums/message.jspa?messageID=1273234#1273234
diff --git a/migrate/changeset/databases/sqlite.py b/migrate/changeset/databases/sqlite.py
index fa9f381..479cf45 100644
--- a/migrate/changeset/databases/sqlite.py
+++ b/migrate/changeset/databases/sqlite.py
@@ -3,6 +3,9 @@
 
    .. _`SQLite`: http://www.sqlite.org/
 """
+from UserDict import DictMixin
+from copy import copy
+
 from sqlalchemy.databases import sqlite as sa_base
 
 from migrate.changeset import ansisql, exceptions
@@ -19,18 +22,25 @@ class SQLiteCommon(object):
 
 class SQLiteHelper(SQLiteCommon):
 
-    def visit_column(self, column):
-        table = self._to_table(column.table)
+    def visit_column(self, delta):
+        if isinstance(delta, DictMixin):
+            column = delta.result_column
+            table = self._to_table(delta.table)
+        else:
+            column = delta
+            table = self._to_table(column.table)
         table_name = self.preparer.format_table(table)
 
         # we remove all constraints, indexes so it doesnt recreate them
+        ixbackup = copy(table.indexes)
+        consbackup = copy(table.constraints)
         table.indexes = set()
         table.constraints = set()
 
         self.append('ALTER TABLE %s RENAME TO migration_tmp' % table_name)
         self.execute()
 
-        insertion_string = self._modify_table(table, column)
+        insertion_string = self._modify_table(table, column, delta)
 
         table.create()
         self.append(insertion_string % {'table_name': table_name})
@@ -38,6 +48,10 @@ class SQLiteHelper(SQLiteCommon):
         self.append('DROP TABLE migration_tmp')
         self.execute()
 
+        # restore indexes, constraints
+        table.indexes = ixbackup
+        table.constraints = consbackup
+
 
 class SQLiteColumnGenerator(SQLiteSchemaGenerator, SQLiteCommon,
                             ansisql.ANSIColumnGenerator):
@@ -51,7 +65,7 @@ class SQLiteColumnGenerator(SQLiteSchemaGenerator, SQLiteCommon,
 class SQLiteColumnDropper(SQLiteHelper, ansisql.ANSIColumnDropper):
     """SQLite ColumnDropper"""
 
-    def _modify_table(self, table, column):
+    def _modify_table(self, table, column, delta):
         columns = ' ,'.join(map(self.preparer.format_column, table.columns))
         return 'INSERT INTO %(table_name)s SELECT ' + columns + \
             ' from migration_tmp'
@@ -60,11 +74,8 @@ class SQLiteColumnDropper(SQLiteHelper, ansisql.ANSIColumnDropper):
 class SQLiteSchemaChanger(SQLiteHelper, ansisql.ANSISchemaChanger):
     """SQLite SchemaChanger"""
 
-    def _modify_table(self, table, column):
-        delta = column.delta
+    def _modify_table(self, table, column, delta):
         column = table.columns[delta.current_name]
-        for k, v in delta.items():
-            setattr(column, k, v)
         return 'INSERT INTO %(table_name)s SELECT * from migration_tmp'
 
     def visit_index(self, index):
@@ -94,6 +105,7 @@ class SQLiteConstraintDropper(ansisql.ANSIColumnDropper, ansisql.ANSIConstraintC
         self.execute()
 
 # TODO: add not_supported tags for constraint dropper/generator
+# TODO: technically primary key is a NOT NULL + UNIQUE constraint, should add NOT NULL to index
 
 class SQLiteDialect(ansisql.ANSIDialect):
     columngenerator = SQLiteColumnGenerator
diff --git a/migrate/changeset/schema.py b/migrate/changeset/schema.py
index dae6be0..4dde912 100644
--- a/migrate/changeset/schema.py
+++ b/migrate/changeset/schema.py
@@ -1,11 +1,12 @@
 """
    Schema module providing common schema operations.
 """
+from UserDict import DictMixin
 import sqlalchemy
 
+from migrate.changeset.exceptions import *
 from migrate.changeset.databases.visitor import (get_engine_visitor,
                                                  run_single_visitor)
-from migrate.changeset.exceptions import *
 
 
 __all__ = [
@@ -17,13 +18,17 @@ __all__ = [
     'ChangesetTable',
     'ChangesetColumn',
     'ChangesetIndex',
+    'ChangesetDefaultClause',
+    'ColumnDelta',
 ]
 
+DEFAULT_ALTER_METADATA = True
+
 
 def create_column(column, table=None, *p, **k):
     """Create a column, given the table
     
-    API to :meth:`column.create`
+    API to :meth:`ChangesetColumn.create`
     """
     if table is not None:
         return table.create_column(column, *p, **k)
@@ -33,7 +38,7 @@ def create_column(column, table=None, *p, **k):
 def drop_column(column, table=None, *p, **k):
     """Drop a column, given the table
     
-    API to :meth:`column.drop`
+    API to :meth:`ChangesetColumn.drop`
     """
     if table is not None:
         return table.drop_column(column, *p, **k)
@@ -45,7 +50,7 @@ def rename_table(table, name, engine=None):
 
     If Table instance is given, engine is not used.
 
-    API to :meth:`table.rename`
+    API to :meth:`ChangesetTable.rename`
 
     :param table: Table to be renamed
     :param name: new name
@@ -64,7 +69,7 @@ def rename_index(index, name, table=None, engine=None):
     If Index and Table object instances are given,
     table and engine are not used.
 
-    API to :meth:`index.rename`
+    API to :meth:`ChangesetIndex.rename`
 
     :param index: Index to be renamed
     :param name: new name
@@ -82,50 +87,25 @@ def rename_index(index, name, table=None, engine=None):
 def alter_column(*p, **k):
     """Alter a column.
 
-    Parameters: column name, table name, an engine, and the properties
-    of that column to change
+    Direct API to :class:`ColumnDelta`
 
-    API to :meth:`column.alter`
+    :param table: Table or table name (will issue reflection) 
+    :param engine: Will be used for reflection
+    :param alter_metadata: Defaults to True. It will alter changes also to objects.
     """
-    if len(p) and isinstance(p[0], sqlalchemy.Column):
-        col = p[0]
-    else:
-        col = None
+    
+    k.setdefault('alter_metadata', DEFAULT_ALTER_METADATA)
 
-    if 'table' not in k:
-        k['table'] = col.table
+    if 'table' not in k and isinstance(p[0], sqlalchemy.Column):
+        k['table'] = p[0].table
     if 'engine' not in k:
         k['engine'] = k['table'].bind
 
     engine = k['engine']
-    delta = _ColumnDelta(*p, **k)
-
-    delta.result_column.delta = delta
-    delta.result_column.table = delta.table
+    delta = ColumnDelta(*p, **k)
 
     visitorcallable = get_engine_visitor(engine, 'schemachanger')
-    engine._run_visitor(visitorcallable, delta.result_column)
-
-    # Update column
-    if col is not None:
-        # Special case: change column key on rename, if key not
-        # explicit
-        #
-        # Used by SA : table.c.[key]
-        #
-        # This fails if the key was explit AND equal to the column
-        # name.  (It changes the key name when it shouldn't.)
-        #
-        # Not much we can do about it.
-        if 'name' in delta.keys():
-            if (col.name == col.key):
-                newname = delta['name']
-                del col.table.c[col.key]
-                setattr(col, 'key', newname)
-                col.table.c[col.key] = col
-        # Change all other attrs
-        for key, val in delta.iteritems():
-            setattr(col, key, val)
+    engine._run_visitor(visitorcallable, delta)
 
 
 def _to_table(table, engine=None):
@@ -152,122 +132,250 @@ def _to_index(index, table=None, engine=None):
     return ret
 
 
-class _ColumnDelta(dict):
-    """Extracts the differences between two columns/column-parameters"""
-
-    def __init__(self, *p, **k):
-        """Extract ALTER-able differences from two columns.
+class ColumnDelta(DictMixin, sqlalchemy.schema.SchemaItem):
+    """Extracts the differences between two columns/column-parameters
 
         May receive parameters arranged in several different ways:
-         * old_column_object,new_column_object,*parameters Identifies
-            attributes that differ between the two columns.
-            Parameters specified outside of either column are always
-            executed and override column differences.
-         * column_object,[current_name,]*parameters Parameters
-            specified are changed; table name is extracted from column
-            object.  Name is changed to column_object.name from
-            current_name, if current_name is specified. If not
-            specified, name is unchanged.
-         * current_name,table,*parameters 'table' may be either an
-            object or a name
-        """
+
+        * **current_column, new_column, \*p, \*\*kw**
+            Additional parameters can be specified to override column
+            differences.
+
+        * **current_column, \*p, \*\*kw**
+            Additional parameters alter current_column. Table name is extracted
+            from current_column object.
+            Name is changed to current_column.name from current_name,
+            if current_name is specified.
+
+        * **current_col_name, \*p, \*\*kw**
+            Table kw must specified.
+
+        :param table: Table at which current Column should be bound to.\
+        If table name is given, reflection will be used.
+        :type table: string or Table instance
+        :param alter_metadata: If True, it will apply changes to metadata.
+        :type alter_metadata: bool
+        :param metadata: If `alter_metadata` is true, \
+        metadata is used to reflect table names into
+        :type metadata: :class:`MetaData` instance
+        :param engine: When reflecting tables, either engine or metadata must \
+        be specified to acquire engine object.
+        :type engine: :class:`Engine` instance
+        :returns: :class:`ColumnDelta` instance provides interface for altered attributes to \
+        `result_column` through :func:`dict` alike object.
+
+        * :class:`ColumnDelta`.result_column is altered column with new attributes
+
+        * :class:`ColumnDelta`.current_name is current name of column in db
+
+        
+    """
+
+    # Column attributes that can be altered
+    diff_keys = ('name', 'type', 'primary_key', 'nullable',
+        'server_onupdate', 'server_default')
+    diffs = dict()
+    __visit_name__ = 'column'
+
+    def __init__(self, *p, **kw):
+        self.alter_metadata = kw.pop("alter_metadata", False)
+        self.meta = kw.pop("metadata", None)
+        self.engine = kw.pop("engine", None)
+
         # Things are initialized differently depending on how many column
         # parameters are given. Figure out how many and call the appropriate
         # method.
-
         if len(p) >= 1 and isinstance(p[0], sqlalchemy.Column):
             # At least one column specified
             if len(p) >= 2 and isinstance(p[1], sqlalchemy.Column):
                 # Two columns specified
-                func = self._init_2col
+                diffs = self.compare_2_columns(*p, **kw)
             else:
                 # Exactly one column specified
-                func = self._init_1col
+                diffs = self.compare_1_column(*p, **kw)
         else:
             # Zero columns specified
-            func = self._init_0col
-        diffs = func(*p, **k)
-        self._set_diffs(diffs)
+            if not len(p) or not isinstance(p[0], basestring):
+                raise ValueError("First argument must be column name")
+            diffs = self.compare_parameters(*p, **kw)
 
-    # Column attributes that can be altered
-    diff_keys = ('name',
-                 'type',
-                 'nullable',
-                 'default',
-                 'server_default',
-                 'primary_key',
-                 'foreign_key')
+        self.apply_diffs(diffs)
 
-    @property
-    def table(self):
-        if isinstance(self._table, sqlalchemy.Table):
-            return self._table
+    def __repr__(self):
+        return '<ColumnDelta altermetadata=%r, %s>' % (self.alter_metadata,
+            super(ColumnDelta, self).__repr__())
 
-    def _init_0col(self, current_name, *p, **k):
-        p, k = self._init_normalize_params(p, k)
-        table = k.pop('table')
-        self.current_name = current_name
-        self._table = table
-        self.result_column = table.c.get(current_name, None)
+    def __getitem__(self, key):
+        if key not in self.keys():
+            raise KeyError("No such diff key, available: %s" % self.diffs )
+        return getattr(self.result_column, key)
+
+    def __setitem__(self, key, value):
+        if key not in self.keys():
+            raise KeyError("No such diff key, available: %s" % self.diffs )
+        setattr(self.result_column, key, value)
+
+    def __delitem__(self, key):
+        raise NotImplementedError
+
+    def keys(self):
+        return self.diffs.keys()
+
+    def compare_parameters(self, current_name, *p, **k):
+        """Compares Column objects with reflection"""
+        self.table = k.pop('table')
+        self.result_column = self._table.c.get(current_name)
+        if len(p):
+            k = self._extract_parameters(p, k, self.result_column)
         return k
 
-    def _init_1col(self, col, *p, **k):
-        p, k = self._init_normalize_params(p, k)
-        self._table = k.pop('table', None) or col.table
-        self.result_column = col.copy()
-        if 'current_name' in k:
-            # Renamed
-            self.current_name = k.pop('current_name')
-            k.setdefault('name', col.name)
-        else:
-            self.current_name = col.name
+    def compare_1_column(self, col, *p, **k):
+        """Compares one Column object"""
+        self.table = k.pop('table', None) or col.table
+        self.result_column = col
+        if len(p):
+            k = self._extract_parameters(p, k, self.result_column)
         return k
 
-    def _init_2col(self, start_col, end_col, *p, **k):
-        p, k = self._init_normalize_params(p, k)
-        self.result_column = start_col.copy()
-        self._table = k.pop('table', None) or start_col.table \
-            or end_col.table
-        self.current_name = start_col.name
-        for key in ('name', 'nullable', 'default', 'server_default',
-                    'primary_key', 'foreign_key'):
-            val = getattr(end_col, key, None)
-            if getattr(start_col, key, None) != val:
+    def compare_2_columns(self, old_col, new_col, *p, **k):
+        """Compares two Column objects"""
+        self.process_column(new_col)
+        self.table = k.pop('table', None) or old_col.table or new_col.table
+        self.result_column = old_col
+
+        # set differences
+        # leave out some stuff for later comp
+        for key in (set(self.diff_keys) - set(('type',))):
+            val = getattr(new_col, key, None)
+            if getattr(self.result_column, key, None) != val:
                 k.setdefault(key, val)
-        if not self.column_types_eq(start_col.type, end_col.type):
-            k.setdefault('type', end_col.type)
+
+        # inspect types
+        if not self.are_column_types_eq(self.result_column.type, new_col.type):
+            k.setdefault('type', new_col.type)
+
+        if len(p):
+            k = self._extract_parameters(p, k, self.result_column)
         return k
 
-    def _init_normalize_params(self, p, k):
-        p = list(p)
-        if len(p):
-            k.setdefault('name', p.pop(0))
-        if len(p):
-            k.setdefault('type', p.pop(0))
-        # TODO: sequences? FKs?
-        return p, k
-
-    def _set_diffs(self, diffs):
+    def apply_diffs(self, diffs):
+        """Populate dict and column object with new values"""
+        self.diffs = diffs
         for key in self.diff_keys:
             if key in diffs:
-                self[key] = diffs[key]
-                if getattr(self, 'result_column', None) is not None:
-                    setattr(self.result_column, key, diffs[key])
+                setattr(self.result_column, key, diffs[key])
+
+        self.process_column(self.result_column)
+
+        # create an instance of class type if not yet
+        if 'type' in diffs and callable(self.result_column.type):
+            self.result_column.type = self.result_column.type()
+
+        # add column to the table
+        if self.table and self.alter_metadata:
+            self.result_column.add_to_table(self.table)
+
+    def are_column_types_eq(self, old_type, new_type):
+        """Compares two types to be equal"""
+        ret = old_type.__class__ == new_type.__class__
 
-    def column_types_eq(self, this, that):
-        ret = isinstance(this, that.__class__)
-        ret = ret or isinstance(that, this.__class__)
         # String length is a special case
-        if ret and isinstance(that, sqlalchemy.types.String):
-            ret = (getattr(this, 'length', None) == \
-                       getattr(that, 'length', None))
+        if ret and isinstance(new_type, sqlalchemy.types.String):
+            ret = (getattr(old_type, 'length', None) == \
+                       getattr(new_type, 'length', None))
         return ret
 
+    def _extract_parameters(self, p, k, column):
+        """Extracts data from p and modifies diffs"""
+        p = list(p)
+        while len(p):
+            if isinstance(p[0], basestring):
+                k.setdefault('name', p.pop(0))
+            elif isinstance(p[0], sqlalchemy.types.AbstractType):
+                k.setdefault('type', p.pop(0))
+            elif callable(p[0]):
+                p[0] = p[0]()
+            else:
+                break
+
+        if len(p):
+            new_col = column.copy_fixed()
+            new_col._init_items(*p)
+            k = self.compare_2_columns(column, new_col, **k)
+        return k
+
+    def process_column(self, column):
+        """Processes default values for column"""
+        # XXX: this is a snippet from SA processing of positional parameters
+        if column.args:
+            toinit = list(column.args)
+        else:
+            toinit = list()
+
+        if column.server_default is not None:
+            if isinstance(column.server_default, sqlalchemy.FetchedValue):
+                toinit.append(column.server_default)
+            else:
+                toinit.append(sqlalchemy.DefaultClause(column.server_default))
+        if column.server_onupdate is not None:
+            if isinstance(column.server_onupdate, FetchedValue):
+                toinit.append(column.server_default)
+            else:
+                toinit.append(sqlalchemy.DefaultClause(column.server_onupdate,
+                                            for_update=True))
+        if toinit:
+            column._init_items(*toinit)
+        column.args = []
+
+    def _get_table(self):
+        return getattr(self, '_table', None)
+
+    def _set_table(self, table):
+        if isinstance(table, basestring):
+            if self.alter_metadata:
+                if not self.meta:
+                    raise ValueError("metadata must be specified for table"
+                        " reflection when using alter_metadata")
+                meta = self.meta
+                if self.engine:
+                    meta.bind = self.engine
+            else:
+                if not self.engine and not self.meta:
+                    raise ValueError("engine or metadata must be specified"
+                        " to reflect tables")
+                if not self.engine:
+                    self.engine = self.meta.bind
+                meta = sqlalchemy.MetaData(bind=self.engine)
+            self._table = sqlalchemy.Table(table, meta, autoload=True)
+        elif isinstance(table, sqlalchemy.Table):
+            self._table = table
+            if not self.alter_metadata:
+                self._table.meta = sqlalchemy.MetaData(bind=self._table.bind)
+
+    def _get_result_column(self):
+        return getattr(self, '_result_column', None)
+
+    def _set_result_column(self, column):
+        """Set Column to Table based on alter_metadata evaluation."""
+        self.process_column(column)
+        if not hasattr(self, 'current_name'):
+            self.current_name = column.name
+        if self.alter_metadata:
+            self._result_column = column
+            # remove column from table, nothing has changed yet
+            if self.table:
+                column.remove_from_table(self.table)
+        else:
+            self._result_column = column.copy_fixed()
+
+    table = property(_get_table, _set_table)
+    result_column = property(_get_result_column, _set_result_column)
+
 
 class ChangesetTable(object):
     """Changeset extensions to SQLAlchemy tables."""
 
-    def create_column(self, column):
+    def create_column(self, column, **kw):
         """Creates a column.
 
         The column parameter may be a column definition or the name of
@@ -278,7 +386,7 @@ class ChangesetTable(object):
             column = getattr(self.c, str(column))
         column.create(table=self)
 
-    def drop_column(self, column):
+    def drop_column(self, column, **kw):
         """Drop a column, given its name or definition."""
         if not isinstance(column, sqlalchemy.Column):
             # It's a column name
@@ -327,17 +435,16 @@ class ChangesetColumn(object):
         May supply a new column object, or a list of properties to
         change.
 
-        For example; the following are equivalent:
-            col.alter(Column('myint', Integer, nullable=False))
-            col.alter('myint', Integer, nullable=False)
-            col.alter(name='myint', type=Integer, nullable=False)
+        For example; the following are equivalent::
 
-        Column name, type, default, and nullable may be changed
-        here. Note that for column defaults, only PassiveDefaults are
-        managed by the database - changing others doesn't make sense.
+            col.alter(Column('myint', Integer, DefaultClause('foobar')))
+            col.alter('myint', Integer, server_default='foobar', nullable=False)
+            col.alter(DefaultClause('foobar'), name='myint', type=Integer, nullable=False)
 
-        :param table: Table to be altered
-        :param engine: Engine to be used
+        Column name, type, server_default, and nullable may be changed
+        here.
+
+        Direct API to :func:`alter_column`
         """
         if 'table' not in k:
             k['table'] = self.table
@@ -371,8 +478,8 @@ class ChangesetColumn(object):
         """
         if table is not None:
             self.table = table
-        self.remove_from_table(self.table)
         engine = self.table.bind
+        self.remove_from_table(self.table, unset_table=False)
         visitorcallable = get_engine_visitor(engine, 'columndropper')
         engine._run_visitor(visitorcallable, self, *args, **kwargs)
         return self
@@ -381,12 +488,31 @@ class ChangesetColumn(object):
         if table and not self.table:
             self._set_parent(table)
 
-    def remove_from_table(self, table):
+    def remove_from_table(self, table, unset_table=True):
         # TODO: remove indexes, primary keys, constraints, etc
+        if unset_table:
+            self.table = None
         if table.c.contains_column(self):
             table.c.remove(self)
             
+    # TODO: this is fixed in 0.6
+    def copy_fixed(self, **kw):
+        """Create a copy of this ``Column``, with all attributes."""
+        return sqlalchemy.Column(self.name, self.type, self.default,
+            key=self.key,
+            primary_key=self.primary_key,
+            nullable=self.nullable,
+            quote=self.quote,
+            index=self.index,
+            unique=self.unique,
+            onupdate=self.onupdate,
+            autoincrement=self.autoincrement,
+            server_default=self.server_default,
+            server_onupdate=self.server_onupdate,
+            *[c.copy(**kw) for c in self.constraints])
+
     def _check_sanity_constraints(self, name):
+
         obj = getattr(self, name)
         if (getattr(self, name[:-5]) and not obj):
             raise InvalidConstraintError("Column.create() accepts index_name,"
@@ -412,3 +538,15 @@ class ChangesetIndex(object):
         visitorcallable = get_engine_visitor(engine, 'schemachanger')
         engine._run_visitor(visitorcallable, self, *args, **kwargs)
         self.name = name
+
+
+class ChangesetDefaultClause(object):
+    """Implements comparison between :class:`DefaultClause` instances"""
+
+    def __eq__(self, other):
+        if isinstance(other, self.__class__):
+            if self.arg == other.arg:
+                return True
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
diff --git a/test/changeset/test_changeset.py b/test/changeset/test_changeset.py
index d141e12..0e698f7 100644
--- a/test/changeset/test_changeset.py
+++ b/test/changeset/test_changeset.py
@@ -5,13 +5,14 @@ from sqlalchemy import *
 
 from migrate import changeset
 from migrate.changeset import *
-from migrate.changeset.schema import _ColumnDelta
+from migrate.changeset.schema import ColumnDelta
 from test import fixture
 
 
 class TestAddDropColumn(fixture.DB):
     """Test add/drop column through all possible interfaces
-    also test for constraints"""
+    also test for constraints
+    """
     level = fixture.DB.CONNECT
     table_name = 'tmp_adddropcol'
     table_int = 0
@@ -272,12 +273,10 @@ class TestAddDropColumn(fixture.DB):
         self.assertEqual(u'foobar', row['data'])
 
         col.drop()
-
+    
     # TODO: test sequence
-    # TODO: test that if column is appended on creation and removed on deletion
-    # TODO: test column.alter with all changes at one time
     # TODO: test quoting
-    # TODO: test drop default
+    # TODO: test non-autoname constraints
 
 
 class TestRename(fixture.DB):
@@ -445,23 +444,6 @@ class TestColumnChange(fixture.DB):
         self.table.c.data   # Should not raise exception
         self.assertEquals(num_rows(self.table.c.data,content), 1)
         
-    #@fixture.usedb()
-    #def test_fk(self):
-    #    """Can add/drop foreign key constraints to/from a column
-    #    Not supported
-    #    """
-    #    self.assert_(self.table.c.data.foreign_key is None)
-
-    #    # add
-    #    self.table.c.data.alter(foreign_key=ForeignKey(self.table.c.id))
-    #    self.refresh_table(self.table.name)
-    #    self.assert_(self.table.c.data.foreign_key is not None)
-
-    #    # drop
-    #    self.table.c.data.alter(foreign_key=None)
-    #    self.refresh_table(self.table.name)
-    #    self.assert_(self.table.c.data.foreign_key is None)
-
     @fixture.usedb()
     def test_type(self):
         """Can change a column's type"""
@@ -508,6 +490,9 @@ class TestColumnChange(fixture.DB):
         #self.assertEquals(self.table.c.data.server_default.arg,default)
         # TextClause returned by autoload
         self.assert_(default in str(self.table.c.data.server_default.arg))
+        self.engine.execute(self.table.insert(), id=12)
+        row = self.table.select(autocommit=True).execute().fetchone()
+        self.assertEqual(row['data'], default)
 
         # Column object
         default = 'your_default'
@@ -515,13 +500,15 @@ class TestColumnChange(fixture.DB):
         self.refresh_table(self.table.name)
         self.assert_(default in str(self.table.c.data.server_default.arg))
 
-        # Remove default
+        # Drop/remove default
         self.table.c.data.alter(server_default=None)
+        self.assertEqual(self.table.c.data.server_default, None)
+
         self.refresh_table(self.table.name)
         # server_default isn't necessarily None for Oracle
         #self.assert_(self.table.c.data.server_default is None,self.table.c.data.server_default)
         self.engine.execute(self.table.insert(), id=11)
-        row = self.table.select().execute().fetchone()
+        row = self.table.select(self.table.c.id == 11, autocommit=True).execute().fetchone()
         self.assert_(row['data'] is None, row['data'])
         
 
@@ -541,80 +528,225 @@ class TestColumnChange(fixture.DB):
         self.refresh_table(self.table.name)
         self.assertEquals(self.table.c.data.nullable, True)
 
-    #@fixture.usedb()
-    #def test_pk(self):
-    #    """Can add/drop a column to/from its table's primary key
-    #    Not supported
-    #    """
-    #    self.engine.echo = True
-    #    self.assertEquals(len(self.table.primary_key), 1)
+    @fixture.usedb()
+    def test_alter_metadata(self):
+        """Test if alter_metadata is respected"""
 
-    #    # Entire column definition
-    #    self.table.c.data.alter(Column('data', String, primary_key=True))
-    #    self.refresh_table(self.table.name)
-    #    self.assertEquals(len(self.table.primary_key), 2)
+        self.table.c.data.alter(Column('data', String(100)))
 
-    #    # Just the new status
-    #    self.table.c.data.alter(primary_key=False)
-    #    self.refresh_table(self.table.name)
-    #    self.assertEquals(len(self.table.primary_key), 1)
+        self.assert_(isinstance(self.table.c.data.type, String))
+        self.assertEqual(self.table.c.data.type.length, 100)
+
+        # nothing should change
+        self.table.c.data.alter(Column('data', String(200)), alter_metadata=False)
+        self.assert_(isinstance(self.table.c.data.type, String))
+        self.assertEqual(self.table.c.data.type.length, 100)
+
+    @fixture.usedb()
+    def test_alter_all(self):
+        """Tests all alter changes at one time"""
+        # test for each db separately
+        # since currently some dont support everything
+
+        # test pre settings
+        self.assertEqual(self.table.c.data.nullable, True)
+        self.assertEqual(self.table.c.data.server_default.arg, 'tluafed')
+        self.assertEqual(self.table.c.data.name, 'data')
+        self.assertTrue(isinstance(self.table.c.data.type, String))
+        self.assertTrue(self.table.c.data.type.length, 40)
+
+        kw = dict(nullable=False,
+                 server_default='foobar',
+                 name='data_new',
+                 type=String(50),
+                 alter_metadata=True)
+        if self.engine.name == 'firebird':
+            del kw['nullable']
+        self.table.c.data.alter(**kw)
+        
+        # test altered objects
+        self.assertEqual(self.table.c.data.server_default.arg, 'foobar')
+        if not self.engine.name == 'firebird':
+            self.assertEqual(self.table.c.data.nullable, False)
+        self.assertEqual(self.table.c.data.name, 'data_new')
+        self.assertEqual(self.table.c.data.type.length, 50)
+
+        self.refresh_table(self.table.name)
+
+        # test post settings
+        if not self.engine.name == 'firebird':
+            self.assertEqual(self.table.c.data_new.nullable, False)
+        self.assertEqual(self.table.c.data_new.name, 'data_new')
+        self.assertTrue(isinstance(self.table.c.data_new.type, String))
+        self.assertTrue(self.table.c.data_new.type.length, 50)
+
+        # insert data and assert default
+        self.table.insert(values={'id': 10}).execute()
+        row = self.table.select(autocommit=True).execute().fetchone()
+        self.assertEqual(u'foobar', row['data_new'])
 
 
-class TestColumnDelta(fixture.Base):
-    def test_deltas(self):
-        def mkcol(name='id', type=String, *p, **k):
-            return Column(name, type, *p, **k)
+class TestColumnDelta(fixture.DB):
+    """Tests ColumnDelta class"""
 
-        def verify(expected, original, *p, **k):
-            delta = _ColumnDelta(original, *p, **k)
-            result = delta.keys()
-            result.sort()
-            self.assertEquals(expected, result)
-            return delta
+    level = fixture.DB.CONNECT
+    table_name = 'tmp_coldelta'
+    table_int = 0
 
-        col_orig = mkcol(primary_key=True)
+    def _setup(self, url):
+        super(TestColumnDelta, self)._setup(url)
+        self.meta = MetaData()
+        self.table = Table(self.table_name, self.meta,
+            Column('ids', String(10)),
+        )
+        self.meta.bind = self.engine
+        if self.engine.has_table(self.table.name):
+            self.table.drop()
+        self.table.create()
 
-        verify([], col_orig)
-        verify(['name'], col_orig, 'ids')
-        # Parameters are always executed, even if they're 'unchanged'
-        # (We can't assume given column is up-to-date)
-        verify(['name', 'primary_key', 'type'],
-               col_orig, 'id', Integer, primary_key=True)
-        verify(['name', 'primary_key', 'type'],
-               col_orig, name='id', type=Integer, primary_key=True)
+    def _teardown(self):
+        if self.engine.has_table(self.table.name):
+            self.table.drop()
+        self.meta.clear()
+        super(TestColumnDelta,self)._teardown()
 
-        # Can compare two columns and find differences
-        col_new = mkcol(name='ids', primary_key=True)
-        verify([], col_orig, col_orig)
-        verify(['name'], 'ids', table=Table('test', MetaData()), name='hey')
-        verify(['name'], col_orig, col_orig, 'ids')
-        verify(['name'], col_orig, col_orig, name='ids')
-        verify(['name'], col_orig, col_new)
-        verify(['name','type'], col_orig, col_new, type=String)
+    def mkcol(self, name='id', type=String, *p, **k):
+        return Column(name, type, *p, **k)
 
-        # Change name, given an up-to-date definition and the current name
-        delta = verify(['name'], col_new, current_name='id')
-        self.assertEquals(delta.get('name'), 'ids')
+    def verify(self, expected, original, *p, **k):
+        self.delta = ColumnDelta(original, *p, **k)
+        result = self.delta.keys()
+        result.sort()
+        self.assertEquals(expected, result)
+        return self.delta
 
-        # Change other params at the same time
-        verify(['name', 'type'], col_new, current_name='id', type=String)
+    def test_deltas_two_columns(self):
+        """Testing ColumnDelta with two columns"""
+        col_orig = self.mkcol(primary_key=True)
+        col_new = self.mkcol(name='ids', primary_key=True)
+        self.verify([], col_orig, col_orig)
+        self.verify(['name'], col_orig, col_orig, 'ids')
+        self.verify(['name'], col_orig, col_orig, name='ids')
+        self.verify(['name'], col_orig, col_new)
+        self.verify(['name', 'type'], col_orig, col_new, type=String)
 
         # Type comparisons
-        verify([], mkcol(type=String), mkcol(type=String))
-        verify(['type'], mkcol(type=String), mkcol(type=Integer))
-        verify(['type'], mkcol(type=String), mkcol(type=String(42)))
-        verify([], mkcol(type=String(42)), mkcol(type=String(42)))
-        verify(['type'], mkcol(type=String(24)), mkcol(type=String(42)))
+        self.verify([], self.mkcol(type=String), self.mkcol(type=String))
+        self.verify(['type'], self.mkcol(type=String), self.mkcol(type=Integer))
+        self.verify(['type'], self.mkcol(type=String), self.mkcol(type=String(42)))
+        self.verify([], self.mkcol(type=String(42)), self.mkcol(type=String(42)))
+        self.verify(['type'], self.mkcol(type=String(24)), self.mkcol(type=String(42)))
+        self.verify(['type'], self.mkcol(type=String(24)), self.mkcol(type=Text(24)))
 
         # Other comparisons
-        verify(['primary_key'], mkcol(nullable=False), mkcol(primary_key=True))
+        self.verify(['primary_key'], self.mkcol(nullable=False), self.mkcol(primary_key=True))
 
         # PK implies nullable=False
-        verify(['nullable', 'primary_key'],
-               mkcol(nullable=True), mkcol(primary_key=True))
-        verify([], mkcol(primary_key=True), mkcol(primary_key=True))
-        verify(['nullable'], mkcol(nullable=True), mkcol(nullable=False))
-        verify([], mkcol(nullable=True), mkcol(nullable=True))
-        verify(['default'], mkcol(default=None), mkcol(default='42'))
-        verify([], mkcol(default=None), mkcol(default=None))
-        verify([], mkcol(default='42'), mkcol(default='42'))
+        self.verify(['nullable', 'primary_key'], self.mkcol(nullable=True), self.mkcol(primary_key=True))
+        self.verify([], self.mkcol(primary_key=True), self.mkcol(primary_key=True))
+        self.verify(['nullable'], self.mkcol(nullable=True), self.mkcol(nullable=False))
+        self.verify([], self.mkcol(nullable=True), self.mkcol(nullable=True))
+        self.verify([], self.mkcol(server_default=None), self.mkcol(server_default=None))
+        self.verify([], self.mkcol(server_default='42'), self.mkcol(server_default='42'))
+
+        # test server default
+        delta = self.verify(['server_default'], self.mkcol(), self.mkcol('id', String, DefaultClause('foobar')))
+        self.assertEqual(delta['server_default'].arg, 'foobar')
+
+        self.verify([], self.mkcol(server_default='foobar'), self.mkcol('id', String, DefaultClause('foobar')))
+        self.verify(['type'], self.mkcol(server_default='foobar'), self.mkcol('id', Text, DefaultClause('foobar')))
+
+        # test alter_metadata
+        col = self.mkcol(server_default='foobar')
+        self.verify(['type'], col, self.mkcol('id', Text, DefaultClause('foobar')), alter_metadata=True)
+        self.assert_(isinstance(col.type, Text))
+
+        col = self.mkcol()
+        self.verify(['name', 'server_default', 'type'], col, self.mkcol('beep', Text, DefaultClause('foobar')), alter_metadata=True)
+        self.assert_(isinstance(col.type, Text))
+        self.assertEqual(col.name, 'beep')
+        self.assertEqual(col.server_default.arg, 'foobar')
+
+        col = self.mkcol()
+        self.verify(['name', 'server_default', 'type'], col, self.mkcol('beep', Text, DefaultClause('foobar')), alter_metadata=False)
+        self.assertFalse(isinstance(col.type, Text))
+        self.assertNotEqual(col.name, 'beep')
+        self.assertFalse(col.server_default)
+
+    @fixture.usedb()
+    def test_deltas_zero_columns(self):
+        """Testing ColumnDelta with zero columns"""
+
+        self.verify(['name'], 'ids', table=self.table, name='hey')
+
+        # test reflection
+        self.verify(['type'], 'ids', table=self.table.name, type=String(80), engine=self.engine)
+        self.verify(['type'], 'ids', table=self.table.name, type=String(80), metadata=self.meta)
+
+        # check if alter_metadata is respected
+        self.meta.clear()
+        delta = self.verify(['type'], 'ids', table=self.table.name, type=String(80), alter_metadata=True, metadata=self.meta)
+        self.assert_(self.table.name in self.meta)
+        self.assertEqual(delta.result_column.type.length, 80)
+        self.assertEqual(self.meta.tables.get(self.table.name).c.ids.type.length, 80)
+
+        self.meta.clear()
+        self.verify(['type'], 'ids', table=self.table.name, type=String(80), alter_metadata=False, engine=self.engine)
+        self.assert_(self.table.name not in self.meta)
+
+        self.meta.clear()
+        self.verify(['type'], 'ids', table=self.table.name, type=String(80), alter_metadata=False, metadata=self.meta)
+        self.assert_(self.table.name not in self.meta)
+
+        # test defaults
+        self.meta.clear()
+        self.verify(['server_default'], 'ids', table=self.table.name, server_default='foobar', alter_metadata=True, metadata=self.meta)
+        self.meta.tables.get(self.table.name).c.ids.server_default.arg == 'foobar'
+
+        # test missing parameters
+        self.assertRaises(ValueError, ColumnDelta, table=self.table.name)
+        self.assertRaises(ValueError, ColumnDelta, 'ids', table=self.table.name, alter_metadata=True)
+        self.assertRaises(ValueError, ColumnDelta, 'ids', table=self.table.name, alter_metadata=False)
+
+    def test_deltas_one_column(self):
+        """Testing ColumnDelta with one column"""
+        col_orig = self.mkcol(primary_key=True)
+
+        self.verify([], col_orig)
+        self.verify(['name'], col_orig, 'ids')
+        # Parameters are always executed, even if they're 'unchanged'
+        # (We can't assume given column is up-to-date)
+        self.verify(['name', 'primary_key', 'type'], col_orig, 'id', Integer, primary_key=True)
+        self.verify(['name', 'primary_key', 'type'], col_orig, name='id', type=Integer, primary_key=True)
+
+        # Change name, given an up-to-date definition and the current name
+        delta = self.verify(['name'], col_orig, name='blah')
+        self.assertEquals(delta.get('name'), 'blah')
+        self.assertEquals(delta.current_name, 'id')
+
+        # check if alter_metadata is respected
+        col_orig = self.mkcol(primary_key=True)
+        self.verify(['name', 'type'], col_orig, name='id12', type=Text, alter_metadata=True)
+        self.assert_(isinstance(col_orig.type, Text))
+        self.assertEqual(col_orig.name, 'id12')
+
+        col_orig = self.mkcol(primary_key=True)
+        self.verify(['name', 'type'], col_orig, name='id12', type=Text, alter_metadata=False)
+        self.assert_(isinstance(col_orig.type, String))
+        self.assertEqual(col_orig.name, 'id')
+
+        # test server default
+        col_orig = self.mkcol(primary_key=True)
+        delta = self.verify(['server_default'], col_orig, DefaultClause('foobar'))
+        self.assertEqual(delta['server_default'].arg, 'foobar')
+
+        delta = self.verify(['server_default'], col_orig, server_default=DefaultClause('foobar'))
+        self.assertEqual(delta['server_default'].arg, 'foobar')
+
+        # no change
+        col_orig = self.mkcol(server_default=DefaultClause('foobar'))
+        delta = self.verify(['type'], col_orig, DefaultClause('foobar'), type=PickleType)
+        self.assert_(isinstance(delta.result_column.type, PickleType))
+
+        # TODO: test server on update
+        # TODO: test bind metadata