List group roles on project or domain

The patch provides support for listing roles assigned to group
on domain or project.

Change-Id: Idd101409ba6b72342110d1124db759ba66d915d0
This commit is contained in:
Chi Lo 2019-05-03 04:34:05 -07:00
parent d641a3f850
commit ba48667aee
16 changed files with 379 additions and 116 deletions

View File

@ -338,7 +338,7 @@ def add_to_parser(service_sub):
help='force delete groups region',
action="store_true")
# groups roles
# assign groups roles
parser_assign_group_roles = subparsers.add_parser(
'assign_group_roles',
help='[<"X-RANGER-Client" '
@ -352,11 +352,12 @@ def add_to_parser(service_sub):
'datafile', type=argparse.FileType('r'),
help='<data file with group role(s) assignment JSON>')
# unassign group roles
parser_unassign_group_role = subparsers.add_parser(
'unassign_group_role',
help='[<"X-RANGER-Client" '
'header>] <group id> <role> < --customer <customer id> '
'or <--domain <domain_name> >')
'header>] <group id> <role> <--customer <customer id>> '
'or <--domain <domain_name>>')
parser_unassign_group_role.add_argument(
'client', **cli_common.ORM_CLIENT_KWARGS)
parser_unassign_group_role.add_argument(
@ -368,6 +369,24 @@ def add_to_parser(service_sub):
parser_unassign_group_role.add_argument(
'--domain', type=str, help='domain name')
# list group roles
h1 = '[<"X-RANGER-Client" header>]'
h2 = '<group id> <--region <name>> <--customer <customer id>> ' \
'or <--domain <domain_name>>'
parser_list_group_roles = subparsers.add_parser('list_group_roles',
help='%s %s' % (h1, h2))
parser_list_group_roles.add_argument(
'client', **cli_common.ORM_CLIENT_KWARGS)
parser_list_group_roles.add_argument(
'groupid', type=str, help='<groupid id>')
parser_list_group_roles.add_argument(
'--region', type=str, help='region name')
parser_list_group_roles.add_argument(
'--customer', type=str, help='customer id')
parser_list_group_roles.add_argument(
'--domain', type=str, help='domain name')
# groups - add group default users
parser_add_group_default_users = subparsers.add_parser(
'add_group_default_users',
@ -534,6 +553,15 @@ def cmd_details(args):
args.role,
assignment_type,
assignment_value)
elif args.subcmd == 'list_group_roles':
param = ''
if args.region:
param += '%sregion=%s' % (preparm(param), args.region)
if args.customer:
param += '%scustomer=%s' % (preparm(param), args.customer)
if args.domain:
param += '%sdomain=%s' % (preparm(param), args.domain)
return requests.get, 'groups/%s/roles/%s' % (args.groupid, param)
elif args.subcmd == 'add_group_default_users':
return requests.post, 'groups/%s/users' % args.groupid
elif args.subcmd == 'delete_group_default_user':

View File

@ -8,7 +8,7 @@ from orm.services.customer_manager.cms_rest.logger import get_logger
from orm.services.customer_manager.cms_rest.logic.error_base import ErrorStatus
from orm.services.customer_manager.cms_rest.logic.group_logic import GroupLogic
from orm.services.customer_manager.cms_rest.model.GroupModels import \
RoleAssignment, RoleResultWrapper
RoleAssignment, RoleResult, RoleResultWrapper
from orm.services.customer_manager.cms_rest.utils import authentication
LOG = get_logger(__name__)
@ -108,3 +108,29 @@ class RoleController(rest.RestController):
raise err_utils.get_error(request.transaction_id,
status_code=500,
error_details=str(exception))
@wsexpose(RoleResult, str, str, str, str, rest_content_types='json')
def get_all(self, group_id, region=None, customer=None, domain=None):
LOG.info("RoleController - GetRolelist")
authentication.authorize(request, 'groups:get_all_roles')
try:
group_logic = GroupLogic()
result = group_logic.get_group_roles_by_criteria(group_id,
region,
customer,
domain)
return result
except ErrorStatus as exception:
LOG.log_exception("RoleController - Failed to GetRolelist",
exception)
raise err_utils.get_error(request.transaction_id,
status_code=exception.status_code,
message=exception.message)
except Exception as exception:
LOG.log_exception("RoleController - Failed to GetRolelist",
exception)
raise err_utils.get_error(request.transaction_id,
status_code=500,
error_details=str(exception))

View File

@ -376,27 +376,43 @@ class DataManager(object):
def add_groups_role_on_domain(self,
group_id, role_id, region_id, domain):
self.add_groups_role(role_id, group_id)
try:
self.add_groups_role(role_id, group_id)
groups_domain_role = GroupsDomainRole(role_id=role_id,
group_id=group_id,
domain_name=domain,
region_id=region_id)
self.session.add(groups_domain_role)
self.flush()
return groups_domain_role
groups_domain_role = GroupsDomainRole(role_id=role_id,
group_id=group_id,
domain_name=domain,
region_id=region_id)
self.session.add(groups_domain_role)
self.flush()
return groups_domain_role
except ErrorStatus:
message = "Duplicate Entry - GroupsDomainRole record: " \
"group[{0}], region[{1}], domain[{2}], role[{3}]".format(
group_id, region_id, domain, role_id)
raise ErrorStatus(409.2, message)
except Exception:
raise
def add_groups_role_on_customer(self,
group_id, role_id, region_id, customer_id):
self.add_groups_role(role_id, group_id)
try:
self.add_groups_role(role_id, group_id)
groups_customer_role = GroupsCustomerRole(role_id=role_id,
group_id=group_id,
customer_id=customer_id,
region_id=region_id)
self.session.add(groups_customer_role)
self.flush()
return groups_customer_role
groups_customer_role = GroupsCustomerRole(role_id=role_id,
group_id=group_id,
customer_id=customer_id,
region_id=region_id)
self.session.add(groups_customer_role)
self.flush()
return groups_customer_role
except ErrorStatus:
message = "Duplicate Entry - GroupsCustomerRole record: " \
"group[{0}], region[{1}], customer[{2}], role[{3}]".format(
group_id, region_id, customer_id, role_id)
raise ErrorStatus(409.2, message)
except Exception:
raise
def add_groups_user(self,
group_id, user_id, region_id, domain):

View File

@ -40,11 +40,11 @@ class GroupsCustomerRoleRecord:
str(groups_customer_role), exception)
raise
def get_group_customer_role_by_keys(self,
group_uuid,
role_id,
region_name,
customer_uuid):
def get_customer_role_by_keys(self,
group_uuid,
role_id,
region_name,
customer_id):
region_record = RegionRecord(self.session)
region_id = region_record.get_region_id_from_name(region_name)
if region_id is None:
@ -53,18 +53,41 @@ class GroupsCustomerRoleRecord:
region_name))
try:
group = self.session.query(GroupsCustomerRole).filter(
and_, (
GroupsCustomerRole.group_id == group_uuid,
GroupsCustomerRole.customer_id == customer_uuid,
GroupsCustomerRole.region_id == region_id,
GroupsCustomerRole.role_id == role_id))
GroupsCustomerRole.group_id == group_uuid,
GroupsCustomerRole.customer_id == customer_id,
GroupsCustomerRole.region_id == region_id,
GroupsCustomerRole.role_id == role_id)
return group.first()
except Exception as exception:
message = "Failed to get group/project by keys: " \
" group_uuid:%s customer_uuid:%s region_name:%s " \
" group_uuid:%s customer_id:%s region_name:%s " \
" role_id:%s " \
% group_uuid, customer_uuid, region_id, role_id
% group_uuid, customer_id, region_id, role_id
LOG.log_exception(message, exception)
raise
def get_customer_roles_by_criteria(self,
group_uuid,
region_name,
customer_id):
region_record = RegionRecord(self.session)
region_id = region_record.get_region_id_from_name(region_name)
if region_id is None:
raise ValueError(
'region with the region name {0} not found'.format(
region_name))
try:
group = self.session.query(GroupsCustomerRole).filter(
GroupsCustomerRole.group_id == group_uuid,
GroupsCustomerRole.customer_id == customer_id,
GroupsCustomerRole.region_id == region_id)
return group.all()
except Exception as exception:
message = "Failed to get roles by criteria: " \
" group_uuid:%s customer_id:%s region_name:%s " \
% group_uuid, customer_id, region_name
LOG.log_exception(message, exception)
raise
@ -107,11 +130,11 @@ class GroupsCustomerRoleRecord:
self.session.flush()
if result.rowcount == 0:
LOG.warn('customer with the customer id {0} not found'.format(
customer_id))
raise ValueError(
'customer with the customer id {0} not found'.format(
customer_id))
message = "GroupsCustomerRole record not found - group[{0}], " \
"region[{1}], customer[{2}], role[{3}]". format(
group_uuid, region_id, customer_id, role_id)
LOG.warn(message)
raise ValueError(message)
LOG.debug("num records deleted: " + str(result.rowcount))
return result

