pre-commit: Prepare for bump

Add all the automatically-generated changes separately to make the
manual changes needed more obvious.

This was generated bump bumping ruff to the latest version (v0.9.3) and
running:

  pre-commit run -a

Before undoing the changes to '.pre-commit-config.yaml'. The only needed
manual change was the removal of a now-unused 'typing' import.

Change-Id: I8b6ff24311baff77546089541467a87c84a1218d
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
Stephen Finucane 2025-02-19 10:46:44 +00:00
parent f8f58f35e0
commit 4beac2a236
38 changed files with 134 additions and 137 deletions

View File

@ -25,8 +25,7 @@ def import_image(conn):
# Url where glance can download the image
uri = (
'https://download.cirros-cloud.net/0.4.0/'
'cirros-0.4.0-x86_64-disk.img'
'https://download.cirros-cloud.net/0.4.0/cirros-0.4.0-x86_64-disk.img'
)
# Build the image attributes and import the image.

View File

@ -19,7 +19,7 @@ import typing as ty
def setup_logging(
name: str,
handlers: ty.Optional[ty.List[logging.Handler]] = None,
handlers: ty.Optional[list[logging.Handler]] = None,
level: ty.Optional[int] = None,
) -> logging.Logger:
"""Set up logging for a named logger.
@ -54,7 +54,7 @@ def enable_logging(
stream: ty.Optional[ty.TextIO] = None,
format_stream: bool = False,
format_template: str = '%(asctime)s %(levelname)s: %(name)s %(message)s',
handlers: ty.Optional[ty.List[logging.Handler]] = None,
handlers: ty.Optional[list[logging.Handler]] = None,
) -> None:
"""Enable logging output.

View File

@ -527,16 +527,14 @@ class Node(_common.Resource):
if service_steps is not None:
if target != 'service':
raise ValueError(
'Service steps can only be provided with '
'"service" target'
'Service steps can only be provided with "service" target'
)
body['service_steps'] = service_steps
if rescue_password is not None:
if target != 'rescue':
raise ValueError(
'Rescue password can only be provided with '
'"rescue" target'
'Rescue password can only be provided with "rescue" target'
)
body['rescue_password'] = rescue_password

View File

@ -38,8 +38,7 @@ class BaseBlockStorageProxy(proxy.Proxy, metaclass=abc.ABCMeta):
volume_obj = self.get_volume(volume)
if not volume_obj:
raise exceptions.SDKException(
f"Volume {volume} given to create_image could "
f"not be found"
f"Volume {volume} given to create_image could not be found"
)
volume_id = volume_obj['id']
data = self.post(

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import typing as ty
from openstack import exceptions
from openstack import resource
@ -178,7 +177,7 @@ class Backup(resource.Resource):
:return: Updated backup instance
"""
url = utils.urljoin(self.base_path, self.id, "restore")
body: ty.Dict[str, ty.Dict] = {'restore': {}}
body: dict[str, dict] = {'restore': {}}
if volume_id:
body['restore']['volume_id'] = volume_id
if name:

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import typing as ty
from openstack.common import metadata
from openstack import format
@ -142,7 +141,7 @@ class Volume(resource.Resource, metadata.MetadataMixin):
self, session, status=None, attach_status=None, migration_status=None
):
"""Reset volume statuses (admin operation)"""
body: ty.Dict[str, ty.Dict[str, str]] = {'os-reset_status': {}}
body: dict[str, dict[str, str]] = {'os-reset_status': {}}
if status:
body['os-reset_status']['status'] = status
if attach_status:

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import typing as ty
from openstack import exceptions
from openstack import resource
@ -194,7 +193,7 @@ class Backup(resource.Resource):
:return: Updated backup instance
"""
url = utils.urljoin(self.base_path, self.id, "restore")
body: ty.Dict[str, ty.Dict] = {'restore': {}}
body: dict[str, dict] = {'restore': {}}
if volume_id:
body['restore']['volume_id'] = volume_id
if name:

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import typing as ty
from openstack.common import metadata
from openstack import exceptions
@ -161,7 +160,7 @@ class Volume(resource.Resource, metadata.MetadataMixin):
self, session, status=None, attach_status=None, migration_status=None
):
"""Reset volume statuses (admin operation)"""
body: ty.Dict[str, ty.Dict[str, str]] = {'os-reset_status': {}}
body: dict[str, dict[str, str]] = {'os-reset_status': {}}
if status:
body['os-reset_status']['status'] = status
if attach_status:

