Add group policy configuration

Allow configuration of a zone group default sync policy. This is useful
in scenarios where we want to have selective buckets sync. Valuable
especially with the new `cloud-sync` relation.

This is based on Ceph multisite sync policy:
https://docs.ceph.com/en/latest/radosgw/multisite-sync-policy/

Additionally, three more Juju actions are added to selectively enable,
disable, or reset buckets sync:
* `enable-buckets-sync`
* `disable-buckets-sync`
* `reset-buckets-sync`

These new actions are meant to be used in conjunction with a default
zone group sync policy that allows syncing, but it's disabled by default.

Change-Id: I4a8076192269aaeaca50668ebcebc0a52c6d2c84
func-test-pr: https://github.com/openstack-charmers/zaza-openstack-tests/pull/1193
Signed-off-by: Ionut Balutoiu <ibalutoiu@cloudbasesolutions.com>
This commit is contained in:
Ionut Balutoiu 2023-10-19 13:16:04 +03:00
parent 8ca8dc7cd8
commit 4e608c1485
19 changed files with 1404 additions and 3 deletions

View File

@ -243,6 +243,9 @@ not deployed then see file `actions.yaml`.
* `readwrite` * `readwrite`
* `resume` * `resume`
* `tidydefaults` * `tidydefaults`
* `enable-buckets-sync`
* `disable-buckets-sync`
* `reset-buckets-sync`
# Documentation # Documentation

View File

@ -19,3 +19,29 @@ force-enable-multisite:
zonegroup: zonegroup:
type: string type: string
description: Existing Zonegroup to be reconfigured as the 'zonegroup' config value. description: Existing Zonegroup to be reconfigured as the 'zonegroup' config value.
enable-buckets-sync:
description: |
Enable buckets sync in the multi-site replication. This is meant to be
used only when the default zonegroup sync policy is not "enabled", but it is
"allowed".
params:
buckets:
type: string
description: Comma-separated list of buckets' names to enable syncing.
disable-buckets-sync:
description: |
Forbid buckets sync in the multi-site replication. This is useful when you
want to disable syncing for some buckets, but you want to sync all the
other buckets.
params:
buckets:
type: string
description: Comma-separated list of buckets' names to disable syncing.
reset-buckets-sync:
description: |
Reset buckets sync policy. After this is executed, the buckets will be
synced according to the default zone group sync policy.
params:
buckets:
type: string
description: Comma-separated list of buckets' names to reset sync policy.

View File

