Merge "Network Delta calculations should respect AZs"
This commit is contained in:
@@ -431,14 +431,21 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
|||||||
listeners = pool.listeners
|
listeners = pool.listeners
|
||||||
load_balancer = pool.load_balancer
|
load_balancer = pool.load_balancer
|
||||||
|
|
||||||
create_member_tf = self._taskflow_load(self._member_flows.
|
store = {
|
||||||
get_create_member_flow(),
|
constants.MEMBER: member,
|
||||||
store={constants.MEMBER: member,
|
constants.LISTENERS: listeners,
|
||||||
constants.LISTENERS:
|
constants.LOADBALANCER: load_balancer,
|
||||||
listeners,
|
constants.POOL: pool}
|
||||||
constants.LOADBALANCER:
|
if load_balancer.availability_zone:
|
||||||
load_balancer,
|
store[constants.AVAILABILITY_ZONE] = (
|
||||||
constants.POOL: pool})
|
self._az_repo.get_availability_zone_metadata_dict(
|
||||||
|
db_apis.get_session(), load_balancer.availability_zone))
|
||||||
|
else:
|
||||||
|
store[constants.AVAILABILITY_ZONE] = {}
|
||||||
|
|
||||||
|
create_member_tf = self._taskflow_load(
|
||||||
|
self._member_flows.get_create_member_flow(),
|
||||||
|
store=store)
|
||||||
with tf_logging.DynamicLoggingListener(create_member_tf,
|
with tf_logging.DynamicLoggingListener(create_member_tf,
|
||||||
log=LOG):
|
log=LOG):
|
||||||
create_member_tf.run()
|
create_member_tf.run()
|
||||||
@@ -456,10 +463,21 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
|||||||
listeners = pool.listeners
|
listeners = pool.listeners
|
||||||
load_balancer = pool.load_balancer
|
load_balancer = pool.load_balancer
|
||||||
|
|
||||||
|
store = {
|
||||||
|
constants.MEMBER: member,
|
||||||
|
constants.LISTENERS: listeners,
|
||||||
|
constants.LOADBALANCER: load_balancer,
|
||||||
|
constants.POOL: pool}
|
||||||
|
if load_balancer.availability_zone:
|
||||||
|
store[constants.AVAILABILITY_ZONE] = (
|
||||||
|
self._az_repo.get_availability_zone_metadata_dict(
|
||||||
|
db_apis.get_session(), load_balancer.availability_zone))
|
||||||
|
else:
|
||||||
|
store[constants.AVAILABILITY_ZONE] = {}
|
||||||
|
|
||||||
delete_member_tf = self._taskflow_load(
|
delete_member_tf = self._taskflow_load(
|
||||||
self._member_flows.get_delete_member_flow(),
|
self._member_flows.get_delete_member_flow(),
|
||||||
store={constants.MEMBER: member, constants.LISTENERS: listeners,
|
store=store
|
||||||
constants.LOADBALANCER: load_balancer, constants.POOL: pool}
|
|
||||||
)
|
)
|
||||||
with tf_logging.DynamicLoggingListener(delete_member_tf,
|
with tf_logging.DynamicLoggingListener(delete_member_tf,
|
||||||
log=LOG):
|
log=LOG):
|
||||||
@@ -483,12 +501,21 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
|||||||
listeners = pool.listeners
|
listeners = pool.listeners
|
||||||
load_balancer = pool.load_balancer
|
load_balancer = pool.load_balancer
|
||||||
|
|
||||||
|
store = {
|
||||||
|
constants.LISTENERS: listeners,
|
||||||
|
constants.LOADBALANCER: load_balancer,
|
||||||
|
constants.POOL: pool}
|
||||||
|
if load_balancer.availability_zone:
|
||||||
|
store[constants.AVAILABILITY_ZONE] = (
|
||||||
|
self._az_repo.get_availability_zone_metadata_dict(
|
||||||
|
db_apis.get_session(), load_balancer.availability_zone))
|
||||||
|
else:
|
||||||
|
store[constants.AVAILABILITY_ZONE] = {}
|
||||||
|
|
||||||
batch_update_members_tf = self._taskflow_load(
|
batch_update_members_tf = self._taskflow_load(
|
||||||
self._member_flows.get_batch_update_members_flow(
|
self._member_flows.get_batch_update_members_flow(
|
||||||
old_members, new_members, updated_members),
|
old_members, new_members, updated_members),
|
||||||
store={constants.LISTENERS: listeners,
|
store=store)
|
||||||
constants.LOADBALANCER: load_balancer,
|
|
||||||
constants.POOL: pool})
|
|
||||||
with tf_logging.DynamicLoggingListener(batch_update_members_tf,
|
with tf_logging.DynamicLoggingListener(batch_update_members_tf,
|
||||||
log=LOG):
|
log=LOG):
|
||||||
batch_update_members_tf.run()
|
batch_update_members_tf.run()
|
||||||
@@ -501,7 +528,6 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
|||||||
:returns: None
|
:returns: None
|
||||||
:raises MemberNotFound: The referenced member was not found
|
:raises MemberNotFound: The referenced member was not found
|
||||||
"""
|
"""
|
||||||
member = None
|
|
||||||
try:
|
try:
|
||||||
member = self._get_db_obj_until_pending_update(
|
member = self._get_db_obj_until_pending_update(
|
||||||
self._member_repo, member_id)
|
self._member_repo, member_id)
|
||||||
@@ -517,17 +543,22 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
|||||||
listeners = pool.listeners
|
listeners = pool.listeners
|
||||||
load_balancer = pool.load_balancer
|
load_balancer = pool.load_balancer
|
||||||
|
|
||||||
update_member_tf = self._taskflow_load(self._member_flows.
|
store = {
|
||||||
get_update_member_flow(),
|
constants.MEMBER: member,
|
||||||
store={constants.MEMBER: member,
|
constants.LISTENERS: listeners,
|
||||||
constants.LISTENERS:
|
constants.LOADBALANCER: load_balancer,
|
||||||
listeners,
|
constants.POOL: pool,
|
||||||
constants.LOADBALANCER:
|
constants.UPDATE_DICT: member_updates}
|
||||||
load_balancer,
|
if load_balancer.availability_zone:
|
||||||
constants.POOL:
|
store[constants.AVAILABILITY_ZONE] = (
|
||||||
pool,
|
self._az_repo.get_availability_zone_metadata_dict(
|
||||||
constants.UPDATE_DICT:
|
db_apis.get_session(), load_balancer.availability_zone))
|
||||||
member_updates})
|
else:
|
||||||
|
store[constants.AVAILABILITY_ZONE] = {}
|
||||||
|
|
||||||
|
update_member_tf = self._taskflow_load(
|
||||||
|
self._member_flows.get_update_member_flow(),
|
||||||
|
store=store)
|
||||||
with tf_logging.DynamicLoggingListener(update_member_tf,
|
with tf_logging.DynamicLoggingListener(update_member_tf,
|
||||||
log=LOG):
|
log=LOG):
|
||||||
update_member_tf.run()
|
update_member_tf.run()
|
||||||
|
@@ -491,7 +491,8 @@ class AmphoraFlows(object):
|
|||||||
|
|
||||||
# Plug the member networks into the new amphora
|
# Plug the member networks into the new amphora
|
||||||
failover_amphora_flow.add(network_tasks.CalculateAmphoraDelta(
|
failover_amphora_flow.add(network_tasks.CalculateAmphoraDelta(
|
||||||
requires=(constants.LOADBALANCER, constants.AMPHORA),
|
requires=(constants.LOADBALANCER, constants.AMPHORA,
|
||||||
|
constants.AVAILABILITY_ZONE),
|
||||||
provides=constants.DELTA))
|
provides=constants.DELTA))
|
||||||
|
|
||||||
failover_amphora_flow.add(network_tasks.HandleNetworkDelta(
|
failover_amphora_flow.add(network_tasks.HandleNetworkDelta(
|
||||||
|
@@ -145,7 +145,8 @@ class LoadBalancerFlows(object):
|
|||||||
)
|
)
|
||||||
flows.append(
|
flows.append(
|
||||||
network_tasks.CalculateDelta(
|
network_tasks.CalculateDelta(
|
||||||
requires=constants.LOADBALANCER, provides=constants.DELTAS
|
requires=(constants.LOADBALANCER, constants.AVAILABILITY_ZONE),
|
||||||
|
provides=constants.DELTAS
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
flows.append(
|
flows.append(
|
||||||
|
@@ -40,7 +40,7 @@ class MemberFlows(object):
|
|||||||
create_member_flow.add(database_tasks.MarkMemberPendingCreateInDB(
|
create_member_flow.add(database_tasks.MarkMemberPendingCreateInDB(
|
||||||
requires=constants.MEMBER))
|
requires=constants.MEMBER))
|
||||||
create_member_flow.add(network_tasks.CalculateDelta(
|
create_member_flow.add(network_tasks.CalculateDelta(
|
||||||
requires=constants.LOADBALANCER,
|
requires=(constants.LOADBALANCER, constants.AVAILABILITY_ZONE),
|
||||||
provides=constants.DELTAS))
|
provides=constants.DELTAS))
|
||||||
create_member_flow.add(network_tasks.HandleNetworkDeltas(
|
create_member_flow.add(network_tasks.HandleNetworkDeltas(
|
||||||
requires=constants.DELTAS, provides=constants.ADDED_PORTS))
|
requires=constants.DELTAS, provides=constants.ADDED_PORTS))
|
||||||
@@ -185,7 +185,7 @@ class MemberFlows(object):
|
|||||||
|
|
||||||
# Done, do real updates
|
# Done, do real updates
|
||||||
batch_update_members_flow.add(network_tasks.CalculateDelta(
|
batch_update_members_flow.add(network_tasks.CalculateDelta(
|
||||||
requires=constants.LOADBALANCER,
|
requires=(constants.LOADBALANCER, constants.AVAILABILITY_ZONE),
|
||||||
provides=constants.DELTAS))
|
provides=constants.DELTAS))
|
||||||
batch_update_members_flow.add(network_tasks.HandleNetworkDeltas(
|
batch_update_members_flow.add(network_tasks.HandleNetworkDeltas(
|
||||||
requires=constants.DELTAS, provides=constants.ADDED_PORTS))
|
requires=constants.DELTAS, provides=constants.ADDED_PORTS))
|
||||||
|
@@ -47,14 +47,19 @@ class CalculateAmphoraDelta(BaseNetworkTask):
|
|||||||
|
|
||||||
default_provides = constants.DELTA
|
default_provides = constants.DELTA
|
||||||
|
|
||||||
def execute(self, loadbalancer, amphora):
|
def execute(self, loadbalancer, amphora, availability_zone):
|
||||||
LOG.debug("Calculating network delta for amphora id: %s", amphora.id)
|
LOG.debug("Calculating network delta for amphora id: %s", amphora.id)
|
||||||
|
|
||||||
# Figure out what networks we want
|
# Figure out what networks we want
|
||||||
# seed with lb network(s)
|
# seed with lb network(s)
|
||||||
vrrp_port = self.network_driver.get_port(amphora.vrrp_port_id)
|
vrrp_port = self.network_driver.get_port(amphora.vrrp_port_id)
|
||||||
desired_network_ids = {vrrp_port.network_id}.union(
|
if availability_zone:
|
||||||
CONF.controller_worker.amp_boot_network_list)
|
management_nets = (
|
||||||
|
[availability_zone.get(constants.MANAGEMENT_NETWORK)] or
|
||||||
|
CONF.controller_worker.amp_boot_network_list)
|
||||||
|
else:
|
||||||
|
management_nets = CONF.controller_worker.amp_boot_network_list
|
||||||
|
desired_network_ids = {vrrp_port.network_id}.union(management_nets)
|
||||||
|
|
||||||
for pool in loadbalancer.pools:
|
for pool in loadbalancer.pools:
|
||||||
member_networks = [
|
member_networks = [
|
||||||
@@ -91,13 +96,15 @@ class CalculateDelta(BaseNetworkTask):
|
|||||||
|
|
||||||
default_provides = constants.DELTAS
|
default_provides = constants.DELTAS
|
||||||
|
|
||||||
def execute(self, loadbalancer):
|
def execute(self, loadbalancer, availability_zone):
|
||||||
"""Compute which NICs need to be plugged
|
"""Compute which NICs need to be plugged
|
||||||
|
|
||||||
for the amphora to become operational.
|
for the amphora to become operational.
|
||||||
|
|
||||||
:param loadbalancer: the loadbalancer to calculate deltas for all
|
:param loadbalancer: the loadbalancer to calculate deltas for all
|
||||||
amphorae
|
amphorae
|
||||||
|
:param availability_zone: availability zone metadata dict
|
||||||
|
|
||||||
:returns: dict of octavia.network.data_models.Delta keyed off amphora
|
:returns: dict of octavia.network.data_models.Delta keyed off amphora
|
||||||
id
|
id
|
||||||
"""
|
"""
|
||||||
@@ -108,7 +115,8 @@ class CalculateDelta(BaseNetworkTask):
|
|||||||
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
|
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
|
||||||
loadbalancer.amphorae):
|
loadbalancer.amphorae):
|
||||||
|
|
||||||
delta = calculate_amp.execute(loadbalancer, amphora)
|
delta = calculate_amp.execute(loadbalancer, amphora,
|
||||||
|
availability_zone)
|
||||||
deltas[amphora.id] = delta
|
deltas[amphora.id] = delta
|
||||||
return deltas
|
return deltas
|
||||||
|
|
||||||
|
@@ -439,13 +439,22 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
|||||||
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
|
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
|
||||||
pool.listeners))
|
pool.listeners))
|
||||||
|
|
||||||
|
store = {
|
||||||
|
constants.MEMBER: member,
|
||||||
|
constants.LISTENERS: listeners_dicts,
|
||||||
|
constants.LOADBALANCER_ID: load_balancer.id,
|
||||||
|
constants.LOADBALANCER: provider_lb,
|
||||||
|
constants.POOL_ID: pool.id}
|
||||||
|
if load_balancer.availability_zone:
|
||||||
|
store[constants.AVAILABILITY_ZONE] = (
|
||||||
|
self._az_repo.get_availability_zone_metadata_dict(
|
||||||
|
db_apis.get_session(), load_balancer.availability_zone))
|
||||||
|
else:
|
||||||
|
store[constants.AVAILABILITY_ZONE] = {}
|
||||||
|
|
||||||
create_member_tf = self._taskflow_load(
|
create_member_tf = self._taskflow_load(
|
||||||
self._member_flows.get_create_member_flow(),
|
self._member_flows.get_create_member_flow(),
|
||||||
store={constants.MEMBER: member,
|
store=store)
|
||||||
constants.LISTENERS: listeners_dicts,
|
|
||||||
constants.LOADBALANCER_ID: load_balancer.id,
|
|
||||||
constants.LOADBALANCER: provider_lb,
|
|
||||||
constants.POOL_ID: pool.id})
|
|
||||||
with tf_logging.DynamicLoggingListener(create_member_tf,
|
with tf_logging.DynamicLoggingListener(create_member_tf,
|
||||||
log=LOG):
|
log=LOG):
|
||||||
create_member_tf.run()
|
create_member_tf.run()
|
||||||
@@ -468,16 +477,23 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
|||||||
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
|
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
|
||||||
pool.listeners))
|
pool.listeners))
|
||||||
|
|
||||||
|
store = {
|
||||||
|
constants.MEMBER: member,
|
||||||
|
constants.LISTENERS: listeners_dicts,
|
||||||
|
constants.LOADBALANCER_ID: load_balancer.id,
|
||||||
|
constants.LOADBALANCER: provider_lb,
|
||||||
|
constants.POOL_ID: pool.id,
|
||||||
|
constants.PROJECT_ID: load_balancer.project_id}
|
||||||
|
if load_balancer.availability_zone:
|
||||||
|
store[constants.AVAILABILITY_ZONE] = (
|
||||||
|
self._az_repo.get_availability_zone_metadata_dict(
|
||||||
|
db_apis.get_session(), load_balancer.availability_zone))
|
||||||
|
else:
|
||||||
|
store[constants.AVAILABILITY_ZONE] = {}
|
||||||
|
|
||||||
delete_member_tf = self._taskflow_load(
|
delete_member_tf = self._taskflow_load(
|
||||||
self._member_flows.get_delete_member_flow(),
|
self._member_flows.get_delete_member_flow(),
|
||||||
store={constants.MEMBER: member,
|
store=store
|
||||||
constants.LISTENERS: listeners_dicts,
|
|
||||||
constants.LOADBALANCER: provider_lb,
|
|
||||||
constants.LOADBALANCER_ID: load_balancer.id,
|
|
||||||
constants.POOL_ID: pool.id,
|
|
||||||
constants.PROJECT_ID: load_balancer.project_id
|
|
||||||
}
|
|
||||||
|
|
||||||
)
|
)
|
||||||
with tf_logging.DynamicLoggingListener(delete_member_tf,
|
with tf_logging.DynamicLoggingListener(delete_member_tf,
|
||||||
log=LOG):
|
log=LOG):
|
||||||
@@ -514,14 +530,23 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
|||||||
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
||||||
load_balancer).to_dict()
|
load_balancer).to_dict()
|
||||||
|
|
||||||
|
store = {
|
||||||
|
constants.LISTENERS: listeners_dicts,
|
||||||
|
constants.LOADBALANCER_ID: load_balancer.id,
|
||||||
|
constants.LOADBALANCER: provider_lb,
|
||||||
|
constants.POOL_ID: pool.id,
|
||||||
|
constants.PROJECT_ID: load_balancer.project_id}
|
||||||
|
if load_balancer.availability_zone:
|
||||||
|
store[constants.AVAILABILITY_ZONE] = (
|
||||||
|
self._az_repo.get_availability_zone_metadata_dict(
|
||||||
|
db_apis.get_session(), load_balancer.availability_zone))
|
||||||
|
else:
|
||||||
|
store[constants.AVAILABILITY_ZONE] = {}
|
||||||
|
|
||||||
batch_update_members_tf = self._taskflow_load(
|
batch_update_members_tf = self._taskflow_load(
|
||||||
self._member_flows.get_batch_update_members_flow(
|
self._member_flows.get_batch_update_members_flow(
|
||||||
provider_old_members, new_members, updated_members),
|
provider_old_members, new_members, updated_members),
|
||||||
store={constants.LISTENERS: listeners_dicts,
|
store=store)
|
||||||
constants.LOADBALANCER: provider_lb,
|
|
||||||
constants.LOADBALANCER_ID: load_balancer.id,
|
|
||||||
constants.POOL_ID: pool.id,
|
|
||||||
constants.PROJECT_ID: load_balancer.project_id})
|
|
||||||
with tf_logging.DynamicLoggingListener(batch_update_members_tf,
|
with tf_logging.DynamicLoggingListener(batch_update_members_tf,
|
||||||
log=LOG):
|
log=LOG):
|
||||||
batch_update_members_tf.run()
|
batch_update_members_tf.run()
|
||||||
@@ -545,14 +570,23 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
|||||||
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
|
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
|
||||||
pool.listeners))
|
pool.listeners))
|
||||||
|
|
||||||
|
store = {
|
||||||
|
constants.MEMBER: member,
|
||||||
|
constants.LISTENERS: listeners_dicts,
|
||||||
|
constants.LOADBALANCER_ID: load_balancer.id,
|
||||||
|
constants.LOADBALANCER: provider_lb,
|
||||||
|
constants.POOL_ID: pool.id,
|
||||||
|
constants.UPDATE_DICT: member_updates}
|
||||||
|
if load_balancer.availability_zone:
|
||||||
|
store[constants.AVAILABILITY_ZONE] = (
|
||||||
|
self._az_repo.get_availability_zone_metadata_dict(
|
||||||
|
db_apis.get_session(), load_balancer.availability_zone))
|
||||||
|
else:
|
||||||
|
store[constants.AVAILABILITY_ZONE] = {}
|
||||||
|
|
||||||
update_member_tf = self._taskflow_load(
|
update_member_tf = self._taskflow_load(
|
||||||
self._member_flows.get_update_member_flow(),
|
self._member_flows.get_update_member_flow(),
|
||||||
store={constants.MEMBER: member,
|
store=store)
|
||||||
constants.LISTENERS: listeners_dicts,
|
|
||||||
constants.LOADBALANCER: provider_lb,
|
|
||||||
constants.LOADBALANCER_ID: load_balancer.id,
|
|
||||||
constants.POOL_ID: pool.id,
|
|
||||||
constants.UPDATE_DICT: member_updates})
|
|
||||||
with tf_logging.DynamicLoggingListener(update_member_tf,
|
with tf_logging.DynamicLoggingListener(update_member_tf,
|
||||||
log=LOG):
|
log=LOG):
|
||||||
update_member_tf.run()
|
update_member_tf.run()
|
||||||
|
@@ -526,7 +526,8 @@ class AmphoraFlows(object):
|
|||||||
|
|
||||||
# Plug the member networks into the new amphora
|
# Plug the member networks into the new amphora
|
||||||
failover_amphora_flow.add(network_tasks.CalculateAmphoraDelta(
|
failover_amphora_flow.add(network_tasks.CalculateAmphoraDelta(
|
||||||
requires=(constants.LOADBALANCER, constants.AMPHORA),
|
requires=(constants.LOADBALANCER, constants.AMPHORA,
|
||||||
|
constants.AVAILABILITY_ZONE),
|
||||||
provides=constants.DELTA))
|
provides=constants.DELTA))
|
||||||
|
|
||||||
failover_amphora_flow.add(network_tasks.HandleNetworkDelta(
|
failover_amphora_flow.add(network_tasks.HandleNetworkDelta(
|
||||||
|
@@ -149,7 +149,8 @@ class LoadBalancerFlows(object):
|
|||||||
)
|
)
|
||||||
flows.append(
|
flows.append(
|
||||||
network_tasks.CalculateDelta(
|
network_tasks.CalculateDelta(
|
||||||
requires=constants.LOADBALANCER, provides=constants.DELTAS
|
requires=(constants.LOADBALANCER, constants.AVAILABILITY_ZONE),
|
||||||
|
provides=constants.DELTAS
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
flows.append(
|
flows.append(
|
||||||
|
@@ -39,7 +39,7 @@ class MemberFlows(object):
|
|||||||
create_member_flow.add(database_tasks.MarkMemberPendingCreateInDB(
|
create_member_flow.add(database_tasks.MarkMemberPendingCreateInDB(
|
||||||
requires=constants.MEMBER))
|
requires=constants.MEMBER))
|
||||||
create_member_flow.add(network_tasks.CalculateDelta(
|
create_member_flow.add(network_tasks.CalculateDelta(
|
||||||
requires=constants.LOADBALANCER,
|
requires=(constants.LOADBALANCER, constants.AVAILABILITY_ZONE),
|
||||||
provides=constants.DELTAS))
|
provides=constants.DELTAS))
|
||||||
create_member_flow.add(network_tasks.HandleNetworkDeltas(
|
create_member_flow.add(network_tasks.HandleNetworkDeltas(
|
||||||
requires=constants.DELTAS, provides=constants.ADDED_PORTS))
|
requires=constants.DELTAS, provides=constants.ADDED_PORTS))
|
||||||
@@ -180,7 +180,7 @@ class MemberFlows(object):
|
|||||||
|
|
||||||
# Done, do real updates
|
# Done, do real updates
|
||||||
batch_update_members_flow.add(network_tasks.CalculateDelta(
|
batch_update_members_flow.add(network_tasks.CalculateDelta(
|
||||||
requires=constants.LOADBALANCER,
|
requires=(constants.LOADBALANCER, constants.AVAILABILITY_ZONE),
|
||||||
provides=constants.DELTAS))
|
provides=constants.DELTAS))
|
||||||
batch_update_members_flow.add(network_tasks.HandleNetworkDeltas(
|
batch_update_members_flow.add(network_tasks.HandleNetworkDeltas(
|
||||||
requires=constants.DELTAS, provides=constants.ADDED_PORTS))
|
requires=constants.DELTAS, provides=constants.ADDED_PORTS))
|
||||||
|
@@ -52,7 +52,7 @@ class CalculateAmphoraDelta(BaseNetworkTask):
|
|||||||
|
|
||||||
default_provides = constants.DELTA
|
default_provides = constants.DELTA
|
||||||
|
|
||||||
def execute(self, loadbalancer, amphora):
|
def execute(self, loadbalancer, amphora, availability_zone):
|
||||||
LOG.debug("Calculating network delta for amphora id: %s",
|
LOG.debug("Calculating network delta for amphora id: %s",
|
||||||
amphora.get(constants.ID))
|
amphora.get(constants.ID))
|
||||||
|
|
||||||
@@ -60,8 +60,13 @@ class CalculateAmphoraDelta(BaseNetworkTask):
|
|||||||
# seed with lb network(s)
|
# seed with lb network(s)
|
||||||
vrrp_port = self.network_driver.get_port(
|
vrrp_port = self.network_driver.get_port(
|
||||||
amphora[constants.VRRP_PORT_ID])
|
amphora[constants.VRRP_PORT_ID])
|
||||||
desired_network_ids = {vrrp_port.network_id}.union(
|
if availability_zone:
|
||||||
CONF.controller_worker.amp_boot_network_list)
|
management_nets = (
|
||||||
|
[availability_zone.get(constants.MANAGEMENT_NETWORK)] or
|
||||||
|
CONF.controller_worker.amp_boot_network_list)
|
||||||
|
else:
|
||||||
|
management_nets = CONF.controller_worker.amp_boot_network_list
|
||||||
|
desired_network_ids = {vrrp_port.network_id}.union(management_nets)
|
||||||
db_lb = self.loadbalancer_repo.get(
|
db_lb = self.loadbalancer_repo.get(
|
||||||
db_apis.get_session(), id=loadbalancer[constants.LOADBALANCER_ID])
|
db_apis.get_session(), id=loadbalancer[constants.LOADBALANCER_ID])
|
||||||
for pool in db_lb.pools:
|
for pool in db_lb.pools:
|
||||||
@@ -101,13 +106,15 @@ class CalculateDelta(BaseNetworkTask):
|
|||||||
|
|
||||||
default_provides = constants.DELTAS
|
default_provides = constants.DELTAS
|
||||||
|
|
||||||
def execute(self, loadbalancer):
|
def execute(self, loadbalancer, availability_zone):
|
||||||
"""Compute which NICs need to be plugged
|
"""Compute which NICs need to be plugged
|
||||||
|
|
||||||
for the amphora to become operational.
|
for the amphora to become operational.
|
||||||
|
|
||||||
:param loadbalancer: the loadbalancer to calculate deltas for all
|
:param loadbalancer: the loadbalancer to calculate deltas for all
|
||||||
amphorae
|
amphorae
|
||||||
|
:param availability_zone: availability zone metadata dict
|
||||||
|
|
||||||
:returns: dict of octavia.network.data_models.Delta keyed off amphora
|
:returns: dict of octavia.network.data_models.Delta keyed off amphora
|
||||||
id
|
id
|
||||||
"""
|
"""
|
||||||
@@ -120,7 +127,8 @@ class CalculateDelta(BaseNetworkTask):
|
|||||||
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
|
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
|
||||||
db_lb.amphorae):
|
db_lb.amphorae):
|
||||||
|
|
||||||
delta = calculate_amp.execute(loadbalancer, amphora.to_dict())
|
delta = calculate_amp.execute(loadbalancer, amphora.to_dict(),
|
||||||
|
availability_zone)
|
||||||
deltas[amphora.id] = delta
|
deltas[amphora.id] = delta
|
||||||
return deltas
|
return deltas
|
||||||
|
|
||||||
|
@@ -40,8 +40,13 @@ class TestMemberFlows(base.TestCase):
|
|||||||
self.assertIn(constants.LISTENERS, member_flow.requires)
|
self.assertIn(constants.LISTENERS, member_flow.requires)
|
||||||
self.assertIn(constants.LOADBALANCER, member_flow.requires)
|
self.assertIn(constants.LOADBALANCER, member_flow.requires)
|
||||||
self.assertIn(constants.POOL, member_flow.requires)
|
self.assertIn(constants.POOL, member_flow.requires)
|
||||||
|
self.assertIn(constants.MEMBER, member_flow.requires)
|
||||||
|
self.assertIn(constants.AVAILABILITY_ZONE, member_flow.requires)
|
||||||
|
|
||||||
self.assertEqual(4, len(member_flow.requires))
|
self.assertIn(constants.DELTAS, member_flow.provides)
|
||||||
|
self.assertIn(constants.ADDED_PORTS, member_flow.provides)
|
||||||
|
|
||||||
|
self.assertEqual(5, len(member_flow.requires))
|
||||||
self.assertEqual(2, len(member_flow.provides))
|
self.assertEqual(2, len(member_flow.provides))
|
||||||
|
|
||||||
def test_get_delete_member_flow(self, mock_get_net_driver):
|
def test_get_delete_member_flow(self, mock_get_net_driver):
|
||||||
@@ -83,6 +88,10 @@ class TestMemberFlows(base.TestCase):
|
|||||||
self.assertIn(constants.LISTENERS, member_flow.requires)
|
self.assertIn(constants.LISTENERS, member_flow.requires)
|
||||||
self.assertIn(constants.LOADBALANCER, member_flow.requires)
|
self.assertIn(constants.LOADBALANCER, member_flow.requires)
|
||||||
self.assertIn(constants.POOL, member_flow.requires)
|
self.assertIn(constants.POOL, member_flow.requires)
|
||||||
|
self.assertIn(constants.AVAILABILITY_ZONE, member_flow.requires)
|
||||||
|
|
||||||
self.assertEqual(3, len(member_flow.requires))
|
self.assertIn(constants.DELTAS, member_flow.provides)
|
||||||
|
self.assertIn(constants.ADDED_PORTS, member_flow.provides)
|
||||||
|
|
||||||
|
self.assertEqual(4, len(member_flow.requires))
|
||||||
self.assertEqual(2, len(member_flow.provides))
|
self.assertEqual(2, len(member_flow.provides))
|
||||||
|
@@ -96,7 +96,8 @@ class TestNetworkTasks(base.TestCase):
|
|||||||
|
|
||||||
calc_delta = network_tasks.CalculateDelta()
|
calc_delta = network_tasks.CalculateDelta()
|
||||||
|
|
||||||
self.assertEqual(EMPTY, calc_delta.execute(self.load_balancer_mock))
|
self.assertEqual(EMPTY,
|
||||||
|
calc_delta.execute(self.load_balancer_mock, {}))
|
||||||
|
|
||||||
# Test with one amp and no pools, nothing plugged
|
# Test with one amp and no pools, nothing plugged
|
||||||
# Delta should be empty
|
# Delta should be empty
|
||||||
@@ -107,7 +108,7 @@ class TestNetworkTasks(base.TestCase):
|
|||||||
self.load_balancer_mock.pools = []
|
self.load_balancer_mock.pools = []
|
||||||
|
|
||||||
self.assertEqual(empty_deltas,
|
self.assertEqual(empty_deltas,
|
||||||
calc_delta.execute(self.load_balancer_mock))
|
calc_delta.execute(self.load_balancer_mock, {}))
|
||||||
mock_driver.get_plugged_networks.assert_called_once_with(COMPUTE_ID)
|
mock_driver.get_plugged_networks.assert_called_once_with(COMPUTE_ID)
|
||||||
|
|
||||||
# Pool mock should be configured explicitly for each test
|
# Pool mock should be configured explicitly for each test
|
||||||
@@ -118,7 +119,7 @@ class TestNetworkTasks(base.TestCase):
|
|||||||
# Delta should be empty
|
# Delta should be empty
|
||||||
pool_mock.members = []
|
pool_mock.members = []
|
||||||
self.assertEqual(empty_deltas,
|
self.assertEqual(empty_deltas,
|
||||||
calc_delta.execute(self.load_balancer_mock))
|
calc_delta.execute(self.load_balancer_mock, {}))
|
||||||
|
|
||||||
# Test with one amp and one pool and one member, nothing plugged
|
# Test with one amp and one pool and one member, nothing plugged
|
||||||
# Delta should be one additional subnet to plug
|
# Delta should be one additional subnet to plug
|
||||||
@@ -135,7 +136,7 @@ class TestNetworkTasks(base.TestCase):
|
|||||||
data_models.Interface(network_id=2)],
|
data_models.Interface(network_id=2)],
|
||||||
delete_nics=[])
|
delete_nics=[])
|
||||||
self.assertEqual({self.amphora_mock.id: ndm},
|
self.assertEqual({self.amphora_mock.id: ndm},
|
||||||
calc_delta.execute(self.load_balancer_mock))
|
calc_delta.execute(self.load_balancer_mock, {}))
|
||||||
|
|
||||||
vrrp_port_call = mock.call(self.amphora_mock.vrrp_port_id)
|
vrrp_port_call = mock.call(self.amphora_mock.vrrp_port_id)
|
||||||
mock_driver.get_port.assert_has_calls([vrrp_port_call])
|
mock_driver.get_port.assert_has_calls([vrrp_port_call])
|
||||||
@@ -155,7 +156,7 @@ class TestNetworkTasks(base.TestCase):
|
|||||||
data_models.Interface(network_id=2)]
|
data_models.Interface(network_id=2)]
|
||||||
|
|
||||||
self.assertEqual(empty_deltas,
|
self.assertEqual(empty_deltas,
|
||||||
calc_delta.execute(self.load_balancer_mock))
|
calc_delta.execute(self.load_balancer_mock, {}))
|
||||||
|
|
||||||
# Test with one amp and one pool and one member, wrong network plugged
|
# Test with one amp and one pool and one member, wrong network plugged
|
||||||
# Delta should be one network to add and one to remove
|
# Delta should be one network to add and one to remove
|
||||||
@@ -173,7 +174,7 @@ class TestNetworkTasks(base.TestCase):
|
|||||||
delete_nics=[
|
delete_nics=[
|
||||||
data_models.Interface(network_id=3)])
|
data_models.Interface(network_id=3)])
|
||||||
self.assertEqual({self.amphora_mock.id: ndm},
|
self.assertEqual({self.amphora_mock.id: ndm},
|
||||||
calc_delta.execute(self.load_balancer_mock))
|
calc_delta.execute(self.load_balancer_mock, {}))
|
||||||
|
|
||||||
# Test with one amp and one pool and no members, one network plugged
|
# Test with one amp and one pool and no members, one network plugged
|
||||||
# Delta should be one network to remove
|
# Delta should be one network to remove
|
||||||
@@ -188,7 +189,7 @@ class TestNetworkTasks(base.TestCase):
|
|||||||
delete_nics=[
|
delete_nics=[
|
||||||
data_models.Interface(network_id=2)])
|
data_models.Interface(network_id=2)])
|
||||||
self.assertEqual({self.amphora_mock.id: ndm},
|
self.assertEqual({self.amphora_mock.id: ndm},
|
||||||
calc_delta.execute(self.load_balancer_mock))
|
calc_delta.execute(self.load_balancer_mock, {}))
|
||||||
|
|
||||||
def test_get_plumbed_networks(self, mock_get_net_driver):
|
def test_get_plumbed_networks(self, mock_get_net_driver):
|
||||||
mock_driver = mock.MagicMock()
|
mock_driver = mock.MagicMock()
|
||||||
|
@@ -748,7 +748,10 @@ class TestControllerWorker(base.TestCase):
|
|||||||
@mock.patch('octavia.controller.worker.v1.flows.'
|
@mock.patch('octavia.controller.worker.v1.flows.'
|
||||||
'member_flows.MemberFlows.get_create_member_flow',
|
'member_flows.MemberFlows.get_create_member_flow',
|
||||||
return_value=_flow_mock)
|
return_value=_flow_mock)
|
||||||
|
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
|
||||||
|
'get_availability_zone_metadata_dict')
|
||||||
def test_create_member(self,
|
def test_create_member(self,
|
||||||
|
mock_get_az_metadata_dict,
|
||||||
mock_get_create_member_flow,
|
mock_get_create_member_flow,
|
||||||
mock_api_get_session,
|
mock_api_get_session,
|
||||||
mock_dyn_log_listener,
|
mock_dyn_log_listener,
|
||||||
@@ -763,20 +766,20 @@ class TestControllerWorker(base.TestCase):
|
|||||||
mock_amp_repo_get):
|
mock_amp_repo_get):
|
||||||
|
|
||||||
_flow_mock.reset_mock()
|
_flow_mock.reset_mock()
|
||||||
|
mock_get_az_metadata_dict.return_value = {}
|
||||||
mock_member_repo_get.side_effect = [None, _member_mock]
|
mock_member_repo_get.side_effect = [None, _member_mock]
|
||||||
|
|
||||||
cw = controller_worker.ControllerWorker()
|
cw = controller_worker.ControllerWorker()
|
||||||
cw.create_member(MEMBER_ID)
|
cw.create_member(MEMBER_ID)
|
||||||
|
|
||||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||||
assert_called_once_with(_flow_mock,
|
assert_called_once_with(
|
||||||
store={constants.MEMBER: _member_mock,
|
_flow_mock,
|
||||||
constants.LISTENERS:
|
store={constants.MEMBER: _member_mock,
|
||||||
[_listener_mock],
|
constants.LISTENERS: [_listener_mock],
|
||||||
constants.LOADBALANCER:
|
constants.LOADBALANCER: _load_balancer_mock,
|
||||||
_load_balancer_mock,
|
constants.POOL: _pool_mock,
|
||||||
constants.POOL:
|
constants.AVAILABILITY_ZONE: {}}))
|
||||||
_pool_mock}))
|
|
||||||
|
|
||||||
_flow_mock.run.assert_called_once_with()
|
_flow_mock.run.assert_called_once_with()
|
||||||
self.assertEqual(2, mock_member_repo_get.call_count)
|
self.assertEqual(2, mock_member_repo_get.call_count)
|
||||||
@@ -784,7 +787,10 @@ class TestControllerWorker(base.TestCase):
|
|||||||
@mock.patch('octavia.controller.worker.v1.flows.'
|
@mock.patch('octavia.controller.worker.v1.flows.'
|
||||||
'member_flows.MemberFlows.get_delete_member_flow',
|
'member_flows.MemberFlows.get_delete_member_flow',
|
||||||
return_value=_flow_mock)
|
return_value=_flow_mock)
|
||||||
|
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
|
||||||
|
'get_availability_zone_metadata_dict')
|
||||||
def test_delete_member(self,
|
def test_delete_member(self,
|
||||||
|
mock_get_az_metadata_dict,
|
||||||
mock_get_delete_member_flow,
|
mock_get_delete_member_flow,
|
||||||
mock_api_get_session,
|
mock_api_get_session,
|
||||||
mock_dyn_log_listener,
|
mock_dyn_log_listener,
|
||||||
@@ -799,7 +805,7 @@ class TestControllerWorker(base.TestCase):
|
|||||||
mock_amp_repo_get):
|
mock_amp_repo_get):
|
||||||
|
|
||||||
_flow_mock.reset_mock()
|
_flow_mock.reset_mock()
|
||||||
|
mock_get_az_metadata_dict.return_value = {}
|
||||||
cw = controller_worker.ControllerWorker()
|
cw = controller_worker.ControllerWorker()
|
||||||
cw.delete_member(MEMBER_ID)
|
cw.delete_member(MEMBER_ID)
|
||||||
|
|
||||||
@@ -811,14 +817,18 @@ class TestControllerWorker(base.TestCase):
|
|||||||
constants.LOADBALANCER:
|
constants.LOADBALANCER:
|
||||||
_load_balancer_mock,
|
_load_balancer_mock,
|
||||||
constants.POOL:
|
constants.POOL:
|
||||||
_pool_mock}))
|
_pool_mock,
|
||||||
|
constants.AVAILABILITY_ZONE: {}}))
|
||||||
|
|
||||||
_flow_mock.run.assert_called_once_with()
|
_flow_mock.run.assert_called_once_with()
|
||||||
|
|
||||||
@mock.patch('octavia.controller.worker.v1.flows.'
|
@mock.patch('octavia.controller.worker.v1.flows.'
|
||||||
'member_flows.MemberFlows.get_update_member_flow',
|
'member_flows.MemberFlows.get_update_member_flow',
|
||||||
return_value=_flow_mock)
|
return_value=_flow_mock)
|
||||||
|
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
|
||||||
|
'get_availability_zone_metadata_dict')
|
||||||
def test_update_member(self,
|
def test_update_member(self,
|
||||||
|
mock_get_az_metadata_dict,
|
||||||
mock_get_update_member_flow,
|
mock_get_update_member_flow,
|
||||||
mock_api_get_session,
|
mock_api_get_session,
|
||||||
mock_dyn_log_listener,
|
mock_dyn_log_listener,
|
||||||
@@ -834,7 +844,7 @@ class TestControllerWorker(base.TestCase):
|
|||||||
|
|
||||||
_flow_mock.reset_mock()
|
_flow_mock.reset_mock()
|
||||||
_member_mock.provisioning_status = constants.PENDING_UPDATE
|
_member_mock.provisioning_status = constants.PENDING_UPDATE
|
||||||
|
mock_get_az_metadata_dict.return_value = {}
|
||||||
cw = controller_worker.ControllerWorker()
|
cw = controller_worker.ControllerWorker()
|
||||||
cw.update_member(MEMBER_ID, MEMBER_UPDATE_DICT)
|
cw.update_member(MEMBER_ID, MEMBER_UPDATE_DICT)
|
||||||
|
|
||||||
@@ -848,14 +858,18 @@ class TestControllerWorker(base.TestCase):
|
|||||||
constants.POOL:
|
constants.POOL:
|
||||||
_pool_mock,
|
_pool_mock,
|
||||||
constants.UPDATE_DICT:
|
constants.UPDATE_DICT:
|
||||||
MEMBER_UPDATE_DICT}))
|
MEMBER_UPDATE_DICT,
|
||||||
|
constants.AVAILABILITY_ZONE: {}}))
|
||||||
|
|
||||||
_flow_mock.run.assert_called_once_with()
|
_flow_mock.run.assert_called_once_with()
|
||||||
|
|
||||||
@mock.patch('octavia.controller.worker.v1.flows.'
|
@mock.patch('octavia.controller.worker.v1.flows.'
|
||||||
'member_flows.MemberFlows.get_batch_update_members_flow',
|
'member_flows.MemberFlows.get_batch_update_members_flow',
|
||||||
return_value=_flow_mock)
|
return_value=_flow_mock)
|
||||||
|
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
|
||||||
|
'get_availability_zone_metadata_dict')
|
||||||
def test_batch_update_members(self,
|
def test_batch_update_members(self,
|
||||||
|
mock_get_az_metadata_dict,
|
||||||
mock_get_batch_update_members_flow,
|
mock_get_batch_update_members_flow,
|
||||||
mock_api_get_session,
|
mock_api_get_session,
|
||||||
mock_dyn_log_listener,
|
mock_dyn_log_listener,
|
||||||
@@ -870,7 +884,7 @@ class TestControllerWorker(base.TestCase):
|
|||||||
mock_amp_repo_get):
|
mock_amp_repo_get):
|
||||||
|
|
||||||
_flow_mock.reset_mock()
|
_flow_mock.reset_mock()
|
||||||
|
mock_get_az_metadata_dict.return_value = {}
|
||||||
cw = controller_worker.ControllerWorker()
|
cw = controller_worker.ControllerWorker()
|
||||||
cw.batch_update_members([9], [11], [MEMBER_UPDATE_DICT])
|
cw.batch_update_members([9], [11], [MEMBER_UPDATE_DICT])
|
||||||
|
|
||||||
@@ -880,7 +894,8 @@ class TestControllerWorker(base.TestCase):
|
|||||||
constants.LISTENERS: [_listener_mock],
|
constants.LISTENERS: [_listener_mock],
|
||||||
constants.LOADBALANCER:
|
constants.LOADBALANCER:
|
||||||
_load_balancer_mock,
|
_load_balancer_mock,
|
||||||
constants.POOL: _pool_mock}))
|
constants.POOL: _pool_mock,
|
||||||
|
constants.AVAILABILITY_ZONE: {}}))
|
||||||
|
|
||||||
_flow_mock.run.assert_called_once_with()
|
_flow_mock.run.assert_called_once_with()
|
||||||
|
|
||||||
|
@@ -41,8 +41,13 @@ class TestMemberFlows(base.TestCase):
|
|||||||
self.assertIn(constants.LOADBALANCER, member_flow.requires)
|
self.assertIn(constants.LOADBALANCER, member_flow.requires)
|
||||||
self.assertIn(constants.LOADBALANCER_ID, member_flow.requires)
|
self.assertIn(constants.LOADBALANCER_ID, member_flow.requires)
|
||||||
self.assertIn(constants.POOL_ID, member_flow.requires)
|
self.assertIn(constants.POOL_ID, member_flow.requires)
|
||||||
|
self.assertIn(constants.MEMBER, member_flow.requires)
|
||||||
|
self.assertIn(constants.AVAILABILITY_ZONE, member_flow.requires)
|
||||||
|
|
||||||
self.assertEqual(5, len(member_flow.requires))
|
self.assertIn(constants.DELTAS, member_flow.provides)
|
||||||
|
self.assertIn(constants.ADDED_PORTS, member_flow.provides)
|
||||||
|
|
||||||
|
self.assertEqual(6, len(member_flow.requires))
|
||||||
self.assertEqual(2, len(member_flow.provides))
|
self.assertEqual(2, len(member_flow.provides))
|
||||||
|
|
||||||
def test_get_delete_member_flow(self, mock_get_net_driver):
|
def test_get_delete_member_flow(self, mock_get_net_driver):
|
||||||
@@ -88,6 +93,10 @@ class TestMemberFlows(base.TestCase):
|
|||||||
self.assertIn(constants.LOADBALANCER, member_flow.requires)
|
self.assertIn(constants.LOADBALANCER, member_flow.requires)
|
||||||
self.assertIn(constants.LOADBALANCER_ID, member_flow.requires)
|
self.assertIn(constants.LOADBALANCER_ID, member_flow.requires)
|
||||||
self.assertIn(constants.POOL_ID, member_flow.requires)
|
self.assertIn(constants.POOL_ID, member_flow.requires)
|
||||||
|
self.assertIn(constants.AVAILABILITY_ZONE, member_flow.requires)
|
||||||
|
|
||||||
self.assertEqual(4, len(member_flow.requires))
|
self.assertIn(constants.DELTAS, member_flow.provides)
|
||||||
|
self.assertIn(constants.ADDED_PORTS, member_flow.provides)
|
||||||
|
|
||||||
|
self.assertEqual(5, len(member_flow.requires))
|
||||||
self.assertEqual(2, len(member_flow.provides))
|
self.assertEqual(2, len(member_flow.provides))
|
||||||
|
@@ -122,7 +122,8 @@ class TestNetworkTasks(base.TestCase):
|
|||||||
|
|
||||||
calc_delta = network_tasks.CalculateDelta()
|
calc_delta = network_tasks.CalculateDelta()
|
||||||
|
|
||||||
self.assertEqual(EMPTY, calc_delta.execute(self.load_balancer_mock))
|
self.assertEqual(EMPTY,
|
||||||
|
calc_delta.execute(self.load_balancer_mock, {}))
|
||||||
|
|
||||||
# Test with one amp and no pools, nothing plugged
|
# Test with one amp and no pools, nothing plugged
|
||||||
# Delta should be empty
|
# Delta should be empty
|
||||||
@@ -132,7 +133,7 @@ class TestNetworkTasks(base.TestCase):
|
|||||||
self.db_load_balancer_mock.amphorae = [self.db_amphora_mock]
|
self.db_load_balancer_mock.amphorae = [self.db_amphora_mock]
|
||||||
self.db_load_balancer_mock.pools = []
|
self.db_load_balancer_mock.pools = []
|
||||||
self.assertEqual(empty_deltas,
|
self.assertEqual(empty_deltas,
|
||||||
calc_delta.execute(self.load_balancer_mock))
|
calc_delta.execute(self.load_balancer_mock, {}))
|
||||||
mock_driver.get_plugged_networks.assert_called_once_with(COMPUTE_ID)
|
mock_driver.get_plugged_networks.assert_called_once_with(COMPUTE_ID)
|
||||||
|
|
||||||
# Pool mock should be configured explicitly for each test
|
# Pool mock should be configured explicitly for each test
|
||||||
@@ -143,7 +144,7 @@ class TestNetworkTasks(base.TestCase):
|
|||||||
# Delta should be empty
|
# Delta should be empty
|
||||||
pool_mock.members = []
|
pool_mock.members = []
|
||||||
self.assertEqual(empty_deltas,
|
self.assertEqual(empty_deltas,
|
||||||
calc_delta.execute(self.load_balancer_mock))
|
calc_delta.execute(self.load_balancer_mock, {}))
|
||||||
|
|
||||||
# Test with one amp and one pool and one member, nothing plugged
|
# Test with one amp and one pool and one member, nothing plugged
|
||||||
# Delta should be one additional subnet to plug
|
# Delta should be one additional subnet to plug
|
||||||
@@ -160,7 +161,7 @@ class TestNetworkTasks(base.TestCase):
|
|||||||
data_models.Interface(network_id=3)],
|
data_models.Interface(network_id=3)],
|
||||||
delete_nics=[]).to_dict(recurse=True)
|
delete_nics=[]).to_dict(recurse=True)
|
||||||
self.assertEqual({self.db_amphora_mock.id: ndm},
|
self.assertEqual({self.db_amphora_mock.id: ndm},
|
||||||
calc_delta.execute(self.load_balancer_mock))
|
calc_delta.execute(self.load_balancer_mock, {}))
|
||||||
|
|
||||||
vrrp_port_call = mock.call(PORT_ID)
|
vrrp_port_call = mock.call(PORT_ID)
|
||||||
mock_driver.get_port.assert_has_calls([vrrp_port_call])
|
mock_driver.get_port.assert_has_calls([vrrp_port_call])
|
||||||
@@ -181,7 +182,7 @@ class TestNetworkTasks(base.TestCase):
|
|||||||
data_models.Interface(network_id='netid')]
|
data_models.Interface(network_id='netid')]
|
||||||
|
|
||||||
self.assertEqual(empty_deltas,
|
self.assertEqual(empty_deltas,
|
||||||
calc_delta.execute(self.load_balancer_mock))
|
calc_delta.execute(self.load_balancer_mock, {}))
|
||||||
|
|
||||||
# Test with one amp and one pool and one member, wrong network plugged
|
# Test with one amp and one pool and one member, wrong network plugged
|
||||||
# Delta should be one network to add and one to remove
|
# Delta should be one network to add and one to remove
|
||||||
@@ -201,7 +202,7 @@ class TestNetworkTasks(base.TestCase):
|
|||||||
data_models.Interface(network_id=2)]
|
data_models.Interface(network_id=2)]
|
||||||
).to_dict(recurse=True)
|
).to_dict(recurse=True)
|
||||||
self.assertEqual({self.db_amphora_mock.id: ndm},
|
self.assertEqual({self.db_amphora_mock.id: ndm},
|
||||||
calc_delta.execute(self.load_balancer_mock))
|
calc_delta.execute(self.load_balancer_mock, {}))
|
||||||
|
|
||||||
# Test with one amp and one pool and no members, one network plugged
|
# Test with one amp and one pool and no members, one network plugged
|
||||||
# Delta should be one network to remove
|
# Delta should be one network to remove
|
||||||
@@ -219,7 +220,7 @@ class TestNetworkTasks(base.TestCase):
|
|||||||
data_models.Interface(network_id=2)]
|
data_models.Interface(network_id=2)]
|
||||||
).to_dict(recurse=True)
|
).to_dict(recurse=True)
|
||||||
self.assertEqual({self.db_amphora_mock.id: ndm},
|
self.assertEqual({self.db_amphora_mock.id: ndm},
|
||||||
calc_delta.execute(self.load_balancer_mock))
|
calc_delta.execute(self.load_balancer_mock, {}))
|
||||||
|
|
||||||
def test_get_plumbed_networks(self, mock_get_net_driver):
|
def test_get_plumbed_networks(self, mock_get_net_driver):
|
||||||
mock_driver = mock.MagicMock()
|
mock_driver = mock.MagicMock()
|
||||||
|
@@ -807,7 +807,10 @@ class TestControllerWorker(base.TestCase):
|
|||||||
@mock.patch('octavia.controller.worker.v2.flows.'
|
@mock.patch('octavia.controller.worker.v2.flows.'
|
||||||
'member_flows.MemberFlows.get_create_member_flow',
|
'member_flows.MemberFlows.get_create_member_flow',
|
||||||
return_value=_flow_mock)
|
return_value=_flow_mock)
|
||||||
|
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
|
||||||
|
'get_availability_zone_metadata_dict')
|
||||||
def test_create_member(self,
|
def test_create_member(self,
|
||||||
|
mock_get_az_metadata_dict,
|
||||||
mock_get_create_member_flow,
|
mock_get_create_member_flow,
|
||||||
mock_api_get_session,
|
mock_api_get_session,
|
||||||
mock_dyn_log_listener,
|
mock_dyn_log_listener,
|
||||||
@@ -822,6 +825,7 @@ class TestControllerWorker(base.TestCase):
|
|||||||
mock_amp_repo_get):
|
mock_amp_repo_get):
|
||||||
|
|
||||||
_flow_mock.reset_mock()
|
_flow_mock.reset_mock()
|
||||||
|
mock_get_az_metadata_dict.return_value = {}
|
||||||
mock_member_repo_get.side_effect = [None, _member_mock]
|
mock_member_repo_get.side_effect = [None, _member_mock]
|
||||||
_member = _member_mock.to_dict()
|
_member = _member_mock.to_dict()
|
||||||
cw = controller_worker.ControllerWorker()
|
cw = controller_worker.ControllerWorker()
|
||||||
@@ -830,23 +834,24 @@ class TestControllerWorker(base.TestCase):
|
|||||||
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
||||||
_db_load_balancer_mock).to_dict()
|
_db_load_balancer_mock).to_dict()
|
||||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||||
assert_called_once_with(_flow_mock,
|
assert_called_once_with(
|
||||||
store={constants.MEMBER: _member,
|
_flow_mock,
|
||||||
constants.LISTENERS:
|
store={constants.MEMBER: _member,
|
||||||
[self.ref_listener_dict],
|
constants.LISTENERS: [self.ref_listener_dict],
|
||||||
constants.LOADBALANCER_ID:
|
constants.LOADBALANCER_ID: LB_ID,
|
||||||
LB_ID,
|
constants.LOADBALANCER: provider_lb,
|
||||||
constants.LOADBALANCER:
|
constants.POOL_ID: POOL_ID,
|
||||||
provider_lb,
|
constants.AVAILABILITY_ZONE: {}}))
|
||||||
constants.POOL_ID:
|
|
||||||
POOL_ID}))
|
|
||||||
|
|
||||||
_flow_mock.run.assert_called_once_with()
|
_flow_mock.run.assert_called_once_with()
|
||||||
|
|
||||||
@mock.patch('octavia.controller.worker.v2.flows.'
|
@mock.patch('octavia.controller.worker.v2.flows.'
|
||||||
'member_flows.MemberFlows.get_delete_member_flow',
|
'member_flows.MemberFlows.get_delete_member_flow',
|
||||||
return_value=_flow_mock)
|
return_value=_flow_mock)
|
||||||
|
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
|
||||||
|
'get_availability_zone_metadata_dict')
|
||||||
def test_delete_member(self,
|
def test_delete_member(self,
|
||||||
|
mock_get_az_metadata_dict,
|
||||||
mock_get_delete_member_flow,
|
mock_get_delete_member_flow,
|
||||||
mock_api_get_session,
|
mock_api_get_session,
|
||||||
mock_dyn_log_listener,
|
mock_dyn_log_listener,
|
||||||
@@ -862,6 +867,7 @@ class TestControllerWorker(base.TestCase):
|
|||||||
|
|
||||||
_flow_mock.reset_mock()
|
_flow_mock.reset_mock()
|
||||||
_member = _member_mock.to_dict()
|
_member = _member_mock.to_dict()
|
||||||
|
mock_get_az_metadata_dict.return_value = {}
|
||||||
cw = controller_worker.ControllerWorker()
|
cw = controller_worker.ControllerWorker()
|
||||||
cw.delete_member(_member)
|
cw.delete_member(_member)
|
||||||
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
||||||
@@ -878,14 +884,18 @@ class TestControllerWorker(base.TestCase):
|
|||||||
provider_lb,
|
provider_lb,
|
||||||
constants.POOL_ID:
|
constants.POOL_ID:
|
||||||
POOL_ID,
|
POOL_ID,
|
||||||
constants.PROJECT_ID: PROJECT_ID}))
|
constants.PROJECT_ID: PROJECT_ID,
|
||||||
|
constants.AVAILABILITY_ZONE: {}}))
|
||||||
|
|
||||||
_flow_mock.run.assert_called_once_with()
|
_flow_mock.run.assert_called_once_with()
|
||||||
|
|
||||||
@mock.patch('octavia.controller.worker.v2.flows.'
|
@mock.patch('octavia.controller.worker.v2.flows.'
|
||||||
'member_flows.MemberFlows.get_update_member_flow',
|
'member_flows.MemberFlows.get_update_member_flow',
|
||||||
return_value=_flow_mock)
|
return_value=_flow_mock)
|
||||||
|
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
|
||||||
|
'get_availability_zone_metadata_dict')
|
||||||
def test_update_member(self,
|
def test_update_member(self,
|
||||||
|
mock_get_az_metadata_dict,
|
||||||
mock_get_update_member_flow,
|
mock_get_update_member_flow,
|
||||||
mock_api_get_session,
|
mock_api_get_session,
|
||||||
mock_dyn_log_listener,
|
mock_dyn_log_listener,
|
||||||
@@ -902,7 +912,7 @@ class TestControllerWorker(base.TestCase):
|
|||||||
_flow_mock.reset_mock()
|
_flow_mock.reset_mock()
|
||||||
_member = _member_mock.to_dict()
|
_member = _member_mock.to_dict()
|
||||||
_member[constants.PROVISIONING_STATUS] = constants.PENDING_UPDATE
|
_member[constants.PROVISIONING_STATUS] = constants.PENDING_UPDATE
|
||||||
|
mock_get_az_metadata_dict.return_value = {}
|
||||||
cw = controller_worker.ControllerWorker()
|
cw = controller_worker.ControllerWorker()
|
||||||
cw.update_member(_member, MEMBER_UPDATE_DICT)
|
cw.update_member(_member, MEMBER_UPDATE_DICT)
|
||||||
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
||||||
@@ -920,14 +930,18 @@ class TestControllerWorker(base.TestCase):
|
|||||||
constants.LOADBALANCER_ID:
|
constants.LOADBALANCER_ID:
|
||||||
LB_ID,
|
LB_ID,
|
||||||
constants.UPDATE_DICT:
|
constants.UPDATE_DICT:
|
||||||
MEMBER_UPDATE_DICT}))
|
MEMBER_UPDATE_DICT,
|
||||||
|
constants.AVAILABILITY_ZONE: {}}))
|
||||||
|
|
||||||
_flow_mock.run.assert_called_once_with()
|
_flow_mock.run.assert_called_once_with()
|
||||||
|
|
||||||
@mock.patch('octavia.controller.worker.v2.flows.'
|
@mock.patch('octavia.controller.worker.v2.flows.'
|
||||||
'member_flows.MemberFlows.get_batch_update_members_flow',
|
'member_flows.MemberFlows.get_batch_update_members_flow',
|
||||||
return_value=_flow_mock)
|
return_value=_flow_mock)
|
||||||
|
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
|
||||||
|
'get_availability_zone_metadata_dict')
|
||||||
def test_batch_update_members(self,
|
def test_batch_update_members(self,
|
||||||
|
mock_get_az_metadata_dict,
|
||||||
mock_get_batch_update_members_flow,
|
mock_get_batch_update_members_flow,
|
||||||
mock_api_get_session,
|
mock_api_get_session,
|
||||||
mock_dyn_log_listener,
|
mock_dyn_log_listener,
|
||||||
@@ -942,7 +956,7 @@ class TestControllerWorker(base.TestCase):
|
|||||||
mock_amp_repo_get):
|
mock_amp_repo_get):
|
||||||
|
|
||||||
_flow_mock.reset_mock()
|
_flow_mock.reset_mock()
|
||||||
|
mock_get_az_metadata_dict.return_value = {}
|
||||||
cw = controller_worker.ControllerWorker()
|
cw = controller_worker.ControllerWorker()
|
||||||
cw.batch_update_members([{constants.MEMBER_ID: 9,
|
cw.batch_update_members([{constants.MEMBER_ID: 9,
|
||||||
constants.POOL_ID: 'testtest'}],
|
constants.POOL_ID: 'testtest'}],
|
||||||
@@ -957,7 +971,8 @@ class TestControllerWorker(base.TestCase):
|
|||||||
constants.LOADBALANCER_ID: LB_ID,
|
constants.LOADBALANCER_ID: LB_ID,
|
||||||
constants.LOADBALANCER: provider_lb,
|
constants.LOADBALANCER: provider_lb,
|
||||||
constants.POOL_ID: POOL_ID,
|
constants.POOL_ID: POOL_ID,
|
||||||
constants.PROJECT_ID: PROJECT_ID}))
|
constants.PROJECT_ID: PROJECT_ID,
|
||||||
|
constants.AVAILABILITY_ZONE: {}}))
|
||||||
|
|
||||||
_flow_mock.run.assert_called_once_with()
|
_flow_mock.run.assert_called_once_with()
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user