Transition member flows to use dicts
This patch converts the member flows to use the provider driver data model. It also stops storing sqlalchemy models in the flow storage in preparation for enabling jobboard. Change-Id: Ic15e4311ce244e71b65069359c6c08e49f6b8d00 Story: 2005072 Task: 30810
This commit is contained in:
parent
b9e74630a6
commit
d6dc2b9a3e
@ -196,22 +196,21 @@ class AmphoraProviderDriver(driver_base.ProviderDriver):
|
||||
id=pool_id)
|
||||
self._validate_members(db_pool, [member])
|
||||
|
||||
payload = {consts.MEMBER_ID: member.member_id}
|
||||
payload = {consts.MEMBER: member.to_dict()}
|
||||
self.client.cast({}, 'create_member', **payload)
|
||||
|
||||
def member_delete(self, member):
|
||||
member_id = member.member_id
|
||||
payload = {consts.MEMBER_ID: member_id}
|
||||
payload = {consts.MEMBER: member.to_dict()}
|
||||
self.client.cast({}, 'delete_member', **payload)
|
||||
|
||||
def member_update(self, old_member, new_member):
|
||||
member_dict = new_member.to_dict()
|
||||
if 'admin_state_up' in member_dict:
|
||||
member_dict['enabled'] = member_dict.pop('admin_state_up')
|
||||
member_id = member_dict.pop('member_id')
|
||||
|
||||
payload = {consts.MEMBER_ID: member_id,
|
||||
consts.MEMBER_UPDATES: member_dict}
|
||||
original_member = old_member.to_dict()
|
||||
member_updates = new_member.to_dict()
|
||||
if 'admin_state_up' in member_updates:
|
||||
member_updates['enabled'] = member_updates.pop('admin_state_up')
|
||||
member_updates.pop(consts.MEMBER_ID)
|
||||
payload = {consts.ORIGINAL_MEMBER: original_member,
|
||||
consts.MEMBER_UPDATES: member_updates}
|
||||
self.client.cast({}, 'update_member', **payload)
|
||||
|
||||
def member_batch_update(self, pool_id, members):
|
||||
@ -248,8 +247,8 @@ class AmphoraProviderDriver(driver_base.ProviderDriver):
|
||||
deleted_members.append(m)
|
||||
|
||||
if deleted_members or new_members or updated_members:
|
||||
payload = {'old_member_ids': [m.id for m in deleted_members],
|
||||
'new_member_ids': [m.member_id for m in new_members],
|
||||
payload = {'old_members': [m.to_dict() for m in deleted_members],
|
||||
'new_members': [m.to_dict() for m in new_members],
|
||||
'updated_members': updated_members}
|
||||
self.client.cast({}, 'batch_update_members', **payload)
|
||||
else:
|
||||
|
@ -346,6 +346,7 @@ NETWORK_ID = 'network_id'
|
||||
NICS = 'nics'
|
||||
OBJECT = 'object'
|
||||
ORIGINAL_LISTENER = 'original_listener'
|
||||
ORIGINAL_MEMBER = 'original_member'
|
||||
ORIGINAL_POOL = 'original_pool'
|
||||
PEER_PORT = 'peer_port'
|
||||
POOL = 'pool'
|
||||
|
@ -100,28 +100,31 @@ class Endpoints(object):
|
||||
LOG.info('Deleting health monitor \'%s\'...', health_monitor_id)
|
||||
self.worker.delete_health_monitor(health_monitor_id)
|
||||
|
||||
def create_member(self, context, member_id):
|
||||
LOG.info('Creating member \'%s\'...', member_id)
|
||||
self.worker.create_member(member_id)
|
||||
def create_member(self, context, member):
|
||||
LOG.info('Creating member \'%s\'...', member.get(constants.ID))
|
||||
self.worker.create_member(member)
|
||||
|
||||
def update_member(self, context, member_id, member_updates):
|
||||
LOG.info('Updating member \'%s\'...', member_id)
|
||||
self.worker.update_member(member_id, member_updates)
|
||||
def update_member(self, context, original_member, member_updates):
|
||||
LOG.info('Updating member \'%s\'...', original_member.get(
|
||||
constants.ID))
|
||||
self.worker.update_member(original_member, member_updates)
|
||||
|
||||
def batch_update_members(self, context, old_member_ids, new_member_ids,
|
||||
def batch_update_members(self, context, old_members, new_members,
|
||||
updated_members):
|
||||
updated_member_ids = [m.get('id') for m in updated_members]
|
||||
updated_member_ids = [m.get(constants.ID) for m in updated_members]
|
||||
new_member_ids = [m.get(constants.ID) for m in new_members]
|
||||
old_member_ids = [m.get(constants.ID) for m in old_members]
|
||||
LOG.info(
|
||||
'Batch updating members: old=\'%(old)s\', new=\'%(new)s\', '
|
||||
'updated=\'%(updated)s\'...',
|
||||
{'old': old_member_ids, 'new': new_member_ids,
|
||||
'updated': updated_member_ids})
|
||||
self.worker.batch_update_members(
|
||||
old_member_ids, new_member_ids, updated_members)
|
||||
old_members, new_members, updated_members)
|
||||
|
||||
def delete_member(self, context, member_id):
|
||||
LOG.info('Deleting member \'%s\'...', member_id)
|
||||
self.worker.delete_member(member_id)
|
||||
def delete_member(self, context, member):
|
||||
LOG.info('Deleting member \'%s\'...', member.get(constants.ID))
|
||||
self.worker.delete_member(member)
|
||||
|
||||
def create_l7policy(self, context, l7policy_id):
|
||||
LOG.info('Creating l7policy \'%s\'...', l7policy_id)
|
||||
|
@ -395,26 +395,15 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
log=LOG):
|
||||
update_lb_tf.run()
|
||||
|
||||
@tenacity.retry(
|
||||
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
|
||||
wait=tenacity.wait_incrementing(
|
||||
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
|
||||
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
|
||||
def create_member(self, member_id):
|
||||
def create_member(self, member):
|
||||
"""Creates a pool member.
|
||||
|
||||
:param member_id: ID of the member to create
|
||||
:param member: A member provider dictionary to create
|
||||
:returns: None
|
||||
:raises NoSuitablePool: Unable to find the node pool
|
||||
"""
|
||||
member = self._member_repo.get(db_apis.get_session(),
|
||||
id=member_id)
|
||||
if not member:
|
||||
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
|
||||
'60 seconds.', 'member', member_id)
|
||||
raise db_exceptions.NoResultFound
|
||||
|
||||
pool = member.pool
|
||||
pool = self._pool_repo.get(db_apis.get_session(),
|
||||
id=member[constants.POOL_ID])
|
||||
load_balancer = pool.load_balancer
|
||||
|
||||
listeners_dicts = (
|
||||
@ -432,16 +421,16 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
log=LOG):
|
||||
create_member_tf.run()
|
||||
|
||||
def delete_member(self, member_id):
|
||||
def delete_member(self, member):
|
||||
"""Deletes a pool member.
|
||||
|
||||
:param member_id: ID of the member to delete
|
||||
:param member: A member provider dictionary to delete
|
||||
:returns: None
|
||||
:raises MemberNotFound: The referenced member was not found
|
||||
"""
|
||||
member = self._member_repo.get(db_apis.get_session(),
|
||||
id=member_id)
|
||||
pool = member.pool
|
||||
pool = self._pool_repo.get(db_apis.get_session(),
|
||||
id=member[constants.POOL_ID])
|
||||
|
||||
load_balancer = pool.load_balancer
|
||||
|
||||
listeners_dicts = (
|
||||
@ -454,27 +443,38 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
constants.LISTENERS: listeners_dicts,
|
||||
constants.LOADBALANCER: load_balancer,
|
||||
constants.LOADBALANCER_ID: load_balancer.id,
|
||||
constants.POOL_ID: pool.id}
|
||||
constants.POOL_ID: pool.id,
|
||||
constants.PROJECT_ID: load_balancer.project_id
|
||||
}
|
||||
|
||||
)
|
||||
with tf_logging.DynamicLoggingListener(delete_member_tf,
|
||||
log=LOG):
|
||||
delete_member_tf.run()
|
||||
|
||||
def batch_update_members(self, old_member_ids, new_member_ids,
|
||||
def batch_update_members(self, old_members, new_members,
|
||||
updated_members):
|
||||
old_members = [self._member_repo.get(db_apis.get_session(), id=mid)
|
||||
for mid in old_member_ids]
|
||||
new_members = [self._member_repo.get(db_apis.get_session(), id=mid)
|
||||
for mid in new_member_ids]
|
||||
updated_members = [
|
||||
(self._member_repo.get(db_apis.get_session(), id=m.get('id')), m)
|
||||
(provider_utils.db_member_to_provider_member(
|
||||
self._member_repo.get(db_apis.get_session(),
|
||||
id=m.get(constants.ID))).to_dict(),
|
||||
m)
|
||||
for m in updated_members]
|
||||
provider_old_members = [
|
||||
provider_utils.db_member_to_provider_member(
|
||||
self._member_repo.get(db_apis.get_session(),
|
||||
id=m.get(constants.ID))).to_dict()
|
||||
for m in old_members]
|
||||
if old_members:
|
||||
pool = old_members[0].pool
|
||||
pool = self._pool_repo.get(db_apis.get_session(),
|
||||
id=old_members[0][constants.POOL_ID])
|
||||
elif new_members:
|
||||
pool = new_members[0].pool
|
||||
pool = self._pool_repo.get(db_apis.get_session(),
|
||||
id=new_members[0][constants.POOL_ID])
|
||||
else:
|
||||
pool = updated_members[0][0].pool
|
||||
pool = self._pool_repo.get(
|
||||
db_apis.get_session(),
|
||||
id=updated_members[0][0][constants.POOL_ID])
|
||||
load_balancer = pool.load_balancer
|
||||
|
||||
listeners_dicts = (
|
||||
@ -483,36 +483,27 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
|
||||
batch_update_members_tf = self._taskflow_load(
|
||||
self._member_flows.get_batch_update_members_flow(
|
||||
old_members, new_members, updated_members),
|
||||
provider_old_members, new_members, updated_members),
|
||||
store={constants.LISTENERS: listeners_dicts,
|
||||
constants.LOADBALANCER: load_balancer,
|
||||
constants.LOADBALANCER_ID: load_balancer.id,
|
||||
constants.POOL_ID: pool.id})
|
||||
constants.POOL_ID: pool.id,
|
||||
constants.PROJECT_ID: load_balancer.project_id})
|
||||
with tf_logging.DynamicLoggingListener(batch_update_members_tf,
|
||||
log=LOG):
|
||||
batch_update_members_tf.run()
|
||||
|
||||
def update_member(self, member_id, member_updates):
|
||||
def update_member(self, member, member_updates):
|
||||
"""Updates a pool member.
|
||||
|
||||
:param member_id: ID of the member to update
|
||||
:param member_id: A member provider dictionary to update
|
||||
:param member_updates: Dict containing updated member attributes
|
||||
:returns: None
|
||||
:raises MemberNotFound: The referenced member was not found
|
||||
"""
|
||||
member = None
|
||||
try:
|
||||
member = self._get_db_obj_until_pending_update(
|
||||
self._member_repo, member_id)
|
||||
except tenacity.RetryError as e:
|
||||
LOG.warning('Member did not go into %s in 60 seconds. '
|
||||
'This either due to an in-progress Octavia upgrade '
|
||||
'or an overloaded and failing database. Assuming '
|
||||
'an upgrade is in progress and continuing.',
|
||||
constants.PENDING_UPDATE)
|
||||
member = e.last_attempt.result()
|
||||
|
||||
pool = member.pool
|
||||
# TODO(ataraday) when other flows will use dicts - revisit this
|
||||
pool = self._pool_repo.get(db_apis.get_session(),
|
||||
id=member[constants.POOL_ID])
|
||||
load_balancer = pool.load_balancer
|
||||
|
||||
listeners_dicts = (
|
||||
|
@ -77,7 +77,7 @@ class MemberFlows(object):
|
||||
delete_member_flow.add(database_tasks.DeleteMemberInDB(
|
||||
requires=constants.MEMBER))
|
||||
delete_member_flow.add(database_tasks.DecrementMemberQuota(
|
||||
requires=constants.MEMBER))
|
||||
requires=constants.PROJECT_ID))
|
||||
delete_member_flow.add(database_tasks.MarkPoolActiveInDB(
|
||||
requires=constants.POOL_ID))
|
||||
delete_member_flow.add(database_tasks.
|
||||
@ -138,11 +138,13 @@ class MemberFlows(object):
|
||||
unordered_members_flow.add(database_tasks.DeleteMemberInDB(
|
||||
inject={constants.MEMBER: m},
|
||||
name='{flow}-{id}'.format(
|
||||
id=m.id, flow=constants.DELETE_MEMBER_INDB)))
|
||||
id=m[constants.MEMBER_ID],
|
||||
flow=constants.DELETE_MEMBER_INDB)))
|
||||
unordered_members_flow.add(database_tasks.DecrementMemberQuota(
|
||||
inject={constants.MEMBER: m},
|
||||
requires=constants.PROJECT_ID,
|
||||
name='{flow}-{id}'.format(
|
||||
id=m.id, flow=constants.DECREMENT_MEMBER_QUOTA_FLOW)))
|
||||
id=m[constants.MEMBER_ID],
|
||||
flow=constants.DECREMENT_MEMBER_QUOTA_FLOW)))
|
||||
|
||||
# Create new members
|
||||
unordered_members_flow.add(
|
||||
@ -155,7 +157,8 @@ class MemberFlows(object):
|
||||
database_tasks.MarkMemberActiveInDB(
|
||||
inject={constants.MEMBER: m},
|
||||
name='{flow}-{id}'.format(
|
||||
id=m.id, flow=constants.MARK_MEMBER_ACTIVE_INDB)))
|
||||
id=m[constants.MEMBER_ID],
|
||||
flow=constants.MARK_MEMBER_ACTIVE_INDB)))
|
||||
|
||||
# Update existing members
|
||||
unordered_members_flow.add(
|
||||
@ -165,12 +168,13 @@ class MemberFlows(object):
|
||||
name='{flow}-updated'.format(
|
||||
flow=constants.MEMBER_TO_ERROR_ON_REVERT_FLOW)))
|
||||
for m, um in updated_members:
|
||||
um.pop('id', None)
|
||||
um.pop(constants.ID, None)
|
||||
unordered_members_active_flow.add(
|
||||
database_tasks.MarkMemberActiveInDB(
|
||||
inject={constants.MEMBER: m},
|
||||
name='{flow}-{id}'.format(
|
||||
id=m.id, flow=constants.MARK_MEMBER_ACTIVE_INDB)))
|
||||
id=m[constants.MEMBER_ID],
|
||||
flow=constants.MARK_MEMBER_ACTIVE_INDB)))
|
||||
|
||||
batch_update_members_flow.add(unordered_members_flow)
|
||||
|
||||
|
@ -222,8 +222,9 @@ class DeleteMemberInDB(BaseDatabaseTask):
|
||||
:returns: None
|
||||
"""
|
||||
|
||||
LOG.debug("DB delete member for id: %s ", member.id)
|
||||
self.member_repo.delete(db_apis.get_session(), id=member.id)
|
||||
LOG.debug("DB delete member for id: %s ", member[constants.MEMBER_ID])
|
||||
self.member_repo.delete(db_apis.get_session(),
|
||||
id=member[constants.MEMBER_ID])
|
||||
|
||||
def revert(self, member, *args, **kwargs):
|
||||
"""Mark the member ERROR since the delete couldn't happen
|
||||
@ -232,14 +233,16 @@ class DeleteMemberInDB(BaseDatabaseTask):
|
||||
:returns: None
|
||||
"""
|
||||
|
||||
LOG.warning("Reverting delete in DB for member id %s", member.id)
|
||||
LOG.warning("Reverting delete in DB for member id %s",
|
||||
member[constants.MEMBER_ID])
|
||||
try:
|
||||
self.member_repo.update(db_apis.get_session(), member.id,
|
||||
self.member_repo.update(db_apis.get_session(),
|
||||
member[constants.MEMBER_ID],
|
||||
provisioning_status=constants.ERROR)
|
||||
except Exception as e:
|
||||
LOG.error("Failed to update member %(mem)s "
|
||||
"provisioning_status to ERROR due to: %(except)s",
|
||||
{'mem': member.id, 'except': e})
|
||||
{'mem': member[constants.MEMBER_ID], 'except': e})
|
||||
|
||||
|
||||
class DeleteListenerInDB(BaseDatabaseTask):
|
||||
@ -1034,7 +1037,7 @@ class MarkLBActiveInDB(BaseDatabaseTask):
|
||||
|
||||
def _mark_member_status(self, member, status):
|
||||
self.member_repo.update(
|
||||
db_apis.get_session(), member.id,
|
||||
db_apis.get_session(), member[constants.MEMBER_ID],
|
||||
provisioning_status=status)
|
||||
|
||||
def revert(self, loadbalancer, *args, **kwargs):
|
||||
@ -1465,8 +1468,9 @@ class UpdateMemberInDB(BaseDatabaseTask):
|
||||
:returns: None
|
||||
"""
|
||||
|
||||
LOG.debug("Update DB for member id: %s ", member.id)
|
||||
self.member_repo.update(db_apis.get_session(), member.id,
|
||||
LOG.debug("Update DB for member id: %s ", member[constants.MEMBER_ID])
|
||||
self.member_repo.update(db_apis.get_session(),
|
||||
member[constants.MEMBER_ID],
|
||||
**update_dict)
|
||||
|
||||
def revert(self, member, *args, **kwargs):
|
||||
@ -1477,14 +1481,15 @@ class UpdateMemberInDB(BaseDatabaseTask):
|
||||
"""
|
||||
|
||||
LOG.warning("Reverting update member in DB "
|
||||
"for member id %s", member.id)
|
||||
"for member id %s", member[constants.MEMBER_ID])
|
||||
try:
|
||||
self.member_repo.update(db_apis.get_session(), member.id,
|
||||
self.member_repo.update(db_apis.get_session(),
|
||||
member[constants.MEMBER_ID],
|
||||
provisioning_status=constants.ERROR)
|
||||
except Exception as e:
|
||||
LOG.error("Failed to update member %(member)s provisioning_status "
|
||||
"to ERROR due to: %(except)s", {'member': member.id,
|
||||
'except': e})
|
||||
"to ERROR due to: %(except)s",
|
||||
{'member': member[constants.MEMBER_ID], 'except': e})
|
||||
|
||||
|
||||
class UpdatePoolInDB(BaseDatabaseTask):
|
||||
@ -2153,9 +2158,10 @@ class MarkMemberActiveInDB(BaseDatabaseTask):
|
||||
:returns: None
|
||||
"""
|
||||
|
||||
LOG.debug("Mark ACTIVE in DB for member id: %s", member.id)
|
||||
LOG.debug("Mark ACTIVE in DB for member id: %s",
|
||||
member[constants.MEMBER_ID])
|
||||
self.member_repo.update(db_apis.get_session(),
|
||||
member.id,
|
||||
member[constants.MEMBER_ID],
|
||||
provisioning_status=constants.ACTIVE)
|
||||
|
||||
def revert(self, member, *args, **kwargs):
|
||||
@ -2166,8 +2172,9 @@ class MarkMemberActiveInDB(BaseDatabaseTask):
|
||||
"""
|
||||
|
||||
LOG.warning("Reverting mark member ACTIVE in DB "
|
||||
"for member id %s", member.id)
|
||||
self.task_utils.mark_member_prov_status_error(member.id)
|
||||
"for member id %s", member[constants.MEMBER_ID])
|
||||
self.task_utils.mark_member_prov_status_error(
|
||||
member[constants.MEMBER_ID])
|
||||
|
||||
|
||||
class MarkMemberPendingCreateInDB(BaseDatabaseTask):
|
||||
@ -2183,9 +2190,10 @@ class MarkMemberPendingCreateInDB(BaseDatabaseTask):
|
||||
:returns: None
|
||||
"""
|
||||
|
||||
LOG.debug("Mark PENDING CREATE in DB for member id: %s", member.id)
|
||||
LOG.debug("Mark PENDING CREATE in DB for member id: %s",
|
||||
member[constants.MEMBER_ID])
|
||||
self.member_repo.update(db_apis.get_session(),
|
||||
member.id,
|
||||
member[constants.MEMBER_ID],
|
||||
provisioning_status=constants.PENDING_CREATE)
|
||||
|
||||
def revert(self, member, *args, **kwargs):
|
||||
@ -2196,8 +2204,9 @@ class MarkMemberPendingCreateInDB(BaseDatabaseTask):
|
||||
"""
|
||||
|
||||
LOG.warning("Reverting mark member pending create in DB "
|
||||
"for member id %s", member.id)
|
||||
self.task_utils.mark_member_prov_status_error(member.id)
|
||||
"for member id %s", member[constants.MEMBER_ID])
|
||||
self.task_utils.mark_member_prov_status_error(
|
||||
member[constants.MEMBER_ID])
|
||||
|
||||
|
||||
class MarkMemberPendingDeleteInDB(BaseDatabaseTask):
|
||||
@ -2213,9 +2222,10 @@ class MarkMemberPendingDeleteInDB(BaseDatabaseTask):
|
||||
:returns: None
|
||||
"""
|
||||
|
||||
LOG.debug("Mark PENDING DELETE in DB for member id: %s", member.id)
|
||||
LOG.debug("Mark PENDING DELETE in DB for member id: %s",
|
||||
member[constants.MEMBER_ID])
|
||||
self.member_repo.update(db_apis.get_session(),
|
||||
member.id,
|
||||
member[constants.MEMBER_ID],
|
||||
provisioning_status=constants.PENDING_DELETE)
|
||||
|
||||
def revert(self, member, *args, **kwargs):
|
||||
@ -2226,8 +2236,9 @@ class MarkMemberPendingDeleteInDB(BaseDatabaseTask):
|
||||
"""
|
||||
|
||||
LOG.warning("Reverting mark member pending delete in DB "
|
||||
"for member id %s", member.id)
|
||||
self.task_utils.mark_member_prov_status_error(member.id)
|
||||
"for member id %s", member[constants.MEMBER_ID])
|
||||
self.task_utils.mark_member_prov_status_error(
|
||||
member[constants.MEMBER_ID])
|
||||
|
||||
|
||||
class MarkMemberPendingUpdateInDB(BaseDatabaseTask):
|
||||
@ -2244,9 +2255,9 @@ class MarkMemberPendingUpdateInDB(BaseDatabaseTask):
|
||||
"""
|
||||
|
||||
LOG.debug("Mark PENDING UPDATE in DB for member id: %s",
|
||||
member.id)
|
||||
member[constants.MEMBER_ID])
|
||||
self.member_repo.update(db_apis.get_session(),
|
||||
member.id,
|
||||
member[constants.MEMBER_ID],
|
||||
provisioning_status=constants.PENDING_UPDATE)
|
||||
|
||||
def revert(self, member, *args, **kwargs):
|
||||
@ -2257,8 +2268,9 @@ class MarkMemberPendingUpdateInDB(BaseDatabaseTask):
|
||||
"""
|
||||
|
||||
LOG.warning("Reverting mark member pending update in DB "
|
||||
"for member id %s", member.id)
|
||||
self.task_utils.mark_member_prov_status_error(member.id)
|
||||
"for member id %s", member[constants.MEMBER_ID])
|
||||
self.task_utils.mark_member_prov_status_error(
|
||||
member[constants.MEMBER_ID])
|
||||
|
||||
|
||||
class MarkPoolActiveInDB(BaseDatabaseTask):
|
||||
@ -2567,7 +2579,7 @@ class DecrementMemberQuota(BaseDatabaseTask):
|
||||
Since sqlalchemy will likely retry by itself always revert if it fails
|
||||
"""
|
||||
|
||||
def execute(self, member):
|
||||
def execute(self, project_id):
|
||||
"""Decrements the member quota.
|
||||
|
||||
:param member: The member to decrement the quota on.
|
||||
@ -2575,22 +2587,22 @@ class DecrementMemberQuota(BaseDatabaseTask):
|
||||
"""
|
||||
|
||||
LOG.debug("Decrementing member quota for "
|
||||
"project: %s ", member.project_id)
|
||||
"project: %s ", project_id)
|
||||
|
||||
lock_session = db_apis.get_session(autocommit=False)
|
||||
try:
|
||||
self.repos.decrement_quota(lock_session,
|
||||
data_models.Member,
|
||||
member.project_id)
|
||||
project_id)
|
||||
lock_session.commit()
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error('Failed to decrement member quota for project: '
|
||||
'%(proj)s the project may have excess quota in use.',
|
||||
{'proj': member.project_id})
|
||||
{'proj': project_id})
|
||||
lock_session.rollback()
|
||||
|
||||
def revert(self, member, result, *args, **kwargs):
|
||||
def revert(self, project_id, result, *args, **kwargs):
|
||||
"""Re-apply the quota
|
||||
|
||||
:param member: The member to decrement the quota on.
|
||||
@ -2599,7 +2611,7 @@ class DecrementMemberQuota(BaseDatabaseTask):
|
||||
|
||||
LOG.warning('Reverting decrement quota for member on project %(proj)s '
|
||||
'Project quota counts may be incorrect.',
|
||||
{'proj': member.project_id})
|
||||
{'proj': project_id})
|
||||
|
||||
# Increment the quota back if this task wasn't the failure
|
||||
if not isinstance(result, failure.Failure):
|
||||
@ -2611,7 +2623,7 @@ class DecrementMemberQuota(BaseDatabaseTask):
|
||||
self.repos.check_quota_met(session,
|
||||
lock_session,
|
||||
data_models.Member,
|
||||
member.project_id)
|
||||
project_id)
|
||||
lock_session.commit()
|
||||
except Exception:
|
||||
lock_session.rollback()
|
||||
|
@ -145,7 +145,8 @@ class MemberToErrorOnRevertTask(BaseLifecycleTask):
|
||||
|
||||
def revert(self, member, listeners, loadbalancer, pool_id, *args,
|
||||
**kwargs):
|
||||
self.task_utils.mark_member_prov_status_error(member.id)
|
||||
self.task_utils.mark_member_prov_status_error(
|
||||
member[constants.MEMBER_ID])
|
||||
for listener in listeners:
|
||||
self.task_utils.mark_listener_prov_status_active(
|
||||
listener[constants.LISTENER_ID])
|
||||
@ -162,7 +163,8 @@ class MembersToErrorOnRevertTask(BaseLifecycleTask):
|
||||
def revert(self, members, listeners, loadbalancer, pool_id, *args,
|
||||
**kwargs):
|
||||
for m in members:
|
||||
self.task_utils.mark_member_prov_status_error(m.id)
|
||||
self.task_utils.mark_member_prov_status_error(
|
||||
m[constants.MEMBER_ID])
|
||||
for listener in listeners:
|
||||
self.task_utils.mark_listener_prov_status_active(
|
||||
listener[constants.LISTENER_ID])
|
||||
|
@ -239,9 +239,9 @@ class TestAmphoraDriver(base.TestRpc):
|
||||
@mock.patch('oslo_messaging.RPCClient.cast')
|
||||
def test_member_create(self, mock_cast, mock_pool_get, mock_session):
|
||||
provider_member = driver_dm.Member(
|
||||
member_id=self.sample_data.member1_id)
|
||||
member_id=self.sample_data)
|
||||
self.amp_driver.member_create(provider_member)
|
||||
payload = {consts.MEMBER_ID: self.sample_data.member1_id}
|
||||
payload = {consts.MEMBER: provider_member.to_dict()}
|
||||
mock_cast.assert_called_with({}, 'create_member', **payload)
|
||||
|
||||
@mock.patch('octavia.db.api.get_session')
|
||||
@ -263,7 +263,7 @@ class TestAmphoraDriver(base.TestRpc):
|
||||
member_id=self.sample_data.member1_id,
|
||||
address="192.0.2.1")
|
||||
self.amp_driver.member_create(provider_member)
|
||||
payload = {consts.MEMBER_ID: self.sample_data.member1_id}
|
||||
payload = {consts.MEMBER: provider_member.to_dict()}
|
||||
mock_cast.assert_called_with({}, 'create_member', **payload)
|
||||
|
||||
@mock.patch('octavia.db.api.get_session')
|
||||
@ -293,7 +293,7 @@ class TestAmphoraDriver(base.TestRpc):
|
||||
provider_member = driver_dm.Member(
|
||||
member_id=self.sample_data.member1_id)
|
||||
self.amp_driver.member_delete(provider_member)
|
||||
payload = {consts.MEMBER_ID: self.sample_data.member1_id}
|
||||
payload = {consts.MEMBER: provider_member.to_dict()}
|
||||
mock_cast.assert_called_with({}, 'delete_member', **payload)
|
||||
|
||||
@mock.patch('oslo_messaging.RPCClient.cast')
|
||||
@ -302,9 +302,12 @@ class TestAmphoraDriver(base.TestRpc):
|
||||
member_id=self.sample_data.member1_id)
|
||||
provider_member = driver_dm.Member(
|
||||
member_id=self.sample_data.member1_id, admin_state_up=True)
|
||||
member_dict = {'enabled': True}
|
||||
self.amp_driver.member_update(old_provider_member, provider_member)
|
||||
payload = {consts.MEMBER_ID: self.sample_data.member1_id,
|
||||
member_dict = provider_member.to_dict()
|
||||
member_dict.pop(consts.MEMBER_ID)
|
||||
member_dict['enabled'] = member_dict.pop('admin_state_up')
|
||||
self.amp_driver.member_update(old_provider_member,
|
||||
provider_member)
|
||||
payload = {consts.ORIGINAL_MEMBER: old_provider_member.to_dict(),
|
||||
consts.MEMBER_UPDATES: member_dict}
|
||||
mock_cast.assert_called_with({}, 'update_member', **payload)
|
||||
|
||||
@ -314,9 +317,11 @@ class TestAmphoraDriver(base.TestRpc):
|
||||
member_id=self.sample_data.member1_id)
|
||||
provider_member = driver_dm.Member(
|
||||
member_id=self.sample_data.member1_id, name='Great member')
|
||||
member_dict = {'name': 'Great member'}
|
||||
self.amp_driver.member_update(old_provider_member, provider_member)
|
||||
payload = {consts.MEMBER_ID: self.sample_data.member1_id,
|
||||
member_dict = provider_member.to_dict()
|
||||
member_dict.pop(consts.MEMBER_ID)
|
||||
self.amp_driver.member_update(old_provider_member,
|
||||
provider_member)
|
||||
payload = {consts.ORIGINAL_MEMBER: old_provider_member.to_dict(),
|
||||
consts.MEMBER_UPDATES: member_dict}
|
||||
mock_cast.assert_called_with({}, 'update_member', **payload)
|
||||
|
||||
@ -351,9 +356,10 @@ class TestAmphoraDriver(base.TestRpc):
|
||||
self.amp_driver.member_batch_update(
|
||||
self.sample_data.pool1_id, prov_members)
|
||||
|
||||
payload = {'old_member_ids': [self.sample_data.member1_id],
|
||||
'new_member_ids': [self.sample_data.member3_id],
|
||||
'updated_members': [update_mem_dict]}
|
||||
payload = {
|
||||
'old_members': [self.sample_data.db_pool1_members[0].to_dict()],
|
||||
'new_members': [prov_new_member.to_dict()],
|
||||
'updated_members': [update_mem_dict]}
|
||||
mock_cast.assert_called_with({}, 'batch_update_members', **payload)
|
||||
|
||||
@mock.patch('octavia.db.api.get_session')
|
||||
@ -386,9 +392,10 @@ class TestAmphoraDriver(base.TestRpc):
|
||||
self.amp_driver.member_batch_update(
|
||||
self.sample_data.pool1_id, prov_members)
|
||||
|
||||
payload = {'old_member_ids': [self.sample_data.member1_id],
|
||||
'new_member_ids': [self.sample_data.member3_id],
|
||||
'updated_members': [update_mem_dict]}
|
||||
payload = {
|
||||
'old_members': [self.sample_data.db_pool1_members[0].to_dict()],
|
||||
'new_members': [prov_new_member.to_dict()],
|
||||
'updated_members': [update_mem_dict]}
|
||||
mock_cast.assert_called_with({}, 'batch_update_members', **payload)
|
||||
|
||||
@mock.patch('octavia.db.api.get_session')
|
||||
@ -461,8 +468,9 @@ class TestAmphoraDriver(base.TestRpc):
|
||||
self.amp_driver.member_batch_update(
|
||||
self.sample_data.pool1_id, prov_members)
|
||||
|
||||
payload = {'old_member_ids': [self.sample_data.member1_id],
|
||||
'new_member_ids': [self.sample_data.member3_id],
|
||||
payload = {'old_members':
|
||||
[self.sample_data.db_pool1_members[0].to_dict()],
|
||||
'new_members': [prov_new_member.to_dict()],
|
||||
'updated_members': [update_mem_dict]}
|
||||
mock_cast.assert_called_with({}, 'batch_update_members', **payload)
|
||||
|
||||
|
@ -124,26 +124,29 @@ class TestEndpoints(base.TestCase):
|
||||
self.resource_id)
|
||||
|
||||
def test_create_member(self):
|
||||
self.ep.create_member(self.context, self.resource_id)
|
||||
self.ep.create_member(self.context, self.resource)
|
||||
self.ep.worker.create_member.assert_called_once_with(
|
||||
self.resource_id)
|
||||
self.resource)
|
||||
|
||||
def test_update_member(self):
|
||||
self.ep.update_member(self.context, self.resource_id,
|
||||
self.ep.update_member(self.context, self.resource,
|
||||
self.resource_updates)
|
||||
self.ep.worker.update_member.assert_called_once_with(
|
||||
self.resource_id, self.resource_updates)
|
||||
self.resource, self.resource_updates)
|
||||
|
||||
def test_batch_update_members(self):
|
||||
self.ep.batch_update_members(
|
||||
self.context, [9], [11], [self.resource_updates])
|
||||
self.context, [{constants.MEMBER_ID: 9}],
|
||||
[{constants.MEMBER_ID: 11}],
|
||||
[self.resource_updates])
|
||||
self.ep.worker.batch_update_members.assert_called_once_with(
|
||||
[9], [11], [self.resource_updates])
|
||||
[{constants.MEMBER_ID: 9}], [{constants.MEMBER_ID: 11}],
|
||||
[self.resource_updates])
|
||||
|
||||
def test_delete_member(self):
|
||||
self.ep.delete_member(self.context, self.resource_id)
|
||||
self.ep.delete_member(self.context, self.resource)
|
||||
self.ep.worker.delete_member.assert_called_once_with(
|
||||
self.resource_id)
|
||||
self.resource)
|
||||
|
||||
def test_create_l7policy(self):
|
||||
self.ep.create_l7policy(self.context, self.resource_id)
|
||||
|
@ -56,8 +56,9 @@ class TestMemberFlows(base.TestCase):
|
||||
self.assertIn(constants.LISTENERS, member_flow.requires)
|
||||
self.assertIn(constants.LOADBALANCER_ID, member_flow.requires)
|
||||
self.assertIn(constants.POOL_ID, member_flow.requires)
|
||||
self.assertIn(constants.PROJECT_ID, member_flow.requires)
|
||||
|
||||
self.assertEqual(5, len(member_flow.requires))
|
||||
self.assertEqual(6, len(member_flow.requires))
|
||||
self.assertEqual(0, len(member_flow.provides))
|
||||
|
||||
def test_get_update_member_flow(self, mock_get_net_driver):
|
||||
|
@ -121,6 +121,11 @@ class TestDatabaseTasks(base.TestCase):
|
||||
self.db_pool_mock.id = POOL_ID
|
||||
self.db_pool_mock.health_monitor = self.health_mon_mock
|
||||
|
||||
self.member_mock = {
|
||||
constants.MEMBER_ID: MEMBER_ID,
|
||||
constants.POOL_ID: POOL_ID,
|
||||
}
|
||||
|
||||
self.l7policy_mock = mock.MagicMock()
|
||||
self.l7policy_mock.id = L7POLICY_ID
|
||||
|
||||
@ -1279,16 +1284,16 @@ class TestDatabaseTasks(base.TestCase):
|
||||
mock_amphora_repo_update,
|
||||
mock_amphora_repo_delete):
|
||||
unused_pool = data_models.Pool(id='unused_pool')
|
||||
members1 = [data_models.Member(id='member1'),
|
||||
data_models.Member(id='member2')]
|
||||
members1 = [{constants.MEMBER_ID: 'member1'},
|
||||
{constants.MEMBER_ID: 'member2'}]
|
||||
health_monitor = data_models.HealthMonitor(id='hm1')
|
||||
default_pool = data_models.Pool(id='default_pool',
|
||||
members=members1,
|
||||
health_monitor=health_monitor)
|
||||
listener1 = data_models.Listener(id='listener1',
|
||||
default_pool=default_pool)
|
||||
members2 = [data_models.Member(id='member3'),
|
||||
data_models.Member(id='member4')]
|
||||
members2 = [{constants.MEMBER_ID: 'member3'},
|
||||
{constants.MEMBER_ID: 'member4'}]
|
||||
redirect_pool = data_models.Pool(id='redirect_pool',
|
||||
members=members2)
|
||||
l7rules = [data_models.L7Rule(id='rule1')]
|
||||
@ -1323,16 +1328,6 @@ class TestDatabaseTasks(base.TestCase):
|
||||
provisioning_status=constants.ACTIVE),
|
||||
mock.call('TEST', redirect_pool.id,
|
||||
provisioning_status=constants.ACTIVE)])
|
||||
self.assertEqual(4, repo.MemberRepository.update.call_count)
|
||||
repo.MemberRepository.update.has_calls(
|
||||
[mock.call('TEST', members1[0].id,
|
||||
provisioning_status=constants.ACTIVE),
|
||||
mock.call('TEST', members1[1].id,
|
||||
provisioning_status=constants.ACTIVE),
|
||||
mock.call('TEST', members2[0].id,
|
||||
provisioning_status=constants.ACTIVE),
|
||||
mock.call('TEST', members2[1].id,
|
||||
provisioning_status=constants.ACTIVE)])
|
||||
self.assertEqual(1, repo.HealthMonitorRepository.update.call_count)
|
||||
repo.HealthMonitorRepository.update.has_calls(
|
||||
[mock.call('TEST', health_monitor.id,
|
||||
@ -1371,16 +1366,6 @@ class TestDatabaseTasks(base.TestCase):
|
||||
provisioning_status=constants.ERROR),
|
||||
mock.call('TEST', redirect_pool.id,
|
||||
provisioning_status=constants.ERROR)])
|
||||
self.assertEqual(4, repo.MemberRepository.update.call_count)
|
||||
repo.MemberRepository.update.has_calls(
|
||||
[mock.call('TEST', members1[0].id,
|
||||
provisioning_status=constants.ERROR),
|
||||
mock.call('TEST', members1[1].id,
|
||||
provisioning_status=constants.ERROR),
|
||||
mock.call('TEST', members2[0].id,
|
||||
provisioning_status=constants.ERROR),
|
||||
mock.call('TEST', members2[1].id,
|
||||
provisioning_status=constants.ERROR)])
|
||||
self.assertEqual(1, repo.HealthMonitorRepository.update.call_count)
|
||||
repo.HealthMonitorRepository.update.has_calls(
|
||||
[mock.call('TEST', health_monitor.id,
|
||||
|
@ -175,9 +175,11 @@ class TestDatabaseTasksQuota(base.TestCase):
|
||||
self._test_decrement_quota(task, data_model, project_id=project_id)
|
||||
|
||||
def test_decrement_member_quota(self):
|
||||
project_id = uuidutils.generate_uuid()
|
||||
task = database_tasks.DecrementMemberQuota()
|
||||
data_model = data_models.Member
|
||||
self._test_decrement_quota(task, data_model)
|
||||
self._test_decrement_quota(task, data_model,
|
||||
project_id=project_id)
|
||||
|
||||
@mock.patch('octavia.db.repositories.Repositories.decrement_quota')
|
||||
@mock.patch('octavia.db.repositories.Repositories.check_quota_met')
|
||||
|
@ -304,7 +304,8 @@ class TestLifecycleTasks(base.TestCase):
|
||||
member_to_error_on_revert = lifecycle_tasks.MemberToErrorOnRevertTask()
|
||||
|
||||
# Execute
|
||||
member_to_error_on_revert.execute(self.MEMBER,
|
||||
member_to_error_on_revert.execute({constants.MEMBER_ID:
|
||||
self.MEMBER_ID},
|
||||
self.LISTENERS,
|
||||
self.LOADBALANCER,
|
||||
self.POOL_ID)
|
||||
@ -312,7 +313,7 @@ class TestLifecycleTasks(base.TestCase):
|
||||
self.assertFalse(mock_member_prov_status_error.called)
|
||||
|
||||
# Revert
|
||||
member_to_error_on_revert.revert(self.MEMBER,
|
||||
member_to_error_on_revert.revert({constants.MEMBER_ID: self.MEMBER_ID},
|
||||
self.LISTENERS,
|
||||
self.LOADBALANCER,
|
||||
self.POOL_ID)
|
||||
@ -344,7 +345,8 @@ class TestLifecycleTasks(base.TestCase):
|
||||
lifecycle_tasks.MembersToErrorOnRevertTask())
|
||||
|
||||
# Execute
|
||||
members_to_error_on_revert.execute(self.MEMBERS,
|
||||
members_to_error_on_revert.execute([{constants.MEMBER_ID:
|
||||
self.MEMBER_ID}],
|
||||
self.LISTENERS,
|
||||
self.LOADBALANCER,
|
||||
self.POOL_ID)
|
||||
@ -352,7 +354,8 @@ class TestLifecycleTasks(base.TestCase):
|
||||
self.assertFalse(mock_member_prov_status_error.called)
|
||||
|
||||
# Revert
|
||||
members_to_error_on_revert.revert(self.MEMBERS,
|
||||
members_to_error_on_revert.revert([{constants.MEMBER_ID:
|
||||
self.MEMBER_ID}],
|
||||
self.LISTENERS,
|
||||
self.LOADBALANCER,
|
||||
self.POOL_ID)
|
||||
|
@ -53,6 +53,7 @@ _vip_mock = mock.MagicMock()
|
||||
_listener_mock = mock.MagicMock()
|
||||
_load_balancer_mock = mock.MagicMock()
|
||||
_load_balancer_mock.listeners = [_listener_mock]
|
||||
_load_balancer_mock.project_id = PROJECT_ID
|
||||
_member_mock = mock.MagicMock()
|
||||
_pool_mock = {constants.POOL_ID: POOL_ID}
|
||||
_db_pool_mock = mock.MagicMock()
|
||||
@ -740,13 +741,13 @@ class TestControllerWorker(base.TestCase):
|
||||
|
||||
_flow_mock.reset_mock()
|
||||
mock_member_repo_get.side_effect = [None, _member_mock]
|
||||
|
||||
_member = _member_mock.to_dict()
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.create_member(MEMBER_ID)
|
||||
cw.create_member(_member)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={constants.MEMBER: _member_mock,
|
||||
store={constants.MEMBER: _member,
|
||||
constants.LISTENERS:
|
||||
[self.ref_listener_dict],
|
||||
constants.LOADBALANCER_ID:
|
||||
@ -757,7 +758,6 @@ class TestControllerWorker(base.TestCase):
|
||||
POOL_ID}))
|
||||
|
||||
_flow_mock.run.assert_called_once_with()
|
||||
self.assertEqual(2, mock_member_repo_get.call_count)
|
||||
|
||||
@mock.patch('octavia.controller.worker.v2.flows.'
|
||||
'member_flows.MemberFlows.get_delete_member_flow',
|
||||
@ -777,13 +777,13 @@ class TestControllerWorker(base.TestCase):
|
||||
mock_amp_repo_get):
|
||||
|
||||
_flow_mock.reset_mock()
|
||||
|
||||
_member = _member_mock.to_dict()
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.delete_member(MEMBER_ID)
|
||||
cw.delete_member(_member)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
assert_called_once_with(
|
||||
_flow_mock, store={constants.MEMBER: _member_mock,
|
||||
_flow_mock, store={constants.MEMBER: _member,
|
||||
constants.LISTENERS:
|
||||
[self.ref_listener_dict],
|
||||
constants.LOADBALANCER_ID:
|
||||
@ -791,7 +791,8 @@ class TestControllerWorker(base.TestCase):
|
||||
constants.LOADBALANCER:
|
||||
_load_balancer_mock,
|
||||
constants.POOL_ID:
|
||||
POOL_ID}))
|
||||
POOL_ID,
|
||||
constants.PROJECT_ID: PROJECT_ID}))
|
||||
|
||||
_flow_mock.run.assert_called_once_with()
|
||||
|
||||
@ -813,14 +814,15 @@ class TestControllerWorker(base.TestCase):
|
||||
mock_amp_repo_get):
|
||||
|
||||
_flow_mock.reset_mock()
|
||||
_member_mock.provisioning_status = constants.PENDING_UPDATE
|
||||
_member = _member_mock.to_dict()
|
||||
_member[constants.PROVISIONING_STATUS] = constants.PENDING_UPDATE
|
||||
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.update_member(MEMBER_ID, MEMBER_UPDATE_DICT)
|
||||
cw.update_member(_member, MEMBER_UPDATE_DICT)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={constants.MEMBER: _member_mock,
|
||||
store={constants.MEMBER: _member,
|
||||
constants.LISTENERS:
|
||||
[self.ref_listener_dict],
|
||||
constants.LOADBALANCER:
|
||||
@ -854,15 +856,18 @@ class TestControllerWorker(base.TestCase):
|
||||
_flow_mock.reset_mock()
|
||||
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.batch_update_members([9], [11], [MEMBER_UPDATE_DICT])
|
||||
|
||||
cw.batch_update_members([{constants.MEMBER_ID: 9,
|
||||
constants.POOL_ID: 'testtest'}],
|
||||
[{constants.MEMBER_ID: 11}],
|
||||
[MEMBER_UPDATE_DICT])
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
assert_called_once_with(
|
||||
_flow_mock,
|
||||
store={constants.LISTENERS: [self.ref_listener_dict],
|
||||
constants.LOADBALANCER_ID: _load_balancer_mock.id,
|
||||
constants.LOADBALANCER: _load_balancer_mock,
|
||||
constants.POOL_ID: POOL_ID}))
|
||||
constants.POOL_ID: POOL_ID,
|
||||
constants.PROJECT_ID: PROJECT_ID}))
|
||||
|
||||
_flow_mock.run.assert_called_once_with()
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user