View File

@ -40,11 +40,11 @@ class GroupsDomainRoleRecord:
str(groups_domain_role), exception)
raise
def get_group_domain_role_by_keys(self,
group_uuid,
region_name,
domain,
role_id):
def get_domain_role_by_keys(self,
group_uuid,
region_name,
domain,
role_id):
region_record = RegionRecord(self.session)
region_id = region_record.get_region_id_from_name(region_name)
if region_id is None:
@ -53,11 +53,10 @@ class GroupsDomainRoleRecord:
region_name))
try:
group = self.session.query(GroupsDomainRole).filter(
and_(
GroupsDomainRole.group_id == group_uuid,
GroupsDomainRole.domain_name == domain,
GroupsDomainRole.region_id == region_id,
GroupsDomainRole.role_id == role_id))
GroupsDomainRole.group_id == group_uuid,
GroupsDomainRole.domain_name == domain,
GroupsDomainRole.region_id == region_id,
GroupsDomainRole.role_id == role_id)
return group.first()
except Exception as exception:
@ -67,6 +66,30 @@ class GroupsDomainRoleRecord:
LOG.log_exception(message, exception)
raise
def get_domain_roles_by_criteria(self,
group_uuid,
region_name,
domain):
region_record = RegionRecord(self.session)
region_id = region_record.get_region_id_from_name(region_name)
if region_id is None:
raise ValueError(
'region with the region name {0} not found'.format(
region_name))
try:
group = self.session.query(GroupsDomainRole).filter(
GroupsDomainRole.group_id == group_uuid,
GroupsDomainRole.domain_name == domain,
GroupsDomainRole.region_id == region_id)
return group.all()
except Exception as exception:
message = "Failed to get roles by criteria: " \
" group_uuid:%s domain:%s region_name:%s " \
% group_uuid, domain, region_name
LOG.log_exception(message, exception)
raise
def get_domain_roles_for_group(self, group_uuid):
groups_domain_roles = []
@ -105,11 +128,11 @@ class GroupsDomainRoleRecord:
self.session.flush()
if result.rowcount == 0:
LOG.warn('domain with domain name {0} not found'.format(
domain))
raise ValueError(
'domain with domain name {0} not found'.format(
domain))
message = "GroupsDomainRole record not found - group[{0}], " \
"region[{1}], domain[{2}], role[{3}]". format(
group_uuid, region_id, domain, role_id)
LOG.warn(message)
raise ValueError(message)
LOG.debug("num records deleted: " + str(result.rowcount))
return result

