Support ZeroMQ messaging driver in cinder
NOTE:This patch introduces support for ZeroMQ driver for cinder single backend case. Multi-backend will be addressed in the next patch as part of the blueprint. CHANGES:ZeroMQ driver requires hostname for message delivery as there is no broker inbetween. So, extract the hostname and feed to the messaging client for zeromq driver. For the record, ZeroMq is a very lightweight distributed messaging system specially designed for high throughput/low latency scenarios. Addition of support for ZeroMQ would help cinder scale out with high performance and be highly available as there is no centralised broker. DocImpact Document the configurations for ZeroMQ driver for Cinder Change-Id: Ic4b4301e5d7ca1692fc91155ba53f2dd12f99311 Closes-Bug: #1440631 partially Implements bp cinder-zeromq-support
This commit is contained in:
parent
55744deafb
commit
3320a89438
@ -789,6 +789,13 @@ class VolumeUtilsTestCase(test.TestCase):
|
|||||||
self.assertEqual(pool,
|
self.assertEqual(pool,
|
||||||
volume_utils.extract_host(host, 'pool', True))
|
volume_utils.extract_host(host, 'pool', True))
|
||||||
|
|
||||||
|
def test_get_volume_rpc_host(self):
|
||||||
|
host = 'Host@backend'
|
||||||
|
# default level is 'backend'
|
||||||
|
# check if host with backend is returned
|
||||||
|
self.assertEqual(volume_utils.extract_host(host),
|
||||||
|
volume_utils.get_volume_rpc_host(host))
|
||||||
|
|
||||||
def test_append_host(self):
|
def test_append_host(self):
|
||||||
host = 'Host'
|
host = 'Host'
|
||||||
pool = 'Pool'
|
pool = 'Pool'
|
||||||
|
@ -91,22 +91,23 @@ class VolumeAPI(rpc.RPCAPI):
|
|||||||
TOPIC = CONF.volume_topic
|
TOPIC = CONF.volume_topic
|
||||||
BINARY = 'cinder-volume'
|
BINARY = 'cinder-volume'
|
||||||
|
|
||||||
|
def _get_cctxt(self, host, version):
|
||||||
|
new_host = utils.get_volume_rpc_host(host)
|
||||||
|
return self.client.prepare(server=new_host, version=version)
|
||||||
|
|
||||||
def create_consistencygroup(self, ctxt, group, host):
|
def create_consistencygroup(self, ctxt, group, host):
|
||||||
new_host = utils.extract_host(host)
|
cctxt = self._get_cctxt(host, '1.26')
|
||||||
cctxt = self.client.prepare(server=new_host, version='1.26')
|
|
||||||
cctxt.cast(ctxt, 'create_consistencygroup',
|
cctxt.cast(ctxt, 'create_consistencygroup',
|
||||||
group=group)
|
group=group)
|
||||||
|
|
||||||
def delete_consistencygroup(self, ctxt, group):
|
def delete_consistencygroup(self, ctxt, group):
|
||||||
host = utils.extract_host(group.host)
|
cctxt = self._get_cctxt(group.host, '1.26')
|
||||||
cctxt = self.client.prepare(server=host, version='1.26')
|
|
||||||
cctxt.cast(ctxt, 'delete_consistencygroup',
|
cctxt.cast(ctxt, 'delete_consistencygroup',
|
||||||
group=group)
|
group=group)
|
||||||
|
|
||||||
def update_consistencygroup(self, ctxt, group, add_volumes=None,
|
def update_consistencygroup(self, ctxt, group, add_volumes=None,
|
||||||
remove_volumes=None):
|
remove_volumes=None):
|
||||||
host = utils.extract_host(group.host)
|
cctxt = self._get_cctxt(group.host, '1.26')
|
||||||
cctxt = self.client.prepare(server=host, version='1.26')
|
|
||||||
cctxt.cast(ctxt, 'update_consistencygroup',
|
cctxt.cast(ctxt, 'update_consistencygroup',
|
||||||
group=group,
|
group=group,
|
||||||
add_volumes=add_volumes,
|
add_volumes=add_volumes,
|
||||||
@ -114,21 +115,18 @@ class VolumeAPI(rpc.RPCAPI):
|
|||||||
|
|
||||||
def create_consistencygroup_from_src(self, ctxt, group, cgsnapshot=None,
|
def create_consistencygroup_from_src(self, ctxt, group, cgsnapshot=None,
|
||||||
source_cg=None):
|
source_cg=None):
|
||||||
new_host = utils.extract_host(group.host)
|
cctxt = self._get_cctxt(group.host, '1.31')
|
||||||
cctxt = self.client.prepare(server=new_host, version='1.31')
|
|
||||||
cctxt.cast(ctxt, 'create_consistencygroup_from_src',
|
cctxt.cast(ctxt, 'create_consistencygroup_from_src',
|
||||||
group=group,
|
group=group,
|
||||||
cgsnapshot=cgsnapshot,
|
cgsnapshot=cgsnapshot,
|
||||||
source_cg=source_cg)
|
source_cg=source_cg)
|
||||||
|
|
||||||
def create_cgsnapshot(self, ctxt, cgsnapshot):
|
def create_cgsnapshot(self, ctxt, cgsnapshot):
|
||||||
host = utils.extract_host(cgsnapshot.consistencygroup.host)
|
cctxt = self._get_cctxt(cgsnapshot.consistencygroup.host, '1.31')
|
||||||
cctxt = self.client.prepare(server=host, version='1.31')
|
|
||||||
cctxt.cast(ctxt, 'create_cgsnapshot', cgsnapshot=cgsnapshot)
|
cctxt.cast(ctxt, 'create_cgsnapshot', cgsnapshot=cgsnapshot)
|
||||||
|
|
||||||
def delete_cgsnapshot(self, ctxt, cgsnapshot):
|
def delete_cgsnapshot(self, ctxt, cgsnapshot):
|
||||||
new_host = utils.extract_host(cgsnapshot.consistencygroup.host)
|
cctxt = self._get_cctxt(cgsnapshot.consistencygroup.host, '1.31')
|
||||||
cctxt = self.client.prepare(server=new_host, version='1.31')
|
|
||||||
cctxt.cast(ctxt, 'delete_cgsnapshot', cgsnapshot=cgsnapshot)
|
cctxt.cast(ctxt, 'delete_cgsnapshot', cgsnapshot=cgsnapshot)
|
||||||
|
|
||||||
def create_volume(self, ctxt, volume, host, request_spec,
|
def create_volume(self, ctxt, volume, host, request_spec,
|
||||||
@ -143,8 +141,7 @@ class VolumeAPI(rpc.RPCAPI):
|
|||||||
else:
|
else:
|
||||||
version = '1.24'
|
version = '1.24'
|
||||||
|
|
||||||
new_host = utils.extract_host(host)
|
cctxt = self._get_cctxt(host, version)
|
||||||
cctxt = self.client.prepare(server=new_host, version=version)
|
|
||||||
request_spec_p = jsonutils.to_primitive(request_spec)
|
request_spec_p = jsonutils.to_primitive(request_spec)
|
||||||
cctxt.cast(ctxt, 'create_volume', **msg_args)
|
cctxt.cast(ctxt, 'create_volume', **msg_args)
|
||||||
|
|
||||||
@ -156,27 +153,23 @@ class VolumeAPI(rpc.RPCAPI):
|
|||||||
else:
|
else:
|
||||||
version = '1.15'
|
version = '1.15'
|
||||||
|
|
||||||
new_host = utils.extract_host(volume.host)
|
cctxt = self._get_cctxt(volume.host, version)
|
||||||
cctxt = self.client.prepare(server=new_host, version=version)
|
|
||||||
cctxt.cast(ctxt, 'delete_volume', **msg_args)
|
cctxt.cast(ctxt, 'delete_volume', **msg_args)
|
||||||
|
|
||||||
def create_snapshot(self, ctxt, volume, snapshot):
|
def create_snapshot(self, ctxt, volume, snapshot):
|
||||||
new_host = utils.extract_host(volume['host'])
|
cctxt = self._get_cctxt(volume['host'], version='1.20')
|
||||||
cctxt = self.client.prepare(server=new_host, version='1.20')
|
|
||||||
cctxt.cast(ctxt, 'create_snapshot', volume_id=volume['id'],
|
cctxt.cast(ctxt, 'create_snapshot', volume_id=volume['id'],
|
||||||
snapshot=snapshot)
|
snapshot=snapshot)
|
||||||
|
|
||||||
def delete_snapshot(self, ctxt, snapshot, host, unmanage_only=False):
|
def delete_snapshot(self, ctxt, snapshot, host, unmanage_only=False):
|
||||||
new_host = utils.extract_host(host)
|
cctxt = self._get_cctxt(host, version='1.20')
|
||||||
cctxt = self.client.prepare(server=new_host, version='1.20')
|
|
||||||
cctxt.cast(ctxt, 'delete_snapshot', snapshot=snapshot,
|
cctxt.cast(ctxt, 'delete_snapshot', snapshot=snapshot,
|
||||||
unmanage_only=unmanage_only)
|
unmanage_only=unmanage_only)
|
||||||
|
|
||||||
def attach_volume(self, ctxt, volume, instance_uuid, host_name,
|
def attach_volume(self, ctxt, volume, instance_uuid, host_name,
|
||||||
mountpoint, mode):
|
mountpoint, mode):
|
||||||
|
|
||||||
new_host = utils.extract_host(volume['host'])
|
cctxt = self._get_cctxt(volume['host'], '1.11')
|
||||||
cctxt = self.client.prepare(server=new_host, version='1.11')
|
|
||||||
return cctxt.call(ctxt, 'attach_volume',
|
return cctxt.call(ctxt, 'attach_volume',
|
||||||
volume_id=volume['id'],
|
volume_id=volume['id'],
|
||||||
instance_uuid=instance_uuid,
|
instance_uuid=instance_uuid,
|
||||||
@ -185,33 +178,28 @@ class VolumeAPI(rpc.RPCAPI):
|
|||||||
mode=mode)
|
mode=mode)
|
||||||
|
|
||||||
def detach_volume(self, ctxt, volume, attachment_id):
|
def detach_volume(self, ctxt, volume, attachment_id):
|
||||||
new_host = utils.extract_host(volume['host'])
|
cctxt = self._get_cctxt(volume['host'], '1.20')
|
||||||
cctxt = self.client.prepare(server=new_host, version='1.20')
|
|
||||||
return cctxt.call(ctxt, 'detach_volume', volume_id=volume['id'],
|
return cctxt.call(ctxt, 'detach_volume', volume_id=volume['id'],
|
||||||
attachment_id=attachment_id)
|
attachment_id=attachment_id)
|
||||||
|
|
||||||
def copy_volume_to_image(self, ctxt, volume, image_meta):
|
def copy_volume_to_image(self, ctxt, volume, image_meta):
|
||||||
new_host = utils.extract_host(volume['host'])
|
cctxt = self._get_cctxt(volume['host'], '1.3')
|
||||||
cctxt = self.client.prepare(server=new_host, version='1.3')
|
|
||||||
cctxt.cast(ctxt, 'copy_volume_to_image', volume_id=volume['id'],
|
cctxt.cast(ctxt, 'copy_volume_to_image', volume_id=volume['id'],
|
||||||
image_meta=image_meta)
|
image_meta=image_meta)
|
||||||
|
|
||||||
def initialize_connection(self, ctxt, volume, connector):
|
def initialize_connection(self, ctxt, volume, connector):
|
||||||
new_host = utils.extract_host(volume['host'])
|
cctxt = self._get_cctxt(volume['host'], version='1.0')
|
||||||
cctxt = self.client.prepare(server=new_host, version='1.0')
|
|
||||||
return cctxt.call(ctxt, 'initialize_connection',
|
return cctxt.call(ctxt, 'initialize_connection',
|
||||||
volume_id=volume['id'],
|
volume_id=volume['id'],
|
||||||
connector=connector)
|
connector=connector)
|
||||||
|
|
||||||
def terminate_connection(self, ctxt, volume, connector, force=False):
|
def terminate_connection(self, ctxt, volume, connector, force=False):
|
||||||
new_host = utils.extract_host(volume['host'])
|
cctxt = self._get_cctxt(volume['host'], version='1.0')
|
||||||
cctxt = self.client.prepare(server=new_host, version='1.0')
|
|
||||||
return cctxt.call(ctxt, 'terminate_connection', volume_id=volume['id'],
|
return cctxt.call(ctxt, 'terminate_connection', volume_id=volume['id'],
|
||||||
connector=connector, force=force)
|
connector=connector, force=force)
|
||||||
|
|
||||||
def remove_export(self, ctxt, volume):
|
def remove_export(self, ctxt, volume):
|
||||||
new_host = utils.extract_host(volume['host'])
|
cctxt = self._get_cctxt(volume['host'], '1.30')
|
||||||
cctxt = self.client.prepare(server=new_host, version='1.30')
|
|
||||||
cctxt.cast(ctxt, 'remove_export', volume_id=volume['id'])
|
cctxt.cast(ctxt, 'remove_export', volume_id=volume['id'])
|
||||||
|
|
||||||
def publish_service_capabilities(self, ctxt):
|
def publish_service_capabilities(self, ctxt):
|
||||||
@ -219,13 +207,11 @@ class VolumeAPI(rpc.RPCAPI):
|
|||||||
cctxt.cast(ctxt, 'publish_service_capabilities')
|
cctxt.cast(ctxt, 'publish_service_capabilities')
|
||||||
|
|
||||||
def accept_transfer(self, ctxt, volume, new_user, new_project):
|
def accept_transfer(self, ctxt, volume, new_user, new_project):
|
||||||
new_host = utils.extract_host(volume['host'])
|
cctxt = self._get_cctxt(volume['host'], '1.9')
|
||||||
cctxt = self.client.prepare(server=new_host, version='1.9')
|
|
||||||
return cctxt.call(ctxt, 'accept_transfer', volume_id=volume['id'],
|
return cctxt.call(ctxt, 'accept_transfer', volume_id=volume['id'],
|
||||||
new_user=new_user, new_project=new_project)
|
new_user=new_user, new_project=new_project)
|
||||||
|
|
||||||
def extend_volume(self, ctxt, volume, new_size, reservations):
|
def extend_volume(self, ctxt, volume, new_size, reservations):
|
||||||
new_host = utils.extract_host(volume.host)
|
|
||||||
|
|
||||||
msg_args = {'volume_id': volume.id, 'new_size': new_size,
|
msg_args = {'volume_id': volume.id, 'new_size': new_size,
|
||||||
'reservations': reservations}
|
'reservations': reservations}
|
||||||
@ -235,11 +221,10 @@ class VolumeAPI(rpc.RPCAPI):
|
|||||||
else:
|
else:
|
||||||
version = '1.14'
|
version = '1.14'
|
||||||
|
|
||||||
cctxt = self.client.prepare(server=new_host, version=version)
|
cctxt = self._get_cctxt(volume.host, version)
|
||||||
cctxt.cast(ctxt, 'extend_volume', **msg_args)
|
cctxt.cast(ctxt, 'extend_volume', **msg_args)
|
||||||
|
|
||||||
def migrate_volume(self, ctxt, volume, dest_host, force_host_copy):
|
def migrate_volume(self, ctxt, volume, dest_host, force_host_copy):
|
||||||
new_host = utils.extract_host(volume.host)
|
|
||||||
host_p = {'host': dest_host.host,
|
host_p = {'host': dest_host.host,
|
||||||
'capabilities': dest_host.capabilities}
|
'capabilities': dest_host.capabilities}
|
||||||
|
|
||||||
@ -251,11 +236,10 @@ class VolumeAPI(rpc.RPCAPI):
|
|||||||
else:
|
else:
|
||||||
version = '1.8'
|
version = '1.8'
|
||||||
|
|
||||||
cctxt = self.client.prepare(server=new_host, version=version)
|
cctxt = self._get_cctxt(volume.host, version)
|
||||||
cctxt.cast(ctxt, 'migrate_volume', **msg_args)
|
cctxt.cast(ctxt, 'migrate_volume', **msg_args)
|
||||||
|
|
||||||
def migrate_volume_completion(self, ctxt, volume, new_volume, error):
|
def migrate_volume_completion(self, ctxt, volume, new_volume, error):
|
||||||
new_host = utils.extract_host(volume.host)
|
|
||||||
|
|
||||||
msg_args = {'volume_id': volume.id, 'new_volume_id': new_volume.id,
|
msg_args = {'volume_id': volume.id, 'new_volume_id': new_volume.id,
|
||||||
'error': error}
|
'error': error}
|
||||||
@ -266,7 +250,7 @@ class VolumeAPI(rpc.RPCAPI):
|
|||||||
else:
|
else:
|
||||||
version = '1.10'
|
version = '1.10'
|
||||||
|
|
||||||
cctxt = self.client.prepare(server=new_host, version=version)
|
cctxt = self._get_cctxt(volume.host, version)
|
||||||
return cctxt.call(ctxt, 'migrate_volume_completion', **msg_args)
|
return cctxt.call(ctxt, 'migrate_volume_completion', **msg_args)
|
||||||
|
|
||||||
def retype(self, ctxt, volume, new_type_id, dest_host,
|
def retype(self, ctxt, volume, new_type_id, dest_host,
|
||||||
@ -287,29 +271,24 @@ class VolumeAPI(rpc.RPCAPI):
|
|||||||
else:
|
else:
|
||||||
version = '1.12'
|
version = '1.12'
|
||||||
|
|
||||||
new_host = utils.extract_host(volume.host)
|
cctxt = self._get_cctxt(volume.host, version)
|
||||||
cctxt = self.client.prepare(server=new_host, version=version)
|
|
||||||
cctxt.cast(ctxt, 'retype', **msg_args)
|
cctxt.cast(ctxt, 'retype', **msg_args)
|
||||||
|
|
||||||
def manage_existing(self, ctxt, volume, ref):
|
def manage_existing(self, ctxt, volume, ref):
|
||||||
new_host = utils.extract_host(volume['host'])
|
cctxt = self._get_cctxt(volume['host'], '1.15')
|
||||||
cctxt = self.client.prepare(server=new_host, version='1.15')
|
|
||||||
cctxt.cast(ctxt, 'manage_existing', volume_id=volume['id'], ref=ref)
|
cctxt.cast(ctxt, 'manage_existing', volume_id=volume['id'], ref=ref)
|
||||||
|
|
||||||
def promote_replica(self, ctxt, volume):
|
def promote_replica(self, ctxt, volume):
|
||||||
new_host = utils.extract_host(volume['host'])
|
cctxt = self._get_cctxt(volume['host'], '1.17')
|
||||||
cctxt = self.client.prepare(server=new_host, version='1.17')
|
|
||||||
cctxt.cast(ctxt, 'promote_replica', volume_id=volume['id'])
|
cctxt.cast(ctxt, 'promote_replica', volume_id=volume['id'])
|
||||||
|
|
||||||
def reenable_replication(self, ctxt, volume):
|
def reenable_replication(self, ctxt, volume):
|
||||||
new_host = utils.extract_host(volume['host'])
|
cctxt = self._get_cctxt(volume['host'], '1.17')
|
||||||
cctxt = self.client.prepare(server=new_host, version='1.17')
|
|
||||||
cctxt.cast(ctxt, 'reenable_replication', volume_id=volume['id'])
|
cctxt.cast(ctxt, 'reenable_replication', volume_id=volume['id'])
|
||||||
|
|
||||||
def update_migrated_volume(self, ctxt, volume, new_volume,
|
def update_migrated_volume(self, ctxt, volume, new_volume,
|
||||||
original_volume_status):
|
original_volume_status):
|
||||||
host = utils.extract_host(new_volume['host'])
|
cctxt = self._get_cctxt(new_volume['host'], '1.36')
|
||||||
cctxt = self.client.prepare(server=host, version='1.36')
|
|
||||||
cctxt.call(ctxt,
|
cctxt.call(ctxt,
|
||||||
'update_migrated_volume',
|
'update_migrated_volume',
|
||||||
volume=volume,
|
volume=volume,
|
||||||
@ -317,13 +296,11 @@ class VolumeAPI(rpc.RPCAPI):
|
|||||||
volume_status=original_volume_status)
|
volume_status=original_volume_status)
|
||||||
|
|
||||||
def enable_replication(self, ctxt, volume):
|
def enable_replication(self, ctxt, volume):
|
||||||
new_host = utils.extract_host(volume['host'])
|
cctxt = self._get_cctxt(volume['host'], '1.27')
|
||||||
cctxt = self.client.prepare(server=new_host, version='1.27')
|
|
||||||
cctxt.cast(ctxt, 'enable_replication', volume=volume)
|
cctxt.cast(ctxt, 'enable_replication', volume=volume)
|
||||||
|
|
||||||
def disable_replication(self, ctxt, volume):
|
def disable_replication(self, ctxt, volume):
|
||||||
new_host = utils.extract_host(volume['host'])
|
cctxt = self._get_cctxt(volume['host'], '1.27')
|
||||||
cctxt = self.client.prepare(server=new_host, version='1.27')
|
|
||||||
cctxt.cast(ctxt, 'disable_replication',
|
cctxt.cast(ctxt, 'disable_replication',
|
||||||
volume=volume)
|
volume=volume)
|
||||||
|
|
||||||
@ -331,24 +308,21 @@ class VolumeAPI(rpc.RPCAPI):
|
|||||||
ctxt,
|
ctxt,
|
||||||
volume,
|
volume,
|
||||||
secondary=None):
|
secondary=None):
|
||||||
new_host = utils.extract_host(volume['host'])
|
cctxt = self._get_cctxt(volume['host'], '1.27')
|
||||||
cctxt = self.client.prepare(server=new_host, version='1.27')
|
|
||||||
cctxt.cast(ctxt, 'failover_replication',
|
cctxt.cast(ctxt, 'failover_replication',
|
||||||
volume=volume,
|
volume=volume,
|
||||||
secondary=secondary)
|
secondary=secondary)
|
||||||
|
|
||||||
def list_replication_targets(self, ctxt, volume):
|
def list_replication_targets(self, ctxt, volume):
|
||||||
new_host = utils.extract_host(volume['host'])
|
cctxt = self._get_cctxt(volume['host'], '1.27')
|
||||||
cctxt = self.client.prepare(server=new_host, version='1.27')
|
|
||||||
return cctxt.call(ctxt, 'list_replication_targets', volume=volume)
|
return cctxt.call(ctxt, 'list_replication_targets', volume=volume)
|
||||||
|
|
||||||
def manage_existing_snapshot(self, ctxt, snapshot, ref, host):
|
def manage_existing_snapshot(self, ctxt, snapshot, ref, host):
|
||||||
cctxt = self.client.prepare(server=host, version='1.28')
|
cctxt = self._get_cctxt(host, '1.28')
|
||||||
cctxt.cast(ctxt, 'manage_existing_snapshot',
|
cctxt.cast(ctxt, 'manage_existing_snapshot',
|
||||||
snapshot=snapshot,
|
snapshot=snapshot,
|
||||||
ref=ref)
|
ref=ref)
|
||||||
|
|
||||||
def get_capabilities(self, ctxt, host, discover):
|
def get_capabilities(self, ctxt, host, discover):
|
||||||
new_host = utils.extract_host(host)
|
cctxt = self._get_cctxt(host, '1.29')
|
||||||
cctxt = self.client.prepare(server=new_host, version='1.29')
|
|
||||||
return cctxt.call(ctxt, 'get_capabilities', discover=discover)
|
return cctxt.call(ctxt, 'get_capabilities', discover=discover)
|
||||||
|
@ -623,6 +623,14 @@ def extract_host(host, level='backend', default_pool_name=False):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_volume_rpc_host(host):
|
||||||
|
if CONF.rpc_backend and CONF.rpc_backend == "zmq":
|
||||||
|
# ZeroMQ RPC driver requires only the hostname.
|
||||||
|
# So, return just that.
|
||||||
|
return extract_host(host, 'host')
|
||||||
|
return extract_host(host)
|
||||||
|
|
||||||
|
|
||||||
def append_host(host, pool):
|
def append_host(host, pool):
|
||||||
"""Encode pool into host info."""
|
"""Encode pool into host info."""
|
||||||
if not host or not pool:
|
if not host or not pool:
|
||||||
|
@ -0,0 +1,3 @@
|
|||||||
|
---
|
||||||
|
features:
|
||||||
|
- Added support for ZeroMQ messaging driver in cinder single backend config
|
Loading…
x
Reference in New Issue
Block a user