From c6883c0d472df8007db1f6c9a9930d1c53c717ae Mon Sep 17 00:00:00 2001 From: iElectric Date: Tue, 2 Jun 2009 19:50:31 +0000 Subject: [PATCH] refactor api.py a bit, lots of PEP8 love --- migrate/versioning/api.py | 97 ++++---- migrate/versioning/util/__init__.py | 20 +- setup.cfg | 7 +- setup.py | 3 +- test/fixture/shell.py | 36 +-- test/versioning/test_shell.py | 332 +++++++++++++++------------- 6 files changed, 271 insertions(+), 224 deletions(-) diff --git a/migrate/versioning/api.py b/migrate/versioning/api.py index ffa3370..80ee842 100644 --- a/migrate/versioning/api.py +++ b/migrate/versioning/api.py @@ -19,26 +19,26 @@ from sqlalchemy import create_engine from migrate.versioning import (exceptions, repository, schema, version, script as script_) # command name conflict -from migrate.versioning.util import asbool +from migrate.versioning.util import asbool, catch_known_errors __all__ = [ -'help', -'create', -'script', -'script_sql', -'make_update_script_for_model', -'version', -'source', -'version_control', -'db_version', -'upgrade', -'downgrade', -'drop_version_control', -'manage', -'test', -'compare_model_to_db', -'create_model', -'update_db_from_model', + 'help', + 'create', + 'script', + 'script_sql', + 'make_update_script_for_model', + 'version', + 'source', + 'version_control', + 'db_version', + 'upgrade', + 'downgrade', + 'drop_version_control', + 'manage', + 'test', + 'compare_model_to_db', + 'create_model', + 'update_db_from_model', ] cls_repository = repository.Repository @@ -65,7 +65,7 @@ def help(cmd=None, **opts): ret = ret.replace('%prog', sys.argv[0]) return ret - +@catch_known_errors def create(repository, name, **opts): """%prog create REPOSITORY_PATH NAME [--table=TABLE] @@ -75,13 +75,11 @@ def create(repository, name, **opts): 'migrate_version'. This table is created in all version-controlled databases. """ - try: - rep = cls_repository.create(repository, name, **opts) - except exceptions.PathFoundError, e: - raise exceptions.KnownError("The path %s already exists" % e.args[0]) + rep = cls_repository.create(repository, name, **opts) -def script(description, repository=None, **opts): +@catch_known_errors +def script(description, repository, **opts): """%prog script [--repository=REPOSITORY_PATH] DESCRIPTION Create an empty change script using the next unused version number @@ -90,16 +88,12 @@ def script(description, repository=None, **opts): For instance, manage.py script "Add initial tables" creates: repository/versions/001_Add_initial_tables.py """ - try: - if repository is None: - raise exceptions.UsageError("A repository must be specified") - repos = cls_repository(repository) - repos.create_script(description, **opts) - except exceptions.PathFoundError, e: - raise exceptions.KnownError("The path %s already exists" % e.args[0]) + repos = cls_repository(repository) + repos.create_script(description, **opts) -def script_sql(database, repository=None, **opts): +@catch_known_errors +def script_sql(database, repository, **opts): """%prog script_sql [--repository=REPOSITORY_PATH] DATABASE Create empty change SQL scripts for given DATABASE, where DATABASE @@ -107,16 +101,11 @@ def script_sql(database, repository=None, **opts): or generic ('default'). For instance, manage.py script_sql postgres creates: - repository/versions/001_upgrade_postgres.sql and - repository/versions/001_downgrade_postgres.sql + repository/versions/001_postgres_upgrade.sql and + repository/versions/001_postgres_postgres.sql """ - try: - if repository is None: - raise exceptions.UsageError("A repository must be specified") - repos = cls_repository(repository) - repos.create_script_sql(database, **opts) - except exceptions.PathFoundError, e: - raise exceptions.KnownError("The path %s already exists" % e.args[0]) + repos = cls_repository(repository) + repos.create_script_sql(database, **opts) def test(repository, url=None, **opts): @@ -130,21 +119,14 @@ def test(repository, url=None, **opts): engine = create_engine(url) repos = cls_repository(repository) script = repos.version(None).script() + # Upgrade print "Upgrading...", - try: - script.run(engine, 1) - except: - print "ERROR" - raise + script.run(engine, 1) print "done" print "Downgrading...", - try: - script.run(engine, -1) - except: - print "ERROR" - raise + script.run(engine, -1) print "done" print "Success" @@ -172,6 +154,7 @@ def source(version, dest=None, repository=None, **opts): if dest is not None: dest = open(dest, 'w') dest.write(ret) + dest.close() ret = None return ret @@ -298,7 +281,7 @@ def drop_version_control(url, repository, **opts): """ echo = asbool(opts.get('echo', False)) engine = create_engine(url, echo=echo) - schema=cls_schema(engine, repository) + schema = cls_schema(engine, repository) schema.drop() @@ -347,6 +330,8 @@ def create_model(url, repository, **opts): print cls_schema.create_model(engine, repository, declarative) +# TODO: get rid of this? if we don't add back path param +@catch_known_errors def make_update_script_for_model(url, oldmodel, model, repository, **opts): """%prog make_update_script_for_model URL OLDMODEL MODEL REPOSITORY_PATH @@ -357,12 +342,8 @@ def make_update_script_for_model(url, oldmodel, model, repository, **opts): """ # TODO: get rid of EXPERIMENTAL label echo = asbool(opts.get('echo', False)) engine = create_engine(url, echo=echo) - try: - print cls_script_python.make_update_script_for_model( - engine, oldmodel, model, repository, **opts) - except exceptions.PathFoundError, e: - # TODO: get rid of this? if we don't add back path param - raise exceptions.KnownError("The path %s already exists" % e.args[0]) + print cls_script_python.make_update_script_for_model( + engine, oldmodel, model, repository, **opts) def update_db_from_model(url, model, repository, **opts): diff --git a/migrate/versioning/util/__init__.py b/migrate/versioning/util/__init__.py index aaa7edf..9942f82 100644 --- a/migrate/versioning/util/__init__.py +++ b/migrate/versioning/util/__init__.py @@ -1,5 +1,12 @@ -from keyedinstance import KeyedInstance -from importpath import import_path +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from decorator import decorator + +from migrate.versioning import exceptions +from migrate.versioning.util.keyedinstance import KeyedInstance +from migrate.versioning.util.importpath import import_path + def loadModel(model): ''' Import module and use module-level variable -- assume model is of form "mod1.mod2.varname". ''' @@ -23,3 +30,12 @@ def asbool(obj): else: raise ValueError("String is not true/false: %r" % obj) return bool(obj) + +@decorator +def catch_known_errors(f, *a, **kw): + """Decorator that catches known api usage errors""" + + try: + f(*a, **kw) + except exceptions.PathFoundError, e: + raise exceptions.KnownError("The path %s already exists" % e.args[0]) diff --git a/setup.cfg b/setup.cfg index 546e1dd..d31d1f7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -3,8 +3,11 @@ source-dir = docs build-dir = docs/_build [egg_info] -tag_svn_revision=1 -tag_build=.dev +tag_svn_revision = 1 +tag_build = .dev + +[nosetests] +pdb = true [aliases] release = egg_info -RDb '' diff --git a/setup.py b/setup.py index 2ffe8a0..681a54e 100644 --- a/setup.py +++ b/setup.py @@ -13,6 +13,7 @@ except ImportError: pass test_requirements = ['nose >= 0.10'] +required_deps = ['sqlalchemy >= 0.5', 'decorator'] setup( name = "sqlalchemy-migrate", @@ -26,7 +27,7 @@ Inspired by Ruby on Rails' migrations, Migrate provides a way to deal with datab Migrate extends SQLAlchemy to have database changeset handling. It provides a database change repository mechanism which can be used from the command line as well as from inside python code. """, - install_requires = ['sqlalchemy >= 0.5'], + install_requires = required_deps, extras_require = { 'testing': test_requirements, 'docs' : ['sphinx >= 0.5'], diff --git a/test/fixture/shell.py b/test/fixture/shell.py index 043070c..b6d6741 100644 --- a/test/fixture/shell.py +++ b/test/fixture/shell.py @@ -1,45 +1,55 @@ -from pathed import * +#!/usr/bin/env python +# -*- coding: utf-8 -*- + import os import shutil import sys +from test.fixture.pathed import * + + class Shell(Pathed): """Base class for command line tests""" - def execute(self,command,*p,**k): + def execute(self, command, *p, **k): """Return the fd of a command; can get output (stdout/err) and exitcode""" # We might be passed a file descriptor for some reason; if so, just return it - if type(command) is file: + if isinstance(command, file): return command + # Redirect stderr to stdout # This is a bit of a hack, but I've not found a better way py_path = os.environ.get('PYTHONPATH', '') py_path_list = py_path.split(':') py_path_list.append(os.path.abspath('.')) os.environ['PYTHONPATH'] = ':'.join(py_path_list) - fd=os.popen(command+' 2>&1',*p,**k) + fd = os.popen(command + ' 2>&1') + if py_path: py_path = os.environ['PYTHONPATH'] = py_path else: del os.environ['PYTHONPATH'] return fd - def output_and_exitcode(self,*p,**k): - fd=self.execute(*p,**k) + + def output_and_exitcode(self, *p, **k): + fd=self.execute(*p, **k) output = fd.read().strip() exitcode = fd.close() if k.pop('emit',False): print output - return (output,exitcode) - def exitcode(self,*p,**k): + return (output, exitcode) + + def exitcode(self, *p, **k): """Execute a command and return its exit code ...without printing its output/errors """ - ret = self.output_and_exitcode(*p,**k) + ret = self.output_and_exitcode(*p, **k) return ret[1] - def assertFailure(self,*p,**k): - output,exitcode = self.output_and_exitcode(*p,**k) + def assertFailure(self, *p, **k): + output,exitcode = self.output_and_exitcode(*p, **k) assert (exitcode), output - def assertSuccess(self,*p,**k): - output,exitcode = self.output_and_exitcode(*p,**k) + + def assertSuccess(self, *p, **k): + output,exitcode = self.output_and_exitcode(*p, **k) #self.assert_(not exitcode, output) assert (not exitcode), output diff --git a/test/versioning/test_shell.py b/test/versioning/test_shell.py index abdc927..27d20f3 100644 --- a/test/versioning/test_shell.py +++ b/test/versioning/test_shell.py @@ -1,41 +1,52 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import os import sys +import shutil import traceback -from StringIO import StringIO from types import FileType -import os,shutil -from test import fixture -from migrate.versioning.repository import Repository -from migrate.versioning import genmodel, shell +from StringIO import StringIO + from sqlalchemy import MetaData,Table -python_version = sys.version[0:3] +from migrate.versioning.repository import Repository +from migrate.versioning import genmodel, shell +from test import fixture + + +python_version = sys.version[:3] class Shell(fixture.Shell): - _cmd=os.path.join('python migrate', 'versioning', 'shell.py') + + _cmd = os.path.join('python migrate', 'versioning', 'shell.py') + @classmethod - def cmd(cls,*p): - p = map(lambda s: str(s),p) - ret = ' '.join([cls._cmd]+p) - return ret + def cmd(cls, *args): + safe_parameters = map(lambda arg: str(arg), args) + return ' '.join([cls._cmd] + safe_parameters) + def execute(self, shell_cmd, runshell=None, **kwargs): """A crude simulation of a shell command, to speed things up""" + # If we get an fd, the command is already done - if isinstance(shell_cmd, FileType) or isinstance(shell_cmd, StringIO): + if isinstance(shell_cmd, (FileType, StringIO)): return shell_cmd + # Analyze the command; see if we can 'fake' the shell try: # Forced to run in shell? - #if runshell or '--runshell' in sys.argv: + # if runshell or '--runshell' in sys.argv: if runshell: raise Exception # Remove the command prefix if not shell_cmd.startswith(self._cmd): raise Exception - cmd = shell_cmd[(len(self._cmd)+1):] + cmd = shell_cmd[(len(self._cmd) + 1):] params = cmd.split(' ') command = params[0] - except: - return super(Shell,self).execute(shell_cmd) + except: + return super(Shell, self).execute(shell_cmd) # Redirect stdout to an object; redirect stderr to stdout fd = StringIO() @@ -47,18 +58,18 @@ class Shell(fixture.Shell): try: try: shell.main(params, **kwargs) - except SystemExit,e: + except SystemExit, e: # Simulate the exit status - fd_close=fd.close + fd_close = fd.close def close_(): fd_close() return e.args[0] fd.close = close_ - except Exception,e: + except Exception, e: # Print the exception, but don't re-raise it traceback.print_exc() # Simulate a nonzero exit status - fd_close=fd.close + fd_close = fd.close def close_(): fd_close() return 2 @@ -70,13 +81,14 @@ class Shell(fixture.Shell): fd.seek(0) return fd - def cmd_version(self,repos_path): - fd = self.execute(self.cmd('version',repos_path)) - ret = int(fd.read().strip()) + def cmd_version(self, repos_path): + fd = self.execute(self.cmd('version', repos_path)) + result = int(fd.read().strip()) self.assertSuccess(fd) - return ret - def cmd_db_version(self,url,repos_path): - fd = self.execute(self.cmd('db_version',url,repos_path)) + return result + + def cmd_db_version(self, url, repos_path): + fd = self.execute(self.cmd('db_version', url, repos_path)) txt = fd.read() #print txt ret = int(txt.strip()) @@ -86,129 +98,143 @@ class Shell(fixture.Shell): class TestShellCommands(Shell): """Tests migrate.py commands""" - def test_run(self): - """Runs; displays help""" - # Force this to run in shell... - self.assertSuccess(self.cmd('-h'),runshell=True) - self.assertSuccess(self.cmd('--help'),runshell=True) - def test_help(self): + """Displays default help dialog""" + self.assertSuccess(self.cmd('-h'), runshell=True) + self.assertSuccess(self.cmd('--help'), runshell=True) + self.assertSuccess(self.cmd('help'), runshell=True) + + def test_help_commands(self): """Display help on a specific command""" - self.assertSuccess(self.cmd('-h'),runshell=True) - self.assertSuccess(self.cmd('--help'),runshell=True) for cmd in shell.api.__all__: - fd=self.execute(self.cmd('help',cmd)) + fd = self.execute(self.cmd('help', cmd)) # Description may change, so best we can do is ensure it shows up - #self.assertNotEquals(fd.read(),'') output = fd.read() - self.assertNotEquals(output,'') + self.assertNotEquals(output, '') self.assertSuccess(fd) def test_create(self): """Repositories are created successfully""" - repos=self.tmp_repos() - name='name' + repos = self.tmp_repos() + # Creating a file that doesn't exist should succeed - cmd=self.cmd('create',repos,name) + cmd = self.cmd('create', repos, 'repository_name') self.assertSuccess(cmd) + # Files should actually be created self.assert_(os.path.exists(repos)) + # The default table should not be None repos_ = Repository(repos) - self.assertNotEquals(repos_.config.get('db_settings','version_table'),'None') + self.assertNotEquals(repos_.config.get('db_settings', 'version_table'), 'None') + # Can't create it again: it already exists self.assertFailure(cmd) def test_script(self): """We can create a migration script via the command line""" - repos=self.tmp_repos() - self.assertSuccess(self.cmd('create',repos,'repository_name')) + repos = self.tmp_repos() + self.assertSuccess(self.cmd('create', repos, 'repository_name')) + self.assertSuccess(self.cmd('script', '--repository=%s' % repos, 'Desc')) self.assert_(os.path.exists('%s/versions/001_Desc.py' % repos)) - # 's' instead of 'script' should work too + self.assertSuccess(self.cmd('script', '--repository=%s' % repos, 'More')) self.assert_(os.path.exists('%s/versions/002_More.py' % repos)) + self.assertSuccess(self.cmd('script', '--repository=%s' % repos, '"Some Random name"'), runshell=True) + self.assert_(os.path.exists('%s/versions/003_Some_Random_name.py' % repos)) + def test_script_sql(self): """We can create a migration sql script via the command line""" - repos=self.tmp_repos() - self.assertSuccess(self.cmd('create',repos,'repository_name')) + repos = self.tmp_repos() + self.assertSuccess(self.cmd('create', repos, 'repository_name')) + self.assertSuccess(self.cmd('script_sql', '--repository=%s' % repos, 'mydb')) self.assert_(os.path.exists('%s/versions/001_mydb_upgrade.sql' % repos)) self.assert_(os.path.exists('%s/versions/001_mydb_downgrade.sql' % repos)) # Test creating a second - self.assertSuccess(self.cmd('script_sql', '--repository=%s' % repos, 'mydb')) - self.assert_(os.path.exists('%s/versions/002_mydb_upgrade.sql' % repos)) - self.assert_(os.path.exists('%s/versions/002_mydb_downgrade.sql' % repos)) - + self.assertSuccess(self.cmd('script_sql', '--repository=%s' % repos, 'postgres')) + self.assert_(os.path.exists('%s/versions/002_postgres_upgrade.sql' % repos)) + self.assert_(os.path.exists('%s/versions/002_postgres_downgrade.sql' % repos)) def test_manage(self): """Create a project management script""" - script=self.tmp_py() + script = self.tmp_py() self.assert_(not os.path.exists(script)) + # No attempt is made to verify correctness of the repository path here - self.assertSuccess(self.cmd('manage',script,'--repository=/path/to/repository')) + self.assertSuccess(self.cmd('manage', script, '--repository=/path/to/repository')) self.assert_(os.path.exists(script)) class TestShellRepository(Shell): """Shell commands on an existing repository/python script""" + def setUp(self): """Create repository, python change script""" - self.path_repos=repos=self.tmp_repos() - self.assertSuccess(self.cmd('create',repos,'repository_name')) + self.path_repos = repos = self.tmp_repos() + self.assertSuccess(self.cmd('create', repos, 'repository_name')) def test_version(self): """Correctly detect repository version""" # Version: 0 (no scripts yet); successful execution - fd=self.execute(self.cmd('version','--repository=%s'%self.path_repos)) - self.assertEquals(fd.read().strip(),"0") + fd = self.execute(self.cmd('version','--repository=%s' % self.path_repos)) + self.assertEquals(fd.read().strip(), "0") self.assertSuccess(fd) + # Also works as a positional param - fd=self.execute(self.cmd('version',self.path_repos)) - self.assertEquals(fd.read().strip(),"0") + fd = self.execute(self.cmd('version', self.path_repos)) + self.assertEquals(fd.read().strip(), "0") self.assertSuccess(fd) + # Create a script and version should increment self.assertSuccess(self.cmd('script', '--repository=%s' % self.path_repos, 'Desc')) - fd=self.execute(self.cmd('version',self.path_repos)) - self.assertEquals(fd.read().strip(),"1") + fd = self.execute(self.cmd('version',self.path_repos)) + self.assertEquals(fd.read().strip(), "1") self.assertSuccess(fd) def test_source(self): """Correctly fetch a script's source""" self.assertSuccess(self.cmd('script', '--repository=%s' % self.path_repos, 'Desc')) - filename='%s/versions/001_Desc.py' % self.path_repos - source=open(filename).read() - self.assert_(source.find('def upgrade')>=0) - # Version is now 1 - fd=self.execute(self.cmd('version',self.path_repos)) - self.assert_(fd.read().strip()=="1") - self.assertSuccess(fd) - # Output/verify the source of version 1 - fd=self.execute(self.cmd('source',1,'--repository=%s'%self.path_repos)) - result=fd.read() - self.assertSuccess(fd) - self.assert_(result.strip()==source.strip()) - # We can also send the source to a file... test that too - self.assertSuccess(self.cmd('source',1,filename,'--repository=%s'%self.path_repos)) - self.assert_(os.path.exists(filename)) - fd=open(filename) - result=fd.read() - self.assert_(result.strip()==source.strip()) -class TestShellDatabase(Shell,fixture.DB): + filename = '%s/versions/001_Desc.py' % self.path_repos + source = open(filename).read() + self.assert_(source.find('def upgrade') >= 0) + + # Version is now 1 + fd = self.execute(self.cmd('version', self.path_repos)) + self.assert_(fd.read().strip() == "1") + self.assertSuccess(fd) + + # Output/verify the source of version 1 + fd = self.execute(self.cmd('source', 1, '--repository=%s' % self.path_repos)) + result = fd.read() + self.assertSuccess(fd) + self.assert_(result.strip() == source.strip()) + + # We can also send the source to a file... test that too + self.assertSuccess(self.cmd('source', 1, filename, '--repository=%s'%self.path_repos)) + self.assert_(os.path.exists(filename)) + fd = open(filename) + result = fd.read() + self.assert_(result.strip() == source.strip()) + +class TestShellDatabase(Shell, fixture.DB): """Commands associated with a particular database""" # We'll need to clean up after ourself, since the shell creates its own txn; # we need to connect to the DB to see if things worked - level=fixture.DB.CONNECT + + level = fixture.DB.CONNECT @fixture.usedb() def test_version_control(self): """Ensure we can set version control on a database""" - path_repos=repos=self.tmp_repos() - self.assertSuccess(self.cmd('create',path_repos,'repository_name')) - self.exitcode(self.cmd('drop_version_control',self.url,path_repos)) - self.assertSuccess(self.cmd('version_control',self.url,path_repos)) + path_repos = repos = self.tmp_repos() + self.assertSuccess(self.cmd('create', path_repos, 'repository_name')) + self.exitcode(self.cmd('drop_version_control', self.url, path_repos)) + self.assertSuccess(self.cmd('version_control', self.url, path_repos)) + # Clean up self.assertSuccess(self.cmd('drop_version_control',self.url,path_repos)) # Attempting to drop vc from a database without it should fail @@ -217,10 +243,11 @@ class TestShellDatabase(Shell,fixture.DB): @fixture.usedb() def test_wrapped_kwargs(self): """Commands with default arguments set by manage.py""" - path_repos=repos=self.tmp_repos() + path_repos = repos = self.tmp_repos() self.assertSuccess(self.cmd('create', 'repository_name'), repository=path_repos) self.exitcode(self.cmd('drop_version_control'), url=self.url, repository=path_repos) self.assertSuccess(self.cmd('version_control'), url=self.url, repository=path_repos) + # Clean up self.assertSuccess(self.cmd('drop_version_control'), url=self.url, repository=path_repos) # Attempting to drop vc from a database without it should fail @@ -229,26 +256,31 @@ class TestShellDatabase(Shell,fixture.DB): @fixture.usedb() def test_version_control_specified(self): """Ensure we can set version control to a particular version""" - path_repos=self.tmp_repos() - self.assertSuccess(self.cmd('create',path_repos,'repository_name')) - self.exitcode(self.cmd('drop_version_control',self.url,path_repos)) + path_repos = self.tmp_repos() + self.assertSuccess(self.cmd('create', path_repos, 'repository_name')) + self.exitcode(self.cmd('drop_version_control', self.url, path_repos)) + # Fill the repository path_script = self.tmp_py() - version=1 + version = 1 for i in range(version): self.assertSuccess(self.cmd('script', '--repository=%s' % path_repos, 'Desc')) + # Repository version is correct - fd=self.execute(self.cmd('version',path_repos)) - self.assertEquals(fd.read().strip(),str(version)) + fd = self.execute(self.cmd('version', path_repos)) + self.assertEquals(fd.read().strip(), str(version)) self.assertSuccess(fd) + # Apply versioning to DB - self.assertSuccess(self.cmd('version_control',self.url,path_repos,version)) + self.assertSuccess(self.cmd('version_control', self.url, path_repos, version)) + # Test version number - fd=self.execute(self.cmd('db_version',self.url,path_repos)) - self.assertEquals(fd.read().strip(),str(version)) + fd = self.execute(self.cmd('db_version', self.url, path_repos)) + self.assertEquals(fd.read().strip(), str(version)) self.assertSuccess(fd) + # Clean up - self.assertSuccess(self.cmd('drop_version_control',self.url,path_repos)) + self.assertSuccess(self.cmd('drop_version_control', self.url, path_repos)) @fixture.usedb() def test_upgrade(self): @@ -256,69 +288,72 @@ class TestShellDatabase(Shell,fixture.DB): # Create a repository repos_name = 'repos_name' repos_path = self.tmp() - self.assertSuccess(self.cmd('create',repos_path,repos_name)) - self.assertEquals(self.cmd_version(repos_path),0) + self.assertSuccess(self.cmd('create', repos_path,repos_name)) + self.assertEquals(self.cmd_version(repos_path), 0) + # Version the DB - self.exitcode(self.cmd('drop_version_control',self.url,repos_path)) - self.assertSuccess(self.cmd('version_control',self.url,repos_path)) + self.exitcode(self.cmd('drop_version_control', self.url, repos_path)) + self.assertSuccess(self.cmd('version_control', self.url, repos_path)) # Upgrades with latest version == 0 - self.assertEquals(self.cmd_db_version(self.url,repos_path),0) - self.assertSuccess(self.cmd('upgrade',self.url,repos_path)) - self.assertEquals(self.cmd_db_version(self.url,repos_path),0) - self.assertSuccess(self.cmd('upgrade',self.url,repos_path,0)) - self.assertEquals(self.cmd_db_version(self.url,repos_path),0) - self.assertFailure(self.cmd('upgrade',self.url,repos_path,1)) - self.assertFailure(self.cmd('upgrade',self.url,repos_path,-1)) + self.assertEquals(self.cmd_db_version(self.url, repos_path), 0) + self.assertSuccess(self.cmd('upgrade', self.url, repos_path)) + self.assertEquals(self.cmd_db_version(self.url, repos_path), 0) + self.assertSuccess(self.cmd('upgrade', self.url, repos_path, 0)) + self.assertEquals(self.cmd_db_version(self.url, repos_path), 0) + self.assertFailure(self.cmd('upgrade', self.url, repos_path, 1)) + self.assertFailure(self.cmd('upgrade', self.url, repos_path, -1)) # Add a script to the repository; upgrade the db self.assertSuccess(self.cmd('script', '--repository=%s' % repos_path, 'Desc')) - self.assertEquals(self.cmd_version(repos_path),1) + self.assertEquals(self.cmd_version(repos_path), 1) - self.assertEquals(self.cmd_db_version(self.url,repos_path),0) - self.assertSuccess(self.cmd('upgrade',self.url,repos_path)) - self.assertEquals(self.cmd_db_version(self.url,repos_path),1) + self.assertEquals(self.cmd_db_version(self.url, repos_path), 0) + self.assertSuccess(self.cmd('upgrade', self.url, repos_path)) + self.assertEquals(self.cmd_db_version(self.url, repos_path), 1) # Downgrade must have a valid version specified - self.assertFailure(self.cmd('downgrade',self.url, repos_path)) - self.assertFailure(self.cmd('downgrade',self.url, repos_path, '-1', 2)) - #self.assertFailure(self.cmd('downgrade',self.url, repos_path, '1', 2)) - self.assertEquals(self.cmd_db_version(self.url, repos_path),1) + self.assertFailure(self.cmd('downgrade', self.url, repos_path)) + self.assertFailure(self.cmd('downgrade', self.url, repos_path, '-1', 2)) + #self.assertFailure(self.cmd('downgrade', self.url, repos_path, '1', 2)) + self.assertEquals(self.cmd_db_version(self.url, repos_path), 1) self.assertSuccess(self.cmd('downgrade', self.url, repos_path, 0)) - self.assertEquals(self.cmd_db_version(self.url,repos_path),0) + self.assertEquals(self.cmd_db_version(self.url, repos_path), 0) - self.assertFailure(self.cmd('downgrade',self.url,repos_path,1)) - self.assertEquals(self.cmd_db_version(self.url,repos_path),0) + self.assertFailure(self.cmd('downgrade',self.url, repos_path, 1)) + self.assertEquals(self.cmd_db_version(self.url, repos_path), 0) - self.assertSuccess(self.cmd('drop_version_control',self.url,repos_path)) + self.assertSuccess(self.cmd('drop_version_control', self.url, repos_path)) - def _run_test_sqlfile(self,upgrade_script,downgrade_script): + def _run_test_sqlfile(self, upgrade_script, downgrade_script): + # TODO: add test script that checks if db really changed repos_path = self.tmp() repos_name = 'repos' - self.assertSuccess(self.cmd('create',repos_path,repos_name)) - self.exitcode(self.cmd('drop_version_control',self.url,repos_path)) - self.assertSuccess(self.cmd('version_control',self.url,repos_path)) - self.assertEquals(self.cmd_version(repos_path),0) - self.assertEquals(self.cmd_db_version(self.url,repos_path),0) + self.assertSuccess(self.cmd('create', repos_path, repos_name)) + self.exitcode(self.cmd('drop_version_control', self.url, repos_path)) + self.assertSuccess(self.cmd('version_control', self.url, repos_path)) + self.assertEquals(self.cmd_version(repos_path), 0) + self.assertEquals(self.cmd_db_version(self.url,repos_path), 0) - beforeCount = len(os.listdir(os.path.join(repos_path,'versions'))) # hmm, this number changes sometimes based on running from svn + beforeCount = len(os.listdir(os.path.join(repos_path, 'versions'))) # hmm, this number changes sometimes based on running from svn self.assertSuccess(self.cmd('script_sql', '--repository=%s' % repos_path, 'postgres')) - self.assertEquals(self.cmd_version(repos_path),1) + self.assertEquals(self.cmd_version(repos_path), 1) self.assertEquals(len(os.listdir(os.path.join(repos_path,'versions'))), beforeCount + 2) + open('%s/versions/001_postgres_upgrade.sql' % repos_path, 'a').write(upgrade_script) open('%s/versions/001_postgres_downgrade.sql' % repos_path, 'a').write(downgrade_script) - self.assertEquals(self.cmd_db_version(self.url,repos_path),0) + self.assertEquals(self.cmd_db_version(self.url, repos_path), 0) self.assertRaises(Exception, self.engine.text('select * from t_table').execute) - self.assertSuccess(self.cmd('upgrade',self.url,repos_path)) - self.assertEquals(self.cmd_db_version(self.url,repos_path),1) + self.assertSuccess(self.cmd('upgrade', self.url,repos_path)) + self.assertEquals(self.cmd_db_version(self.url,repos_path), 1) self.engine.text('select * from t_table').execute() - self.assertSuccess(self.cmd('downgrade',self.url,repos_path,0)) - self.assertEquals(self.cmd_db_version(self.url,repos_path),0) + self.assertSuccess(self.cmd('downgrade', self.url, repos_path, 0)) + self.assertEquals(self.cmd_db_version(self.url, repos_path), 0) self.assertRaises(Exception, self.engine.text('select * from t_table').execute) # The tests below are written with some postgres syntax, but the stuff @@ -335,7 +370,7 @@ class TestShellDatabase(Shell,fixture.DB): drop table t_table; """ self.meta.drop_all() - self._run_test_sqlfile(upgrade_script,downgrade_script) + self._run_test_sqlfile(upgrade_script, downgrade_script) @fixture.usedb(supported='postgres') @@ -358,17 +393,17 @@ class TestShellDatabase(Shell,fixture.DB): repos_name = 'repos_name' repos_path = self.tmp() - self.assertSuccess(self.cmd('create',repos_path,repos_name)) - self.exitcode(self.cmd('drop_version_control',self.url,repos_path)) - self.assertSuccess(self.cmd('version_control',self.url,repos_path)) - self.assertEquals(self.cmd_version(repos_path),0) - self.assertEquals(self.cmd_db_version(self.url,repos_path),0) + self.assertSuccess(self.cmd('create', repos_path, repos_name)) + self.exitcode(self.cmd('drop_version_control', self.url, repos_path)) + self.assertSuccess(self.cmd('version_control', self.url, repos_path)) + self.assertEquals(self.cmd_version(repos_path), 0) + self.assertEquals(self.cmd_db_version(self.url, repos_path), 0) # Empty script should succeed self.assertSuccess(self.cmd('script', '--repository=%s' % repos_path, 'Desc')) - self.assertSuccess(self.cmd('test',repos_path,self.url)) - self.assertEquals(self.cmd_version(repos_path),1) - self.assertEquals(self.cmd_db_version(self.url,repos_path),0) + self.assertSuccess(self.cmd('test', repos_path, self.url)) + self.assertEquals(self.cmd_version(repos_path), 1) + self.assertEquals(self.cmd_db_version(self.url, repos_path), 0) # Error script should fail script_path = self.tmp_py() @@ -384,12 +419,13 @@ class TestShellDatabase(Shell,fixture.DB): print 'sdfsgf' raise Exception() """.replace("\n ","\n") - file=open(script_path,'w') + file = open(script_path, 'w') file.write(script_text) file.close() - self.assertFailure(self.cmd('test',repos_path,self.url,'blah blah')) - self.assertEquals(self.cmd_version(repos_path),1) - self.assertEquals(self.cmd_db_version(self.url,repos_path),0) + + self.assertFailure(self.cmd('test', repos_path, self.url, 'blah blah')) + self.assertEquals(self.cmd_version(repos_path), 1) + self.assertEquals(self.cmd_db_version(self.url, repos_path),0) # Nonempty script using migrate_engine should succeed script_path = self.tmp_py() @@ -412,12 +448,12 @@ class TestShellDatabase(Shell,fixture.DB): # Operations to reverse the above upgrade go here. meta.drop_all() """.replace("\n ","\n") - file=open(script_path,'w') + file = open(script_path, 'w') file.write(script_text) file.close() - self.assertSuccess(self.cmd('test',repos_path,self.url)) - self.assertEquals(self.cmd_version(repos_path),1) - self.assertEquals(self.cmd_db_version(self.url,repos_path),0) + self.assertSuccess(self.cmd('test', repos_path, self.url)) + self.assertEquals(self.cmd_version(repos_path), 1) + self.assertEquals(self.cmd_db_version(self.url, repos_path), 0) @fixture.usedb() def test_rundiffs_in_shell(self):