View File

@ -76,11 +76,10 @@ class GroupsRoleRecord:
self.session.flush()
if result.rowcount == 0:
LOG.warn('role with the role id {0} not found'.format(
role_id))
raise ValueError(
'role with the role id {0} not found'.format(
role_id))
message = "GroupsRole record not found - group[{0}], " \
"role[{1}]".format(group_uuid, role_id)
LOG.warn(message)
raise ValueError(message)
LOG.debug("num records deleted: " + str(result.rowcount))
return result

View File

@ -128,6 +128,11 @@ class Groups(Base, CMSBaseModel):
enabled = True if self.enabled else False
regions = [group_region.to_wsme() for group_region in self.group_regions if
group_region.region_id != -1]
customer_roles = [customer_role.get_role_name() for customer_role in
self.groups_customer_roles]
domain_roles = [domain_role.get_role_name() for domain_role in
self.groups_domain_roles]
roles = sorted(set(customer_roles + domain_roles))
# users = [groups_user.to_wsme() for groups_user in self.groups_users if
# groups_user.region_id == -1]
@ -135,10 +140,12 @@ class Groups(Base, CMSBaseModel):
name=name,
uuid=uuid,
regions=regions,
roles=list(roles),
enabled=enabled,
domain=domain_name)
return result
'''
' GroupsRegion is a DataObject and contains all the fields defined in GroupsRegion
' table record. Defined as SqlAlchemy model map to a table
@ -232,10 +239,6 @@ class GroupsRole(Base, CMSBaseModel):
"group_id": self.group_id
}
def to_wsme(self):
role = GroupWsmeModels.Role(name=self.role.name)
return role
'''
' GroupsCustomerRole is a DataObject and contains all the fields defined in
@ -280,11 +283,8 @@ class GroupsCustomerRole(Base, CMSBaseModel):
"role_name": self.groups_role.role.name
}
def to_wsme(self):
customer = GroupWsmeModels.Customer(customer_uuid=self.customer_id,
group=self.group.name,
role_id=self.role_id)
return customer
def get_role_name(self):
return self.groups_role.role.name
'''
@ -328,6 +328,9 @@ class GroupsDomainRole(Base, CMSBaseModel):
"role_name": self.groups_role.role.name
}
def get_role_name(self):
return self.groups_role.role.name
'''
' GroupsUser is a DataObject and contains all the fields defined in GroupRole

