#!/usr/bin/env python # Copyright (c) 2012 OpenStack Foundation # Copyright (c) 2010 Citrix Systems, Inc. # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. # NOTE: XenServer still only supports Python 2.4 in it's dom0 userspace # which means the Nova xenapi plugins must use only Python 2.4 features # TODO(sfinucan): Resolve all 'noqa' items once the above is no longer true """Handle the uploading and downloading of images via Glance.""" try: import httplib except ImportError: from six.moves import http_client as httplib try: import json except ImportError: import simplejson as json import md5 # noqa import socket import urllib2 from urlparse import urlparse import dom0_pluginlib import utils import XenAPI dom0_pluginlib.configure_logging('glance') logging = dom0_pluginlib.logging PluginError = dom0_pluginlib.PluginError SOCKET_TIMEOUT_SECONDS = 90 class RetryableError(Exception): pass def _create_connection(scheme, netloc): if scheme == 'https': conn = httplib.HTTPSConnection(netloc) else: conn = httplib.HTTPConnection(netloc) conn.connect() return conn def _download_tarball_and_verify(request, staging_path): # NOTE(johngarbutt) By default, there is no timeout. # To ensure the script does not hang if we lose connection # to glance, we add this socket timeout. # This is here so there is no chance the timeout out has # been adjusted by other library calls. socket.setdefaulttimeout(SOCKET_TIMEOUT_SECONDS) try: response = urllib2.urlopen(request) except urllib2.HTTPError, error: # noqa raise RetryableError(error) except urllib2.URLError, error: # noqa raise RetryableError(error) except httplib.HTTPException, error: # noqa # httplib.HTTPException and derivatives (BadStatusLine in particular) # don't have a useful __repr__ or __str__ raise RetryableError('%s: %s' % (error.__class__.__name__, error)) url = request.get_full_url() logging.info("Reading image data from %s" % url) callback_data = {'bytes_read': 0} checksum = md5.new() def update_md5(chunk): callback_data['bytes_read'] += len(chunk) checksum.update(chunk) try: try: utils.extract_tarball(response, staging_path, callback=update_md5) except Exception, error: # noqa raise RetryableError(error) finally: bytes_read = callback_data['bytes_read'] logging.info("Read %d bytes from %s", bytes_read, url) # Use ETag if available, otherwise content-md5(v2) or # X-Image-Meta-Checksum(v1) etag = response.info().getheader('etag', None) if etag is None: etag = response.info().getheader('content-md5', None) if etag is None: etag = response.info().getheader('x-image-meta-checksum', None) # Verify checksum using ETag checksum = checksum.hexdigest() if etag is None: msg = "No ETag found for comparison to checksum %(checksum)s" logging.info(msg % {'checksum': checksum}) elif checksum != etag: msg = 'ETag %(etag)s does not match computed md5sum %(checksum)s' raise RetryableError(msg % {'checksum': checksum, 'etag': etag}) else: msg = "Verified image checksum %(checksum)s" logging.info(msg % {'checksum': checksum}) def _download_tarball_v1(sr_path, staging_path, image_id, glance_host, glance_port, glance_use_ssl, extra_headers): # Download the tarball image from Glance v1 and extract it into the # staging area. Retry if there is any failure. if glance_use_ssl: scheme = 'https' else: scheme = 'http' endpoint = "%(scheme)s://%(glance_host)s:%(glance_port)d" % { 'scheme': scheme, 'glance_host': glance_host, 'glance_port': glance_port} _download_tarball_by_url_v1(sr_path, staging_path, image_id, endpoint, extra_headers) def _download_tarball_by_url_v1( sr_path, staging_path, image_id, glance_endpoint, extra_headers): # Download the tarball image from Glance v1 and extract it into the # staging area. url = "%(glance_endpoint)s/v1/images/%(image_id)s" % { 'glance_endpoint': glance_endpoint, 'image_id': image_id} logging.info("Downloading %s with glance v1 api" % url) request = urllib2.Request(url, headers=extra_headers) try: _download_tarball_and_verify(request, staging_path) except Exception: logging.exception('Failed to retrieve %(url)s' % {'url': url}) raise def _download_tarball_by_url_v2( sr_path, staging_path, image_id, glance_endpoint, extra_headers): # Download the tarball image from Glance v2 and extract it into the # staging area. url = "%(glance_endpoint)s/v2/images/%(image_id)s/file" % { 'glance_endpoint': glance_endpoint, 'image_id': image_id} logging.debug("Downloading %s with glance v2 api" % url) request = urllib2.Request(url, headers=extra_headers) try: _download_tarball_and_verify(request, staging_path) except Exception: logging.exception('Failed to retrieve %(url)s' % {'url': url}) raise def _upload_tarball_v1(staging_path, image_id, glance_host, glance_port, glance_use_ssl, extra_headers, properties): if glance_use_ssl: scheme = 'https' else: scheme = 'http' url = '%s://%s:%s' % (scheme, glance_host, glance_port) _upload_tarball_by_url_v1(staging_path, image_id, url, extra_headers, properties) def _upload_tarball_by_url_v1(staging_path, image_id, glance_endpoint, extra_headers, properties): """Create a tarball of the image and then stream that into Glance v1 Using chunked-transfer-encoded HTTP. """ # NOTE(johngarbutt) By default, there is no timeout. # To ensure the script does not hang if we lose connection # to glance, we add this socket timeout. # This is here so there is no chance the timeout out has # been adjusted by other library calls. socket.setdefaulttimeout(SOCKET_TIMEOUT_SECONDS) logging.debug("Uploading image %s with glance v1 api" % image_id) url = "%(glance_endpoint)s/v1/images/%(image_id)s" % { 'glance_endpoint': glance_endpoint, 'image_id': image_id} logging.info("Writing image data to %s" % url) # NOTE(sdague): this is python 2.4, which means urlparse returns a # tuple, not a named tuple. # 0 - scheme # 1 - host:port (aka netloc) # 2 - path parts = urlparse(url) try: conn = _create_connection(parts[0], parts[1]) except Exception, error: # noqa logging.exception('Failed to connect %(url)s' % {'url': url}) raise RetryableError(error) try: validate_image_status_before_upload_v1(conn, url, extra_headers) try: # NOTE(sirp): httplib under python2.4 won't accept # a file-like object to request conn.putrequest('PUT', parts[2]) # NOTE(sirp): There is some confusion around OVF. Here's a summary # of where we currently stand: # 1. OVF as a container format is misnamed. We really should be # using OVA since that is the name for the container format; # OVF is the standard applied to the manifest file contained # within. # 2. We're currently uploading a vanilla tarball. In order to be # OVF/OVA compliant, we'll need to embed a minimal OVF # manifest as the first file. # NOTE(dprince): In order to preserve existing Glance properties # we set X-Glance-Registry-Purge-Props on this request. headers = { 'content-type': 'application/octet-stream', 'transfer-encoding': 'chunked', 'x-image-meta-is-public': 'False', 'x-image-meta-status': 'queued', 'x-image-meta-disk-format': 'vhd', 'x-image-meta-container-format': 'ovf', 'x-glance-registry-purge-props': 'False'} headers.update(**extra_headers) for key, value in properties.items(): header_key = "x-image-meta-property-%s" % key.replace('_', '-') headers[header_key] = str(value) for header, value in headers.items(): conn.putheader(header, value) conn.endheaders() except Exception, error: # noqa logging.exception('Failed to upload %(url)s' % {'url': url}) raise RetryableError(error) callback_data = {'bytes_written': 0} def send_chunked_transfer_encoded(chunk): chunk_len = len(chunk) callback_data['bytes_written'] += chunk_len try: conn.send("%x\r\n%s\r\n" % (chunk_len, chunk)) except Exception, error: # noqa logging.exception('Failed to upload when sending chunks') raise RetryableError(error) compression_level = properties.get('xenapi_image_compression_level') utils.create_tarball( None, staging_path, callback=send_chunked_transfer_encoded, compression_level=compression_level) send_chunked_transfer_encoded('') # Chunked-Transfer terminator bytes_written = callback_data['bytes_written'] logging.info("Wrote %d bytes to %s" % (bytes_written, url)) resp = conn.getresponse() if resp.status == httplib.OK: return logging.error("Unexpected response while writing image data to %s: " "Response Status: %i, Response body: %s" % (url, resp.status, resp.read())) check_resp_status_and_retry(resp, image_id, url) finally: conn.close() def _update_image_meta_v2(conn, extra_headers, properties, patch_path): # NOTE(sirp): There is some confusion around OVF. Here's a summary # of where we currently stand: # 1. OVF as a container format is misnamed. We really should be # using OVA since that is the name for the container format; # OVF is the standard applied to the manifest file contained # within. # 2. We're currently uploading a vanilla tarball. In order to be # OVF/OVA compliant, we'll need to embed a minimal OVF # manifest as the first file. body = [ {"path": "/container_format", "value": "ovf", "op": "add"}, {"path": "/disk_format", "value": "vhd", "op": "add"}, {"path": "/visibility", "value": "private", "op": "add"}] headers = {'Content-Type': 'application/openstack-images-v2.1-json-patch'} headers.update(**extra_headers) for key, value in properties.items(): prop = {"path": "/%s" % key.replace('_', '-'), "value": str(value), "op": "add"} body.append(prop) body = json.dumps(body) conn.request('PATCH', patch_path, body=body, headers=headers) resp = conn.getresponse() resp.read() if resp.status == httplib.OK: return logging.error("Image meta was not updated. Status: %s, Reason: %s" % ( resp.status, resp.reason)) def _upload_tarball_by_url_v2(staging_path, image_id, glance_endpoint, extra_headers, properties): """Create a tarball of the image and then stream that into Glance v2 Using chunked-transfer-encoded HTTP. """ # NOTE(johngarbutt) By default, there is no timeout. # To ensure the script does not hang if we lose connection # to glance, we add this socket timeout. # This is here so there is no chance the timeout out has # been adjusted by other library calls. socket.setdefaulttimeout(SOCKET_TIMEOUT_SECONDS) logging.debug("Uploading imaged %s with glance v2 api" % image_id) url = "%(glance_endpoint)s/v2/images/%(image_id)s/file" % { 'glance_endpoint': glance_endpoint, 'image_id': image_id} # NOTE(sdague): this is python 2.4, which means urlparse returns a # tuple, not a named tuple. # 0 - scheme # 1 - host:port (aka netloc) # 2 - path parts = urlparse(url) try: conn = _create_connection(parts[0], parts[1]) except Exception, error: # noqa raise RetryableError(error) try: mgt_url = "%(glance_endpoint)s/v2/images/%(image_id)s" % { 'glance_endpoint': glance_endpoint, 'image_id': image_id} mgt_parts = urlparse(mgt_url) mgt_path = mgt_parts[2] _update_image_meta_v2(conn, image_id, extra_headers, properties, mgt_path) validate_image_status_before_upload_v2(conn, url, extra_headers, mgt_path) try: conn.connect() # NOTE(sirp): httplib under python2.4 won't accept # a file-like object to request conn.putrequest('PUT', parts[2]) headers = { 'content-type': 'application/octet-stream', 'transfer-encoding': 'chunked'} headers.update(**extra_headers) for header, value in headers.items(): conn.putheader(header, value) conn.endheaders() except Exception, error: # noqa logging.exception('Failed to upload %(url)s' % {'url': url}) raise RetryableError(error) callback_data = {'bytes_written': 0} def send_chunked_transfer_encoded(chunk): chunk_len = len(chunk) callback_data['bytes_written'] += chunk_len try: conn.send("%x\r\n%s\r\n" % (chunk_len, chunk)) except Exception, error: # noqa logging.exception('Failed to upload when sending chunks') raise RetryableError(error) compression_level = properties.get('xenapi_image_compression_level') utils.create_tarball( None, staging_path, callback=send_chunked_transfer_encoded, compression_level=compression_level) send_chunked_transfer_encoded('') # Chunked-Transfer terminator bytes_written = callback_data['bytes_written'] logging.info("Wrote %d bytes to %s" % (bytes_written, url)) resp = conn.getresponse() if resp.status == httplib.NO_CONTENT: return logging.error("Unexpected response while writing image data to %s: " "Response Status: %i, Response body: %s" % (url, resp.status, resp.read())) check_resp_status_and_retry(resp, image_id, url) finally: conn.close() def check_resp_status_and_retry(resp, image_id, url): # Note(Jesse): This branch sorts errors into those that are permanent, # those that are ephemeral, and those that are unexpected. if resp.status in (httplib.BAD_REQUEST, # 400 httplib.UNAUTHORIZED, # 401 httplib.PAYMENT_REQUIRED, # 402 httplib.FORBIDDEN, # 403 httplib.METHOD_NOT_ALLOWED, # 405 httplib.NOT_ACCEPTABLE, # 406 httplib.PROXY_AUTHENTICATION_REQUIRED, # 407 httplib.CONFLICT, # 409 httplib.GONE, # 410 httplib.LENGTH_REQUIRED, # 411 httplib.PRECONDITION_FAILED, # 412 httplib.REQUEST_ENTITY_TOO_LARGE, # 413 httplib.REQUEST_URI_TOO_LONG, # 414 httplib.UNSUPPORTED_MEDIA_TYPE, # 415 httplib.REQUESTED_RANGE_NOT_SATISFIABLE, # 416 httplib.EXPECTATION_FAILED, # 417 httplib.UNPROCESSABLE_ENTITY, # 422 httplib.LOCKED, # 423 httplib.FAILED_DEPENDENCY, # 424 httplib.UPGRADE_REQUIRED, # 426 httplib.NOT_IMPLEMENTED, # 501 httplib.HTTP_VERSION_NOT_SUPPORTED, # 505 httplib.NOT_EXTENDED, # 510 ): raise PluginError("Got Permanent Error response [%i] while " "uploading image [%s] to glance [%s]" % (resp.status, image_id, url)) # Nova service would process the exception elif resp.status == httplib.NOT_FOUND: # 404 exc = XenAPI.Failure('ImageNotFound') raise exc # NOTE(nikhil): Only a sub-set of the 500 errors are retryable. We # optimistically retry on 500 errors below. elif resp.status in (httplib.REQUEST_TIMEOUT, # 408 httplib.INTERNAL_SERVER_ERROR, # 500 httplib.BAD_GATEWAY, # 502 httplib.SERVICE_UNAVAILABLE, # 503 httplib.GATEWAY_TIMEOUT, # 504 httplib.INSUFFICIENT_STORAGE, # 507 ): raise RetryableError("Got Ephemeral Error response [%i] while " "uploading image [%s] to glance [%s]" % (resp.status, image_id, url)) else: # Note(Jesse): Assume unexpected errors are retryable. If you are # seeing this error message, the error should probably be added # to either the ephemeral or permanent error list. raise RetryableError("Got Unexpected Error response [%i] while " "uploading image [%s] to glance [%s]" % (resp.status, image_id, url)) def validate_image_status_before_upload_v1(conn, url, extra_headers): try: parts = urlparse(url) path = parts[2] image_id = path.split('/')[-1] # NOTE(nikhil): Attempt to determine if the Image has a status # of 'queued'. Because data will continued to be sent to Glance # until it has a chance to check the Image state, discover that # it is not 'active' and send back a 409. Hence, the data will be # unnecessarily buffered by Glance. This wastes time and bandwidth. # LP bug #1202785 conn.request('HEAD', path, headers=extra_headers) head_resp = conn.getresponse() # NOTE(nikhil): read the response to re-use the conn object. body_data = head_resp.read(8192) if len(body_data) > 8: err_msg = ('Cannot upload data for image %(image_id)s as the ' 'HEAD call had more than 8192 bytes of data in ' 'the response body.' % {'image_id': image_id}) raise PluginError("Got Permanent Error while uploading image " "[%s] to glance [%s]. " "Message: %s" % (image_id, url, err_msg)) else: head_resp.read() except Exception, error: # noqa logging.exception('Failed to HEAD the image %(image_id)s while ' 'checking image status before attempting to ' 'upload %(url)s' % {'image_id': image_id, 'url': url}) raise RetryableError(error) if head_resp.status != httplib.OK: logging.error("Unexpected response while doing a HEAD call " "to image %s , url = %s , Response Status: " "%i" % (image_id, url, head_resp.status)) check_resp_status_and_retry(head_resp, image_id, url) else: image_status = head_resp.getheader('x-image-meta-status') if image_status not in ('queued', ): err_msg = ('Cannot upload data for image %(image_id)s as the ' 'image status is %(image_status)s' % {'image_id': image_id, 'image_status': image_status}) logging.exception(err_msg) raise PluginError("Got Permanent Error while uploading image " "[%s] to glance [%s]. " "Message: %s" % (image_id, url, err_msg)) else: logging.info('Found image %(image_id)s in status ' '%(image_status)s. Attempting to ' 'upload.' % {'image_id': image_id, 'image_status': image_status}) def validate_image_status_before_upload_v2(conn, url, extra_headers, get_path): try: parts = urlparse(url) path = parts[2] image_id = path.split('/')[-2] # NOTE(nikhil): Attempt to determine if the Image has a status # of 'queued'. Because data will continued to be sent to Glance # until it has a chance to check the Image state, discover that # it is not 'active' and send back a 409. Hence, the data will be # unnecessarily buffered by Glance. This wastes time and bandwidth. # LP bug #1202785 conn.request('GET', get_path, headers=extra_headers) get_resp = conn.getresponse() except Exception, error: # noqa logging.exception('Failed to GET the image %(image_id)s while ' 'checking image status before attempting to ' 'upload %(url)s' % {'image_id': image_id, 'url': url}) raise RetryableError(error) if get_resp.status != httplib.OK: logging.error("Unexpected response while doing a GET call " "to image %s , url = %s , Response Status: " "%i" % (image_id, url, get_resp.status)) check_resp_status_and_retry(get_resp, image_id, url) else: body = json.loads(get_resp.read()) image_status = body['status'] if image_status not in ('queued', ): err_msg = ('Cannot upload data for image %(image_id)s as the ' 'image status is %(image_status)s' % {'image_id': image_id, 'image_status': image_status}) logging.exception(err_msg) raise PluginError("Got Permanent Error while uploading image " "[%s] to glance [%s]. " "Message: %s" % (image_id, url, err_msg)) else: logging.info('Found image %(image_id)s in status ' '%(image_status)s. Attempting to ' 'upload.' % {'image_id': image_id, 'image_status': image_status}) get_resp.read() def download_vhd2(session, image_id, endpoint, uuid_stack, sr_path, extra_headers, api_version=1): # Download an image from Glance v2, unbundle it, and then deposit the # VHDs into the storage repository. staging_path = utils.make_staging_area(sr_path) try: # Download tarball into staging area and extract it # TODO(mfedosin): remove this check when v1 is deprecated. if api_version == 1: _download_tarball_by_url_v1( sr_path, staging_path, image_id, endpoint, extra_headers) else: _download_tarball_by_url_v2( sr_path, staging_path, image_id, endpoint, extra_headers) # Move the VHDs from the staging area into the storage repository return utils.import_vhds(sr_path, staging_path, uuid_stack) finally: utils.cleanup_staging_area(staging_path) def upload_vhd2(session, vdi_uuids, image_id, endpoint, sr_path, extra_headers, properties, api_version=1): """Bundle the VHDs comprising an image and then stream them into Glance""" staging_path = utils.make_staging_area(sr_path) try: utils.prepare_staging_area(sr_path, staging_path, vdi_uuids) # TODO(mfedosin): remove this check when v1 is deprecated. if api_version == 1: _upload_tarball_by_url_v1(staging_path, image_id, endpoint, extra_headers, properties) else: _upload_tarball_by_url_v2(staging_path, image_id, endpoint, extra_headers, properties) finally: utils.cleanup_staging_area(staging_path) if __name__ == '__main__': utils.register_plugin_calls(download_vhd2, upload_vhd2)