Add EBS direct image upload to AWS

This adds a third (!) method of uploading images to AWS.  It uploads
blocks directly to an EBS snapshot.  Bypassing S3 makes it faster and
more efficient, but it may incur additional costs, so it does not
replace the existing, less costly options.

The process is very similar to the existing process used in the Azure
driver, so a new helper abstraction is created that should help
support both drivers.  A later change may update the Azure driver to
use the new abstraction.

Change-Id: I5cb707386b4b61987f94862d70067350ae17d80d
Co-Authored-By: Tobias Henkel <tobias.henkel@bmw.de>
James E. Blair 2024-07-09 16:04:59 -07:00
parent 2291d1652a
commit fc2d0ba4b8
8 changed files with 453 additions and 72 deletions
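
For orientation, the EBS direct snapshot flow underlying the new upload
method looks roughly like this.  This is a minimal sketch of the boto3
calls involved, not the driver code itself; the image path is
illustrative, and error handling, retries, and rate limiting are
omitted (the driver below adds all of those):

    import base64
    import hashlib
    import math
    import os

    import boto3

    GIB = 1024 ** 3
    BLOCK_SIZE = 512 * 1024  # the EBS direct API works in 512 KiB blocks

    ebs = boto3.client('ebs')

    path = 'image.raw'  # illustrative raw image path
    size = os.path.getsize(path)

    # Start a snapshot large enough to hold the image (volume size is in GiB).
    snapshot_id = ebs.start_snapshot(
        VolumeSize=math.ceil(size / GIB))['SnapshotId']

    # Upload the image block by block, padding the final block with zeros
    # since every block must be exactly BLOCK_SIZE bytes.
    blocks = 0
    with open(path, 'rb') as f:
        for index in range(math.ceil(size / BLOCK_SIZE)):
            data = f.read(BLOCK_SIZE).ljust(BLOCK_SIZE, b'\0')
            checksum = base64.b64encode(hashlib.sha256(data).digest()).decode()
            ebs.put_snapshot_block(
                SnapshotId=snapshot_id, BlockIndex=index, BlockData=data,
                DataLength=len(data), Checksum=checksum,
                ChecksumAlgorithm='SHA256')
            blocks += 1

    # Finalize the snapshot; once completed it can be registered as an AMI.
    ebs.complete_snapshot(SnapshotId=snapshot_id, ChangedBlocksCount=blocks)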

View File

@ -5,10 +5,10 @@
AWS Driver
----------
If using the AWS driver to upload diskimages, see
`VM Import/Export service role`_ for information on configuring
the required permissions in AWS. You must also create an S3 Bucket
for use by Nodepool.
If using the AWS driver to upload diskimages, see `VM Import/Export
service role`_ for information on configuring the required permissions
in AWS. You must also create an S3 Bucket for use by Nodepool if
uploading images (except when using the ebs-direct upload method).
Selecting the ``aws`` driver adds the following options to the
:attr:`providers` section of the configuration.
@ -419,8 +419,9 @@ Selecting the ``aws`` driver adds the following options to the
The root `EBS volume type`_ for the image.
Only used with the
:value:`providers.[aws].diskimages.import-method.snapshot`
import method.
:value:`providers.[aws].diskimages.import-method.snapshot` or
:value:`providers.[aws].diskimages.import-method.ebs-direct`
import methods.
.. attr:: volume-size
:type: int
@ -428,8 +429,9 @@ Selecting the ``aws`` driver adds the following options to the
The size of the root EBS volume, in GiB, for the image. If
omitted, the volume size reported for the imported snapshot
will be used. Only used with the
:value:`providers.[aws].diskimages.import-method.snapshot`
import method.
:value:`providers.[aws].diskimages.import-method.snapshot` or
:value:`providers.[aws].diskimages.import-method.ebs-direct`
import methods.
.. attr:: imds-support
:type: str
@ -437,8 +439,9 @@ Selecting the ``aws`` driver adds the following options to the
To enforce usage of IMDSv2 by default on instances created
from the image, set this value to `v2.0`. If omitted, IMDSv2
is optional by default. This is only supported using the
:value:`providers.[aws].diskimages.import-method.snapshot`
import method.
:value:`providers.[aws].diskimages.import-method.snapshot` or
:value:`providers.[aws].diskimages.import-method.ebs-direct`
import methods.
.. attr:: import-method
:default: snapshot
@ -455,6 +458,12 @@ Selecting the ``aws`` driver adds the following options to the
operating systems which require special licensing or other
metadata in AWS.
.. value:: ebs-direct
This is similar to the `snapshot` method, but uses the
`EBS direct API`_ instead of S3. This may be faster and
more efficient, but it may incur additional costs.
.. value:: image
This method uploads the image file to AWS and performs an
@ -843,3 +852,4 @@ Selecting the ``aws`` driver adds the following options to the
.. _`VM Import/Export service role`: https://docs.aws.amazon.com/vm-import/latest/userguide/vmie_prereqs.html#vmimport-role
.. _`instance quotas`: https://us-west-1.console.aws.amazon.com/servicequotas/home/services/ec2/quotas
.. _`AWS RegisterImage API documentation`: https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_RegisterImage.html
.. _`EBS direct API`: https://docs.aws.amazon.com/ebs/latest/userguide/ebs-accessing-snapshot.html

View File

@ -13,23 +13,27 @@
# License for the specific language governing permissions and limitations
# under the License.
from concurrent.futures import ThreadPoolExecutor
import base64
import cachetools.func
from concurrent.futures import ThreadPoolExecutor
import copy
import functools
import hashlib
import json
import logging
import math
import queue
import re
import threading
import queue
import time
import urllib.parse
from uuid import uuid4
from nodepool.driver.utils import (
QuotaInformation,
LazyExecutorTTLCache,
RateLimiter,
ImageUploader,
)
from nodepool.driver import statemachine
from nodepool import exceptions
@ -214,6 +218,8 @@ CACHE_TTL = 10
SERVICE_QUOTA_CACHE_TTL = 300
ON_DEMAND = 0
SPOT = 1
KIB = 1024
GIB = 1024 ** 3
class AwsInstance(statemachine.Instance):
@ -416,6 +422,100 @@ class AwsCreateStateMachine(statemachine.StateMachine):
self.host, self.quota)
class EBSSnapshotUploader(ImageUploader):
segment_size = 512 * KIB
def __init__(self, *args, **kw):
super().__init__(*args, **kw)
self.segment_count = 0
def shouldRetryException(self, exception):
# Strictly speaking, ValidationException is only retryable
# if we get a particular message, but that's impractical
# to reproduce for testing.
# https://docs.aws.amazon.com/ebs/latest/userguide/error-retries.html
ex = self.adapter.ebs_client.exceptions
if isinstance(exception, (
ex.RequestThrottledException,
ex.InternalServerException,
ex.ValidationException,
)):
return True
return False
def _rateLimited(self, func):
def rateLimitedFunc(*args, **kw):
with self.adapter.rate_limiter:
return func(*args, **kw)
return rateLimitedFunc
def uploadSegment(self, segment):
# There is a default limit of 1000 put requests/second.
# Actual value is available as a service quota. We don't
# expect to hit this. If we do, and we need to rate-limit, we
# will need to coordinate with other builders.
# https://docs.aws.amazon.com/ebs/latest/userguide/ebs-resource-quotas.html
data = segment.data
if len(data) < self.segment_size:
# Add zeros if the last block is smaller since the
# block size in AWS is constant.
data = data.ljust(self.segment_size, b'\0')
checksum = hashlib.sha256(data)
checksum_base64 = base64.b64encode(checksum.digest()).decode('utf-8')
response = self.retry(
self.adapter.ebs_client.put_snapshot_block,
SnapshotId=self.snapshot_id,
BlockIndex=segment.index,
BlockData=data,
DataLength=len(data),
Checksum=checksum_base64,
ChecksumAlgorithm='SHA256',
)
if (response['Checksum'] != checksum_base64):
raise Exception("Checksums do not match; received "
f"{response['Checksum']} expected {checksum}")
self.segment_count += 1
def startUpload(self):
# This is used by AWS to ensure idempotency across retries
token = uuid4().hex
# Volume size is in GiB
size = math.ceil(self.size / GIB)
response = self.retry(
self._rateLimited(self.adapter.ebs_client.start_snapshot),
VolumeSize=size,
ClientToken=token,
Tags=tag_dict_to_list(self.metadata),
)
self.snapshot_id = response['SnapshotId']
def finishUpload(self):
while True:
response = self.retry(
self._rateLimited(self.adapter.ebs_client.complete_snapshot),
SnapshotId=self.snapshot_id,
ChangedBlocksCount=self.segment_count,
)
if response['Status'] == 'error':
raise Exception("Snapshot in error state")
if response['Status'] == 'completed':
break
self.checkTimeout()
return self.size, self.snapshot_id
def abortUpload(self):
try:
self.finishUpload()
except Exception:
pass
with self.adapter.rate_limiter:
snapshot_id = getattr(self, 'snapshot_id', None)
if snapshot_id:
self.adapter.ec2_client.delete_snapshot(
SnapshotId=self.snapshot_id)
class AwsAdapter(statemachine.Adapter):
IMAGE_UPLOAD_SLEEP = 30
@ -489,6 +589,7 @@ class AwsAdapter(statemachine.Adapter):
self.s3 = self.aws.resource('s3')
self.s3_client = self.aws.client('s3')
self.aws_quotas = self.aws.client("service-quotas")
self.ebs_client = self.aws.client('ebs')
workers = 10
self.log.info("Create executor with max workers=%s", workers)
@ -748,34 +849,93 @@ class AwsAdapter(statemachine.Adapter):
image_format, metadata, md5, sha256):
self.log.debug(f"Uploading image {image_name}")
# Upload image to S3
bucket_name = self.provider.object_storage['bucket-name']
bucket = self.s3.Bucket(bucket_name)
object_filename = f'{image_name}.{image_format}'
extra_args = {'Tagging': urllib.parse.urlencode(metadata)}
# There is no IMDS support option for the import_image call
if (provider_image.import_method == 'image' and
provider_image.imds_support == 'v2.0'):
raise Exception("IMDSv2 requires 'snapshot' import method")
with open(filename, "rb") as fobj:
with self.rate_limiter:
bucket.upload_fileobj(fobj, object_filename,
ExtraArgs=extra_args)
if provider_image.import_method != 'ebs-direct':
# Upload image to S3
bucket_name = self.provider.object_storage['bucket-name']
bucket = self.s3.Bucket(bucket_name)
object_filename = f'{image_name}.{image_format}'
extra_args = {'Tagging': urllib.parse.urlencode(metadata)}
with open(filename, "rb") as fobj:
with self.rate_limiter:
bucket.upload_fileobj(fobj, object_filename,
ExtraArgs=extra_args)
if provider_image.import_method == 'image':
image_id = self._uploadImageImage(
provider_image, image_name, filename,
image_format, metadata, md5, sha256,
bucket_name, object_filename)
else:
elif provider_image.import_method == 'snapshot':
image_id = self._uploadImageSnapshot(
provider_image, image_name, filename,
image_format, metadata, md5, sha256,
bucket_name, object_filename)
elif provider_image.import_method == 'ebs-direct':
image_id = self._uploadImageSnapshotEBS(
provider_image, image_name, filename,
image_format, metadata)
else:
raise Exception("Unknown image import method")
return image_id
def _registerImage(self, provider_image, image_name, metadata,
volume_size, snapshot_id):
# Register the snapshot as an AMI
with self.rate_limiter:
bdm = {
'DeviceName': '/dev/sda1',
'Ebs': {
'DeleteOnTermination': True,
'SnapshotId': snapshot_id,
'VolumeSize': volume_size,
'VolumeType': provider_image.volume_type,
},
}
if provider_image.iops:
bdm['Ebs']['Iops'] = provider_image.iops
if provider_image.throughput:
bdm['Ebs']['Throughput'] = provider_image.throughput
args = dict(
Architecture=provider_image.architecture,
BlockDeviceMappings=[bdm],
RootDeviceName='/dev/sda1',
VirtualizationType='hvm',
EnaSupport=provider_image.ena_support,
Name=image_name,
TagSpecifications=[
{
'ResourceType': 'image',
'Tags': tag_dict_to_list(metadata),
},
]
)
if provider_image.imds_support == 'v2.0':
args['ImdsSupport'] = 'v2.0'
return self.ec2_client.register_image(**args)
def _uploadImageSnapshotEBS(self, provider_image, image_name, filename,
image_format, metadata):
# Import snapshot
uploader = EBSSnapshotUploader(self, self.log, filename, metadata)
self.log.debug(f"Importing {image_name} as EBS snapshot")
volume_size, snapshot_id = uploader.upload(
self.provider.image_import_timeout)
register_response = self._registerImage(
provider_image, image_name, metadata, volume_size, snapshot_id,
)
self.log.debug(f"Upload of {image_name} complete as "
f"{register_response['ImageId']}")
return register_response['ImageId']
def _uploadImageSnapshot(self, provider_image, image_name, filename,
image_format, metadata, md5, sha256,
bucket_name, object_filename):
@ -848,43 +1008,10 @@ class AwsAdapter(statemachine.Adapter):
self.log.exception("Error tagging snapshot:")
volume_size = provider_image.volume_size or snap['VolumeSize']
# Register the snapshot as an AMI
with self.rate_limiter:
bdm = {
'DeviceName': '/dev/sda1',
'Ebs': {
'DeleteOnTermination': True,
'SnapshotId': task[
'SnapshotTaskDetail']['SnapshotId'],
'VolumeSize': volume_size,
'VolumeType': provider_image.volume_type,
},
}
if provider_image.iops:
bdm['Ebs']['Iops'] = provider_image.iops
if provider_image.throughput:
bdm['Ebs']['Throughput'] = provider_image.throughput
args = dict(
Architecture=provider_image.architecture,
BlockDeviceMappings=[bdm],
RootDeviceName='/dev/sda1',
VirtualizationType='hvm',
EnaSupport=provider_image.ena_support,
Name=image_name,
)
if provider_image.imds_support == 'v2.0':
args['ImdsSupport'] = 'v2.0'
register_response = self.ec2_client.register_image(**args)
# Tag the AMI
try:
with self.rate_limiter:
self.ec2_client.create_tags(
Resources=[register_response['ImageId']],
Tags=task['Tags'])
except Exception:
self.log.exception("Error tagging AMI:")
snapshot_id = task['SnapshotTaskDetail']['SnapshotId']
register_response = self._registerImage(
provider_image, image_name, metadata, volume_size, snapshot_id,
)
self.log.debug(f"Upload of {image_name} complete as "
f"{register_response['ImageId']}")

View File

@ -107,8 +107,9 @@ class AwsProviderDiskImage(ConfigValue):
self.import_method = image.get('import-method', 'snapshot')
self.imds_support = image.get('imds-support', None)
if (self.imds_support == 'v2.0' and
self.import_method != 'snapshot'):
raise Exception("IMDSv2 requires 'snapshot' import method")
self.import_method == 'image'):
raise Exception("IMDSv2 requires 'snapshot' or 'ebs-direct' "
"import method")
self.iops = image.get('iops', None)
self.throughput = image.get('throughput', None)
@ -131,7 +132,7 @@ class AwsProviderDiskImage(ConfigValue):
'ena-support': bool,
'volume-size': int,
'volume-type': str,
'import-method': v.Any('snapshot', 'image'),
'import-method': v.Any('snapshot', 'ebs-direct', 'image'),
'imds-support': v.Any('v2.0', None),
'iops': int,
'throughput': int,

View File

@ -16,9 +16,11 @@
# limitations under the License.
import abc
import concurrent.futures
import copy
import logging
import math
import os
import threading
import time
from collections import defaultdict
@ -536,3 +538,114 @@ class LazyExecutorTTLCache:
self.last_time = time.monotonic()
return self.last_value
return decorator
class Segment:
def __init__(self, index, offset, data):
self.index = index
self.offset = offset
self.data = data
class ImageUploader:
"""
A helper class for drivers that upload large images in chunks.
"""
# These values probably don't need to be changed
error_retries = 3
concurrency = 10
# Subclasses must implement these
segment_size = None
def __init__(self, adapter, log, path, metadata):
if self.segment_size is None:
raise Exception("Subclass must set block size")
self.adapter = adapter
self.log = log
self.path = path
self.size = os.path.getsize(path)
self.metadata = metadata
self.timeout = None
def shouldRetryException(self, exception):
return True
def uploadSegment(self, segment):
pass
def startUpload(self):
pass
def finishUpload(self):
pass
def abortUpload(self):
pass
# Main API
def upload(self, timeout=None):
if timeout:
self.timeout = time.monotonic() + timeout
self.startUpload()
try:
with concurrent.futures.ThreadPoolExecutor(
max_workers=self.concurrency) as executor:
with open(self.path, 'rb') as image_file:
self._uploadInner(executor, image_file)
return self.finishUpload()
except Exception:
self.log.exception("Error uploading image:")
self.abortUpload()
# Subclasses can use this helper method for wrapping retryable calls
def retry(self, func, *args, **kw):
for x in range(self.error_retries):
try:
return func(*args, **kw)
except Exception as e:
if not self.shouldRetryException(e):
raise
if x + 1 >= self.error_retries:
raise
time.sleep(2 * x)
def getTimeout(self):
if self.timeout is None:
return None
return self.timeout - time.monotonic()
def checkTimeout(self):
if self.timeout is None:
return
if self.getTimeout() < 0:
raise Exception("Timed out uploading image")
# Internal methods
def _uploadInner(self, executor, image_file):
futures = set()
for index, offset in enumerate(range(0, self.size, self.segment_size)):
segment = Segment(index, offset,
image_file.read(self.segment_size))
future = executor.submit(self.uploadSegment, segment)
futures.add(future)
# Keep the pool of workers supplied with data but without
# reading the entire file into memory.
if len(futures) >= (self.concurrency * 2):
(done, futures) = concurrent.futures.wait(
futures,
timeout=self.getTimeout(),
return_when=concurrent.futures.FIRST_COMPLETED)
for future in done:
future.result()
# Only check the timeout after waiting (not every pass
# through the loop)
self.checkTimeout()
# We're done reading the file, wait for all uploads to finish
(done, futures) = concurrent.futures.wait(
futures,
timeout=self.getTimeout())
for future in done:
future.result()
self.checkTimeout()
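
To show how a driver plugs into this helper (the way EBSSnapshotUploader
above does), here is a minimal, hypothetical subclass.  The adapter
methods it calls (createUploadTarget, putChunk, finalizeUploadTarget,
deleteUploadTarget) are illustrative placeholders and not part of
Nodepool:

    from nodepool.driver.utils import ImageUploader


    class ExampleUploader(ImageUploader):
        # Subclasses must choose a segment size suited to their API.
        segment_size = 4 * 1024 * 1024

        def startUpload(self):
            # Hypothetical adapter call that creates the remote object
            # (snapshot, blob, ...) which will receive the segments.
            self.upload_id = self.adapter.createUploadTarget(
                self.size, self.metadata)

        def uploadSegment(self, segment):
            # Runs in the thread pool; segment.index and segment.offset
            # say where segment.data belongs within the image.
            self.adapter.putChunk(
                self.upload_id, segment.index, segment.data)

        def finishUpload(self):
            # Whatever is returned here is returned by upload().
            return self.adapter.finalizeUploadTarget(self.upload_id)

        def abortUpload(self):
            self.adapter.deleteUploadTarget(self.upload_id)


    # Typical use from an adapter method, with a timeout in seconds:
    #   uploader = ExampleUploader(self, self.log, filename, metadata)
    #   result = uploader.upload(self.provider.image_import_timeout)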

View File

@ -0,0 +1,70 @@
elements-dir: .
images-dir: '{images_dir}'
build-log-dir: '{build_log_dir}'
build-log-retention: 1
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}
chroot: {zookeeper_chroot}
zookeeper-tls:
ca: {zookeeper_ca}
cert: {zookeeper_cert}
key: {zookeeper_key}
tenant-resource-limits:
- tenant-name: tenant-1
max-cores: 1024
labels:
- name: diskimage
providers:
- name: ec2-us-west-2
driver: aws
rate: 2
region-name: us-west-2
object-storage:
bucket-name: nodepool
image-import-timeout: 60
diskimages:
- name: fake-image
tags:
provider_metadata: provider
import-method: ebs-direct
volume-type: gp3
iops: 1000
throughput: 100
imds-support: v2.0
pools:
- name: main
max-servers: 1
subnet-id: {subnet_id}
security-group-id: {security_group_id}
node-attributes:
key1: value1
key2: value2
labels:
- name: diskimage
diskimage: fake-image
instance-type: t3.medium
key-name: zuul
iops: 2000
throughput: 200
diskimages:
- name: fake-image
elements:
- fedora-minimal
- vm
release: 21
dib-cmd: nodepool/tests/fake-image-create
env-vars:
TMPDIR: /opt/dib_tmp
DIB_IMAGE_CACHE: /opt/dib_cache
DIB_CLOUD_IMAGES: http://download.fedoraproject.org/pub/fedora/linux/releases/test/21-Beta/Cloud/Images/x86_64/
BASE_IMAGE_FILE: Fedora-Cloud-Base-20141029-21_Beta.x86_64.qcow2
metadata:
diskimage_metadata: diskimage
username: another_user

View File

@ -833,10 +833,12 @@ class TestDriverAws(tests.DBTestCase):
ec2_image = self.ec2.Image(image.external_id)
self.assertEqual(ec2_image.state, 'available')
self.assertFalse('ImdsSupport' in self.register_image_calls[0])
self.assertTrue({'Key': 'diskimage_metadata', 'Value': 'diskimage'}
in ec2_image.tags)
self.assertTrue({'Key': 'provider_metadata', 'Value': 'provider'}
in ec2_image.tags)
# As of 2024-07-09, moto does not set tags, but AWS itself does.
tags = self.register_image_calls[0]['TagSpecifications'][0]['Tags']
self.assertIn(
{'Key': 'diskimage_metadata', 'Value': 'diskimage'}, tags)
self.assertIn(
{'Key': 'provider_metadata', 'Value': 'provider'}, tags)
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
@ -923,10 +925,12 @@ class TestDriverAws(tests.DBTestCase):
self.assertEqual(
self.register_image_calls[0]['ImdsSupport'], 'v2.0')
self.assertTrue({'Key': 'diskimage_metadata', 'Value': 'diskimage'}
in ec2_image.tags)
self.assertTrue({'Key': 'provider_metadata', 'Value': 'provider'}
in ec2_image.tags)
# As of 2024-07-09, moto does not set tags, but AWS itself does.
tags = self.register_image_calls[0]['TagSpecifications'][0]['Tags']
self.assertIn(
{'Key': 'diskimage_metadata', 'Value': 'diskimage'}, tags)
self.assertIn(
{'Key': 'provider_metadata', 'Value': 'provider'}, tags)
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
@ -963,6 +967,56 @@ class TestDriverAws(tests.DBTestCase):
with testtools.ExpectedException(Exception, "IMDSv2 requires"):
self.useBuilder(configfile)
def test_aws_diskimage_ebs_snapshot_imdsv2(self):
self.fake_aws.fail_import_count = 1
configfile = self.setup_config(
'aws/diskimage-imdsv2-ebs-snapshot.yaml')
self.useBuilder(configfile)
image = self.waitForImage('ec2-us-west-2', 'fake-image')
self.assertEqual(image.username, 'another_user')
ec2_image = self.ec2.Image(image.external_id)
self.assertEqual(ec2_image.state, 'available')
self.assertEqual(
self.register_image_calls[0]['ImdsSupport'], 'v2.0')
# As of 2024-07-09, moto does not set tags, but AWS itself does.
tags = self.register_image_calls[0]['TagSpecifications'][0]['Tags']
self.assertIn(
{'Key': 'diskimage_metadata', 'Value': 'diskimage'}, tags)
self.assertIn(
{'Key': 'provider_metadata', 'Value': 'provider'}, tags)
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
req = zk.NodeRequest()
req.state = zk.REQUESTED
req.node_types.append('diskimage')
self.zk.storeNodeRequest(req)
req = self.waitForNodeRequest(req)
self.assertEqual(req.state, zk.FULFILLED)
self.assertNotEqual(req.nodes, [])
node = self.zk.getNode(req.nodes[0])
self.assertEqual(node.allocated_to, req.id)
self.assertEqual(node.state, zk.READY)
self.assertIsNotNone(node.launcher)
self.assertEqual(node.connection_type, 'ssh')
self.assertEqual(node.shell_type, None)
self.assertEqual(node.username, 'another_user')
self.assertEqual(node.attributes,
{'key1': 'value1', 'key2': 'value2'})
self.assertEqual(
self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Iops'], 2000)
self.assertEqual(
self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Throughput'], 200)
def test_aws_diskimage_removal(self):
configfile = self.setup_config('aws/diskimage.yaml')
self.useBuilder(configfile)

View File

@ -0,0 +1,6 @@
---
features:
- |
The AWS driver now supports optionally uploading diskimages using
the EBS direct APIs. This may be faster and more efficient since
it bypasses S3, but it may incur additional costs.

View File

@ -19,7 +19,7 @@ kazoo==2.9.0
Paste
WebOb>=1.8.1
openshift>=0.13.1,<0.14.0
boto3>=1.20.0
boto3>=1.34.141
google-api-python-client
# botocore 1.23.0 (via boto3 1.20.0) requires urllib 1.26.0 or newer:
# https://github.com/boto/botocore/issues/2562