Allow loading other auth methods in auth_token

Add handlers for loading user specified authentication plugins from the
config file. If an auth plugin is not specified then we fall back to
using the older legacy options.

Change-Id: I4d09a30c0163106dab52062fab6000aaec0efb5d
Closes-Bug: #1372142
Blueprint: pluggable-auth
This commit is contained in:
Jamie Lennox 2014-10-23 17:20:29 +02:00
parent 6ed9ec4194
commit f8b09b565f
2 changed files with 125 additions and 18 deletions

View File

@ -353,6 +353,7 @@ _OPTS = [
_AUTHTOKEN_GROUP = 'keystone_authtoken'
CONF = cfg.CONF
CONF.register_opts(_OPTS, group=_AUTHTOKEN_GROUP)
auth.register_conf_options(CONF, _AUTHTOKEN_GROUP)
_HEADER_TEMPLATE = {
'X%s-Domain-Id': 'domain_id',
@ -833,16 +834,24 @@ class AuthProtocol(object):
_LI('Invalid service token - rejecting request'))
return self._reject_request(env, start_response)
except exceptions.NoMatchingPlugin as e:
msg = _LC('Required auth plugin does not exist. %s') % e
self._LOG.critical(msg)
return self._do_503_error(env, start_response)
except ServiceError as e:
self._LOG.critical(_LC('Unable to obtain admin token: %s'), e)
resp = _MiniResp('Service unavailable', env)
start_response('503 Service Unavailable', resp.headers)
return resp.body
return self._do_503_error(env, start_response)
self._LOG.debug("Received request from %s" % _fmt_msg(env))
return self._call_app(env, start_response)
def _do_503_error(self, env, start_response):
resp = _MiniResp('Service unavailable', env)
start_response('503 Service Unavailable', resp.headers)
return resp.body
def _init_auth_headers(self):
"""Initialize auth header list.
@ -1332,21 +1341,30 @@ class AuthProtocol(object):
timeout=self._conf_get('http_connect_timeout')
))
# NOTE(jamielennox): Loading AuthTokenPlugin here should be exactly
# the same as calling _AuthTokenPlugin.load_from_conf_options(CONF,
# GROUP) however we can't do that because we have to use _conf_get
# to support the paste.ini options.
auth_plugin = _AuthTokenPlugin.load_from_options(
auth_host=self._conf_get('auth_host'),
auth_port=int(self._conf_get('auth_port')),
auth_protocol=self._conf_get('auth_protocol'),
auth_admin_prefix=self._conf_get('auth_admin_prefix'),
admin_user=self._conf_get('admin_user'),
admin_password=self._conf_get('admin_password'),
admin_tenant_name=self._conf_get('admin_tenant_name'),
admin_token=self._conf_get('admin_token'),
identity_uri=self._conf_get('identity_uri'),
log=self._LOG)
# NOTE(jamielennox): The original auth mechanism allowed deployers
# to configure authentication information via paste file. These
# are accessible via _conf_get, however this doesn't work with the
# plugin loading mechanisms. For using auth plugins we only support
# configuring via the CONF file.
auth_plugin = auth.load_from_conf_options(CONF, _AUTHTOKEN_GROUP)
if not auth_plugin:
# NOTE(jamielennox): Loading AuthTokenPlugin here should be
# exactly the same as calling
# _AuthTokenPlugin.load_from_conf_options(CONF, GROUP) however
# we can't do that because we have to use _conf_get to support
# the paste.ini options.
auth_plugin = _AuthTokenPlugin.load_from_options(
auth_host=self._conf_get('auth_host'),
auth_port=int(self._conf_get('auth_port')),
auth_protocol=self._conf_get('auth_protocol'),
auth_admin_prefix=self._conf_get('auth_admin_prefix'),
admin_user=self._conf_get('admin_user'),
admin_password=self._conf_get('admin_password'),
admin_tenant_name=self._conf_get('admin_tenant_name'),
admin_token=self._conf_get('admin_token'),
identity_uri=self._conf_get('identity_uri'),
log=self._LOG)
adap = adapter.Adapter(
sess,

View File

@ -31,6 +31,7 @@ from keystoneclient import exceptions
from keystoneclient import fixture
from keystoneclient import session
import mock
from oslo.config import fixture as cfg_fixture
from oslo.serialization import jsonutils
from oslo.utils import timeutils
from requests_mock.contrib import fixture as rm_fixture
@ -2505,5 +2506,93 @@ class DefaultAuthPluginTests(testtools.TestCase):
self.assertEqual(token.token_id, plugin.get_token(self.session))
class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest):
AUTH_URL = 'http://auth.url/prefix'
DISC_URL = 'http://disc.url/prefix'
KEYSTONE_BASE_URL = 'http://keystone.url/prefix'
CRUD_URL = 'http://crud.url/prefix'
# NOTE(jamielennox): use the /v2.0 prefix here because this is what's most
# likely to be in the service catalog and we should be able to ignore it.
KEYSTONE_URL = KEYSTONE_BASE_URL + '/v2.0'
def setUp(self):
super(AuthProtocolLoadingTests, self).setUp()
self.cfg = self.useFixture(cfg_fixture.Config())
def test_loading_password_plugin(self):
# the password options aren't set on config until loading time, but we
# need them set so we can override the values for testing, so force it
opts = auth.get_plugin_options('password')
self.cfg.register_opts(opts, group=auth_token._AUTHTOKEN_GROUP)
project_id = uuid.uuid4().hex
# configure the authentication options
self.cfg.config(auth_plugin='password',
username='testuser',
password='testpass',
auth_url=self.AUTH_URL,
project_id=project_id,
user_domain_id='userdomainid',
group=auth_token._AUTHTOKEN_GROUP)
# admin_token is the token that the service will get back from auth
admin_token_id = uuid.uuid4().hex
admin_token = fixture.V3Token(project_id=project_id)
s = admin_token.add_service('identity', name='keystone')
s.add_standard_endpoints(admin=self.KEYSTONE_URL)
# user_token is the data from the user's inputted token
user_token_id = uuid.uuid4().hex
user_token = fixture.V3Token()
user_token.set_project_scope()
# first touch is to discover the available versions at the auth_url
self.requests.get(self.AUTH_URL,
json=fixture.DiscoveryList(href=self.DISC_URL),
status_code=300)
# then we use the url returned from discovery to actually auth
self.requests.post(self.DISC_URL + '/v3/auth/tokens',
json=admin_token,
headers={'X-Subject-Token': admin_token_id})
# then we do discovery on the URL from the service catalog. In practice
# this is mostly the same URL as before but test the full range.
self.requests.get(self.KEYSTONE_BASE_URL + '/',
json=fixture.DiscoveryList(href=self.CRUD_URL),
status_code=300)
# actually authenticating the user will then use the base url that was
# retrieved from discovery from the service catalog.
self.requests.get(self.CRUD_URL + '/v3/auth/tokens',
request_headers={'X-Subject-Token': user_token_id,
'X-Auth-Token': admin_token_id},
json=user_token)
body = uuid.uuid4().hex
app = auth_token.AuthProtocol(new_app('200 OK', body)(), {})
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = user_token_id
resp = app(req.environ, self.start_fake_response)
self.assertEqual(200, self.response_status)
self.assertEqual(six.b(body), resp[0])
def test_invalid_plugin_503(self):
self.cfg.config(auth_plugin=uuid.uuid4().hex,
group=auth_token._AUTHTOKEN_GROUP)
app = auth_token.AuthProtocol(new_app('200 OK', '')(), {})
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = uuid.uuid4().hex
app(req.environ, self.start_fake_response)
self.assertEqual(503, self.response_status)
def load_tests(loader, tests, pattern):
return testresources.OptimisingTestSuite(tests)