Merge "Adding DynamicLoopingCall around lookup"

This commit is contained in:
Jenkins 2014-03-31 22:18:27 +00:00 committed by Gerrit Code Review
commit 517616938e
9 changed files with 366 additions and 55 deletions

View File

@ -106,7 +106,8 @@ class IronicPythonAgentHeartbeater(threading.Thread):
class IronicPythonAgent(object): class IronicPythonAgent(object):
def __init__(self, api_url, advertise_address, listen_address): def __init__(self, api_url, advertise_address, listen_address,
lookup_timeout, lookup_interval):
self.api_url = api_url self.api_url = api_url
self.api_client = ironic_api_client.APIClient(self.api_url) self.api_client = ironic_api_client.APIClient(self.api_url)
self.listen_address = listen_address self.listen_address = listen_address
@ -127,6 +128,9 @@ class IronicPythonAgent(object):
invoke_on_load=True, invoke_on_load=True,
propagate_map_exceptions=True, propagate_map_exceptions=True,
) )
# lookup timeout in seconds
self.lookup_timeout = lookup_timeout
self.lookup_interval = lookup_interval
def get_status(self): def get_status(self):
"""Retrieve a serializable status.""" """Retrieve a serializable status."""
@ -195,13 +199,14 @@ class IronicPythonAgent(object):
def run(self): def run(self):
"""Run the Ironic Python Agent.""" """Run the Ironic Python Agent."""
# Get the UUID so we can heartbeat to Ironic. Raises LookupNodeError
# if there is an issue (uncaught, restart agent)
self.started_at = _time() self.started_at = _time()
# Get the UUID so we can heartbeat to Ironic
content = self.api_client.lookup_node( content = self.api_client.lookup_node(
hardware_info=self.hardware.list_hardware_info() hardware_info=self.hardware.list_hardware_info(),
) timeout=self.lookup_timeout,
if 'node' not in content or 'heartbeat_timeout' not in content: starting_interval=self.lookup_interval)
raise LookupError('Lookup return needs node and heartbeat_timeout')
self.node = content['node'] self.node = content['node']
self.heartbeat_timeout = content['heartbeat_timeout'] self.heartbeat_timeout = content['heartbeat_timeout']
self.heartbeater.start() self.heartbeater.start()
@ -223,8 +228,12 @@ def build_agent(api_url,
advertise_host, advertise_host,
advertise_port, advertise_port,
listen_host, listen_host,
listen_port): listen_port,
lookup_timeout,
lookup_interval):
return IronicPythonAgent(api_url, return IronicPythonAgent(api_url,
(advertise_host, advertise_port), (advertise_host, advertise_port),
(listen_host, listen_port)) (listen_host, listen_port),
lookup_timeout,
lookup_interval)

View File

@ -47,9 +47,24 @@ def run():
type=int, type=int,
help='The port to tell Ironic to reply and send ' help='The port to tell Ironic to reply and send '
'commands to.') 'commands to.')
parser.add_argument('--lookup-timeout',
default=300,
type=int,
help='The amount of time to retry the initial lookup '
'call to Ironic. After the timeout, the agent '
'will exit with a non-zero exit code.')
parser.add_argument('--lookup-interval',
default=1,
type=int,
help='The initial interval for retries on the initial '
'lookup call to Ironic. The interval will be '
'doubled after each failure until timeout is '
'exceeded.')
args = parser.parse_args() args = parser.parse_args()
agent.build_agent(args.api_url, agent.build_agent(args.api_url,
args.advertise_host, args.advertise_host,
args.advertise_port, args.advertise_port,
args.listen_host, args.listen_host,
args.listen_port).run() args.listen_port,
args.lookup_timeout,
args.lookup_interval).run()

View File