@ -49,6 +49,8 @@ from charmhelpers.core.host import (
service_restart, service_restart,
) )
DEFAULT_SYNC_POLICY_ID = 'default'
def pause(args): def pause(args):
"""Pause the Ceilometer services. """Pause the Ceilometer services.
@ -227,6 +229,179 @@ def force_enable_multisite(args):
action_fail(message + " : {}".format(cpe.output)) action_fail(message + " : {}".format(cpe.output))
def is_multisite_sync_policy_action_allowed():
"""Check if the current Juju unit is allowed to run sync policy actions.
This method checks if the current Juju unit is allowed to execute
the Juju actions to configure Multisite sync policies:
* enable-buckets-sync
* disable-buckets-sync
* reset-buckets-sync
These Juju actions are allowed to run only on the leader unit of the
primary RGW zone.
:return: Whether the current Juju unit is allowed to run the Multisite
sync policy Juju actions.
:rtype: Boolean
"""
if not is_leader():
action_fail("This action can only be executed on leader unit.")
return False
realm = config('realm')
zone = config('zone')
zonegroup = config('zonegroup')
if not all((realm, zonegroup, zone)):
action_fail("Missing required charm configurations realm({}), "
"zonegroup({}) and zone({}).".format(
realm, zonegroup, zone
))
return False
if not multisite.is_multisite_configured(zone=zone, zonegroup=zonegroup):
action_fail("Multisite is not configured")
return False
zonegroup_info = multisite.get_zonegroup_info(zonegroup)
if zonegroup_info is None:
action_fail("Failed to fetch zonegroup ({}) info".format(zonegroup))
return False
zone_info = multisite.get_zone_info(zone)
if zone_info is None:
action_fail("Failed to fetch zone ({}) info".format(zone))
return False
if zonegroup_info['master_zone'] != zone_info['id']:
action_fail('This action can only be executed on primary RGW '
'application units.')
return False
return True
def update_buckets_sync_policy(buckets, sync_policy_state):
"""Update the sync policy state for all the given buckets.
This method gets a list of bucket names and a sync policy state to set
for all of them. The sync policy state can be one of the following:
"allowed", "enabled", or "forbidden". Validation for the sync policy
state is done in the "multisite.create_sync_group" module method.
The sync policy state is set by creating a bucket-level sync group with
the given state, followed by a sync group pipe that match all the source
and destination buckets. If the bucket already has a sync group, it is
updated with the new state.
:param buckets: List of bucket names.
:type buckets: list
:param sync_policy_state: The sync policy state to set for the buckets.
:type sync_policy_state: str
"""
zone = config('zone')
zonegroup = config('zonegroup')
existing_buckets = multisite.list_buckets(zonegroup=zonegroup, zone=zone)
messages = []
for bucket in buckets:
if bucket in existing_buckets:
multisite.create_sync_group(
bucket=bucket,
group_id=DEFAULT_SYNC_POLICY_ID,
status=sync_policy_state)
multisite.create_sync_group_pipe(
bucket=bucket,
group_id=DEFAULT_SYNC_POLICY_ID,
pipe_id=DEFAULT_SYNC_POLICY_ID,
source_zones=['*'],
dest_zones=['*'])
message = 'Updated "{}" bucket sync policy to "{}"'.format(
bucket, sync_policy_state)
else:
message = ('Bucket "{}" does not exist in the zonegroup "{}" and '
'zone "{}"'.format(bucket, zonegroup, zone))
log(message)
messages.append(message)
action_set(
values={
'message': '\n'.join(messages)
}
)
def reset_buckets_sync_policy(buckets):
"""Reset the sync policy state for all the given buckets.
For every bucket in the given list, this method resets the sync policy
state. This is done by removing the bucket-level sync group.
:param buckets: List of bucket names.
:type buckets: list
"""
zone = config('zone')
zonegroup = config('zonegroup')
existing_buckets = multisite.list_buckets(zonegroup=zonegroup, zone=zone)
messages = []
for bucket in buckets:
if bucket in existing_buckets:
multisite.remove_sync_group(
bucket=bucket,
group_id=DEFAULT_SYNC_POLICY_ID)
message = 'Reset "{}" bucket sync policy'.format(bucket)
else:
message = ('Bucket "{}" does not exist in the zonegroup "{}" and '
'zone "{}"'.format(bucket, zonegroup, zone))
log(message)
messages.append(message)
action_set(
values={
'message': '\n'.join(messages)
}
)
def enable_buckets_sync(args):
"""Enable sync for the given buckets"""
if not is_multisite_sync_policy_action_allowed():
return
try:
update_buckets_sync_policy(
buckets=action_get('buckets').split(','),
sync_policy_state=multisite.SYNC_POLICY_ENABLED,
)
except subprocess.CalledProcessError as cpe:
message = "Failed to enable sync for the given buckets"
log(message, level=ERROR)
action_fail(message + " : {}".format(cpe.output))
def disable_buckets_sync(args):
"""Disable sync for the given buckets"""
if not is_multisite_sync_policy_action_allowed():
return
try:
update_buckets_sync_policy(
buckets=action_get('buckets').split(','),
sync_policy_state=multisite.SYNC_POLICY_FORBIDDEN,
)
except subprocess.CalledProcessError as cpe:
message = "Failed to disable sync for the given buckets"
log(message, level=ERROR)
action_fail(message + " : {}".format(cpe.output))
def reset_buckets_sync(args):
"""Reset sync policy for the given buckets"""
if not is_multisite_sync_policy_action_allowed():
return
try:
reset_buckets_sync_policy(buckets=action_get('buckets').split(','))
except subprocess.CalledProcessError as cpe:
message = "Failed to reset sync for the given buckets"
log(message, level=ERROR)
action_fail(message + " : {}".format(cpe.output))
# A dictionary of all the defined actions to callables (which take # A dictionary of all the defined actions to callables (which take
# parsed arguments). # parsed arguments).
ACTIONS = { ACTIONS = {
@ -237,6 +412,9 @@ ACTIONS = {
"readwrite": readwrite, "readwrite": readwrite,
"tidydefaults": tidydefaults, "tidydefaults": tidydefaults,
"force-enable-multisite": force_enable_multisite, "force-enable-multisite": force_enable_multisite,
"enable-buckets-sync": enable_buckets_sync,
"disable-buckets-sync": disable_buckets_sync,
"reset-buckets-sync": reset_buckets_sync,
} }

View File

@ -0,0 +1 @@
actions.py

1
actions/enable-buckets-sync Symbolic link
View File

@ -0,0 +1 @@
actions.py

1
actions/reset-buckets-sync Symbolic link
View File

@ -0,0 +1 @@
actions.py

View File

@ -429,6 +429,34 @@ options:
description: | description: |
Name of RADOS Gateway Zone to create for multi-site replication. This Name of RADOS Gateway Zone to create for multi-site replication. This
option must be specific to the local site e.g. us-west or us-east. option must be specific to the local site e.g. us-west or us-east.
sync-policy-state:
type: string
default: enabled
description: |
This setting is used by the primary ceph-radosgw in multi-site
replication.
By default, all the buckets are synced from a primary RGW zone to the
secondary zone. This config option allows us to have selective buckets
sync. If this is set, it will be used as the default policy state for
all the buckets in the zonegroup.
Valid values are:
* enabled - sync is allowed and enabled
* allowed - sync is allowed
* forbidden - sync is not allowed
sync-policy-flow-type:
type: string
default: symmetrical
description: |
This setting is used by the secondary ceph-radosgw in multi-site
replication, and it's effective only when 'sync-policy-state' config is
set on the primary ceph-radosgw.
Valid values are:
* directional - data is only synced in one direction, from primary to
secondary.
* symmetrical - data is synced in both directions.
namespace-tenants: namespace-tenants:
type: boolean type: boolean
default: False default: False

View File

@ -43,6 +43,7 @@ from charmhelpers.core.hookenv import (
relation_set, relation_set,
log, log,
DEBUG, DEBUG,
WARNING,
Hooks, UnregisteredHookError, Hooks, UnregisteredHookError,
status_set, status_set,
is_leader, is_leader,
@ -134,6 +135,7 @@ APACHE_PACKAGES = [
] ]
MULTISITE_SYSTEM_USER = 'multisite-sync' MULTISITE_SYSTEM_USER = 'multisite-sync'
MULTISITE_DEFAULT_SYNC_GROUP_ID = 'default'
def upgrade_available(): def upgrade_available():
@ -845,6 +847,86 @@ def primary_relation_joined(relation_id=None):
secret=secret) secret=secret)
@hooks.hook('primary-relation-changed')
def primary_relation_changed(relation_id=None, unit=None):
if not is_leader():
log('Cannot setup multisite configuration, this unit is not the '
'leader')
return
if not ready_for_service(legacy=False):
log('unit not ready, deferring multisite configuration')
return
sync_policy_state = config('sync-policy-state')
if not sync_policy_state:
log("The config sync-policy-state is not set. Skipping zone group "
"default sync policy configuration")
return
secondary_data = relation_get(rid=relation_id, unit=unit)
if not all((secondary_data.get('zone'),
secondary_data.get('sync_policy_flow_type'))):
log("Defer processing until secondary RGW has provided required data")
return
zonegroup = config('zonegroup')
primary_zone = config('zone')
secondary_zone = secondary_data['zone']
sync_flow_type = secondary_data['sync_policy_flow_type']
if (secondary_data.get('zone_tier_type') == 'cloud' and
sync_flow_type != multisite.SYNC_FLOW_DIRECTIONAL):
log("The secondary zone is set with cloud tier type. Ignoring "
"configured {} sync policy flow, and using {}.".format(
sync_flow_type,
multisite.SYNC_FLOW_DIRECTIONAL),
level=WARNING)
sync_flow_type = multisite.SYNC_FLOW_DIRECTIONAL
flow_id = '{}-{}'.format(primary_zone, secondary_zone)
pipe_id = '{}-{}'.format(primary_zone, secondary_zone)
mutation = multisite.is_sync_group_update_needed(
group_id=MULTISITE_DEFAULT_SYNC_GROUP_ID,
flow_id=flow_id,
pipe_id=pipe_id,
source_zone=primary_zone,
dest_zone=secondary_zone,
desired_status=sync_policy_state,
desired_flow_type=sync_flow_type,
)
if mutation:
multisite.create_sync_group(
group_id=MULTISITE_DEFAULT_SYNC_GROUP_ID,
status=sync_policy_state)
multisite.create_sync_group_flow(
group_id=MULTISITE_DEFAULT_SYNC_GROUP_ID,
flow_id=flow_id,
flow_type=sync_flow_type,
source_zone=primary_zone,
dest_zone=secondary_zone)
source_zones = [primary_zone, secondary_zone]
dest_zones = [primary_zone, secondary_zone]
if sync_flow_type == multisite.SYNC_FLOW_DIRECTIONAL:
source_zones = [primary_zone]
dest_zones = [secondary_zone]
multisite.create_sync_group_pipe(
group_id=MULTISITE_DEFAULT_SYNC_GROUP_ID,
pipe_id=pipe_id,
source_zones=source_zones,
dest_zones=dest_zones)
log(
'Mutation detected. Restarting {}.'.format(service_name()),
'INFO')
multisite.update_period(zonegroup=zonegroup, zone=primary_zone)
CONFIGS.write_all()
service_restart(service_name())
leader_set(restart_nonce=str(uuid.uuid4()))
else:
log('No mutation detected.', 'INFO')
@hooks.hook('primary-relation-departed') @hooks.hook('primary-relation-departed')
@hooks.hook('secondary-relation-departed') @hooks.hook('secondary-relation-departed')
def multisite_relation_departed(): def multisite_relation_departed():
@ -935,6 +1017,9 @@ def secondary_relation_changed(relation_id=None, unit=None):
# this operation but a period update will force it to be created. # this operation but a period update will force it to be created.
multisite.update_period(fatal=False) multisite.update_period(fatal=False)
relation_set(relation_id=relation_id,
sync_policy_flow_type=config('sync-policy-flow-type'))
mutation = False mutation = False
# NOTE(utkarshbhatthere): # NOTE(utkarshbhatthere):
@ -979,6 +1064,8 @@ def secondary_relation_changed(relation_id=None, unit=None):
else: else:
log('No mutation detected.', 'INFO') log('No mutation detected.', 'INFO')
relation_set(relation_id=relation_id, zone=zone)
@hooks.hook('master-relation-departed') @hooks.hook('master-relation-departed')
@hooks.hook('slave-relation-departed') @hooks.hook('slave-relation-departed')
@ -1016,6 +1103,8 @@ def leader_settings_changed():
# Primary/Secondary relation # Primary/Secondary relation
for r_id in relation_ids('primary'): for r_id in relation_ids('primary'):
primary_relation_joined(r_id) primary_relation_joined(r_id)
for unit in related_units(r_id):
primary_relation_changed(r_id, unit)
for r_id in relation_ids('radosgw-user'): for r_id in relation_ids('radosgw-user'):
radosgw_user_changed(r_id) radosgw_user_changed(r_id)
@ -1031,6 +1120,8 @@ def process_multisite_relations():
# Primary/Secondary relation # Primary/Secondary relation
for r_id in relation_ids('primary'): for r_id in relation_ids('primary'):
primary_relation_joined(r_id) primary_relation_joined(r_id)
for unit in related_units(r_id):
primary_relation_changed(r_id, unit)
for r_id in relation_ids('secondary'): for r_id in relation_ids('secondary'):
for unit in related_units(r_id): for unit in related_units(r_id):
secondary_relation_changed(r_id, unit) secondary_relation_changed(r_id, unit)

