Refactor VmdkWriteHandle and VmdkReadHandle

Add a class VmdkHandle as the base class of VmdkWriteHandle and
VmdkReadHandle, and add related methods into it such as:
 def update_progress()
 def _release_lease()

Change-Id: I29c48773842c1006b8499aad054e3901074d6195
Depends-On: Ib5b388f7eeb9f26de66c308a89f70c85ba6dc7a9
This commit is contained in:
Javeme 2016-02-17 20:00:36 +08:00
parent 8ec8c09417
commit c19184765d
2 changed files with 228 additions and 234 deletions

View File

@ -59,8 +59,6 @@ class FileHandle(object):
"""
self._eof = False
self._file_handle = file_handle
self._last_logged_progress = 0
self._last_progress_udpate = 0
def _create_connection(self, url, method, cacerts=False,
ssl_thumbprint=None):
@ -192,51 +190,6 @@ class FileHandle(object):
return '%s://[%s]:%d' % (scheme, host, port)
return '%s://%s:%d' % (scheme, host, port)
def _fix_esx_url(self, url, host, port):
"""Fix netloc in the case of an ESX host.
In the case of an ESX host, the netloc is set to '*' in the URL
returned in HttpNfcLeaseInfo. It should be replaced with host name
or IP address.
"""
urlp = urlparse.urlparse(url)
if urlp.netloc == '*':
scheme, netloc, path, params, query, fragment = urlp
if netutils.is_valid_ipv6(host):
netloc = '[%s]:%d' % (host, port)
else:
netloc = "%s:%d" % (host, port)
url = urlparse.urlunparse((scheme,
netloc,
path,
params,
query,
fragment))
return url
def _find_vmdk_url(self, lease_info, host, port):
"""Find the URL corresponding to a VMDK file in lease info."""
url = None
ssl_thumbprint = None
for deviceUrl in lease_info.deviceUrl:
if deviceUrl.disk:
url = self._fix_esx_url(deviceUrl.url, host, port)
ssl_thumbprint = deviceUrl.sslThumbprint
break
if not url:
excep_msg = _("Could not retrieve VMDK URL from lease info.")
LOG.error(excep_msg)
raise exceptions.VimException(excep_msg)
LOG.debug("Found VMDK URL: %s from lease info.", url)
return url, ssl_thumbprint
def _log_progress(self, progress):
"""Log data transfer progress."""
if (progress == 100 or (progress - self._last_logged_progress >=
MIN_PROGRESS_DIFF_TO_LOG)):
LOG.debug("Data transfer progress is %d%%.", progress)
self._last_logged_progress = progress
class FileWriteHandle(FileHandle):
"""Write handle for a file in VMware server."""
@ -308,7 +261,170 @@ class FileWriteHandle(FileHandle):
return "File write handle for %s" % self._url
class VmdkWriteHandle(FileHandle):
class VmdkHandle(FileHandle):
"""VMDK handle based on HttpNfcLease."""
def __init__(self, session, lease, url, file_handle):
self._session = session
self._lease = lease
self._url = url
self._last_logged_progress = 0
self._last_progress_udpate = 0
super(VmdkHandle, self).__init__(file_handle)
def _log_progress(self, progress):
"""Log data transfer progress."""
if (progress == 100 or (progress - self._last_logged_progress >=
MIN_PROGRESS_DIFF_TO_LOG)):
LOG.debug("Data transfer progress is %d%%.", progress)
self._last_logged_progress = progress
def _get_progress(self):
"""Get current progress for updating progress to lease."""
pass
def update_progress(self):
"""Updates progress to lease.
This call back to the lease is essential to keep the lease alive
across long running write/read operations.
:raises: VimException, VimFaultException, VimAttributeException,
VimSessionOverLoadException, VimConnectionException
"""
now = time.time()
if (now - self._last_progress_udpate < MIN_UPDATE_INTERVAL):
return
self._last_progress_udpate = now
progress = int(self._get_progress())
self._log_progress(progress)
try:
self._session.invoke_api(self._session.vim,
'HttpNfcLeaseProgress',
self._lease,
percent=progress)
except exceptions.VimException:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Error occurred while updating the "
"write/read progress of VMDK file "
"with URL = %s."),
self._url)
def _release_lease(self):
"""Release the lease
:raises: VimException, VimFaultException, VimAttributeException,
VimSessionOverLoadException, VimConnectionException
"""
LOG.debug("Getting lease state for %s.", self._url)
state = self._session.invoke_api(vim_util,
'get_object_property',
self._session.vim,
self._lease,
'state')
LOG.debug("Lease for %(url)s is in state: %(state)s.",
{'url': self._url,
'state': state})
if state == 'ready':
LOG.debug("Releasing lease for %s.", self._url)
self._session.invoke_api(self._session.vim,
'HttpNfcLeaseComplete',
self._lease)
else:
LOG.debug("Lease for %(url)s is in state: %(state)s; no "
"need to release.",
{'url': self._url,
'state': state})
@staticmethod
def _create_import_vapp_lease(session, rp_ref, import_spec, vm_folder_ref):
"""Create and wait for HttpNfcLease lease for vApp import."""
LOG.debug("Creating HttpNfcLease lease for vApp import into resource"
" pool: %s.",
rp_ref)
lease = session.invoke_api(session.vim,
'ImportVApp',
rp_ref,
spec=import_spec,
folder=vm_folder_ref)
LOG.debug("Lease: %(lease)s obtained for vApp import into resource"
" pool %(rp_ref)s.",
{'lease': lease,
'rp_ref': rp_ref})
session.wait_for_lease_ready(lease)
LOG.debug("Invoking VIM API for reading info of lease: %s.", lease)
lease_info = session.invoke_api(vim_util,
'get_object_property',
session.vim,
lease,
'info')
return lease, lease_info
@staticmethod
def _create_export_vm_lease(session, vm_ref):
"""Create and wait for HttpNfcLease lease for VM export."""
LOG.debug("Creating HttpNfcLease lease for exporting VM: %s.",
vm_ref)
lease = session.invoke_api(session.vim, 'ExportVm', vm_ref)
LOG.debug("Lease: %(lease)s obtained for exporting VM: %(vm_ref)s.",
{'lease': lease,
'vm_ref': vm_ref})
session.wait_for_lease_ready(lease)
LOG.debug("Invoking VIM API for reading info of lease: %s.", lease)
lease_info = session.invoke_api(vim_util,
'get_object_property',
session.vim,
lease,
'info')
return lease, lease_info
@staticmethod
def _fix_esx_url(url, host, port):
"""Fix netloc in the case of an ESX host.
In the case of an ESX host, the netloc is set to '*' in the URL
returned in HttpNfcLeaseInfo. It should be replaced with host name
or IP address.
"""
urlp = urlparse.urlparse(url)
if urlp.netloc == '*':
scheme, netloc, path, params, query, fragment = urlp
if netutils.is_valid_ipv6(host):
netloc = '[%s]:%d' % (host, port)
else:
netloc = "%s:%d" % (host, port)
url = urlparse.urlunparse((scheme,
netloc,
path,
params,
query,
fragment))
return url
@staticmethod
def _find_vmdk_url(lease_info, host, port):
"""Find the URL corresponding to a VMDK file in lease info."""
url = None
ssl_thumbprint = None
for deviceUrl in lease_info.deviceUrl:
if deviceUrl.disk:
url = VmdkHandle._fix_esx_url(deviceUrl.url, host, port)
ssl_thumbprint = deviceUrl.sslThumbprint
break
if not url:
excep_msg = _("Could not retrieve VMDK URL from lease info.")
LOG.error(excep_msg)
raise exceptions.VimException(excep_msg)
LOG.debug("Found VMDK URL: %s from lease info.", url)
return url, ssl_thumbprint
class VmdkWriteHandle(VmdkHandle):
"""VMDK write handle based on HttpNfcLease.
This class creates a vApp in the specified resource pool and uploads the
@ -332,25 +448,17 @@ class VmdkWriteHandle(FileHandle):
VimSessionOverLoadException, VimConnectionException,
ValueError
"""
self._session = session
self._vmdk_size = vmdk_size
self._bytes_written = 0
# Get lease and its info for vApp import
self._lease = self._create_and_wait_for_lease(session,
rp_ref,
import_spec,
vm_folder_ref)
LOG.debug("Invoking VIM API for reading info of lease: %s.",
self._lease)
lease_info = session.invoke_api(vim_util,
'get_object_property',
session.vim,
self._lease,
'info')
lease, lease_info = self._create_import_vapp_lease(session,
rp_ref,
import_spec,
vm_folder_ref)
# Find VMDK URL where data is to be written
self._url, thumbprint = self._find_vmdk_url(lease_info, host, port)
url, thumbprint = self._find_vmdk_url(lease_info, host, port)
self._vm_ref = lease_info.entity
cookies = session.vim.client.options.transport.cookiejar
@ -364,36 +472,18 @@ class VmdkWriteHandle(FileHandle):
else:
raise ValueError('http_method must be either PUT or POST')
self._conn = self._create_write_connection(http_method,
self._url,
url,
vmdk_size,
cookies=cookies,
overwrite=overwrite,
content_type=content_type,
ssl_thumbprint=thumbprint)
FileHandle.__init__(self, self._conn)
super(VmdkWriteHandle, self).__init__(session, lease, url, self._conn)
def get_imported_vm(self):
""""Get managed object reference of the VM created for import."""
return self._vm_ref
def _create_and_wait_for_lease(self, session, rp_ref, import_spec,
vm_folder_ref):
"""Create and wait for HttpNfcLease lease for vApp import."""
LOG.debug("Creating HttpNfcLease lease for vApp import into resource"
" pool: %s.",
rp_ref)
lease = session.invoke_api(session.vim,
'ImportVApp',
rp_ref,
spec=import_spec,
folder=vm_folder_ref)
LOG.debug("Lease: %(lease)s obtained for vApp import into resource"
" pool %(rp_ref)s.",
{'lease': lease,
'rp_ref': rp_ref})
session.wait_for_lease_ready(lease)
return lease
def write(self, data):
"""Write data to the file.
@ -417,61 +507,14 @@ class VmdkWriteHandle(FileHandle):
LOG.exception(excep_msg)
raise exceptions.VimException(excep_msg, excep)
# TODO(vbala) Move this method to FileHandle.
def update_progress(self):
"""Updates progress to lease.
This call back to the lease is essential to keep the lease alive
across long running write operations.
:raises: VimException, VimFaultException, VimAttributeException,
VimSessionOverLoadException, VimConnectionException
"""
now = time.time()
if (now - self._last_progress_udpate < MIN_UPDATE_INTERVAL):
return
self._last_progress_udpate = now
progress = int(float(self._bytes_written) / self._vmdk_size * 100)
self._log_progress(progress)
try:
self._session.invoke_api(self._session.vim,
'HttpNfcLeaseProgress',
self._lease,
percent=progress)
except exceptions.VimException:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Error occurred while updating the "
"write progress of VMDK file with "
"URL = %s."),
self._url)
def close(self):
"""Releases the lease and close the connection.
:raises: VimException, VimFaultException, VimAttributeException,
VimSessionOverLoadException, VimConnectionException
:raises: VimAttributeException, VimSessionOverLoadException,
VimConnectionException
"""
LOG.debug("Getting lease state for %s.", self._url)
try:
state = self._session.invoke_api(vim_util,
'get_object_property',
self._session.vim,
self._lease,
'state')
LOG.debug("Lease for %(url)s is in state: %(state)s.",
{'url': self._url,
'state': state})
if state == 'ready':
LOG.debug("Releasing lease for %s.", self._url)
self._session.invoke_api(self._session.vim,
'HttpNfcLeaseComplete',
self._lease)
else:
LOG.debug("Lease for %(url)s is in state: %(state)s; no "
"need to release.",
{'url': self._url,
'state': state})
self._release_lease()
except exceptions.VimException:
LOG.warning(_LW("Error occurred while releasing the lease "
"for %s."),
@ -480,11 +523,14 @@ class VmdkWriteHandle(FileHandle):
super(VmdkWriteHandle, self).close()
LOG.debug("Closed VMDK write handle for %s.", self._url)
def _get_progress(self):
return float(self._bytes_written) / self._vmdk_size * 100
def __str__(self):
return "VMDK write handle for %s" % self._url
class VmdkReadHandle(FileHandle):
class VmdkReadHandle(VmdkHandle):
"""VMDK read handle based on HttpNfcLease."""
def __init__(self, session, host, port, vm_ref, vmdk_path,
@ -505,38 +551,19 @@ class VmdkReadHandle(FileHandle):
:raises: VimException, VimFaultException, VimAttributeException,
VimSessionOverLoadException, VimConnectionException
"""
self._session = session
self._vmdk_size = vmdk_size
self._bytes_read = 0
# Obtain lease for VM export
self._lease = self._create_and_wait_for_lease(session, vm_ref)
LOG.debug("Invoking VIM API for reading info of lease: %s.",
self._lease)
lease_info = session.invoke_api(vim_util,
'get_object_property',
session.vim,
self._lease,
'info')
lease, lease_info = self._create_export_vm_lease(session, vm_ref)
# find URL of the VMDK file to be read and open connection
self._url, thumbprint = self._find_vmdk_url(lease_info, host, port)
url, thumbprint = self._find_vmdk_url(lease_info, host, port)
cookies = session.vim.client.options.transport.cookiejar
self._conn = self._create_read_connection(self._url,
self._conn = self._create_read_connection(url,
cookies=cookies,
ssl_thumbprint=thumbprint)
FileHandle.__init__(self, self._conn)
def _create_and_wait_for_lease(self, session, vm_ref):
"""Create and wait for HttpNfcLease lease for VM export."""
LOG.debug("Creating HttpNfcLease lease for exporting VM: %s.",
vm_ref)
lease = session.invoke_api(session.vim, 'ExportVm', vm_ref)
LOG.debug("Lease: %(lease)s obtained for exporting VM: %(vm_ref)s.",
{'lease': lease,
'vm_ref': vm_ref})
session.wait_for_lease_ready(lease)
return lease
super(VmdkReadHandle, self).__init__(session, lease, url, self._conn)
def read(self, chunk_size):
"""Read a chunk of data from the VMDK file.
@ -558,59 +585,14 @@ class VmdkReadHandle(FileHandle):
LOG.exception(excep_msg)
raise exceptions.VimException(excep_msg, excep)
def update_progress(self):
"""Updates progress to lease.
This call back to the lease is essential to keep the lease alive
across long running read operations.
:raises: VimException, VimFaultException, VimAttributeException,
VimSessionOverLoadException, VimConnectionException
"""
now = time.time()
if (now - self._last_progress_udpate < MIN_UPDATE_INTERVAL):
return
self._last_progress_udpate = now
progress = int(float(self._bytes_read) / self._vmdk_size * 100)
self._log_progress(progress)
try:
self._session.invoke_api(self._session.vim,
'HttpNfcLeaseProgress',
self._lease,
percent=progress)
except exceptions.VimException:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Error occurred while updating the "
"read progress of VMDK file with URL = %s."),
self._url)
def close(self):
"""Releases the lease and close the connection.
:raises: VimException, VimFaultException, VimAttributeException,
VimSessionOverLoadException, VimConnectionException
"""
LOG.debug("Getting lease state for %s.", self._url)
try:
state = self._session.invoke_api(vim_util,
'get_object_property',
self._session.vim,
self._lease,
'state')
LOG.debug("Lease for %(url)s is in state: %(state)s.",
{'url': self._url,
'state': state})
if state == 'ready':
LOG.debug("Releasing lease for %s.", self._url)
self._session.invoke_api(self._session.vim,
'HttpNfcLeaseComplete',
self._lease)
else:
LOG.debug("Lease for %(url)s is in state: %(state)s; no "
"need to release.",
{'url': self._url,
'state': state})
self._release_lease()
except exceptions.VimException:
LOG.warning(_LW("Error occurred while releasing the lease "
"for %s."),
@ -621,6 +603,9 @@ class VmdkReadHandle(FileHandle):
super(VmdkReadHandle, self).close()
LOG.debug("Closed VMDK read handle for %s.", self._url)
def _get_progress(self):
return float(self._bytes_read) / self._vmdk_size * 100
def __str__(self):
return "VMDK read handle for %s" % self._url

View File

@ -37,23 +37,6 @@ class FileHandleTest(base.TestCase):
vmw_http_file.close()
file_handle.close.assert_called_once_with()
def test_find_vmdk_url(self):
device_url_0 = mock.Mock()
device_url_0.disk = False
device_url_1 = mock.Mock()
device_url_1.disk = True
device_url_1.url = 'https://*/ds1/vm1.vmdk'
device_url_1.sslThumbprint = '11:22:33:44:55'
lease_info = mock.Mock()
lease_info.deviceUrl = [device_url_0, device_url_1]
host = '10.1.2.3'
port = 443
exp_url = 'https://%s:%d/ds1/vm1.vmdk' % (host, port)
vmw_http_file = rw_handles.FileHandle(None)
url, thumbprint = vmw_http_file._find_vmdk_url(lease_info, host, port)
self.assertEqual(exp_url, url)
self.assertEqual('11:22:33:44:55', thumbprint)
@mock.patch('urllib3.connection.HTTPConnection')
def test_create_connection_http(self, http_conn):
conn = mock.Mock()
@ -140,6 +123,48 @@ class FileWriteHandleTest(base.TestCase):
self._conn.close.assert_called_once_with()
class VmdkHandleTest(base.TestCase):
"""Tests for VmdkHandle."""
def test_find_vmdk_url(self):
device_url_0 = mock.Mock()
device_url_0.disk = False
device_url_1 = mock.Mock()
device_url_1.disk = True
device_url_1.url = 'https://*/ds1/vm1.vmdk'
device_url_1.sslThumbprint = '11:22:33:44:55'
lease_info = mock.Mock()
lease_info.deviceUrl = [device_url_0, device_url_1]
host = '10.1.2.3'
port = 443
exp_url = 'https://%s:%d/ds1/vm1.vmdk' % (host, port)
vmw_http_file = rw_handles.VmdkHandle(None, None, None, None)
url, thumbprint = vmw_http_file._find_vmdk_url(lease_info, host, port)
self.assertEqual(exp_url, url)
self.assertEqual('11:22:33:44:55', thumbprint)
def test_update_progress(self):
session = mock.Mock()
lease = mock.Mock()
handle = rw_handles.VmdkHandle(session, lease, 'fake-url', None)
handle._get_progress = mock.Mock(return_value=50)
handle.update_progress()
session.invoke_api.assert_called_once_with(session.vim,
'HttpNfcLeaseProgress',
lease, percent=50)
def test_update_progress_with_error(self):
session = mock.Mock()
handle = rw_handles.VmdkHandle(session, None, 'fake-url', None)
handle._get_progress = mock.Mock(return_value=0)
session.invoke_api.side_effect = exceptions.VimException(None)
self.assertRaises(exceptions.VimException, handle.update_progress)
class VmdkWriteHandleTest(base.TestCase):
"""Tests for VmdkWriteHandle."""
@ -220,14 +245,6 @@ class VmdkWriteHandleTest(base.TestCase):
handle.write([1] * data_size)
handle.update_progress()
def test_update_progress_with_error(self):
session = self._create_mock_session(True, 10)
handle = rw_handles.VmdkWriteHandle(session, '10.1.2.3', 443,
'rp-1', 'folder-1', None,
100)
session.invoke_api.side_effect = exceptions.VimException(None)
self.assertRaises(exceptions.VimException, handle.update_progress)
def test_close(self):
session = self._create_mock_session()
handle = rw_handles.VmdkWriteHandle(session, '10.1.2.3', 443,
@ -316,14 +333,6 @@ class VmdkReadHandleTest(base.TestCase):
handle.update_progress()
self.assertEqual('fake-data', data)
def test_update_progress_with_error(self):
session = self._create_mock_session(True, 10)
handle = rw_handles.VmdkReadHandle(session, '10.1.2.3', 443,
'vm-1', '[ds] disk1.vmdk',
100)
session.invoke_api.side_effect = exceptions.VimException(None)
self.assertRaises(exceptions.VimException, handle.update_progress)
def test_close(self):
session = self._create_mock_session()
handle = rw_handles.VmdkReadHandle(session, '10.1.2.3', 443,