Merge "Adds missing db registry api tests for Tasks"

This commit is contained in:
Jenkins 2014-09-13 18:33:29 +00:00 committed by Gerrit Code Review
commit 6513c58e7f
2 changed files with 39 additions and 28 deletions

View File

@ -1326,11 +1326,11 @@ class TaskTests(test_utils.BaseTestCase):
def setUp(self):
super(TaskTests, self).setUp()
self.owner_id1 = str(uuid.uuid4())
self.owner_id = str(uuid.uuid4())
self.adm_context = context.RequestContext(is_admin=True,
auth_tok='user:user:admin')
self.context = context.RequestContext(
is_admin=False, auth_tok='user:user:user', user=self.owner_id1)
is_admin=False, auth_tok='user:user:user', user=self.owner_id)
self.db_api = db_tests.get_db(self.config)
self.fixtures = self.build_task_fixtures()
db_tests.reset_db(self.db_api)
@ -1373,10 +1373,10 @@ class TaskTests(test_utils.BaseTestCase):
def test_task_get_all_with_filter(self):
for fixture in self.fixtures:
self.db_api.task_create(self.context,
self.db_api.task_create(self.adm_context,
build_task_fixture(**fixture))
import_tasks = self.db_api.task_get_all(self.context,
import_tasks = self.db_api.task_get_all(self.adm_context,
filters={'type': 'import'})
self.assertTrue(import_tasks)
@ -1388,7 +1388,7 @@ class TaskTests(test_utils.BaseTestCase):
def test_task_get_all_as_admin(self):
tasks = []
for fixture in self.fixtures:
task = self.db_api.task_create(self.context,
task = self.db_api.task_create(self.adm_context,
build_task_fixture(**fixture))
tasks.append(task)
import_tasks = self.db_api.task_get_all(self.adm_context)
@ -1397,28 +1397,28 @@ class TaskTests(test_utils.BaseTestCase):
def test_task_get_all_marker(self):
for fixture in self.fixtures:
self.db_api.task_create(self.context,
self.db_api.task_create(self.adm_context,
build_task_fixture(**fixture))
tasks = self.db_api.task_get_all(self.context, sort_key='id')
tasks = self.db_api.task_get_all(self.adm_context, sort_key='id')
task_ids = [t['id'] for t in tasks]
tasks = self.db_api.task_get_all(self.context, sort_key='id',
tasks = self.db_api.task_get_all(self.adm_context, sort_key='id',
marker=task_ids[0])
self.assertEqual(len(tasks), 2)
def test_task_get_all_limit(self):
for fixture in self.fixtures:
self.db_api.task_create(self.context,
self.db_api.task_create(self.adm_context,
build_task_fixture(**fixture))
tasks = self.db_api.task_get_all(self.context, limit=2)
tasks = self.db_api.task_get_all(self.adm_context, limit=2)
self.assertEqual(2, len(tasks))
# A limit of None should not equate to zero
tasks = self.db_api.task_get_all(self.context, limit=None)
tasks = self.db_api.task_get_all(self.adm_context, limit=None)
self.assertEqual(3, len(tasks))
# A limit of zero should actually mean zero
tasks = self.db_api.task_get_all(self.context, limit=0)
tasks = self.db_api.task_get_all(self.adm_context, limit=0)
self.assertEqual(0, len(tasks))
def test_task_get_all_owned(self):
@ -1459,13 +1459,13 @@ class TaskTests(test_utils.BaseTestCase):
'expires_at': expires_at
}
task = self.db_api.task_create(self.context, fixture)
task = self.db_api.task_create(self.adm_context, fixture)
self.assertIsNotNone(task)
self.assertIsNotNone(task['id'])
task_id = task['id']
task = self.db_api.task_get(self.context, task_id)
task = self.db_api.task_get(self.adm_context, task_id)
self.assertIsNotNone(task)
self.assertEqual(task['id'], task_id)
@ -1504,8 +1504,8 @@ class TaskTests(test_utils.BaseTestCase):
'updated_at': now
}
task1 = self.db_api.task_create(self.context, fixture1)
task2 = self.db_api.task_create(self.context, fixture2)
task1 = self.db_api.task_create(self.adm_context, fixture1)
task2 = self.db_api.task_create(self.adm_context, fixture2)
self.assertIsNotNone(task1)
self.assertIsNotNone(task2)
@ -1513,7 +1513,7 @@ class TaskTests(test_utils.BaseTestCase):
task1_id = task1['id']
task2_id = task2['id']
task_fixtures = {task1_id: fixture1, task2_id: fixture2}
tasks = self.db_api.task_get_all(self.context)
tasks = self.db_api.task_get_all(self.adm_context)
self.assertEqual(len(tasks), 2)
self.assertEqual(set((tasks[0]['id'], tasks[1]['id'])),
@ -1535,7 +1535,7 @@ class TaskTests(test_utils.BaseTestCase):
def test_task_create(self):
task_id = str(uuid.uuid4())
self.context.tenant = str(uuid.uuid4())
self.context.tenant = self.context.owner
values = {
'id': task_id,
'owner': self.context.owner,
@ -1543,7 +1543,7 @@ class TaskTests(test_utils.BaseTestCase):
'status': 'pending',
}
task_values = build_task_fixture(**values)
task = self.db_api.task_create(self.context, task_values)
task = self.db_api.task_create(self.adm_context, task_values)
self.assertIsNotNone(task)
self.assertEqual(task['id'], task_id)
self.assertEqual(task['owner'], self.context.owner)
@ -1564,7 +1564,7 @@ class TaskTests(test_utils.BaseTestCase):
'message': None,
}
task_values = build_task_fixture(**values)
task = self.db_api.task_create(self.context, task_values)
task = self.db_api.task_create(self.adm_context, task_values)
self.assertIsNotNone(task)
self.assertEqual(task['id'], task_id)
self.assertEqual(task['owner'], self.context.owner)
@ -1579,14 +1579,14 @@ class TaskTests(test_utils.BaseTestCase):
result = {'foo': 'bar'}
task_values = build_task_fixture(owner=self.context.owner,
result=result)
task = self.db_api.task_create(self.context, task_values)
task = self.db_api.task_create(self.adm_context, task_values)
task_id = task['id']
fixture = {
'status': 'processing',
'message': 'This is a error string',
}
task = self.db_api.task_update(self.context, task_id, fixture)
task = self.db_api.task_update(self.adm_context, task_id, fixture)
self.assertEqual(task['id'], task_id)
self.assertEqual(task['owner'], self.context.owner)
@ -1607,11 +1607,11 @@ class TaskTests(test_utils.BaseTestCase):
input=None,
result=None,
message=None)
task = self.db_api.task_create(self.context, task_values)
task = self.db_api.task_create(self.adm_context, task_values)
task_id = task['id']
fixture = {'status': 'processing'}
task = self.db_api.task_update(self.context, task_id, fixture)
task = self.db_api.task_update(self.adm_context, task_id, fixture)
self.assertEqual(task['id'], task_id)
self.assertEqual(task['owner'], self.context.owner)
@ -1628,27 +1628,27 @@ class TaskTests(test_utils.BaseTestCase):
def test_task_delete(self):
task_values = build_task_fixture(owner=self.context.owner)
task = self.db_api.task_create(self.context, task_values)
task = self.db_api.task_create(self.adm_context, task_values)
self.assertIsNotNone(task)
self.assertEqual(task['deleted'], False)
self.assertIsNone(task['deleted_at'])
task_id = task['id']
self.db_api.task_delete(self.context, task_id)
self.db_api.task_delete(self.adm_context, task_id)
self.assertRaises(exception.TaskNotFound, self.db_api.task_get,
self.context, task_id)
def test_task_delete_as_admin(self):
task_values = build_task_fixture(owner=self.context.owner)
task = self.db_api.task_create(self.context, task_values)
task = self.db_api.task_create(self.adm_context, task_values)
self.assertIsNotNone(task)
self.assertEqual(task['deleted'], False)
self.assertIsNone(task['deleted_at'])
task_id = task['id']
self.db_api.task_delete(self.context, task_id)
self.db_api.task_delete(self.adm_context, task_id)
del_task = self.db_api.task_get(self.adm_context,
task_id,
force_show_deleted=True)

View File

@ -100,3 +100,14 @@ class TestRegistryMetadefDriver(base_metadef.TestMetadefDriver,
def tearDown(self):
self.registry_server.stop()
super(TestRegistryMetadefDriver, self).tearDown()
class TestTasksDriver(base.TaskTests, FunctionalInitWrapper):
def setUp(self):
db_tests.load(get_db, reset_db)
super(TestTasksDriver, self).setUp()
self.addCleanup(db_tests.reset)
def tearDown(self):
self.registry_server.stop()
super(TestTasksDriver, self).tearDown()