View File

@ -202,7 +202,10 @@ class BaremetalCloudMixin(openstackcloud._OpenStackCloudMixin):
Example::
[{'address': 'aa:bb:cc:dd:ee:01'}, {'address': 'aa:bb:cc:dd:ee:02'}]
[
{'address': 'aa:bb:cc:dd:ee:01'},
{'address': 'aa:bb:cc:dd:ee:02'},
]
Alternatively, you can provide an array of MAC addresses.
:param wait: Boolean value, defaulting to false, to wait for the node

View File

@ -924,8 +924,7 @@ class IdentityCloudMixin(openstackcloud._OpenStackCloudMixin):
dom = self.get_domain(domain)
if not dom:
raise exceptions.SDKException(
f"Creating group {name} failed: Invalid domain "
f"{domain}"
f"Creating group {name} failed: Invalid domain {domain}"
)
group_ref['domain_id'] = dom['id']

View File

@ -1309,8 +1309,7 @@ class NetworkCloudMixin(_network_common.NetworkCommonCloudMixin):
kwargs['is_default'] = default
else:
self.log.debug(
"'qos-default' extension is not available on "
"target cloud"
"'qos-default' extension is not available on target cloud"
)
return self.network.create_qos_policy(**kwargs)
@ -1343,8 +1342,7 @@ class NetworkCloudMixin(_network_common.NetworkCommonCloudMixin):
kwargs['is_default'] = default
else:
self.log.debug(
"'qos-default' extension is not available on "
"target cloud"
"'qos-default' extension is not available on target cloud"
)
if not kwargs:
@ -2552,7 +2550,13 @@ class NetworkCloudMixin(_network_common.NetworkCommonCloudMixin):
:param allowed_address_pairs: Allowed address pairs list (Optional)
For example::
[{"ip_address": "23.23.23.1", "mac_address": "fa:16:3e:c4:cd:3f"}, ...]
[
{
"ip_address": "23.23.23.1",
"mac_address": "fa:16:3e:c4:cd:3f",
},
...,
]
:param extra_dhcp_opts: Extra DHCP options. (Optional).
For example::
@ -2631,7 +2635,13 @@ class NetworkCloudMixin(_network_common.NetworkCommonCloudMixin):
:param allowed_address_pairs: Allowed address pairs list (Optional)
For example::
[{"ip_address": "23.23.23.1", "mac_address": "fa:16:3e:c4:cd:3f"}, ...]
[
{
"ip_address": "23.23.23.1",
"mac_address": "fa:16:3e:c4:cd:3f",
},
...,
]
:param extra_dhcp_opts: Extra DHCP options. (Optional).
For example::

View File

