
This adds support for staying within OpenStack volume quota limits on instances that utilize boot-from-volume.

Change-Id: I1b7bc177581d23cecd9443a392fb058176409c46
# Copyright (C) 2011-2013 OpenStack Foundation
# Copyright 2017 Red Hat
# Copyright 2022 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.

from concurrent.futures import ThreadPoolExecutor
import functools
import logging
import math
import time
import operator

import cachetools.func
import openstack
from keystoneauth1.exceptions.catalog import EndpointNotFound

from nodepool.driver.utils import QuotaInformation
from nodepool.driver import statemachine
from nodepool import exceptions
from nodepool import stats
from nodepool import version
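
# Seconds to cache cloud list results (servers, volumes, floating IPs);
# see the ttl_cache decorators below.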
CACHE_TTL = 10


def quota_from_flavor(flavor, label=None, volumes=None):
    args = dict(instances=1,
                cores=flavor.vcpus,
                ram=flavor.ram)
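    # For boot-from-volume labels, count the root volume against the
    # volume quota; for existing servers, count the attached volumes
    # supplied by the caller.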
    if label and label.boot_from_volume:
        args['volumes'] = 1
        args['volume-gb'] = label.volume_size
    elif volumes:
        args['volumes'] = len(volumes)
        args['volume-gb'] = sum([v.size for v in volumes])
    return QuotaInformation(**args)


def quota_from_limits(compute, volume):
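    # The API reports -1 for unlimited quotas; treat that as infinite.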
    def bound_value(value):
        if value == -1:
            return math.inf
        return value

    args = dict(
        instances=bound_value(compute.max_total_instances),
        cores=bound_value(compute.max_total_cores),
        ram=bound_value(compute.max_total_ram_size))
    if volume is not None:
        args['volumes'] = bound_value(volume['absolute']['maxTotalVolumes'])
        args['volume-gb'] = bound_value(
            volume['absolute']['maxTotalVolumeGigabytes'])
    return QuotaInformation(**args)


class OpenStackInstance(statemachine.Instance):
    def __init__(self, provider, server, quota):
        super().__init__()
        self.external_id = server.id
        self.metadata = server.metadata
        self.private_ipv4 = server.private_v4
        self.private_ipv6 = None
        self.public_ipv4 = server.public_v4
        self.public_ipv6 = server.public_v6
        self.host_id = server.host_id
        self.cloud = provider.cloud_config.name
        self.region = provider.region_name
        self.az = server.location.zone

        self.interface_ip = server.interface_ip
        # TODO: this doesn't match the behavior of other drivers
        # but is here for backwards compatibility.
        self.private_ipv4 = self.private_ipv4 or self.public_ipv4

        self.quota = quota

    def getQuotaInformation(self):
        return self.quota


class OpenStackResource(statemachine.Resource):
    def __init__(self, metadata, type, id):
        super().__init__(metadata)
        self.type = type
        self.id = id


class OpenStackDeleteStateMachine(statemachine.StateMachine):
    FLOATING_IP_DELETING = 'deleting floating ip'
    SERVER_DELETE = 'delete server'
    SERVER_DELETING = 'deleting server'
    COMPLETE = 'complete'

    def __init__(self, adapter, external_id, log):
        self.log = log
        super().__init__()
        self.adapter = adapter
        self.external_id = external_id
        self.floating_ips = None

    def advance(self):
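        # Release any floating IPs attached to the server first, then
        # delete the server and poll until it is gone.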
        if self.state == self.START:
            self.server = self.adapter._getServer(self.external_id)
            if (self.server and
                    self.adapter._hasFloatingIps() and
                    self.server.addresses):
                self.floating_ips = self.adapter._getFloatingIps(self.server)
                for fip in self.floating_ips:
                    self.adapter._deleteFloatingIp(fip)
                self.state = self.FLOATING_IP_DELETING
            if not self.floating_ips:
                self.state = self.SERVER_DELETE

        if self.state == self.FLOATING_IP_DELETING:
            fips = []
            for fip in self.floating_ips:
                fip = self.adapter._refreshFloatingIpDelete(fip)
                if not fip or fip['status'] == 'DOWN':
                    fip = None
                if fip:
                    fips.append(fip)
            self.floating_ips = fips
            if self.floating_ips:
                return
            else:
                self.state = self.SERVER_DELETE

        if self.state == self.SERVER_DELETE:
            self.adapter._deleteServer(self.external_id)
            self.state = self.SERVER_DELETING

        if self.state == self.SERVER_DELETING:
            self.server = self.adapter._refreshServerDelete(self.server)
            if self.server:
                return
            else:
                self.state = self.COMPLETE

        if self.state == self.COMPLETE:
            self.complete = True
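

# Launch a server for a node request, retrying failed attempts up to a
# limit and attaching a floating IP when the pool requires one.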
class OpenStackCreateStateMachine(statemachine.StateMachine):
    SERVER_CREATING_SUBMIT = 'submit creating server'
    SERVER_CREATING = 'creating server'
    SERVER_RETRY = 'retrying server creation'
    SERVER_RETRY_DELETING = 'deleting server for retry'
    FLOATING_IP_CREATING = 'creating floating ip'
    FLOATING_IP_ATTACHING = 'attaching floating ip'
    COMPLETE = 'complete'

    def __init__(self, adapter, hostname, label, image_external_id,
                 metadata, retries, request, az, log):
        self.log = log
        super().__init__()
        self.adapter = adapter
        self.provider = adapter.provider
        self.retries = retries
        self.attempts = 0
        self.label = label
        self.server = None
        self.hostname = hostname
        self.request = request
        self.az = az

        if image_external_id:
            self.image_external = image_external_id
            diskimage = self.provider.diskimages[label.diskimage.name]
            self.config_drive = diskimage.config_drive
            image_name = diskimage.name
        else:
            # launch using unmanaged cloud image
            self.config_drive = label.cloud_image.config_drive

            if label.cloud_image.image_id:
                # Using a dict with the ID bypasses an image search during
                # server creation.
                self.image_external = dict(id=label.cloud_image.image_id)
            else:
                self.image_external = label.cloud_image.external_name
            image_name = label.cloud_image.name

        props = label.instance_properties.copy()
        for k, v in label.dynamic_instance_properties.items():
            try:
                props[k] = v.format(request=self.request.getSafeAttributes())
            except Exception:
                self.log.exception(
                    "Error formatting dynamic instance property %s", k)
        if not props:
            props = None

        # Put provider.name and image_name in as groups so that ansible
        # inventory can auto-create groups for us based on each of those
        # qualities
        # Also list each of those values directly so that non-ansible
        # consumption programs don't need to play a game of knowing that
        # groups[0] is the image name or anything silly like that.
        groups_list = [self.provider.name]
        groups_list.append(image_name)
        groups_list.append(label.name)
        meta = dict(
            groups=",".join(groups_list),
        )
        # merge in any instance properties provided from config
        if props:
            meta.update(props)
        # merge nodepool-internal metadata
        meta.update(metadata)
        self.metadata = meta
        self.flavor = self.adapter._findFlavor(
            flavor_name=self.label.flavor_name,
            min_ram=self.label.min_ram)
        self.quota = quota_from_flavor(self.flavor, label=self.label)
        self.external_id = None

    def _handleServerFault(self):
        if not self.external_id:
            return
        try:
            server = self.adapter._getServerByIdNow(self.external_id)
            if not server:
                return
            fault = server.get('fault', {}).get('message')
            if fault:
                self.log.error('Detailed node error: %s', fault)
                if 'quota' in fault:
                    self.quota_exceeded = True
        except Exception:
            self.log.exception(
                'Failed to retrieve node error information:')

    def advance(self):
        if self.state == self.START:
            self.external_id = None
            self.quota_exceeded = False
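            # Submit the server create to the API thread pool; the
            # result is collected in SERVER_CREATING_SUBMIT so this
            # call does not block.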
            self.create_future = self.adapter._submitApi(
                self.adapter._createServer,
                self.hostname,
                image=self.image_external,
                flavor=self.flavor,
                key_name=self.label.key_name,
                az=self.az,
                config_drive=self.config_drive,
                networks=self.label.networks,
                security_groups=self.label.pool.security_groups,
                boot_from_volume=self.label.boot_from_volume,
                volume_size=self.label.volume_size,
                instance_properties=self.metadata,
                userdata=self.label.userdata)
            self.state = self.SERVER_CREATING_SUBMIT

        if self.state == self.SERVER_CREATING_SUBMIT:
            try:
                try:
                    self.server = self.adapter._completeApi(self.create_future)
                    if self.server is None:
                        return
                    self.external_id = self.server.id
                    self.state = self.SERVER_CREATING
                except openstack.cloud.exc.OpenStackCloudCreateException as e:
                    if e.resource_id:
                        self.external_id = e.resource_id
                        self._handleServerFault()
                    raise
            except Exception as e:
                self.log.exception("Launch attempt %d/%d failed:",
                                   self.attempts, self.retries)
                if 'quota exceeded' in str(e).lower():
                    self.quota_exceeded = True
                if 'number of ports exceeded' in str(e).lower():
                    self.quota_exceeded = True
                self.state = self.SERVER_RETRY

        if self.state == self.SERVER_CREATING:
            self.server = self.adapter._refreshServer(self.server)

            if self.server.status == 'ACTIVE':
                if (self.label.pool.auto_floating_ip and
                        self.adapter._needsFloatingIp(self.server)):
                    self.floating_ip = self.adapter._createFloatingIp(
                        self.server)
                    self.state = self.FLOATING_IP_CREATING
                else:
                    self.state = self.COMPLETE
            elif self.server.status == 'ERROR':
                if ('fault' in self.server and self.server['fault'] is not None
                        and 'message' in self.server['fault']):
                    self.log.error(
                        "Error in creating the server."
                        " Compute service reports fault: {reason}".format(
                            reason=self.server['fault']['message']))
                if self.external_id:
                    try:
                        self.server = self.adapter._deleteServer(
                            self.external_id)
                    except Exception:
                        self.log.exception("Error deleting server:")
                        self.server = None
                else:
                    self.server = None
                self.state = self.SERVER_RETRY
            else:
                return

        if self.state == self.SERVER_RETRY:
            if self.external_id:
                try:
                    self.server = self.adapter._deleteServer(self.external_id)
                except Exception:
                    self.log.exception("Error deleting server:")
                    # We must keep trying the delete until timeout in
                    # order to avoid having two servers for the same
                    # node id.
                    return
            else:
                self.server = None
            self.state = self.SERVER_RETRY_DELETING

        if self.state == self.SERVER_RETRY_DELETING:
            self.server = self.adapter._refreshServerDelete(self.server)
            if self.server:
                return
            self.attempts += 1
            if self.attempts >= self.retries:
                raise Exception("Too many retries")
            if self.quota_exceeded:
                # A quota exception is not directly recoverable so bail
                # out immediately with a specific exception.
                self.log.info("Quota exceeded, invalidating quota cache")
                raise exceptions.QuotaException("Quota exceeded")
            self.state = self.START
            return

        if self.state == self.FLOATING_IP_CREATING:
            self.floating_ip = self.adapter._refreshFloatingIp(
                self.floating_ip)
            if self.floating_ip.get('port_id', None):
                if self.floating_ip['status'] == 'ACTIVE':
                    self.state = self.FLOATING_IP_ATTACHING
                else:
                    return
            else:
                self.adapter._attachIpToServer(self.server, self.floating_ip)
                self.state = self.FLOATING_IP_ATTACHING

        if self.state == self.FLOATING_IP_ATTACHING:
            self.server = self.adapter._refreshServer(self.server)
            ext_ip = openstack.cloud.meta.get_server_ip(
                self.server, ext_tag='floating', public=True)
            if ext_ip == self.floating_ip['floating_ip_address']:
                self.state = self.COMPLETE
            else:
                return

        if self.state == self.COMPLETE:
            self.complete = True
            return OpenStackInstance(
                self.adapter.provider, self.server, self.quota)


class OpenStackAdapter(statemachine.Adapter):
    # If we fail to find an image specified by the config, invalidate
    # the image cache after this interval:
    IMAGE_CHECK_TIMEOUT = 300

    def __init__(self, provider_config):
        # Wrap these instance methods with a per-instance LRU cache so
        # that we don't leak memory over time when the adapter is
        # occasionally replaced.
        self._findImage = functools.lru_cache(maxsize=None)(
            self._findImage)
        self._listFlavors = functools.lru_cache(maxsize=None)(
            self._listFlavors)
        self._findNetwork = functools.lru_cache(maxsize=None)(
            self._findNetwork)
        self._listAZs = functools.lru_cache(maxsize=None)(
            self._listAZs)

        self.log = logging.getLogger(
            f"nodepool.OpenStackAdapter.{provider_config.name}")
        self.provider = provider_config

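        # Thread pool for asynchronous API calls (used by _submitApi).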
        workers = 8
        self.log.info("Create executor with max workers=%s", workers)
        self.api_executor = ThreadPoolExecutor(
            thread_name_prefix=f'openstack-api-{provider_config.name}',
            max_workers=workers)

        self._last_image_check_failure = time.time()
        self._last_port_cleanup = None
        self._statsd = stats.get_client()
        self._client = self._getClient()

    def stop(self):
        self.api_executor.shutdown()

    def getCreateStateMachine(self, hostname, label, image_external_id,
                              metadata, retries, request, az, log):
        return OpenStackCreateStateMachine(
            self, hostname, label, image_external_id,
            metadata, retries, request, az, log)

    def getDeleteStateMachine(self, external_id, log):
        return OpenStackDeleteStateMachine(self, external_id, log)

    def listResources(self):
        for server in self._listServers():
            if server.status.lower() == 'deleted':
                continue
            yield OpenStackResource(server.metadata,
                                    'server', server.id)
        # Floating IP and port leakage can't be handled by the
        # automatic resource cleanup in cleanupLeakedResources because
        # openstack doesn't store metadata on those objects, so we
        # call internal cleanup methods here.
        if self.provider.port_cleanup_interval:
            self._cleanupLeakedPorts()
        if self.provider.clean_floating_ips:
            self._cleanupFloatingIps()

    def deleteResource(self, resource):
        self.log.info(f"Deleting leaked {resource.type}: {resource.id}")
        if resource.type == 'server':
            self._deleteServer(resource.id)

    def listInstances(self):
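        # Build a volume map up front so each server's attached volumes
        # can be counted toward its quota usage.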
        volumes = {}
        for volume in self._listVolumes():
            volumes[volume.id] = volume
        for server in self._listServers():
            if server.status.lower() == 'deleted':
                continue
            flavor = self._getFlavorFromServer(server)
            server_volumes = []
            for vattach in server.volumes:
                volume = volumes.get(vattach.id)
                if volume:
                    server_volumes.append(volume)
            quota = quota_from_flavor(flavor, volumes=server_volumes)
            yield OpenStackInstance(self.provider, server, quota)

    def getQuotaLimits(self):
        compute = self._client.get_compute_limits()
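        # Volume limits are only available when the cloud has a block
        # storage endpoint; fall back to compute limits alone otherwise.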
        try:
            volume = self._client.get_volume_limits()
        except EndpointNotFound:
            volume = None
        return quota_from_limits(compute, volume)

    def getQuotaForLabel(self, label):
        flavor = self._findFlavor(label.flavor_name, label.min_ram)
        return quota_from_flavor(flavor, label=label)

    def getAZs(self):
        azs = self._listAZs()
        if not azs:
            # If there are no zones, return a list containing None so that
            # random.choice can pick None and pass that to Nova. If this
            # feels dirty, please direct your ire to policy.json and the
            # ability to turn off random portions of the OpenStack API.
            return [None]
        return azs

    def labelReady(self, label):
        if not label.cloud_image:
            return False

        # If an image ID was supplied, we'll assume it is ready since
        # we don't currently have a way of validating that (except during
        # server creation).
        if label.cloud_image.image_id:
            return True

        image = self._findImage(label.cloud_image.external_name)
        if not image:
            self.log.warning(
                "Provider %s is configured to use %s as the"
                " cloud-image for label %s and that"
                " cloud-image could not be found in the"
                " cloud." % (self.provider.name,
                             label.cloud_image.external_name,
                             label.name))
            # If the user insists there should be an image but it
            # isn't in our cache, invalidate the cache periodically so
            # that we can see new cloud image uploads.
            if (time.time() - self._last_image_check_failure >
                    self.IMAGE_CHECK_TIMEOUT):
                self._findImage.cache_clear()
                self._last_image_check_failure = time.time()
            return False
        return True

    def uploadImage(self, provider_image, image_name, filename,
                    image_format, metadata, md5, sha256):
        # configure glance and upload image.  Note the meta flags
        # are provided as custom glance properties
        # NOTE: we have wait=True set here. This is not how we normally
        # do things in nodepool, preferring to poll ourselves thankyouverymuch.
        # However - two things to note:
        #  - PUT has no async mechanism, so we have to handle it anyway
        #  - v2 w/task waiting is very strange and complex - but we have to
        #    block for our v1 clouds anyway, so we might as well
        #    have the interface be the same and treat faking-out
        #    an openstacksdk-level fake-async interface later
        if not metadata:
            metadata = {}
        if image_format:
            metadata['disk_format'] = image_format
        image = self._client.create_image(
            name=image_name,
            filename=filename,
            is_public=False,
            wait=True,
            md5=md5,
            sha256=sha256,
            **metadata)
        return image.id

    def deleteImage(self, external_id):
        self.log.debug(f"Deleting image {external_id}")
        return self._client.delete_image(external_id)

    # Local implementation

    def _getClient(self):
        rate_limit = None
        # nodepool tracks rate limit in time between requests.
        # openstacksdk tracks rate limit in requests per second.
        # 1/time = requests-per-second.
        if self.provider.rate:
            rate_limit = 1 / self.provider.rate
        return openstack.connection.Connection(
            config=self.provider.cloud_config,
            use_direct_get=False,
            rate_limit=rate_limit,
            app_name='nodepool',
            app_version=version.version_info.version_string()
        )

    def _submitApi(self, api, *args, **kw):
        return self.api_executor.submit(
            api, *args, **kw)

    def _completeApi(self, future):
        if not future.done():
            return None
        return future.result()

    def _createServer(self, name, image, flavor,
                      az=None, key_name=None, config_drive=True,
                      networks=None, security_groups=None,
                      boot_from_volume=False, volume_size=50,
                      instance_properties=None, userdata=None):
        if not networks:
            networks = []
        if not isinstance(image, dict):
            # if it's a dict, we already have the cloud id. If it's not,
            # we don't know if it's name or ID so need to look it up
            image = self._findImage(image)
        create_args = dict(name=name,
                           image=image,
                           flavor=flavor,
                           config_drive=config_drive)
        if boot_from_volume:
            create_args['boot_from_volume'] = boot_from_volume
            create_args['volume_size'] = volume_size
            # NOTE(pabelanger): Always cleanup volumes when we delete a server.
            create_args['terminate_volume'] = True
        if key_name:
            create_args['key_name'] = key_name
        if az:
            create_args['availability_zone'] = az
        if security_groups:
            create_args['security_groups'] = security_groups
        if userdata:
            create_args['userdata'] = userdata
        nics = []
        for network in networks:
            net_id = self._findNetwork(network)['id']
            nics.append({'net-id': net_id})
        if nics:
            create_args['nics'] = nics
        if instance_properties:
            create_args['meta'] = instance_properties

        try:
            return self._client.create_server(wait=False, **create_args)
        except openstack.exceptions.BadRequestException:
            # We've gotten a 400 error from nova - which means the request
            # was malformed. The most likely cause of that, unless something
            # became functionally and systemically broken, is stale az, image
            # or flavor cache. Log a message, invalidate the caches so that
            # next time we get new caches.
            self.log.info(
                "Clearing az, flavor and image caches due to 400 error "
                "from nova")
            self._findImage.cache_clear()
            self._listFlavors.cache_clear()
            self._findNetwork.cache_clear()
            self._listAZs.cache_clear()
            raise

    # This method is wrapped with an LRU cache in the constructor.
    def _listAZs(self):
        return self._client.list_availability_zone_names()

    # This method is wrapped with an LRU cache in the constructor.
    def _findImage(self, name):
        return self._client.get_image(name, filters={'status': 'active'})

    # This method is wrapped with an LRU cache in the constructor.
    def _listFlavors(self):
        return self._client.list_flavors(get_extra=False)

    # This method is only used by the nodepool alien-image-list
    # command and only works with the openstack driver.
    def _listImages(self):
        return self._client.list_images()

    def _getFlavors(self):
        flavors = self._listFlavors()
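        # Sort by RAM, then name, so min-ram lookups return the smallest
        # flavor that satisfies the request.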
        flavors.sort(key=operator.itemgetter('ram', 'name'))
        return flavors

    def _findFlavorByName(self, flavor_name):
        for f in self._getFlavors():
            if flavor_name in (f['name'], f['id']):
                return f
        raise Exception("Unable to find flavor: %s" % flavor_name)

    def _findFlavorByRam(self, min_ram, flavor_name):
        for f in self._getFlavors():
            if (f['ram'] >= min_ram
                    and (not flavor_name or flavor_name in f['name'])):
                return f
        raise Exception("Unable to find flavor with min ram: %s" % min_ram)

    def _findFlavorById(self, flavor_id):
        for f in self._getFlavors():
            if f['id'] == flavor_id:
                return f
        raise Exception("Unable to find flavor with id: %s" % flavor_id)

    def _findFlavor(self, flavor_name, min_ram):
        if min_ram:
            return self._findFlavorByRam(min_ram, flavor_name)
        else:
            return self._findFlavorByName(flavor_name)

    # This method is wrapped with an LRU cache in the constructor.
    def _findNetwork(self, name):
        network = self._client.get_network(name)
        if not network:
            raise Exception("Unable to find network %s in provider %s" % (
                name, self.provider.name))
        return network

    @cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL)
    def _listServers(self):
        return self._client.list_servers(bare=True)

    @cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL)
    def _listVolumes(self):
        try:
            return self._client.list_volumes()
        except EndpointNotFound:
            return []

    @cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL)
    def _listFloatingIps(self):
        return self._client.list_floating_ips()

    def _refreshServer(self, obj):
        ret = self._getServer(obj.id)
        if ret:
            return ret
        return obj

    def _expandServer(self, server):
        return openstack.cloud.meta.add_server_interfaces(
            self._client, server)

    def _getServer(self, external_id):
        for server in self._listServers():
            if server.id == external_id:
                if server.status in ['ACTIVE', 'ERROR']:
                    return self._expandServer(server)
                return server
        return None

    def _getServerByIdNow(self, server_id):
        # A synchronous get server by id.  Only to be used in error
        # handling where we can't wait for the list to update.
        return self._client.get_server_by_id(server_id)

    def _refreshServerDelete(self, obj):
        if obj is None:
            return obj
        for server in self._listServers():
            if server.id == obj.id:
                if server.status.lower() == 'deleted':
                    return None
                return server
        return None

    def _refreshFloatingIp(self, obj):
        for fip in self._listFloatingIps():
            if fip.id == obj.id:
                return fip
        return obj

    def _refreshFloatingIpDelete(self, obj):
        if obj is None:
            return obj
        for fip in self._listFloatingIps():
            if fip.id == obj.id:
                if fip.status == 'DOWN':
                    return None
                return fip
        return None

    def _needsFloatingIp(self, server):
        return self._client._needs_floating_ip(
            server=server, nat_destination=None)

    def _createFloatingIp(self, server):
        return self._client.create_floating_ip(server=server, wait=True)

    def _attachIpToServer(self, server, fip):
        # skip_attach is ignored for nova, which is the only time we
        # should actually call this method.
        return self._client._attach_ip_to_server(
            server=server, floating_ip=fip,
            skip_attach=True)

    def _hasFloatingIps(self):
        return self._client._has_floating_ips()

    def _getFloatingIps(self, server):
        fips = openstack.cloud.meta.find_nova_interfaces(
            server['addresses'], ext_tag='floating')
        fips = [self._client.get_floating_ip(
            id=None, filters={'floating_ip_address': fip['addr']})
            for fip in fips]
        return fips

    def _deleteFloatingIp(self, fip):
        self._client.delete_floating_ip(fip['id'], retry=0)

    def _deleteServer(self, external_id):
        self._client.delete_server(external_id)

    def _getFlavorFromServer(self, server):
        # In earlier versions of nova or the sdk, flavor has just an id.
        # In later versions it returns the information we're looking for.
        # If we get the information we want, we do not need to try to
        # lookup the flavor in our list.
        if hasattr(server.flavor, 'vcpus'):
            return server.flavor
        else:
            return self._findFlavorById(server.flavor.id)

    # The port cleanup logic.  We don't get tags or metadata, so we
    # have to figure this out on our own.

    # This method is not cached
    def _listPorts(self, status=None):
        '''
        List known ports.

        :param str status: A valid port status.  E.g., 'ACTIVE' or 'DOWN'.
        '''
        if status:
            ports = self._client.list_ports(filters={'status': status})
        else:
            ports = self._client.list_ports()
        return ports

    def _filterComputePorts(self, ports):
        '''
        Return a list of compute ports (or no device owner).

        We are not interested in ports for routers or DHCP.
        '''
        ret = []
        for p in ports:
            if (p.device_owner is None or p.device_owner == '' or
                    p.device_owner.startswith("compute:")):
                ret.append(p)
        return ret

    def _cleanupLeakedPorts(self):
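        # A port is treated as leaked only if it is DOWN on two
        # consecutive cleanup passes; the first pass just records the
        # current set of DOWN ports.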
        if not self._last_port_cleanup:
            self._last_port_cleanup = time.monotonic()
            ports = self._listPorts(status='DOWN')
            ports = self._filterComputePorts(ports)
            self._down_ports = set([p.id for p in ports])
            return

        # Return if not enough time has passed between cleanup
        last_check_in_secs = int(time.monotonic() - self._last_port_cleanup)
        if last_check_in_secs <= self.provider.port_cleanup_interval:
            return

        ports = self._listPorts(status='DOWN')
        ports = self._filterComputePorts(ports)
        current_set = set([p.id for p in ports])
        remove_set = current_set & self._down_ports

        removed_count = 0
        for port_id in remove_set:
            try:
                self._deletePort(port_id)
            except Exception:
                self.log.exception("Exception deleting port %s in %s:",
                                   port_id, self.provider.name)
            else:
                removed_count += 1
                self.log.debug("Removed DOWN port %s in %s",
                               port_id, self.provider.name)

        if self._statsd and removed_count:
            key = 'nodepool.provider.%s.leaked.ports' % (self.provider.name)
            self._statsd.incr(key, removed_count)

        self._last_port_cleanup = time.monotonic()

        # Rely on OpenStack to tell us the down ports rather than doing our
        # own set adjustment.
        ports = self._listPorts(status='DOWN')
        ports = self._filterComputePorts(ports)
        self._down_ports = set([p.id for p in ports])

    def _deletePort(self, port_id):
        self._client.delete_port(port_id)

    def _cleanupFloatingIps(self):
        did_clean = self._client.delete_unattached_floating_ips()
        if did_clean:
            # Some versions of openstacksdk return True if any floating
            # IP was cleaned, rather than a count.  Just set it to 1 to
            # indicate something happened.
            if type(did_clean) == bool:
                did_clean = 1
            if self._statsd:
                key = ('nodepool.provider.%s.leaked.floatingips'
                       % self.provider.name)
                self._statsd.incr(key, did_clean)