Add support for Federation Protocols

Change-Id: I6b60b8ce6f3650adff4732f06b212da787866183
This commit is contained in:
Mark Chappell 2020-03-23 10:32:40 +01:00
parent 6721498c91
commit 49c3bba906
3 changed files with 246 additions and 0 deletions

View File

@ -16,6 +16,7 @@ from openstack.identity.v3 import application_credential as \
from openstack.identity.v3 import credential as _credential
from openstack.identity.v3 import domain as _domain
from openstack.identity.v3 import endpoint as _endpoint
from openstack.identity.v3 import federation_protocol as _federation_protocol
from openstack.identity.v3 import group as _group
from openstack.identity.v3 import identity_provider as _identity_provider
from openstack.identity.v3 import limit as _limit
@ -1322,6 +1323,153 @@ class Proxy(proxy.Proxy):
user_id=user.id,
ignore_missing=ignore_missing)
def create_federation_protocol(self, idp_id, **attrs):
"""Create a new federation protocol from attributes
:param idp_id: The ID of the identity provider or a
:class:`~openstack.identity.v3.identity_provider.IdentityProvider`
representing the identity provider the protocol is to be
attached to.
:param dict attrs: Keyword arguments which will be used to create a
:class:`~openstack.identity.v3.federation_protocol.
FederationProtocol`, comprised of the properties on the
FederationProtocol class.
:returns: The results of federation protocol creation
:rtype: :class:`~openstack.identity.v3.federation_protocol.
FederationProtocol`
"""
idp_cls = _identity_provider.IdentityProvider
if isinstance(idp_id, idp_cls):
idp_id = idp_id.id
return self._create(_federation_protocol.FederationProtocol,
idp_id=idp_id, **attrs)
def delete_federation_protocol(self, idp_id, protocol,
ignore_missing=True):
"""Delete a federation protocol
:param idp_id: The ID of the identity provider or a
:class:`~openstack.identity.v3.identity_provider.IdentityProvider`
representing the identity provider the protocol is attached to.
Can be None if protocol is a
:class:`~openstack.identity.v3.federation_protocol.
FederationProtocol` instance.
:param protocol: The ID of a federation protocol or a
:class:`~openstack.identity.v3.federation_protocol.
FederationProtocol` instance.
:param bool ignore_missing: When set to ``False``
:class:`~openstack.exceptions.ResourceNotFound` will be raised
when the federation protocol does not exist. When set to
``True``, no exception will be set when attempting to delete a
nonexistent federation protocol.
:returns: ``None``
"""
cls = _federation_protocol.FederationProtocol
if idp_id is None and isinstance(protocol, cls):
idp_id = protocol.idp_id
idp_cls = _identity_provider.IdentityProvider
if isinstance(idp_id, idp_cls):
idp_id = idp_id.id
self._delete(cls, protocol,
ignore_missing=ignore_missing, idp_id=idp_id)
def find_federation_protocol(self, idp_id, protocol,
ignore_missing=True):
"""Find a single federation protocol
:param idp_id: The ID of the identity provider or a
:class:`~openstack.identity.v3.identity_provider.IdentityProvider`
representing the identity provider the protocol is attached to.
:param protocol: The name or ID of a federation protocol.
:param bool ignore_missing: When set to ``False``
:class:`~openstack.exceptions.ResourceNotFound` will be raised
when the resource does not exist. When set to ``True``, None will
be returned when attempting to find a nonexistent resource.
:returns: One federation protocol or None
:rtype: :class:`~openstack.identity.v3.federation_protocol.
FederationProtocol`
"""
idp_cls = _identity_provider.IdentityProvider
if isinstance(idp_id, idp_cls):
idp_id = idp_id.id
return self._find(_federation_protocol.FederationProtocol, protocol,
ignore_missing=ignore_missing, idp_id=idp_id)
def get_federation_protocol(self, idp_id, protocol):
"""Get a single federation protocol
:param idp_id: The ID of the identity provider or a
:class:`~openstack.identity.v3.identity_provider.IdentityProvider`
representing the identity provider the protocol is attached to.
Can be None if protocol is a
:class:`~openstack.identity.v3.federation_protocol.
:param protocol: The value can be the ID of a federation protocol or a
:class:`~openstack.identity.v3.federation_protocol.
FederationProtocol`
instance.
:returns: One federation protocol
:rtype: :class:`~openstack.identity.v3.federation_protocol.
FederationProtocol`
:raises: :class:`~openstack.exceptions.ResourceNotFound`
when no resource can be found.
"""
cls = _federation_protocol.FederationProtocol
if idp_id is None and isinstance(protocol, cls):
idp_id = protocol.idp_id
idp_cls = _identity_provider.IdentityProvider
if isinstance(idp_id, idp_cls):
idp_id = idp_id.id
return self._get(cls, protocol, idp_id=idp_id)
def federation_protocols(self, idp_id, **query):
"""Retrieve a generator of federation protocols
:param idp_id: The ID of the identity provider or a
:class:`~openstack.identity.v3.identity_provider.IdentityProvider`
representing the identity provider the protocol is attached to.
:param kwargs query: Optional query parameters to be sent to limit
the resources being returned.
:returns: A generator of federation protocol instances.
:rtype: :class:`~openstack.identity.v3.federation_protocol.
FederationProtocol`
"""
idp_cls = _identity_provider.IdentityProvider
if isinstance(idp_id, idp_cls):
idp_id = idp_id.id
return self._list(_federation_protocol.FederationProtocol,
idp_id=idp_id, **query)
def update_federation_protocol(self, idp_id, protocol, **attrs):
"""Update a federation protocol
:param idp_id: The ID of the identity provider or a
:class:`~openstack.identity.v3.identity_provider.IdentityProvider`
representing the identity provider the protocol is attached to.
Can be None if protocol is a
:class:`~openstack.identity.v3.federation_protocol.
:param protocol: Either the ID of a federation protocol or a
:class:`~openstack.identity.v3.federation_protocol.
FederationProtocol` instance.
:attrs kwargs: The attributes to update on the federation protocol
represented by ``value``.
:returns: The updated federation protocol
:rtype: :class:`~openstack.identity.v3.federation_protocol.
FederationProtocol`
"""
cls = _federation_protocol.FederationProtocol
if (idp_id is None) and (isinstance(protocol, cls)):
idp_id = protocol.idp_id
idp_cls = _identity_provider.IdentityProvider
if isinstance(idp_id, idp_cls):
idp_id = idp_id.id
return self._update(cls, protocol, idp_id=idp_id, **attrs)
def create_mapping(self, **attrs):
"""Create a new mapping from attributes

View File

@ -0,0 +1,43 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack import resource
class FederationProtocol(resource.Resource):
resource_key = 'protocol'
resources_key = 'protocols'
base_path = '/OS-FEDERATION/identity_providers/%(idp_id)s/protocols'
# capabilities
allow_create = True
allow_fetch = True
allow_commit = True
allow_delete = True
allow_list = True
create_exclude_id_from_body = True
create_method = 'PUT'
commit_method = 'PATCH'
_query_mapping = resource.QueryParameters(
'id',
)
# Properties
#: name of the protocol (read only) *Type: string*
name = resource.Body('id')
#: The ID of the identity provider the protocol is attached to.
# *Type: string*
idp_id = resource.URI('idp_id')
#: The definition of the protocol
# *Type: dict*
mapping_id = resource.Body('mapping_id')

View File

@ -0,0 +1,55 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.tests.unit import base
from openstack.identity.v3 import federation_protocol
IDENTIFIER = 'IDENTIFIER'
EXAMPLE = {
'id': IDENTIFIER,
'idp_id': 'example_idp',
'mapping_id': 'example_mapping',
}
class TestFederationProtocol(base.TestCase):
def test_basic(self):
sot = federation_protocol.FederationProtocol()
self.assertEqual('protocol', sot.resource_key)
self.assertEqual('protocols', sot.resources_key)
self.assertEqual(
'/OS-FEDERATION/identity_providers/%(idp_id)s/protocols',
sot.base_path)
self.assertTrue(sot.allow_create)
self.assertTrue(sot.allow_fetch)
self.assertTrue(sot.allow_commit)
self.assertTrue(sot.allow_delete)
self.assertTrue(sot.allow_list)
self.assertTrue(sot.create_exclude_id_from_body)
self.assertEqual('PATCH', sot.commit_method)
self.assertEqual('PUT', sot.create_method)
self.assertDictEqual(
{
'id': 'id',
'limit': 'limit',
'marker': 'marker',
},
sot._query_mapping._mapping)
def test_make_it(self):
sot = federation_protocol.FederationProtocol(**EXAMPLE)
self.assertEqual(EXAMPLE['id'], sot.id)
self.assertEqual(EXAMPLE['idp_id'], sot.idp_id)
self.assertEqual(EXAMPLE['mapping_id'], sot.mapping_id)