View File

@ -24,6 +24,31 @@ import charmhelpers.core.decorators as decorators
RGW_ADMIN = 'radosgw-admin' RGW_ADMIN = 'radosgw-admin'
SYNC_POLICY_ENABLED = 'enabled'
SYNC_POLICY_ALLOWED = 'allowed'
SYNC_POLICY_FORBIDDEN = 'forbidden'
SYNC_POLICY_STATES = [
SYNC_POLICY_ENABLED,
SYNC_POLICY_ALLOWED,
SYNC_POLICY_FORBIDDEN
]
SYNC_FLOW_DIRECTIONAL = 'directional'
SYNC_FLOW_SYMMETRICAL = 'symmetrical'
SYNC_FLOW_TYPES = [
SYNC_FLOW_DIRECTIONAL,
SYNC_FLOW_SYMMETRICAL,
]
class UnknownSyncPolicyState(Exception):
"""Raised when an unknown sync policy state is encountered"""
pass
class UnknownSyncFlowType(Exception):
"""Raised when an unknown sync flow type is encountered"""
pass
@decorators.retry_on_exception(num_retries=10, base_delay=5, @decorators.retry_on_exception(num_retries=10, base_delay=5,
exc_type=subprocess.CalledProcessError) exc_type=subprocess.CalledProcessError)
@ -370,6 +395,28 @@ def modify_zone(name, endpoints=None, default=False, master=False,
return None return None
def get_zone_info(name, zonegroup=None):
"""Fetch detailed info for the provided zone
:param name: zone name
:type name: str
:param zonegroup: parent zonegroup name
:type zonegroup: str
:rtype: dict
"""
cmd = [
RGW_ADMIN, '--id={}'.format(_key_name()),
'zone', 'get',
'--rgw-zone={}'.format(name),
]
if zonegroup:
cmd.append('--rgw-zonegroup={}'.format(zonegroup))
try:
return json.loads(_check_output(cmd))
except TypeError:
return None
def remove_zone_from_zonegroup(zone, zonegroup): def remove_zone_from_zonegroup(zone, zonegroup):
"""Remove RADOS Gateway zone from provided parent zonegroup """Remove RADOS Gateway zone from provided parent zonegroup
@ -888,3 +935,357 @@ def check_cluster_has_buckets():
if check_zonegroup_has_buckets(zonegroup): if check_zonegroup_has_buckets(zonegroup):
return True return True
return False return False
def list_sync_groups(bucket=None):
"""List sync policy groups.
:param bucket: Bucket name. If this this given, the bucket level group
policies are listed.
:type bucket: str
:return: List of sync policy groups.
:rtype: list
"""
cmd = [
RGW_ADMIN, '--id={}'.format(_key_name()),
'sync', 'group', 'get',
]
if bucket:
cmd.append('--bucket={}'.format(bucket))
try:
return json.loads(_check_output(cmd))
except TypeError:
return []
def sync_group_exists(group_id, bucket=None):
"""Check if the sync policy group exists.
:param group_id: Sync policy group id.
:type group_id: str
:param bucket: Bucket name. If this this given, the bucket level group
policy is checked.
:type bucket: str
:rtype: Boolean
"""
for group in list_sync_groups(bucket=bucket):
if group['key'] == group_id:
return True
return False
def get_sync_group(group_id, bucket=None):
"""Get the sync policy group configuration.
:param group_id: Sync policy group id.
:type group_id: str
:param bucket: Bucket name. If this this given, the bucket level group
policy is returned.
:type bucket: str
:return: Sync policy group configuration.
:rtype: dict
"""
cmd = [
RGW_ADMIN, '--id={}'.format(_key_name()),
'sync', 'group', 'get',
'--group-id={}'.format(group_id),
]
if bucket:
cmd.append('--bucket={}'.format(bucket))
try:
return json.loads(_check_output(cmd))
except TypeError:
return None
def create_sync_group(group_id, status, bucket=None):
"""Create a sync policy group.
:param group_id: ID of the sync policy group to be created.
:type group_id: str
:param status: Status of the sync policy group to be created. Must be one
of the following: 'enabled', 'allowed', 'forbidden'.
:type status: str
:param bucket: Bucket name. If this this given, the bucket level group
policy is created.
:type bucket: str
:raises UnknownSyncPolicyState: if the provided status is not one of the
allowed values.
:return: Sync policy group configuration.
:rtype: dict
"""
if status not in SYNC_POLICY_STATES:
raise UnknownSyncPolicyState(
'Unknown sync policy state: {}'.format(status))
cmd = [
RGW_ADMIN, '--id={}'.format(_key_name()),
'sync', 'group', 'create',
'--group-id={}'.format(group_id),
'--status={}'.format(status),
]
if bucket:
cmd.append('--bucket={}'.format(bucket))
try:
return json.loads(_check_output(cmd))
except TypeError:
return None
def remove_sync_group(group_id, bucket=None):
"""Remove a sync group with the given group ID and optional bucket.
:param group_id: The ID of the sync group to remove.
:type group_id: str
:param bucket: Bucket name. If this this given, the bucket level group
policy is removed.
:type bucket: str
:return: The output of the command as a dict.
:rtype: dict
"""
cmd = [
RGW_ADMIN, '--id={}'.format(_key_name()),
'sync', 'group', 'remove',
'--group-id={}'.format(group_id),
]
if bucket:
cmd.append('--bucket={}'.format(bucket))
try:
return json.loads(_check_output(cmd))
except TypeError:
return None
def is_sync_group_update_needed(group_id, flow_id, pipe_id, source_zone,
dest_zone, desired_status, desired_flow_type):
"""Check if the sync group (with the given ID) needs updating.
:param group_id: The ID of the sync group to check.
:type group_id: str
:param flow_id: The ID of the sync group flow to check.
:type flow_id: str
:param pipe_id: The ID of the sync group pipe to check.
:type pipe_id: str
:param source_zone: Source zone of the sync group flow to check.
:type source_zone: str
:param dest_zone: Dest zone of the sync group flow to check.
:type dest_zone: str
:param desired_status: Desired status of the sync group.
:type desired_status: str
:param desired_flow_type: Desired flow type of the sync group data flow.
:type desired_flow_type: str
:rtype: Boolean
"""
# Check if sync group exists.
if not sync_group_exists(group_id):
hookenv.log('Sync group "{}" not configured yet'.format(group_id))
return True
group = get_sync_group(group_id)
# Check sync group status.
if group.get('status') != desired_status:
hookenv.log('Sync group "{}" status changed to "{}"'.format(
group["id"], desired_status))
return True
# Check if data flow needs to be created or updated.
if is_sync_group_flow_update_needed(group=group,
flow_id=flow_id,
source_zone=source_zone,
dest_zone=dest_zone,
desired_flow_type=desired_flow_type):
return True
# Check if data pipe needs to be created.
pipes = group.get('pipes', [])
pipes_ids = [pipe['id'] for pipe in pipes]
if pipe_id not in pipes_ids:
hookenv.log('Sync group pipe "{}" not created yet'.format(pipe_id))
return True
# Sync group configuration is up-to-date.
return False
def create_sync_group_flow(group_id, flow_id, flow_type, source_zone,
dest_zone):
"""Create a new sync group data flow with the given parameters.
:param group_id: The ID of the sync group to create the data flow for.
:type group_id: str
:param flow_id: The ID of the new data flow.
:type flow_id: str
:param flow_type: The type of the new data flow.
:type flow_type: str
:param source_zone: The source zone for the new data flow.
:type source_zone: str
:param dest_zone: The destination zone for the new data flow.
:type dest_zone: str
:raises UnknownSyncFlowType: If an unknown sync flow type is provided.
:return: Sync group data flow configuration.
:rtype: dict
"""
cmd = [
RGW_ADMIN, '--id={}'.format(_key_name()),
'sync', 'group', 'flow', 'create',
'--group-id={}'.format(group_id),
'--flow-id={}'.format(flow_id),
'--flow-type={}'.format(flow_type),
]
if flow_type == SYNC_FLOW_SYMMETRICAL:
cmd.append('--zones={},{}'.format(source_zone, dest_zone))
elif flow_type == SYNC_FLOW_DIRECTIONAL:
cmd.append('--source-zone={}'.format(source_zone))
cmd.append('--dest-zone={}'.format(dest_zone))
else:
raise UnknownSyncFlowType(
'Unknown sync flow type {}'.format(flow_type))
try:
return json.loads(_check_output(cmd))
except TypeError:
return None
def remove_sync_group_flow(group_id, flow_id, flow_type, source_zone=None,
dest_zone=None):
"""Remove a sync group data flow.
:param group_id: The ID of the sync group.
:type group_id: str
:param flow_id: The ID of the flow to remove.
:type flow_id: str
:param flow_type: The type of the flow to remove.
:type flow_type: str
:param source_zone: The source zone of the flow to remove (only for
directional flows).
:type source_zone: str
:param dest_zone: The destination zone of the flow to remove (only for
directional flows).
:type dest_zone: str
:return: The output of the command as a dict.
:rtype: dict
"""
cmd = [
RGW_ADMIN, '--id={}'.format(_key_name()),
'sync', 'group', 'flow', 'remove',
'--group-id={}'.format(group_id),
'--flow-id={}'.format(flow_id),
'--flow-type={}'.format(flow_type),
]
if flow_type == SYNC_FLOW_DIRECTIONAL:
cmd.append('--source-zone={}'.format(source_zone))
cmd.append('--dest-zone={}'.format(dest_zone))
try:
return json.loads(_check_output(cmd))
except TypeError:
return None
def create_sync_group_pipe(group_id, pipe_id, source_zones, dest_zones,
source_bucket='*', dest_bucket='*', bucket=None):
"""Create a sync group pipe between source and destination zones.
:param group_id: The ID of the sync group.
:type group_id: str
:param pipe_id: The ID of the sync group pipe.
:type pipe_id: str
:param source_zones: A list of source zones.
:type source_zones: list
:param dest_zones: A list of destination zones.
:type dest_zones: list
:param source_bucket: The source bucket name. Default is '*'.
:type source_bucket: str
:param dest_bucket: The destination bucket name. Default is '*'.
:type dest_bucket: str
:param bucket: The bucket name. If specified, the sync group pipe will be
created for this bucket only.
:type bucket: str
:return: Sync group pipe configuration.
:rtype: dict
"""
cmd = [
RGW_ADMIN, '--id={}'.format(_key_name()),
'sync', 'group', 'pipe', 'create',
'--group-id={}'.format(group_id),
'--pipe-id={}'.format(pipe_id),
'--source-zones={}'.format(','.join(source_zones)),
'--source-bucket={}'.format(source_bucket),
'--dest-zones={}'.format(','.join(dest_zones)),
'--dest-bucket={}'.format(dest_bucket),
]
if bucket:
cmd.append('--bucket={}'.format(bucket))
try:
return json.loads(_check_output(cmd))
except TypeError:
return None
def is_sync_group_flow_update_needed(group, flow_id, source_zone, dest_zone,
desired_flow_type):
"""Check if the given sync group flow needs updating.
:param group: The sync policy group configuration.
:type group: dict
:param flow_id: The ID of the sync group flow to check.
:type flow_id: str
:param source_zone: Source zone of the sync group flow to check.
:type source_zone: str
:param dest_zone: Dest zone of the sync group flow to check.
:type dest_zone: str
:param desired_flow_type: Desired flow type of the sync group data flow.
:type desired_flow_type: str
:rtype: Boolean
"""
symmetrical_flows = group['data_flow'].get('symmetrical', [])
symmetrical_flows_ids = [flow['id'] for flow in symmetrical_flows]
directional_flows = group['data_flow'].get('directional', [])
directional_flows_ids = [
# NOTE: Directional flows IDs are not present in the sync group
# configuration. We assume that the ID is a concatenation of the source
# zone and destination zone, as currently configured by the charm code.
# This is a safe assumption, because there are unique directional
# flows for each pair of zones.
"{}-{}".format(flow['source_zone'], flow['dest_zone'])
for flow in directional_flows
]
data_flows_ids = symmetrical_flows_ids + directional_flows_ids
if flow_id not in data_flows_ids:
hookenv.log('Data flow "{}" not configured yet'.format(flow_id))
return True
# Check if the flow type is consistent with the current configuration.
is_symmetrical = (desired_flow_type == SYNC_FLOW_SYMMETRICAL and
flow_id in symmetrical_flows_ids)
is_directional = (desired_flow_type == SYNC_FLOW_DIRECTIONAL and
flow_id in directional_flows_ids)
if is_symmetrical or is_directional:
# Data flow is consistent with the current configuration.
return False
# Data flow type has changed. We need to remove the old data flow.
hookenv.log('Data flow "{}" type changed to "{}"'.format(
flow_id, desired_flow_type))
old_flow_type = (
SYNC_FLOW_SYMMETRICAL if desired_flow_type == SYNC_FLOW_DIRECTIONAL
else SYNC_FLOW_DIRECTIONAL)
hookenv.log(
'Removing old data flow "{}" before configuring the new one'.format(
flow_id))
remove_sync_group_flow(
group_id=group["id"], flow_id=flow_id, flow_type=old_flow_type,
source_zone=source_zone, dest_zone=dest_zone)
return True

View File

@ -0,0 +1 @@
hooks.py

View File

@ -82,6 +82,7 @@ class MultisiteActionsTestCase(CharmTestCase):
TO_PATCH = [ TO_PATCH = [
'action_fail', 'action_fail',
'action_get',
'action_set', 'action_set',
'multisite', 'multisite',
'config', 'config',
@ -89,6 +90,7 @@ class MultisiteActionsTestCase(CharmTestCase):
'leader_set', 'leader_set',
'service_name', 'service_name',
'service_restart', 'service_restart',
'log',
] ]
def setUp(self): def setUp(self):
@ -154,3 +156,176 @@ class MultisiteActionsTestCase(CharmTestCase):
self.test_config.set('zone', None) self.test_config.set('zone', None)
actions.tidydefaults([]) actions.tidydefaults([])
self.action_fail.assert_called_once() self.action_fail.assert_called_once()
def test_enable_buckets_sync(self):
self.multisite.is_multisite_configured.return_value = True
self.multisite.get_zonegroup_info.return_value = {
'master_zone': 'test-zone-id',
}
self.multisite.get_zone_info.return_value = {
'id': 'test-zone-id',
}
self.is_leader.return_value = True
self.action_get.return_value = 'testbucket1,testbucket2,non-existent'
self.test_config.set('zone', 'testzone')
self.test_config.set('zonegroup', 'testzonegroup')
self.test_config.set('realm', 'testrealm')
self.multisite.list_buckets.return_value = ['testbucket1',
'testbucket2']
actions.enable_buckets_sync([])
self.multisite.is_multisite_configured.assert_called_once()
self.multisite.get_zonegroup_info.assert_called_once_with(
'testzonegroup',
)
self.multisite.get_zone_info.assert_called_once_with(
'testzone',
)
self.action_get.assert_called_once_with('buckets')
self.multisite.list_buckets.assert_called_once_with(
zonegroup='testzonegroup', zone='testzone',
)
self.assertEqual(self.multisite.create_sync_group.call_count, 2)
self.multisite.create_sync_group.assert_has_calls([
mock.call(bucket='testbucket1',
group_id='default',
status=self.multisite.SYNC_POLICY_ENABLED),
mock.call(bucket='testbucket2',
group_id='default',
status=self.multisite.SYNC_POLICY_ENABLED),
])
self.assertEqual(self.multisite.create_sync_group_pipe.call_count, 2)
self.multisite.create_sync_group_pipe.assert_has_calls([
mock.call(bucket='testbucket1',
group_id='default',
pipe_id='default',
source_zones=['*'],
dest_zones=['*']),
mock.call(bucket='testbucket2',
group_id='default',
pipe_id='default',
source_zones=['*'],
dest_zones=['*']),
])
expected_messages = [
'Updated "testbucket1" bucket sync policy to "{}"'.format(
self.multisite.SYNC_POLICY_ENABLED),
'Updated "testbucket2" bucket sync policy to "{}"'.format(
self.multisite.SYNC_POLICY_ENABLED),
('Bucket "non-existent" does not exist in the zonegroup '
'"testzonegroup" and zone "testzone"'),
]
self.assertEqual(self.log.call_count, 3)
self.log.assert_has_calls([
mock.call(expected_messages[0]),
mock.call(expected_messages[1]),
mock.call(expected_messages[2]),
])
self.action_set.assert_called_once_with(
values={
'message': '\n'.join(expected_messages),
})
def test_disable_buckets_sync(self):
self.multisite.is_multisite_configured.return_value = True
self.multisite.get_zonegroup_info.return_value = {
'master_zone': 'test-zone-id',
}
self.multisite.get_zone_info.return_value = {
'id': 'test-zone-id',
}
self.is_leader.return_value = True
self.action_get.return_value = 'testbucket1,non-existent'
self.test_config.set('zone', 'testzone')
self.test_config.set('zonegroup', 'testzonegroup')
self.test_config.set('realm', 'testrealm')
self.multisite.list_buckets.return_value = ['testbucket1']
actions.disable_buckets_sync([])
self.multisite.is_multisite_configured.assert_called_once()
self.multisite.get_zonegroup_info.assert_called_once_with(
'testzonegroup',
)
self.multisite.get_zone_info.assert_called_once_with(
'testzone',
)
self.action_get.assert_called_once_with('buckets')
self.multisite.list_buckets.assert_called_once_with(
zonegroup='testzonegroup', zone='testzone',
)
self.multisite.create_sync_group.assert_called_once_with(
bucket='testbucket1',
group_id='default',
status=self.multisite.SYNC_POLICY_FORBIDDEN,
)
self.multisite.create_sync_group_pipe.assert_called_once_with(
bucket='testbucket1',
group_id='default',
pipe_id='default',
source_zones=['*'],
dest_zones=['*'],
)
expected_messages = [
'Updated "testbucket1" bucket sync policy to "{}"'.format(
self.multisite.SYNC_POLICY_FORBIDDEN),
('Bucket "non-existent" does not exist in the zonegroup '
'"testzonegroup" and zone "testzone"'),
]
self.assertEqual(self.log.call_count, 2)
self.log.assert_has_calls([
mock.call(expected_messages[0]),
mock.call(expected_messages[1]),
])
self.action_set.assert_called_once_with(
values={
'message': '\n'.join(expected_messages),
})
def test_reset_buckets_sync(self):
self.multisite.is_multisite_configured.return_value = True
self.multisite.get_zonegroup_info.return_value = {
'master_zone': 'test-zone-id',
}
self.multisite.get_zone_info.return_value = {
'id': 'test-zone-id',
}
self.is_leader.return_value = True
self.action_get.return_value = 'testbucket1,non-existent'
self.test_config.set('zone', 'testzone')
self.test_config.set('zonegroup', 'testzonegroup')
self.test_config.set('realm', 'testrealm')
self.multisite.list_buckets.return_value = ['testbucket1']
actions.reset_buckets_sync([])
self.multisite.is_multisite_configured.assert_called_once()
self.multisite.get_zonegroup_info.assert_called_once_with(
'testzonegroup',
)
self.multisite.get_zone_info.assert_called_once_with(
'testzone',
)
self.action_get.assert_called_once_with('buckets')
self.multisite.list_buckets.assert_called_once_with(
zonegroup='testzonegroup', zone='testzone',
)
self.multisite.remove_sync_group.assert_called_once_with(
bucket='testbucket1',
group_id='default',
)
expected_messages = [
'Reset "testbucket1" bucket sync policy',
('Bucket "non-existent" does not exist in the zonegroup '
'"testzonegroup" and zone "testzone"'),
]
self.assertEqual(self.log.call_count, 2)
self.log.assert_has_calls([
mock.call(expected_messages[0]),
mock.call(expected_messages[1]),
])
self.action_set.assert_called_once_with(
values={
'message': '\n'.join(expected_messages),
})

View File

@ -287,6 +287,7 @@ class MonContextTest(CharmTestCase):
self.assertEqual(expect, mon_ctxt()) self.assertEqual(expect, mon_ctxt())
self.assertTrue(mock_ensure_rsv_v6.called) self.assertTrue(mock_ensure_rsv_v6.called)
@patch.object(context, 'format_ipv6_addr', lambda *_: None)
@patch('ceph_radosgw_context.https') @patch('ceph_radosgw_context.https')
@patch('charmhelpers.contrib.hahelpers.cluster.relation_ids') @patch('charmhelpers.contrib.hahelpers.cluster.relation_ids')
@patch('charmhelpers.contrib.hahelpers.cluster.config_get') @patch('charmhelpers.contrib.hahelpers.cluster.config_get')

View File

@ -305,11 +305,11 @@ class CephRadosGWUtilTests(CharmTestCase):
def test_listen_port(self): def test_listen_port(self):
self.https.return_value = False self.https.return_value = False
self.assertEquals(80, utils.listen_port()) self.assertEqual(80, utils.listen_port())
self.https.return_value = True self.https.return_value = True
self.assertEquals(443, utils.listen_port()) self.assertEqual(443, utils.listen_port())
self.test_config.set('port', 42) self.test_config.set('port', 42)
self.assertEquals(42, utils.listen_port()) self.assertEqual(42, utils.listen_port())
def test_set_s3_app(self): def test_set_s3_app(self):
self.leader_get.return_value = None self.leader_get.return_value = None

View File

@ -13,6 +13,7 @@
# limitations under the License. # limitations under the License.
import base64 import base64
import json import json
import os
from unittest.mock import ( from unittest.mock import (
patch, call, MagicMock, ANY patch, call, MagicMock, ANY
) )
@ -338,6 +339,8 @@ class CephRadosGWTests(CharmTestCase):
@patch.object(ceph_hooks, 'leader_get') @patch.object(ceph_hooks, 'leader_get')
@patch('charmhelpers.contrib.openstack.ip.service_name', @patch('charmhelpers.contrib.openstack.ip.service_name',
lambda *args: 'ceph-radosgw') lambda *args: 'ceph-radosgw')
@patch('charmhelpers.contrib.openstack.ip.resolve_address',
lambda *args: 'myserv')
@patch('charmhelpers.contrib.openstack.ip.config') @patch('charmhelpers.contrib.openstack.ip.config')
def test_identity_joined_early_version(self, _config, _leader_get): def test_identity_joined_early_version(self, _config, _leader_get):
self.cmp_pkgrevno.return_value = -1 self.cmp_pkgrevno.return_value = -1
@ -687,6 +690,7 @@ class MiscMultisiteTests(CharmTestCase):
'leader_get', 'leader_get',
'is_leader', 'is_leader',
'primary_relation_joined', 'primary_relation_joined',
'primary_relation_changed',
'secondary_relation_changed', 'secondary_relation_changed',
'service_restart', 'service_restart',
'service_name', 'service_name',
@ -724,6 +728,12 @@ class MiscMultisiteTests(CharmTestCase):
def test_process_multisite_relations(self): def test_process_multisite_relations(self):
ceph_hooks.process_multisite_relations() ceph_hooks.process_multisite_relations()
self.primary_relation_joined.assert_called_once_with('primary:1') self.primary_relation_joined.assert_called_once_with('primary:1')
self.assertEqual(self.primary_relation_changed.call_count, 2)
self.primary_relation_changed.assert_has_calls([
call('primary:1', 'rgw/0'),
call('primary:1', 'rgw/1'),
])
self.assertEqual(self.secondary_relation_changed.call_count, 2)
self.secondary_relation_changed.assert_has_calls([ self.secondary_relation_changed.assert_has_calls([
call('secondary:1', 'rgw-s/0'), call('secondary:1', 'rgw-s/0'),
call('secondary:1', 'rgw-s/1'), call('secondary:1', 'rgw-s/1'),
@ -889,6 +899,87 @@ class PrimaryMultisiteTests(CephRadosMultisiteTests):
) )
self.multisite.list_realms.assert_not_called() self.multisite.list_realms.assert_not_called()
def test_primary_relation_changed_sync_policy_state_unset(self):
self.is_leader.return_value = True
self.test_config.set('sync-policy-state', '')
ceph_hooks.primary_relation_changed('primary:1')
self.is_leader.assert_called_once()
self.ready_for_service.assert_called_once_with(legacy=False)
self.config.assert_called_once_with('sync-policy-state')
def test_primary_relation_changed_sync_rel_data_incomplete(self):
self.is_leader.return_value = True
self.test_config.set('sync-policy-state', 'allowed')
self.relation_get.return_value = {'zone': 'secondary'}
ceph_hooks.primary_relation_changed('primary:1', 'rgw/0')
self.is_leader.assert_called_once()
self.ready_for_service.assert_called_once_with(legacy=False)
self.config.assert_called_once_with('sync-policy-state')
self.relation_get.assert_called_once_with(rid='primary:1',
unit='rgw/0')
def test_primary_relation_changed(self):
self.is_leader.return_value = True
configs = {
'sync-policy-state': 'allowed',
'zonegroup': 'testzonegroup',
'zone': 'zone_a',
}
for k, v in configs.items():
self.test_config.set(k, v)
self.relation_get.return_value = {
'zone': 'zone_b',
'sync_policy_flow_type': 'symmetrical',
# this should force flow type to directional, and ignore the value
# from the relation data.
'zone_tier_type': 'cloud',
}
self.multisite.is_sync_group_update_needed.return_value = True
group_test_data_file = os.path.join(
os.path.dirname(__file__), 'testdata', 'test_get_sync_group.json')
with open(group_test_data_file, 'r') as f:
self.multisite.get_sync_group.return_value = json.loads(f.read())
ceph_hooks.primary_relation_changed('primary:1', 'rgw/0')
self.is_leader.assert_called_once()
self.ready_for_service.assert_called_once_with(legacy=False)
self.config.assert_has_calls([
call('sync-policy-state'),
call('zonegroup'),
call('zone'),
])
self.relation_get.assert_called_once_with(rid='primary:1',
unit='rgw/0')
self.multisite.is_sync_group_update_needed.assert_called_once_with(
group_id=ceph_hooks.MULTISITE_DEFAULT_SYNC_GROUP_ID,
flow_id='zone_a-zone_b',
pipe_id='zone_a-zone_b',
source_zone='zone_a',
dest_zone='zone_b',
desired_status='allowed',
desired_flow_type=self.multisite.SYNC_FLOW_DIRECTIONAL)
self.multisite.create_sync_group.assert_called_once_with(
group_id=ceph_hooks.MULTISITE_DEFAULT_SYNC_GROUP_ID,
status='allowed')
self.multisite.create_sync_group_flow.assert_called_once_with(
group_id=ceph_hooks.MULTISITE_DEFAULT_SYNC_GROUP_ID,
flow_id='zone_a-zone_b',
flow_type=self.multisite.SYNC_FLOW_DIRECTIONAL,
source_zone='zone_a', dest_zone='zone_b')
self.multisite.create_sync_group_pipe.assert_called_once_with(
group_id=ceph_hooks.MULTISITE_DEFAULT_SYNC_GROUP_ID,
pipe_id='zone_a-zone_b',
source_zones=['zone_a'], dest_zones=['zone_b'])
self.multisite.update_period.assert_called_once_with(
zonegroup='testzonegroup', zone='zone_a')
self.service_restart.assert_called_once_with('rgw@hostname')
self.leader_set.assert_called_once_with(restart_nonce=ANY)
@patch.object(json, 'loads') @patch.object(json, 'loads')
def test_multisite_relation_departed(self, json_loads): def test_multisite_relation_departed(self, json_loads):
for k, v in self._complete_config.items(): for k, v in self._complete_config.items():
@ -916,6 +1007,7 @@ class SecondaryMultisiteTests(CephRadosMultisiteTests):
'realm': 'testrealm', 'realm': 'testrealm',
'zonegroup': 'testzonegroup', 'zonegroup': 'testzonegroup',
'zone': 'testzone2', 'zone': 'testzone2',
'sync-policy-flow-type': 'symmetrical',
} }
_test_relation = { _test_relation = {
@ -978,6 +1070,16 @@ class SecondaryMultisiteTests(CephRadosMultisiteTests):
]) ])
self.service_restart.assert_called_once() self.service_restart.assert_called_once()
self.leader_set.assert_called_once_with(restart_nonce=ANY) self.leader_set.assert_called_once_with(restart_nonce=ANY)
self.relation_set.assert_has_calls([
call(
relation_id='secondary:1',
sync_policy_flow_type='symmetrical',
),
call(
relation_id='secondary:1',
zone='testzone2',
),
])
def test_secondary_relation_changed_incomplete_relation(self): def test_secondary_relation_changed_incomplete_relation(self):
for k, v in self._complete_config.items(): for k, v in self._complete_config.items():
@ -986,6 +1088,7 @@ class SecondaryMultisiteTests(CephRadosMultisiteTests):
self.relation_get.return_value = {} self.relation_get.return_value = {}
ceph_hooks.secondary_relation_changed('secondary:1', 'rgw/0') ceph_hooks.secondary_relation_changed('secondary:1', 'rgw/0')
self.config.assert_not_called() self.config.assert_not_called()
self.relation_set.assert_not_called()
def test_secondary_relation_changed_mismatching_config(self): def test_secondary_relation_changed_mismatching_config(self):
for k, v in self._complete_config.items(): for k, v in self._complete_config.items():
@ -999,11 +1102,13 @@ class SecondaryMultisiteTests(CephRadosMultisiteTests):
call('zone'), call('zone'),
]) ])
self.multisite.list_realms.assert_not_called() self.multisite.list_realms.assert_not_called()
self.relation_set.assert_not_called()
def test_secondary_relation_changed_not_leader(self): def test_secondary_relation_changed_not_leader(self):
self.is_leader.return_value = False self.is_leader.return_value = False
ceph_hooks.secondary_relation_changed('secondary:1', 'rgw/0') ceph_hooks.secondary_relation_changed('secondary:1', 'rgw/0')
self.relation_get.assert_not_called() self.relation_get.assert_not_called()
self.relation_set.assert_not_called()
@patch.object(ceph_hooks, 'apt_install') @patch.object(ceph_hooks, 'apt_install')
@patch.object(ceph_hooks, 'services') @patch.object(ceph_hooks, 'services')

