241 lines
8.2 KiB
Python
241 lines
8.2 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import logging
|
|
import sys
|
|
|
|
import six
|
|
import testresources
|
|
from testtools import matchers
|
|
|
|
from keystoneclient import exceptions
|
|
from keystoneclient.tests import client_fixtures
|
|
from keystoneclient.tests import utils as test_utils
|
|
from keystoneclient import utils
|
|
|
|
|
|
class FakeResource(object):
|
|
pass
|
|
|
|
|
|
class FakeManager(object):
|
|
|
|
resource_class = FakeResource
|
|
|
|
resources = {
|
|
'1234': {'name': 'entity_one'},
|
|
'8e8ec658-c7b0-4243-bdf8-6f7f2952c0d0': {'name': 'entity_two'},
|
|
'\xe3\x82\xbdtest': {'name': u'\u30bdtest'},
|
|
'5678': {'name': '9876'}
|
|
}
|
|
|
|
def get(self, resource_id):
|
|
try:
|
|
return self.resources[str(resource_id)]
|
|
except KeyError:
|
|
raise exceptions.NotFound(resource_id)
|
|
|
|
def find(self, name=None):
|
|
if name == '9999':
|
|
# NOTE(morganfainberg): special case that raises NoUniqueMatch.
|
|
raise exceptions.NoUniqueMatch()
|
|
for resource_id, resource in self.resources.items():
|
|
if resource['name'] == str(name):
|
|
return resource
|
|
raise exceptions.NotFound(name)
|
|
|
|
|
|
class FindResourceTestCase(test_utils.TestCase):
|
|
|
|
def setUp(self):
|
|
super(FindResourceTestCase, self).setUp()
|
|
self.manager = FakeManager()
|
|
|
|
def test_find_none(self):
|
|
self.assertRaises(exceptions.CommandError,
|
|
utils.find_resource,
|
|
self.manager,
|
|
'asdf')
|
|
|
|
def test_find_by_integer_id(self):
|
|
output = utils.find_resource(self.manager, 1234)
|
|
self.assertEqual(output, self.manager.resources['1234'])
|
|
|
|
def test_find_by_str_id(self):
|
|
output = utils.find_resource(self.manager, '1234')
|
|
self.assertEqual(output, self.manager.resources['1234'])
|
|
|
|
def test_find_by_uuid(self):
|
|
uuid = '8e8ec658-c7b0-4243-bdf8-6f7f2952c0d0'
|
|
output = utils.find_resource(self.manager, uuid)
|
|
self.assertEqual(output, self.manager.resources[uuid])
|
|
|
|
def test_find_by_unicode(self):
|
|
name = '\xe3\x82\xbdtest'
|
|
output = utils.find_resource(self.manager, name)
|
|
self.assertEqual(output, self.manager.resources[name])
|
|
|
|
def test_find_by_str_name(self):
|
|
output = utils.find_resource(self.manager, 'entity_one')
|
|
self.assertEqual(output, self.manager.resources['1234'])
|
|
|
|
def test_find_by_int_name(self):
|
|
output = utils.find_resource(self.manager, 9876)
|
|
self.assertEqual(output, self.manager.resources['5678'])
|
|
|
|
def test_find_no_unique_match(self):
|
|
self.assertRaises(exceptions.CommandError,
|
|
utils.find_resource,
|
|
self.manager,
|
|
9999)
|
|
|
|
|
|
class FakeObject(object):
|
|
def __init__(self, name):
|
|
self.name = name
|
|
|
|
|
|
class PrintTestCase(test_utils.TestCase):
|
|
def setUp(self):
|
|
super(PrintTestCase, self).setUp()
|
|
self.old_stdout = sys.stdout
|
|
self.stdout = six.moves.cStringIO()
|
|
self.addCleanup(setattr, self, 'stdout', None)
|
|
sys.stdout = self.stdout
|
|
self.addCleanup(setattr, sys, 'stdout', self.old_stdout)
|
|
|
|
def test_print_list_unicode(self):
|
|
name = six.u('\u540d\u5b57')
|
|
objs = [FakeObject(name)]
|
|
# NOTE(Jeffrey4l) If the text's encode is proper, this method will not
|
|
# raise UnicodeEncodeError exceptions
|
|
utils.print_list(objs, ['name'])
|
|
output = self.stdout.getvalue()
|
|
# In Python 2, output will be bytes, while in Python 3, it will not.
|
|
# Let's decode the value if needed.
|
|
if isinstance(output, six.binary_type):
|
|
output = output.decode('utf-8')
|
|
self.assertIn(name, output)
|
|
|
|
def test_print_dict_unicode(self):
|
|
name = six.u('\u540d\u5b57')
|
|
utils.print_dict({'name': name})
|
|
output = self.stdout.getvalue()
|
|
# In Python 2, output will be bytes, while in Python 3, it will not.
|
|
# Let's decode the value if needed.
|
|
if isinstance(output, six.binary_type):
|
|
output = output.decode('utf-8')
|
|
self.assertIn(name, output)
|
|
|
|
|
|
class TestPositional(test_utils.TestCase):
|
|
|
|
@utils.positional(1)
|
|
def no_vars(self):
|
|
# positional doesn't enforce anything here
|
|
return True
|
|
|
|
@utils.positional(3, utils.positional.EXCEPT)
|
|
def mixed_except(self, arg, kwarg1=None, kwarg2=None):
|
|
# self, arg, and kwarg1 may be passed positionally
|
|
return (arg, kwarg1, kwarg2)
|
|
|
|
@utils.positional(3, utils.positional.WARN)
|
|
def mixed_warn(self, arg, kwarg1=None, kwarg2=None):
|
|
# self, arg, and kwarg1 may be passed positionally, only a warning
|
|
# is emitted
|
|
return (arg, kwarg1, kwarg2)
|
|
|
|
def test_nothing(self):
|
|
self.assertTrue(self.no_vars())
|
|
|
|
def test_mixed_except(self):
|
|
self.assertEqual((1, 2, 3), self.mixed_except(1, 2, kwarg2=3))
|
|
self.assertEqual((1, 2, 3), self.mixed_except(1, kwarg1=2, kwarg2=3))
|
|
self.assertEqual((1, None, None), self.mixed_except(1))
|
|
self.assertRaises(TypeError, self.mixed_except, 1, 2, 3)
|
|
|
|
def test_mixed_warn(self):
|
|
logger_message = six.moves.cStringIO()
|
|
handler = logging.StreamHandler(logger_message)
|
|
handler.setLevel(logging.DEBUG)
|
|
|
|
logger = logging.getLogger(utils.__name__)
|
|
level = logger.getEffectiveLevel()
|
|
logger.setLevel(logging.DEBUG)
|
|
logger.addHandler(handler)
|
|
|
|
self.addCleanup(logger.removeHandler, handler)
|
|
self.addCleanup(logger.setLevel, level)
|
|
|
|
self.mixed_warn(1, 2, 3)
|
|
|
|
self.assertIn('takes at most 3 positional', logger_message.getvalue())
|
|
|
|
@utils.positional(enforcement=utils.positional.EXCEPT)
|
|
def inspect_func(self, arg, kwarg=None):
|
|
return (arg, kwarg)
|
|
|
|
def test_inspect_positions(self):
|
|
self.assertEqual((1, None), self.inspect_func(1))
|
|
self.assertEqual((1, 2), self.inspect_func(1, kwarg=2))
|
|
self.assertRaises(TypeError, self.inspect_func)
|
|
self.assertRaises(TypeError, self.inspect_func, 1, 2)
|
|
|
|
@utils.positional.classmethod(1)
|
|
def class_method(cls, a, b):
|
|
return (cls, a, b)
|
|
|
|
@utils.positional.method(1)
|
|
def normal_method(self, a, b):
|
|
self.assertIsInstance(self, TestPositional)
|
|
return (self, a, b)
|
|
|
|
def test_class_method(self):
|
|
self.assertEqual((TestPositional, 1, 2), self.class_method(1, b=2))
|
|
self.assertRaises(TypeError, self.class_method, 1, 2)
|
|
|
|
def test_normal_method(self):
|
|
self.assertEqual((self, 1, 2), self.normal_method(1, b=2))
|
|
self.assertRaises(TypeError, self.normal_method, 1, 2)
|
|
|
|
|
|
class HashSignedTokenTestCase(test_utils.TestCase,
|
|
testresources.ResourcedTestCase):
|
|
"""Unit tests for utils.hash_signed_token()."""
|
|
|
|
resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
|
|
|
|
def test_default_md5(self):
|
|
"""The default hash method is md5."""
|
|
token = self.examples.SIGNED_TOKEN_SCOPED
|
|
if six.PY3:
|
|
token = token.encode('utf-8')
|
|
token_id_default = utils.hash_signed_token(token)
|
|
token_id_md5 = utils.hash_signed_token(token, mode='md5')
|
|
self.assertThat(token_id_default, matchers.Equals(token_id_md5))
|
|
# md5 hash is 32 chars.
|
|
self.assertThat(token_id_default, matchers.HasLength(32))
|
|
|
|
def test_sha256(self):
|
|
"""Can also hash with sha256."""
|
|
token = self.examples.SIGNED_TOKEN_SCOPED
|
|
if six.PY3:
|
|
token = token.encode('utf-8')
|
|
token_id = utils.hash_signed_token(token, mode='sha256')
|
|
# sha256 hash is 64 chars.
|
|
self.assertThat(token_id, matchers.HasLength(64))
|
|
|
|
|
|
def load_tests(loader, tests, pattern):
|
|
return testresources.OptimisingTestSuite(tests)
|