Adding in integration tests
blueprint integration-tests Change-Id: I3f7c8200a03ea60bca23563b49de8fd85126bf8d
This commit is contained in:
parent
ee9d9d2b3b
commit
e77bbe5e97
@ -73,28 +73,19 @@ class XferController(object):
|
||||
self.conf = conf
|
||||
|
||||
def _xfer_from_db(self, xfer_id, owner):
|
||||
try:
|
||||
return self.db_con.lookup_xfer_request_by_id(
|
||||
xfer_id, owner=owner)
|
||||
except exceptions.StaccatoNotFoundInDBException, db_ex:
|
||||
raise webob.exc.HTTPNotFound(explanation="No such ID %s" % xfer_id,
|
||||
content_type="text/plain")
|
||||
|
||||
def _to_state_machine(self, event, xfer_request, name):
|
||||
try:
|
||||
self.sm.event_occurred(event,
|
||||
xfer_request=xfer_request,
|
||||
db=self.db_con)
|
||||
except exceptions.StaccatoInvalidStateTransitionException, ex:
|
||||
msg = _('You cannot %s a transfer that is in the %s '
|
||||
'state. %s' % (name, xfer_request.state, ex))
|
||||
self._log_request(logging.INFO, msg)
|
||||
raise webob.exc.HTTPBadRequest(explanation=msg,
|
||||
content_type="text/plain")
|
||||
|
||||
@utils.StaccatoErrorToHTTP('Create a new transfer', LOG)
|
||||
def newtransfer(self, request, source_url, destination_url, owner,
|
||||
source_options=None, destination_options=None,
|
||||
start_offset=0, end_offset=None):
|
||||
|
||||
srcurl_parts = urlparse.urlparse(source_url)
|
||||
dsturl_parts = urlparse.urlparse(destination_url)
|
||||
|
||||
@ -129,40 +120,29 @@ class XferController(object):
|
||||
dest_opts=dstopts)
|
||||
return xfer
|
||||
|
||||
@utils.StaccatoErrorToHTTP('Check the status', LOG)
|
||||
def status(self, request, xfer_id, owner):
|
||||
xfer = self._xfer_from_db(xfer_id, owner)
|
||||
return xfer
|
||||
|
||||
def list(self, request, owner):
|
||||
return self.db_con.lookup_xfer_request_all(owner=owner)
|
||||
|
||||
def _xfer_to_dict(self, x):
|
||||
d = {}
|
||||
d['id'] = x.id
|
||||
d['srcurl'] = x.srcurl
|
||||
d['dsturl'] = x.dsturl
|
||||
d['state'] = x.state
|
||||
d['progress'] = x.next_ndx
|
||||
return d
|
||||
@utils.StaccatoErrorToHTTP('List transfers', LOG)
|
||||
def list(self, request, owner, limit=None):
|
||||
return self.db_con.lookup_xfer_request_all(owner=owner, limit=limit)
|
||||
|
||||
@utils.StaccatoErrorToHTTP('Delete a transfer', LOG)
|
||||
def delete(self, request, xfer_id, owner):
|
||||
xfer_request = self._xfer_from_db(xfer_id, owner)
|
||||
self._to_state_machine(Events.EVENT_DELETE,
|
||||
xfer_request,
|
||||
'delete')
|
||||
|
||||
@utils.StaccatoErrorToHTTP('Cancel a transfer', LOG)
|
||||
def xferaction(self, request, xfer_id, owner, xferaction, **kwvals):
|
||||
xfer_request = self._xfer_from_db(xfer_id, owner)
|
||||
self._to_state_machine(Events.EVENT_CANCEL,
|
||||
xfer_request,
|
||||
'cancel')
|
||||
|
||||
def _log_request(self, level, msg, ex=None):
|
||||
# reformat the exception with context, user info, etc
|
||||
if ex:
|
||||
self.log.exception(msg)
|
||||
self.log.log(level, msg)
|
||||
|
||||
|
||||
class XferHeaderDeserializer(os_wsgi.RequestHeadersDeserializer):
|
||||
def default(self, request):
|
||||
@ -201,14 +181,6 @@ class XferDeserializer(os_wsgi.JSONDeserializer):
|
||||
request = self._validate(self._from_json(body), _required, _optional)
|
||||
return request
|
||||
|
||||
def status(self, body):
|
||||
request = self._validate(self._from_json(body), [], [])
|
||||
return request
|
||||
|
||||
def delete(self, body):
|
||||
request = self._validate(self._from_json(body), [], [])
|
||||
return request
|
||||
|
||||
def cancel(self, body):
|
||||
_required = ['xferaction']
|
||||
_optional = ['async']
|
||||
|
@ -33,5 +33,3 @@ def main():
|
||||
server.wait()
|
||||
except RuntimeError as e:
|
||||
fail(1, e)
|
||||
|
||||
main()
|
||||
|
@ -45,4 +45,7 @@ class StaccatoDatabaseException(StaccatoBaseException):
|
||||
|
||||
|
||||
class StaccatoNotFoundInDBException(StaccatoDataBaseException):
|
||||
pass
|
||||
|
||||
def __init__(self, ex, unfound_item):
|
||||
super(StaccatoNotFoundInDBException, self).__init__(self, ex)
|
||||
self.unfound_item = unfound_item
|
@ -2,6 +2,8 @@ import logging
|
||||
import re
|
||||
|
||||
from paste import deploy
|
||||
import webob
|
||||
import webob.exc
|
||||
|
||||
from staccato.common import exceptions
|
||||
from staccato.openstack.common import importutils
|
||||
@ -16,6 +18,45 @@ def not_implemented_decorator(func):
|
||||
return call
|
||||
|
||||
|
||||
class StaccatoErrorToHTTP(object):
|
||||
|
||||
def __init__(self, operation, log):
|
||||
self.operation = operation
|
||||
self.log = log
|
||||
|
||||
def __call__(self, func):
|
||||
def inner(*args, **kwargs):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except exceptions.StaccatoNotFoundInDBException as ex:
|
||||
msg = _("Failed to %s. %s not found.") % (self.operation,
|
||||
ex.unfound_item)
|
||||
self.log.error(msg)
|
||||
raise webob.exc.HTTPNotFound(explanation=msg,
|
||||
content_type="text/plain")
|
||||
|
||||
except exceptions.StaccatoInvalidStateTransitionException, ex:
|
||||
msg = _('Failed to %s. You cannot %s a transfer that is in '
|
||||
'the %s state. %s' % (self.operation,
|
||||
ex.attempted_event,
|
||||
ex.current_state,
|
||||
ex))
|
||||
self.log.error(msg)
|
||||
raise webob.exc.HTTPBadRequest(explanation=msg,
|
||||
content_type="text/plain")
|
||||
|
||||
except exceptions.StaccatoParameterError as ex:
|
||||
msg = _('Failed to %s. %s' % (self.operation, ex))
|
||||
self.log.error(msg)
|
||||
raise webob.exc.HTTPBadRequest(msg)
|
||||
|
||||
except Exception as ex:
|
||||
msg = _('Failed to %s. %s' % (self.operation, ex))
|
||||
self.log.error(msg)
|
||||
raise webob.exc.HTTPBadRequest(msg)
|
||||
return inner
|
||||
|
||||
|
||||
def load_paste_app(app_name, conf_file, conf):
|
||||
try:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -85,11 +85,11 @@ class StaccatoDB(object):
|
||||
xfer_request = query.one()
|
||||
return xfer_request
|
||||
except orm_exc.NoResultFound, nf_ex:
|
||||
raise exceptions.StaccatoNotFoundInDBException(nf_ex)
|
||||
raise exceptions.StaccatoNotFoundInDBException(nf_ex, xfer_id)
|
||||
except Exception, ex:
|
||||
raise exceptions.StaccatoDataBaseException(ex)
|
||||
|
||||
def lookup_xfer_request_all(self, owner=None, session=None):
|
||||
def lookup_xfer_request_all(self, owner=None, session=None, limit=None):
|
||||
try:
|
||||
if session is None:
|
||||
session = self.get_sessions()
|
||||
@ -98,10 +98,12 @@ class StaccatoDB(object):
|
||||
query = session.query(models.XferRequest)
|
||||
if owner is not None:
|
||||
query = query.filter(models.XferRequest.owner == owner)
|
||||
if limit is not None:
|
||||
query = query.limit(limit)
|
||||
xfer_requests = query.all()
|
||||
return xfer_requests
|
||||
except orm_exc.NoResultFound, nf_ex:
|
||||
raise exceptions.StaccatoNotFoundInDBException(nf_ex)
|
||||
raise exceptions.StaccatoNotFoundInDBException(nf_ex, owner)
|
||||
except Exception, ex:
|
||||
raise exceptions.StaccatoDataBaseException(ex)
|
||||
|
||||
@ -182,7 +184,7 @@ def _get_db_object(CONF):
|
||||
engine = sqlalchemy.create_engine(CONF.sql_connection, **engine_args)
|
||||
engine.connect = wrap_db_error(engine.connect, CONF)
|
||||
engine.connect()
|
||||
except Exception, err:
|
||||
except Exception as err:
|
||||
msg = _("Error configuring registry database with supplied "
|
||||
"sql_connection '%s'. "
|
||||
"Got error:\n%s") % (CONF.sql_connection, err)
|
||||
|
1
staccato/tests/integration/__init__.py
Normal file
1
staccato/tests/integration/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
__author__ = 'jbresnah'
|
104
staccato/tests/integration/base.py
Normal file
104
staccato/tests/integration/base.py
Normal file
@ -0,0 +1,104 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os.path
|
||||
|
||||
import json
|
||||
|
||||
import staccato.common.config as config
|
||||
import staccato.common.utils as staccato_utils
|
||||
import staccato.openstack.common.pastedeploy as os_pastedeploy
|
||||
import staccato.tests.utils as test_utils
|
||||
|
||||
TESTING_API_PASTE_CONF = """
|
||||
[pipeline:staccato-api]
|
||||
pipeline = unauthenticated-context rootapp
|
||||
|
||||
# Use this pipeline for keystone auth
|
||||
[pipeline:staccato-api-keystone]
|
||||
pipeline = authtoken context rootapp
|
||||
|
||||
[app:rootapp]
|
||||
use = egg:Paste#urlmap
|
||||
/v1: apiv1app
|
||||
/: apiversions
|
||||
|
||||
[app:apiversions]
|
||||
paste.app_factory = staccato.openstack.common.pastedeploy:app_factory
|
||||
openstack.app_factory = staccato.api.versions:VersionApp
|
||||
|
||||
[app:apiv1app]
|
||||
paste.app_factory = staccato.openstack.common.pastedeploy:app_factory
|
||||
openstack.app_factory = staccato.api.v1.xfer:API
|
||||
|
||||
[filter:unauthenticated-context]
|
||||
paste.filter_factory = staccato.openstack.common.pastedeploy:filter_factory
|
||||
openstack.filter_factory = staccato.api.v1.xfer:UnauthTestMiddleware
|
||||
|
||||
[filter:authtoken]
|
||||
paste.filter_factory = keystoneclient.middleware.auth_token:filter_factory
|
||||
delay_auth_decision = true
|
||||
|
||||
[filter:context]
|
||||
paste.filter_factory = staccato.openstack.common.pastedeploy:filter_factory
|
||||
openstack.filter_factory = staccato.api.v1.xfer:AuthContextMiddleware
|
||||
"""
|
||||
|
||||
|
||||
class ApiTestBase(test_utils.TempFileCleanupBaseTest):
|
||||
def setUp(self):
|
||||
super(ApiTestBase, self).setUp()
|
||||
self.test_dir = self.get_tempdir()
|
||||
self.sql_connection = 'sqlite://'
|
||||
self.conf = config.get_config_object(args=[])
|
||||
self.config(sql_connection=self.sql_connection)
|
||||
self.write_protocol_module_file()
|
||||
self.config(db_auto_create=True)
|
||||
self.needs_database = True
|
||||
|
||||
def get_http_client(self):
|
||||
staccato_api = self._load_paste_app(
|
||||
'staccato-api', TESTING_API_PASTE_CONF, self.conf)
|
||||
return test_utils.Httplib2WsgiAdapter(staccato_api)
|
||||
|
||||
def _load_paste_app(self, name, paste_conf, conf):
|
||||
conf_file_path = os.path.join(self.test_dir, '%s-paste.ini' % name)
|
||||
with open(conf_file_path, 'wb') as conf_file:
|
||||
conf_file.write(paste_conf)
|
||||
conf_file.flush()
|
||||
|
||||
return os_pastedeploy.paste_deploy_app(conf_file_path,
|
||||
name,
|
||||
conf)
|
||||
|
||||
def tearDown(self):
|
||||
super(ApiTestBase, self).tearDown()
|
||||
|
||||
def config(self, **kw):
|
||||
group = kw.pop('group', None)
|
||||
for k, v in kw.iteritems():
|
||||
self.conf.set_override(k, v, group)
|
||||
|
||||
def write_protocol_module_file(self, protocols=None):
|
||||
if protocols is None:
|
||||
protocols = {
|
||||
"file": [{"module": "staccato.protocols.file.FileProtocol"}],
|
||||
"http": [{"module": "staccato.protocols.http.HttpProtocol"}]
|
||||
}
|
||||
temp_file = self.get_tempfile()
|
||||
with open(temp_file, 'w') as fp:
|
||||
json.dump(protocols, fp)
|
||||
|
||||
self.config(protocol_policy=temp_file)
|
||||
return temp_file
|
||||
|
||||
|
234
staccato/tests/integration/test_api.py
Normal file
234
staccato/tests/integration/test_api.py
Normal file
@ -0,0 +1,234 @@
|
||||
import json
|
||||
|
||||
import staccato.tests.integration.base as base
|
||||
|
||||
|
||||
class TestApiNoSchedulerBasicFunctions(base.ApiTestBase):
|
||||
|
||||
def _list_transfers(self, http_client):
|
||||
path = "/v1/transfers"
|
||||
response, content = http_client.request(path, 'GET')
|
||||
self.assertEqual(response.status, 200)
|
||||
data = json.loads(content)
|
||||
return data
|
||||
|
||||
def _cancel_transfer(self, http_client, id):
|
||||
data_json = {'xferaction': 'cancel'}
|
||||
data = json.dumps(data_json)
|
||||
headers = {'content-type': 'application/json'}
|
||||
path = "/v1/transfers/%s/action" % id
|
||||
return http_client.request(path, 'POST', headers=headers, body=data)
|
||||
|
||||
def _delete_transfer(self, http_client, id):
|
||||
path = "/v1/transfers/%s" % id
|
||||
return http_client.request(path, 'DELETE')
|
||||
|
||||
def _status_transfer(self, http_client, id):
|
||||
path = "/v1/transfers/%s" % id
|
||||
return http_client.request(path, 'GET')
|
||||
|
||||
def _create_xfer(self, http_client, src='file:///etc/group',
|
||||
dst='file:///dev/null'):
|
||||
path = "/v1/transfers"
|
||||
|
||||
data_json = {'source_url': src,
|
||||
'destination_url': dst}
|
||||
data = json.dumps(data_json)
|
||||
|
||||
headers = {'content-type': 'application/json'}
|
||||
response, content = http_client.request(path, 'POST', body=data,
|
||||
headers=headers)
|
||||
return response, content
|
||||
|
||||
def test_get_simple_empty_list(self):
|
||||
http_client = self.get_http_client()
|
||||
data = self._list_transfers(http_client)
|
||||
self.assertEqual([], data)
|
||||
|
||||
def test_simple_create_transfer(self):
|
||||
http_client = self.get_http_client()
|
||||
response, content = self._create_xfer(http_client)
|
||||
self.assertEqual(response.status, 200)
|
||||
|
||||
def test_simple_create_transfer_list(self):
|
||||
http_client = self.get_http_client()
|
||||
response, content = self._create_xfer(http_client)
|
||||
self.assertEqual(response.status, 200)
|
||||
data = self._list_transfers(http_client)
|
||||
self.assertEqual(len(data), 1)
|
||||
|
||||
def test_simple_create_transfer_list_delete(self):
|
||||
http_client = self.get_http_client()
|
||||
response, content = self._create_xfer(http_client)
|
||||
self.assertEqual(response.status, 200)
|
||||
data = self._list_transfers(http_client)
|
||||
self.assertEqual(len(data), 1)
|
||||
response, content = self._delete_transfer(http_client, data[0]['id'])
|
||||
self.assertEqual(response.status, 200)
|
||||
data = self._list_transfers(http_client)
|
||||
self.assertEqual(data, [])
|
||||
|
||||
def test_simple_create_transfer_status(self):
|
||||
http_client = self.get_http_client()
|
||||
response, content = self._create_xfer(http_client)
|
||||
self.assertEqual(response.status, 200)
|
||||
data = json.loads(content)
|
||||
response, content = self._status_transfer(http_client, data['id'])
|
||||
self.assertEqual(response.status, 200)
|
||||
data_status = json.loads(content)
|
||||
self.assertEquals(data, data_status)
|
||||
|
||||
def test_delete_unknown(self):
|
||||
http_client = self.get_http_client()
|
||||
response, content = self._delete_transfer(http_client, 'notreal')
|
||||
self.assertEqual(response.status, 404)
|
||||
|
||||
def test_delete_twice(self):
|
||||
http_client = self.get_http_client()
|
||||
response, content = self._create_xfer(http_client)
|
||||
self.assertEqual(response.status, 200)
|
||||
data = json.loads(content)
|
||||
response, content = self._delete_transfer(http_client, data['id'])
|
||||
self.assertEqual(response.status, 200)
|
||||
response, content = self._delete_transfer(http_client, data['id'])
|
||||
self.assertEqual(response.status, 404)
|
||||
|
||||
def test_status_unknown(self):
|
||||
http_client = self.get_http_client()
|
||||
response, content = self._delete_transfer(http_client, 'notreal')
|
||||
self.assertEqual(response.status, 404)
|
||||
|
||||
def test_status_after_delete(self):
|
||||
http_client = self.get_http_client()
|
||||
response, content = self._create_xfer(http_client)
|
||||
self.assertEqual(response.status, 200)
|
||||
data = json.loads(content)
|
||||
response, content = self._delete_transfer(http_client, data['id'])
|
||||
self.assertEqual(response.status, 200)
|
||||
response, content = self._status_transfer(http_client, data['id'])
|
||||
self.assertEqual(response.status, 404)
|
||||
|
||||
def test_create_state(self):
|
||||
http_client = self.get_http_client()
|
||||
response, content = self._create_xfer(http_client)
|
||||
self.assertEqual(response.status, 200)
|
||||
data = json.loads(content)
|
||||
self.assertEqual(data['state'], 'STATE_NEW')
|
||||
|
||||
def test_create_cancel(self):
|
||||
http_client = self.get_http_client()
|
||||
response, content = self._create_xfer(http_client)
|
||||
self.assertEqual(response.status, 200)
|
||||
data = json.loads(content)
|
||||
response, content = self._cancel_transfer(http_client, data['id'])
|
||||
self.assertEqual(response.status, 200)
|
||||
response, content = self._status_transfer(http_client, data['id'])
|
||||
self.assertEqual(response.status, 200)
|
||||
data = json.loads(content)
|
||||
self.assertEqual(data['state'], 'STATE_CANCELED')
|
||||
|
||||
def test_create_cancel_delete(self):
|
||||
http_client = self.get_http_client()
|
||||
response, content = self._create_xfer(http_client)
|
||||
self.assertEqual(response.status, 200)
|
||||
data = json.loads(content)
|
||||
response, content = self._cancel_transfer(http_client, data['id'])
|
||||
self.assertEqual(response.status, 200)
|
||||
response, content = self._status_transfer(http_client, data['id'])
|
||||
self.assertEqual(response.status, 200)
|
||||
response, content = self._delete_transfer(http_client, data['id'])
|
||||
self.assertEqual(response.status, 200)
|
||||
|
||||
def test_cancel_unknown(self):
|
||||
http_client = self.get_http_client()
|
||||
response, content = self._cancel_transfer(http_client, 'notreal')
|
||||
self.assertEqual(response.status, 404)
|
||||
|
||||
def test_simple_create_bad_source(self):
|
||||
http_client = self.get_http_client()
|
||||
response, content = self._create_xfer(http_client, src="bad_form")
|
||||
self.assertEqual(response.status, 400)
|
||||
|
||||
def test_simple_create_bad_dest(self):
|
||||
http_client = self.get_http_client()
|
||||
response, content = self._create_xfer(http_client, dst="bad_form")
|
||||
self.assertEqual(response.status, 400)
|
||||
|
||||
def test_bad_update(self):
|
||||
http_client = self.get_http_client()
|
||||
data_json = {'notaction': 'cancel'}
|
||||
data = json.dumps(data_json)
|
||||
headers = {'content-type': 'application/json'}
|
||||
path = "/v1/transfers/%s/action" % id
|
||||
response, content = http_client.request(path, 'POST', headers=headers,
|
||||
body=data)
|
||||
self.assertEqual(response.status, 400)
|
||||
|
||||
def test_bad_action(self):
|
||||
http_client = self.get_http_client()
|
||||
data_json = {'xferaction': 'applesauce'}
|
||||
data = json.dumps(data_json)
|
||||
headers = {'content-type': 'application/json'}
|
||||
path = "/v1/transfers/%s/action" % id
|
||||
response, content = http_client.request(path, 'POST', headers=headers, body=data)
|
||||
self.assertEqual(response.status, 400)
|
||||
|
||||
def test_create_url_options(self):
|
||||
path = "/v1/transfers"
|
||||
http_client = self.get_http_client()
|
||||
|
||||
data_json = {'source_url': 'file:///etc/group',
|
||||
'destination_url': 'file:///dev/null',
|
||||
'source_options': {'key': 10},
|
||||
'destination_options': [1, 3, 5]}
|
||||
data = json.dumps(data_json)
|
||||
|
||||
headers = {'content-type': 'application/json'}
|
||||
response, content = http_client.request(path, 'POST', body=data,
|
||||
headers=headers)
|
||||
self.assertEqual(response.status, 200)
|
||||
data_out = json.loads(content)
|
||||
self.assertEqual(data_json['source_options'],
|
||||
data_out['source_options'])
|
||||
self.assertEqual(data_json['destination_options'],
|
||||
data_out['destination_options'])
|
||||
|
||||
def test_create_missing_url(self):
|
||||
path = "/v1/transfers"
|
||||
http_client = self.get_http_client()
|
||||
|
||||
data_json = {'source_url': 'file:///etc/group'}
|
||||
data = json.dumps(data_json)
|
||||
headers = {'content-type': 'application/json'}
|
||||
response, content = http_client.request(path, 'POST', body=data,
|
||||
headers=headers)
|
||||
self.assertEqual(response.status, 400)
|
||||
|
||||
def test_create_uknown_option(self):
|
||||
path = "/v1/transfers"
|
||||
http_client = self.get_http_client()
|
||||
|
||||
data_json = {'source_url': 'file:///etc/group',
|
||||
'destination_url': 'file:///dev/zero',
|
||||
'random': 90}
|
||||
data = json.dumps(data_json)
|
||||
headers = {'content-type': 'application/json'}
|
||||
response, content = http_client.request(path, 'POST', body=data,
|
||||
headers=headers)
|
||||
self.assertEqual(response.status, 400)
|
||||
|
||||
def test_list_limit(self):
|
||||
http_client = self.get_http_client()
|
||||
for i in range(10):
|
||||
response, content = self._create_xfer(http_client)
|
||||
self.assertEqual(response.status, 200)
|
||||
|
||||
path = "/v1/transfers"
|
||||
data_json = {'limit': 5}
|
||||
data = json.dumps(data_json)
|
||||
headers = {'content-type': 'application/json'}
|
||||
response, content = http_client.request(path, 'GET', body=data,
|
||||
headers=headers)
|
||||
self.assertEqual(response.status, 200)
|
||||
data = json.loads(content)
|
||||
self.assertEqual(len(data), 5)
|
@ -1,8 +1,10 @@
|
||||
import tempfile
|
||||
import testtools
|
||||
import staccato.db as db
|
||||
import os
|
||||
import json
|
||||
import webob
|
||||
|
||||
import staccato.db as db
|
||||
|
||||
TEST_CONF = """
|
||||
[DEFAULT]
|
||||
@ -17,8 +19,11 @@ FILE_ONLY_PROTOCOL = {
|
||||
"file": [{"module": "staccato.protocols.file.FileProtocol"}]
|
||||
}
|
||||
|
||||
class BaseTestCase(testtools.TestCase):
|
||||
pass
|
||||
|
||||
class TempFileCleanupBaseTest(testtools.TestCase):
|
||||
|
||||
class TempFileCleanupBaseTest(BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TempFileCleanupBaseTest, self).setUp()
|
||||
@ -54,4 +59,50 @@ class TempFileCleanupBaseTest(testtools.TestCase):
|
||||
pass
|
||||
|
||||
def get_tempfile(self):
|
||||
return tempfile.mkstemp()[1]
|
||||
fname = tempfile.mkstemp()[1]
|
||||
self.files_to_delete.append(fname)
|
||||
return fname
|
||||
|
||||
def get_tempdir(self):
|
||||
return tempfile.mkdtemp()
|
||||
|
||||
|
||||
class Httplib2WsgiAdapter(object):
|
||||
def __init__(self, app):
|
||||
self.app = app
|
||||
|
||||
def request(self, uri, method="GET", body=None, headers=None):
|
||||
req = webob.Request.blank(uri, method=method, headers=headers)
|
||||
req.body = body
|
||||
resp = req.get_response(self.app)
|
||||
return Httplib2WebobResponse(resp), resp.body
|
||||
|
||||
|
||||
class Httplib2WebobResponse(object):
|
||||
def __init__(self, webob_resp):
|
||||
self.webob_resp = webob_resp
|
||||
|
||||
@property
|
||||
def status(self):
|
||||
return self.webob_resp.status_code
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.webob_resp.headers[key]
|
||||
|
||||
def get(self, key):
|
||||
return self.webob_resp.headers[key]
|
||||
|
||||
|
||||
class HttplibWsgiAdapter(object):
|
||||
def __init__(self, app):
|
||||
self.app = app
|
||||
self.req = None
|
||||
|
||||
def request(self, method, url, body=None, headers={}):
|
||||
self.req = webob.Request.blank(url, method=method, headers=headers)
|
||||
self.req.body = body
|
||||
|
||||
def getresponse(self):
|
||||
response = self.req.get_response(self.app)
|
||||
return FakeHTTPResponse(response.status_code, response.headers,
|
||||
response.body)
|
||||
|
Loading…
x
Reference in New Issue
Block a user