
As uploading image, it will compress the vdh file and generate the gzipped tar file. This commit will change it to use default compress level as 6; and also allow the API's user to specify a customizing compress level basing on demand. Change-Id: If343326d7a501b25662fecb0b57d7637acabe250
298 lines
11 KiB
Python
298 lines
11 KiB
Python
# Copyright 2017 Citrix Systems
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import eventlet
|
|
import logging
|
|
from six.moves import http_client as httplib
|
|
import six.moves.urllib.parse as urlparse
|
|
import tarfile
|
|
|
|
from os_xenapi.client import exception
|
|
from os_xenapi.client.image import vhd_utils
|
|
from os_xenapi.client import utils
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
CHUNK_SIZE = 4 * 1024 * 1024
|
|
DEFAULT_COMPRESSLEVEL = 6
|
|
|
|
|
|
class ImageStreamToVDIs(object):
|
|
def __init__(self, context, session, instance, host_url, sr_ref,
|
|
image_stream_in):
|
|
self.context = context
|
|
self.session = session
|
|
self.instance = instance
|
|
self.host_url = urlparse.urlparse(host_url)
|
|
self.image_stream = image_stream_in
|
|
self.sr_ref = sr_ref
|
|
self.task_ref = None
|
|
self.vdis = {}
|
|
|
|
def _clean(self):
|
|
if self.task_ref:
|
|
self.session.task.destroy(self.task_ref)
|
|
|
|
def start(self):
|
|
label = 'VDI_IMPORT_for_' + self.instance['name']
|
|
desc = 'Importing VDI for instance: %s' % self.instance['name']
|
|
self.task_ref = self.session.task.create(label, desc)
|
|
try:
|
|
with tarfile.open(mode="r|gz", fileobj=self.image_stream) as tar:
|
|
for vhd in tar:
|
|
file_size = vhd.size
|
|
LOG.debug("file_name:file_size is %(n)s:%(s)d",
|
|
{'n': vhd.name, 's': vhd.size})
|
|
vhd_file = tar.extractfile(vhd)
|
|
vhd_file_parser = vhd_utils.VHDFileParser(vhd_file)
|
|
vhd_footer = vhd_file_parser.parse_vhd_footer()
|
|
virtual_size = vhd_footer.current_size
|
|
vdi_ref = self._createVDI(self.session,
|
|
self.instance,
|
|
virtual_size)
|
|
|
|
self._vhd_stream_to_vdi(vhd_file_parser, vdi_ref,
|
|
file_size)
|
|
vdi_uuid = self.session.VDI.get_uuid(vdi_ref)
|
|
if 'root' in self.vdis.keys():
|
|
# we only support single vdi. If 'root' already exists
|
|
# in the dict, should raise exception.
|
|
msg = "Only support single VDI; but there are " + \
|
|
"multiple VDIs in the image."
|
|
raise exception.InvalidImage(details=msg)
|
|
|
|
self.vdis['root'] = dict(uuid=vdi_uuid)
|
|
finally:
|
|
self._clean()
|
|
|
|
def _createVDI(self, session, instance, virtual_size):
|
|
vdi_ref = utils.create_vdi(session, self.sr_ref, instance,
|
|
instance['name'], 'root', virtual_size)
|
|
vdi_uuid = session.VDI.get_uuid(vdi_ref)
|
|
LOG.debug("Created a new VDI: uuid=%s" % vdi_uuid)
|
|
return vdi_ref
|
|
|
|
def _vhd_stream_to_vdi(self, vhd_file_parser, vdi_ref, file_size):
|
|
|
|
headers = {'Content-Type': 'application/octet-stream',
|
|
'Content-Length': '%s' % file_size}
|
|
|
|
if self.host_url.scheme == 'http':
|
|
conn = httplib.HTTPConnection(self.host_url.netloc)
|
|
elif self.host_url.scheme == 'https':
|
|
conn = httplib.HTTPSConnection(self.host_url.netloc)
|
|
|
|
vdi_import_path = utils.get_vdi_import_path(
|
|
self.session, self.task_ref, vdi_ref)
|
|
try:
|
|
conn.connect()
|
|
except Exception:
|
|
LOG.error('Failed connecting to host: %s', self.host_url.netloc)
|
|
raise exception.HostConnectionFailure(
|
|
host_netloc=self.host_url.netloc)
|
|
|
|
try:
|
|
conn.request('PUT', vdi_import_path, headers=headers)
|
|
# Send the data already processed by vhd file parser firstly;
|
|
# then send the remaining data from the stream.
|
|
conn.send(vhd_file_parser.cached_buff)
|
|
remain_size = file_size - len(vhd_file_parser.cached_buff)
|
|
file_obj = vhd_file_parser.src_file
|
|
while remain_size >= CHUNK_SIZE:
|
|
chunk = file_obj.read(CHUNK_SIZE)
|
|
remain_size -= CHUNK_SIZE
|
|
conn.send(chunk)
|
|
if remain_size != 0:
|
|
chunk = file_obj.read(remain_size)
|
|
conn.send(chunk)
|
|
except Exception:
|
|
LOG.error('Failed importing VDI from VHD stream - vdi_ref:%s',
|
|
vdi_ref)
|
|
raise exception.VdiImportFailure(vdi_ref=vdi_ref)
|
|
finally:
|
|
resp = conn.getresponse()
|
|
LOG.debug("Connection response status/reason is "
|
|
"%(status)s:%(reason)s",
|
|
{'status': resp.status, 'reason': resp.reason})
|
|
conn.close()
|
|
|
|
|
|
class GenerateImageStream(object):
|
|
def __init__(self, context, session, instance, host_url, vdi_uuids,
|
|
compresslevel=None):
|
|
self.context = context
|
|
self.session = session
|
|
self.instance = instance
|
|
self.host_url = host_url
|
|
self.vdi_uuids = vdi_uuids
|
|
self.compresslevel = self._get_compress_level(compresslevel)
|
|
|
|
def _get_compress_level(self, compresslevel):
|
|
if compresslevel and compresslevel >= 1 and compresslevel <= 9:
|
|
return compresslevel
|
|
# if compresslevel is not set or not a valid value, just return
|
|
# the default value.
|
|
return DEFAULT_COMPRESSLEVEL
|
|
|
|
def get_image_data(self):
|
|
"""This function will:
|
|
|
|
1). export VDI as VHD stream;
|
|
2). make gzipped tarball from the VHD stream;
|
|
3). read from the tarball stream and return the iterable data.
|
|
"""
|
|
|
|
tarpipe_out, tarpipe_in = utils.create_pipe()
|
|
pool = eventlet.GreenPool()
|
|
pool.spawn(self.start_image_stream_generator, tarpipe_in)
|
|
try:
|
|
while True:
|
|
data = tarpipe_out.read(CHUNK_SIZE)
|
|
if not data:
|
|
break
|
|
yield data
|
|
except Exception:
|
|
LOG.debug("Failed to read chunks from the tarfile "
|
|
"stream.")
|
|
raise
|
|
finally:
|
|
tarpipe_out.close()
|
|
pool.waitall()
|
|
|
|
def start_image_stream_generator(self, tarpipe_in):
|
|
tar_generator = VdisToTarStream(
|
|
self.context, self.session, self.instance, self.host_url,
|
|
self.vdi_uuids, tarpipe_in, self.compresslevel)
|
|
try:
|
|
tar_generator.start()
|
|
finally:
|
|
tarpipe_in.close()
|
|
|
|
|
|
class VdisToTarStream(object):
|
|
def __init__(self, context, session, instance, host_url, vdi_uuids,
|
|
tarpipe_in, compresslevel):
|
|
self.context = context
|
|
self.session = session
|
|
self.instance = instance
|
|
self.host_url = host_url
|
|
self.vdi_uuids = vdi_uuids
|
|
self.tarpipe_in = tarpipe_in
|
|
self.compresslevel = compresslevel
|
|
self.conn = None
|
|
self.task_ref = None
|
|
|
|
def start(self):
|
|
# Start thread to generate tgz and write tgz data into tarpipe_in.
|
|
with tarfile.TarFile.gzopen(
|
|
name=None, fileobj=self.tarpipe_in, mode='w',
|
|
compresslevel=self.compresslevel) as tar_file:
|
|
|
|
# only need export the leaf vdi.
|
|
vdi_uuid = self.vdi_uuids[0]
|
|
vdi_ref = self.session.VDI.get_by_uuid(vdi_uuid)
|
|
vhd_stream = self._connect_request(vdi_ref)
|
|
tar_info = tarfile.TarInfo('0.vhd')
|
|
try:
|
|
# the VHD must be dynamical hard disk, otherwise it will raise
|
|
# VhdDiskTypeNotSupported exception when parsing VDH file.
|
|
vhd_DynDisk = vhd_utils.VHDDynDiskParser(vhd_stream)
|
|
tar_info.size = vhd_DynDisk.get_vhd_file_size()
|
|
LOG.debug("VHD size for tarfile is %d" % tar_info.size)
|
|
vhdpipe_out, vhdpipe_in = utils.create_pipe()
|
|
pool = eventlet.GreenPool()
|
|
pool.spawn(self.convert_vhd_to_tar, vhdpipe_out,
|
|
tar_file, tar_info)
|
|
try:
|
|
self._vhd_to_pipe(vhd_DynDisk, vhdpipe_in)
|
|
finally:
|
|
vhdpipe_in.close()
|
|
|
|
pool.waitall()
|
|
finally:
|
|
self._clean()
|
|
|
|
def convert_vhd_to_tar(self, vhdpipe_out, tar_file, tar_info):
|
|
tarGenerator = AddVhdToTar(tar_file, tar_info, vhdpipe_out)
|
|
try:
|
|
tarGenerator.start()
|
|
finally:
|
|
vhdpipe_out.close()
|
|
|
|
def _connect_request(self, vdi_ref):
|
|
# request connection to xapi url service for VDI export
|
|
try:
|
|
# create task for VDI export
|
|
label = 'VDI_EXPORT_for_' + self.instance['name']
|
|
desc = 'Exporting VDI for instance: %s' % self.instance['name']
|
|
self.task_ref = self.session.task.create(label, desc)
|
|
LOG.debug("task_ref is %s" % self.task_ref)
|
|
# connect to XS
|
|
xs_url = urlparse.urlparse(self.host_url)
|
|
if xs_url.scheme == 'http':
|
|
conn = httplib.HTTPConnection(xs_url.netloc)
|
|
LOG.debug("using http")
|
|
elif xs_url.scheme == 'https':
|
|
conn = httplib.HTTPSConnection(xs_url.netloc)
|
|
LOG.debug("using https")
|
|
vdi_export_path = utils.get_vdi_export_path(
|
|
self.session, self.task_ref, vdi_ref)
|
|
conn.request('GET', vdi_export_path)
|
|
conn_resp = conn.getresponse()
|
|
except Exception:
|
|
LOG.debug('request connect for vdi export failed')
|
|
raise
|
|
return conn_resp
|
|
|
|
def _vhd_to_pipe(self, vhd_dynDisk, vhdpipe_in):
|
|
# Firstly write the data already parsed by vhd_dynDisk obj;
|
|
# then write all of the remaining data to the pipe also.
|
|
vhdpipe_in.write(vhd_dynDisk.cached_buff)
|
|
remain_data = vhd_dynDisk.src_file
|
|
while True:
|
|
data = remain_data.read(CHUNK_SIZE)
|
|
if not data:
|
|
break
|
|
try:
|
|
vhdpipe_in.write(data)
|
|
except Exception:
|
|
LOG.debug("Failed when writing data to VHD stream.")
|
|
raise
|
|
|
|
def _clean(self):
|
|
if self.conn:
|
|
self.conn.close()
|
|
if self.task_ref:
|
|
self.session.task.destroy(self.task_ref)
|
|
|
|
|
|
class AddVhdToTar(object):
|
|
def __init__(self, tar_file, tar_info, vhdpipe_out):
|
|
self.tar_file = tar_file
|
|
self.tar_info = tar_info
|
|
self.stream = vhdpipe_out
|
|
|
|
def start(self):
|
|
self._add_stream_to_tar()
|
|
|
|
def _add_stream_to_tar(self):
|
|
try:
|
|
LOG.debug('self.tar_info.size=%d' % self.tar_info.size)
|
|
self.tar_file.addfile(self.tar_info, fileobj=self.stream)
|
|
LOG.debug('added file %s' % self.tar_info.name)
|
|
except IOError:
|
|
LOG.debug('IOError when streaming vhd to tarball')
|
|
raise
|