Support EC2 Fleet API

This change adds support for launching Amazon EC2 instances
(On-Demand and Spot), using the EC2 fleet API.

One key difference compared to other types of labels is that the quota
information cannot be determined from the label alone; it is only
known after the instance is launched.

Therefore, when EC2 fleet API is configured, quota is not checked
before launching the instance, but the quota would be taken into
account after the instance is launched.

In the current change the `InstanceRequirements` is not supported.

Change-Id: I1759d3539ef79d4a556661898553396d85aa69fa
Co-Authored-By: James E. Blair <jim@acmegating.com>
Co-Authored-By: Dong Zhang <dong.zhang@bmw.de>
This commit is contained in:
Benedikt Loeffler 2024-07-04 14:50:03 +02:00 committed by Dong Zhang
parent 0c3063a89a
commit 7d31a8e79f
13 changed files with 745 additions and 11 deletions

View File

@ -682,9 +682,31 @@ Selecting the ``aws`` driver adds the following options to the
.. attr:: instance-type
:type: str
:required:
Name of the flavor to use.
Mutually exclusive with :attr:`providers.[aws].pools.labels.fleet`
.. attr:: fleet
:type: dict
If specified, the EC2 fleet API will be used to launch
the instance. In this case, quota is not checked before
launching the instance, but is taken into account after
the instance is launched. Mutually exclusive with
:attr:`providers.[aws].pools.labels.instance-type`
.. attr:: instance-types
:type: list
Names of the flavors (instance types) that may be launched.
.. attr:: allocation-strategy
:type: str
:required:
Allowed values for On-Demand: ``lowest-price`` or ``prioritized``.
Allowed values for Spot: ``price-capacity-optimized``, ``capacity-optimized``,
``diversified`` or ``lowest-price``
.. attr:: iam-instance-profile
:type: dict

View File

@ -17,6 +17,7 @@ from concurrent.futures import ThreadPoolExecutor
import cachetools.func
import copy
import functools
import hashlib
import json
import logging
import math
@ -396,7 +397,7 @@ class AwsCreateStateMachine(statemachine.StateMachine):
return
self.instance = instance
self.external_id['instance'] = instance['InstanceId']
self.quota = self.adapter.getQuotaForLabel(self.label)
self.quota = self.adapter._getQuotaForLabel(self.label)
self.state = self.INSTANCE_CREATING
if self.state == self.INSTANCE_CREATING:
@ -412,12 +413,15 @@ class AwsCreateStateMachine(statemachine.StateMachine):
if self.state == self.COMPLETE:
self.complete = True
self.quota = self.adapter._getQuotaForLabel(
self.label, self.instance['InstanceType'])
return AwsInstance(self.adapter.provider, self.instance,
self.host, self.quota)
class AwsAdapter(statemachine.Adapter):
IMAGE_UPLOAD_SLEEP = 30
LAUNCH_TEMPLATE_PREFIX = 'nodepool-launch-template'
def __init__(self, provider_config):
# Wrap these instance methods with a per-instance LRU cache so
@ -532,6 +536,7 @@ class AwsAdapter(statemachine.Adapter):
# time on that again.
self.not_our_images = set()
self.not_our_snapshots = set()
self._createLaunchTemplates()
def stop(self):
self.create_executor.shutdown()
@ -668,10 +673,18 @@ class AwsAdapter(statemachine.Adapter):
if label.dedicated_host:
host_types.add(label.instance_type)
else:
if label.instance_type not in instance_types:
instance_types[label.instance_type] = set()
instance_types[label.instance_type].add(
SPOT if label.use_spot else ON_DEMAND)
label_instance_types = []
if label.instance_type:
label_instance_types.append(label.instance_type)
elif label.fleet and label.fleet.get('instance-types'):
# Include instance-types from fleet config if available
label_instance_types.extend(
label.fleet.get('instance-types'))
for label_instance_type in label_instance_types:
if label_instance_type not in instance_types:
instance_types[label_instance_type] = set()
instance_types[label_instance_type].add(
SPOT if label.use_spot else ON_DEMAND)
if label.volume_type:
volume_types.add(label.volume_type)
args = dict(default=math.inf)
@ -725,6 +738,13 @@ class AwsAdapter(statemachine.Adapter):
return QuotaInformation(**args)
def getQuotaForLabel(self, label):
    """Return the quota information required by *label*.

    Public wrapper around the internal helper, which can additionally
    take a concrete instance type once one is known (fleet API).
    """
    return self._getQuotaForLabel(label, instance_type=None)