View File

@ -49,6 +49,6 @@
"groups:add_group_default_users": "rule:admin_or_support",
"groups:delete_group_default_user": "rule:admin",
"groups:add_group_region_users": "rule:admin_or_support",
"groups:delete_group_region_user": "rule:admin"
"groups:delete_group_region_user": "rule:admin",
"groups:get_all_roles": "rule:admin_or_support_or_viewer_or_creator"
}

View File

@ -10,15 +10,17 @@ from orm.services.customer_manager.cms_rest.data.data_manager import \
DataManager
from orm.services.customer_manager.cms_rest.logger import get_logger
from orm.services.customer_manager.cms_rest.logic.error_base import (
DuplicateEntryError, ErrorStatus, NotFound)
DuplicateEntryError, ErrorStatus)
from orm.services.customer_manager.cms_rest.model.GroupModels import (
GroupResultWrapper,
GroupSummary,
GroupSummaryResponse,
RegionResultWrapper,
RegionUserResultWrapper,
RoleResult,
RoleResultWrapper,
UserResultWrapper,
RegionUserResultWrapper)
RegionResultWrapper)
from orm.services.customer_manager.cms_rest.rds_proxy import RdsProxy
@ -425,33 +427,26 @@ class GroupLogic(object):
region_id = group_region.region_id
if assignment_type == "domain":
result = domain.remove_domain_role_from_group(
domain.remove_domain_role_from_group(
group_uuid, region_id, assignment_value, role_id)
elif assignment_type == "customer":
customer_id = datamanager.get_customer_id_by_uuid(
assignment_value)
if customer_id is None:
raise ErrorStatus(
404,
"customer uuid [{}] does not exist".format(
assignment_value))
result = customer.remove_customer_role_from_group(
customer.remove_customer_role_from_group(
group_uuid, region_id, customer_id, role_id)
if result.rowcount == 0:
raise NotFound("Record not found for unassignment type: {}"
" value: {} group: {} region id: {} role"
" {} ".format(
assignment_type, assignment_value,
group_uuid, region_id, role_name))
if (not customer.check_groups_customer_role_exist(
role_id, group_uuid) and
not domain.check_groups_domain_role_exist(
role_id, group_uuid)):
result = groups_role.remove_role_from_group(group_uuid,
role_id)
if result.rowcount == 0:
raise NotFound("Record not found for group: {}"
" role: {}".format(group_uuid,
role_name))
groups_role.remove_role_from_group(group_uuid, role_id)
datamanager.flush()
group = group_record.read_group_by_uuid(group_uuid)
@ -464,12 +459,6 @@ class GroupLogic(object):
LOG.info("Role unassgined - type: {} value: {} group: {} "
"role {} ".format(assignment_type, assignment_value,
group_uuid, role_name))
except NotFound as e:
datamanager.rollback()
LOG.log_exception("Failed to unassign role, record not found",
e.message)
raise NotFound("Failed to unassign role, not found - %s" %
e.message)
except Exception as exp:
datamanager.rollback()
raise
@ -706,6 +695,37 @@ class GroupLogic(object):
response.groups.append(groups)
return response
def get_group_roles_by_criteria(
self, group_uuid, region_name, customer_uuid, domain_name):
if region_name is None:
raise ErrorStatus(400, "region must be specified in request "
"uri query.")
if customer_uuid is not None and domain_name is not None:
raise ErrorStatus(400, "customer and domain cannot be used at "
"the same time for query in request uri.")
if customer_uuid is None and domain_name is None:
raise ErrorStatus(400, "customer or domain is required for query "
"in request uri.")
datamanager = DataManager()
customer_id = datamanager.get_customer_id_by_uuid(customer_uuid)
if customer_uuid is not None:
record = datamanager.get_record('groups_customer_role')
sql_roles = record.get_customer_roles_by_criteria(
group_uuid, region_name, customer_id)
else:
record = datamanager.get_record('groups_domain_role')
sql_roles = record.get_domain_roles_by_criteria(
group_uuid, region_name, domain_name)
roles = []
if sql_roles:
roles = [sql_role.groups_role.role.name for sql_role in sql_roles
if sql_role and sql_role.groups_role.role.name]
return RoleResult(roles=roles)
def delete_group_by_uuid(self, group_id):
datamanager = DataManager()

View File

@ -127,10 +127,11 @@ class Group(Model):
enabled = wsme.wsattr(bool, mandatory=True)
regions = wsme.wsattr([Region], mandatory=False)
users = wsme.wsattr([User], mandatory=False)
roles = wsme.wsattr([str], mandatory=False)
def __init__(self, description="", name="", enabled=False, roles=[],
regions=[], users=[], status="", domain='default', uuid=None):
def __init__(self, description="", name="", enabled=False,
regions=[], users=[], status="", domain='default',
uuid=None):
"""Create a new Group.
:param description: Server name
@ -143,6 +144,7 @@ class Group(Model):
self.enabled = enabled
self.regions = regions
self.users = users
self.roles = roles
if uuid is not None:
self.uuid = uuid

View File

@ -1,7 +1,7 @@
from orm.services.customer_manager.cms_rest.data.sql_alchemy \
import models as sql_models
from orm.services.customer_manager.cms_rest.logic.error_base \
import ErrorStatus, NotFound
import ErrorStatus
from orm.services.customer_manager.cms_rest.logic import group_logic
import orm.services.customer_manager.cms_rest.model.GroupModels as models
from orm.tests.unit.cms import FunctionalTest
@ -108,7 +108,7 @@ class TestGroupLogic(FunctionalTest):
def test_get_group_list_by_criteria(self):
logic = group_logic.GroupLogic()
result = logic.get_group_list_by_criteria(None, None, None, None)
logic.get_group_list_by_criteria(None, None, None, None)
self.assertTrue(data_manager_mock.get_record.called)
self.assertTrue(record_mock.get_groups_by_criteria.called)
@ -317,12 +317,46 @@ class TestGroupLogic(FunctionalTest):
self.assertRaises(ErrorStatus, logic.unassign_roles, 'group_uuid',
'role', 'wrong_type', 'customer_id', 'some_trans_id')
def test_unassign_roles_from_group_group_role_not_found(self):
global flow_type
flow_type = 4
def test_get_group_roles_by_criteria_on_customer(self):
logic = group_logic.GroupLogic()
self.assertRaises(NotFound, logic.unassign_roles, 'group_uuid',
'role', 'customer', 'customer_id', 'some_trans_id')
logic.get_group_roles_by_criteria(
'group_uuid', 'region', 'customer', None)
self.assertTrue(data_manager_mock.get_customer_id_by_uuid.called)
self.assertTrue(data_manager_mock.get_record.called)
self.assertTrue(record_mock.get_customer_roles_by_criteria.called)
def test_get_group_roles_by_criteria_on_domain(self):
logic = group_logic.GroupLogic()
logic.get_group_roles_by_criteria(
'group_uuid', 'region', None, 'domain')
self.assertTrue(data_manager_mock.get_customer_id_by_uuid.called)
self.assertTrue(data_manager_mock.get_record.called)
self.assertTrue(record_mock.get_domain_roles_by_criteria.called)
def test_get_group_roles_by_criteria_missing_required_parms(self):
logic = group_logic.GroupLogic()
with self.assertRaises(ErrorStatus) as cm:
logic.get_group_roles_by_criteria('group', None, None, None)
self.assertEqual(cm.exception.status_code, 400)
self.assertIn('region must be specified', cm.exception.message)
def test_get_group_roles_by_criteria_missing_optional_parms(self):
logic = group_logic.GroupLogic()
with self.assertRaises(ErrorStatus) as cm:
logic.get_group_roles_by_criteria('group', 'region', None, None)
self.assertEqual(cm.exception.status_code, 400)
self.assertIn('customer or domain is required', cm.exception.message)
def test_get_group_roles_by_criteria_conflicting_optional_parms(self):
logic = group_logic.GroupLogic()
with self.assertRaises(ErrorStatus) as cm:
logic.get_group_roles_by_criteria(
'group', 'region', 'customer', 'domain')
self.assertEqual(cm.exception.status_code, 400)
self.assertIn('customer and domain cannot be used at the same time',
cm.exception.message)
def get_mock_datamanager():
@ -335,6 +369,8 @@ def get_mock_datamanager():
data_manager_mock = mock.MagicMock()
record_mock = mock.MagicMock()
record_mock.get_groups_by_criteria.return_value = [sql_group]
record_mock.get_customer_roles_by_criteria.return_value = []
record_mock.get_domain_roles_by_criteria.return_value = []
result_mock = mock.Mock()
result_mock.rowcount = 1
@ -376,9 +412,6 @@ def get_mock_datamanager():
record_mock.delete_group_by_uuid.side_effect = SystemError()
elif flow_type == 3:
record_mock.check_groups_customer_role_exist.return_value = True
elif flow_type == 4:
result_mock.rowcount = 0
record_mock.remove_role_from_group.return_value = result_mock
else:
record_mock.read_group_by_uuid.side_effect = SystemError()
record_mock.delete_region_for_group.side_effect = SystemError()

View File

@ -120,6 +120,47 @@ class TestGroupsRoleController(FunctionalTest):
# assert
self.assertEqual(response.status_int, 404)
def test_list_group_roles(self):
# given
requests.get = mock.MagicMock(return_value=ResponseMock(200))
# when
response = self.app.get(
'/v1/orm/groups/{groups id}/roles/?region=region')
# assert
assert group_logic_mock.get_group_roles_by_criteria.called
def test_list_group_roles_fail(self):
# given
requests.get = mock.MagicMock()
roles.GroupLogic.return_error = 1
roles.err_utils.get_error = mock.MagicMock(
return_value=ClientSideError("blabla", 500))
# when
response = self.app.get(
'/v1/orm/groups/{groups id}/roles/?region=region',
expect_errors=True)
# assert
self.assertEqual(response.status_int, 500)
def test_list_group_roles_bad_request(self):
# given
requests.get = mock.MagicMock()
roles.GroupLogic.return_error = 2
roles.err_utils.get_error = mock.MagicMock(
return_value=ClientSideError("blabla", 404))
# when
response = self.app.get(
'/v1/orm/groups/{groups id}/roles/?region=region',
expect_errors=True)
# assert
self.assertEqual(response.status_int, 404)
def get_mock_group_logic():
global group_logic_mock
@ -136,18 +177,25 @@ def get_mock_group_logic():
links={},
created='1')
list_res = GroupModels.RoleResult(roles=[])
group_logic_mock.assign_roles.return_value = res
group_logic_mock.unassign_roles.return_value = res1
group_logic_mock.get_group_roles_by_criteria.return_value = list_res
elif roles.GroupLogic.return_error == 1:
group_logic_mock.assign_roles.side_effect = SystemError()
group_logic_mock.unassign_roles.side_effect = SystemError()
group_logic_mock.get_group_roles_by_criteria.side_effect = \
SystemError()
else:
group_logic_mock.assign_roles.side_effect = ErrorStatus(
status_code=404)
group_logic_mock.unassign_roles.side_effect = ErrorStatus(
status_code=404)
group_logic_mock.get_group_roles_by_criteria.side_effect = ErrorStatus(
status_code=404)
return group_logic_mock

View File

@ -44,6 +44,8 @@ class CmsTests(TestCase):
args.force_delete is False
args.role = 'test_role_name'
args.assignment_value = 'test_role_assignment_value'
args.customer = 'test_customer'
args.domain = None
subcmd_to_result = {
'create_customer': (requests.post, 'customers/',),
@ -97,6 +99,9 @@ class CmsTests(TestCase):
'&contains=%s' % (args.region,
args.starts_with,
args.contains)),
'list_group_roles': (
requests.get, 'groups/%s/roles/?region=%s&customer=%s' % (
args.groupid, args.region, args.customer)),
'add_group_default_users': (
requests.post, 'groups/%s/users' % args.groupid,),
'delete_group_default_user': (

View File

@ -144,3 +144,17 @@ assign_group_roles = {
}
unassign_group_role = _delete
list_group_roles = {
'status_code': [200],
'response_body': {
'type': 'object',
'properties': {
'roles': {
'type': 'array',
'items': {'type': 'string'}
}
},
'required': ['roles']
}
}

View File

@ -70,3 +70,8 @@ class GrpClient(base_client.RangerClientBase):
assignmenet_type,
assignment_value)
return self.delete_request(uri, schema.unassign_group_role)
def list_group_roles(self, group_id, params):
uri = '%s/%s/orm/groups/%s/roles/%s' % (
self.cms_url, self.version, group_id, params)
return self.get_request(uri, schema.list_group_roles)

View File

@ -130,13 +130,13 @@ class TestTempestGrp(grp_base.GrpBaseOrmTest):
test_group_id)
@decorators.idempotent_id('afe5c72f-499b-493f-b61b-68bbaca12b7a')
def test_assign_unassign_role_to_group_on_domain(self):
def test_assign_unassign_list_role_to_group_on_domain(self):
role = {
'roles': ["admin"],
'domain': CONF.ranger.domain
}
# assign role
post_body = [role]
_, body = self.grp_client.assign_group_roles(self.setup_group_id,
*post_body)
@ -144,25 +144,34 @@ class TestTempestGrp(grp_base.GrpBaseOrmTest):
self.assertEqual(body['roles'][0]['domain'], role['domain'])
self.assertEqual(body['roles'][0]['roles'][0], role['roles'][0])
# list role
region_name = [
region['name'] for region in self.setup_group['regions']]
list_filter = "?region=%s&domain=%s" % (region_name[0],
CONF.ranger.domain)
_, body = self.grp_client.list_group_roles(self.setup_group_id,
list_filter)
self.assertEqual(body['roles'], role['roles'])
# unassign role
_, body = self.grp_client.unassign_group_role(self.setup_group_id,
role['roles'][0],
'domain',
role['domain'])
self._wait_for_group_status(self.setup_group_id, 'Success')
# Once the get groups role function is implemented, it will be
# added here to retreive the role and call assert to verfify that
# the role has indeed been unassigned.
self.assertEqual(body, '')
_, body = self.grp_client.list_group_roles(self.setup_group_id,
list_filter)
self.assertEqual(body['roles'], [])
@decorators.idempotent_id('67f5e46e-9267-4cbb-84d6-ee8521370e23')
def test_assign_unassign_role_to_group_on_customer(self):
def test_assign_unassign_list_role_to_group_on_customer(self):
role = {
'roles': ["admin"],
'customer': self.setup_customer_id
}
# assign role
post_body = [role]
_, body = self.grp_client.assign_group_roles(self.setup_group_id,
*post_body)
@ -170,13 +179,22 @@ class TestTempestGrp(grp_base.GrpBaseOrmTest):
self.assertEqual(body['roles'][0]['customer'], role['customer'])
self.assertEqual(body['roles'][0]['roles'][0], role['roles'][0])
# list role
region_name = [
region['name'] for region in self.setup_group['regions']]
list_filter = "?region=%s&customer=%s" % (region_name[0],
self.setup_customer_id)
_, body = self.grp_client.list_group_roles(self.setup_group_id,
list_filter)
self.assertEqual(body['roles'], role['roles'])
# unassign role
_, body = self.grp_client.unassign_group_role(self.setup_group_id,
role['roles'][0],
'customer',
role['customer'])
self._wait_for_group_status(self.setup_group_id, 'Success')
# Once the get groups role function is implemented, it will be
# added here to retreive the role and call assert to verfify that
# the role has indeed been unassigned.
self.assertEqual(body, '')
_, body = self.grp_client.list_group_roles(self.setup_group_id,
list_filter)
self.assertEqual(body['roles'], [])