Allow PUT to /pools/<id>/members to batch update members
Also fix an incorrect exposure of /healthmonitors on /pools and a badly ordered flow for member updates. Change-Id: Id256ea94293519b75983f7a44945ac9bbbf25cd1 Implements: blueprint member-put-list
This commit is contained in:
1
api-ref/source/v2/examples/member-batch-update-curl
Normal file
1
api-ref/source/v2/examples/member-batch-update-curl
Normal file
@@ -0,0 +1 @@
|
|||||||
|
curl -X PUT -H "Content-Type: application/json" -H "X-Auth-Token: <token>" -d '{"members":[{"name":"web-server-1","weight":"20","admin_state_up":true,"subnet_id":"bbb35f84-35cc-4b2f-84c2-a6a29bba68aa","address":"192.0.2.16","protocol_port":"80","monitor_port":8080},{"name":"web-server-2","weight":"10","admin_state_up":true,"subnet_id":"bbb35f84-35cc-4b2f-84c2-a6a29bba68aa","address":"192.0.2.17","protocol_port":"80","monitor_port":8080}]}' http://198.51.100.10:9876/v2.0/lbaas/pools/4029d267-3983-4224-a3d0-afb3fe16a2cd/members
|
22
api-ref/source/v2/examples/member-batch-update-request.json
Normal file
22
api-ref/source/v2/examples/member-batch-update-request.json
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
{
|
||||||
|
"members": [
|
||||||
|
{
|
||||||
|
"name": "web-server-1",
|
||||||
|
"weight": 20,
|
||||||
|
"admin_state_up": true,
|
||||||
|
"subnet_id": "bbb35f84-35cc-4b2f-84c2-a6a29bba68aa",
|
||||||
|
"address": "192.0.2.16",
|
||||||
|
"protocol_port": 80,
|
||||||
|
"monitor_port": 8080
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "web-server-2",
|
||||||
|
"weight": 10,
|
||||||
|
"admin_state_up": true,
|
||||||
|
"subnet_id": "bbb35f84-35cc-4b2f-84c2-a6a29bba68aa",
|
||||||
|
"address": "192.0.2.17",
|
||||||
|
"protocol_port": 80,
|
||||||
|
"monitor_port": 8080
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
@@ -340,8 +340,79 @@ Response Example
|
|||||||
.. literalinclude:: examples/member-update-response.json
|
.. literalinclude:: examples/member-update-response.json
|
||||||
:language: javascript
|
:language: javascript
|
||||||
|
|
||||||
|
Batch Update Members
|
||||||
|
====================
|
||||||
|
|
||||||
|
.. rest_method:: PUT /v2.0/lbaas/pools/{pool_id}/members
|
||||||
|
|
||||||
|
Set the state of members for a pool in one API call. This may include
|
||||||
|
creating new members, deleting old members, and updating existing members.
|
||||||
|
Existing members are matched based on address/port combination.
|
||||||
|
|
||||||
|
For example, assume a pool currently has two members. These members have the
|
||||||
|
following address/port combinations: '192.0.2.15:80' and '192.0.2.16:80'.
|
||||||
|
Now assume a PUT request is made that includes members with address/port
|
||||||
|
combinations: '192.0.2.16:80' and '192.0.2.17:80'.
|
||||||
|
The member '192.0.2.15:80' will be deleted, because it was not in the request.
|
||||||
|
The member '192.0.2.16:80' will be updated to match the request data for that
|
||||||
|
member, because it was matched.
|
||||||
|
The member '192.0.2.17:80' will be created, because no such member existed.
|
||||||
|
|
||||||
|
If the request is valid, the service returns the ``Accepted (202)``
|
||||||
|
response code. To confirm the updates, check that the member provisioning
|
||||||
|
statuses are ``ACTIVE`` for new or updated members, and that any unspecified
|
||||||
|
members were correctly deleted. If the statuses are ``PENDING_UPDATE`` or
|
||||||
|
``PENDING_DELETE``, use GET to poll the member objects for changes.
|
||||||
|
|
||||||
|
.. rest_status_code:: success ../http-status.yaml
|
||||||
|
|
||||||
|
- 202
|
||||||
|
|
||||||
|
.. rest_status_code:: error ../http-status.yaml
|
||||||
|
|
||||||
|
- 400
|
||||||
|
- 401
|
||||||
|
- 403
|
||||||
|
- 404
|
||||||
|
- 409
|
||||||
|
- 500
|
||||||
|
- 503
|
||||||
|
|
||||||
|
Request
|
||||||
|
-------
|
||||||
|
|
||||||
|
.. rest_parameters:: ../parameters.yaml
|
||||||
|
|
||||||
|
- admin_state_up: admin_state_up-default-optional
|
||||||
|
- address: address
|
||||||
|
- monitor_address: monitor_address-optional
|
||||||
|
- monitor_port: monitor_port-optional
|
||||||
|
- name: name-optional
|
||||||
|
- pool_id: path-pool-id
|
||||||
|
- project_id: project_id-optional-deprecated
|
||||||
|
- protocol_port: protocol_port
|
||||||
|
- subnet_id: subnet_id-optional
|
||||||
|
- weight: weight-optional
|
||||||
|
|
||||||
|
Request Example
|
||||||
|
---------------
|
||||||
|
|
||||||
|
.. literalinclude:: examples/member-batch-update-request.json
|
||||||
|
:language: javascript
|
||||||
|
|
||||||
|
Curl Example
|
||||||
|
------------
|
||||||
|
|
||||||
|
.. literalinclude:: examples/member-batch-update-curl
|
||||||
|
:language: bash
|
||||||
|
|
||||||
|
Response
|
||||||
|
--------
|
||||||
|
|
||||||
|
There is no body content for the response of a successful PUT request.
|
||||||
|
|
||||||
Remove a Member
|
Remove a Member
|
||||||
=================
|
===============
|
||||||
|
|
||||||
.. rest_method:: DELETE /v2.0/lbaas/pools/{pool_id}/members/{member_id}
|
.. rest_method:: DELETE /v2.0/lbaas/pools/{pool_id}/members/{member_id}
|
||||||
|
|
||||||
|
@@ -39,7 +39,8 @@ def validate_input(expected, actual):
|
|||||||
raise InvalidHandlerInputObject(obj_type=actual.__class__)
|
raise InvalidHandlerInputObject(obj_type=actual.__class__)
|
||||||
|
|
||||||
|
|
||||||
def simulate_controller(data_model, delete=False, update=False, create=False):
|
def simulate_controller(data_model, delete=False, update=False, create=False,
|
||||||
|
batch_update=False):
|
||||||
"""Simulates a successful controller operator for a data model.
|
"""Simulates a successful controller operator for a data model.
|
||||||
|
|
||||||
:param data_model: data model to simulate controller operation
|
:param data_model: data model to simulate controller operation
|
||||||
@@ -47,7 +48,8 @@ def simulate_controller(data_model, delete=False, update=False, create=False):
|
|||||||
"""
|
"""
|
||||||
repo = repos.Repositories()
|
repo = repos.Repositories()
|
||||||
|
|
||||||
def member_controller(member, delete=False, update=False, create=False):
|
def member_controller(member, delete=False, update=False, create=False,
|
||||||
|
batch_update=False):
|
||||||
time.sleep(ASYNC_TIME)
|
time.sleep(ASYNC_TIME)
|
||||||
LOG.info("Simulating controller operation for member...")
|
LOG.info("Simulating controller operation for member...")
|
||||||
|
|
||||||
@@ -63,6 +65,11 @@ def simulate_controller(data_model, delete=False, update=False, create=False):
|
|||||||
elif create:
|
elif create:
|
||||||
repo.member.update(db_api.get_session(), member.id,
|
repo.member.update(db_api.get_session(), member.id,
|
||||||
operating_status=constants.ONLINE)
|
operating_status=constants.ONLINE)
|
||||||
|
elif batch_update:
|
||||||
|
members = member
|
||||||
|
for member in members:
|
||||||
|
repo.member.update(db_api.get_session(), member.id,
|
||||||
|
operating_status=constants.ONLINE)
|
||||||
listeners = []
|
listeners = []
|
||||||
if db_mem:
|
if db_mem:
|
||||||
for listener in db_mem.pool.listeners:
|
for listener in db_mem.pool.listeners:
|
||||||
@@ -123,6 +130,7 @@ def simulate_controller(data_model, delete=False, update=False, create=False):
|
|||||||
l7rule_dict = l7rule.to_dict()
|
l7rule_dict = l7rule.to_dict()
|
||||||
repo.l7rule.update(db_api.get_session(), l7rule.id, **l7rule_dict)
|
repo.l7rule.update(db_api.get_session(), l7rule.id, **l7rule_dict)
|
||||||
elif create:
|
elif create:
|
||||||
|
l7rule_dict = l7rule.to_dict()
|
||||||
db_l7rule = repo.l7rule.create(db_api.get_session(), **l7rule_dict)
|
db_l7rule = repo.l7rule.create(db_api.get_session(), **l7rule_dict)
|
||||||
if db_l7rule.l7policy.listener:
|
if db_l7rule.l7policy.listener:
|
||||||
listener = db_l7rule.l7policy.listener
|
listener = db_l7rule.l7policy.listener
|
||||||
@@ -392,6 +400,25 @@ class MemberHandler(abstract_handler.BaseObjectHandler):
|
|||||||
member.id = old_member.id
|
member.id = old_member.id
|
||||||
simulate_controller(member, update=True)
|
simulate_controller(member, update=True)
|
||||||
|
|
||||||
|
def batch_update(self, old_member_ids, new_member_ids, updated_members):
|
||||||
|
for m in updated_members:
|
||||||
|
validate_input(data_models.Member, m)
|
||||||
|
LOG.info("%(entity)s handling the batch update of members: "
|
||||||
|
"old=%(old)s, new=%(new)s",
|
||||||
|
{"entity": self.__class__.__name__, "old": old_member_ids,
|
||||||
|
"new": new_member_ids})
|
||||||
|
|
||||||
|
repo = repos.Repositories()
|
||||||
|
old_members = [repo.member.get(db_api.get_session(), mid)
|
||||||
|
for mid in old_member_ids]
|
||||||
|
new_members = [repo.member.get(db_api.get_session(), mid)
|
||||||
|
for mid in new_member_ids]
|
||||||
|
all_members = []
|
||||||
|
all_members.extend(old_members)
|
||||||
|
all_members.extend(new_members)
|
||||||
|
all_members.extend(updated_members)
|
||||||
|
simulate_controller(all_members, batch_update=True)
|
||||||
|
|
||||||
def delete(self, member_id):
|
def delete(self, member_id):
|
||||||
LOG.info("%(entity)s handling the deletion of member %(id)s",
|
LOG.info("%(entity)s handling the deletion of member %(id)s",
|
||||||
{"entity": self.__class__.__name__, "id": member_id})
|
{"entity": self.__class__.__name__, "id": member_id})
|
||||||
|
@@ -163,6 +163,21 @@ class MemberProducer(BaseProducer):
|
|||||||
def payload_class(self):
|
def payload_class(self):
|
||||||
return self.PAYLOAD_CLASS
|
return self.PAYLOAD_CLASS
|
||||||
|
|
||||||
|
def batch_update(self, old_ids, new_ids, updated_models):
|
||||||
|
"""sends an update message to the controller via oslo.messaging
|
||||||
|
|
||||||
|
:param old_ids: list of member ids that are being deleted
|
||||||
|
:param new_ids: list of member ids that are being created
|
||||||
|
:param updated_models: list of member model objects to update
|
||||||
|
"""
|
||||||
|
updated_dicts = [m.to_dict(render_unsets=False)
|
||||||
|
for m in updated_models]
|
||||||
|
kw = {"old_{0}_ids".format(self.payload_class): old_ids,
|
||||||
|
"new_{0}_ids".format(self.payload_class): new_ids,
|
||||||
|
"updated_{0}s".format(self.payload_class): updated_dicts}
|
||||||
|
method_name = "batch_update_{0}s".format(self.payload_class)
|
||||||
|
self.client.cast({}, method_name, **kw)
|
||||||
|
|
||||||
|
|
||||||
class L7PolicyProducer(BaseProducer):
|
class L7PolicyProducer(BaseProducer):
|
||||||
"""Sends updates,deletes and creates to the RPC end of the queue consumer
|
"""Sends updates,deletes and creates to the RPC end of the queue consumer
|
||||||
|
@@ -34,11 +34,11 @@ from octavia.db import prepare as db_prepare
|
|||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class MembersController(base.BaseController):
|
class MemberController(base.BaseController):
|
||||||
RBAC_TYPE = constants.RBAC_MEMBER
|
RBAC_TYPE = constants.RBAC_MEMBER
|
||||||
|
|
||||||
def __init__(self, pool_id):
|
def __init__(self, pool_id):
|
||||||
super(MembersController, self).__init__()
|
super(MemberController, self).__init__()
|
||||||
self.pool_id = pool_id
|
self.pool_id = pool_id
|
||||||
self.handler = self.handler.member
|
self.handler = self.handler.member
|
||||||
|
|
||||||
@@ -265,3 +265,80 @@ class MembersController(base.BaseController):
|
|||||||
self.repositories.member.update(
|
self.repositories.member.update(
|
||||||
lock_session, db_member.id,
|
lock_session, db_member.id,
|
||||||
provisioning_status=constants.ERROR)
|
provisioning_status=constants.ERROR)
|
||||||
|
|
||||||
|
|
||||||
|
class MembersController(MemberController):
|
||||||
|
|
||||||
|
def __init__(self, pool_id):
|
||||||
|
super(MembersController, self).__init__(pool_id)
|
||||||
|
|
||||||
|
@wsme_pecan.wsexpose(None, wtypes.text,
|
||||||
|
body=member_types.MembersRootPUT, status_code=202)
|
||||||
|
def put(self, members_):
|
||||||
|
"""Updates all members."""
|
||||||
|
members = members_.members
|
||||||
|
context = pecan.request.context.get('octavia_context')
|
||||||
|
|
||||||
|
db_pool = self._get_db_pool(context.session, self.pool_id)
|
||||||
|
old_members = db_pool.members
|
||||||
|
|
||||||
|
# Check POST+PUT+DELETE since this operation is all of 'CUD'
|
||||||
|
self._auth_validate_action(context, db_pool.project_id,
|
||||||
|
constants.RBAC_POST)
|
||||||
|
self._auth_validate_action(context, db_pool.project_id,
|
||||||
|
constants.RBAC_PUT)
|
||||||
|
self._auth_validate_action(context, db_pool.project_id,
|
||||||
|
constants.RBAC_DELETE)
|
||||||
|
|
||||||
|
with db_api.get_lock_session() as lock_session:
|
||||||
|
self._test_lb_and_listener_and_pool_statuses(lock_session)
|
||||||
|
|
||||||
|
member_count_diff = len(members) - len(old_members)
|
||||||
|
if member_count_diff > 0 and self.repositories.check_quota_met(
|
||||||
|
context.session, lock_session, data_models.Member,
|
||||||
|
db_pool.project_id, count=member_count_diff):
|
||||||
|
raise exceptions.QuotaException
|
||||||
|
|
||||||
|
old_member_uniques = {
|
||||||
|
(m.ip_address, m.protocol_port): m.id for m in old_members}
|
||||||
|
new_member_uniques = [
|
||||||
|
(m.address, m.protocol_port) for m in members]
|
||||||
|
|
||||||
|
# Find members that are brand new or updated
|
||||||
|
new_members = []
|
||||||
|
updated_members = []
|
||||||
|
for m in members:
|
||||||
|
if (m.address, m.protocol_port) not in old_member_uniques:
|
||||||
|
new_members.append(m)
|
||||||
|
else:
|
||||||
|
m.id = old_member_uniques[(m.address, m.protocol_port)]
|
||||||
|
updated_members.append(m)
|
||||||
|
|
||||||
|
# Find members that are deleted
|
||||||
|
deleted_members = []
|
||||||
|
for m in old_members:
|
||||||
|
if (m.ip_address, m.protocol_port) not in new_member_uniques:
|
||||||
|
deleted_members.append(m)
|
||||||
|
|
||||||
|
# Create new members
|
||||||
|
new_members_created = []
|
||||||
|
for m in new_members:
|
||||||
|
m = m.to_dict(render_unsets=False)
|
||||||
|
m['project_id'] = db_pool.project_id
|
||||||
|
new_members_created.append(self._graph_create(lock_session, m))
|
||||||
|
# Update old members
|
||||||
|
for m in updated_members:
|
||||||
|
self.repositories.member.update(
|
||||||
|
lock_session, m.id,
|
||||||
|
provisioning_status=constants.PENDING_UPDATE)
|
||||||
|
# Delete old members
|
||||||
|
for m in deleted_members:
|
||||||
|
self.repositories.member.update(
|
||||||
|
lock_session, m.id,
|
||||||
|
provisioning_status=constants.PENDING_DELETE)
|
||||||
|
|
||||||
|
LOG.info("Sending Full Member Update to handler")
|
||||||
|
new_member_ids = [m.id for m in new_members_created]
|
||||||
|
old_member_ids = [m.id for m in deleted_members]
|
||||||
|
self.handler.batch_update(
|
||||||
|
old_member_ids, new_member_ids, updated_members)
|
||||||
|
@@ -319,19 +319,14 @@ class PoolsController(base.BaseController):
|
|||||||
which controller, if any, should control be passed.
|
which controller, if any, should control be passed.
|
||||||
"""
|
"""
|
||||||
context = pecan.request.context.get('octavia_context')
|
context = pecan.request.context.get('octavia_context')
|
||||||
if pool_id and len(remainder) and (remainder[0] == 'members' or
|
if pool_id and len(remainder) and remainder[0] == 'members':
|
||||||
remainder[0] == 'healthmonitor'):
|
|
||||||
controller = remainder[0]
|
|
||||||
remainder = remainder[1:]
|
remainder = remainder[1:]
|
||||||
db_pool = self.repositories.pool.get(context.session, id=pool_id)
|
db_pool = self.repositories.pool.get(context.session, id=pool_id)
|
||||||
if not db_pool:
|
if not db_pool:
|
||||||
LOG.info("Pool %s not found.", pool_id)
|
LOG.info("Pool %s not found.", pool_id)
|
||||||
raise exceptions.NotFound(resource=data_models.Pool._name(),
|
raise exceptions.NotFound(resource=data_models.Pool._name(),
|
||||||
id=pool_id)
|
id=pool_id)
|
||||||
if controller == 'members':
|
if remainder:
|
||||||
return member.MembersController(
|
return member.MemberController(pool_id=db_pool.id), remainder
|
||||||
pool_id=db_pool.id), remainder
|
else:
|
||||||
elif controller == 'healthmonitor':
|
return member.MembersController(pool_id=db_pool.id), remainder
|
||||||
return health_monitor.HealthMonitorController(
|
|
||||||
load_balancer_id=db_pool.load_balancer_id,
|
|
||||||
pool_id=db_pool.id), remainder
|
|
||||||
|
@@ -101,6 +101,10 @@ class MemberRootPUT(types.BaseType):
|
|||||||
member = wtypes.wsattr(MemberPUT)
|
member = wtypes.wsattr(MemberPUT)
|
||||||
|
|
||||||
|
|
||||||
|
class MembersRootPUT(types.BaseType):
|
||||||
|
members = wtypes.wsattr([MemberPOST])
|
||||||
|
|
||||||
|
|
||||||
class MemberSingleCreate(BaseMemberType):
|
class MemberSingleCreate(BaseMemberType):
|
||||||
"""Defines mandatory and optional attributes of a POST request."""
|
"""Defines mandatory and optional attributes of a POST request."""
|
||||||
name = wtypes.wsattr(wtypes.StringType(max_length=255))
|
name = wtypes.wsattr(wtypes.StringType(max_length=255))
|
||||||
@@ -112,6 +116,9 @@ class MemberSingleCreate(BaseMemberType):
|
|||||||
weight = wtypes.wsattr(wtypes.IntegerType(
|
weight = wtypes.wsattr(wtypes.IntegerType(
|
||||||
minimum=constants.MIN_WEIGHT, maximum=constants.MAX_WEIGHT), default=1)
|
minimum=constants.MIN_WEIGHT, maximum=constants.MAX_WEIGHT), default=1)
|
||||||
subnet_id = wtypes.wsattr(wtypes.UuidType())
|
subnet_id = wtypes.wsattr(wtypes.UuidType())
|
||||||
|
monitor_port = wtypes.wsattr(wtypes.IntegerType(
|
||||||
|
minimum=constants.MIN_PORT_NUMBER, maximum=constants.MAX_PORT_NUMBER))
|
||||||
|
monitor_address = wtypes.wsattr(types.IPAddressType())
|
||||||
|
|
||||||
|
|
||||||
class MemberStatusResponse(BaseMemberType):
|
class MemberStatusResponse(BaseMemberType):
|
||||||
|
@@ -278,6 +278,19 @@ CREATE_VRRP_SECURITY_RULES = 'octavia-create-vrrp-security-rules'
|
|||||||
|
|
||||||
GENERATE_SERVER_PEM_TASK = 'GenerateServerPEMTask'
|
GENERATE_SERVER_PEM_TASK = 'GenerateServerPEMTask'
|
||||||
|
|
||||||
|
# Batch Member Update constants
|
||||||
|
MEMBERS = 'members'
|
||||||
|
UNORDERED_MEMBER_UPDATES_FLOW = 'octavia-unordered-member-updates-flow'
|
||||||
|
UNORDERED_MEMBER_ACTIVE_FLOW = 'octavia-unordered-member-active-flow'
|
||||||
|
UPDATE_ATTRIBUTES_FLOW = 'octavia-update-attributes-flow'
|
||||||
|
DELETE_MODEL_OBJECT_FLOW = 'octavia-delete-model-object-flow'
|
||||||
|
BATCH_UPDATE_MEMBERS_FLOW = 'octavia-batch-update-members-flow'
|
||||||
|
MEMBER_TO_ERROR_ON_REVERT_FLOW = 'octavia-member-to-error-on-revert-flow'
|
||||||
|
DECREMENT_MEMBER_QUOTA_FLOW = 'octavia-decrement-member-quota-flow'
|
||||||
|
MARK_MEMBER_ACTIVE_INDB = 'octavia-mark-member-active-indb'
|
||||||
|
UPDATE_MEMBER_INDB = 'octavia-update-member-indb'
|
||||||
|
DELETE_MEMBER_INDB = 'octavia-delete-member-indb'
|
||||||
|
|
||||||
# Task Names
|
# Task Names
|
||||||
RELOAD_LB_AFTER_AMP_ASSOC = 'reload-lb-after-amp-assoc'
|
RELOAD_LB_AFTER_AMP_ASSOC = 'reload-lb-after-amp-assoc'
|
||||||
RELOAD_LB_AFTER_AMP_ASSOC_FULL_GRAPH = 'reload-lb-after-amp-assoc-full-graph'
|
RELOAD_LB_AFTER_AMP_ASSOC_FULL_GRAPH = 'reload-lb-after-amp-assoc-full-graph'
|
||||||
|
@@ -104,6 +104,17 @@ class Endpoint(object):
|
|||||||
LOG.info('Updating member \'%s\'...', member_id)
|
LOG.info('Updating member \'%s\'...', member_id)
|
||||||
self.worker.update_member(member_id, member_updates)
|
self.worker.update_member(member_id, member_updates)
|
||||||
|
|
||||||
|
def batch_update_members(self, context, old_member_ids, new_member_ids,
|
||||||
|
updated_members):
|
||||||
|
updated_member_ids = [m.get('id') for m in updated_members]
|
||||||
|
LOG.info(
|
||||||
|
'Batch updating members: old=\'%(old)s\', new=\'%(new)s\', '
|
||||||
|
'updated=\'%(updated)s\'...',
|
||||||
|
{'old': old_member_ids, 'new': new_member_ids,
|
||||||
|
'updated': updated_member_ids})
|
||||||
|
self.worker.batch_update_members(
|
||||||
|
old_member_ids, new_member_ids, updated_members)
|
||||||
|
|
||||||
def delete_member(self, context, member_id):
|
def delete_member(self, context, member_id):
|
||||||
LOG.info('Deleting member \'%s\'...', member_id)
|
LOG.info('Deleting member \'%s\'...', member_id)
|
||||||
self.worker.delete_member(member_id)
|
self.worker.delete_member(member_id)
|
||||||
|
@@ -376,6 +376,34 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
|||||||
log=LOG):
|
log=LOG):
|
||||||
delete_member_tf.run()
|
delete_member_tf.run()
|
||||||
|
|
||||||
|
def batch_update_members(self, old_member_ids, new_member_ids,
|
||||||
|
updated_members):
|
||||||
|
old_members = [self._member_repo.get(db_apis.get_session(), id=mid)
|
||||||
|
for mid in old_member_ids]
|
||||||
|
new_members = [self._member_repo.get(db_apis.get_session(), id=mid)
|
||||||
|
for mid in new_member_ids]
|
||||||
|
updated_members = [
|
||||||
|
(self._member_repo.get(db_apis.get_session(), id=m.get('id')), m)
|
||||||
|
for m in updated_members]
|
||||||
|
if old_members:
|
||||||
|
pool = old_members[0].pool
|
||||||
|
elif new_members:
|
||||||
|
pool = new_members[0].pool
|
||||||
|
else:
|
||||||
|
pool = updated_members[0][0].pool
|
||||||
|
listeners = pool.listeners
|
||||||
|
load_balancer = pool.load_balancer
|
||||||
|
|
||||||
|
batch_update_members_tf = self._taskflow_load(
|
||||||
|
self._member_flows.get_batch_update_members_flow(
|
||||||
|
old_members, new_members, updated_members),
|
||||||
|
store={constants.LISTENERS: listeners,
|
||||||
|
constants.LOADBALANCER: load_balancer,
|
||||||
|
constants.POOL: pool})
|
||||||
|
with tf_logging.DynamicLoggingListener(batch_update_members_tf,
|
||||||
|
log=LOG):
|
||||||
|
batch_update_members_tf.run()
|
||||||
|
|
||||||
def update_member(self, member_id, member_updates):
|
def update_member(self, member_id, member_updates):
|
||||||
"""Updates a pool member.
|
"""Updates a pool member.
|
||||||
|
|
||||||
|
@@ -14,6 +14,7 @@
|
|||||||
#
|
#
|
||||||
|
|
||||||
from taskflow.patterns import linear_flow
|
from taskflow.patterns import linear_flow
|
||||||
|
from taskflow.patterns import unordered_flow
|
||||||
|
|
||||||
from octavia.common import constants
|
from octavia.common import constants
|
||||||
from octavia.controller.worker.tasks import amphora_driver_tasks
|
from octavia.controller.worker.tasks import amphora_driver_tasks
|
||||||
@@ -113,11 +114,108 @@ class MemberFlows(object):
|
|||||||
requires=[constants.MEMBER, constants.UPDATE_DICT]))
|
requires=[constants.MEMBER, constants.UPDATE_DICT]))
|
||||||
update_member_flow.add(database_tasks.MarkMemberActiveInDB(
|
update_member_flow.add(database_tasks.MarkMemberActiveInDB(
|
||||||
requires=constants.MEMBER))
|
requires=constants.MEMBER))
|
||||||
|
update_member_flow.add(database_tasks.MarkPoolActiveInDB(
|
||||||
|
requires=constants.POOL))
|
||||||
update_member_flow.add(database_tasks.
|
update_member_flow.add(database_tasks.
|
||||||
MarkLBAndListenersActiveInDB(
|
MarkLBAndListenersActiveInDB(
|
||||||
requires=[constants.LOADBALANCER,
|
requires=[constants.LOADBALANCER,
|
||||||
constants.LISTENERS]))
|
constants.LISTENERS]))
|
||||||
update_member_flow.add(database_tasks.MarkPoolActiveInDB(
|
|
||||||
requires=constants.POOL))
|
|
||||||
|
|
||||||
return update_member_flow
|
return update_member_flow
|
||||||
|
|
||||||
|
def get_batch_update_members_flow(self, old_members, new_members,
|
||||||
|
updated_members):
|
||||||
|
"""Create a flow to batch update members
|
||||||
|
|
||||||
|
:returns: The flow for batch updating members
|
||||||
|
"""
|
||||||
|
batch_update_members_flow = linear_flow.Flow(
|
||||||
|
constants.BATCH_UPDATE_MEMBERS_FLOW)
|
||||||
|
unordered_members_flow = unordered_flow.Flow(
|
||||||
|
constants.UNORDERED_MEMBER_UPDATES_FLOW)
|
||||||
|
unordered_members_active_flow = unordered_flow.Flow(
|
||||||
|
constants.UNORDERED_MEMBER_ACTIVE_FLOW)
|
||||||
|
|
||||||
|
# Delete old members
|
||||||
|
unordered_members_flow.add(
|
||||||
|
lifecycle_tasks.MembersToErrorOnRevertTask(
|
||||||
|
inject={constants.MEMBERS: old_members},
|
||||||
|
name='{flow}-deleted'.format(
|
||||||
|
flow=constants.MEMBER_TO_ERROR_ON_REVERT_FLOW)))
|
||||||
|
for m in old_members:
|
||||||
|
unordered_members_flow.add(
|
||||||
|
model_tasks.DeleteModelObject(
|
||||||
|
inject={constants.OBJECT: m},
|
||||||
|
name='{flow}-{id}'.format(
|
||||||
|
id=m.id, flow=constants.DELETE_MODEL_OBJECT_FLOW)))
|
||||||
|
unordered_members_flow.add(database_tasks.DeleteMemberInDB(
|
||||||
|
inject={constants.MEMBER: m},
|
||||||
|
name='{flow}-{id}'.format(
|
||||||
|
id=m.id, flow=constants.DELETE_MEMBER_INDB)))
|
||||||
|
unordered_members_flow.add(database_tasks.DecrementMemberQuota(
|
||||||
|
inject={constants.MEMBER: m},
|
||||||
|
name='{flow}-{id}'.format(
|
||||||
|
id=m.id, flow=constants.DECREMENT_MEMBER_QUOTA_FLOW)))
|
||||||
|
|
||||||
|
# Create new members
|
||||||
|
unordered_members_flow.add(
|
||||||
|
lifecycle_tasks.MembersToErrorOnRevertTask(
|
||||||
|
inject={constants.MEMBERS: new_members},
|
||||||
|
name='{flow}-created'.format(
|
||||||
|
flow=constants.MEMBER_TO_ERROR_ON_REVERT_FLOW)))
|
||||||
|
for m in new_members:
|
||||||
|
unordered_members_active_flow.add(
|
||||||
|
database_tasks.MarkMemberActiveInDB(
|
||||||
|
inject={constants.MEMBER: m},
|
||||||
|
name='{flow}-{id}'.format(
|
||||||
|
id=m.id, flow=constants.MARK_MEMBER_ACTIVE_INDB)))
|
||||||
|
|
||||||
|
# Update existing members
|
||||||
|
unordered_members_flow.add(
|
||||||
|
lifecycle_tasks.MembersToErrorOnRevertTask(
|
||||||
|
inject={constants.MEMBERS: updated_members},
|
||||||
|
name='{flow}-updated'.format(
|
||||||
|
flow=constants.MEMBER_TO_ERROR_ON_REVERT_FLOW)))
|
||||||
|
for m, um in updated_members:
|
||||||
|
um.pop('id', None)
|
||||||
|
unordered_members_flow.add(
|
||||||
|
model_tasks.UpdateAttributes(
|
||||||
|
inject={constants.OBJECT: m, constants.UPDATE_DICT: um},
|
||||||
|
name='{flow}-{id}'.format(
|
||||||
|
id=m.id, flow=constants.UPDATE_ATTRIBUTES_FLOW)))
|
||||||
|
unordered_members_flow.add(database_tasks.UpdateMemberInDB(
|
||||||
|
inject={constants.MEMBER: m, constants.UPDATE_DICT: um},
|
||||||
|
name='{flow}-{id}'.format(
|
||||||
|
id=m.id, flow=constants.UPDATE_MEMBER_INDB)))
|
||||||
|
unordered_members_active_flow.add(
|
||||||
|
database_tasks.MarkMemberActiveInDB(
|
||||||
|
inject={constants.MEMBER: m},
|
||||||
|
name='{flow}-{id}'.format(
|
||||||
|
id=m.id, flow=constants.MARK_MEMBER_ACTIVE_INDB)))
|
||||||
|
|
||||||
|
batch_update_members_flow.add(unordered_members_flow)
|
||||||
|
|
||||||
|
# Done, do real updates
|
||||||
|
batch_update_members_flow.add(network_tasks.CalculateDelta(
|
||||||
|
requires=constants.LOADBALANCER,
|
||||||
|
provides=constants.DELTAS))
|
||||||
|
batch_update_members_flow.add(network_tasks.HandleNetworkDeltas(
|
||||||
|
requires=constants.DELTAS, provides=constants.ADDED_PORTS))
|
||||||
|
batch_update_members_flow.add(
|
||||||
|
amphora_driver_tasks.AmphoraePostNetworkPlug(
|
||||||
|
requires=(constants.LOADBALANCER, constants.ADDED_PORTS)))
|
||||||
|
|
||||||
|
# Update the Listener (this makes the changes active on the Amp)
|
||||||
|
batch_update_members_flow.add(amphora_driver_tasks.ListenersUpdate(
|
||||||
|
requires=(constants.LOADBALANCER, constants.LISTENERS)))
|
||||||
|
|
||||||
|
# Mark all the members ACTIVE here, then pool then LB/Listeners
|
||||||
|
batch_update_members_flow.add(unordered_members_active_flow)
|
||||||
|
batch_update_members_flow.add(database_tasks.MarkPoolActiveInDB(
|
||||||
|
requires=constants.POOL))
|
||||||
|
batch_update_members_flow.add(
|
||||||
|
database_tasks.MarkLBAndListenersActiveInDB(
|
||||||
|
requires=(constants.LOADBALANCER,
|
||||||
|
constants.LISTENERS)))
|
||||||
|
|
||||||
|
return batch_update_members_flow
|
||||||
|
@@ -137,10 +137,25 @@ class MemberToErrorOnRevertTask(BaseLifecycleTask):
|
|||||||
|
|
||||||
def revert(self, member, listeners, loadbalancer, pool, *args, **kwargs):
|
def revert(self, member, listeners, loadbalancer, pool, *args, **kwargs):
|
||||||
self.task_utils.mark_member_prov_status_error(member.id)
|
self.task_utils.mark_member_prov_status_error(member.id)
|
||||||
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
|
|
||||||
for listener in listeners:
|
for listener in listeners:
|
||||||
self.task_utils.mark_listener_prov_status_active(listener.id)
|
self.task_utils.mark_listener_prov_status_active(listener.id)
|
||||||
self.task_utils.mark_pool_prov_status_active(pool.id)
|
self.task_utils.mark_pool_prov_status_active(pool.id)
|
||||||
|
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
|
||||||
|
|
||||||
|
|
||||||
|
class MembersToErrorOnRevertTask(BaseLifecycleTask):
|
||||||
|
"""Task to set members to ERROR on revert."""
|
||||||
|
|
||||||
|
def execute(self, members, listeners, loadbalancer, pool):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def revert(self, members, listeners, loadbalancer, pool, *args, **kwargs):
|
||||||
|
for m in members:
|
||||||
|
self.task_utils.mark_member_prov_status_error(m.id)
|
||||||
|
for listener in listeners:
|
||||||
|
self.task_utils.mark_listener_prov_status_active(listener.id)
|
||||||
|
self.task_utils.mark_pool_prov_status_active(pool.id)
|
||||||
|
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
|
||||||
|
|
||||||
|
|
||||||
class PoolToErrorOnRevertTask(BaseLifecycleTask):
|
class PoolToErrorOnRevertTask(BaseLifecycleTask):
|
||||||
|
@@ -427,6 +427,160 @@ class TestMember(base.BaseAPITest):
|
|||||||
member_prov_status=constants.ERROR,
|
member_prov_status=constants.ERROR,
|
||||||
member_op_status=constants.NO_MONITOR)
|
member_op_status=constants.NO_MONITOR)
|
||||||
|
|
||||||
|
def test_full_batch_members(self):
|
||||||
|
member1 = {'address': '10.0.0.1', 'protocol_port': 80}
|
||||||
|
member2 = {'address': '10.0.0.2', 'protocol_port': 80}
|
||||||
|
member3 = {'address': '10.0.0.3', 'protocol_port': 80}
|
||||||
|
member4 = {'address': '10.0.0.4', 'protocol_port': 80}
|
||||||
|
member5 = {'address': '10.0.0.5', 'protocol_port': 80}
|
||||||
|
member6 = {'address': '10.0.0.6', 'protocol_port': 80}
|
||||||
|
members = [member1, member2, member3, member4]
|
||||||
|
for m in members:
|
||||||
|
self.create_member(pool_id=self.pool_id, **m)
|
||||||
|
self.set_lb_status(self.lb_id)
|
||||||
|
|
||||||
|
req_dict = [member1, member2, member5, member6]
|
||||||
|
body = {self.root_tag_list: req_dict}
|
||||||
|
path = self.MEMBERS_PATH.format(pool_id=self.pool_id)
|
||||||
|
self.put(path, body, status=202)
|
||||||
|
returned_members = self.get(
|
||||||
|
self.MEMBERS_PATH.format(pool_id=self.pool_id)
|
||||||
|
).json.get(self.root_tag_list)
|
||||||
|
|
||||||
|
expected_members = [
|
||||||
|
('10.0.0.1', 80, 'PENDING_UPDATE'),
|
||||||
|
('10.0.0.2', 80, 'PENDING_UPDATE'),
|
||||||
|
('10.0.0.3', 80, 'PENDING_DELETE'),
|
||||||
|
('10.0.0.4', 80, 'PENDING_DELETE'),
|
||||||
|
('10.0.0.5', 80, 'PENDING_CREATE'),
|
||||||
|
('10.0.0.6', 80, 'PENDING_CREATE'),
|
||||||
|
]
|
||||||
|
|
||||||
|
member_ids = {}
|
||||||
|
for rm in returned_members:
|
||||||
|
self.assertIn(
|
||||||
|
(rm['address'],
|
||||||
|
rm['protocol_port'],
|
||||||
|
rm['provisioning_status']), expected_members)
|
||||||
|
member_ids[(rm['address'], rm['protocol_port'])] = rm['id']
|
||||||
|
handler_args = self.handler_mock().member.batch_update.call_args[0]
|
||||||
|
self.assertEqual(
|
||||||
|
[member_ids[('10.0.0.3', 80)], member_ids[('10.0.0.4', 80)]],
|
||||||
|
handler_args[0])
|
||||||
|
self.assertEqual(
|
||||||
|
[member_ids[('10.0.0.5', 80)], member_ids[('10.0.0.6', 80)]],
|
||||||
|
handler_args[1])
|
||||||
|
self.assertEqual(2, len(handler_args[2]))
|
||||||
|
updated_members = [
|
||||||
|
(handler_args[2][0].address, handler_args[2][0].protocol_port),
|
||||||
|
(handler_args[2][1].address, handler_args[2][1].protocol_port)
|
||||||
|
]
|
||||||
|
self.assertEqual([('10.0.0.1', 80), ('10.0.0.2', 80)], updated_members)
|
||||||
|
|
||||||
|
def test_create_batch_members(self):
|
||||||
|
member5 = {'address': '10.0.0.5', 'protocol_port': 80}
|
||||||
|
member6 = {'address': '10.0.0.6', 'protocol_port': 80}
|
||||||
|
|
||||||
|
req_dict = [member5, member6]
|
||||||
|
body = {self.root_tag_list: req_dict}
|
||||||
|
path = self.MEMBERS_PATH.format(pool_id=self.pool_id)
|
||||||
|
self.put(path, body, status=202)
|
||||||
|
returned_members = self.get(
|
||||||
|
self.MEMBERS_PATH.format(pool_id=self.pool_id)
|
||||||
|
).json.get(self.root_tag_list)
|
||||||
|
|
||||||
|
expected_members = [
|
||||||
|
('10.0.0.5', 80, 'PENDING_CREATE'),
|
||||||
|
('10.0.0.6', 80, 'PENDING_CREATE'),
|
||||||
|
]
|
||||||
|
|
||||||
|
member_ids = {}
|
||||||
|
for rm in returned_members:
|
||||||
|
self.assertIn(
|
||||||
|
(rm['address'],
|
||||||
|
rm['protocol_port'],
|
||||||
|
rm['provisioning_status']), expected_members)
|
||||||
|
member_ids[(rm['address'], rm['protocol_port'])] = rm['id']
|
||||||
|
handler_args = self.handler_mock().member.batch_update.call_args[0]
|
||||||
|
self.assertEqual(0, len(handler_args[0]))
|
||||||
|
self.assertEqual(
|
||||||
|
[member_ids[('10.0.0.5', 80)], member_ids[('10.0.0.6', 80)]],
|
||||||
|
handler_args[1])
|
||||||
|
self.assertEqual(0, len(handler_args[2]))
|
||||||
|
|
||||||
|
def test_update_batch_members(self):
|
||||||
|
member1 = {'address': '10.0.0.1', 'protocol_port': 80}
|
||||||
|
member2 = {'address': '10.0.0.2', 'protocol_port': 80}
|
||||||
|
members = [member1, member2]
|
||||||
|
for m in members:
|
||||||
|
self.create_member(pool_id=self.pool_id, **m)
|
||||||
|
self.set_lb_status(self.lb_id)
|
||||||
|
|
||||||
|
req_dict = [member1, member2]
|
||||||
|
body = {self.root_tag_list: req_dict}
|
||||||
|
path = self.MEMBERS_PATH.format(pool_id=self.pool_id)
|
||||||
|
self.put(path, body, status=202)
|
||||||
|
returned_members = self.get(
|
||||||
|
self.MEMBERS_PATH.format(pool_id=self.pool_id)
|
||||||
|
).json.get(self.root_tag_list)
|
||||||
|
|
||||||
|
expected_members = [
|
||||||
|
('10.0.0.1', 80, 'PENDING_UPDATE'),
|
||||||
|
('10.0.0.2', 80, 'PENDING_UPDATE'),
|
||||||
|
]
|
||||||
|
|
||||||
|
member_ids = {}
|
||||||
|
for rm in returned_members:
|
||||||
|
self.assertIn(
|
||||||
|
(rm['address'],
|
||||||
|
rm['protocol_port'],
|
||||||
|
rm['provisioning_status']), expected_members)
|
||||||
|
member_ids[(rm['address'], rm['protocol_port'])] = rm['id']
|
||||||
|
handler_args = self.handler_mock().member.batch_update.call_args[0]
|
||||||
|
self.assertEqual(0, len(handler_args[0]))
|
||||||
|
self.assertEqual(0, len(handler_args[1]))
|
||||||
|
self.assertEqual(2, len(handler_args[2]))
|
||||||
|
updated_members = [
|
||||||
|
(handler_args[2][0].address, handler_args[2][0].protocol_port),
|
||||||
|
(handler_args[2][1].address, handler_args[2][1].protocol_port)
|
||||||
|
]
|
||||||
|
self.assertEqual([('10.0.0.1', 80), ('10.0.0.2', 80)], updated_members)
|
||||||
|
|
||||||
|
def test_delete_batch_members(self):
|
||||||
|
member3 = {'address': '10.0.0.3', 'protocol_port': 80}
|
||||||
|
member4 = {'address': '10.0.0.4', 'protocol_port': 80}
|
||||||
|
members = [member3, member4]
|
||||||
|
for m in members:
|
||||||
|
self.create_member(pool_id=self.pool_id, **m)
|
||||||
|
self.set_lb_status(self.lb_id)
|
||||||
|
|
||||||
|
req_dict = []
|
||||||
|
body = {self.root_tag_list: req_dict}
|
||||||
|
path = self.MEMBERS_PATH.format(pool_id=self.pool_id)
|
||||||
|
self.put(path, body, status=202)
|
||||||
|
returned_members = self.get(
|
||||||
|
self.MEMBERS_PATH.format(pool_id=self.pool_id)
|
||||||
|
).json.get(self.root_tag_list)
|
||||||
|
|
||||||
|
expected_members = [
|
||||||
|
('10.0.0.3', 80, 'PENDING_DELETE'),
|
||||||
|
('10.0.0.4', 80, 'PENDING_DELETE'),
|
||||||
|
]
|
||||||
|
|
||||||
|
member_ids = {}
|
||||||
|
for rm in returned_members:
|
||||||
|
self.assertIn(
|
||||||
|
(rm['address'],
|
||||||
|
rm['protocol_port'],
|
||||||
|
rm['provisioning_status']), expected_members)
|
||||||
|
member_ids[(rm['address'], rm['protocol_port'])] = rm['id']
|
||||||
|
handler_args = self.handler_mock().member.batch_update.call_args[0]
|
||||||
|
self.assertEqual(
|
||||||
|
[member_ids[('10.0.0.3', 80)], member_ids[('10.0.0.4', 80)]],
|
||||||
|
handler_args[0])
|
||||||
|
self.assertEqual(0, len(handler_args[1]))
|
||||||
|
self.assertEqual(0, len(handler_args[2]))
|
||||||
|
|
||||||
def test_create_with_attached_listener(self):
|
def test_create_with_attached_listener(self):
|
||||||
api_member = self.create_member(
|
api_member = self.create_member(
|
||||||
self.pool_with_listener_id, '10.0.0.1', 80).get(self.root_tag)
|
self.pool_with_listener_id, '10.0.0.1', 80).get(self.root_tag)
|
||||||
|
@@ -186,6 +186,18 @@ class TestProducer(base.TestCase):
|
|||||||
self.mck_client.cast.assert_called_once_with(
|
self.mck_client.cast.assert_called_once_with(
|
||||||
{}, 'update_member', **kw)
|
{}, 'update_member', **kw)
|
||||||
|
|
||||||
|
def test_batch_update_members(self):
|
||||||
|
p = producer.MemberProducer()
|
||||||
|
member_model = data_models.Member(id=10)
|
||||||
|
p.batch_update(old_ids=[9],
|
||||||
|
new_ids=[11],
|
||||||
|
updated_models=[member_model])
|
||||||
|
kw = {'old_member_ids': [9],
|
||||||
|
'new_member_ids': [11],
|
||||||
|
'updated_members': [member_model.to_dict()]}
|
||||||
|
self.mck_client.cast.assert_called_once_with(
|
||||||
|
{}, 'batch_update_members', **kw)
|
||||||
|
|
||||||
def test_create_l7policy(self):
|
def test_create_l7policy(self):
|
||||||
p = producer.L7PolicyProducer()
|
p = producer.L7PolicyProducer()
|
||||||
p.create(self.mck_model)
|
p.create(self.mck_model)
|
||||||
|
@@ -120,6 +120,12 @@ class TestEndpoint(base.TestCase):
|
|||||||
self.ep.worker.update_member.assert_called_once_with(
|
self.ep.worker.update_member.assert_called_once_with(
|
||||||
self.resource_id, self.resource_updates)
|
self.resource_id, self.resource_updates)
|
||||||
|
|
||||||
|
def test_batch_update_members(self):
|
||||||
|
self.ep.batch_update_members(
|
||||||
|
self.context, [9], [11], [self.resource_updates])
|
||||||
|
self.ep.worker.batch_update_members.assert_called_once_with(
|
||||||
|
[9], [11], [self.resource_updates])
|
||||||
|
|
||||||
def test_delete_member(self):
|
def test_delete_member(self):
|
||||||
self.ep.delete_member(self.context, self.resource_id)
|
self.ep.delete_member(self.context, self.resource_id)
|
||||||
self.ep.worker.delete_member.assert_called_once_with(
|
self.ep.worker.delete_member.assert_called_once_with(
|
||||||
|
@@ -72,3 +72,17 @@ class TestMemberFlows(base.TestCase):
|
|||||||
|
|
||||||
self.assertEqual(5, len(member_flow.requires))
|
self.assertEqual(5, len(member_flow.requires))
|
||||||
self.assertEqual(0, len(member_flow.provides))
|
self.assertEqual(0, len(member_flow.provides))
|
||||||
|
|
||||||
|
def test_get_batch_update_members_flow(self, mock_get_net_driver):
|
||||||
|
|
||||||
|
member_flow = self.MemberFlow.get_batch_update_members_flow(
|
||||||
|
[], [], [])
|
||||||
|
|
||||||
|
self.assertIsInstance(member_flow, flow.Flow)
|
||||||
|
|
||||||
|
self.assertIn(constants.LISTENERS, member_flow.requires)
|
||||||
|
self.assertIn(constants.LOADBALANCER, member_flow.requires)
|
||||||
|
self.assertIn(constants.POOL, member_flow.requires)
|
||||||
|
|
||||||
|
self.assertEqual(3, len(member_flow.requires))
|
||||||
|
self.assertEqual(2, len(member_flow.provides))
|
||||||
|
@@ -46,6 +46,7 @@ class TestLifecycleTasks(base.TestCase):
|
|||||||
self.MEMBER = mock.MagicMock()
|
self.MEMBER = mock.MagicMock()
|
||||||
self.MEMBER_ID = uuidutils.generate_uuid()
|
self.MEMBER_ID = uuidutils.generate_uuid()
|
||||||
self.MEMBER.id = self.MEMBER_ID
|
self.MEMBER.id = self.MEMBER_ID
|
||||||
|
self.MEMBERS = [self.MEMBER]
|
||||||
self.POOL = mock.MagicMock()
|
self.POOL = mock.MagicMock()
|
||||||
self.POOL_ID = uuidutils.generate_uuid()
|
self.POOL_ID = uuidutils.generate_uuid()
|
||||||
self.POOL.id = self.POOL_ID
|
self.POOL.id = self.POOL_ID
|
||||||
@@ -293,7 +294,6 @@ class TestLifecycleTasks(base.TestCase):
|
|||||||
mock_listener_prov_status_active,
|
mock_listener_prov_status_active,
|
||||||
mock_loadbalancer_prov_status_active,
|
mock_loadbalancer_prov_status_active,
|
||||||
mock_member_prov_status_error):
|
mock_member_prov_status_error):
|
||||||
|
|
||||||
member_to_error_on_revert = lifecycle_tasks.MemberToErrorOnRevertTask()
|
member_to_error_on_revert = lifecycle_tasks.MemberToErrorOnRevertTask()
|
||||||
|
|
||||||
# Execute
|
# Execute
|
||||||
@@ -319,6 +319,46 @@ class TestLifecycleTasks(base.TestCase):
|
|||||||
mock_pool_prov_status_active.assert_called_once_with(
|
mock_pool_prov_status_active.assert_called_once_with(
|
||||||
self.POOL_ID)
|
self.POOL_ID)
|
||||||
|
|
||||||
|
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
|
||||||
|
'mark_member_prov_status_error')
|
||||||
|
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
|
||||||
|
'mark_loadbalancer_prov_status_active')
|
||||||
|
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
|
||||||
|
'mark_listener_prov_status_active')
|
||||||
|
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
|
||||||
|
'mark_pool_prov_status_active')
|
||||||
|
def test_MembersToErrorOnRevertTask(
|
||||||
|
self,
|
||||||
|
mock_pool_prov_status_active,
|
||||||
|
mock_listener_prov_status_active,
|
||||||
|
mock_loadbalancer_prov_status_active,
|
||||||
|
mock_member_prov_status_error):
|
||||||
|
members_to_error_on_revert = (
|
||||||
|
lifecycle_tasks.MembersToErrorOnRevertTask())
|
||||||
|
|
||||||
|
# Execute
|
||||||
|
members_to_error_on_revert.execute(self.MEMBERS,
|
||||||
|
self.LISTENERS,
|
||||||
|
self.LOADBALANCER,
|
||||||
|
self.POOL)
|
||||||
|
|
||||||
|
self.assertFalse(mock_member_prov_status_error.called)
|
||||||
|
|
||||||
|
# Revert
|
||||||
|
members_to_error_on_revert.revert(self.MEMBERS,
|
||||||
|
self.LISTENERS,
|
||||||
|
self.LOADBALANCER,
|
||||||
|
self.POOL)
|
||||||
|
|
||||||
|
mock_member_prov_status_error.assert_called_once_with(
|
||||||
|
self.MEMBER_ID)
|
||||||
|
mock_loadbalancer_prov_status_active.assert_called_once_with(
|
||||||
|
self.LOADBALANCER_ID)
|
||||||
|
mock_listener_prov_status_active.assert_called_once_with(
|
||||||
|
self.LISTENER_ID)
|
||||||
|
mock_pool_prov_status_active.assert_called_once_with(
|
||||||
|
self.POOL_ID)
|
||||||
|
|
||||||
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
|
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
|
||||||
'mark_pool_prov_status_error')
|
'mark_pool_prov_status_error')
|
||||||
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
|
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
|
||||||
|
@@ -773,6 +773,38 @@ class TestControllerWorker(base.TestCase):
|
|||||||
|
|
||||||
_flow_mock.run.assert_called_once_with()
|
_flow_mock.run.assert_called_once_with()
|
||||||
|
|
||||||
|
@mock.patch('octavia.controller.worker.flows.'
|
||||||
|
'member_flows.MemberFlows.get_batch_update_members_flow',
|
||||||
|
return_value=_flow_mock)
|
||||||
|
def test_batch_update_members(self,
|
||||||
|
mock_get_batch_update_members_flow,
|
||||||
|
mock_api_get_session,
|
||||||
|
mock_dyn_log_listener,
|
||||||
|
mock_taskflow_load,
|
||||||
|
mock_pool_repo_get,
|
||||||
|
mock_member_repo_get,
|
||||||
|
mock_l7rule_repo_get,
|
||||||
|
mock_l7policy_repo_get,
|
||||||
|
mock_listener_repo_get,
|
||||||
|
mock_lb_repo_get,
|
||||||
|
mock_health_mon_repo_get,
|
||||||
|
mock_amp_repo_get):
|
||||||
|
|
||||||
|
_flow_mock.reset_mock()
|
||||||
|
|
||||||
|
cw = controller_worker.ControllerWorker()
|
||||||
|
cw.batch_update_members([9], [11], [MEMBER_UPDATE_DICT])
|
||||||
|
|
||||||
|
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||||
|
assert_called_once_with(_flow_mock,
|
||||||
|
store={
|
||||||
|
constants.LISTENERS: [_listener_mock],
|
||||||
|
constants.LOADBALANCER:
|
||||||
|
_load_balancer_mock,
|
||||||
|
constants.POOL: _pool_mock}))
|
||||||
|
|
||||||
|
_flow_mock.run.assert_called_once_with()
|
||||||
|
|
||||||
@mock.patch('octavia.controller.worker.flows.'
|
@mock.patch('octavia.controller.worker.flows.'
|
||||||
'pool_flows.PoolFlows.get_create_pool_flow',
|
'pool_flows.PoolFlows.get_create_pool_flow',
|
||||||
return_value=_flow_mock)
|
return_value=_flow_mock)
|
||||||
|
@@ -0,0 +1,7 @@
|
|||||||
|
---
|
||||||
|
features:
|
||||||
|
- |
|
||||||
|
It is now possible to completely update a pool's member list as a batch
|
||||||
|
operation. Using a PUT request on the base member endpoint of a pool, you
|
||||||
|
can specify a list of member objects and the service will perform any
|
||||||
|
necessary creates/deletes/updates as a single operation.
|
@@ -71,6 +71,10 @@ def generate(flow_list, output_directory):
|
|||||||
lb = dmh.generate_load_balancer()
|
lb = dmh.generate_load_balancer()
|
||||||
delete_flow, store = get_flow_method(lb)
|
delete_flow, store = get_flow_method(lb)
|
||||||
current_engine = engines.load(delete_flow)
|
current_engine = engines.load(delete_flow)
|
||||||
|
elif (current_tuple[1] == 'MemberFlows' and
|
||||||
|
current_tuple[2] == 'get_batch_update_members_flow'):
|
||||||
|
current_engine = engines.load(
|
||||||
|
get_flow_method([], [], []))
|
||||||
else:
|
else:
|
||||||
current_engine = engines.load(get_flow_method())
|
current_engine = engines.load(get_flow_method())
|
||||||
current_engine.compile()
|
current_engine.compile()
|
||||||
|
@@ -19,6 +19,7 @@ octavia.controller.worker.flows.pool_flows PoolFlows get_update_pool_flow
|
|||||||
octavia.controller.worker.flows.member_flows MemberFlows get_create_member_flow
|
octavia.controller.worker.flows.member_flows MemberFlows get_create_member_flow
|
||||||
octavia.controller.worker.flows.member_flows MemberFlows get_delete_member_flow
|
octavia.controller.worker.flows.member_flows MemberFlows get_delete_member_flow
|
||||||
octavia.controller.worker.flows.member_flows MemberFlows get_update_member_flow
|
octavia.controller.worker.flows.member_flows MemberFlows get_update_member_flow
|
||||||
|
octavia.controller.worker.flows.member_flows MemberFlows get_batch_update_members_flow
|
||||||
octavia.controller.worker.flows.health_monitor_flows HealthMonitorFlows get_create_health_monitor_flow
|
octavia.controller.worker.flows.health_monitor_flows HealthMonitorFlows get_create_health_monitor_flow
|
||||||
octavia.controller.worker.flows.health_monitor_flows HealthMonitorFlows get_delete_health_monitor_flow
|
octavia.controller.worker.flows.health_monitor_flows HealthMonitorFlows get_delete_health_monitor_flow
|
||||||
octavia.controller.worker.flows.health_monitor_flows HealthMonitorFlows get_update_health_monitor_flow
|
octavia.controller.worker.flows.health_monitor_flows HealthMonitorFlows get_update_health_monitor_flow
|
||||||
|
Reference in New Issue
Block a user