Add LDAP test for authentication only

Change-Id: I395bfc2e390fb735f7d4dc1838eaad5a48aa7d0c
This commit is contained in:
Igor Degtiarov 2016-07-06 16:31:38 +03:00
parent 8b80c45f02
commit 98bedc5de3
11 changed files with 291 additions and 23 deletions

View File

@ -17,6 +17,7 @@ from fuelweb_test import logger
from proboscis import asserts
from stacklight_tests import base_test
from stacklight_tests.elasticsearch_kibana.kibana_ui import api as ui_api
from stacklight_tests.elasticsearch_kibana import plugin_settings
@ -34,13 +35,18 @@ class ElasticsearchPluginApi(base_test.PluginApi):
[{'host': self.get_elasticsearch_vip(), 'port': 9200}])
return self._es_client
@property
def kibana_port(self):
def kibana_port(self, admin_role=True):
if self._kibana_port is None:
if self.kibana_protocol == 'http':
self._kibana_port = 80
if admin_role:
if self.kibana_protocol == 'http':
self._kibana_port = 80
else:
self._kibana_port = 443
else:
self._kibana_port = 443
if self.kibana_protocol == 'http':
self._kibana_port = 8000
else:
self._kibana_port = 8443
return self._kibana_port
@property
@ -75,9 +81,15 @@ class ElasticsearchPluginApi(base_test.PluginApi):
else:
return self.helpers.get_vip_address('kibana')
def get_kibana_url(self):
def get_kibana_url(self, admin_role=True, credentials=None):
if credentials:
return "{0}://{1}:{2}@{3}:{4}/".format(
self.kibana_protocol, credentials[0], credentials[1],
self.get_kibana_vip(), self.kibana_port(admin_role)
)
return "{0}://{1}:{2}/".format(
self.kibana_protocol, self.get_kibana_vip(), self.kibana_port)
self.kibana_protocol, self.get_kibana_vip(),
self.kibana_port(admin_role))
def check_plugin_online(self):
elasticsearch_url = self.get_elasticsearch_url()
@ -95,6 +107,20 @@ class ElasticsearchPluginApi(base_test.PluginApi):
self.settings.kibana_password)
)
def check_plugin_ldap(self, authz=False, uadmin=('uadmin', 'uadmin'),
uviewer=('uviewer', 'uviewer')):
"""Check dashboard is available when using LDAP for authentication.
:param authz: adds checking LDAP for authorisation
:type authz: boolean
"""
url_admin = self.get_kibana_url(credentials=uadmin)
url_viewer = self.get_kibana_url(admin_role=(False if authz else True),
credentials=uviewer)
ui_api.check_kibana_ldap(url_admin, uadmin[0], authz)
ui_api.check_kibana_ldap(url_viewer, uviewer[0], authz)
def check_elasticsearch_nodes_count(self, expected_count):
logger.debug("Get information about Elasticsearch nodes")
url = self.get_elasticsearch_url(path='_nodes')

View File

@ -0,0 +1,30 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from fuelweb_test import logger
from proboscis import asserts
from stacklight_tests.elasticsearch_kibana.kibana_ui import pages
from stacklight_tests.helpers.ui_tester import ui_driver
def check_kibana_ldap(kibana_url, user, authz=False):
logger.info("Checking Kibana service at {} with LDAP authorization for {} "
"user".format(kibana_url, user))
expects = {'uadmin': 'Logs', 'uviewer': 'Fatal Error'}
with ui_driver(kibana_url, "Kibana") as driver:
home_page = pages.MainPage(driver)
home_page.is_main_page()
response = home_page.save_dashboard()
asserts.assert_equal(expects[user] if authz else 'Logs', response)

View File

@ -0,0 +1,42 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from selenium.common import exceptions
from selenium.webdriver.common import by
from stacklight_tests.helpers.ui import base_pages
class MainPage(base_pages.PageObject):
_save_button_locator = (
by.By.XPATH, '//button[@aria-label="Save Dashboard"]')
_submit_button_locator = (
by.By.XPATH, '//button[@class="btn btn-primary"][@type="submit"]')
_error_field_locator = (by.By.CLASS_NAME, 'panel-title')
def __init__(self, driver):
super(MainPage, self).__init__(driver)
self._page_title = "Logs - Dashboard - Kibana"
def is_main_page(self):
return (self.is_the_current_page() and
self._is_element_visible(*self._main_menu_locator))
def save_dashboard(self):
self._get_element(*self._save_button_locator).click()
self._get_element(*self._submit_button_locator).click()
try:
self.driver.switch_to.alert.accept()
except exceptions.NoAlertPresentException:
pass
return self._get_element(*self._error_field_locator).text

View File

@ -24,7 +24,7 @@ from stacklight_tests.helpers.ui import ui_settings
@contextlib.contextmanager
def ui_driver(url, wait_element, title):
def ui_driver(url, title, wait_element='/html'):
vdisplay = None
# Start a virtual display server for running the tests headless.
if ui_settings.headless_mode:

