Merge "Add Timer context manager class"
This commit is contained in:
commit
fc5235c49c
@ -18,6 +18,7 @@
|
||||
|
||||
"""Utilities and helper functions."""
|
||||
|
||||
import datetime
|
||||
import functools
|
||||
import importlib
|
||||
import os
|
||||
@ -36,6 +37,7 @@ from eventlet.green import subprocess
|
||||
import netaddr
|
||||
from neutron_lib import constants as n_const
|
||||
from neutron_lib.db import api as db_api
|
||||
from neutron_lib import exceptions as n_exc
|
||||
from neutron_lib.utils import helpers
|
||||
from oslo_config import cfg
|
||||
from oslo_db import exception as db_exc
|
||||
@ -60,6 +62,10 @@ class WaitTimeout(Exception):
|
||||
"""Default exception coming from wait_until_true() function."""
|
||||
|
||||
|
||||
class TimerTimeout(n_exc.NeutronException):
|
||||
message = _('Timer timeout expired after %(timeout)s second(s).')
|
||||
|
||||
|
||||
class LockWithTimer(object):
|
||||
def __init__(self, threshold):
|
||||
self._threshold = threshold
|
||||
@ -889,3 +895,64 @@ def validate_rp_bandwidth(rp_bandwidths, device_names):
|
||||
"Invalid resource_provider_bandwidths: "
|
||||
"Device name %(dev_name)s is missing from "
|
||||
"device mappings") % {'dev_name': dev_name})
|
||||
|
||||
|
||||
class Timer(object):
|
||||
"""Timer context manager class
|
||||
|
||||
This class creates a context that:
|
||||
- Triggers a timeout exception if the timeout is set.
|
||||
- Returns the time elapsed since the context was initialized.
|
||||
- Returns the time spent in the context once it's closed.
|
||||
|
||||
The timeout exception can be suppressed; when the time expires, the context
|
||||
finishes without rising TimerTimeout.
|
||||
"""
|
||||
def __init__(self, timeout=None, raise_exception=True):
|
||||
super(Timer, self).__init__()
|
||||
self.start = self.delta = None
|
||||
self._timeout = int(timeout) if timeout else None
|
||||
self._timeout_flag = False
|
||||
self._raise_exception = raise_exception
|
||||
|
||||
def _timeout_handler(self, *_):
|
||||
self._timeout_flag = True
|
||||
if self._raise_exception:
|
||||
raise TimerTimeout(timeout=self._timeout)
|
||||
self.__exit__()
|
||||
|
||||
def __enter__(self):
|
||||
self.start = datetime.datetime.now()
|
||||
if self._timeout:
|
||||
signal.signal(signal.SIGALRM, self._timeout_handler)
|
||||
signal.alarm(self._timeout)
|
||||
return self
|
||||
|
||||
def __exit__(self, *_):
|
||||
if self._timeout:
|
||||
signal.alarm(0)
|
||||
self.delta = datetime.datetime.now() - self.start
|
||||
|
||||
def __getattr__(self, item):
|
||||
return getattr(self.delta, item)
|
||||
|
||||
def __iter__(self):
|
||||
self._raise_exception = False
|
||||
return self.__enter__()
|
||||
|
||||
def next(self): # pragma: no cover
|
||||
# NOTE(ralonsoh): Python 2 support.
|
||||
if not self._timeout_flag:
|
||||
return datetime.datetime.now()
|
||||
raise StopIteration()
|
||||
|
||||
def __next__(self): # pragma: no cover
|
||||
# NOTE(ralonsoh): Python 3 support.
|
||||
return self.next()
|
||||
|
||||
def __del__(self):
|
||||
signal.alarm(0)
|
||||
|
||||
@property
|
||||
def delta_time_sec(self):
|
||||
return (datetime.datetime.now() - self.start).total_seconds()
|
||||
|
@ -16,6 +16,7 @@ import os.path
|
||||
import random
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
|
||||
import ddt
|
||||
import eventlet
|
||||
@ -558,3 +559,38 @@ class TestRpBandwidthValidator(base.BaseTestCase):
|
||||
|
||||
self.assertRaises(ValueError, utils.validate_rp_bandwidth,
|
||||
self.not_valid_rp_bandwidth, self.device_name_set)
|
||||
|
||||
|
||||
class TimerTestCase(base.BaseTestCase):
|
||||
|
||||
def test__getattr(self):
|
||||
with utils.Timer() as timer:
|
||||
time.sleep(1)
|
||||
self.assertEqual(1, round(timer.total_seconds(), 0))
|
||||
self.assertEqual(1, timer.delta.seconds)
|
||||
|
||||
def test__enter_with_timeout(self):
|
||||
with utils.Timer(timeout=10) as timer:
|
||||
time.sleep(1)
|
||||
self.assertEqual(1, round(timer.total_seconds(), 0))
|
||||
|
||||
def test__enter_with_timeout_exception(self):
|
||||
msg = r'Timer timeout expired after 1 second\(s\).'
|
||||
with self.assertRaisesRegex(utils.TimerTimeout, msg):
|
||||
with utils.Timer(timeout=1):
|
||||
time.sleep(2)
|
||||
|
||||
def test__enter_with_timeout_no_exception(self):
|
||||
with utils.Timer(timeout=1, raise_exception=False):
|
||||
time.sleep(2)
|
||||
|
||||
def test__iter(self):
|
||||
iterations = []
|
||||
for i in utils.Timer(timeout=2):
|
||||
iterations.append(i)
|
||||
time.sleep(1.1)
|
||||
self.assertEqual(2, len(iterations))
|
||||
|
||||
def test_delta_time_sec(self):
|
||||
with utils.Timer() as timer:
|
||||
self.assertIsInstance(timer.delta_time_sec, float)
|
||||
|
Loading…
Reference in New Issue
Block a user