Handle SIGINT while agents are joining the quorum
Change-Id: I85d130f2f6b8e68e5a9e0177238c8f5c569ab6f1
This commit is contained in:
parent
a8b4c7b8c2
commit
f805eec080
@ -83,7 +83,7 @@ def run_command(command):
|
||||
start=start, finish=time.time())
|
||||
|
||||
|
||||
def work(agent_id, endpoint, polling_interval):
|
||||
def work(agent_id, endpoint, polling_interval, ignore_sigint=False):
|
||||
LOG.info('Agent id is: %s', agent_id)
|
||||
LOG.info('Connecting to server: %s', endpoint)
|
||||
|
||||
@ -91,8 +91,8 @@ def work(agent_id, endpoint, polling_interval):
|
||||
socket = context.socket(zmq.REQ)
|
||||
socket.connect('tcp://%s' % endpoint)
|
||||
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
task = poll_task(socket, agent_id)
|
||||
|
||||
start_at = task.get('start_at')
|
||||
@ -120,10 +120,14 @@ def work(agent_id, endpoint, polling_interval):
|
||||
|
||||
except BaseException as e:
|
||||
if isinstance(e, KeyboardInterrupt):
|
||||
LOG.info('The process is interrupted')
|
||||
if ignore_sigint:
|
||||
LOG.info('Got SIGINT, but configured to ignore it')
|
||||
else:
|
||||
LOG.info('Process is interrupted')
|
||||
sys.exit(3)
|
||||
else:
|
||||
LOG.exception(e)
|
||||
break
|
||||
|
||||
|
||||
def get_mac():
|
||||
|
@ -40,6 +40,7 @@ class MessageQueue(object):
|
||||
|
||||
def reply_handler(reply_message):
|
||||
self.socket.send_json(reply_message)
|
||||
LOG.debug('Sent reply: %s', reply_message)
|
||||
|
||||
try:
|
||||
yield message, reply_handler
|
||||
@ -47,6 +48,8 @@ class MessageQueue(object):
|
||||
break
|
||||
|
||||
except BaseException as e:
|
||||
if not isinstance(e, KeyboardInterrupt): # SIGINT is ok
|
||||
if isinstance(e, KeyboardInterrupt): # SIGINT is ok
|
||||
LOG.info('Process is interrupted')
|
||||
else:
|
||||
LOG.exception(e)
|
||||
raise
|
||||
|
@ -211,11 +211,17 @@ def make_quorum(agent_ids, server_endpoint, polling_interval,
|
||||
heartbeat = multiprocessing.Process(
|
||||
target=agent_process.work,
|
||||
kwargs=dict(agent_id=HEARTBEAT_AGENT, endpoint=server_endpoint,
|
||||
polling_interval=polling_interval))
|
||||
polling_interval=polling_interval, ignore_sigint=True))
|
||||
heartbeat.daemon = True
|
||||
heartbeat.start()
|
||||
|
||||
quorum = Quorum(message_queue, polling_interval, agent_loss_timeout,
|
||||
agent_join_timeout)
|
||||
quorum.join(set(agent_ids))
|
||||
result = quorum.join(set(agent_ids))
|
||||
|
||||
failed = dict((agent_id, rec['status'])
|
||||
for agent_id, rec in result.items() if rec['status'] != 'ok')
|
||||
if failed:
|
||||
raise Exception('Agents failed to join: %s' % failed)
|
||||
|
||||
return quorum
|
||||
|
@ -182,7 +182,7 @@ class TestQuorum(testtools.TestCase):
|
||||
'beta': DummyExecutor(),
|
||||
}
|
||||
result = quorum.execute(test_case)
|
||||
self.assertEqual(result.keys(), test_case.keys())
|
||||
self.assertEqual(set(result.keys()), set(test_case.keys()))
|
||||
self.assertEqual('lost', result['_lost']['status'])
|
||||
self.assertEqual('ok', result['beta']['status'])
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user