View File

@ -150,6 +150,14 @@ class InfluxdbPluginApi(base_test.PluginApi):
def check_grafana_dashboards(self):
ui_api.check_grafana_dashboards(self.get_grafana_url())
def check_plugin_ldap(self, authz=False):
"""Check dashboard is available when using LDAP for authentication.
:param authz: adds checking LDAP for authorisation
:type authz: boolean
"""
ui_api.check_grafana_ldap(self.get_grafana_url(), authz)
def get_nova_instance_creation_time_metrics(self, time_point=None):
"""Gets instance creation metrics for provided interval

View File

@ -11,7 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from fuelweb_test import logger
from proboscis import asserts
from stacklight_tests.helpers.ui_tester import ui_driver
@ -22,7 +22,7 @@ def check_grafana_dashboards(grafana_url):
login_key_xpath = '/html/body/div/div[2]/div/div/div[2]/form/div[2]/button'
with ui_driver(grafana_url, login_key_xpath, "Grafana") as driver:
with ui_driver(grafana_url, "Grafana", login_key_xpath) as driver:
login_page = pages.LoginPage(driver)
login_page.is_login_page()
home_page = login_page.login("grafana", "grafanapass")
@ -46,3 +46,31 @@ def check_grafana_dashboards(grafana_url):
for name in available_dashboards_names:
dashboard_page = home_page.open_dashboard(name)
dashboard_page.get_back_to_home()
def check_grafana_ldap(grafana_url, authz=False, uadmin=("uadmin", "uadmin"),
uviewer=("uviewer", "uviewer")):
_check_available_menu_items_for_user(uadmin, grafana_url, authz)
_check_available_menu_items_for_user(uviewer, grafana_url, authz)
def _check_available_menu_items_for_user(user, url, authz):
logger.info("Checking Grafana service at {} with LDAP authorization "
"for {} user".format(url, user[0]))
login_key_xpath = '//form[1]//button[1]'
admin_panels = ["Dashboards", "Data Sources", "Plugins"]
viewer_panel = list(admin_panels[1]) if authz else admin_panels
with ui_driver(url, "Grafana", login_key_xpath) as driver:
login_page = pages.LoginPage(driver)
login_page.is_login_page()
home_page = login_page.login(*user)
home_page.is_main_page()
menu_items = [name.text for name in home_page.main_menu_items]
msg = "Not all required panels are available in main menu."
asserts.assert_true(
(admin_panels if ("uadmin" in user)
else viewer_panel) == menu_items,
msg
)

View File

@ -27,16 +27,21 @@ class DashboardPage(base_pages.PageObject):
class MainPage(base_pages.PageObject):
_dropdown_menu_locator = (
by.By.CLASS_NAME,
'top-nav-dashboards-btn')
'navbar-brand-btn')
_dashboards_list_locator = (
by.By.CLASS_NAME,
'search-results-container'
'sidemenu'
)
_dashboard_locator = (by.By.CLASS_NAME,
"search-item-dash-db")
_dashboard_main_locator = (
by.By.CLASS_NAME,
"dropdown"
)
def __init__(self, driver):
super(MainPage, self).__init__(driver)
self._page_title = "Grafana - Home"
@ -54,6 +59,12 @@ class MainPage(base_pages.PageObject):
self.open_dropdown_menu()
return self._get_element(*self._dashboards_list_locator)
@property
def main_menu_items(self):
return self.dashboards_list.find_elements(
*self._dashboard_main_locator
)
@property
def dashboards(self):
return self.dashboards_list.find_elements(*self._dashboard_locator)
@ -77,9 +88,7 @@ class MainPage(base_pages.PageObject):
class LoginPage(base_pages.PageObject):
_login_username_field_locator = (by.By.NAME, 'username')
_login_password_field_locator = (by.By.NAME, 'password')
_login_submit_button_locator = (
by.By.CLASS_NAME,
'login-submit-button-row')
_login_submit_button_locator = (by.By.CLASS_NAME, "btn")
def __init__(self, driver):
super(LoginPage, self).__init__(driver)

View File

@ -60,21 +60,25 @@ class InfraAlertingPluginApi(base_test.PluginApi):
def get_nagios_vip(self):
return self.helpers.get_vip_address('infrastructure_alerting_ui')
def check_plugin_online(self):
def check_plugin_online(self, user=None, password=None):
user = user or self.settings.nagios_user
password = password or self.settings.nagios_password
nagios_url = self.get_nagios_url()
logger.info("Nagios UI is at {}".format(nagios_url))
logger.info("Check that the '{}' user is authorized".format(
self.settings.nagios_user))
self.checkers.check_http_get_response(
nagios_url,
auth=(self.settings.nagios_user, self.settings.nagios_password)
)
logger.info("Check that the '{}' user is authorized".format(user))
self.checkers.check_http_get_response(nagios_url,
auth=(user, password))
logger.info("Check that the Nagios UI requires authentication")
self.checkers.check_http_get_response(
nagios_url, expected_code=401,
auth=(self.settings.nagios_user, 'rogue')
auth=(user, 'rogue')
)
def check_plugin_ldap(self, authz=False):
"""Check dashboard is available when using LDAP for authentication."""
logger.info("Checking Nagios service with LDAP authorization")
self.check_plugin_online(user='uadmin', password='uadmin')
def get_authenticated_nagios_url(self):
return "{0}://{1}:{2}@{3}:{4}".format(self.nagios_protocol,
self.settings.nagios_user,

View File

@ -60,6 +60,7 @@ def import_tests():
from stacklight_tests.toolchain import test_detached_plugins # noqa
from stacklight_tests.toolchain import test_functional # noqa
from stacklight_tests.toolchain import test_https_plugins # noqa
from stacklight_tests.toolchain import test_ldap_plugins # noqa
from stacklight_tests.toolchain import test_network_templates # noqa
from stacklight_tests.toolchain import test_neutron # noqa
from stacklight_tests.toolchain import test_post_install # noqa

View File

@ -0,0 +1,120 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from fuelweb_test.helpers.decorators import log_snapshot_after_test
from fuelweb_test import logger
from proboscis import test
from stacklight_tests.helpers import helpers
from stacklight_tests.toolchain import api
@test(groups=["ldap"])
class TestToolchainLDAP(api.ToolchainApi):
"""Class testing the LMA Toolchain plugins with LDAP for authentication."""
@test(depends_on_groups=['prepare_slaves_3'],
groups=["ldap", "deploy_toolchain_with_ldap", "toolchain", "deploy"])
@log_snapshot_after_test
def deploy_toolchain_with_ldap(self):
"""Install the LMA Toolchain plugins with LDAP integration for
authentication
Scenario:
1. Upload the LMA Toolchain plugins to the master node
2. Install the plugins
3. Create the cluster
4. Enable and configure LDAP for plugin authentication
5. Deploy the cluster
6. Upload install_slapd.sh script on controller node
7. On controller node open the firewall for ports 389 and 636
8. Install and configure the LDAP server
9. Check that LMA Toolchain plugins are running
10. Check plugins are available with LDAP for authentication
Duration 120m
"""
fuel_web = self.helpers.fuel_web
self.env.revert_snapshot("ready_with_3_slaves")
self.prepare_plugins()
self.helpers.create_cluster(name=self.__class__.__name__)
self.activate_plugins()
fuel_web.update_nodes(self.helpers.cluster_id,
self.settings.base_nodes, update_interfaces=True)
plugins_ldap = {
"kibana": (self.ELASTICSEARCH_KIBANA, "(uid=%s)"),
"grafana": (self.INFLUXDB_GRAFANA, "(objectClass=*)"),
"nagios": (self.LMA_INFRASTRUCTURE_ALERTING, "(objectClass=*)")
}
ldap_server = fuel_web.get_nailgun_cluster_nodes_by_roles(
self.helpers.cluster_id, roles=["controller", ],
role_status='pending_roles')[0]['hostname']
for name, plugin in plugins_ldap.iteritems():
self._activate_ldap_plugin(plugin[0], plugin[1], name, ldap_server)
self.helpers.deploy_cluster(self.settings.base_nodes)
ldap_node = fuel_web.get_nailgun_cluster_nodes_by_roles(
self.helpers.cluster_id, roles=["controller", ])[0]
with fuel_web.get_ssh_for_nailgun_node(ldap_node) as remote:
remote.upload(
helpers.get_fixture("ldap/install_slapd.sh"),
"/tmp"
)
remote.check_call(
"bash -x /tmp/install_slapd.sh && iptables -I INPUT "
"-p tcp -m multiport --ports 389,636 -m comment --comment "
"'ldap server' -j ACCEPT", verbose=True
)
self.check_plugins_online()
for plugin in plugins_ldap.values():
plugin[0].check_plugin_ldap()
self.env.make_snapshot("deploy_toolchain_with_ldap", is_make=True)
@staticmethod
def _activate_ldap_plugin(plugin, ufilter, dashboard_name, ldap_server,
authz=False, protocol="ldap"):
"""Activate LDAP option for a plugin."""
name = plugin.get_plugin_settings().name
logger.info(
"Enable LDAP for plugin {0}, authorization {1}, protocol: {2}, "
"user search filter: {3}, ldap server node: {4}".format(
name, authz, protocol, ufilter, ldap_server
)
)
options = {
"ldap_enabled/value": True,
"ldap_protocol_for_{}/value".format(dashboard_name): protocol,
"ldap_servers/value": ldap_server,
"ldap_bind_dn/value": "cn=admin,dc=stacklight,dc=ci",
"ldap_bind_password/value": "admin",
"ldap_user_search_base_dns/value": "dc=stacklight,dc=ci",
"ldap_user_search_filter/value": ufilter,
}
if name in ["elasticsearch_kibana", "lma_infrastructure_alerting"]:
options.update({"ldap_user_attribute/value": "uid"})
plugin.activate_plugin(options=options)