def _getQuotaForLabel(self, label, instance_type=None):
# When using the Fleet API, we may need to fill in quota
# information from the actual instance, so this internal
# method operates on the label alone or label+instance.
# For now, we are optimistically assuming that when an
# instance is launched on a dedicated host, it is not counted
# against instance quota. That may be overly optimistic. If
@ -733,9 +753,13 @@ class AwsAdapter(statemachine.Adapter):
if label.dedicated_host:
quota = self._getQuotaForHostType(
label.instance_type)
elif label.fleet and instance_type is None:
# For fleet API, do not check quota before launch the instance
quota = QuotaInformation(instances=1)
else:
check_instance_type = label.instance_type or instance_type
quota = self._getQuotaForInstanceType(
label.instance_type,
check_instance_type,
SPOT if label.use_spot else ON_DEMAND)
if label.volume_type:
quota.add(self._getQuotaForVolumeType(
@ -1486,6 +1510,197 @@ class AwsAdapter(statemachine.Adapter):
else:
image_id = self._getImageId(label.cloud_image)
if label.fleet:
return self._createFleet(label, image_id, tags, hostname, log)
else:
return self._runInstance(label, image_id, tags,
hostname, dedicated_host_id, log)
def _createLaunchTemplates(self):
    """Ensure an EC2 launch template exists for every fleet label.

    Template names are content-addressed (they embed a hash of the
    template arguments), so identical configurations reuse the same
    template across restarts.  Templates previously created by this
    provider that no longer match any configured label are deleted.
    """
    fleet_labels = []
    for pool in self.provider.pools.values():
        for label in pool.labels.values():
            # Launch templates are only needed for labels that use
            # the fleet API.
            if not label.fleet:
                continue
            fleet_labels.append(label)

    if not fleet_labels:
        return

    self.log.info("Creating launch templates")
    tags = {
        'nodepool_managed': True,
        'nodepool_provider_name': self.provider.name,
    }

    existing_templates = dict()  # for clean up and avoid creation attempt
    created_templates = set()  # for avoid creation attempt
    configured_templates = set()  # for clean up
    name_filter = {
        'Name': 'launch-template-name',
        'Values': [f'{self.LAUNCH_TEMPLATE_PREFIX}-*'],
    }
    paginator = self.ec2_client.get_paginator(
        'describe_launch_templates')
    with self.non_mutating_rate_limiter:
        for page in paginator.paginate(Filters=[name_filter]):
            for template in page['LaunchTemplates']:
                existing_templates[
                    template['LaunchTemplateName']] = template

    for label in fleet_labels:
        ebs_settings = {
            'DeleteOnTermination': True,
        }
        if label.volume_size:
            ebs_settings['VolumeSize'] = label.volume_size
        if label.volume_type:
            ebs_settings['VolumeType'] = label.volume_type
        if label.iops:
            ebs_settings['Iops'] = label.iops
        if label.throughput:
            ebs_settings['Throughput'] = label.throughput
        template_data = {
            'KeyName': label.key_name,
            'SecurityGroupIds': [label.pool.security_group_id],
            'BlockDeviceMappings': [
                {
                    'DeviceName': '/dev/sda1',
                    'Ebs': ebs_settings,
                },
            ],
        }
        template_args = dict(
            LaunchTemplateData=template_data,
            TagSpecifications=[
                {
                    'ResourceType': 'launch-template',
                    'Tags': tag_dict_to_list(tags),
                },
            ]
        )
        # The name is derived from a hash of the arguments, so a
        # config change produces a new template name.
        template_name = self._getLaunchTemplateName(template_args)
        configured_templates.add(template_name)

        label._launch_template_name = template_name

        if (template_name in existing_templates or
                template_name in created_templates):
            self.log.debug(
                'Launch template %s already exists', template_name)
            continue

        template_args['LaunchTemplateName'] = template_name
        self.log.debug('Creating launch template %s', template_name)
        try:
            self.ec2_client.create_launch_template(**template_args)
            created_templates.add(template_name)
            self.log.debug('Launch template %s created', template_name)
        except botocore.exceptions.ClientError as e:
            # Another launcher may have created it concurrently.
            if (e.response['Error']['Code'] ==
                    'InvalidLaunchTemplateName.AlreadyExistsException'):
                self.log.debug(
                    'Launch template %s already created',
                    template_name)
            else:
                # Bare raise preserves the original traceback.
                raise
        except Exception:
            self.log.exception(
                'Could not create launch template %s', template_name)

    # Remove templates which are no longer in the configuration.
    for template_name, template in existing_templates.items():
        if template_name not in configured_templates:
            # Only delete templates created by the current provider.
            template_tags = template.get('Tags', [])
            for tag in template_tags:
                if (tag['Key'] == 'nodepool_provider_name' and
                        tag['Value'] == self.provider.name):
                    self.ec2_client.delete_launch_template(
                        LaunchTemplateName=template_name)
                    self.log.debug("Deleted unused launch template: %s",
                                   template_name)
                    break
def _getLaunchTemplateName(self, args):
hasher = hashlib.sha256()
hasher.update(json.dumps(args, sort_keys=True).encode('utf8'))
sha = hasher.hexdigest()
return (f'{self.LAUNCH_TEMPLATE_PREFIX}-{sha}')
def _createFleet(self, label, image_id, tags, hostname, log):
    """Launch a single instance via the EC2 fleet API.

    Builds an "instant" fleet request with a target capacity of 1
    using the launch template created for the label, with one
    override per configured instance type.

    :param label: the AwsLabel with a ``fleet`` configuration (its
        ``_launch_template_name`` is set by ``_createLaunchTemplates``)
    :param image_id: the AMI id to launch
    :param tags: dict of tags applied to the instance and its volumes
    :param hostname: requested hostname (used for logging)
    :param log: request-scoped logger
    :returns: the instance dict as returned by describe_instances
    :raises Exception: if the fleet request launched no instance
    """
    overrides = [
        {
            'ImageId': image_id,
            'InstanceType': instance_type,
            'SubnetId': label.pool.subnet_id,
        }
        for instance_type in label.fleet.get('instance-types', [])
    ]

    # Spot and on-demand requests differ only in which options key
    # carries the allocation strategy and in the capacity type.
    allocation_strategy = label.fleet['allocation-strategy']
    if label.use_spot:
        capacity_type_option = {
            'SpotOptions': {
                'AllocationStrategy': allocation_strategy,
            },
            'TargetCapacitySpecification': {
                'TotalTargetCapacity': 1,
                'DefaultTargetCapacityType': 'spot',
            },
        }
    else:
        capacity_type_option = {
            'OnDemandOptions': {
                'AllocationStrategy': allocation_strategy,
            },
            'TargetCapacitySpecification': {
                'TotalTargetCapacity': 1,
                'DefaultTargetCapacityType': 'on-demand',
            },
        }

    template_name = label._launch_template_name
    args = {
        **capacity_type_option,
        'LaunchTemplateConfigs': [
            {
                'LaunchTemplateSpecification': {
                    'LaunchTemplateName': template_name,
                    'Version': '$Latest',
                },
                'Overrides': overrides,
            },
        ],
        'Type': 'instant',
        'TagSpecifications': [
            {
                'ResourceType': 'instance',
                'Tags': tag_dict_to_list(tags),
            },
            {
                'ResourceType': 'volume',
                'Tags': tag_dict_to_list(tags),
            },
        ],
    }

    with self.rate_limiter(log.debug, "Created fleet"):
        resp = self.ec2_client.create_fleet(**args)

    # With Type=instant the call returns synchronously; if nothing
    # could be launched, 'Instances' is empty and 'Errors' explains
    # why.  Raise an informative error instead of an IndexError.
    if not resp.get('Instances'):
        raise Exception(
            "EC2 fleet request failed for %s: %s" % (
                hostname, resp.get('Errors')))
    instance_id = resp['Instances'][0]['InstanceIds'][0]

    describe_instances_result = self.ec2_client.describe_instances(
        InstanceIds=[instance_id]
    )

    log.debug("Created VM %s as instance %s using EC2 Fleet API",
              hostname, instance_id)

    return describe_instances_result['Reservations'][0]['Instances'][0]
def _runInstance(self, label, image_id, tags, hostname,
dedicated_host_id, log):
args = dict(
ImageId=image_id,
MinCount=1,

View File

@ -173,7 +173,7 @@ class AwsLabel(ConfigValue):
self.diskimage = None
self.ebs_optimized = bool(label.get('ebs-optimized', False))
self.instance_type = label['instance-type']
self.instance_type = label.get('instance-type', None)
self.key_name = label.get('key-name')
self.volume_type = label.get('volume-type')
self.volume_size = label.get('volume-size')
@ -194,6 +194,7 @@ class AwsLabel(ConfigValue):
if not self.pool.az:
raise Exception(
"Availability-zone is required for dedicated hosts")
self.fleet = label.get('fleet', None)
@staticmethod
def getSchema():
@ -201,7 +202,13 @@ class AwsLabel(ConfigValue):
v.Required('name'): str,
v.Exclusive('cloud-image', 'image'): str,
v.Exclusive('diskimage', 'image'): str,
v.Required('instance-type'): str,
v.Exclusive('instance-type', 'instance'): str,
v.Exclusive('fleet', 'instance'): {
v.Required('instance-types'): list,
v.Required('allocation-strategy'): v.Any(
'prioritized', 'price-capacity-optimized',
'capacity-optimized', 'diversified', 'lowest-price')
},
v.Required('key-name'): str,
'ebs-optimized': bool,
'volume-type': str,

View File

@ -166,6 +166,10 @@ class StateMachineNodeLauncher(stats.StatsReporter):
node.driver_data = instance.driver_data
node.slot = instance.slot
# If we did not know the resource information before
# launching, update it now.
node.resources = instance.getQuotaInformation().get_resources()
# Optionally, if the node has updated values that we set from
# the image attributes earlier, set those.
for attr in ('username', 'python_path', 'shell_type',

View File

@ -246,6 +246,12 @@ class QuotaInformation:
'''Return resources value to register in ZK node'''
return self.quota['compute']
@staticmethod
def from_resources(resources):
    """Construct a QuotaInformation object from a stored resources dict.

    The counterpart of ``get_resources``: the supplied dict becomes
    the 'compute' quota category.
    """
    info = QuotaInformation()
    info.quota['compute'] = resources
    return info
def __str__(self):
return str(self.quota)
@ -364,8 +370,14 @@ class QuotaSupport:
# may have changed under it. It should settle out
# eventually when it's deleted.
continue
node_resources = self.quotaNeededByLabel(
node.type[0], provider_pool)
# If the node resources is valid, we can use that to
# construct the qi object for the node.
if node.resources['cores']:
node_resources = QuotaInformation.from_resources(
node.resources)
else:
node_resources = self.quotaNeededByLabel(
node.type[0], provider_pool)
used_quota.add(node_resources)
except Exception:
self.log.exception("Couldn't consider invalid node %s "

View File

@ -0,0 +1,44 @@
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}
chroot: {zookeeper_chroot}
zookeeper-tls:
ca: {zookeeper_ca}
cert: {zookeeper_cert}
key: {zookeeper_key}
tenant-resource-limits:
- tenant-name: tenant-1
max-cores: 1024
labels:
- name: ubuntu1404-io2
providers:
- name: ec2-us-west-2
driver: aws
region-name: us-west-2
cloud-images:
- name: ubuntu1404
image-id: ami-1e749f67
username: ubuntu
pools:
- name: main
max-servers: 10
subnet-id: {subnet_id}
security-group-id: {security_group_id}
node-attributes:
key1: value1
key2: value2
labels:
- name: ubuntu1404-io2
cloud-image: ubuntu1404
fleet:
instance-types:
- t3.medium
allocation-strategy: prioritized
key-name: zuul
volume-type: io2
volume-size: 20
iops: 1234

View File

@ -0,0 +1,104 @@
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}
chroot: {zookeeper_chroot}
zookeeper-tls:
ca: {zookeeper_ca}
cert: {zookeeper_cert}
key: {zookeeper_key}
tenant-resource-limits:
- tenant-name: tenant-1
max-cores: 1024
labels:
- name: ubuntu1404-io2
- name: ubuntu1404-gp3
- name: ubuntu1404-on-demand
- name: ubuntu1404-spot
- name: ubuntu1404-4core
- name: ubuntu1404-fleet-4core
providers:
- name: ec2-us-west-2
driver: aws
region-name: us-west-2
cloud-images:
- name: ubuntu1404
image-id: ami-1e749f67
username: ubuntu
pools:
- name: main
max-servers: 10
subnet-id: {subnet_id}
security-group-id: {security_group_id}
node-attributes:
key1: value1
key2: value2
labels:
- name: ubuntu1404-io2
cloud-image: ubuntu1404
fleet:
instance-types:
- t3.medium
allocation-strategy: prioritized
key-name: zuul
volume-type: io2
volume-size: 20
iops: 2000
- name: ubuntu1404-gp3
cloud-image: ubuntu1404
fleet:
instance-types:
- t3.medium
allocation-strategy: prioritized
key-name: zuul
volume-type: gp3
volume-size: 40
iops: 1000
throughput: 200
- name: ubuntu1404-on-demand
cloud-image: ubuntu1404
fleet:
instance-types:
- t3.nano
- t3.micro
- t3.small
- t3.medium
allocation-strategy: prioritized
key-name: zuul
volume-type: gp3
volume-size: 40
iops: 1000
throughput: 200
- name: ubuntu1404-spot
cloud-image: ubuntu1404
fleet:
instance-types:
- t3.nano
- t3.micro
- t3.small
- t3.medium
allocation-strategy: price-capacity-optimized
key-name: zuul
volume-type: gp3
volume-size: 40
iops: 1000
throughput: 200
use-spot: True
- name: ubuntu1404-4core
cloud-image: ubuntu1404
instance-type: t3.xlarge
key-name: zuul
- name: ubuntu1404-fleet-4core
cloud-image: ubuntu1404
fleet:
instance-types:
- t3.xlarge
allocation-strategy: prioritized
key-name: zuul
volume-type: gp3
volume-size: 40
iops: 1000
throughput: 200

View File

@ -0,0 +1,35 @@
elements-dir: /etc/nodepool/elements
images-dir: /opt/nodepool_dib
zookeeper-servers:
- host: zk1.openstack.org
port: 2181
chroot: /test
labels:
- name: ubuntu1404-on-demand
- name: ubuntu1404-spot
providers:
- name: ec2-us-west-2
driver: aws
region-name: us-west-2
cloud-images:
- name: ubuntu1404
image-id: ami-1e749f67
username: ubuntu
pools:
- name: main
max-servers: 10
node-attributes:
key1: value1
key2: value2
labels:
- name: ubuntu1404-on-demand
cloud-image: ubuntu1404
key-name: zuul
fleet:
instance-types:
- t3.nano
- t3.micro
allocation-strategy: not-exist

View File

@ -0,0 +1,43 @@
elements-dir: /etc/nodepool/elements
images-dir: /opt/nodepool_dib
zookeeper-servers:
- host: zk1.openstack.org
port: 2181
chroot: /test
labels:
- name: ubuntu1404-on-demand
- name: ubuntu1404-spot
providers:
- name: ec2-us-west-2
driver: aws
region-name: us-west-2
cloud-images:
- name: ubuntu1404
image-id: ami-1e749f67
username: ubuntu
pools:
- name: main
max-servers: 10
node-attributes:
key1: value1
key2: value2
labels:
- name: ubuntu1404-on-demand
cloud-image: ubuntu1404
key-name: zuul
fleet:
# "instance-types" and "instance-requirements" are exclusive
instance-types:
- t3.nano
- t3.micro
instance-requirements:
vcpu-count:
min: 1
max: 8
memory-mib:
min: 1
max: 16000
allocation-strategy: lowest-price

View File

@ -0,0 +1,35 @@
elements-dir: /etc/nodepool/elements
images-dir: /opt/nodepool_dib
zookeeper-servers:
- host: zk1.openstack.org
port: 2181
chroot: /test
labels:
- name: ubuntu1404-on-demand
- name: ubuntu1404-spot
providers:
- name: ec2-us-west-2
driver: aws
region-name: us-west-2
cloud-images:
- name: ubuntu1404
image-id: ami-1e749f67
username: ubuntu
pools:
- name: main
max-servers: 10
node-attributes:
key1: value1
key2: value2
labels:
- name: ubuntu1404-on-demand
cloud-image: ubuntu1404
key-name: zuul
fleet:
instance-types:
- t3.small
- t3.large
allocation-strategy: lowest-price

View File

@ -102,3 +102,30 @@ class TestConfigValidation(tests.BaseTestCase):
validator = ConfigValidator(config)
ret = validator.validate()
self.assertEqual(ret, 1)
def test_aws_fleet_good(self):
    """A valid fleet label configuration passes validation."""
    config_path = os.path.join(
        os.path.dirname(tests.__file__),
        'fixtures', 'config_validate', 'aws-fleet-good.yaml')
    self.assertEqual(ConfigValidator(config_path).validate(), 0)
def test_aws_fleet_exclusive_error(self):
    """Mutually-exclusive fleet keys are rejected by validation."""
    config_path = os.path.join(
        os.path.dirname(tests.__file__),
        'fixtures', 'config_validate', 'aws-fleet-exclusive-error.yaml')
    self.assertEqual(ConfigValidator(config_path).validate(), 1)
def test_aws_fleet_allocation_strategy_error(self):
    """An unknown allocation-strategy value fails validation."""
    config_path = os.path.join(
        os.path.dirname(tests.__file__),
        'fixtures', 'config_validate',
        'aws-fleet-allocation-strategy-error.yaml')
    self.assertEqual(ConfigValidator(config_path).validate(), 1)

View File

@ -55,6 +55,19 @@ class FakeAwsAdapter(AwsAdapter):
raise self.__testcase.run_instances_exception
return self.ec2_client.run_instances_orig(*args, **kwargs)
# Note: boto3 doesn't handle all features correctly (e.g.
# instance-requirements, volume attributes) when creating
# fleet in fake mode, we need to intercept the create_fleet
# call and validate the args we supply. Results are also
# intercepted for validate instance attributes
def _fake_create_fleet(*args, **kwargs):
self.__testcase.create_fleet_calls.append(kwargs)
if self.__testcase.create_fleet_exception:
raise self.__testcase.create_fleet_exception
result = self.ec2_client.create_fleet_orig(*args, **kwargs)
self.__testcase.create_fleet_results.append(result)
return result
def _fake_allocate_hosts(*args, **kwargs):
if self.__testcase.allocate_hosts_exception:
raise self.__testcase.allocate_hosts_exception
@ -73,6 +86,8 @@ class FakeAwsAdapter(AwsAdapter):
self.ec2_client.run_instances_orig = self.ec2_client.run_instances
self.ec2_client.run_instances = _fake_run_instances
self.ec2_client.create_fleet_orig = self.ec2_client.create_fleet
self.ec2_client.create_fleet = _fake_create_fleet
self.ec2_client.allocate_hosts_orig = self.ec2_client.allocate_hosts
self.ec2_client.allocate_hosts = _fake_allocate_hosts
self.ec2_client.register_image_orig = self.ec2_client.register_image
@ -174,6 +189,9 @@ class TestDriverAws(tests.DBTestCase):
# A list of args to method calls for validation
self.run_instances_calls = []
self.run_instances_exception = None
self.create_fleet_calls = []
self.create_fleet_results = []
self.create_fleet_exception = None
self.allocate_hosts_exception = None
self.register_image_calls = []
@ -721,6 +739,7 @@ class TestDriverAws(tests.DBTestCase):
instance['Placement'] = {'AvailabilityZone': 'us-west-2b'}
iface = {'Ipv6Addresses': [{'Ipv6Address': 'fe80::dead:beef'}]}
instance['NetworkInterfaces'] = [iface]
instance['InstanceType'] = 'test'
provider = Dummy()
provider.region_name = 'us-west-2'
awsi = AwsInstance(provider, instance, None, None)
@ -1349,3 +1368,164 @@ class TestDriverAws(tests.DBTestCase):
hosts = self.ec2_client.describe_hosts()['Hosts']
hosts = [h for h in hosts if h['State'] != 'released']
self.assertEqual(len(hosts), 0)
def test_aws_create_launch_templates(self):
    """Launch templates are created for fleet labels and are reused
    (not recreated) across pool restarts."""
    configfile = self.setup_config('aws/aws-fleet.yaml')
    pool = self.useNodepool(configfile, watermark_sleep=1)
    self.startPool(pool)

    # Fix: variable was previously misspelled 'launch_tempaltes'.
    launch_templates = self.ec2_client.\
        describe_launch_templates()['LaunchTemplates']
    self.assertEqual(len(launch_templates), 2)
    lt1 = launch_templates[0]
    lt2 = launch_templates[1]
    self.assertTrue(lt1['LaunchTemplateName'].startswith(
        'nodepool-launch-template'))
    self.assertTrue(lt2['LaunchTemplateName'].startswith(
        'nodepool-launch-template'))

    lt_version = self.ec2_client.\
        describe_launch_template_versions(
            LaunchTemplateId=lt2['LaunchTemplateId'])[
                'LaunchTemplateVersions'][0]
    lt_data = lt_version['LaunchTemplateData']
    self.assertIsNotNone(lt_data.get('SecurityGroupIds'))
    ebs_settings = lt_data['BlockDeviceMappings'][0]['Ebs']
    self.assertTrue(ebs_settings['DeleteOnTermination'])
    self.assertEqual(ebs_settings['Iops'], 1000)
    self.assertEqual(ebs_settings['VolumeSize'], 40)
    self.assertEqual(ebs_settings['VolumeType'], 'gp3')
    self.assertEqual(ebs_settings['Throughput'], 200)

    # Restart pool; the launch templates must be the same and
    # must not be recreated.
    pool.stop()
    configfile = self.setup_config('aws/aws-fleet.yaml')
    pool = self.useNodepool(configfile, watermark_sleep=1)
    self.startPool(pool)

    lt_2nd_run = self.ec2_client.\
        describe_launch_templates()['LaunchTemplates']
    self.assertEqual(len(lt_2nd_run), 2)
    self.assertEqual(lt1['LaunchTemplateId'],
                     lt_2nd_run[0]['LaunchTemplateId'])
    self.assertEqual(lt2['LaunchTemplateId'],
                     lt_2nd_run[1]['LaunchTemplateId'])
def test_aws_cleanup_launch_templates(self):
    """Templates for labels removed from the config are deleted."""
    # Start nodepool with the old templates config.
    configfile = self.setup_config('aws/aws-fleet-old-template.yaml')
    pool = self.useNodepool(configfile, watermark_sleep=1)
    self.startPool(pool)
    # Fix: variable was previously misspelled 'launch_tempaltes'.
    launch_templates = self.ec2_client.\
        describe_launch_templates()['LaunchTemplates']
    self.assertEqual(len(launch_templates), 1)

    # Restart pool with a config that does not include the old
    # template; the old template should be deleted.
    pool.stop()
    configfile = self.setup_config('aws/aws-fleet.yaml')
    pool = self.useNodepool(configfile, watermark_sleep=1)
    self.startPool(pool)
    launch_templates_2nd_run = self.ec2_client.\
        describe_launch_templates()['LaunchTemplates']
    self.assertEqual(len(launch_templates_2nd_run), 2)
def test_aws_create_fleet_on_demand(self):
    """An on-demand fleet label launches via create_fleet with the
    expected options, tags and instance type."""
    req = self.requestNode('aws/aws-fleet.yaml', 'ubuntu1404-on-demand')
    node = self.assertSuccess(req)

    call = self.create_fleet_calls[0]
    self.assertEqual(
        call['OnDemandOptions']['AllocationStrategy'], 'prioritized')
    template_spec = (call['LaunchTemplateConfigs'][0]
                     ['LaunchTemplateSpecification'])
    self.assertTrue(template_spec['LaunchTemplateName'].startswith(
        'nodepool-launch-template'))

    instance_tags = call['TagSpecifications'][0]
    self.assertEqual(instance_tags['ResourceType'], 'instance')
    self.assertEqual(instance_tags['Tags'][1]['Key'],
                     'nodepool_pool_name')
    self.assertEqual(instance_tags['Tags'][1]['Value'], 'main')
    volume_tags = call['TagSpecifications'][1]
    self.assertEqual(volume_tags['ResourceType'], 'volume')
    self.assertEqual(volume_tags['Tags'][1]['Key'],
                     'nodepool_pool_name')
    self.assertEqual(volume_tags['Tags'][1]['Value'], 'main')

    launched = self.create_fleet_results[0]['Instances'][0]
    self.assertEqual(launched['Lifecycle'], 'on-demand')
    self.assertIn(launched['InstanceType'],
                  ('t3.nano', 't3.micro', 't3.small', 't3.medium'))

    node.state = zk.USED
    self.zk.storeNode(node)
    self.waitForNodeDeletion(node)
def test_aws_create_fleet_spot(self):
    """A spot fleet label launches via create_fleet with spot options
    and one of the configured instance types."""
    req = self.requestNode('aws/aws-fleet.yaml', 'ubuntu1404-spot')
    node = self.assertSuccess(req)

    call = self.create_fleet_calls[0]
    self.assertEqual(call['SpotOptions']['AllocationStrategy'],
                     'price-capacity-optimized')
    self.assertEqual(
        call['TargetCapacitySpecification']['DefaultTargetCapacityType'],
        'spot')
    launched = self.create_fleet_results[0]['Instances'][0]
    self.assertIn(launched['InstanceType'],
                  ('t3.nano', 't3.micro', 't3.small', 't3.medium'))

    node.state = zk.USED
    self.zk.storeNode(node)
    self.waitForNodeDeletion(node)
# NOTE(review): the keys below are AWS service-quota codes (presumably
# the vCPU limits relevant to on-demand/spot instances); the values are
# sized so that one 4-core node nearly exhausts the quota -- confirm
# against the AWS service quotas documentation.
@ec2_quotas({
    'L-1216C47A': 6,
    'L-34B43A08': 2
})
def test_aws_fleet_quota(self):
    """Quota consumed by a fleet-launched instance (only known after
    launch) is counted against the provider when fulfilling later
    non-fleet requests."""
    # Test if the quota used by instances launched by fleet API
    # are taken into account.
    configfile = self.setup_config('aws/aws-fleet.yaml')
    pool = self.useNodepool(configfile, watermark_sleep=1)
    self.startPool(pool)

    # Create a node request with fleet API.
    req1 = zk.NodeRequest()
    req1.state = zk.REQUESTED
    req1.node_types.append('ubuntu1404-fleet-4core')
    self.zk.storeNodeRequest(req1)

    self.log.debug("Waiting for request %s", req1.id)
    req1 = self.waitForNodeRequest(req1)
    node1 = self.assertSuccess(req1)

    # Create a second node request with non-fleet API; this should be
    # over quota so it won't be fulfilled.
    req2 = zk.NodeRequest()
    req2.state = zk.REQUESTED
    req2.node_types.append('ubuntu1404-4core')
    self.zk.storeNodeRequest(req2)

    self.log.debug("Waiting for request %s", req2.id)
    req2 = self.waitForNodeRequest(req2, (zk.PENDING,))

    # Make sure we're paused while we attempt to fulfill the
    # second request.
    pool_worker = pool.getPoolWorkers('ec2-us-west-2')
    for _ in iterate_timeout(30, Exception, 'paused handler'):
        if pool_worker[0].paused_handlers:
            break

    # Release the first node so that the second can be fulfilled.
    node1.state = zk.USED
    self.zk.storeNode(node1)
    self.waitForNodeDeletion(node1)

    # Make sure the second high node exists now.
    req2 = self.waitForNodeRequest(req2)
    self.assertSuccess(req2)

View File

@ -0,0 +1,6 @@
---
features:
- |
The AWS driver now supports the EC2 Fleet API to launch instances
whose types are selected from among a specified set based on
configured optimization criteria.