pre-commit: Enable bandit checks
Change-Id: Ic991a656785d27626fd9e5f86577d138b4df07ae Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
parent
43dd5d9041
commit
32388ccc99
@ -30,7 +30,7 @@ def download_image_stream(conn):
|
||||
# and in your own code, you are now responsible for checking
|
||||
# the integrity of the data. Create an MD5 has to be computed
|
||||
# after all of the data has been consumed.
|
||||
md5 = hashlib.md5()
|
||||
md5 = hashlib.md5(usedforsecurity=False)
|
||||
|
||||
with open("myimage.qcow2", "wb") as local_image:
|
||||
response = conn.image.download_image(image, stream=True)
|
||||
|
@ -110,11 +110,10 @@ def pack(path: str) -> str:
|
||||
with tempfile.NamedTemporaryFile() as tmpfile:
|
||||
# NOTE(toabctl): Luckily, genisoimage, mkisofs and xorrisofs understand
|
||||
# the same parameters which are currently used.
|
||||
cmds = ['genisoimage', 'mkisofs', 'xorrisofs']
|
||||
error: ty.Optional[Exception]
|
||||
for c in cmds:
|
||||
for c in ['genisoimage', 'mkisofs', 'xorrisofs']:
|
||||
try:
|
||||
p = subprocess.Popen(
|
||||
p = subprocess.Popen( # noqa: S603
|
||||
[
|
||||
c,
|
||||
'-o',
|
||||
|
@ -266,7 +266,9 @@ class BaremetalCloudMixin(openstackcloud._OpenStackCloudMixin):
|
||||
for uuid in created_nics:
|
||||
try:
|
||||
self.baremetal.delete_port(uuid)
|
||||
except Exception:
|
||||
except Exception: # noqa: S110
|
||||
# the port might not have been actually created, so a
|
||||
# failure to delete isn't necessarily an issue
|
||||
pass
|
||||
raise
|
||||
|
||||
|
@ -1247,7 +1247,8 @@ class ComputeCloudMixin(_network_common.NetworkCommonCloudMixin):
|
||||
):
|
||||
try:
|
||||
server = self.get_server(server_id)
|
||||
except Exception:
|
||||
except Exception: # noqa: S112
|
||||
# if it hasn't appeared yet, that's okay
|
||||
continue
|
||||
if not server:
|
||||
continue
|
||||
|
@ -1566,7 +1566,8 @@ class NetworkCommonCloudMixin(openstackcloud._OpenStackCloudMixin):
|
||||
for address in port.get('fixed_ips', list()):
|
||||
try:
|
||||
ip = ipaddress.ip_address(address['ip_address'])
|
||||
except Exception:
|
||||
except Exception: # noqa: S112
|
||||
# the address might be unset; ignore if so
|
||||
continue
|
||||
if ip.version == 4:
|
||||
fixed_address = address['ip_address']
|
||||
|
@ -220,7 +220,7 @@ def get_server_external_ipv4(cloud, server):
|
||||
for interface in interfaces:
|
||||
try:
|
||||
ip = ipaddress.ip_address(interface['addr'])
|
||||
except Exception:
|
||||
except Exception: # noqa: S112
|
||||
# Skip any error, we're looking for a working ip - if the
|
||||
# cloud returns garbage, it wouldn't be the first weird thing
|
||||
# but it still doesn't meet the requirement of "be a working
|
||||
@ -268,7 +268,8 @@ def find_best_address(addresses, public=False, cloud_public=True):
|
||||
# will fail fast, but can often come alive
|
||||
# when retried.
|
||||
continue
|
||||
except Exception:
|
||||
except Exception: # noqa: S110
|
||||
# This is best effort. Ignore any errors.
|
||||
pass
|
||||
|
||||
# Give up and return the first - none work as far as we can tell
|
||||
|
2
openstack/config/vendors/__init__.py
vendored
2
openstack/config/vendors/__init__.py
vendored
@ -59,7 +59,7 @@ def get_profile(profile_name):
|
||||
scheme=profile_url.scheme,
|
||||
netloc=profile_url.netloc,
|
||||
)
|
||||
response = requests.get(well_known_url)
|
||||
response = requests.get(well_known_url, timeout=10)
|
||||
if not response.ok:
|
||||
raise exceptions.ConfigException(
|
||||
f"{profile_name} is a remote profile that could not be fetched: "
|
||||
|
@ -202,7 +202,8 @@ def _extract_message(obj: ty.Any) -> ty.Optional[str]:
|
||||
# Ironic before Stein has double JSON encoding, nobody remembers why.
|
||||
try:
|
||||
obj = json.loads(obj)
|
||||
except Exception:
|
||||
except Exception: # noqa: S110
|
||||
# This is best effort. Ignore any errors.
|
||||
pass
|
||||
else:
|
||||
return _extract_message(obj)
|
||||
|
@ -39,7 +39,7 @@ def parse(env_str):
|
||||
YAML format.
|
||||
"""
|
||||
try:
|
||||
env = yaml.load(env_str, Loader=template_format.yaml_loader)
|
||||
env = yaml.load(env_str, Loader=template_format.yaml_loader) # noqa: S506
|
||||
except yaml.YAMLError:
|
||||
# NOTE(prazumovsky): we need to return more informative error for
|
||||
# user, so use SafeLoader, which return error message with template
|
||||
|
@ -52,7 +52,7 @@ def parse(tmpl_str):
|
||||
tpl = json.loads(tmpl_str)
|
||||
else:
|
||||
try:
|
||||
tpl = yaml.load(tmpl_str, Loader=HeatYamlLoader)
|
||||
tpl = yaml.load(tmpl_str, Loader=HeatYamlLoader) # noqa: S506
|
||||
except yaml.YAMLError:
|
||||
# NOTE(prazumovsky): we need to return more informative error for
|
||||
# user, so use SafeLoader, which return error message with template
|
||||
|
@ -39,7 +39,7 @@ def get_template_contents(
|
||||
template_url = utils.normalise_file_path_to_url(template_file)
|
||||
|
||||
if template_url:
|
||||
tpl = request.urlopen(template_url).read()
|
||||
tpl = request.urlopen(template_url).read() # noqa: S310
|
||||
|
||||
elif template_object:
|
||||
is_object = True
|
||||
@ -295,7 +295,7 @@ def process_environment_and_files(
|
||||
elif env_path:
|
||||
env_url = utils.normalise_file_path_to_url(env_path)
|
||||
env_base_url = utils.base_url_for_url(env_url)
|
||||
raw_env = request.urlopen(env_url).read()
|
||||
raw_env = request.urlopen(env_url).read() # noqa: S310
|
||||
|
||||
env = environment_format.parse(raw_env)
|
||||
|
||||
|
@ -38,7 +38,7 @@ def normalise_file_path_to_url(path):
|
||||
def read_url_content(url):
|
||||
try:
|
||||
# TODO(mordred) Use requests
|
||||
content = request.urlopen(url).read()
|
||||
content = request.urlopen(url).read() # noqa: S310
|
||||
except error.URLError:
|
||||
raise exceptions.SDKException(f'Could not fetch contents for {url}')
|
||||
|
||||
|
@ -1508,7 +1508,11 @@ class Resource(dict):
|
||||
body: ty.Union[dict[str, ty.Any], list[ty.Any]] = _body
|
||||
|
||||
if prepend_key:
|
||||
assert cls.resources_key
|
||||
if not cls.resources_key:
|
||||
raise exceptions.ResourceFailure(
|
||||
"Cannot request prepend_key with Unset resources key"
|
||||
)
|
||||
|
||||
body = {cls.resources_key: body}
|
||||
|
||||
response = method(
|
||||
|
@ -112,15 +112,15 @@ def generate_fake_resource(
|
||||
base_attrs[name] = uuid.uuid4().hex
|
||||
elif issubclass(target_type, int):
|
||||
# int
|
||||
base_attrs[name] = random.randint(1, 100)
|
||||
base_attrs[name] = random.randint(1, 100) # noqa: S311
|
||||
elif issubclass(target_type, float):
|
||||
# float
|
||||
base_attrs[name] = random.random()
|
||||
base_attrs[name] = random.random() # noqa: S311
|
||||
elif issubclass(target_type, bool) or issubclass(
|
||||
target_type, _format.BoolStr
|
||||
):
|
||||
# bool
|
||||
base_attrs[name] = random.choice([True, False])
|
||||
base_attrs[name] = random.choice([True, False]) # noqa: S311
|
||||
elif issubclass(target_type, dict):
|
||||
# some dict - without further details leave it empty
|
||||
base_attrs[name] = dict()
|
||||
|
@ -58,4 +58,12 @@ quote-style = "preserve"
|
||||
docstring-code-format = true
|
||||
|
||||
[tool.ruff.lint]
|
||||
select = ["E4", "E7", "E9", "F", "U"]
|
||||
select = ["E4", "E7", "E9", "F", "S", "U"]
|
||||
ignore = [
|
||||
# we only use asserts for type narrowing
|
||||
"S101",
|
||||
]
|
||||
|
||||
[tool.ruff.lint.per-file-ignores]
|
||||
"openstack/tests/*" = ["S"]
|
||||
"examples/*" = ["S"]
|
||||
|
Loading…
x
Reference in New Issue
Block a user