@ -15,7 +15,6 @@ import concurrent.futures
import copy
import functools
import queue
import typing as ty
import warnings
import weakref
@ -492,7 +491,7 @@ class _OpenStackCloudMixin(_services_mixin.ServicesMixin):
:raises: :class:`~openstack.exceptions.SDKException` on invalid range
expressions.
"""
filtered: ty.List[object] = []
filtered: list[object] = []
for key, range_value in filters.items():
# We always want to operate on the full data set so that
@ -697,7 +696,7 @@ class _OpenStackCloudMixin(_services_mixin.ServicesMixin):
for dep in v.get('after', []):
dep_graph.add_edge(dep, k)
cleanup_resources: ty.Dict[str, resource.Resource] = {}
cleanup_resources: dict[str, resource.Resource] = {}
for service in dep_graph.walk(timeout=wait_timeout):
fn = None

View File

@ -105,7 +105,7 @@ class QuotaSet(resource.Resource):
body.pop("self", None)
# Process body_attrs to strip usage and reservation out
normalized_attrs: ty.Dict[str, ty.Any] = dict(
normalized_attrs: dict[str, ty.Any] = dict(
reservation={},
usage={},
)

View File

@ -696,7 +696,7 @@ class Server(resource.Resource, metadata.MetadataMixin, tag.TagMixin):
:param locked_reason: The reason for locking the server.
:returns: None
"""
body: ty.Dict[str, ty.Any] = {"lock": None}
body: dict[str, ty.Any] = {"lock": None}
if locked_reason is not None:
body["lock"] = {
"locked_reason": locked_reason,
@ -724,7 +724,7 @@ class Server(resource.Resource, metadata.MetadataMixin, tag.TagMixin):
provided, the server will use the existing image. (Optional)
:returns: None
"""
body: ty.Dict[str, ty.Any] = {"rescue": {}}
body: dict[str, ty.Any] = {"rescue": {}}
if admin_pass is not None:
body["rescue"]["adminPass"] = admin_pass
if image_ref is not None:
@ -761,7 +761,7 @@ class Server(resource.Resource, metadata.MetadataMixin, tag.TagMixin):
(Optional) (Only supported before microversion 2.14)
:returns: None
"""
body: ty.Dict[str, ty.Any] = {"evacuate": {}}
body: dict[str, ty.Any] = {"evacuate": {}}
if host is not None:
body["evacuate"]["host"] = host
if admin_pass is not None:
@ -855,7 +855,7 @@ class Server(resource.Resource, metadata.MetadataMixin, tag.TagMixin):
"greater."
)
body: ty.Dict[str, ty.Any] = {"migrate": None}
body: dict[str, ty.Any] = {"migrate": None}
if host:
body["migrate"] = {"host": host}
self._action(session, body)
@ -877,7 +877,7 @@ class Server(resource.Resource, metadata.MetadataMixin, tag.TagMixin):
(Optional)
:returns: None
"""
body: ty.Dict[str, ty.Any] = {"os-getConsoleOutput": {}}
body: dict[str, ty.Any] = {"os-getConsoleOutput": {}}
if length is not None:
body["os-getConsoleOutput"]["length"] = length
resp = self._action(session, body)
@ -989,7 +989,7 @@ class Server(resource.Resource, metadata.MetadataMixin, tag.TagMixin):
disk_over_commit,
):
microversion = None
body: ty.Dict[str, ty.Any] = {
body: dict[str, ty.Any] = {
'host': None,
}
if block_migration == 'auto':

View File

@ -14,7 +14,6 @@
import copy
import os.path
import typing as ty
from urllib import parse
import warnings
@ -194,7 +193,7 @@ def from_conf(conf, session=None, service_types=None, **kwargs):
f"'{st}') was present in the config.",
)
continue
opt_dict: ty.Dict[str, str] = {}
opt_dict: dict[str, str] = {}
# Populate opt_dict with (appropriately processed) Adapter conf opts
try:
ks_load_adap.process_conf_options(conf[project_name], opt_dict)

View File

@ -132,7 +132,7 @@ def _fix_argv(argv):
argv[index] = "=".join(split_args)
# Save both for later so we can throw an error about dupes
processed[new].add(orig)
overlap: ty.List[str] = []
overlap: list[str] = []
for new, old in processed.items():
if len(old) > 1:
overlap.extend(old)
@ -303,8 +303,8 @@ class OpenStackConfig:
self._cache_expiration_time = 0
self._cache_path = CACHE_PATH
self._cache_class = 'dogpile.cache.null'
self._cache_arguments: ty.Dict[str, ty.Any] = {}
self._cache_expirations: ty.Dict[str, int] = {}
self._cache_arguments: dict[str, ty.Any] = {}
self._cache_expirations: dict[str, int] = {}
self._influxdb_config = {}
if 'cache' in self.cloud_config:
cache_settings = _util.normalize_keys(self.cloud_config['cache'])
@ -537,7 +537,7 @@ class OpenStackConfig:
return self._expand_regions(regions)
else:
# crappit. we don't have a region defined.
new_cloud: ty.Dict[str, ty.Any] = {}
new_cloud: dict[str, ty.Any] = {}
our_cloud = self.cloud_config['clouds'].get(cloud, {})
self._expand_vendor_profile(cloud, new_cloud, our_cloud)
if 'regions' in new_cloud and new_cloud['regions']:

View File

@ -15,7 +15,6 @@
import glob
import json
import os
import typing as ty
import urllib
import requests
@ -25,7 +24,7 @@ from openstack.config import _util
from openstack import exceptions
_VENDORS_PATH = os.path.dirname(os.path.realpath(__file__))
_VENDOR_DEFAULTS: ty.Dict[str, ty.Dict] = {}
_VENDOR_DEFAULTS: dict[str, dict] = {}
_WELL_KNOWN_PATH = "{scheme}://{netloc}/.well-known/openstack/api"

View File

@ -80,7 +80,7 @@ class Resource(resource.Resource):
all_projects=None,
**params,
):
headers: ty.Union[ty.Union[ty.Dict[str, str], None]] = (
headers: ty.Union[ty.Union[dict[str, str], None]] = (
{} if project_id or all_projects else None
)
@ -95,7 +95,7 @@ class Resource(resource.Resource):
@classmethod
def _get_next_link(cls, uri, response, data, marker, limit, total_yielded):
next_link = None
params: ty.Dict[str, ty.Union[ty.List[str], str]] = {}
params: dict[str, ty.Union[list[str], str]] = {}
if isinstance(data, dict):
links = data.get('links')
if links:

View File

@ -18,7 +18,6 @@ Exception definitions.
import json
import re
import typing as ty
from requests import exceptions as _rex
@ -180,7 +179,7 @@ def raise_from_response(response, error_message=None):
if response.status_code < 400:
return
cls: ty.Type[SDKException]
cls: type[SDKException]
if response.status_code == 400:
cls = BadRequestException
elif response.status_code == 403:

View File

@ -12,7 +12,6 @@
import os
import time
import typing as ty
import warnings
from openstack import exceptions
@ -601,7 +600,7 @@ class Proxy(proxy.Proxy):
raise exceptions.SDKException(f"Image creation failed: {str(e)}")
def _make_v2_image_params(self, meta, properties):
ret: ty.Dict = {}
ret: dict = {}
for k, v in iter(properties.items()):
if k in _INT_PROPERTIES:
ret[k] = int(v)

View File

@ -338,7 +338,7 @@ class Image(resource.Resource, tag.TagMixin, _download.DownloadMixin):
stores = stores or []
url = utils.urljoin(self.base_path, self.id, 'import')
data: ty.Dict[str, ty.Any] = {'method': {'name': method}}
data: dict[str, ty.Any] = {'method': {'name': method}}
if uri:
if method != 'web-download':

View File

@ -195,7 +195,7 @@ class Proxy(proxy.Proxy):
@proxy._check_resource(strict=False)
def _update(
self,
resource_type: ty.Type[resource.Resource],
resource_type: type[resource.Resource],
value,
base_path=None,
if_revision=None,
@ -207,7 +207,7 @@ class Proxy(proxy.Proxy):
@proxy._check_resource(strict=False)
def _delete(
self,
resource_type: ty.Type[resource.Resource],
resource_type: type[resource.Resource],
value,
ignore_missing=True,
if_revision=None,

View File

@ -22,11 +22,11 @@ class BaseResource(resource.Resource):
create_method = 'PUT'
#: Metadata stored for this resource. *Type: dict*
metadata: ty.Dict[str, ty.Any] = {}
metadata: dict[str, ty.Any] = {}
_custom_metadata_prefix: str
_system_metadata: ty.Dict[str, ty.Any] = {}
_last_headers: ty.Dict[str, ty.Any] = {}
_system_metadata: dict[str, ty.Any] = {}
_last_headers: dict[str, ty.Any] = {}
def __init__(self, metadata=None, **attrs):
"""Process and save metadata known at creation stage"""

View File

@ -14,7 +14,6 @@
import collections.abc
import json
import typing as ty
from urllib import parse
from urllib import request
@ -221,8 +220,8 @@ def process_multiple_environments_and_files(
:return: tuple of files dict and a dict of the consolidated environment
:rtype: tuple
"""
merged_files: ty.Dict[str, str] = {}
merged_env: ty.Dict[str, ty.Dict] = {}
merged_files: dict[str, str] = {}
merged_env: dict[str, dict] = {}
# If we're keeping a list of environment files separately, include the
# contents of the files in the files dict
@ -275,8 +274,8 @@ def process_environment_and_files(
:return: tuple of files dict and the loaded environment as a dict
:rtype: (dict, dict)
"""
files: ty.Dict[str, str] = {}
env: ty.Dict[str, ty.Dict] = {}
files: dict[str, str] = {}
env: dict[str, dict] = {}
is_object = env_path_is_object and env_path_is_object(env_path)

View File

@ -75,14 +75,14 @@ def normalize_metric_name(name):
class Proxy(adapter.Adapter):
"""Represents a service."""
retriable_status_codes: ty.Optional[ty.List[int]] = None
retriable_status_codes: ty.Optional[list[int]] = None
"""HTTP status codes that should be retried by default.
The number of retries is defined by the configuration in parameters called
``<service-type>_status_code_retries``.
"""
_resource_registry: ty.Dict[str, ty.Type[resource.Resource]] = {}
_resource_registry: dict[str, type[resource.Resource]] = {}
"""Registry of the supported resourses.
Dictionary of resource names (key) types (value).
@ -436,7 +436,7 @@ class Proxy(adapter.Adapter):
)
def _get_resource(
self, resource_type: ty.Type[ResourceType], value, **attrs
self, resource_type: type[ResourceType], value, **attrs
) -> ResourceType:
"""Get a resource object to work on
@ -486,7 +486,7 @@ class Proxy(adapter.Adapter):
@ty.overload
def _find(
self,
resource_type: ty.Type[ResourceType],
resource_type: type[ResourceType],
name_or_id: str,
ignore_missing: ty.Literal[True] = True,
**attrs,
@ -495,7 +495,7 @@ class Proxy(adapter.Adapter):
@ty.overload
def _find(
self,
resource_type: ty.Type[ResourceType],
resource_type: type[ResourceType],
name_or_id: str,
ignore_missing: ty.Literal[False],
**attrs,
@ -506,7 +506,7 @@ class Proxy(adapter.Adapter):
@ty.overload
def _find(
self,
resource_type: ty.Type[ResourceType],
resource_type: type[ResourceType],
name_or_id: str,
ignore_missing: bool,
**attrs,
@ -514,7 +514,7 @@ class Proxy(adapter.Adapter):
def _find(
self,
resource_type: ty.Type[ResourceType],
resource_type: type[ResourceType],
name_or_id: str,
ignore_missing: bool = True,
**attrs,
@ -540,7 +540,7 @@ class Proxy(adapter.Adapter):
@_check_resource(strict=False)
def _delete(
self,
resource_type: ty.Type[ResourceType],
resource_type: type[ResourceType],
value,
ignore_missing=True,
**attrs,
@ -582,7 +582,7 @@ class Proxy(adapter.Adapter):
@_check_resource(strict=False)
def _update(
self,
resource_type: ty.Type[ResourceType],
resource_type: type[ResourceType],
value,
base_path=None,
**attrs,
@ -612,7 +612,7 @@ class Proxy(adapter.Adapter):
def _create(
self,
resource_type: ty.Type[ResourceType],
resource_type: type[ResourceType],
base_path=None,
**attrs,
) -> ResourceType:
@ -648,7 +648,7 @@ class Proxy(adapter.Adapter):
def _bulk_create(
self,
resource_type: ty.Type[ResourceType],
resource_type: type[ResourceType],
data,
base_path=None,
) -> ty.Generator[ResourceType, None, None]:
@ -674,7 +674,7 @@ class Proxy(adapter.Adapter):
@_check_resource(strict=False)
def _get(
self,
resource_type: ty.Type[ResourceType],
resource_type: type[ResourceType],
value=None,
requires_id=True,
base_path=None,
@ -715,7 +715,7 @@ class Proxy(adapter.Adapter):
def _list(
self,
resource_type: ty.Type[ResourceType],
resource_type: type[ResourceType],
paginated=True,
base_path=None,
jmespath_filters=None,
@ -765,7 +765,7 @@ class Proxy(adapter.Adapter):
def _head(
self,
resource_type: ty.Type[ResourceType],
resource_type: type[ResourceType],
value=None,
base_path=None,
**attrs,

View File

@ -99,7 +99,7 @@ class _BaseComponent(abc.ABC):
# The name this component is being tracked as in the Resource
key: str
# The class to be used for mappings
_map_cls: ty.Type[ty.Mapping] = dict
_map_cls: type[ty.Mapping] = dict
#: Marks the property as deprecated.
deprecated = False
@ -273,7 +273,7 @@ class Computed(_BaseComponent):
class _ComponentManager(collections.abc.MutableMapping):
"""Storage of a component type"""
attributes: ty.Dict[str, ty.Any]
attributes: dict[str, ty.Any]
def __init__(self, attributes=None, synchronized=False):
self.attributes = dict() if attributes is None else attributes.copy()
@ -354,7 +354,7 @@ class QueryParameters:
parameters, ``limit`` and ``marker``. These are the most common
query parameters used for listing resources in OpenStack APIs.
"""
self._mapping: ty.Dict[str, ty.Union[str, ty.Dict]] = {}
self._mapping: dict[str, ty.Union[str, dict]] = {}
if include_pagination_defaults:
self._mapping.update({"limit": "limit", "marker": "marker"})
self._mapping.update({name: name for name in names})
@ -520,13 +520,13 @@ class Resource(dict):
_header: _ComponentManager
_uri: _ComponentManager
_computed: _ComponentManager
_original_body: ty.Dict[str, ty.Any] = {}
_original_body: dict[str, ty.Any] = {}
_store_unknown_attrs_as_properties = False
_allow_unknown_attrs_in_body = False
_unknown_attrs_in_body: ty.Dict[str, ty.Any] = {}
_unknown_attrs_in_body: dict[str, ty.Any] = {}
# Placeholder for aliases as dict of {__alias__:__original}
_attr_aliases: ty.Dict[str, str] = {}
_attr_aliases: dict[str, str] = {}
def __init__(self, _synchronized=False, connection=None, **attrs):
"""The base resource
@ -1070,13 +1070,13 @@ class Resource(dict):
:return: A dictionary of key/value pairs where keys are named
as they exist as attributes of this class.
"""
mapping: ty.Union[utils.Munch, ty.Dict]
mapping: ty.Union[utils.Munch, dict]
if _to_munch:
mapping = utils.Munch()
else:
mapping = {}
components: ty.List[ty.Type[_BaseComponent]] = []
components: list[type[_BaseComponent]] = []
if body:
components.append(Body)
if headers:
@ -1164,7 +1164,7 @@ class Resource(dict):
*,
resource_request_key=None,
):
body: ty.Union[ty.Dict[str, ty.Any], ty.List[ty.Any]]
body: ty.Union[dict[str, ty.Any], list[ty.Any]]
if patch:
if not self._store_unknown_attrs_as_properties:
# Default case
@ -1590,7 +1590,7 @@ class Resource(dict):
f"Invalid create method: {cls.create_method}"
)
_body: ty.List[ty.Any] = []
_body: list[ty.Any] = []
resources = []
for attrs in data:
# NOTE(gryf): we need to create resource objects, since
@ -1605,7 +1605,7 @@ class Resource(dict):
)
_body.append(request.body)
body: ty.Union[ty.Dict[str, ty.Any], ty.List[ty.Any]] = _body
body: ty.Union[dict[str, ty.Any], list[ty.Any]] = _body
if prepend_key:
assert cls.resources_key