@ -15,11 +15,12 @@ limitations under the License.
""" """
import json import json
import requests import requests
from ironic_python_agent import encoding from ironic_python_agent import encoding
from ironic_python_agent import errors from ironic_python_agent import errors
from ironic_python_agent.openstack.common import log
from ironic_python_agent.openstack.common import loopingcall
class APIClient(object): class APIClient(object):
@ -29,6 +30,7 @@ class APIClient(object):
self.api_url = api_url.rstrip('/') self.api_url = api_url.rstrip('/')
self.session = requests.Session() self.session = requests.Session()
self.encoder = encoding.RESTJSONEncoder() self.encoder = encoding.RESTJSONEncoder()
self.log = log.getLogger(__name__)
def _request(self, method, path, data=None): def _request(self, method, path, data=None):
request_url = '{api_url}{path}'.format(api_url=self.api_url, path=path) request_url = '{api_url}{path}'.format(api_url=self.api_url, path=path)
@ -70,38 +72,86 @@ class APIClient(object):
except Exception: except Exception:
raise errors.HeartbeatError('Invalid Heartbeat-Before header') raise errors.HeartbeatError('Invalid Heartbeat-Before header')
def lookup_node(self, hardware_info): def lookup_node(self, hardware_info, timeout, starting_interval):
timer = loopingcall.DynamicLoopingCall(
self._do_lookup,
hardware_info=hardware_info,
intervals=[starting_interval],
total_time=[0],
timeout=timeout)
node_content = timer.start().wait()
# True is returned on timeout
if node_content is True:
raise errors.LookupNodeError('Could not look up node info. Check '
'logs for details.')
return node_content
def _do_lookup(self, hardware_info, timeout, intervals=[1],
total_time=[0]):
"""The actual call to lookup a node. Should be called inside
loopingcall.DynamicLoopingCall.
intervals and total_time are mutable so it can be changed by each run
in the looping call and accessed/changed on the next run.
"""
def next_interval(timeout, intervals=[], total_time=[]):
"""Function to calculate what the next interval should be. Uses
exponential backoff and raises an exception (that won't
be caught by do_lookup) to kill the looping call if it goes too
long
"""
new_interval = intervals[-1] * 2
if total_time[0] + new_interval > timeout:
# No retvalue signifies error
raise loopingcall.LoopingCallDone()
total_time[0] += new_interval
intervals.append(new_interval)
return new_interval
path = '/{api_version}/drivers/teeth/vendor_passthru/lookup'.format( path = '/{api_version}/drivers/teeth/vendor_passthru/lookup'.format(
api_version=self.api_version api_version=self.api_version
) )
# This hardware won't be saved on the node currently, because of how # This hardware won't be saved on the node currently, because of
# driver_vendor_passthru is implemented (no node saving). # how driver_vendor_passthru is implemented (no node saving).
data = { data = {
'hardware': hardware_info, 'hardware': hardware_info
} }
# Make the POST, make sure we get back normal data/status codes and
# content
try: try:
response = self._request('POST', path, data=data) response = self._request('POST', path, data=data)
except Exception as e: except Exception as e:
raise errors.LookupNodeError(str(e)) self.log.warning('POST failed: %s' % str(e))
return next_interval(timeout, intervals, total_time)
if response.status_code != requests.codes.OK: if response.status_code != requests.codes.OK:
msg = 'Invalid status code: {0}'.format(response.status_code) self.log.warning('Invalid status code: %s' %
raise errors.LookupNodeError(msg) response.status_code)
return next_interval(timeout, intervals, total_time)
try: try:
content = json.loads(response.content) content = json.loads(response.content)
except Exception as e: except Exception as e:
raise errors.LookupNodeError('Error decoding response: ' self.log.warning('Error decoding response: %s' % str(e))
+ str(e)) return next_interval(timeout, intervals, total_time)
# Check for valid response data
if 'node' not in content or 'uuid' not in content['node']: if 'node' not in content or 'uuid' not in content['node']:
raise errors.LookupNodeError('Got invalid node data from the API:' self.log.warning('Got invalid node data from the API: %s' %
'%s' % content) content)
return next_interval(timeout, intervals, total_time)
if 'heartbeat_timeout' not in content: if 'heartbeat_timeout' not in content:
raise errors.LookupNodeError('Got invalid heartbeat from the API:' self.log.warning('Got invalid heartbeat from the API: %s' %
'%s' % content) content)
return content return next_interval(timeout, intervals, total_time)
# Got valid content
raise loopingcall.LoopingCallDone(retvalue=content)
def _get_agent_url(self, advertise_address): def _get_agent_url(self, advertise_address):
return 'http://{0}:{1}'.format(advertise_address[0], return 'http://{0}:{1}'.format(advertise_address[0],

View File

@ -0,0 +1,145 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from eventlet import event
from eventlet import greenthread
from ironic_python_agent.openstack.common.gettextutils import _LE, _LW
from ironic_python_agent.openstack.common import log as logging
from ironic_python_agent.openstack.common import timeutils
LOG = logging.getLogger(__name__)
class LoopingCallDone(Exception):
"""Exception to break out and stop a LoopingCall.
The poll-function passed to LoopingCall can raise this exception to
break out of the loop normally. This is somewhat analogous to
StopIteration.
An optional return-value can be included as the argument to the exception;
this return-value will be returned by LoopingCall.wait()
"""
def __init__(self, retvalue=True):
""":param retvalue: Value that LoopingCall.wait() should return."""
self.retvalue = retvalue
class LoopingCallBase(object):
def __init__(self, f=None, *args, **kw):
self.args = args
self.kw = kw
self.f = f
self._running = False
self.done = None
def stop(self):
self._running = False
def wait(self):
return self.done.wait()
class FixedIntervalLoopingCall(LoopingCallBase):
"""A fixed interval looping call."""
def start(self, interval, initial_delay=None):
self._running = True
done = event.Event()
def _inner():
if initial_delay:
greenthread.sleep(initial_delay)
try:
while self._running:
start = timeutils.utcnow()
self.f(*self.args, **self.kw)
end = timeutils.utcnow()
if not self._running:
break
delay = interval - timeutils.delta_seconds(start, end)
if delay <= 0:
LOG.warn(_LW('task run outlasted interval by %s sec') %
-delay)
greenthread.sleep(delay if delay > 0 else 0)
except LoopingCallDone as e:
self.stop()
done.send(e.retvalue)
except Exception:
LOG.exception(_LE('in fixed duration looping call'))
done.send_exception(*sys.exc_info())
return
else:
done.send(True)
self.done = done
greenthread.spawn_n(_inner)
return self.done
# TODO(mikal): this class name is deprecated in Havana and should be removed
# in the I release
LoopingCall = FixedIntervalLoopingCall
class DynamicLoopingCall(LoopingCallBase):
"""A looping call which sleeps until the next known event.
The function called should return how long to sleep for before being
called again.
"""
def start(self, initial_delay=None, periodic_interval_max=None):
self._running = True
done = event.Event()
def _inner():
if initial_delay:
greenthread.sleep(initial_delay)
try:
while self._running:
idle = self.f(*self.args, **self.kw)
if not self._running:
break
if periodic_interval_max is not None:
idle = min(idle, periodic_interval_max)
LOG.debug('Dynamic looping call sleeping for %.02f '
'seconds', idle)
greenthread.sleep(idle)
except LoopingCallDone as e:
self.stop()
done.send(e.retvalue)
except Exception:
LOG.exception(_LE('in dynamic looping call'))
done.send_exception(*sys.exc_info())
return
else:
done.send(True)
self.done = done
greenthread.spawn(_inner)
return self.done

View File

@ -123,7 +123,9 @@ class TestBaseAgent(unittest.TestCase):
self.agent = agent.IronicPythonAgent('https://fake_api.example.' self.agent = agent.IronicPythonAgent('https://fake_api.example.'
'org:8081/', 'org:8081/',
('203.0.113.1', 9990), ('203.0.113.1', 9990),
('192.0.2.1', 9999)) ('192.0.2.1', 9999),
lookup_timeout=300,
lookup_interval=1)
def assertEqualEncoded(self, a, b): def assertEqualEncoded(self, a, b):
# Evidently JSONEncoder.default() can't handle None (??) so we have to # Evidently JSONEncoder.default() can't handle None (??) so we have to

View File

@ -14,7 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
import httmock
import json import json
import mock import mock
import time import time
@ -23,10 +22,20 @@ import unittest
from ironic_python_agent import errors from ironic_python_agent import errors
from ironic_python_agent import hardware from ironic_python_agent import hardware
from ironic_python_agent import ironic_api_client from ironic_python_agent import ironic_api_client
from ironic_python_agent.openstack.common import loopingcall
API_URL = 'http://agent-api.ironic.example.org/' API_URL = 'http://agent-api.ironic.example.org/'
class FakeResponse(object):
def __init__(self, content=None, status_code=200, headers=None):
content = content or {}
self.content = json.dumps(content)
self.status_code = status_code
self.headers = headers or {}
class TestBaseIronicPythonAgent(unittest.TestCase): class TestBaseIronicPythonAgent(unittest.TestCase):
def setUp(self): def setUp(self):
self.api_client = ironic_api_client.APIClient(API_URL) self.api_client = ironic_api_client.APIClient(API_URL)
@ -39,7 +48,7 @@ class TestBaseIronicPythonAgent(unittest.TestCase):
def test_successful_heartbeat(self): def test_successful_heartbeat(self):
expected_heartbeat_before = time.time() + 120 expected_heartbeat_before = time.time() + 120
response = httmock.response(status_code=204, headers={ response = FakeResponse(status_code=204, headers={
'Heartbeat-Before': expected_heartbeat_before, 'Heartbeat-Before': expected_heartbeat_before,
}) })
@ -69,7 +78,7 @@ class TestBaseIronicPythonAgent(unittest.TestCase):
advertise_address=('192.0.2.1', '9999')) advertise_address=('192.0.2.1', '9999'))
def test_heartbeat_invalid_status_code(self): def test_heartbeat_invalid_status_code(self):
response = httmock.response(status_code=404) response = FakeResponse(status_code=404)
self.api_client.session.request = mock.Mock() self.api_client.session.request = mock.Mock()
self.api_client.session.request.return_value = response self.api_client.session.request.return_value = response
@ -79,7 +88,7 @@ class TestBaseIronicPythonAgent(unittest.TestCase):
advertise_address=('192.0.2.1', '9999')) advertise_address=('192.0.2.1', '9999'))
def test_heartbeat_missing_heartbeat_before_header(self): def test_heartbeat_missing_heartbeat_before_header(self):
response = httmock.response(status_code=204) response = FakeResponse(status_code=204)
self.api_client.session.request = mock.Mock() self.api_client.session.request = mock.Mock()
self.api_client.session.request.return_value = response self.api_client.session.request.return_value = response
@ -89,7 +98,7 @@ class TestBaseIronicPythonAgent(unittest.TestCase):
advertise_address=('192.0.2.1', '9999')) advertise_address=('192.0.2.1', '9999'))
def test_heartbeat_invalid_heartbeat_before_header(self): def test_heartbeat_invalid_heartbeat_before_header(self):
response = httmock.response(status_code=204, headers={ response = FakeResponse(status_code=204, headers={
'Heartbeat-Before': 'tomorrow', 'Heartbeat-Before': 'tomorrow',
}) })
self.api_client.session.request = mock.Mock() self.api_client.session.request = mock.Mock()
@ -100,8 +109,47 @@ class TestBaseIronicPythonAgent(unittest.TestCase):
uuid='deadbeef-dabb-ad00-b105-f00d00bab10c', uuid='deadbeef-dabb-ad00-b105-f00d00bab10c',
advertise_address=('192.0.2.1', '9999')) advertise_address=('192.0.2.1', '9999'))
def test_lookup_node(self): @mock.patch('eventlet.greenthread.sleep')
response = httmock.response(status_code=200, content={ def test_lookup_node(self, sleep_mock):
content = {
'node': {
'uuid': 'deadbeef-dabb-ad00-b105-f00d00bab10c'
},
'heartbeat_timeout': 300
}
response = FakeResponse(status_code=200, content={
'node': {
'uuid': 'deadbeef-dabb-ad00-b105-f00d00bab10c'
},
'heartbeat_timeout': 300
})
self.api_client.session.request = mock.Mock()
self.api_client.session.request.return_value = response
returned_content = self.api_client.lookup_node(
hardware_info=self.hardware_info,
timeout=300,
starting_interval=1)
self.assertEqual(content, returned_content)
@mock.patch('eventlet.greenthread.sleep')
def test_lookup_node_exception(self, sleep_mock):
bad_response = FakeResponse(status_code=500, content={
'node': {
'uuid': 'deadbeef-dabb-ad00-b105-f00d00bab10c'
},
'heartbeat_timeout': 300
})
self.api_client.session.request = mock.Mock()
self.api_client.session.request.return_value = bad_response
self.assertRaises(errors.LookupNodeError,
self.api_client.lookup_node,
hardware_info=self.hardware_info,
timeout=300,
starting_interval=1)
def test_do_lookup(self):
response = FakeResponse(status_code=200, content={
'node': { 'node': {
'uuid': 'deadbeef-dabb-ad00-b105-f00d00bab10c' 'uuid': 'deadbeef-dabb-ad00-b105-f00d00bab10c'
}, },
@ -111,12 +159,16 @@ class TestBaseIronicPythonAgent(unittest.TestCase):
self.api_client.session.request = mock.Mock() self.api_client.session.request = mock.Mock()
self.api_client.session.request.return_value = response self.api_client.session.request.return_value = response
self.api_client.lookup_node(hardware_info=self.hardware_info) self.assertRaises(loopingcall.LoopingCallDone,
self.api_client._do_lookup,
hardware_info=self.hardware_info,
timeout=300,
intervals=[2])
request_args = self.api_client.session.request.call_args[0] request_args = self.api_client.session.request.call_args[0]
self.assertEqual(request_args[0], 'POST') self.assertEqual(request_args[0], 'POST')
self.assertEqual(request_args[1], self.assertEqual(request_args[1],
API_URL + 'v1/drivers/teeth/vendor_passthru/lookup') API_URL + 'v1/drivers/teeth/vendor_passthru/lookup')
data = self.api_client.session.request.call_args[1]['data'] data = self.api_client.session.request.call_args[1]['data']
content = json.loads(data) content = json.loads(data)
@ -131,8 +183,8 @@ class TestBaseIronicPythonAgent(unittest.TestCase):
}, },
]) ])
def test_lookup_node_bad_response_code(self): def test_do_lookup_bad_response_code(self):
response = httmock.response(status_code=400, content={ response = FakeResponse(status_code=400, content={
'node': { 'node': {
'uuid': 'deadbeef-dabb-ad00-b105-f00d00bab10c' 'uuid': 'deadbeef-dabb-ad00-b105-f00d00bab10c'
} }
@ -141,24 +193,32 @@ class TestBaseIronicPythonAgent(unittest.TestCase):
self.api_client.session.request = mock.Mock() self.api_client.session.request = mock.Mock()
self.api_client.session.request.return_value = response self.api_client.session.request.return_value = response
self.assertRaises(errors.LookupNodeError, total_time = [0]
self.api_client.lookup_node, interval = self.api_client._do_lookup(self.hardware_info,
hardware_info=self.hardware_info) timeout=300,
total_time=total_time,
intervals=[2])
self.assertEqual(4, interval)
self.assertEqual(4, total_time[0])
def test_lookup_node_bad_response_data(self): def test_do_lookup_bad_response_data(self):
response = httmock.response(status_code=200, content={ response = FakeResponse(status_code=200, content={
'heartbeat_timeout': 300 'heartbeat_timeout': 300
}) })
self.api_client.session.request = mock.Mock() self.api_client.session.request = mock.Mock()
self.api_client.session.request.return_value = response self.api_client.session.request.return_value = response
self.assertRaises(errors.LookupNodeError, total_time = [0]
self.api_client.lookup_node, interval = self.api_client._do_lookup(self.hardware_info,
hardware_info=self.hardware_info) timeout=300,
total_time=total_time,
intervals=[2])
self.assertEqual(4, interval)
self.assertEqual(4, total_time[0])
def test_lookup_node_no_heartbeat_timeout(self): def test_do_lookup_no_heartbeat_timeout(self):
response = httmock.response(status_code=200, content={ response = FakeResponse(status_code=200, content={
'node': { 'node': {
'uuid': 'deadbeef-dabb-ad00-b105-f00d00bab10c' 'uuid': 'deadbeef-dabb-ad00-b105-f00d00bab10c'
} }
@ -167,18 +227,47 @@ class TestBaseIronicPythonAgent(unittest.TestCase):
self.api_client.session.request = mock.Mock() self.api_client.session.request = mock.Mock()
self.api_client.session.request.return_value = response self.api_client.session.request.return_value = response
self.assertRaises(errors.LookupNodeError, total_time = [0]
self.api_client.lookup_node, interval = self.api_client._do_lookup(self.hardware_info,
hardware_info=self.hardware_info) timeout=300,
total_time=total_time,
intervals=[2])
self.assertEqual(4, interval)
self.assertEqual(4, total_time[0])
def test_lookup_node_bad_response_body(self): def test_do_lookup_bad_response_body(self):
response = httmock.response(status_code=200, content={ response = FakeResponse(status_code=200, content={
'node_node': 'also_not_node' 'node_node': 'also_not_node'
}) })
self.api_client.session.request = mock.Mock() self.api_client.session.request = mock.Mock()
self.api_client.session.request.return_value = response self.api_client.session.request.return_value = response
total_time = [0]
interval = self.api_client._do_lookup(self.hardware_info,
timeout=300,
total_time=total_time,
intervals=[2])
self.assertEqual(4, interval)
self.assertEqual(4, total_time[0])
@mock.patch('eventlet.greenthread.sleep')
def test_lookup_node_exponential_backoff(self, sleep_mock):
bad_response = FakeResponse(status_code=500, content={
'node': {
'uuid': 'deadbeef-dabb-ad00-b105-f00d00bab10c'
},
'heartbeat_timeout': 300
})
self.api_client.session.request = mock.Mock()
self.api_client.session.request.return_value = bad_response
self.assertRaises(errors.LookupNodeError, self.assertRaises(errors.LookupNodeError,
self.api_client.lookup_node, self.api_client.lookup_node,
hardware_info=self.hardware_info) hardware_info=self.hardware_info,
timeout=300,
starting_interval=1)
# 254 seconds before timeout
expected_times = [mock.call(2), mock.call(4), mock.call(8),
mock.call(16), mock.call(32), mock.call(64),
mock.call(128)]
self.assertEqual(expected_times, sleep_mock.call_args_list)

View File

@ -1,7 +1,7 @@
[DEFAULT] [DEFAULT]
# The list of modules to copy from oslo-incubator # The list of modules to copy from oslo-incubator
modules=log module=log
module=loopingcall
# The base module to hold the copy of openstack.common # The base module to hold the copy of openstack.common
base=ironic_python_agent base=ironic_python_agent

View File

@ -7,3 +7,5 @@ six>=1.5.2
oslo.config==1.2.1 oslo.config==1.2.1
Babel==1.3 Babel==1.3
iso8601==0.1.10 iso8601==0.1.10
eventlet>=0.13.0

View File

@ -5,4 +5,3 @@ mock>=1.0
testrepository>=0.0.18 testrepository>=0.0.18
testtools>=0.9.34 testtools>=0.9.34
python-subunit>=0.0.18 python-subunit>=0.0.18
httmock