typing: Annotate openstack.proxy

There are quite a few hints included here for the Resource class, but
only because the Proxy class needs them typed.

Change-Id: I1e03c5cc2c1f33bd04b026c714a6508a4b9332ed
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
Stephen Finucane 2024-11-20 12:57:20 +00:00
parent 500aae499e
commit 7aa5a9d0c7
7 changed files with 406 additions and 191 deletions
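
The diff below leans on a standard pattern to keep annotation-only imports out of the runtime import graph: they are placed under a typing.TYPE_CHECKING guard, and `from __future__ import annotations` keeps the annotations lazy. A minimal standalone sketch of that pattern (the ExampleProxy class and its attribute are illustrative only, not part of the commit):

from __future__ import annotations

import typing as ty

if ty.TYPE_CHECKING:
    # imported only while type checking, so no runtime import cycle
    from openstack import connection


class ExampleProxy:
    # the annotation is resolved lazily by the type checker;
    # openstack.connection is never imported at runtime here
    _connection: connection.Connection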


@ -1499,14 +1499,18 @@ class Proxy(proxy.Proxy):
:class:`~openstack.block_storage.v3.limits.RateLimit`
:rtype: :class:`~openstack.block_storage.v3.limits.Limits`
"""
params = {}
project_id = None
if project:
params['project_id'] = resource.Resource._get_id(project)
project_id = resource.Resource._get_id(project)
# we don't use Proxy._get since that doesn't allow passing arbitrary
# query string parameters
res = self._get_resource(_limits.Limits, None)
return res.fetch(self, requires_id=False, **params)
return res.fetch(
self,
requires_id=False,
project_id=project_id,
)
# ====== CAPABILITIES ======
def get_capabilities(self, host):
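
A hedged usage sketch of the get_limits() change above (the cloud name and project ID are placeholders; `project` may be an ID string or a Project object, and Resource._get_id() extracts the ID either way before it is handed to fetch() as project_id):

import openstack

conn = openstack.connect(cloud='example')  # placeholder cloud name
limits = conn.block_storage.get_limits(project='0123456789abcdef0123456789abcdef')
print(limits)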


@ -1729,7 +1729,6 @@ class NetworkCommonCloudMixin(openstackcloud._OpenStackCloudMixin):
if not filters:
filters = {}
data = []
# Handle neutron security groups
if self._use_neutron_secgroups():
# pass filters dict to the list to filter as much as possible on
@ -1815,7 +1814,6 @@ class NetworkCommonCloudMixin(openstackcloud._OpenStackCloudMixin):
"Unavailable feature: security groups"
)
data = []
security_group_json = {'name': name, 'description': description}
if stateful is not None:
security_group_json['stateful'] = stateful


@ -14,6 +14,7 @@
import copy
import os.path
import typing as ty
from urllib import parse
import warnings
@ -30,17 +31,17 @@ import os_service_types
import requestsexceptions
try:
import statsd
import statsd as statsd_client
except ImportError:
statsd = None
statsd_client = None
try:
import prometheus_client
except ImportError:
prometheus_client = None
try:
import influxdb
import influxdb as influxdb_client
except ImportError:
influxdb = None
influxdb_client = None
from openstack import _log
from openstack.config import _util
@ -1131,8 +1132,10 @@ class CloudRegion:
'concurrency', service_type=service_type
)
def get_statsd_client(self):
if not statsd:
def get_statsd_client(
self,
) -> ty.Optional['statsd_client.StatsClientBase']:
if not statsd_client:
if self._statsd_host:
self.log.warning(
'StatsD python library is not available. '
@ -1146,25 +1149,29 @@ class CloudRegion:
statsd_args['port'] = self._statsd_port
if statsd_args:
try:
return statsd.StatsClient(**statsd_args)
return statsd_client.StatsClient(**statsd_args)
except Exception:
self.log.warning('Cannot establish connection to statsd')
return None
else:
return None
def get_statsd_prefix(self):
def get_statsd_prefix(self) -> str:
return self._statsd_prefix or 'openstack.api'
def get_prometheus_registry(self):
def get_prometheus_registry(
self,
) -> ty.Optional['prometheus_client.CollectorRegistry']:
if not self._collector_registry and prometheus_client:
self._collector_registry = prometheus_client.REGISTRY
return self._collector_registry
def get_prometheus_histogram(self):
def get_prometheus_histogram(
self,
) -> ty.Optional['prometheus_client.Histogram']:
registry = self.get_prometheus_registry()
if not registry or not prometheus_client:
return
return None
# We have to hide a reference to the histogram on the registry
# object, because it's collectors must be singletons for a given
# registry but register at creation time.
@ -1184,10 +1191,12 @@ class CloudRegion:
registry._openstacksdk_histogram = hist
return hist
def get_prometheus_counter(self):
def get_prometheus_counter(
self,
) -> ty.Optional['prometheus_client.Counter']:
registry = self.get_prometheus_registry()
if not registry or not prometheus_client:
return
return None
counter = getattr(registry, '_openstacksdk_counter', None)
if not counter:
counter = prometheus_client.Counter(
@ -1224,7 +1233,9 @@ class CloudRegion:
d_key = _make_key('disabled_reason', service_type)
return self.config.get(d_key)
def get_influxdb_client(self):
def get_influxdb_client(
self,
) -> ty.Optional['influxdb_client.InfluxDBClient']:
influx_args = {}
if not self._influxdb_config:
return None
@ -1240,9 +1251,9 @@ class CloudRegion:
for key in ['host', 'username', 'password', 'database', 'timeout']:
if key in self._influxdb_config:
influx_args[key] = self._influxdb_config[key]
if influxdb and influx_args:
if influxdb_client and influx_args:
try:
return influxdb.InfluxDBClient(**influx_args)
return influxdb_client.InfluxDBClient(**influx_args)
except Exception:
self.log.warning('Cannot establish connection to InfluxDB')
else:
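
Since the telemetry getters above are now annotated as returning Optional values, callers are expected to guard against None before use. A minimal sketch (the cloud name and metric name are placeholders):

import openstack

conn = openstack.connect(cloud='example')  # placeholder cloud name
region = conn.config  # the CloudRegion behind the connection

statsd = region.get_statsd_client()
if statsd is not None:  # None when statsd is unavailable or not configured
    statsd.incr('openstack.api.example')
# get_prometheus_registry(), get_prometheus_histogram(), get_prometheus_counter()
# and get_influxdb_client() follow the same Optional pattern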


@ -10,7 +10,14 @@
# License for the specific language governing permissions and limitations
# under the License.
# This is needed due to https://github.com/eventlet/eventlet/issues/1026 which
# nova (and possibly others) expose
from __future__ import annotations
import collections.abc
import functools
import logging
import queue
import typing as ty
import urllib
from urllib.parse import urlparse
@ -25,6 +32,7 @@ except ImportError:
import iso8601
import jmespath
from keystoneauth1 import adapter
from keystoneauth1 import session
from openstack import _log
from openstack import exceptions
@ -33,19 +41,30 @@ from openstack import utils
from openstack import warnings as os_warnings
if ty.TYPE_CHECKING:
import influxdb as influxdb_client
from keystoneauth1 import plugin
import prometheus_client
import requests
from statsd.client import base as statsd_client
from openstack import connection
def normalize_metric_name(name):
def normalize_metric_name(name: str) -> str:
name = name.replace('.', '_')
name = name.replace(':', '_')
return name
class CleanupDependency(ty.TypedDict):
before: list[str]
after: list[str]
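
A sketch of the mapping shape this TypedDict describes; the service names and ordering are hypothetical, purely to illustrate what _get_cleanup_dependencies() overrides are expected to return:

# assumes the CleanupDependency TypedDict added in this change
from openstack.proxy import CleanupDependency

deps: dict[str, CleanupDependency] = {
    # hypothetical ordering: clean compute resources before network ones
    'compute': {'before': ['network'], 'after': []},
}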
class Proxy(adapter.Adapter):
"""Represents a service."""
retriable_status_codes: ty.Optional[list[int]] = None
retriable_status_codes: list[int] | None = None
"""HTTP status codes that should be retried by default.
The number of retries is defined by the configuration in parameters called
@ -58,28 +77,82 @@ class Proxy(adapter.Adapter):
Dictionary of resource names (key) types (value).
"""
_connection: 'connection.Connection'
_connection: connection.Connection
def __init__(
self,
session,
statsd_client=None,
statsd_prefix=None,
prometheus_counter=None,
prometheus_histogram=None,
influxdb_config=None,
influxdb_client=None,
*args,
**kwargs,
session: session.Session,
*,
service_type: str | None = None,
service_name: str | None = None,
interface: str | None = None,
region_name: str | None = None,
endpoint_override: str | None = None,
version: str | None = None,
auth: plugin.BaseAuthPlugin | None = None,
user_agent: str | None = None,
connect_retries: int | None = None,
logger: logging.Logger | None = None,
allow: dict[str, ty.Any] | None = None,
additional_headers: collections.abc.MutableMapping[str, str]
| None = None,
client_name: str | None = None,
client_version: str | None = None,
allow_version_hack: bool | None = None,
global_request_id: str | None = None,
min_version: str | None = None,
max_version: str | None = None,
default_microversion: str | None = None,
status_code_retries: int | None = None,
retriable_status_codes: list[int] | None = None,
raise_exc: bool | None = None,
rate_limit: float | None = None,
concurrency: int | None = None,
connect_retry_delay: float | None = None,
status_code_retry_delay: float | None = None,
# everything from here on is SDK-specific
statsd_client: statsd_client.StatsClient | None = None,
statsd_prefix: str | None = None,
prometheus_counter: prometheus_client.Counter | None = None,
prometheus_histogram: prometheus_client.Histogram | None = None,
influxdb_config: dict[str, ty.Any] | None = None,
influxdb_client: influxdb_client.InfluxDBClient | None = None,
):
# NOTE(dtantsur): keystoneauth defaults retriable_status_codes to None,
# override it with a class-level value.
kwargs.setdefault(
'retriable_status_codes', self.retriable_status_codes
if retriable_status_codes is None:
retriable_status_codes = self.retriable_status_codes
super().__init__(
session=session,
service_type=service_type,
service_name=service_name,
interface=interface,
region_name=region_name,
endpoint_override=endpoint_override,
version=version,
auth=auth,
user_agent=user_agent,
connect_retries=connect_retries,
logger=logger,
allow=allow,
additional_headers=additional_headers,
client_name=client_name,
client_version=client_version,
allow_version_hack=allow_version_hack,
global_request_id=global_request_id,
min_version=min_version,
max_version=max_version,
default_microversion=default_microversion,
status_code_retries=status_code_retries,
retriable_status_codes=retriable_status_codes,
raise_exc=raise_exc,
rate_limit=rate_limit,
concurrency=concurrency,
connect_retry_delay=connect_retry_delay,
status_code_retry_delay=status_code_retry_delay,
)
# TODO(stephenfin): Resolve this by copying the signature of
# adapter.Adapter.__init__
super().__init__(session=session, *args, **kwargs) # type: ignore
self._statsd_client = statsd_client
self._statsd_prefix = statsd_prefix
self._prometheus_counter = prometheus_counter
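
Note that everything after `session` now sits behind `*`, so the adapter and SDK options are keyword-only. A hedged construction sketch (the unauthenticated Session and the chosen options are placeholders):

from keystoneauth1 import session as ks_session

from openstack import proxy

sess = ks_session.Session()  # placeholder; no auth attached
# all options after `session` must be passed by keyword
p = proxy.Proxy(sess, service_type='compute', interface='public')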
@ -92,15 +165,23 @@ class Proxy(adapter.Adapter):
log_name = 'openstack'
self.log = _log.setup_logging(log_name)
def _get_cache_key_prefix(self, url):
def _get_cache_key_prefix(self, url: str) -> str:
"""Calculate cache prefix for the url"""
if not self.service_type:
# narrow type
raise RuntimeError('expected service_type to be set')
name_parts = self._extract_name(
url, self.service_type, self.session.get_project_id()
)
return '.'.join([self.service_type] + name_parts)
def _invalidate_cache(self, conn, key_prefix):
def _invalidate_cache(
self,
conn: connection.Connection,
key_prefix: str,
) -> None:
"""Invalidate all cache entries starting with given prefix"""
for k in set(conn._api_cache_keys):
if k.startswith(key_prefix):
@ -109,16 +190,20 @@ class Proxy(adapter.Adapter):
def request(
self,
url,
method,
error_message=None,
raise_exc=False,
connect_retries=1,
global_request_id=None,
*args,
**kwargs,
):
url: str,
method: str,
error_message: str | None = None,
raise_exc: bool = False,
connect_retries: int = 1,
global_request_id: str | None = None,
*args: ty.Any,
**kwargs: ty.Any,
) -> requests.Response:
conn = self._get_connection()
if not conn:
# narrow type
raise RuntimeError('no connection found')
if not global_request_id:
# Per-request setting should take precedence
global_request_id = conn._global_request_id
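
The `# narrow type` comments mark the idiom used throughout this change: fail loudly on the None branch so the type checker can treat the value as non-optional afterwards. A standalone sketch of the same idiom (function name and message are illustrative):

import typing as ty


def require(value: ty.Optional[str]) -> str:
    if value is None:
        # narrow type: after this branch, mypy knows `value` is `str`
        raise RuntimeError('expected value to be set')
    return value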
@ -143,20 +228,21 @@ class Proxy(adapter.Adapter):
conn._cache_expirations.get(key_prefix, 0)
)
# Get from cache or execute and cache
response = conn._cache.get_or_create(
_response = conn._cache.get_or_create(
key=key,
creator=super().request,
creator_args=(
[url, method],
dict(
connect_retries=connect_retries,
raise_exc=raise_exc,
global_request_id=global_request_id,
{
'connect_retries': connect_retries,
'raise_exc': raise_exc,
'global_request_id': global_request_id,
**kwargs,
),
},
),
expiration_time=expiration_time,
)
response = ty.cast('requests.Response', _response)
else:
# invalidate cache if we send modification request or user
# asked for cache bypass
@ -184,7 +270,12 @@ class Proxy(adapter.Adapter):
raise
@functools.lru_cache(maxsize=256)
def _extract_name(self, url, service_type=None, project_id=None):
def _extract_name(
self,
url: str,
service_type: str | None = None,
project_id: str | None = None,
) -> list[str]:
"""Produce a key name to use in logging/metrics from the URL path.
We want to be able to logic/metric sane general things, so we pull
@ -260,7 +351,9 @@ class Proxy(adapter.Adapter):
# Strip out anything that's empty or None
return [part for part in name_parts if part]
def _extract_name_consume_url_parts(self, url_parts):
def _extract_name_consume_url_parts(
self, url_parts: list[str]
) -> list[str]:
"""Pull out every other URL portion.
For example, ``GET /servers/{id}/os-security-groups`` returns
@ -282,20 +375,41 @@ class Proxy(adapter.Adapter):
return name_parts
def _report_stats(self, response, url=None, method=None, exc=None):
if self._statsd_client:
def _report_stats(
self,
response: requests.Response | None,
url: str | None = None,
method: str | None = None,
exc: BaseException | None = None,
) -> None:
self._report_stats_statsd(response, url, method, exc)
if self._prometheus_counter and self._prometheus_histogram:
self._report_stats_prometheus(response, url, method, exc)
if self._influxdb_client:
self._report_stats_influxdb(response, url, method, exc)
def _report_stats_statsd(self, response, url=None, method=None, exc=None):
def _report_stats_statsd(
self,
response: requests.Response | None,
url: str | None = None,
method: str | None = None,
exc: BaseException | None = None,
) -> None:
if not self._statsd_prefix:
return None
if not self._statsd_client:
return None
try:
if response is not None and not url:
url = response.request.url
if response is not None and not method:
method = response.request.method
# narrow types
assert url is not None
assert method is not None
assert self.service_type is not None
name_parts = [
normalize_metric_name(f)
for f in self._extract_name(
@ -326,31 +440,51 @@ class Proxy(adapter.Adapter):
self.log.exception("Exception reporting metrics")
def _report_stats_prometheus(
self, response, url=None, method=None, exc=None
):
self,
response: requests.Response | None,
url: str | None = None,
method: str | None = None,
exc: BaseException | None = None,
) -> None:
if not self._prometheus_counter:
return None
if not self._prometheus_histogram:
return None
if response is not None and not url:
url = response.request.url
if response is not None and not method:
method = response.request.method
parsed_url = urlparse(url)
endpoint = (
f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}" # type: ignore[str-bytes-safe]
)
if response is not None:
labels = dict(
method=method,
endpoint=endpoint,
service_type=self.service_type,
status_code=response.status_code,
)
labels = {
'method': method,
'endpoint': endpoint,
'service_type': self.service_type,
'status_code': response.status_code,
}
self._prometheus_counter.labels(**labels).inc()
self._prometheus_histogram.labels(**labels).observe(
response.elapsed.total_seconds() * 1000
)
def _report_stats_influxdb(
self, response, url=None, method=None, exc=None
):
self,
response: requests.Response | None,
url: str | None = None,
method: str | None = None,
exc: BaseException | None = None,
) -> None:
if not self._influxdb_client:
return None
if not self._influxdb_config:
return None
# NOTE(gtema): status_code is saved both as tag and field to give
# ability showing it as a value and not only as a legend.
# However Influx is not ok with having same name in tags and fields,
@ -359,9 +493,9 @@ class Proxy(adapter.Adapter):
url = response.request.url
if response is not None and not method:
method = response.request.method
tags = dict(
method=method,
name='_'.join(
tags = {
'method': method,
'name': '_'.join(
[
normalize_metric_name(f)
for f in self._extract_name(
@ -369,8 +503,8 @@ class Proxy(adapter.Adapter):
)
]
),
)
fields = dict(attempted=1)
}
fields = {'attempted': 1}
if response is not None:
fields['duration'] = int(response.elapsed.total_seconds() * 1000)
tags['status_code'] = str(response.status_code)
@ -392,13 +526,13 @@ class Proxy(adapter.Adapter):
)
# Note(gtema) append service name into the measurement name
measurement = f'{measurement}.{self.service_type}'
data = [dict(measurement=measurement, tags=tags, fields=fields)]
data = [{'measurement': measurement, 'tags': tags, 'fields': fields}]
try:
self._influxdb_client.write_points(data)
except Exception:
self.log.exception('Error writing statistics to InfluxDB')
def _get_connection(self):
def _get_connection(self) -> connection.Connection | None:
"""Get the Connection object associated with this Proxy.
When the Session is created, a reference to the Connection is attached
@ -412,7 +546,7 @@ class Proxy(adapter.Adapter):
def _get_resource(
self,
resource_type: type[resource.ResourceT],
value: ty.Union[None, str, resource.ResourceT, utils.Munch],
value: None | str | resource.ResourceT | utils.Munch,
**attrs: ty.Any,
) -> resource.ResourceT:
"""Get a resource object to work on
@ -450,7 +584,12 @@ class Proxy(adapter.Adapter):
return res
def _get_uri_attribute(self, child, parent, name):
def _get_uri_attribute(
self,
child: resource.Resource | str,
parent: resource.Resource | str | None,
name: str,
) -> str:
"""Get a value to be associated with a URI attribute
`child` will not be None here as it's a required argument
@ -461,10 +600,11 @@ class Proxy(adapter.Adapter):
"""
if parent is None:
value = getattr(child, name)
else:
value = resource.Resource._get_id(parent)
assert isinstance(value, str) # narrow type
return value
return resource.Resource._get_id(parent)
@ty.overload
def _find(
self,
@ -472,7 +612,7 @@ class Proxy(adapter.Adapter):
name_or_id: str,
ignore_missing: ty.Literal[True] = True,
**attrs: ty.Any,
) -> ty.Optional[resource.ResourceT]: ...
) -> resource.ResourceT | None: ...
@ty.overload
def _find(
@ -492,7 +632,7 @@ class Proxy(adapter.Adapter):
name_or_id: str,
ignore_missing: bool,
**attrs: ty.Any,
) -> ty.Optional[resource.ResourceT]: ...
) -> resource.ResourceT | None: ...
def _find(
self,
@ -500,7 +640,7 @@ class Proxy(adapter.Adapter):
name_or_id: str,
ignore_missing: bool = True,
**attrs: ty.Any,
) -> ty.Optional[resource.ResourceT]:
) -> resource.ResourceT | None:
"""Find a resource
:param resource_type: The type of resource to find. This should be a
@ -524,10 +664,10 @@ class Proxy(adapter.Adapter):
def _delete(
self,
resource_type: type[resource.ResourceT],
value: ty.Union[str, resource.ResourceT, None],
value: str | resource.ResourceT | None,
ignore_missing: bool = True,
**attrs: ty.Any,
) -> ty.Optional[resource.ResourceT]:
) -> resource.ResourceT | None:
"""Delete a resource
:param resource_type: The type of resource to delete. This should be a
@ -566,8 +706,8 @@ class Proxy(adapter.Adapter):
def _update(
self,
resource_type: type[resource.ResourceT],
value: ty.Union[str, resource.ResourceT, None],
base_path: ty.Optional[str] = None,
value: str | resource.ResourceT | None,
base_path: str | None = None,
**attrs: ty.Any,
) -> resource.ResourceT:
"""Update a resource
@ -598,7 +738,7 @@ class Proxy(adapter.Adapter):
def _create(
self,
resource_type: type[resource.ResourceT],
base_path: ty.Optional[str] = None,
base_path: str | None = None,
**attrs: ty.Any,
) -> resource.ResourceT:
"""Create a resource from attributes
@ -632,7 +772,7 @@ class Proxy(adapter.Adapter):
self,
resource_type: type[resource.ResourceT],
data: list[dict[str, ty.Any]],
base_path: ty.Optional[str] = None,
base_path: str | None = None,
) -> ty.Generator[resource.ResourceT, None, None]:
"""Create a resource from attributes
@ -656,9 +796,9 @@ class Proxy(adapter.Adapter):
def _get(
self,
resource_type: type[resource.ResourceT],
value: ty.Union[str, resource.ResourceT, None] = None,
value: str | resource.ResourceT | None = None,
requires_id: bool = True,
base_path: ty.Optional[str] = None,
base_path: str | None = None,
skip_cache: bool = False,
**attrs: ty.Any,
) -> resource.ResourceT:
@ -701,8 +841,8 @@ class Proxy(adapter.Adapter):
self,
resource_type: type[resource.ResourceT],
paginated: bool = True,
base_path: ty.Optional[str] = None,
jmespath_filters: ty.Optional[str] = None,
base_path: str | None = None,
jmespath_filters: str | None = None,
**attrs: ty.Any,
) -> ty.Generator[resource.ResourceT, None, None]:
"""List a resource
@ -748,15 +888,15 @@ class Proxy(adapter.Adapter):
'removed in a future release.',
os_warnings.RemovedInSDK60Warning,
)
return jmespath.search(jmespath_filters, data)
return jmespath.search(jmespath_filters, data) # type: ignore[no-any-return]
return data
def _head(
self,
resource_type: type[resource.ResourceT],
value: ty.Union[str, resource.ResourceT, None] = None,
base_path: ty.Optional[str] = None,
value: str | resource.ResourceT | None = None,
base_path: str | None = None,
**attrs: ty.Any,
) -> resource.ResourceT:
"""Retrieve a resource's header
@ -781,30 +921,50 @@ class Proxy(adapter.Adapter):
res = self._get_resource(resource_type, value, **attrs)
return res.head(self, base_path=base_path)
def _get_cleanup_dependencies(self):
def _get_cleanup_dependencies(
self,
) -> dict[str, CleanupDependency] | None:
return None
# TODO(stephenfin): Add type for filters. We expect the created_at or
# updated_at keys
def _service_cleanup(
self,
dry_run=True,
client_status_queue=None,
identified_resources=None,
filters=None,
resource_evaluation_fn=None,
skip_resources=None,
):
dry_run: bool = True,
client_status_queue: queue.Queue[resource.Resource] | None = None,
identified_resources: dict[str, resource.Resource] | None = None,
filters: dict[str, ty.Any] | None = None,
resource_evaluation_fn: ty.Callable[
[
resource.Resource,
dict[str, ty.Any] | None,
dict[str, resource.Resource] | None,
],
bool,
]
| None = None,
skip_resources: list[str] | None = None,
) -> None:
return None
def _service_cleanup_del_res(
self,
del_fn,
obj,
dry_run=True,
client_status_queue=None,
identified_resources=None,
filters=None,
resource_evaluation_fn=None,
):
del_fn: ty.Callable[[resource.Resource], None],
obj: resource.Resource,
dry_run: bool = True,
client_status_queue: queue.Queue[resource.Resource] | None = None,
identified_resources: dict[str, resource.Resource] | None = None,
filters: dict[str, ty.Any] | None = None,
resource_evaluation_fn: ty.Callable[
[
resource.Resource,
dict[str, ty.Any] | None,
dict[str, resource.Resource] | None,
],
bool,
]
| None = None,
) -> bool:
need_delete = False
try:
if resource_evaluation_fn and callable(resource_evaluation_fn):
@ -838,7 +998,11 @@ class Proxy(adapter.Adapter):
self.log.exception('Cannot delete resource %s: %s', obj, str(e))
return need_delete
def _service_cleanup_resource_filters_evaluation(self, obj, filters=None):
def _service_cleanup_resource_filters_evaluation(
self,
obj: resource.Resource,
filters: dict[str, ty.Any] | None = None,
) -> bool:
part_cond = []
if filters is not None and isinstance(filters, dict):
for k, v in filters.items():
@ -870,8 +1034,10 @@ class Proxy(adapter.Adapter):
else:
return False
def should_skip_resource_cleanup(self, resource=None, skip_resources=None):
if resource is None or skip_resources is None:
def should_skip_resource_cleanup(
self, resource: str, skip_resources: list[str] | None = None
) -> bool:
if skip_resources is None:
return False
if self.service_type is None:
@ -891,7 +1057,10 @@ class Proxy(adapter.Adapter):
# TODO(stephenfin): Remove this and all users. Use of this generally indicates
# a missing Resource type.
def _json_response(response, result_key=None, error_message=None):
def _json_response(
response: requests.Response,
error_message: str | None = None,
) -> requests.Response | ty.Any:
"""Temporary method to use to bridge from ShadeAdapter to SDK calls."""
exceptions.raise_from_response(response, error_message=error_message)
@ -900,11 +1069,11 @@ def _json_response(response, result_key=None, error_message=None):
return response
# Some REST calls do not return json content. Don't decode it.
if 'application/json' not in response.headers.get('Content-Type'):
content_type = response.headers.get('Content-Type')
if not content_type or 'application/json' not in content_type:
return response
try:
result_json = response.json()
return response.json()
except JSONDecodeError:
return response
return result_json


@ -51,9 +51,13 @@ from openstack import fields
from openstack import utils
from openstack import warnings as os_warnings
if ty.TYPE_CHECKING:
from openstack import connection
LOG = _log.setup_logging(__name__)
AdapterT = ty.TypeVar('AdapterT', bound=adapter.Adapter)
ResourceT = ty.TypeVar('ResourceT', bound='Resource')
# TODO(stephenfin): We should deprecate the 'type' and 'list_type' arguments
@ -354,7 +358,7 @@ class Resource(dict):
pagination_key: ty.Optional[str] = None
#: The ID of this resource.
id = Body("id")
id: str = Body("id")
#: The name of this resource.
name: str = Body("name")
@ -658,7 +662,7 @@ class Resource(dict):
res.append((attr, self[attr]))
return res
def _update(self, **attrs):
def _update(self, **attrs: ty.Any) -> None:
"""Given attributes, update them on this instance
This is intended to be used from within the proxy
@ -858,7 +862,7 @@ class Resource(dict):
return ""
@staticmethod
def _get_id(value):
def _get_id(value: ty.Union['Resource', str]) -> str:
"""If a value is a Resource, return the canonical ID
This will return either the value specified by `id` or
@ -872,7 +876,7 @@ class Resource(dict):
return value
@classmethod
def new(cls, **kwargs):
def new(cls, **kwargs: ty.Any) -> ty_ext.Self:
"""Create a new instance of this resource.
When creating the instance set the ``_synchronized`` parameter
@ -903,7 +907,12 @@ class Resource(dict):
return cls(_synchronized=True, connection=connection, **kwargs)
@classmethod
def _from_munch(cls, obj, synchronized=True, connection=None):
def _from_munch(
cls,
obj: dict[str, ty.Union],
synchronized: bool = True,
connection: ty.Optional['connection.Connection'] = None,
) -> ty_ext.Self:
"""Create an instance from a ``utils.Munch`` object.
This is intended as a temporary measure to convert between shade-style
@ -1325,15 +1334,15 @@ class Resource(dict):
def create(
self,
session,
prepend_key=True,
base_path=None,
session: adapter.Adapter,
prepend_key: bool = True,
base_path: ty.Optional[str] = None,
*,
resource_request_key=None,
resource_response_key=None,
microversion=None,
**params,
):
resource_request_key: ty.Optional[str] = None,
resource_response_key: ty.Optional[str] = None,
microversion: ty.Optional[str] = None,
**params: ty.Any,
) -> ty_ext.Self:
"""Create a remote resource based on this instance.
:param session: The session to use for making this request.
@ -1417,23 +1426,23 @@ class Resource(dict):
# direct comparision to False since we need to rule out None
if self.has_body and self.create_returns_body is False:
# fetch the body if it's required but not returned by create
fetch_kwargs = {}
if resource_response_key is not None:
fetch_kwargs = {'resource_response_key': resource_response_key}
return self.fetch(session, **fetch_kwargs)
return self.fetch(
session,
resource_response_key=resource_response_key,
)
return self
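
Annotating create() and fetch() as returning Self (via typing_extensions) means subclasses keep their precise type through these calls. A hedged sketch with a hypothetical resource; no real service resource is implied:

from openstack import resource


class Widget(resource.Resource):
    base_path = '/widgets'  # hypothetical endpoint


w = Widget.new(name='demo')  # a type checker infers Widget here, not plain Resource
# w.create(session) and w.fetch(session) are likewise inferred as returning Widget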
@classmethod
def bulk_create(
cls,
session,
data,
prepend_key=True,
base_path=None,
session: adapter.Adapter,
data: list[dict[str, ty.Any]],
prepend_key: bool = True,
base_path: ty.Optional[str] = None,
*,
microversion=None,
**params,
):
microversion: ty.Optional[str] = None,
**params: ty.Any,
) -> ty.Generator[ty_ext.Self, None, None]:
"""Create multiple remote resources based on this class and data.
:param session: The session to use for making this request.
@ -1486,7 +1495,10 @@ class Resource(dict):
# Those objects will be used in case where request doesn't return
# JSON data representing created resource, and yet it's required
# to return newly created resource objects.
resource = cls.new(connection=session._get_connection(), **attrs)
# TODO(stephenfin): Our types say we accept a ksa Adapter, but this
# requires an SDK Proxy. Do we update the types or rework this to
# support use of an adapter.
resource = cls.new(connection=session._get_connection(), **attrs) # type: ignore
resources.append(resource)
request = resource._prepare_request(
requires_id=requires_id, base_path=base_path
@ -1507,13 +1519,17 @@ class Resource(dict):
params=params,
)
exceptions.raise_from_response(response)
data = response.json()
json = response.json()
if cls.resources_key:
data = data[cls.resources_key]
json = json[cls.resources_key]
else:
json = json
if not isinstance(data, list):
data = [data]
if isinstance(data, list):
json = json
else:
json = [json]
has_body = (
cls.has_body
@ -1524,26 +1540,29 @@ class Resource(dict):
return (r.fetch(session) for r in resources)
else:
return (
# TODO(stephenfin): Our types say we accept a ksa Adapter, but
# this requires an SDK Proxy. Do we update the types or rework
# this to support use of an adapter.
cls.existing(
microversion=microversion,
connection=session._get_connection(),
connection=session._get_connection(), # type: ignore
**res_dict,
)
for res_dict in data
for res_dict in json
)
def fetch(
self,
session,
requires_id=True,
base_path=None,
error_message=None,
skip_cache=False,
session: adapter.Adapter,
requires_id: bool = True,
base_path: ty.Optional[str] = None,
error_message: ty.Optional[str] = None,
skip_cache: bool = False,
*,
resource_response_key=None,
microversion=None,
**params,
):
resource_response_key: ty.Optional[str] = None,
microversion: ty.Optional[str] = None,
**params: ty.Any,
) -> ty_ext.Self:
"""Get a remote resource based on this instance.
:param session: The session to use for making this request.
@ -1594,7 +1613,13 @@ class Resource(dict):
return self
def head(self, session, base_path=None, *, microversion=None):
def head(
self,
session: adapter.Adapter,
base_path: ty.Optional[str] = None,
*,
microversion: ty.Optional[str] = None,
) -> ty_ext.Self:
"""Get headers from a remote resource based on this instance.
:param session: The session to use for making this request.
@ -1633,15 +1658,15 @@ class Resource(dict):
def commit(
self,
session,
prepend_key=True,
has_body=True,
retry_on_conflict=None,
base_path=None,
session: adapter.Adapter,
prepend_key: bool = True,
has_body: bool = True,
retry_on_conflict: ty.Optional[bool] = None,
base_path: ty.Optional[str] = None,
*,
microversion=None,
**kwargs,
):
microversion: ty.Optional[str] = None,
**kwargs: ty.Any,
) -> ty_ext.Self:
"""Commit the state of the instance to the remote resource.
:param session: The session to use for making this request.
@ -1826,8 +1851,13 @@ class Resource(dict):
)
def delete(
self, session, error_message=None, *, microversion=None, **kwargs
):
self,
session: adapter.Adapter,
error_message: ty.Optional[str] = None,
*,
microversion: ty.Optional[str] = None,
**kwargs: ty.Any,
) -> ty_ext.Self:
"""Delete the remote resource based on this instance.
:param session: The session to use for making this request.
@ -1873,15 +1903,15 @@ class Resource(dict):
@classmethod
def list(
cls,
session,
paginated=True,
base_path=None,
allow_unknown_params=False,
session: adapter.Adapter,
paginated: bool = True,
base_path: ty.Optional[str] = None,
allow_unknown_params: bool = False,
*,
microversion=None,
headers=None,
**params,
):
microversion: ty.Optional[str] = None,
headers: ty.Optional[dict[str, str]] = None,
**params: ty.Any,
) -> ty.Generator[ty_ext.Self, None, None]:
"""This method is a generator which yields resource objects.
This resource object list generator handles pagination and takes query
@ -2012,9 +2042,12 @@ class Resource(dict):
# We want that URI props are available on the resource
raw_resource.update(uri_params)
# TODO(stephenfin): Our types say we accept a ksa Adapter, but
# this requires an SDK Proxy. Do we update the types or rework
# this to support use of an adapter.
value = cls.existing(
microversion=microversion,
connection=session._get_connection(),
connection=session._get_connection(), # type: ignore
**raw_resource,
)
marker = value.id
@ -2280,9 +2313,6 @@ def _normalize_status(status: ty.Optional[str]) -> ty.Optional[str]:
return status
ResourceT = ty.TypeVar('ResourceT', bound=Resource)
def wait_for_status(
session: adapter.Adapter,
resource: ResourceT,


@ -10,6 +10,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import typing as ty
from openstack import resource
from openstack import utils
@ -91,7 +93,7 @@ class ShareAccessRule(resource.Resource):
unrestrict=False,
**kwargs,
):
body = {'deny_access': {'access_id': self.id}}
body: dict[str, ty.Any] = {'deny_access': {'access_id': self.id}}
if unrestrict:
body['deny_access']['unrestrict'] = True
url = utils.urljoin("/shares", self.share_id, "action")


@ -33,6 +33,7 @@ module = [
"openstack.exceptions",
"openstack.fields",
"openstack.format",
"openstack.proxy",
"openstack.utils",
]
warn_return_any = true
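
With openstack.proxy added to the list of modules that get the stricter per-module mypy settings (e.g. warn_return_any), annotations in this module are now enforced. A quick illustration (reveal_type is a type-checker construct; the revealed type follows from the new annotation on normalize_metric_name):

from typing_extensions import reveal_type

from openstack import proxy

# under mypy this reports: Revealed type is "builtins.str"
reveal_type(proxy.normalize_metric_name('compute.servers'))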