Takashi Kajinami dff77df8fc Adapt to recent dependencies
This covers the following updates to fix CI currently broken.

* Fix compatibility with tox 4
* Update hacking to 6.1.0
* Clean up python 2 support and bump minimum supported version to 3.8
* Remove six because python 2 support is removed
* Update job template to use the recent tested python versions
* Replace items by prefixItems to fix schema error
* Build documentation by sphinx command
* Remove "Change Log" section from documentation
* Split requirements for documentation build
* Ensure tox is installed in functional tests
* Fix devstack job

Change-Id: I3b9c5b20aca55332c721d34fd4c41c5b886f60c5
2024-11-27 19:35:39 +01:00

102 lines
3.1 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from os_faults.api import base_driver
from os_faults.api import error
from os_faults import utils
class PowerDriver(base_driver.BaseDriver, metaclass=abc.ABCMeta):
@abc.abstractmethod
def supports(self, host):
"""Returns True if host is supported by the power driver"""
@abc.abstractmethod
def poweroff(self, host):
"""Power off host abruptly"""
@abc.abstractmethod
def poweron(self, host):
"""Power on host"""
@abc.abstractmethod
def reset(self, host):
"""Reset host"""
@abc.abstractmethod
def shutdown(self, host):
"""Graceful shutdown host"""
def snapshot(self, host, snapshot_name, suspend=True):
raise NotImplementedError
def revert(self, host, snapshot_name, resume=True):
raise NotImplementedError
class PowerManager(object):
def __init__(self):
self.power_drivers = []
def add_driver(self, driver):
self.power_drivers.append(driver)
def _map_hosts_to_driver(self, hosts):
driver_host_pairs = []
for host in hosts:
for power_driver in self.power_drivers:
if power_driver.supports(host):
driver_host_pairs.append((power_driver, host))
break
else:
raise error.PowerManagementError(
"No supported driver found for host {}".format(host))
return driver_host_pairs
def _run_command(self, cmd, hosts, **kwargs):
driver_host_pairs = self._map_hosts_to_driver(hosts)
tw = utils.ThreadsWrapper()
for driver, host in driver_host_pairs:
kwargs['host'] = host
fn = getattr(driver, cmd)
tw.start_thread(fn, **kwargs)
tw.join_threads()
if tw.errors:
raise error.PowerManagementError(
'There are some errors when working the driver. '
'Please, check logs for more details.')
def poweroff(self, hosts):
self._run_command('poweroff', hosts)
def poweron(self, hosts):
self._run_command('poweron', hosts)
def reset(self, hosts):
self._run_command('reset', hosts)
def shutdown(self, hosts):
self._run_command('shutdown', hosts)
def snapshot(self, hosts, snapshot_name, suspend=True):
self._run_command('snapshot', hosts,
snapshot_name=snapshot_name, suspend=suspend)
def revert(self, hosts, snapshot_name, resume=True):
self._run_command('revert', hosts,
snapshot_name=snapshot_name, resume=resume)