Re-use RBDImageMetadata and RBDVolumeIOWrapper from os-brick
Change-Id: Ic359d60939ef15bbdf863f09aef66e4055914417 Depends-On: I7d2b2f6ff90b33fe614efd0ff10dbd3b933fd92e
This commit is contained in:
parent
02c262a92e
commit
bdce7f95cc
@ -49,6 +49,7 @@ import subprocess
|
||||
import time
|
||||
|
||||
import eventlet
|
||||
from os_brick.initiator import linuxrbd
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import excutils
|
||||
@ -712,11 +713,11 @@ class CephBackupDriver(driver.BackupDriver):
|
||||
LOG.debug("Copying data from volume %s.", volume_id)
|
||||
dest_rbd = self.rbd.Image(client.ioctx, backup_name)
|
||||
try:
|
||||
rbd_meta = rbd_driver.RBDImageMetadata(dest_rbd,
|
||||
backup.container,
|
||||
self._ceph_backup_user,
|
||||
self._ceph_backup_conf)
|
||||
rbd_fd = rbd_driver.RBDImageIOWrapper(rbd_meta)
|
||||
rbd_meta = linuxrbd.RBDImageMetadata(dest_rbd,
|
||||
backup.container,
|
||||
self._ceph_backup_user,
|
||||
self._ceph_backup_conf)
|
||||
rbd_fd = linuxrbd.RBDVolumeIOWrapper(rbd_meta)
|
||||
self._transfer_data(src_volume, src_name, rbd_fd, backup_name,
|
||||
length)
|
||||
finally:
|
||||
@ -909,11 +910,11 @@ class CephBackupDriver(driver.BackupDriver):
|
||||
src_rbd = self.rbd.Image(client.ioctx, backup_name,
|
||||
snapshot=src_snap, read_only=True)
|
||||
try:
|
||||
rbd_meta = rbd_driver.RBDImageMetadata(src_rbd,
|
||||
backup.container,
|
||||
self._ceph_backup_user,
|
||||
self._ceph_backup_conf)
|
||||
rbd_fd = rbd_driver.RBDImageIOWrapper(rbd_meta)
|
||||
rbd_meta = linuxrbd.RBDImageMetadata(src_rbd,
|
||||
backup.container,
|
||||
self._ceph_backup_user,
|
||||
self._ceph_backup_conf)
|
||||
rbd_fd = linuxrbd.RBDVolumeIOWrapper(rbd_meta)
|
||||
self._transfer_data(rbd_fd, backup_name, dest_file, dest_name,
|
||||
length)
|
||||
finally:
|
||||
|
@ -20,6 +20,7 @@ import tempfile
|
||||
import uuid
|
||||
|
||||
import mock
|
||||
from os_brick.initiator import linuxrbd
|
||||
from oslo_concurrency import processutils
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import units
|
||||
@ -35,7 +36,6 @@ from cinder.i18n import _
|
||||
from cinder import objects
|
||||
from cinder import test
|
||||
from cinder.tests.unit import fake_constants as fake
|
||||
from cinder.volume.drivers import rbd as rbddriver
|
||||
|
||||
# This is used to collect raised exceptions so that tests may check what was
|
||||
# raised.
|
||||
@ -117,9 +117,9 @@ class BackupCephTestCase(test.TestCase):
|
||||
return self.counter
|
||||
|
||||
def _get_wrapped_rbd_io(self, rbd_image):
|
||||
rbd_meta = rbddriver.RBDImageMetadata(rbd_image, 'pool_foo',
|
||||
'user_foo', 'conf_foo')
|
||||
return rbddriver.RBDImageIOWrapper(rbd_meta)
|
||||
rbd_meta = linuxrbd.RBDImageMetadata(rbd_image, 'pool_foo',
|
||||
'user_foo', 'conf_foo')
|
||||
return linuxrbd.RBDVolumeIOWrapper(rbd_meta)
|
||||
|
||||
def _setup_mock_popen(self, mock_popen, retval=None, p1hook=None,
|
||||
p2hook=None):
|
||||
@ -432,11 +432,11 @@ class BackupCephTestCase(test.TestCase):
|
||||
with tempfile.NamedTemporaryFile() as test_file:
|
||||
checksum = hashlib.sha256()
|
||||
image = self.service.rbd.Image()
|
||||
meta = rbddriver.RBDImageMetadata(image,
|
||||
'pool_foo',
|
||||
'user_foo',
|
||||
'conf_foo')
|
||||
rbdio = rbddriver.RBDImageIOWrapper(meta)
|
||||
meta = linuxrbd.RBDImageMetadata(image,
|
||||
'pool_foo',
|
||||
'user_foo',
|
||||
'conf_foo')
|
||||
rbdio = linuxrbd.RBDVolumeIOWrapper(meta)
|
||||
self.service.backup(self.backup, rbdio)
|
||||
|
||||
self.assertEqual(['popen_init',
|
||||
@ -515,11 +515,11 @@ class BackupCephTestCase(test.TestCase):
|
||||
with tempfile.NamedTemporaryFile() as test_file:
|
||||
checksum = hashlib.sha256()
|
||||
image = self.service.rbd.Image()
|
||||
meta = rbddriver.RBDImageMetadata(image,
|
||||
'pool_foo',
|
||||
'user_foo',
|
||||
'conf_foo')
|
||||
rbdio = rbddriver.RBDImageIOWrapper(meta)
|
||||
meta = linuxrbd.RBDImageMetadata(image,
|
||||
'pool_foo',
|
||||
'user_foo',
|
||||
'conf_foo')
|
||||
rbdio = linuxrbd.RBDVolumeIOWrapper(meta)
|
||||
|
||||
# We expect that the second exception is
|
||||
# notified.
|
||||
@ -580,11 +580,11 @@ class BackupCephTestCase(test.TestCase):
|
||||
with tempfile.NamedTemporaryFile() as test_file:
|
||||
checksum = hashlib.sha256()
|
||||
image = self.service.rbd.Image()
|
||||
meta = rbddriver.RBDImageMetadata(image,
|
||||
'pool_foo',
|
||||
'user_foo',
|
||||
'conf_foo')
|
||||
rbdio = rbddriver.RBDImageIOWrapper(meta)
|
||||
meta = linuxrbd.RBDImageMetadata(image,
|
||||
'pool_foo',
|
||||
'user_foo',
|
||||
'conf_foo')
|
||||
rbdio = linuxrbd.RBDVolumeIOWrapper(meta)
|
||||
|
||||
# We expect that the second exception is
|
||||
# notified.
|
||||
|
@ -26,7 +26,6 @@ from oslo_utils import units
|
||||
|
||||
from cinder import context
|
||||
from cinder import exception
|
||||
from cinder.i18n import _
|
||||
import cinder.image.glance
|
||||
from cinder.image import image_utils
|
||||
from cinder import objects
|
||||
@ -1051,123 +1050,6 @@ class RBDTestCase(test.TestCase):
|
||||
3, self.mock_rados.Rados.return_value.shutdown.call_count)
|
||||
|
||||
|
||||
class RBDImageIOWrapperTestCase(test.TestCase):
|
||||
def setUp(self):
|
||||
super(RBDImageIOWrapperTestCase, self).setUp()
|
||||
self.meta = mock.Mock()
|
||||
self.meta.user = 'mock_user'
|
||||
self.meta.conf = 'mock_conf'
|
||||
self.meta.pool = 'mock_pool'
|
||||
self.meta.image = mock.Mock()
|
||||
self.meta.image.read = mock.Mock()
|
||||
self.meta.image.write = mock.Mock()
|
||||
self.meta.image.size = mock.Mock()
|
||||
self.mock_rbd_wrapper = driver.RBDImageIOWrapper(self.meta)
|
||||
self.data_length = 1024
|
||||
self.full_data = b'abcd' * 256
|
||||
|
||||
def test_init(self):
|
||||
self.assertEqual(self.mock_rbd_wrapper._rbd_meta, self.meta)
|
||||
self.assertEqual(0, self.mock_rbd_wrapper._offset)
|
||||
|
||||
def test_inc_offset(self):
|
||||
self.mock_rbd_wrapper._inc_offset(10)
|
||||
self.mock_rbd_wrapper._inc_offset(10)
|
||||
self.assertEqual(20, self.mock_rbd_wrapper._offset)
|
||||
|
||||
def test_rbd_image(self):
|
||||
self.assertEqual(self.mock_rbd_wrapper.rbd_image, self.meta.image)
|
||||
|
||||
def test_rbd_user(self):
|
||||
self.assertEqual(self.mock_rbd_wrapper.rbd_user, self.meta.user)
|
||||
|
||||
def test_rbd_pool(self):
|
||||
self.assertEqual(self.mock_rbd_wrapper.rbd_conf, self.meta.conf)
|
||||
|
||||
def test_rbd_conf(self):
|
||||
self.assertEqual(self.mock_rbd_wrapper.rbd_pool, self.meta.pool)
|
||||
|
||||
def test_read(self):
|
||||
|
||||
def mock_read(offset, length):
|
||||
return self.full_data[offset:length]
|
||||
|
||||
self.meta.image.read.side_effect = mock_read
|
||||
self.meta.image.size.return_value = self.data_length
|
||||
|
||||
data = self.mock_rbd_wrapper.read()
|
||||
self.assertEqual(self.full_data, data)
|
||||
|
||||
data = self.mock_rbd_wrapper.read()
|
||||
self.assertEqual(b'', data)
|
||||
|
||||
self.mock_rbd_wrapper.seek(0)
|
||||
data = self.mock_rbd_wrapper.read()
|
||||
self.assertEqual(self.full_data, data)
|
||||
|
||||
self.mock_rbd_wrapper.seek(0)
|
||||
data = self.mock_rbd_wrapper.read(10)
|
||||
self.assertEqual(self.full_data[:10], data)
|
||||
|
||||
def test_write(self):
|
||||
self.mock_rbd_wrapper.write(self.full_data)
|
||||
self.assertEqual(1024, self.mock_rbd_wrapper._offset)
|
||||
|
||||
def test_seekable(self):
|
||||
self.assertTrue(self.mock_rbd_wrapper.seekable)
|
||||
|
||||
def test_seek(self):
|
||||
self.assertEqual(0, self.mock_rbd_wrapper._offset)
|
||||
self.mock_rbd_wrapper.seek(10)
|
||||
self.assertEqual(10, self.mock_rbd_wrapper._offset)
|
||||
self.mock_rbd_wrapper.seek(10)
|
||||
self.assertEqual(10, self.mock_rbd_wrapper._offset)
|
||||
self.mock_rbd_wrapper.seek(10, 1)
|
||||
self.assertEqual(20, self.mock_rbd_wrapper._offset)
|
||||
|
||||
self.mock_rbd_wrapper.seek(0)
|
||||
self.mock_rbd_wrapper.write(self.full_data)
|
||||
self.meta.image.size.return_value = self.data_length
|
||||
self.mock_rbd_wrapper.seek(0)
|
||||
self.assertEqual(0, self.mock_rbd_wrapper._offset)
|
||||
|
||||
self.mock_rbd_wrapper.seek(10, 2)
|
||||
self.assertEqual(self.data_length + 10, self.mock_rbd_wrapper._offset)
|
||||
self.mock_rbd_wrapper.seek(-10, 2)
|
||||
self.assertEqual(self.data_length - 10, self.mock_rbd_wrapper._offset)
|
||||
|
||||
# test exceptions.
|
||||
self.assertRaises(IOError, self.mock_rbd_wrapper.seek, 0, 3)
|
||||
self.assertRaises(IOError, self.mock_rbd_wrapper.seek, -1)
|
||||
# offset should not have been changed by any of the previous
|
||||
# operations.
|
||||
self.assertEqual(self.data_length - 10, self.mock_rbd_wrapper._offset)
|
||||
|
||||
def test_tell(self):
|
||||
self.assertEqual(0, self.mock_rbd_wrapper.tell())
|
||||
self.mock_rbd_wrapper._inc_offset(10)
|
||||
self.assertEqual(10, self.mock_rbd_wrapper.tell())
|
||||
|
||||
def test_flush(self):
|
||||
with mock.patch.object(driver, 'LOG') as mock_logger:
|
||||
self.meta.image.flush = mock.Mock()
|
||||
self.mock_rbd_wrapper.flush()
|
||||
self.meta.image.flush.assert_called_once_with()
|
||||
self.meta.image.flush.reset_mock()
|
||||
# this should be caught and logged silently.
|
||||
self.meta.image.flush.side_effect = AttributeError
|
||||
self.mock_rbd_wrapper.flush()
|
||||
self.meta.image.flush.assert_called_once_with()
|
||||
msg = _("flush() not supported in this version of librbd")
|
||||
mock_logger.warning.assert_called_with(msg)
|
||||
|
||||
def test_fileno(self):
|
||||
self.assertRaises(IOError, self.mock_rbd_wrapper.fileno)
|
||||
|
||||
def test_close(self):
|
||||
self.mock_rbd_wrapper.close()
|
||||
|
||||
|
||||
class ManagedRBDTestCase(test_volume.DriverTestCase):
|
||||
driver_name = "cinder.volume.drivers.rbd.RBDDriver"
|
||||
|
||||
|
@ -14,13 +14,13 @@
|
||||
"""RADOS Block Device Driver"""
|
||||
|
||||
from __future__ import absolute_import
|
||||
import io
|
||||
import json
|
||||
import math
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
from eventlet import tpool
|
||||
from os_brick.initiator import linuxrbd
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import fileutils
|
||||
@ -94,114 +94,6 @@ CONF = cfg.CONF
|
||||
CONF.register_opts(RBD_OPTS)
|
||||
|
||||
|
||||
class RBDImageMetadata(object):
|
||||
"""RBD image metadata to be used with RBDImageIOWrapper."""
|
||||
def __init__(self, image, pool, user, conf):
|
||||
self.image = image
|
||||
self.pool = utils.convert_str(pool)
|
||||
self.user = utils.convert_str(user)
|
||||
self.conf = utils.convert_str(conf)
|
||||
|
||||
|
||||
class RBDImageIOWrapper(io.RawIOBase):
|
||||
"""Enables LibRBD.Image objects to be treated as Python IO objects.
|
||||
|
||||
Calling unimplemented interfaces will raise IOError.
|
||||
"""
|
||||
|
||||
def __init__(self, rbd_meta):
|
||||
super(RBDImageIOWrapper, self).__init__()
|
||||
self._rbd_meta = rbd_meta
|
||||
self._offset = 0
|
||||
|
||||
def _inc_offset(self, length):
|
||||
self._offset += length
|
||||
|
||||
@property
|
||||
def rbd_image(self):
|
||||
return self._rbd_meta.image
|
||||
|
||||
@property
|
||||
def rbd_user(self):
|
||||
return self._rbd_meta.user
|
||||
|
||||
@property
|
||||
def rbd_pool(self):
|
||||
return self._rbd_meta.pool
|
||||
|
||||
@property
|
||||
def rbd_conf(self):
|
||||
return self._rbd_meta.conf
|
||||
|
||||
def read(self, length=None):
|
||||
offset = self._offset
|
||||
total = self._rbd_meta.image.size()
|
||||
|
||||
# NOTE(dosaboy): posix files do not barf if you read beyond their
|
||||
# length (they just return nothing) but rbd images do so we need to
|
||||
# return empty string if we have reached the end of the image.
|
||||
if (offset >= total):
|
||||
return b''
|
||||
|
||||
if length is None:
|
||||
length = total
|
||||
|
||||
if (offset + length) > total:
|
||||
length = total - offset
|
||||
|
||||
self._inc_offset(length)
|
||||
return self._rbd_meta.image.read(int(offset), int(length))
|
||||
|
||||
def write(self, data):
|
||||
self._rbd_meta.image.write(data, self._offset)
|
||||
self._inc_offset(len(data))
|
||||
|
||||
def seekable(self):
|
||||
return True
|
||||
|
||||
def seek(self, offset, whence=0):
|
||||
if whence == 0:
|
||||
new_offset = offset
|
||||
elif whence == 1:
|
||||
new_offset = self._offset + offset
|
||||
elif whence == 2:
|
||||
new_offset = self._rbd_meta.image.size()
|
||||
new_offset += offset
|
||||
else:
|
||||
raise IOError(_("Invalid argument - whence=%s not supported") %
|
||||
(whence))
|
||||
|
||||
if (new_offset < 0):
|
||||
raise IOError(_("Invalid argument"))
|
||||
|
||||
self._offset = new_offset
|
||||
|
||||
def tell(self):
|
||||
return self._offset
|
||||
|
||||
def flush(self):
|
||||
try:
|
||||
self._rbd_meta.image.flush()
|
||||
except AttributeError:
|
||||
LOG.warning(_LW("flush() not supported in "
|
||||
"this version of librbd"))
|
||||
|
||||
def fileno(self):
|
||||
"""RBD does not have support for fileno() so we raise IOError.
|
||||
|
||||
Raising IOError is recommended way to notify caller that interface is
|
||||
not supported - see http://docs.python.org/2/library/io.html#io.IOBase
|
||||
"""
|
||||
raise IOError(_("fileno() not supported by RBD()"))
|
||||
|
||||
# NOTE(dosaboy): if IO object is not closed explicitly, Python auto closes
|
||||
# it which, if this is not overridden, calls flush() prior to close which
|
||||
# in this case is unwanted since the rbd image may have been closed prior
|
||||
# to the autoclean - currently triggering a segfault in librbd.
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
|
||||
class RBDVolumeProxy(object):
|
||||
"""Context manager for dealing with an existing rbd volume.
|
||||
|
||||
@ -977,10 +869,11 @@ class RBDDriver(driver.TransferVD, driver.ExtendVD,
|
||||
|
||||
with RBDVolumeProxy(self, volume.name,
|
||||
self.configuration.rbd_pool) as rbd_image:
|
||||
rbd_meta = RBDImageMetadata(rbd_image, self.configuration.rbd_pool,
|
||||
self.configuration.rbd_user,
|
||||
self.configuration.rbd_ceph_conf)
|
||||
rbd_fd = RBDImageIOWrapper(rbd_meta)
|
||||
rbd_meta = linuxrbd.RBDImageMetadata(
|
||||
rbd_image, self.configuration.rbd_pool,
|
||||
self.configuration.rbd_user,
|
||||
self.configuration.rbd_ceph_conf)
|
||||
rbd_fd = linuxrbd.RBDVolumeIOWrapper(rbd_meta)
|
||||
backup_service.backup(backup, rbd_fd)
|
||||
|
||||
LOG.debug("volume backup complete.")
|
||||
@ -989,10 +882,11 @@ class RBDDriver(driver.TransferVD, driver.ExtendVD,
|
||||
"""Restore an existing backup to a new or existing volume."""
|
||||
with RBDVolumeProxy(self, volume.name,
|
||||
self.configuration.rbd_pool) as rbd_image:
|
||||
rbd_meta = RBDImageMetadata(rbd_image, self.configuration.rbd_pool,
|
||||
self.configuration.rbd_user,
|
||||
self.configuration.rbd_ceph_conf)
|
||||
rbd_fd = RBDImageIOWrapper(rbd_meta)
|
||||
rbd_meta = linuxrbd.RBDImageMetadata(
|
||||
rbd_image, self.configuration.rbd_pool,
|
||||
self.configuration.rbd_user,
|
||||
self.configuration.rbd_ceph_conf)
|
||||
rbd_fd = linuxrbd.RBDVolumeIOWrapper(rbd_meta)
|
||||
backup_service.restore(backup, volume.id, rbd_fd)
|
||||
|
||||
LOG.debug("volume restore complete.")
|
||||
|
Loading…
x
Reference in New Issue
Block a user