View File

@ -11,7 +11,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import typing as ty
import warnings
import os_service_types
@ -45,11 +44,11 @@ class _ServiceDisabledProxyShim:
class ServiceDescription:
#: Dictionary of supported versions and proxy classes for that version
supported_versions: ty.Dict[str, ty.Type[proxy_mod.Proxy]] = {}
supported_versions: dict[str, type[proxy_mod.Proxy]] = {}
#: main service_type to use to find this service in the catalog
service_type: str
#: list of aliases this service might be registered as
aliases: ty.List[str] = []
aliases: list[str] = []
def __init__(self, service_type, supported_versions=None, aliases=None):
"""Class describing how to interact with a REST service.

View File

@ -24,12 +24,10 @@ import inspect
import random
from typing import (
Any,
Dict,
Generator,
Optional,
Type,
TypeVar,
)
from collections.abc import Generator
from unittest import mock
import uuid
@ -43,8 +41,8 @@ Resource = TypeVar('Resource', bound=resource.Resource)
def generate_fake_resource(
resource_type: Type[Resource],
**attrs: Dict[str, Any],
resource_type: type[Resource],
**attrs: dict[str, Any],
) -> Resource:
"""Generate a fake resource
@ -67,7 +65,7 @@ def generate_fake_resource(
:raises NotImplementedError: If a resource attribute specifies a ``type``
or ``list_type`` that cannot be automatically generated
"""
base_attrs: Dict[str, Any] = {}
base_attrs: dict[str, Any] = {}
for name, value in inspect.getmembers(
resource_type,
predicate=lambda x: isinstance(x, (resource.Body, resource.URI)),
@ -140,9 +138,9 @@ def generate_fake_resource(
def generate_fake_resources(
resource_type: Type[Resource],
resource_type: type[Resource],
count: int = 1,
attrs: Optional[Dict[str, Any]] = None,
attrs: Optional[dict[str, Any]] = None,
) -> Generator[Resource, None, None]:
"""Generate a given number of fake resource entities
@ -175,7 +173,7 @@ def generate_fake_resources(
# various proxy methods also, but doing so requires deep code introspection or
# (better) type annotations
def generate_fake_proxy(
service: Type[service_description.ServiceDescription],
service: type[service_description.ServiceDescription],
api_version: Optional[str] = None,
) -> proxy.Proxy:
"""Generate a fake proxy for the given service type

View File

@ -13,13 +13,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import typing as ty
from openstack.compute.v2 import hypervisor
from openstack import connection
from openstack.tests.functional import base
HYPERVISORS: ty.List[hypervisor.Hypervisor] = []
HYPERVISORS: list[hypervisor.Hypervisor] = []
def hypervisors():
@ -40,8 +39,7 @@ class TestHost(base.BaseFunctionalTest):
if not hypervisors():
self.skipTest(
"Skip TestHost as there are no hypervisors "
"configured in nova"
"Skip TestHost as there are no hypervisors configured in nova"
)
# Create segment

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import typing as ty
from openstack.network.v2 import qos_policy as _qos_policy
from openstack.tests.functional import base
@ -20,7 +19,7 @@ class TestQoSPolicy(base.BaseFunctionalTest):
QOS_POLICY_ID = None
IS_SHARED = False
IS_DEFAULT = False
RULES: ty.List[str] = []
RULES: list[str] = []
QOS_POLICY_DESCRIPTION = "QoS policy description"
def setUp(self):

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import typing as ty
from openstack.shared_file_system.v2 import share as _share
from openstack.tests.functional.shared_file_system import base
@ -82,7 +81,7 @@ class ShareMetadataTest(base.BaseSharedFileSystemTest):
new_meta = {"newFoo": "newBar"}
full_meta = {"foo": "bar", "newFoo": "newBar"}
empty_meta: ty.Dict[str, str] = {}
empty_meta: dict[str, str] = {}
updated_share = (
self.user_cloud.shared_file_system.update_share_metadata(

View File

@ -257,8 +257,7 @@ class TestFloatingIP(base.TestCase):
[
dict(
method='GET',
uri='https://network.example.com/v2.0/floatingips/'
f'{fid}',
uri=f'https://network.example.com/v2.0/floatingips/{fid}',
json=self.mock_floating_ip_new_rep,
)
]

View File

@ -183,9 +183,10 @@ class TestFirewallRule(FirewallTestCase):
]
)
with mock.patch.object(
self.cloud.network, 'delete_firewall_rule'
), mock.patch.object(self.cloud.log, 'debug'):
with (
mock.patch.object(self.cloud.network, 'delete_firewall_rule'),
mock.patch.object(self.cloud.log, 'debug'),
):
self.assertFalse(
self.cloud.delete_firewall_rule(self.firewall_rule_name)
)
@ -501,11 +502,14 @@ class TestFirewallPolicy(FirewallTestCase):
]
)
with mock.patch.object(
self.cloud.network,
'find_firewall_policy',
return_value=self.mock_firewall_policy,
), mock.patch.object(self.cloud.log, 'debug'):
with (
mock.patch.object(
self.cloud.network,
'find_firewall_policy',
return_value=self.mock_firewall_policy,
),
mock.patch.object(self.cloud.log, 'debug'),
):
self.assertTrue(
self.cloud.delete_firewall_policy(
self.firewall_policy_name, filters
@ -1151,9 +1155,10 @@ class TestFirewallPolicy(FirewallTestCase):
]
)
with mock.patch.object(
self.cloud.network, 'remove_rule_from_policy'
), mock.patch.object(self.cloud.log, 'debug'):
with (
mock.patch.object(self.cloud.network, 'remove_rule_from_policy'),
mock.patch.object(self.cloud.log, 'debug'),
):
r = self.cloud.remove_rule_from_policy(policy['id'], rule['id'])
self.assertDictEqual(policy, r.to_dict())
self.assert_calls()

View File

@ -91,8 +91,7 @@ class TestOperatorCloud(base.TestCase):
self.cloud.config.config['region_name'] = 'testregion'
with testtools.ExpectedException(
exceptions.SDKException,
"Error getting image endpoint on testcloud:testregion: "
"No service",
"Error getting image endpoint on testcloud:testregion: No service",
):
self.cloud.get_session_endpoint("image")

View File

@ -188,8 +188,7 @@ class TestNetworkAddressGroup(TestNetworkProxy):
add_addresses.assert_called_once_with(address_group.AddressGroup, data)
@mock.patch(
'openstack.network.v2._proxy.Proxy.'
'remove_addresses_from_address_group'
'openstack.network.v2._proxy.Proxy.remove_addresses_from_address_group'
)
def test_remove_addresses_from_address_group(self, remove_addresses):
data = mock.sentinel

View File

@ -685,7 +685,7 @@ class TestTempURLBytesPathUnicodeKey(TestTempURL):
url = '/v1/\u00e4/c/\u00f3'.encode()
key = 'k\u00e9y'
expected_url = url + (
b'?temp_url_sig=temp_url_signature' b'&temp_url_expires=1400003600'
b'?temp_url_sig=temp_url_signature&temp_url_expires=1400003600'
)
expected_body = b'\n'.join(
[
@ -700,7 +700,7 @@ class TestTempURLBytesPathAndKey(TestTempURL):
url = '/v1/\u00e4/c/\u00f3'.encode()
key = 'k\u00e9y'.encode()
expected_url = url + (
b'?temp_url_sig=temp_url_signature' b'&temp_url_expires=1400003600'
b'?temp_url_sig=temp_url_signature&temp_url_expires=1400003600'
)
expected_body = b'\n'.join(
[
@ -715,7 +715,7 @@ class TestTempURLBytesPathAndNonUtf8Key(TestTempURL):
url = '/v1/\u00e4/c/\u00f3'.encode()
key = b'k\xffy'
expected_url = url + (
b'?temp_url_sig=temp_url_signature' b'&temp_url_expires=1400003600'
b'?temp_url_sig=temp_url_signature&temp_url_expires=1400003600'
)
expected_body = b'\n'.join(
[

View File

@ -3358,11 +3358,16 @@ class TestResourceFind(base.TestCase):
)
def test_find_result_name_not_in_query_parameters(self):
with mock.patch.object(
self.one_result, 'existing', side_effect=self.OneResult.existing
) as mock_existing, mock.patch.object(
self.one_result, 'list', side_effect=self.OneResult.list
) as mock_list:
with (
mock.patch.object(
self.one_result,
'existing',
side_effect=self.OneResult.existing,
) as mock_existing,
mock.patch.object(
self.one_result, 'list', side_effect=self.OneResult.list
) as mock_list,
):
self.assertEqual(
self.result, self.one_result.find(self.cloud.compute, "name")
)

View File

@ -59,8 +59,7 @@ def iterate_timeout(timeout, message, wait=2):
wait = float(wait)
except ValueError:
raise exceptions.SDKException(
f"Wait value must be an int or float value. "
f"{wait} given instead"
f"Wait value must be an int or float value. {wait} given instead"
)
start = time.time()
@ -407,7 +406,7 @@ class TinyDAG:
def _get_in_degree(self):
"""Calculate the in_degree (count incoming) for nodes"""
_in_degree: ty.Dict[str, int] = {u: 0 for u in self._graph.keys()}
_in_degree: dict[str, int] = {u: 0 for u in self._graph.keys()}
for u in self._graph:
for v in self._graph[u]:
_in_degree[v] += 1
@ -547,7 +546,7 @@ class Munch(dict):
def munchify(x, factory=Munch):
"""Recursively transforms a dictionary into a Munch via copy."""
# Munchify x, using `seen` to track object cycles
seen: ty.Dict[int, ty.Any] = dict()
seen: dict[int, ty.Any] = dict()
def munchify_cycles(obj):
try:
@ -587,7 +586,7 @@ def unmunchify(x):
"""Recursively converts a Munch into a dictionary."""
# Munchify x, using `seen` to track object cycles
seen: ty.Dict[int, ty.Any] = dict()
seen: dict[int, ty.Any] = dict()
def unmunchify_cycles(obj):
try: