Merge "Resync L3, DHCP and OVS/LB agents upon revival"
This commit is contained in:
commit
c8a7d9bfdb
@ -553,7 +553,6 @@ class DhcpAgentWithStateReport(DhcpAgent):
|
||||
'start_flag': True,
|
||||
'agent_type': constants.AGENT_TYPE_DHCP}
|
||||
report_interval = self.conf.AGENT.report_interval
|
||||
self.use_call = True
|
||||
if report_interval:
|
||||
self.heartbeat = loopingcall.FixedIntervalLoopingCall(
|
||||
self._report_state)
|
||||
@ -564,8 +563,12 @@ class DhcpAgentWithStateReport(DhcpAgent):
|
||||
self.agent_state.get('configurations').update(
|
||||
self.cache.get_state())
|
||||
ctx = context.get_admin_context_without_session()
|
||||
self.state_rpc.report_state(ctx, self.agent_state, self.use_call)
|
||||
self.use_call = False
|
||||
agent_status = self.state_rpc.report_state(
|
||||
ctx, self.agent_state, True)
|
||||
if agent_status == constants.AGENT_REVIVED:
|
||||
LOG.info(_LI("Agent has just been revived. "
|
||||
"Scheduling full sync"))
|
||||
self.schedule_resync("Agent has just been revived")
|
||||
except AttributeError:
|
||||
# This means the server does not support report_state
|
||||
LOG.warn(_LW("Neutron server does not support state report."
|
||||
|
@ -598,7 +598,6 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
|
||||
class L3NATAgentWithStateReport(L3NATAgent):
|
||||
|
||||
def __init__(self, host, conf=None):
|
||||
self.use_call = True
|
||||
super(L3NATAgentWithStateReport, self).__init__(host=host, conf=conf)
|
||||
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)
|
||||
self.agent_state = {
|
||||
@ -645,10 +644,14 @@ class L3NATAgentWithStateReport(L3NATAgent):
|
||||
configurations['interfaces'] = num_interfaces
|
||||
configurations['floating_ips'] = num_floating_ips
|
||||
try:
|
||||
self.state_rpc.report_state(self.context, self.agent_state,
|
||||
self.use_call)
|
||||
agent_status = self.state_rpc.report_state(self.context,
|
||||
self.agent_state,
|
||||
True)
|
||||
if agent_status == l3_constants.AGENT_REVIVED:
|
||||
LOG.info(_LI('Agent has just been revived. '
|
||||
'Doing a full sync.'))
|
||||
self.fullsync = True
|
||||
self.agent_state.pop('start_flag', None)
|
||||
self.use_call = False
|
||||
except AttributeError:
|
||||
# This means the server does not support report_state
|
||||
LOG.warn(_LW("Neutron server does not support state report."
|
||||
|
@ -207,3 +207,11 @@ ROUTER_MARK_MASK = "0xffff"
|
||||
|
||||
# Time format
|
||||
ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'
|
||||
|
||||
# Agent states as detected by server, used to reply on agent's state report
|
||||
# agent has just been registered
|
||||
AGENT_NEW = 'new'
|
||||
# agent is alive
|
||||
AGENT_ALIVE = 'alive'
|
||||
# agent has just returned to alive after being dead
|
||||
AGENT_REVIVED = 'revived'
|
||||
|
@ -299,6 +299,12 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
||||
'delta': delta})
|
||||
|
||||
def _create_or_update_agent(self, context, agent_state):
|
||||
"""Registers new agent in the database or updates existing.
|
||||
|
||||
Returns agent status from server point of view: alive, new or revived.
|
||||
It could be used by agent to do some sync with the server if needed.
|
||||
"""
|
||||
status = constants.AGENT_ALIVE
|
||||
with context.session.begin(subtransactions=True):
|
||||
res_keys = ['agent_type', 'binary', 'host', 'topic']
|
||||
res = dict((k, agent_state[k]) for k in res_keys)
|
||||
@ -311,6 +317,8 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
||||
try:
|
||||
agent_db = self._get_agent_by_type_and_host(
|
||||
context, agent_state['agent_type'], agent_state['host'])
|
||||
if not agent_db.is_active:
|
||||
status = constants.AGENT_REVIVED
|
||||
res['heartbeat_timestamp'] = current_time
|
||||
if agent_state.get('start_flag'):
|
||||
res['started_at'] = current_time
|
||||
@ -327,7 +335,9 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
||||
greenthread.sleep(0)
|
||||
context.session.add(agent_db)
|
||||
self._log_heartbeat(agent_state, agent_db, configurations_dict)
|
||||
status = constants.AGENT_NEW
|
||||
greenthread.sleep(0)
|
||||
return status
|
||||
|
||||
def create_or_update_agent(self, context, agent):
|
||||
"""Create or update agent according to report."""
|
||||
@ -367,7 +377,10 @@ class AgentExtRpcCallback(object):
|
||||
self.plugin = plugin
|
||||
|
||||
def report_state(self, context, **kwargs):
|
||||
"""Report state from agent to server."""
|
||||
"""Report state from agent to server.
|
||||
|
||||
Returns - agent's status: AGENT_NEW, AGENT_REVIVED, AGENT_ALIVE
|
||||
"""
|
||||
time = kwargs['time']
|
||||
time = timeutils.parse_strtime(time)
|
||||
agent_state = kwargs['agent_state']['agent_state']
|
||||
@ -382,7 +395,7 @@ class AgentExtRpcCallback(object):
|
||||
return
|
||||
if not self.plugin:
|
||||
self.plugin = manager.NeutronManager.get_plugin()
|
||||
self.plugin.create_or_update_agent(context, agent_state)
|
||||
return self.plugin.create_or_update_agent(context, agent_state)
|
||||
|
||||
def _check_clock_sync_on_agent_start(self, agent_state, agent_time):
|
||||
"""Checks if the server and the agent times are in sync.
|
||||
|
@ -824,6 +824,8 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
|
||||
|
||||
# stores received port_updates for processing by the main loop
|
||||
self.updated_devices = set()
|
||||
# flag to do a sync after revival
|
||||
self.fullsync = False
|
||||
self.context = context.get_admin_context_without_session()
|
||||
self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
|
||||
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
|
||||
@ -845,8 +847,13 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
|
||||
try:
|
||||
devices = len(self.br_mgr.get_tap_devices())
|
||||
self.agent_state.get('configurations')['devices'] = devices
|
||||
self.state_rpc.report_state(self.context,
|
||||
self.agent_state)
|
||||
agent_status = self.state_rpc.report_state(self.context,
|
||||
self.agent_state,
|
||||
True)
|
||||
if agent_status == constants.AGENT_REVIVED:
|
||||
LOG.info(_LI('Agent has just been revived. '
|
||||
'Doing a full sync.'))
|
||||
self.fullsync = True
|
||||
self.agent_state.pop('start_flag', None)
|
||||
except Exception:
|
||||
LOG.exception(_LE("Failed reporting state!"))
|
||||
@ -1047,11 +1054,15 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
|
||||
while True:
|
||||
start = time.time()
|
||||
|
||||
device_info = self.scan_devices(previous=device_info, sync=sync)
|
||||
if self.fullsync:
|
||||
sync = True
|
||||
self.fullsync = False
|
||||
|
||||
if sync:
|
||||
LOG.info(_LI("Agent out of sync with plugin!"))
|
||||
sync = False
|
||||
|
||||
device_info = self.scan_devices(previous=device_info, sync=sync)
|
||||
sync = False
|
||||
|
||||
if (self._device_info_has_changes(device_info)
|
||||
or self.sg_agent.firewall_refresh_needed()):
|
||||
|
@ -182,6 +182,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
super(OVSNeutronAgent, self).__init__()
|
||||
self.conf = conf or cfg.CONF
|
||||
|
||||
self.fullsync = True
|
||||
# init bridge classes with configured datapath type.
|
||||
self.br_int_cls, self.br_phys_cls, self.br_tun_cls = (
|
||||
functools.partial(bridge_classes[b],
|
||||
@ -192,7 +193,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
self.veth_mtu = veth_mtu
|
||||
self.available_local_vlans = set(moves.range(p_const.MIN_VLAN_TAG,
|
||||
p_const.MAX_VLAN_TAG))
|
||||
self.use_call = True
|
||||
self.tunnel_types = tunnel_types or []
|
||||
self.l2_pop = l2_population
|
||||
# TODO(ethuleau): Change ARP responder so it's not dependent on the
|
||||
@ -329,9 +329,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
self.dvr_agent.in_distributed_mode())
|
||||
|
||||
try:
|
||||
self.state_rpc.report_state(self.context,
|
||||
self.agent_state,
|
||||
self.use_call)
|
||||
agent_status = self.state_rpc.report_state(self.context,
|
||||
self.agent_state,
|
||||
True)
|
||||
if agent_status == n_const.AGENT_REVIVED:
|
||||
LOG.info(_LI('Agent has just been revived. '
|
||||
'Doing a full sync.'))
|
||||
self.fullsync = True
|
||||
self.use_call = False
|
||||
self.agent_state.pop('start_flag', None)
|
||||
except Exception:
|
||||
@ -1660,7 +1664,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
if not polling_manager:
|
||||
polling_manager = polling.get_polling_manager(
|
||||
minimize_polling=False)
|
||||
|
||||
sync = True
|
||||
ports = set()
|
||||
updated_ports_copy = set()
|
||||
@ -1670,6 +1673,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
consecutive_resyncs = 0
|
||||
need_clean_stale_flow = True
|
||||
while self._check_and_handle_signal():
|
||||
if self.fullsync:
|
||||
LOG.info(_LI("rpc_loop doing a full sync."))
|
||||
sync = True
|
||||
self.fullsync = False
|
||||
port_info = {}
|
||||
ancillary_port_info = {}
|
||||
start = time.time()
|
||||
|
@ -425,6 +425,20 @@ class TestDhcpAgent(base.BaseTestCase):
|
||||
dhcp.periodic_resync()
|
||||
spawn.assert_called_once_with(dhcp._periodic_resync_helper)
|
||||
|
||||
def test_report_state_revival_logic(self):
|
||||
dhcp = dhcp_agent.DhcpAgentWithStateReport(HOSTNAME)
|
||||
with mock.patch.object(dhcp.state_rpc,
|
||||
'report_state') as report_state,\
|
||||
mock.patch.object(dhcp, "run"):
|
||||
report_state.return_value = const.AGENT_ALIVE
|
||||
dhcp._report_state()
|
||||
self.assertEqual(dhcp.needs_resync_reasons, {})
|
||||
|
||||
report_state.return_value = const.AGENT_REVIVED
|
||||
dhcp._report_state()
|
||||
self.assertEqual(dhcp.needs_resync_reasons[None],
|
||||
['Agent has just been revived'])
|
||||
|
||||
def test_periodic_resync_helper(self):
|
||||
with mock.patch.object(dhcp_agent.eventlet, 'sleep') as sleep:
|
||||
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
|
||||
|
@ -216,13 +216,26 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
||||
conf=self.conf)
|
||||
|
||||
self.assertTrue(agent.agent_state['start_flag'])
|
||||
use_call_arg = agent.use_call
|
||||
agent.after_start()
|
||||
report_state.assert_called_once_with(agent.context,
|
||||
agent.agent_state,
|
||||
use_call_arg)
|
||||
True)
|
||||
self.assertIsNone(agent.agent_state.get('start_flag'))
|
||||
|
||||
def test_report_state_revival_logic(self):
|
||||
with mock.patch.object(agent_rpc.PluginReportStateAPI,
|
||||
'report_state') as report_state:
|
||||
agent = l3_agent.L3NATAgentWithStateReport(host=HOSTNAME,
|
||||
conf=self.conf)
|
||||
report_state.return_value = l3_constants.AGENT_REVIVED
|
||||
agent._report_state()
|
||||
self.assertTrue(agent.fullsync)
|
||||
|
||||
agent.fullsync = False
|
||||
report_state.return_value = l3_constants.AGENT_ALIVE
|
||||
agent._report_state()
|
||||
self.assertFalse(agent.fullsync)
|
||||
|
||||
def test_periodic_sync_routers_task_call_clean_stale_namespaces(self):
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
self.plugin_api.get_routers.return_value = []
|
||||
|
@ -362,6 +362,13 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
|
||||
self.agent.stop()
|
||||
self.assertFalse(mock_set_rpc.called)
|
||||
|
||||
def test_report_state_revived(self):
|
||||
with mock.patch.object(self.agent.state_rpc,
|
||||
"report_state") as report_st:
|
||||
report_st.return_value = constants.AGENT_REVIVED
|
||||
self.agent._report_state()
|
||||
self.assertTrue(self.agent.fullsync)
|
||||
|
||||
|
||||
class TestLinuxBridgeManager(base.BaseTestCase):
|
||||
def setUp(self):
|
||||
|
@ -147,9 +147,6 @@ class TestOvsNeutronAgent(object):
|
||||
return_value=[]):
|
||||
self.agent = self.mod_agent.OVSNeutronAgent(self._bridge_classes(),
|
||||
**kwargs)
|
||||
# set back to true because initial report state will succeed due
|
||||
# to mocked out RPC calls
|
||||
self.agent.use_call = True
|
||||
self.agent.tun_br = self.br_tun_cls(br_name='br-tun')
|
||||
self.agent.sg_agent = mock.Mock()
|
||||
|
||||
@ -742,14 +739,13 @@ class TestOvsNeutronAgent(object):
|
||||
report_st.assert_called_with(self.agent.context,
|
||||
self.agent.agent_state, True)
|
||||
self.assertNotIn("start_flag", self.agent.agent_state)
|
||||
self.assertFalse(self.agent.use_call)
|
||||
self.assertEqual(
|
||||
self.agent.agent_state["configurations"]["devices"],
|
||||
self.agent.int_br_device_count
|
||||
)
|
||||
self.agent._report_state()
|
||||
report_st.assert_called_with(self.agent.context,
|
||||
self.agent.agent_state, False)
|
||||
self.agent.agent_state, True)
|
||||
|
||||
def test_report_state_fail(self):
|
||||
with mock.patch.object(self.agent.state_rpc,
|
||||
@ -762,6 +758,13 @@ class TestOvsNeutronAgent(object):
|
||||
report_st.assert_called_with(self.agent.context,
|
||||
self.agent.agent_state, True)
|
||||
|
||||
def test_report_state_revived(self):
|
||||
with mock.patch.object(self.agent.state_rpc,
|
||||
"report_state") as report_st:
|
||||
report_st.return_value = n_const.AGENT_REVIVED
|
||||
self.agent._report_state()
|
||||
self.assertTrue(self.agent.fullsync)
|
||||
|
||||
def test_port_update(self):
|
||||
port = {"id": TEST_PORT_ID1,
|
||||
"network_id": TEST_NETWORK_ID1,
|
||||
@ -1809,9 +1812,6 @@ class TestOvsDvrNeutronAgent(object):
|
||||
return_value=[]):
|
||||
self.agent = self.mod_agent.OVSNeutronAgent(self._bridge_classes(),
|
||||
**kwargs)
|
||||
# set back to true because initial report state will succeed due
|
||||
# to mocked out RPC calls
|
||||
self.agent.use_call = True
|
||||
self.agent.tun_br = self.br_tun_cls(br_name='br-tun')
|
||||
self.agent.sg_agent = mock.Mock()
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user