View File

@ -484,3 +484,233 @@ class TestMultisiteHelpers(CharmTestCase):
multisite.check_cluster_has_buckets(), multisite.check_cluster_has_buckets(),
True True
) )
def test_get_zone_info(self):
multisite.get_zone_info('test_zone', 'test_zonegroup')
self.subprocess.check_output.assert_called_with([
'radosgw-admin', '--id=rgw.testhost',
'zone', 'get',
'--rgw-zone=test_zone', '--rgw-zonegroup=test_zonegroup',
])
def test_sync_group_exists(self):
groups = [
{'key': 'group1'},
{'key': 'group2'},
]
self.subprocess.check_output.return_value = json.dumps(groups).encode()
self.assertTrue(multisite.sync_group_exists('group1'))
self.subprocess.check_output.assert_called_with([
'radosgw-admin', '--id=rgw.testhost',
'sync', 'group', 'get',
])
def test_bucket_sync_group_exists(self):
with open(self._testdata('test_list_sync_groups'), 'rb') as f:
self.subprocess.check_output.return_value = f.read()
self.assertTrue(multisite.sync_group_exists('default',
bucket='test'))
self.subprocess.check_output.assert_called_with([
'radosgw-admin', '--id=rgw.testhost',
'sync', 'group', 'get',
'--bucket=test',
])
def test_sync_group_does_not_exists(self):
with open(self._testdata('test_list_sync_groups'), 'rb') as f:
self.subprocess.check_output.return_value = f.read()
self.assertFalse(multisite.sync_group_exists('group-non-existent'))
self.subprocess.check_output.assert_called_with([
'radosgw-admin', '--id=rgw.testhost',
'sync', 'group', 'get',
])
def test_get_sync_group(self):
with open(self._testdata(whoami()), 'rb') as f:
self.subprocess.check_output.return_value = f.read()
result = multisite.get_sync_group('default')
self.assertEqual(result['id'], 'default')
self.subprocess.check_output.assert_called_with([
'radosgw-admin', '--id=rgw.testhost',
'sync', 'group', 'get',
'--group-id=default',
])
def test_create_sync_group(self):
test_group_json = json.dumps({"id": "default"}).encode()
self.subprocess.check_output.return_value = test_group_json
result = multisite.create_sync_group(
group_id='default',
status=multisite.SYNC_POLICY_ENABLED,
)
self.assertEqual(result['id'], 'default')
self.subprocess.check_output.assert_called_with([
'radosgw-admin', '--id=rgw.testhost',
'sync', 'group', 'create',
'--group-id=default',
'--status={}'.format(multisite.SYNC_POLICY_ENABLED),
])
def test_create_sync_group_wrong_status(self):
self.assertRaises(
multisite.UnknownSyncPolicyState,
multisite.create_sync_group, "default", "wrong_status",
)
def test_remove_sync_group(self):
multisite.remove_sync_group('default')
self.subprocess.check_output.assert_called_with([
'radosgw-admin', '--id=rgw.testhost',
'sync', 'group', 'remove',
'--group-id=default',
])
@mock.patch.object(multisite, 'get_sync_group')
@mock.patch.object(multisite, 'sync_group_exists')
def test_is_sync_group_update_needed(self, mock_sync_group_exists,
mock_get_sync_group):
mock_sync_group_exists.return_value = True
with open(self._testdata('test_get_sync_group'), 'r') as f:
mock_get_sync_group.return_value = json.loads(f.read())
result = multisite.is_sync_group_update_needed(
group_id='default',
flow_id='zone_a-zone_b',
pipe_id='zone_a-zone_b',
source_zone='zone_a',
dest_zone='zone_b',
desired_status=multisite.SYNC_POLICY_ALLOWED,
desired_flow_type=multisite.SYNC_FLOW_SYMMETRICAL,
)
mock_sync_group_exists.assert_called_with('default')
mock_get_sync_group.assert_called_with('default')
self.assertFalse(result)
def test_is_sync_group_flow_update_needed(self):
with open(self._testdata('test_get_sync_group'), 'r') as f:
sync_group = json.loads(f.read())
result = multisite.is_sync_group_flow_update_needed(
sync_group,
flow_id='zone_a-zone_b',
source_zone='zone_a', dest_zone='zone_b',
desired_flow_type=multisite.SYNC_FLOW_SYMMETRICAL,
)
self.assertFalse(result)
@mock.patch.object(multisite, 'remove_sync_group_flow')
def test_is_sync_group_flow_update_needed_flow_type_change(
self, mock_remove_sync_group_flow):
with open(self._testdata('test_get_sync_group'), 'r') as f:
sync_group = json.loads(f.read())
result = multisite.is_sync_group_flow_update_needed(
sync_group,
flow_id='zone_a-zone_b',
source_zone='zone_a', dest_zone='zone_b',
desired_flow_type=multisite.SYNC_FLOW_DIRECTIONAL,
)
mock_remove_sync_group_flow.assert_called_with(
group_id='default',
flow_id='zone_a-zone_b',
flow_type=multisite.SYNC_FLOW_SYMMETRICAL,
source_zone='zone_a', dest_zone='zone_b',
)
self.assertTrue(result)
def test_create_sync_group_flow_symmetrical(self):
with open(self._testdata('test_create_sync_group_flow'), 'rb') as f:
self.subprocess.check_output.return_value = f.read()
result = multisite.create_sync_group_flow(
group_id='default',
flow_id='flow_id',
flow_type=multisite.SYNC_FLOW_SYMMETRICAL,
source_zone='zone_a',
dest_zone='zone_b',
)
self.assertEqual(result['groups'][0]['id'], 'default')
self.subprocess.check_output.assert_called_with([
'radosgw-admin', '--id=rgw.testhost',
'sync', 'group', 'flow', 'create',
'--group-id=default',
'--flow-id=flow_id',
'--flow-type=symmetrical',
'--zones=zone_a,zone_b',
])
def test_create_sync_group_flow_directional(self):
with open(self._testdata('test_create_sync_group_flow'), 'rb') as f:
self.subprocess.check_output.return_value = f.read()
result = multisite.create_sync_group_flow(
group_id='default',
flow_id='flow_id',
flow_type=multisite.SYNC_FLOW_DIRECTIONAL,
source_zone='zone_a',
dest_zone='zone_b',
)
self.assertEqual(result['groups'][0]['id'], 'default')
self.subprocess.check_output.assert_called_with([
'radosgw-admin', '--id=rgw.testhost',
'sync', 'group', 'flow', 'create',
'--group-id=default',
'--flow-id=flow_id',
'--flow-type=directional',
'--source-zone=zone_a', '--dest-zone=zone_b',
])
def test_create_sync_group_flow_wrong_type(self):
self.assertRaises(
multisite.UnknownSyncFlowType,
multisite.create_sync_group_flow,
group_id='default', flow_id='flow_id', flow_type='wrong_type',
source_zone='zone_a', dest_zone='zone_b',
)
def test_remove_sync_group_flow_symmetrical(self):
multisite.remove_sync_group_flow(
group_id='default',
flow_id='flow_id',
flow_type=multisite.SYNC_FLOW_SYMMETRICAL,
)
self.subprocess.check_output.assert_called_with([
'radosgw-admin', '--id=rgw.testhost',
'sync', 'group', 'flow', 'remove',
'--group-id=default',
'--flow-id=flow_id',
'--flow-type=symmetrical',
])
def test_remove_sync_group_flow_directional(self):
multisite.remove_sync_group_flow(
group_id='default',
flow_id='flow_id',
flow_type=multisite.SYNC_FLOW_DIRECTIONAL,
source_zone='zone_a',
dest_zone='zone_b',
)
self.subprocess.check_output.assert_called_with([
'radosgw-admin', '--id=rgw.testhost',
'sync', 'group', 'flow', 'remove',
'--group-id=default',
'--flow-id=flow_id',
'--flow-type=directional',
'--source-zone=zone_a', '--dest-zone=zone_b',
])
def test_create_sync_group_pipe(self):
with open(self._testdata(whoami()), 'rb') as f:
self.subprocess.check_output.return_value = f.read()
result = multisite.create_sync_group_pipe(
group_id='default',
pipe_id='pipe_id',
source_zones=['zone_a', 'zone_b'],
dest_zones=['zone_c', 'zone_d'],
)
self.assertEqual(result['groups'][0]['id'], 'default')
self.subprocess.check_output.assert_called_with([
'radosgw-admin', '--id=rgw.testhost',
'sync', 'group', 'pipe', 'create',
'--group-id=default',
'--pipe-id=pipe_id',
'--source-zones=zone_a,zone_b', '--source-bucket=*',
'--dest-zones=zone_c,zone_d', '--dest-bucket=*',
])

