commit
e3b7f64dbc
@ -68,6 +68,7 @@ class Containers(object):
|
||||
orig_token, dest_storage_cnx, dest_storage_url, dest_token,
|
||||
container_name):
|
||||
|
||||
# Is this needed to be done before the head?
|
||||
try:
|
||||
orig_container_stats, orig_objects = swiftclient.get_container(
|
||||
None, orig_token, container_name, http_conn=orig_storage_cnx,
|
||||
@ -76,6 +77,7 @@ class Containers(object):
|
||||
logging.info("ERROR: getting container: %s, %s" % (
|
||||
container_name, e.http_reason))
|
||||
return
|
||||
|
||||
try:
|
||||
swiftclient.head_container(
|
||||
"", dest_token, container_name, http_conn=dest_storage_cnx
|
||||
|
@ -14,10 +14,10 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import uuid
|
||||
import random
|
||||
import datetime
|
||||
import random
|
||||
import urlparse
|
||||
import uuid
|
||||
|
||||
STORAGE_ORIG = 'http://storage-orig.com'
|
||||
STORAGE_DEST = 'http://storage-dest.com'
|
||||
|
@ -14,51 +14,52 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import swiftclient
|
||||
import logging
|
||||
|
||||
import keystoneclient
|
||||
import swiftclient
|
||||
|
||||
import base as test_base
|
||||
import swsync.accounts
|
||||
from fakes import FakeSWConnection, TENANTS_LIST, STORAGE_ORIG, \
|
||||
STORAGE_DEST, FakeSWClient, FakeKS, CONFIGDICT, CONTAINERS_LIST, \
|
||||
fake_get_config
|
||||
import tests.units.base
|
||||
import tests.units.fakes as fakes
|
||||
|
||||
|
||||
class TestAccount(test_base.TestCase):
|
||||
class TestAccount(tests.units.base.TestCase):
|
||||
def setUp(self):
|
||||
super(TestAccount, self).setUp()
|
||||
self.accounts_cls = swsync.accounts.Accounts()
|
||||
self._stubs()
|
||||
|
||||
def get_account(self, *args, **kwargs):
|
||||
return ({'x-account-container-count': len(CONTAINERS_LIST)},
|
||||
[x[0] for x in CONTAINERS_LIST])
|
||||
return ({'x-account-container-count': len(fakes.CONTAINERS_LIST)},
|
||||
[x[0] for x in fakes.CONTAINERS_LIST])
|
||||
|
||||
def _stubs(self):
|
||||
self.stubs.Set(keystoneclient.v2_0, 'client', FakeKS)
|
||||
self.stubs.Set(swiftclient.client, 'Connection', FakeSWConnection)
|
||||
self.stubs.Set(swsync.accounts, 'get_config', fake_get_config)
|
||||
self.stubs.Set(keystoneclient.v2_0, 'client', fakes.FakeKS)
|
||||
self.stubs.Set(swiftclient.client, 'Connection',
|
||||
fakes.FakeSWConnection)
|
||||
self.stubs.Set(swsync.accounts, 'get_config', fakes.fake_get_config)
|
||||
self.stubs.Set(swiftclient, 'get_account', self.get_account)
|
||||
self.stubs.Set(swiftclient, 'http_connection',
|
||||
FakeSWClient.http_connection)
|
||||
fakes.FakeSWClient.http_connection)
|
||||
|
||||
def test_get_swift_auth(self):
|
||||
tenant_name = 'foo1'
|
||||
ret = self.accounts_cls.get_swift_auth(
|
||||
"http://test.com", tenant_name, "user", "password")
|
||||
tenant_id = TENANTS_LIST[tenant_name]['id']
|
||||
self.assertEquals(ret[0], "%s/v1/AUTH_%s" % (STORAGE_DEST,
|
||||
tenant_id = fakes.TENANTS_LIST[tenant_name]['id']
|
||||
self.assertEquals(ret[0], "%s/v1/AUTH_%s" % (fakes.STORAGE_DEST,
|
||||
tenant_id))
|
||||
|
||||
def test_get_ks_auth_orig(self):
|
||||
_, kwargs = self.accounts_cls.get_ks_auth_orig()()
|
||||
k = CONFIGDICT['auth']['keystone_origin_admin_credentials']
|
||||
k = fakes.CONFIGDICT['auth']['keystone_origin_admin_credentials']
|
||||
tenant_name, username, password = k.split(':')
|
||||
|
||||
self.assertEquals(kwargs['tenant_name'], tenant_name)
|
||||
self.assertEquals(kwargs['username'], username)
|
||||
self.assertEquals(kwargs['password'], password)
|
||||
k = CONFIGDICT['auth']['keystone_origin']
|
||||
k = fakes.CONFIGDICT['auth']['keystone_origin']
|
||||
self.assertEquals(k, kwargs['auth_url'])
|
||||
|
||||
def test_process(self):
|
||||
@ -71,11 +72,12 @@ class TestAccount(test_base.TestCase):
|
||||
ret.append((orig_storage_url, dest_storage_url))
|
||||
self.accounts_cls.sync_account = sync_account
|
||||
self.accounts_cls.process()
|
||||
tenant_list_ids = sorted(TENANTS_LIST[x]['id'] for x in TENANTS_LIST)
|
||||
tenant_list_ids = sorted(fakes.TENANTS_LIST[x]['id']
|
||||
for x in fakes.TENANTS_LIST)
|
||||
ret_orig_storage_id = sorted(
|
||||
x[0][x[0].find('AUTH_') + 5:] for x in ret)
|
||||
self.assertEquals(tenant_list_ids, ret_orig_storage_id)
|
||||
[self.assertTrue(y[1].startswith(STORAGE_DEST)) for y in ret]
|
||||
[self.assertTrue(y[1].startswith(fakes.STORAGE_DEST)) for y in ret]
|
||||
|
||||
def test_sync_account(self):
|
||||
ret = []
|
||||
@ -85,13 +87,60 @@ class TestAccount(test_base.TestCase):
|
||||
ret.append(args)
|
||||
self.accounts_cls.container_cls = Containers()
|
||||
|
||||
tenant_name = TENANTS_LIST.keys()[0]
|
||||
orig_storage_url = "%s/AUTH_%s" % (STORAGE_ORIG,
|
||||
TENANTS_LIST[tenant_name]['id'])
|
||||
dest_storage_url = "%s/AUTH_%s" % (STORAGE_DEST,
|
||||
TENANTS_LIST[tenant_name]['id'])
|
||||
tenant_name = fakes.TENANTS_LIST.keys()[0]
|
||||
tenant_id = fakes.TENANTS_LIST[tenant_name]['id']
|
||||
orig_storage_url = "%s/AUTH_%s" % (fakes.STORAGE_ORIG,
|
||||
tenant_id)
|
||||
dest_storage_url = "%s/AUTH_%s" % (fakes.STORAGE_DEST,
|
||||
tenant_id)
|
||||
self.accounts_cls.sync_account(orig_storage_url, "otoken",
|
||||
dest_storage_url, "dtoken")
|
||||
ret_container_list = sorted(x[7] for x in ret)
|
||||
default_container_list = sorted(x[0]['name'] for x in CONTAINERS_LIST)
|
||||
default_container_list = sorted(x[0]['name']
|
||||
for x in fakes.CONTAINERS_LIST)
|
||||
self.assertEquals(ret_container_list, default_container_list)
|
||||
|
||||
def test_sync_exception_get_account(self):
|
||||
called = []
|
||||
|
||||
def fake_info(self, *args):
|
||||
called.append("called")
|
||||
|
||||
def get_account(*args, **kwargs):
|
||||
raise swiftclient.client.ClientException("TESTED")
|
||||
self.stubs.Set(swiftclient, 'get_account', get_account)
|
||||
self.stubs.Set(logging, 'info', fake_info)
|
||||
self.accounts_cls.sync_account("http://foo", "token",
|
||||
"http://bar", "token2")
|
||||
self.assertTrue(called)
|
||||
|
||||
def test_sync_account_detect_we_need_to_delete_some_stuff(self):
|
||||
# I should get my lazy ass up and just use self.mox stuff
|
||||
ret = []
|
||||
called = []
|
||||
|
||||
class Containers():
|
||||
def delete_container(*args, **kwargs):
|
||||
called.append("TESTED")
|
||||
|
||||
def sync(*args, **kwargs):
|
||||
pass
|
||||
|
||||
self.accounts_cls.container_cls = Containers()
|
||||
|
||||
def get_account(*args, **kwargs):
|
||||
#ORIG
|
||||
if len(ret) == 0:
|
||||
ret.append("TESTED")
|
||||
return ({'x-account-container-count': 1},
|
||||
[{'name': 'foo'}])
|
||||
#DEST
|
||||
else:
|
||||
return ({'x-account-container-count': 2},
|
||||
[{'name': 'foo', 'name': 'bar'}])
|
||||
|
||||
raise swiftclient.client.ClientException("TESTED")
|
||||
self.stubs.Set(swiftclient, 'get_account', get_account)
|
||||
self.accounts_cls.sync_account("http://foo", "token",
|
||||
"http://bar", "token2")
|
||||
self.assertTrue(called)
|
||||
|
@ -14,15 +14,15 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import logging
|
||||
import urlparse
|
||||
|
||||
import swiftclient
|
||||
|
||||
import swsync.containers
|
||||
|
||||
import base as test_base
|
||||
from fakes import STORAGE_ORIG, STORAGE_DEST, TENANTS_LIST, \
|
||||
CONTAINERS_LIST, CONTAINER_HEADERS, gen_object
|
||||
import tests.units.base as test_base
|
||||
import tests.units.fakes as fakes
|
||||
|
||||
|
||||
class TestContainers(test_base.TestCase):
|
||||
@ -31,11 +31,13 @@ class TestContainers(test_base.TestCase):
|
||||
self.container_cls = swsync.containers.Containers()
|
||||
|
||||
self.tenant_name = 'foo1'
|
||||
self.tenant_id = TENANTS_LIST[self.tenant_name]['id']
|
||||
self.orig_storage_url = '%s/AUTH_%s' % (STORAGE_ORIG, self.tenant_id)
|
||||
self.tenant_id = fakes.TENANTS_LIST[self.tenant_name]['id']
|
||||
self.orig_storage_url = '%s/AUTH_%s' % (fakes.STORAGE_ORIG,
|
||||
self.tenant_id)
|
||||
self.orig_storage_cnx = (urlparse.urlparse(self.orig_storage_url),
|
||||
None)
|
||||
self.dest_storage_url = '%s/AUTH_%s' % (STORAGE_DEST, self.tenant_id)
|
||||
self.dest_storage_url = '%s/AUTH_%s' % (fakes.STORAGE_DEST,
|
||||
self.tenant_id)
|
||||
self.dest_storage_cnx = (urlparse.urlparse(self.dest_storage_url),
|
||||
None)
|
||||
|
||||
@ -49,9 +51,9 @@ class TestContainers(test_base.TestCase):
|
||||
raise swiftclient.client.ClientException('Not Here')
|
||||
|
||||
def get_container(_, token, name, **kwargs):
|
||||
for clist in CONTAINERS_LIST:
|
||||
for clist in fakes.CONTAINERS_LIST:
|
||||
if clist[0]['name'] == name:
|
||||
return (CONTAINER_HEADERS, clist[1])
|
||||
return (fakes.CONTAINER_HEADERS, clist[1])
|
||||
|
||||
self.stubs.Set(swiftclient, 'get_container', get_container)
|
||||
self.stubs.Set(swiftclient, 'put_container', put_container)
|
||||
@ -64,6 +66,38 @@ class TestContainers(test_base.TestCase):
|
||||
)
|
||||
self.assertEqual(len(get_cnt_called), 1)
|
||||
|
||||
def test_sync_when_container_nothere_raise_when_cant_create(self):
|
||||
put_cnt_called = []
|
||||
called_info = []
|
||||
|
||||
def fake_info(self, *args):
|
||||
called_info.append("called")
|
||||
self.stubs.Set(logging, 'info', fake_info)
|
||||
|
||||
def put_container(*args, **kwargs):
|
||||
put_cnt_called.append("TESTED")
|
||||
raise swiftclient.client.ClientException('TESTED')
|
||||
|
||||
def get_container(_, token, name, **kwargS):
|
||||
for clist in fakes.CONTAINERS_LIST:
|
||||
if clist[0]['name'] == name:
|
||||
return (fakes.CONTAINER_HEADERS, clist[1])
|
||||
|
||||
def head_container(*args, **kwargs):
|
||||
raise swiftclient.client.ClientException('Not Here')
|
||||
|
||||
self.stubs.Set(swiftclient, 'get_container', get_container)
|
||||
self.stubs.Set(swiftclient, 'put_container', put_container)
|
||||
self.stubs.Set(swiftclient, 'head_container', head_container)
|
||||
|
||||
self.container_cls.sync(
|
||||
self.orig_storage_cnx, self.orig_storage_url, 'token',
|
||||
self.dest_storage_cnx, self.dest_storage_url, 'token',
|
||||
'cont1'
|
||||
)
|
||||
self.assertEqual(len(put_cnt_called), 1)
|
||||
self.assertEqual(len(called_info), 1)
|
||||
|
||||
def test_delete_dest(self):
|
||||
# probably need to change that to mox properly
|
||||
get_cnt_called = []
|
||||
@ -82,15 +116,15 @@ class TestContainers(test_base.TestCase):
|
||||
def get_container(*args, **kwargs):
|
||||
# MASTER
|
||||
if not get_cnt_called:
|
||||
cont = CONTAINERS_LIST[0][0]
|
||||
objects = list(CONTAINERS_LIST[0][1])
|
||||
cont = fakes.CONTAINERS_LIST[0][0]
|
||||
objects = list(fakes.CONTAINERS_LIST[0][1])
|
||||
get_cnt_called.append(True)
|
||||
# TARGET
|
||||
else:
|
||||
cont = CONTAINERS_LIST[0][0]
|
||||
objects = list(CONTAINERS_LIST[0][1])
|
||||
cont = fakes.CONTAINERS_LIST[0][0]
|
||||
objects = list(fakes.CONTAINERS_LIST[0][1])
|
||||
# Add an object to target.
|
||||
objects.append(gen_object('NEWOBJ'))
|
||||
objects.append(fakes.gen_object('NEWOBJ'))
|
||||
|
||||
return (cont, objects)
|
||||
|
||||
@ -123,14 +157,14 @@ class TestContainers(test_base.TestCase):
|
||||
def get_container(*args, **kwargs):
|
||||
# MASTER
|
||||
if not get_cnt_called:
|
||||
cont = CONTAINERS_LIST[0][0]
|
||||
objects = list(CONTAINERS_LIST[0][1])
|
||||
objects.append(gen_object('NEWOBJ'))
|
||||
cont = fakes.CONTAINERS_LIST[0][0]
|
||||
objects = list(fakes.CONTAINERS_LIST[0][1])
|
||||
objects.append(fakes.gen_object('NEWOBJ'))
|
||||
get_cnt_called.append(True)
|
||||
# TARGET
|
||||
else:
|
||||
cont = CONTAINERS_LIST[0][0]
|
||||
objects = list(CONTAINERS_LIST[0][1])
|
||||
cont = fakes.CONTAINERS_LIST[0][0]
|
||||
objects = list(fakes.CONTAINERS_LIST[0][1])
|
||||
|
||||
return (cont, objects)
|
||||
|
||||
@ -151,3 +185,88 @@ class TestContainers(test_base.TestCase):
|
||||
'cont1')
|
||||
|
||||
self.assertEqual(sync_object_called[0][-1][1], 'NEWOBJ')
|
||||
|
||||
def test_sync_raise_exceptions_get_container_on_orig(self):
|
||||
called = []
|
||||
|
||||
def get_container(*args, **kwargs):
|
||||
called.append("TESTED")
|
||||
raise swiftclient.client.ClientException("TESTED")
|
||||
|
||||
self.stubs.Set(swiftclient, 'get_container', get_container)
|
||||
self.container_cls.sync(
|
||||
self.orig_storage_cnx,
|
||||
self.orig_storage_url,
|
||||
'token',
|
||||
self.dest_storage_cnx,
|
||||
self.dest_storage_url,
|
||||
'token',
|
||||
'cont1')
|
||||
self.assertEqual(len(called), 1)
|
||||
|
||||
def test_sync_raise_exceptions_get_container_on_dest(self):
|
||||
called = []
|
||||
called_on_dest = []
|
||||
|
||||
def get_container(*args, **kwargs):
|
||||
#ORIG
|
||||
if len(called) == 0:
|
||||
called.append("TESTED")
|
||||
return ({}, [{'name': 'PARISESTMAGIQUE',
|
||||
'last_modified': '2010'}])
|
||||
#DEST
|
||||
else:
|
||||
called_on_dest.append("TESTED")
|
||||
raise swiftclient.client.ClientException("TESTED")
|
||||
|
||||
def head_container(*args, **kwargs):
|
||||
pass
|
||||
|
||||
self.stubs.Set(swiftclient, 'head_container', head_container)
|
||||
self.stubs.Set(swiftclient, 'get_container', get_container)
|
||||
self.container_cls.sync(
|
||||
self.orig_storage_cnx,
|
||||
self.orig_storage_url,
|
||||
'token',
|
||||
self.dest_storage_cnx,
|
||||
self.dest_storage_url,
|
||||
'token',
|
||||
'cont1')
|
||||
self.assertEqual(len(called_on_dest), 1)
|
||||
self.assertEqual(len(called), 1)
|
||||
|
||||
def test_delete_container(self):
|
||||
delete_called = []
|
||||
orig_containers = [{'name': 'foo'}]
|
||||
dest_containers = [{'name': 'foo'}, {'name': 'bar'}]
|
||||
|
||||
def get_container(*args, **kwargs):
|
||||
return ({}, [{'name': 'PARISESTMAGIQUE', 'last_modified': '2010'}])
|
||||
|
||||
def delete(*args, **kwargs):
|
||||
delete_called.append("TESTED")
|
||||
|
||||
self.container_cls.delete_object = delete
|
||||
self.stubs.Set(swiftclient, 'delete_container', delete)
|
||||
self.stubs.Set(swiftclient, 'get_container', get_container)
|
||||
|
||||
self.container_cls.delete_container(
|
||||
"cnx1", "token1", orig_containers, dest_containers)
|
||||
|
||||
self.assertEqual(len(delete_called), 2)
|
||||
|
||||
def test_delete_container_raise_exception(self):
|
||||
called = []
|
||||
orig_containers = [{'name': 'foo'}]
|
||||
dest_containers = [{'name': 'foo'}, {'name': 'bar'}]
|
||||
|
||||
def get_container(*args, **kwargs):
|
||||
called.append("TESTED")
|
||||
raise swiftclient.client.ClientException("TESTED")
|
||||
|
||||
self.stubs.Set(swiftclient, 'get_container', get_container)
|
||||
|
||||
self.container_cls.delete_container(
|
||||
"cnx1", "token1", orig_containers, dest_containers)
|
||||
|
||||
self.assertEqual(len(called), 1)
|
||||
|
@ -20,8 +20,7 @@
|
||||
import unittest
|
||||
|
||||
from middlewares import last_modified as middleware
|
||||
from swift.common.swob import Request
|
||||
from swift.common.swob import Response
|
||||
import swift.common.swob as swob
|
||||
|
||||
|
||||
class FakeApp(object):
|
||||
@ -32,8 +31,8 @@ class FakeApp(object):
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
status, headers, body = self.status_headers_body
|
||||
return Response(status=status, headers=headers,
|
||||
body=body)(env, start_response)
|
||||
return swob.Response(status=status, headers=headers,
|
||||
body=body)(env, start_response)
|
||||
|
||||
|
||||
class FakeRequest(object):
|
||||
@ -44,7 +43,7 @@ class FakeRequest(object):
|
||||
class TestLastModifiedMiddleware(unittest.TestCase):
|
||||
|
||||
def _make_request(self, path, **kwargs):
|
||||
req = Request.blank("/v1/AUTH_account/%s" % path, **kwargs)
|
||||
req = swob.Request.blank("/v1/AUTH_account/%s" % path, **kwargs)
|
||||
return req
|
||||
|
||||
def setUp(self):
|
||||
|
@ -14,13 +14,13 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import swiftclient
|
||||
import eventlet
|
||||
import swift
|
||||
from eventlet import sleep, Timeout
|
||||
import swiftclient
|
||||
|
||||
import base as test_base
|
||||
import swsync.objects as swobjects
|
||||
from fakes import STORAGE_ORIG, STORAGE_DEST, TENANTS_LIST
|
||||
import tests.units.base as test_base
|
||||
import tests.units.fakes as fakes
|
||||
|
||||
|
||||
def fake_http_connect(status, body='', headers={}, resp_waitfor=None,
|
||||
@ -31,14 +31,14 @@ def fake_http_connect(status, body='', headers={}, resp_waitfor=None,
|
||||
self.status = status
|
||||
self.body = body
|
||||
if connect_waitfor:
|
||||
sleep(int(connect_waitfor))
|
||||
eventlet.sleep(int(connect_waitfor))
|
||||
|
||||
def getheaders(self):
|
||||
return headers
|
||||
|
||||
def getresponse(self):
|
||||
if resp_waitfor:
|
||||
sleep(int(resp_waitfor))
|
||||
eventlet.sleep(int(resp_waitfor))
|
||||
return self
|
||||
|
||||
def read(self, amt=None):
|
||||
@ -55,9 +55,11 @@ class TestObject(test_base.TestCase):
|
||||
def setUp(self):
|
||||
super(TestObject, self).setUp()
|
||||
self.tenant_name = 'foo1'
|
||||
self.tenant_id = TENANTS_LIST[self.tenant_name]['id']
|
||||
self.orig_storage_url = "%s/AUTH_%s" % (STORAGE_ORIG, self.tenant_id)
|
||||
self.dest_storage_url = "%s/AUTH_%s" % (STORAGE_DEST, self.tenant_id)
|
||||
self.tenant_id = fakes.TENANTS_LIST[self.tenant_name]['id']
|
||||
self.orig_storage_url = "%s/AUTH_%s" % (fakes.STORAGE_ORIG,
|
||||
self.tenant_id)
|
||||
self.dest_storage_url = "%s/AUTH_%s" % (fakes.STORAGE_DEST,
|
||||
self.tenant_id)
|
||||
|
||||
def test_quote(self):
|
||||
utf8_chars = u'\uF10F\uD20D\uB30B\u9409\u8508\u5605\u3703\u1801'
|
||||
@ -157,7 +159,7 @@ class TestObject(test_base.TestCase):
|
||||
new_connect = fake_http_connect(200, connect_waitfor=2)
|
||||
self.stubs.Set(swift.common.bufferedhttp,
|
||||
'http_connect_raw', new_connect)
|
||||
self.assertRaises(Timeout,
|
||||
self.assertRaises(eventlet.Timeout,
|
||||
swobjects.get_object,
|
||||
self.orig_storage_url, "token", "cont1", "obj1",
|
||||
conn_timeout=1)
|
||||
@ -166,7 +168,7 @@ class TestObject(test_base.TestCase):
|
||||
new_connect = fake_http_connect(200, resp_waitfor=2)
|
||||
self.stubs.Set(swift.common.bufferedhttp,
|
||||
'http_connect_raw', new_connect)
|
||||
self.assertRaises(Timeout,
|
||||
self.assertRaises(eventlet.Timeout,
|
||||
swobjects.get_object,
|
||||
self.orig_storage_url, "token", "cont1", "obj1",
|
||||
response_timeout=1)
|
||||
|
@ -15,10 +15,10 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import ConfigParser
|
||||
from cStringIO import StringIO
|
||||
import cStringIO as StringIO
|
||||
|
||||
import base as test_base
|
||||
import swsync.utils
|
||||
import tests.units.base as test_base
|
||||
|
||||
|
||||
class TestAccount(test_base.TestCase):
|
||||
@ -29,43 +29,43 @@ class TestAccount(test_base.TestCase):
|
||||
swsync.utils.parse_ini, "/tmp/foo")
|
||||
|
||||
def test_parse_ini_bad_file(self):
|
||||
s = StringIO("foo=bar")
|
||||
s = StringIO.StringIO("foo=bar")
|
||||
self.assertRaises(ConfigParser.MissingSectionHeaderError,
|
||||
swsync.utils.parse_ini, s)
|
||||
|
||||
def test_parse_ini(self):
|
||||
s = StringIO("[foo]\nfoo=bar")
|
||||
s = StringIO.StringIO("[foo]\nfoo=bar")
|
||||
self.assertIsInstance(swsync.utils.parse_ini(s),
|
||||
ConfigParser.RawConfigParser)
|
||||
|
||||
def test_get_config(self):
|
||||
s = StringIO("[foo]\nkey=bar")
|
||||
s = StringIO.StringIO("[foo]\nkey=bar")
|
||||
cfg = swsync.utils.parse_ini(s)
|
||||
self.assertEqual(swsync.utils.get_config('foo', 'key', _config=cfg),
|
||||
'bar')
|
||||
|
||||
def test_get_config_no_section(self):
|
||||
s = StringIO("[pasla]\nkey=bar")
|
||||
s = StringIO.StringIO("[pasla]\nkey=bar")
|
||||
cfg = swsync.utils.parse_ini(s)
|
||||
self.assertRaises(swsync.utils.ConfigurationError,
|
||||
swsync.utils.get_config,
|
||||
'foo', 'key', _config=cfg)
|
||||
|
||||
def test_get_config_with_default(self):
|
||||
s = StringIO("[foo]\n")
|
||||
s = StringIO.StringIO("[foo]\n")
|
||||
cfg = swsync.utils.parse_ini(s)
|
||||
self.assertEqual(swsync.utils.get_config('foo', 'key', default='MEME',
|
||||
_config=cfg),
|
||||
'MEME')
|
||||
|
||||
def test_get_config_auto_parsed(self):
|
||||
s = StringIO("[foo]\nkey=bar")
|
||||
s = StringIO.StringIO("[foo]\nkey=bar")
|
||||
cfg = swsync.utils.parse_ini(s)
|
||||
self.stubs.Set(swsync.utils, 'CONFIG', cfg)
|
||||
self.assertEqual(swsync.utils.get_config('foo', 'key'), 'bar')
|
||||
|
||||
def test_get_config_no_value(self):
|
||||
s = StringIO("[foo]\n")
|
||||
s = StringIO.StringIO("[foo]\n")
|
||||
cfg = swsync.utils.parse_ini(s)
|
||||
self.assertRaises(swsync.utils.ConfigurationError,
|
||||
swsync.utils.get_config,
|
||||
|
Loading…
x
Reference in New Issue
Block a user