View File

@ -0,0 +1,20 @@
{
"groups": [
{
"id": "default",
"data_flow": {
"symmetrical": [
{
"id": "zone_a-zone_b",
"zones": [
"zone_a",
"zone_b"
]
}
]
},
"pipes": [],
"status": "allowed"
}
]
}

View File

@ -0,0 +1,49 @@
{
"groups": [
{
"id": "default",
"data_flow": {
"symmetrical": [
{
"id": "zone_a-zone_b",
"zones": [
"zone_a",
"zone_b"
]
}
]
},
"pipes": [
{
"id": "zone_a-zone_b",
"source": {
"bucket": "*",
"zones": [
"zone_a",
"zone_b"
]
},
"dest": {
"bucket": "*",
"zones": [
"zone_a",
"zone_b"
]
},
"params": {
"source": {
"filter": {
"tags": []
}
},
"dest": {},
"priority": 0,
"mode": "system",
"user": ""
}
}
],
"status": "allowed"
}
]
}

View File

@ -0,0 +1,45 @@
{
"id": "default",
"data_flow": {
"symmetrical": [
{
"id": "zone_a-zone_b",
"zones": [
"zone_a",
"zone_b"
]
}
]
},
"pipes": [
{
"id": "zone_a-zone_b",
"source": {
"bucket": "*",
"zones": [
"zone_a",
"zone_b"
]
},
"dest": {
"bucket": "*",
"zones": [
"zone_a",
"zone_b"
]
},
"params": {
"source": {
"filter": {
"tags": []
}
},
"dest": {},
"priority": 0,
"mode": "system",
"user": ""
}
}
],
"status": "allowed"
}

View File

@ -0,0 +1,45 @@
[
{
"key": "default",
"val": {
"id": "default",
"data_flow": {
"directional": [
{
"source_zone": "zone_a",
"dest_zone": "zone_b"
}
]
},
"pipes": [
{
"id": "zone_a-zone_b",
"source": {
"bucket": "*",
"zones": [
"zone_a"
]
},
"dest": {
"bucket": "*",
"zones": [
"zone_b"
]
},
"params": {
"source": {
"filter": {
"tags": []
}
},
"dest": {},
"priority": 0,
"mode": "system",
"user": ""
}
}
],
"status": "allowed"
}
}
]