Merge "Pull timestamp-related functions out to a separate module"

This commit is contained in:
Zuul 2023-04-12 18:46:41 +00:00 committed by Gerrit Code Review
commit 984cca9263
5 changed files with 1295 additions and 1226 deletions

View File

@ -26,7 +26,6 @@ import fcntl
import grp
import hashlib
import json
import math
import operator
import os
import pwd
@ -97,6 +96,20 @@ from swift.common.linkat import linkat
# For backwards compatability with 3rd party middlewares
from swift.common.registry import register_swift_info, get_swift_info # noqa
from swift.common.utils.timestamp import ( # noqa
NORMAL_FORMAT,
INTERNAL_FORMAT,
SHORT_FORMAT,
MAX_OFFSET,
PRECISION,
Timestamp,
encode_timestamps,
decode_timestamps,
normalize_timestamp,
EPOCH,
last_modified_date_to_timestamp,
normalize_delete_at_timestamp,
)
# logging doesn't import patched as cleanly as one would like
from logging.handlers import SysLogHandler
@ -1151,382 +1164,6 @@ def drop_buffer_cache(fd, offset, length):
'length': length, 'ret': ret})
NORMAL_FORMAT = "%016.05f"
INTERNAL_FORMAT = NORMAL_FORMAT + '_%016x'
SHORT_FORMAT = NORMAL_FORMAT + '_%x'
MAX_OFFSET = (16 ** 16) - 1
PRECISION = 1e-5
# Setting this to True will cause the internal format to always display
# extended digits - even when the value is equivalent to the normalized form.
# This isn't ideal during an upgrade when some servers might not understand
# the new time format - but flipping it to True works great for testing.
FORCE_INTERNAL = False # or True
@functools.total_ordering
class Timestamp(object):
"""
Internal Representation of Swift Time.
The normalized form of the X-Timestamp header looks like a float
with a fixed width to ensure stable string sorting - normalized
timestamps look like "1402464677.04188"
To support overwrites of existing data without modifying the original
timestamp but still maintain consistency a second internal offset vector
is append to the normalized timestamp form which compares and sorts
greater than the fixed width float format but less than a newer timestamp.
The internalized format of timestamps looks like
"1402464677.04188_0000000000000000" - the portion after the underscore is
the offset and is a formatted hexadecimal integer.
The internalized form is not exposed to clients in responses from
Swift. Normal client operations will not create a timestamp with an
offset.
The Timestamp class in common.utils supports internalized and
normalized formatting of timestamps and also comparison of timestamp
values. When the offset value of a Timestamp is 0 - it's considered
insignificant and need not be represented in the string format; to
support backwards compatibility during a Swift upgrade the
internalized and normalized form of a Timestamp with an
insignificant offset are identical. When a timestamp includes an
offset it will always be represented in the internalized form, but
is still excluded from the normalized form. Timestamps with an
equivalent timestamp portion (the float part) will compare and order
by their offset. Timestamps with a greater timestamp portion will
always compare and order greater than a Timestamp with a lesser
timestamp regardless of it's offset. String comparison and ordering
is guaranteed for the internalized string format, and is backwards
compatible for normalized timestamps which do not include an offset.
"""
def __init__(self, timestamp, offset=0, delta=0, check_bounds=True):
"""
Create a new Timestamp.
:param timestamp: time in seconds since the Epoch, may be any of:
* a float or integer
* normalized/internalized string
* another instance of this class (offset is preserved)
:param offset: the second internal offset vector, an int
:param delta: deca-microsecond difference from the base timestamp
param, an int
"""
if isinstance(timestamp, bytes):
timestamp = timestamp.decode('ascii')
if isinstance(timestamp, six.string_types):
base, base_offset = timestamp.partition('_')[::2]
self.timestamp = float(base)
if '_' in base_offset:
raise ValueError('invalid literal for int() with base 16: '
'%r' % base_offset)
if base_offset:
self.offset = int(base_offset, 16)
else:
self.offset = 0
else:
self.timestamp = float(timestamp)
self.offset = getattr(timestamp, 'offset', 0)
# increment offset
if offset >= 0:
self.offset += offset
else:
raise ValueError('offset must be non-negative')
if self.offset > MAX_OFFSET:
raise ValueError('offset must be smaller than %d' % MAX_OFFSET)
self.raw = int(round(self.timestamp / PRECISION))
# add delta
if delta:
self.raw = self.raw + delta
if self.raw <= 0:
raise ValueError(
'delta must be greater than %d' % (-1 * self.raw))
self.timestamp = float(self.raw * PRECISION)
if check_bounds:
if self.timestamp < 0:
raise ValueError('timestamp cannot be negative')
if self.timestamp >= 10000000000:
raise ValueError('timestamp too large')
@classmethod
def now(cls, offset=0, delta=0):
return cls(time.time(), offset=offset, delta=delta)
def __repr__(self):
return INTERNAL_FORMAT % (self.timestamp, self.offset)
def __str__(self):
raise TypeError('You must specify which string format is required')
def __float__(self):
return self.timestamp
def __int__(self):
return int(self.timestamp)
def __nonzero__(self):
return bool(self.timestamp or self.offset)
def __bool__(self):
return self.__nonzero__()
@property
def normal(self):
return NORMAL_FORMAT % self.timestamp
@property
def internal(self):
if self.offset or FORCE_INTERNAL:
return INTERNAL_FORMAT % (self.timestamp, self.offset)
else:
return self.normal
@property
def short(self):
if self.offset or FORCE_INTERNAL:
return SHORT_FORMAT % (self.timestamp, self.offset)
else:
return self.normal
@property
def isoformat(self):
"""
Get an isoformat string representation of the 'normal' part of the
Timestamp with microsecond precision and no trailing timezone, for
example::
1970-01-01T00:00:00.000000
:return: an isoformat string
"""
t = float(self.normal)
if six.PY3:
# On Python 3, round manually using ROUND_HALF_EVEN rounding
# method, to use the same rounding method than Python 2. Python 3
# used a different rounding method, but Python 3.4.4 and 3.5.1 use
# again ROUND_HALF_EVEN as Python 2.
# See https://bugs.python.org/issue23517
frac, t = math.modf(t)
us = round(frac * 1e6)
if us >= 1000000:
t += 1
us -= 1000000
elif us < 0:
t -= 1
us += 1000000
dt = datetime.datetime.utcfromtimestamp(t)
dt = dt.replace(microsecond=us)
else:
dt = datetime.datetime.utcfromtimestamp(t)
isoformat = dt.isoformat()
# python isoformat() doesn't include msecs when zero
if len(isoformat) < len("1970-01-01T00:00:00.000000"):
isoformat += ".000000"
return isoformat
@classmethod
def from_isoformat(cls, date_string):
"""
Parse an isoformat string representation of time to a Timestamp object.
:param date_string: a string formatted as per an Timestamp.isoformat
property.
:return: an instance of this class.
"""
start = datetime.datetime.strptime(date_string, "%Y-%m-%dT%H:%M:%S.%f")
delta = start - EPOCH
# This calculation is based on Python 2.7's Modules/datetimemodule.c,
# function delta_to_microseconds(), but written in Python.
return cls(delta.total_seconds())
def ceil(self):
"""
Return the 'normal' part of the timestamp rounded up to the nearest
integer number of seconds.
This value should be used whenever the second-precision Last-Modified
time of a resource is required.
:return: a float value with second precision.
"""
return math.ceil(float(self))
def __eq__(self, other):
if other is None:
return False
if not isinstance(other, Timestamp):
try:
other = Timestamp(other, check_bounds=False)
except ValueError:
return False
return self.internal == other.internal
def __ne__(self, other):
return not (self == other)
def __lt__(self, other):
if other is None:
return False
if not isinstance(other, Timestamp):
other = Timestamp(other, check_bounds=False)
if other.timestamp < 0:
return False
if other.timestamp >= 10000000000:
return True
return self.internal < other.internal
def __hash__(self):
return hash(self.internal)
def __invert__(self):
if self.offset:
raise ValueError('Cannot invert timestamps with offsets')
return Timestamp((999999999999999 - self.raw) * PRECISION)
def encode_timestamps(t1, t2=None, t3=None, explicit=False):
"""
Encode up to three timestamps into a string. Unlike a Timestamp object, the
encoded string does NOT used fixed width fields and consequently no
relative chronology of the timestamps can be inferred from lexicographic
sorting of encoded timestamp strings.
The format of the encoded string is:
<t1>[<+/-><t2 - t1>[<+/-><t3 - t2>]]
i.e. if t1 = t2 = t3 then just the string representation of t1 is returned,
otherwise the time offsets for t2 and t3 are appended. If explicit is True
then the offsets for t2 and t3 are always appended even if zero.
Note: any offset value in t1 will be preserved, but offsets on t2 and t3
are not preserved. In the anticipated use cases for this method (and the
inverse decode_timestamps method) the timestamps passed as t2 and t3 are
not expected to have offsets as they will be timestamps associated with a
POST request. In the case where the encoding is used in a container objects
table row, t1 could be the PUT or DELETE time but t2 and t3 represent the
content type and metadata times (if different from the data file) i.e.
correspond to POST timestamps. In the case where the encoded form is used
in a .meta file name, t1 and t2 both correspond to POST timestamps.
"""
form = '{0}'
values = [t1.short]
if t2 is not None:
t2_t1_delta = t2.raw - t1.raw
explicit = explicit or (t2_t1_delta != 0)
values.append(t2_t1_delta)
if t3 is not None:
t3_t2_delta = t3.raw - t2.raw
explicit = explicit or (t3_t2_delta != 0)
values.append(t3_t2_delta)
if explicit:
form += '{1:+x}'
if t3 is not None:
form += '{2:+x}'
return form.format(*values)
def decode_timestamps(encoded, explicit=False):
"""
Parses a string of the form generated by encode_timestamps and returns
a tuple of the three component timestamps. If explicit is False, component
timestamps that are not explicitly encoded will be assumed to have zero
delta from the previous component and therefore take the value of the
previous component. If explicit is True, component timestamps that are
not explicitly encoded will be returned with value None.
"""
# TODO: some tests, e.g. in test_replicator, put float timestamps values
# into container db's, hence this defensive check, but in real world
# this may never happen.
if not isinstance(encoded, six.string_types):
ts = Timestamp(encoded)
return ts, ts, ts
parts = []
signs = []
pos_parts = encoded.split('+')
for part in pos_parts:
# parse time components and their signs
# e.g. x-y+z --> parts = [x, y, z] and signs = [+1, -1, +1]
neg_parts = part.split('-')
parts = parts + neg_parts
signs = signs + [1] + [-1] * (len(neg_parts) - 1)
t1 = Timestamp(parts[0])
t2 = t3 = None
if len(parts) > 1:
t2 = t1
delta = signs[1] * int(parts[1], 16)
# if delta = 0 we want t2 = t3 = t1 in order to
# preserve any offset in t1 - only construct a distinct
# timestamp if there is a non-zero delta.
if delta:
t2 = Timestamp((t1.raw + delta) * PRECISION)
elif not explicit:
t2 = t1
if len(parts) > 2:
t3 = t2
delta = signs[2] * int(parts[2], 16)
if delta:
t3 = Timestamp((t2.raw + delta) * PRECISION)
elif not explicit:
t3 = t2
return t1, t2, t3
def normalize_timestamp(timestamp):
"""
Format a timestamp (string or numeric) into a standardized
xxxxxxxxxx.xxxxx (10.5) format.
Note that timestamps using values greater than or equal to November 20th,
2286 at 17:46 UTC will use 11 digits to represent the number of
seconds.
:param timestamp: unix timestamp
:returns: normalized timestamp as a string
"""
return Timestamp(timestamp).normal
EPOCH = datetime.datetime(1970, 1, 1)
def last_modified_date_to_timestamp(last_modified_date_str):
"""
Convert a last modified date (like you'd get from a container listing,
e.g. 2014-02-28T23:22:36.698390) to a float.
"""
return Timestamp.from_isoformat(last_modified_date_str)
def normalize_delete_at_timestamp(timestamp, high_precision=False):
"""
Format a timestamp (string or numeric) into a standardized
xxxxxxxxxx (10) or xxxxxxxxxx.xxxxx (10.5) format.
Note that timestamps less than 0000000000 are raised to
0000000000 and values greater than November 20th, 2286 at
17:46:39 UTC will be capped at that date and time, resulting in
no return value exceeding 9999999999.99999 (or 9999999999 if
using low-precision).
This cap is because the expirer is already working through a
sorted list of strings that were all a length of 10. Adding
another digit would mess up the sort and cause the expirer to
break from processing early. By 2286, this problem will need to
be fixed, probably by creating an additional .expiring_objects
account to work from with 11 (or more) digit container names.
:param timestamp: unix timestamp
:returns: normalized timestamp as a string
"""
fmt = '%016.5f' if high_precision else '%010d'
return fmt % min(max(0, float(timestamp)), 9999999999.99999)
def mkdirs(path):
"""
Ensures the path is a directory or makes it if not. Errors if the path

View File

@ -0,0 +1,399 @@
# Copyright (c) 2010-2023 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Timestamp-related functions for use with Swift."""
import datetime
import functools
import math
import time
import six
NORMAL_FORMAT = "%016.05f"
INTERNAL_FORMAT = NORMAL_FORMAT + '_%016x'
SHORT_FORMAT = NORMAL_FORMAT + '_%x'
MAX_OFFSET = (16 ** 16) - 1
PRECISION = 1e-5
# Setting this to True will cause the internal format to always display
# extended digits - even when the value is equivalent to the normalized form.
# This isn't ideal during an upgrade when some servers might not understand
# the new time format - but flipping it to True works great for testing.
FORCE_INTERNAL = False # or True
@functools.total_ordering
class Timestamp(object):
"""
Internal Representation of Swift Time.
The normalized form of the X-Timestamp header looks like a float
with a fixed width to ensure stable string sorting - normalized
timestamps look like "1402464677.04188"
To support overwrites of existing data without modifying the original
timestamp but still maintain consistency a second internal offset vector
is append to the normalized timestamp form which compares and sorts
greater than the fixed width float format but less than a newer timestamp.
The internalized format of timestamps looks like
"1402464677.04188_0000000000000000" - the portion after the underscore is
the offset and is a formatted hexadecimal integer.
The internalized form is not exposed to clients in responses from
Swift. Normal client operations will not create a timestamp with an
offset.
The Timestamp class in common.utils supports internalized and
normalized formatting of timestamps and also comparison of timestamp
values. When the offset value of a Timestamp is 0 - it's considered
insignificant and need not be represented in the string format; to
support backwards compatibility during a Swift upgrade the
internalized and normalized form of a Timestamp with an
insignificant offset are identical. When a timestamp includes an
offset it will always be represented in the internalized form, but
is still excluded from the normalized form. Timestamps with an
equivalent timestamp portion (the float part) will compare and order
by their offset. Timestamps with a greater timestamp portion will
always compare and order greater than a Timestamp with a lesser
timestamp regardless of it's offset. String comparison and ordering
is guaranteed for the internalized string format, and is backwards
compatible for normalized timestamps which do not include an offset.
"""
def __init__(self, timestamp, offset=0, delta=0, check_bounds=True):
"""
Create a new Timestamp.
:param timestamp: time in seconds since the Epoch, may be any of:
* a float or integer
* normalized/internalized string
* another instance of this class (offset is preserved)
:param offset: the second internal offset vector, an int
:param delta: deca-microsecond difference from the base timestamp
param, an int
"""
if isinstance(timestamp, bytes):
timestamp = timestamp.decode('ascii')
if isinstance(timestamp, six.string_types):
base, base_offset = timestamp.partition('_')[::2]
self.timestamp = float(base)
if '_' in base_offset:
raise ValueError('invalid literal for int() with base 16: '
'%r' % base_offset)
if base_offset:
self.offset = int(base_offset, 16)
else:
self.offset = 0
else:
self.timestamp = float(timestamp)
self.offset = getattr(timestamp, 'offset', 0)
# increment offset
if offset >= 0:
self.offset += offset
else:
raise ValueError('offset must be non-negative')
if self.offset > MAX_OFFSET:
raise ValueError('offset must be smaller than %d' % MAX_OFFSET)
self.raw = int(round(self.timestamp / PRECISION))
# add delta
if delta:
self.raw = self.raw + delta
if self.raw <= 0:
raise ValueError(
'delta must be greater than %d' % (-1 * self.raw))
self.timestamp = float(self.raw * PRECISION)
if check_bounds:
if self.timestamp < 0:
raise ValueError('timestamp cannot be negative')
if self.timestamp >= 10000000000:
raise ValueError('timestamp too large')
@classmethod
def now(cls, offset=0, delta=0):
return cls(time.time(), offset=offset, delta=delta)
def __repr__(self):
return INTERNAL_FORMAT % (self.timestamp, self.offset)
def __str__(self):
raise TypeError('You must specify which string format is required')
def __float__(self):
return self.timestamp
def __int__(self):
return int(self.timestamp)
def __nonzero__(self):
return bool(self.timestamp or self.offset)
def __bool__(self):
return self.__nonzero__()
@property
def normal(self):
return NORMAL_FORMAT % self.timestamp
@property
def internal(self):
if self.offset or FORCE_INTERNAL:
return INTERNAL_FORMAT % (self.timestamp, self.offset)
else:
return self.normal
@property
def short(self):
if self.offset or FORCE_INTERNAL:
return SHORT_FORMAT % (self.timestamp, self.offset)
else:
return self.normal
@property
def isoformat(self):
"""
Get an isoformat string representation of the 'normal' part of the
Timestamp with microsecond precision and no trailing timezone, for
example::
1970-01-01T00:00:00.000000
:return: an isoformat string
"""
t = float(self.normal)
if six.PY3:
# On Python 3, round manually using ROUND_HALF_EVEN rounding
# method, to use the same rounding method than Python 2. Python 3
# used a different rounding method, but Python 3.4.4 and 3.5.1 use
# again ROUND_HALF_EVEN as Python 2.
# See https://bugs.python.org/issue23517
frac, t = math.modf(t)
us = round(frac * 1e6)
if us >= 1000000:
t += 1
us -= 1000000
elif us < 0:
t -= 1
us += 1000000
dt = datetime.datetime.utcfromtimestamp(t)
dt = dt.replace(microsecond=us)
else:
dt = datetime.datetime.utcfromtimestamp(t)
isoformat = dt.isoformat()
# python isoformat() doesn't include msecs when zero
if len(isoformat) < len("1970-01-01T00:00:00.000000"):
isoformat += ".000000"
return isoformat
@classmethod
def from_isoformat(cls, date_string):
"""
Parse an isoformat string representation of time to a Timestamp object.
:param date_string: a string formatted as per an Timestamp.isoformat
property.
:return: an instance of this class.
"""
start = datetime.datetime.strptime(date_string, "%Y-%m-%dT%H:%M:%S.%f")
delta = start - EPOCH
# This calculation is based on Python 2.7's Modules/datetimemodule.c,
# function delta_to_microseconds(), but written in Python.
return cls(delta.total_seconds())
def ceil(self):
"""
Return the 'normal' part of the timestamp rounded up to the nearest
integer number of seconds.
This value should be used whenever the second-precision Last-Modified
time of a resource is required.
:return: a float value with second precision.
"""
return math.ceil(float(self))
def __eq__(self, other):
if other is None:
return False
if not isinstance(other, Timestamp):
try:
other = Timestamp(other, check_bounds=False)
except ValueError:
return False
return self.internal == other.internal
def __ne__(self, other):
return not (self == other)
def __lt__(self, other):
if other is None:
return False
if not isinstance(other, Timestamp):
other = Timestamp(other, check_bounds=False)
if other.timestamp < 0:
return False
if other.timestamp >= 10000000000:
return True
return self.internal < other.internal
def __hash__(self):
return hash(self.internal)
def __invert__(self):
if self.offset:
raise ValueError('Cannot invert timestamps with offsets')
return Timestamp((999999999999999 - self.raw) * PRECISION)
def encode_timestamps(t1, t2=None, t3=None, explicit=False):
"""
Encode up to three timestamps into a string. Unlike a Timestamp object, the
encoded string does NOT used fixed width fields and consequently no
relative chronology of the timestamps can be inferred from lexicographic
sorting of encoded timestamp strings.
The format of the encoded string is:
<t1>[<+/-><t2 - t1>[<+/-><t3 - t2>]]
i.e. if t1 = t2 = t3 then just the string representation of t1 is returned,
otherwise the time offsets for t2 and t3 are appended. If explicit is True
then the offsets for t2 and t3 are always appended even if zero.
Note: any offset value in t1 will be preserved, but offsets on t2 and t3
are not preserved. In the anticipated use cases for this method (and the
inverse decode_timestamps method) the timestamps passed as t2 and t3 are
not expected to have offsets as they will be timestamps associated with a
POST request. In the case where the encoding is used in a container objects
table row, t1 could be the PUT or DELETE time but t2 and t3 represent the
content type and metadata times (if different from the data file) i.e.
correspond to POST timestamps. In the case where the encoded form is used
in a .meta file name, t1 and t2 both correspond to POST timestamps.
"""
form = '{0}'
values = [t1.short]
if t2 is not None:
t2_t1_delta = t2.raw - t1.raw
explicit = explicit or (t2_t1_delta != 0)
values.append(t2_t1_delta)
if t3 is not None:
t3_t2_delta = t3.raw - t2.raw
explicit = explicit or (t3_t2_delta != 0)
values.append(t3_t2_delta)
if explicit:
form += '{1:+x}'
if t3 is not None:
form += '{2:+x}'
return form.format(*values)
def decode_timestamps(encoded, explicit=False):
"""
Parses a string of the form generated by encode_timestamps and returns
a tuple of the three component timestamps. If explicit is False, component
timestamps that are not explicitly encoded will be assumed to have zero
delta from the previous component and therefore take the value of the
previous component. If explicit is True, component timestamps that are
not explicitly encoded will be returned with value None.
"""
# TODO: some tests, e.g. in test_replicator, put float timestamps values
# into container db's, hence this defensive check, but in real world
# this may never happen.
if not isinstance(encoded, six.string_types):
ts = Timestamp(encoded)
return ts, ts, ts
parts = []
signs = []
pos_parts = encoded.split('+')
for part in pos_parts:
# parse time components and their signs
# e.g. x-y+z --> parts = [x, y, z] and signs = [+1, -1, +1]
neg_parts = part.split('-')
parts = parts + neg_parts
signs = signs + [1] + [-1] * (len(neg_parts) - 1)
t1 = Timestamp(parts[0])
t2 = t3 = None
if len(parts) > 1:
t2 = t1
delta = signs[1] * int(parts[1], 16)
# if delta = 0 we want t2 = t3 = t1 in order to
# preserve any offset in t1 - only construct a distinct
# timestamp if there is a non-zero delta.
if delta:
t2 = Timestamp((t1.raw + delta) * PRECISION)
elif not explicit:
t2 = t1
if len(parts) > 2:
t3 = t2
delta = signs[2] * int(parts[2], 16)
if delta:
t3 = Timestamp((t2.raw + delta) * PRECISION)
elif not explicit:
t3 = t2
return t1, t2, t3
def normalize_timestamp(timestamp):
"""
Format a timestamp (string or numeric) into a standardized
xxxxxxxxxx.xxxxx (10.5) format.
Note that timestamps using values greater than or equal to November 20th,
2286 at 17:46 UTC will use 11 digits to represent the number of
seconds.
:param timestamp: unix timestamp
:returns: normalized timestamp as a string
"""
return Timestamp(timestamp).normal
EPOCH = datetime.datetime(1970, 1, 1)
def last_modified_date_to_timestamp(last_modified_date_str):
"""
Convert a last modified date (like you'd get from a container listing,
e.g. 2014-02-28T23:22:36.698390) to a float.
"""
return Timestamp.from_isoformat(last_modified_date_str)
def normalize_delete_at_timestamp(timestamp, high_precision=False):
"""
Format a timestamp (string or numeric) into a standardized
xxxxxxxxxx (10) or xxxxxxxxxx.xxxxx (10.5) format.
Note that timestamps less than 0000000000 are raised to
0000000000 and values greater than November 20th, 2286 at
17:46:39 UTC will be capped at that date and time, resulting in
no return value exceeding 9999999999.99999 (or 9999999999 if
using low-precision).
This cap is because the expirer is already working through a
sorted list of strings that were all a length of 10. Adding
another digit would mess up the sort and cause the expirer to
break from processing early. By 2286, this problem will need to
be fixed, probably by creating an additional .expiring_objects
account to work from with 11 (or more) digit container names.
:param timestamp: unix timestamp
:returns: normalized timestamp as a string
"""
fmt = '%016.5f' if high_precision else '%010d'
return fmt % min(max(0, float(timestamp)), 9999999999.99999)

View File

@ -198,855 +198,6 @@ class TestUTC(unittest.TestCase):
self.assertEqual(utils.UTC.tzname(None), 'UTC')
class TestTimestamp(unittest.TestCase):
"""Tests for swift.common.utils.Timestamp"""
def test_invalid_input(self):
self.assertRaises(ValueError, utils.Timestamp, time.time(), offset=-1)
self.assertRaises(ValueError, utils.Timestamp, '123.456_78_90')
def test_invalid_string_conversion(self):
t = utils.Timestamp.now()
self.assertRaises(TypeError, str, t)
def test_offset_limit(self):
t = 1417462430.78693
# can't have a offset above MAX_OFFSET
self.assertRaises(ValueError, utils.Timestamp, t,
offset=utils.MAX_OFFSET + 1)
# exactly max offset is fine
ts = utils.Timestamp(t, offset=utils.MAX_OFFSET)
self.assertEqual(ts.internal, '1417462430.78693_ffffffffffffffff')
# but you can't offset it further
self.assertRaises(ValueError, utils.Timestamp, ts.internal, offset=1)
# unless you start below it
ts = utils.Timestamp(t, offset=utils.MAX_OFFSET - 1)
self.assertEqual(utils.Timestamp(ts.internal, offset=1),
'1417462430.78693_ffffffffffffffff')
def test_normal_format_no_offset(self):
expected = '1402436408.91203'
test_values = (
'1402436408.91203',
'1402436408.91203_00000000',
'1402436408.912030000',
'1402436408.912030000_0000000000000',
'000001402436408.912030000',
'000001402436408.912030000_0000000000',
1402436408.91203,
1402436408.912029,
1402436408.9120300000000000,
1402436408.91202999999999999,
utils.Timestamp(1402436408.91203),
utils.Timestamp(1402436408.91203, offset=0),
utils.Timestamp(1402436408.912029),
utils.Timestamp(1402436408.912029, offset=0),
utils.Timestamp('1402436408.91203'),
utils.Timestamp('1402436408.91203', offset=0),
utils.Timestamp('1402436408.91203_00000000'),
utils.Timestamp('1402436408.91203_00000000', offset=0),
)
for value in test_values:
timestamp = utils.Timestamp(value)
self.assertEqual(timestamp.normal, expected)
# timestamp instance can also compare to string or float
self.assertEqual(timestamp, expected)
self.assertEqual(timestamp, float(expected))
self.assertEqual(timestamp, utils.normalize_timestamp(expected))
def test_isoformat(self):
expected = '2014-06-10T22:47:32.054580'
test_values = (
'1402440452.05458',
'1402440452.054579',
'1402440452.05458_00000000',
'1402440452.054579_00000000',
'1402440452.054580000',
'1402440452.054579999',
'1402440452.054580000_0000000000000',
'1402440452.054579999_0000ff00',
'000001402440452.054580000',
'000001402440452.0545799',
'000001402440452.054580000_0000000000',
'000001402440452.054579999999_00000fffff',
1402440452.05458,
1402440452.054579,
1402440452.0545800000000000,
1402440452.054579999,
utils.Timestamp(1402440452.05458),
utils.Timestamp(1402440452.0545799),
utils.Timestamp(1402440452.05458, offset=0),
utils.Timestamp(1402440452.05457999999, offset=0),
utils.Timestamp(1402440452.05458, offset=100),
utils.Timestamp(1402440452.054579, offset=100),
utils.Timestamp('1402440452.05458'),
utils.Timestamp('1402440452.054579999'),
utils.Timestamp('1402440452.05458', offset=0),
utils.Timestamp('1402440452.054579', offset=0),
utils.Timestamp('1402440452.05458', offset=300),
utils.Timestamp('1402440452.05457999', offset=300),
utils.Timestamp('1402440452.05458_00000000'),
utils.Timestamp('1402440452.05457999_00000000'),
utils.Timestamp('1402440452.05458_00000000', offset=0),
utils.Timestamp('1402440452.05457999_00000aaa', offset=0),
utils.Timestamp('1402440452.05458_00000000', offset=400),
utils.Timestamp('1402440452.054579_0a', offset=400),
)
for value in test_values:
self.assertEqual(utils.Timestamp(value).isoformat, expected)
expected = '1970-01-01T00:00:00.000000'
test_values = (
'0',
'0000000000.00000',
'0000000000.00000_ffffffffffff',
0,
0.0,
)
for value in test_values:
self.assertEqual(utils.Timestamp(value).isoformat, expected)
def test_from_isoformat(self):
ts = utils.Timestamp.from_isoformat('2014-06-10T22:47:32.054580')
self.assertIsInstance(ts, utils.Timestamp)
self.assertEqual(1402440452.05458, float(ts))
self.assertEqual('2014-06-10T22:47:32.054580', ts.isoformat)
ts = utils.Timestamp.from_isoformat('1970-01-01T00:00:00.000000')
self.assertIsInstance(ts, utils.Timestamp)
self.assertEqual(0.0, float(ts))
self.assertEqual('1970-01-01T00:00:00.000000', ts.isoformat)
ts = utils.Timestamp(1402440452.05458)
self.assertIsInstance(ts, utils.Timestamp)
self.assertEqual(ts, utils.Timestamp.from_isoformat(ts.isoformat))
def test_ceil(self):
self.assertEqual(0.0, utils.Timestamp(0).ceil())
self.assertEqual(1.0, utils.Timestamp(0.00001).ceil())
self.assertEqual(1.0, utils.Timestamp(0.000001).ceil())
self.assertEqual(12345678.0, utils.Timestamp(12345678.0).ceil())
self.assertEqual(12345679.0, utils.Timestamp(12345678.000001).ceil())
def test_not_equal(self):
ts = '1402436408.91203_0000000000000001'
test_values = (
utils.Timestamp('1402436408.91203_0000000000000002'),
utils.Timestamp('1402436408.91203'),
utils.Timestamp(1402436408.91203),
utils.Timestamp(1402436408.91204),
utils.Timestamp(1402436408.91203, offset=0),
utils.Timestamp(1402436408.91203, offset=2),
)
for value in test_values:
self.assertTrue(value != ts)
self.assertIs(True, utils.Timestamp(ts) == ts) # sanity
self.assertIs(False, utils.Timestamp(ts) != utils.Timestamp(ts))
self.assertIs(False, utils.Timestamp(ts) != ts)
self.assertIs(False, utils.Timestamp(ts) is None)
self.assertIs(True, utils.Timestamp(ts) is not None)
def test_no_force_internal_no_offset(self):
"""Test that internal is the same as normal with no offset"""
with mock.patch('swift.common.utils.FORCE_INTERNAL', new=False):
self.assertEqual(utils.Timestamp(0).internal, '0000000000.00000')
self.assertEqual(utils.Timestamp(1402437380.58186).internal,
'1402437380.58186')
self.assertEqual(utils.Timestamp(1402437380.581859).internal,
'1402437380.58186')
self.assertEqual(utils.Timestamp(0).internal,
utils.normalize_timestamp(0))
def test_no_force_internal_with_offset(self):
"""Test that internal always includes the offset if significant"""
with mock.patch('swift.common.utils.FORCE_INTERNAL', new=False):
self.assertEqual(utils.Timestamp(0, offset=1).internal,
'0000000000.00000_0000000000000001')
self.assertEqual(
utils.Timestamp(1402437380.58186, offset=16).internal,
'1402437380.58186_0000000000000010')
self.assertEqual(
utils.Timestamp(1402437380.581859, offset=240).internal,
'1402437380.58186_00000000000000f0')
self.assertEqual(
utils.Timestamp('1402437380.581859_00000001',
offset=240).internal,
'1402437380.58186_00000000000000f1')
def test_force_internal(self):
"""Test that internal always includes the offset if forced"""
with mock.patch('swift.common.utils.FORCE_INTERNAL', new=True):
self.assertEqual(utils.Timestamp(0).internal,
'0000000000.00000_0000000000000000')
self.assertEqual(utils.Timestamp(1402437380.58186).internal,
'1402437380.58186_0000000000000000')
self.assertEqual(utils.Timestamp(1402437380.581859).internal,
'1402437380.58186_0000000000000000')
self.assertEqual(utils.Timestamp(0, offset=1).internal,
'0000000000.00000_0000000000000001')
self.assertEqual(
utils.Timestamp(1402437380.58186, offset=16).internal,
'1402437380.58186_0000000000000010')
self.assertEqual(
utils.Timestamp(1402437380.581859, offset=16).internal,
'1402437380.58186_0000000000000010')
def test_internal_format_no_offset(self):
expected = '1402436408.91203_0000000000000000'
test_values = (
'1402436408.91203',
'1402436408.91203_00000000',
'1402436408.912030000',
'1402436408.912030000_0000000000000',
'000001402436408.912030000',
'000001402436408.912030000_0000000000',
1402436408.91203,
1402436408.9120300000000000,
1402436408.912029,
1402436408.912029999999999999,
utils.Timestamp(1402436408.91203),
utils.Timestamp(1402436408.91203, offset=0),
utils.Timestamp(1402436408.912029),
utils.Timestamp(1402436408.91202999999999999, offset=0),
utils.Timestamp('1402436408.91203'),
utils.Timestamp('1402436408.91203', offset=0),
utils.Timestamp('1402436408.912029'),
utils.Timestamp('1402436408.912029', offset=0),
utils.Timestamp('1402436408.912029999999999'),
utils.Timestamp('1402436408.912029999999999', offset=0),
)
for value in test_values:
# timestamp instance is always equivalent
self.assertEqual(utils.Timestamp(value), expected)
if utils.FORCE_INTERNAL:
# the FORCE_INTERNAL flag makes the internal format always
# include the offset portion of the timestamp even when it's
# not significant and would be bad during upgrades
self.assertEqual(utils.Timestamp(value).internal, expected)
else:
# unless we FORCE_INTERNAL, when there's no offset the
# internal format is equivalent to the normalized format
self.assertEqual(utils.Timestamp(value).internal,
'1402436408.91203')
def test_internal_format_with_offset(self):
expected = '1402436408.91203_00000000000000f0'
test_values = (
'1402436408.91203_000000f0',
u'1402436408.91203_000000f0',
b'1402436408.91203_000000f0',
'1402436408.912030000_0000000000f0',
'1402436408.912029_000000f0',
'1402436408.91202999999_0000000000f0',
'000001402436408.912030000_000000000f0',
'000001402436408.9120299999_000000000f0',
utils.Timestamp(1402436408.91203, offset=240),
utils.Timestamp(1402436408.912029, offset=240),
utils.Timestamp('1402436408.91203', offset=240),
utils.Timestamp('1402436408.91203_00000000', offset=240),
utils.Timestamp('1402436408.91203_0000000f', offset=225),
utils.Timestamp('1402436408.9120299999', offset=240),
utils.Timestamp('1402436408.9120299999_00000000', offset=240),
utils.Timestamp('1402436408.9120299999_00000010', offset=224),
)
for value in test_values:
timestamp = utils.Timestamp(value)
self.assertEqual(timestamp.internal, expected)
# can compare with offset if the string is internalized
self.assertEqual(timestamp, expected)
# if comparison value only includes the normalized portion and the
# timestamp includes an offset, it is considered greater
normal = utils.Timestamp(expected).normal
self.assertTrue(timestamp > normal,
'%r is not bigger than %r given %r' % (
timestamp, normal, value))
self.assertTrue(timestamp > float(normal),
'%r is not bigger than %f given %r' % (
timestamp, float(normal), value))
def test_short_format_with_offset(self):
expected = '1402436408.91203_f0'
timestamp = utils.Timestamp(1402436408.91203, 0xf0)
self.assertEqual(expected, timestamp.short)
expected = '1402436408.91203'
timestamp = utils.Timestamp(1402436408.91203)
self.assertEqual(expected, timestamp.short)
def test_raw(self):
expected = 140243640891203
timestamp = utils.Timestamp(1402436408.91203)
self.assertEqual(expected, timestamp.raw)
# 'raw' does not include offset
timestamp = utils.Timestamp(1402436408.91203, 0xf0)
self.assertEqual(expected, timestamp.raw)
def test_delta(self):
def _assertWithinBounds(expected, timestamp):
tolerance = 0.00001
minimum = expected - tolerance
maximum = expected + tolerance
self.assertTrue(float(timestamp) > minimum)
self.assertTrue(float(timestamp) < maximum)
timestamp = utils.Timestamp(1402436408.91203, delta=100)
_assertWithinBounds(1402436408.91303, timestamp)
self.assertEqual(140243640891303, timestamp.raw)
timestamp = utils.Timestamp(1402436408.91203, delta=-100)
_assertWithinBounds(1402436408.91103, timestamp)
self.assertEqual(140243640891103, timestamp.raw)
timestamp = utils.Timestamp(1402436408.91203, delta=0)
_assertWithinBounds(1402436408.91203, timestamp)
self.assertEqual(140243640891203, timestamp.raw)
# delta is independent of offset
timestamp = utils.Timestamp(1402436408.91203, offset=42, delta=100)
self.assertEqual(140243640891303, timestamp.raw)
self.assertEqual(42, timestamp.offset)
# cannot go negative
self.assertRaises(ValueError, utils.Timestamp, 1402436408.91203,
delta=-140243640891203)
def test_int(self):
expected = 1402437965
test_values = (
'1402437965.91203',
'1402437965.91203_00000000',
'1402437965.912030000',
'1402437965.912030000_0000000000000',
'000001402437965.912030000',
'000001402437965.912030000_0000000000',
1402437965.91203,
1402437965.9120300000000000,
1402437965.912029,
1402437965.912029999999999999,
utils.Timestamp(1402437965.91203),
utils.Timestamp(1402437965.91203, offset=0),
utils.Timestamp(1402437965.91203, offset=500),
utils.Timestamp(1402437965.912029),
utils.Timestamp(1402437965.91202999999999999, offset=0),
utils.Timestamp(1402437965.91202999999999999, offset=300),
utils.Timestamp('1402437965.91203'),
utils.Timestamp('1402437965.91203', offset=0),
utils.Timestamp('1402437965.91203', offset=400),
utils.Timestamp('1402437965.912029'),
utils.Timestamp('1402437965.912029', offset=0),
utils.Timestamp('1402437965.912029', offset=200),
utils.Timestamp('1402437965.912029999999999'),
utils.Timestamp('1402437965.912029999999999', offset=0),
utils.Timestamp('1402437965.912029999999999', offset=100),
)
for value in test_values:
timestamp = utils.Timestamp(value)
self.assertEqual(int(timestamp), expected)
self.assertTrue(timestamp > expected)
def test_float(self):
expected = 1402438115.91203
test_values = (
'1402438115.91203',
'1402438115.91203_00000000',
'1402438115.912030000',
'1402438115.912030000_0000000000000',
'000001402438115.912030000',
'000001402438115.912030000_0000000000',
1402438115.91203,
1402438115.9120300000000000,
1402438115.912029,
1402438115.912029999999999999,
utils.Timestamp(1402438115.91203),
utils.Timestamp(1402438115.91203, offset=0),
utils.Timestamp(1402438115.91203, offset=500),
utils.Timestamp(1402438115.912029),
utils.Timestamp(1402438115.91202999999999999, offset=0),
utils.Timestamp(1402438115.91202999999999999, offset=300),
utils.Timestamp('1402438115.91203'),
utils.Timestamp('1402438115.91203', offset=0),
utils.Timestamp('1402438115.91203', offset=400),
utils.Timestamp('1402438115.912029'),
utils.Timestamp('1402438115.912029', offset=0),
utils.Timestamp('1402438115.912029', offset=200),
utils.Timestamp('1402438115.912029999999999'),
utils.Timestamp('1402438115.912029999999999', offset=0),
utils.Timestamp('1402438115.912029999999999', offset=100),
)
tolerance = 0.00001
minimum = expected - tolerance
maximum = expected + tolerance
for value in test_values:
timestamp = utils.Timestamp(value)
self.assertTrue(float(timestamp) > minimum,
'%f is not bigger than %f given %r' % (
timestamp, minimum, value))
self.assertTrue(float(timestamp) < maximum,
'%f is not smaller than %f given %r' % (
timestamp, maximum, value))
# direct comparison of timestamp works too
self.assertTrue(timestamp > minimum,
'%s is not bigger than %f given %r' % (
timestamp.normal, minimum, value))
self.assertTrue(timestamp < maximum,
'%s is not smaller than %f given %r' % (
timestamp.normal, maximum, value))
# ... even against strings
self.assertTrue(timestamp > '%f' % minimum,
'%s is not bigger than %s given %r' % (
timestamp.normal, minimum, value))
self.assertTrue(timestamp < '%f' % maximum,
'%s is not smaller than %s given %r' % (
timestamp.normal, maximum, value))
def test_false(self):
self.assertFalse(utils.Timestamp(0))
self.assertFalse(utils.Timestamp(0, offset=0))
self.assertFalse(utils.Timestamp('0'))
self.assertFalse(utils.Timestamp('0', offset=0))
self.assertFalse(utils.Timestamp(0.0))
self.assertFalse(utils.Timestamp(0.0, offset=0))
self.assertFalse(utils.Timestamp('0.0'))
self.assertFalse(utils.Timestamp('0.0', offset=0))
self.assertFalse(utils.Timestamp(00000000.00000000))
self.assertFalse(utils.Timestamp(00000000.00000000, offset=0))
self.assertFalse(utils.Timestamp('00000000.00000000'))
self.assertFalse(utils.Timestamp('00000000.00000000', offset=0))
def test_true(self):
self.assertTrue(utils.Timestamp(1))
self.assertTrue(utils.Timestamp(1, offset=1))
self.assertTrue(utils.Timestamp(0, offset=1))
self.assertTrue(utils.Timestamp('1'))
self.assertTrue(utils.Timestamp('1', offset=1))
self.assertTrue(utils.Timestamp('0', offset=1))
self.assertTrue(utils.Timestamp(1.1))
self.assertTrue(utils.Timestamp(1.1, offset=1))
self.assertTrue(utils.Timestamp(0.0, offset=1))
self.assertTrue(utils.Timestamp('1.1'))
self.assertTrue(utils.Timestamp('1.1', offset=1))
self.assertTrue(utils.Timestamp('0.0', offset=1))
self.assertTrue(utils.Timestamp(11111111.11111111))
self.assertTrue(utils.Timestamp(11111111.11111111, offset=1))
self.assertTrue(utils.Timestamp(00000000.00000000, offset=1))
self.assertTrue(utils.Timestamp('11111111.11111111'))
self.assertTrue(utils.Timestamp('11111111.11111111', offset=1))
self.assertTrue(utils.Timestamp('00000000.00000000', offset=1))
def test_greater_no_offset(self):
now = time.time()
older = now - 1
timestamp = utils.Timestamp(now)
test_values = (
0, '0', 0.0, '0.0', '0000.0000', '000.000_000',
1, '1', 1.1, '1.1', '1111.1111', '111.111_111',
1402443112.213252, '1402443112.213252', '1402443112.213252_ffff',
older, '%f' % older, '%f_0000ffff' % older,
)
for value in test_values:
other = utils.Timestamp(value)
self.assertNotEqual(timestamp, other) # sanity
self.assertTrue(timestamp > value,
'%r is not greater than %r given %r' % (
timestamp, value, value))
self.assertTrue(timestamp > other,
'%r is not greater than %r given %r' % (
timestamp, other, value))
self.assertTrue(timestamp > other.normal,
'%r is not greater than %r given %r' % (
timestamp, other.normal, value))
self.assertTrue(timestamp > other.internal,
'%r is not greater than %r given %r' % (
timestamp, other.internal, value))
self.assertTrue(timestamp > float(other),
'%r is not greater than %r given %r' % (
timestamp, float(other), value))
self.assertTrue(timestamp > int(other),
'%r is not greater than %r given %r' % (
timestamp, int(other), value))
def _test_greater_with_offset(self, now, test_values):
for offset in range(1, 1000, 100):
timestamp = utils.Timestamp(now, offset=offset)
for value in test_values:
other = utils.Timestamp(value)
self.assertNotEqual(timestamp, other) # sanity
self.assertTrue(timestamp > value,
'%r is not greater than %r given %r' % (
timestamp, value, value))
self.assertTrue(timestamp > other,
'%r is not greater than %r given %r' % (
timestamp, other, value))
self.assertTrue(timestamp > other.normal,
'%r is not greater than %r given %r' % (
timestamp, other.normal, value))
self.assertTrue(timestamp > other.internal,
'%r is not greater than %r given %r' % (
timestamp, other.internal, value))
self.assertTrue(timestamp > float(other),
'%r is not greater than %r given %r' % (
timestamp, float(other), value))
self.assertTrue(timestamp > int(other),
'%r is not greater than %r given %r' % (
timestamp, int(other), value))
def test_greater_with_offset(self):
# Part 1: use the natural time of the Python. This is deliciously
# unpredictable, but completely legitimate and realistic. Finds bugs!
now = time.time()
older = now - 1
test_values = (
0, '0', 0.0, '0.0', '0000.0000', '000.000_000',
1, '1', 1.1, '1.1', '1111.1111', '111.111_111',
1402443346.935174, '1402443346.93517', '1402443346.935169_ffff',
older, now,
)
self._test_greater_with_offset(now, test_values)
# Part 2: Same as above, but with fixed time values that reproduce
# specific corner cases.
now = 1519830570.6949348
older = now - 1
test_values = (
0, '0', 0.0, '0.0', '0000.0000', '000.000_000',
1, '1', 1.1, '1.1', '1111.1111', '111.111_111',
1402443346.935174, '1402443346.93517', '1402443346.935169_ffff',
older, now,
)
self._test_greater_with_offset(now, test_values)
# Part 3: The '%f' problem. Timestamps cannot be converted to %f
# strings, then back to timestamps, then compared with originals.
# You can only "import" a floating point representation once.
now = 1519830570.6949348
now = float('%f' % now)
older = now - 1
test_values = (
0, '0', 0.0, '0.0', '0000.0000', '000.000_000',
1, '1', 1.1, '1.1', '1111.1111', '111.111_111',
older, '%f' % older, '%f_0000ffff' % older,
now, '%f' % now, '%s_00000000' % now,
)
self._test_greater_with_offset(now, test_values)
def test_smaller_no_offset(self):
now = time.time()
newer = now + 1
timestamp = utils.Timestamp(now)
test_values = (
9999999999.99999, '9999999999.99999', '9999999999.99999_ffff',
newer, '%f' % newer, '%f_0000ffff' % newer,
)
for value in test_values:
other = utils.Timestamp(value)
self.assertNotEqual(timestamp, other) # sanity
self.assertTrue(timestamp < value,
'%r is not smaller than %r given %r' % (
timestamp, value, value))
self.assertTrue(timestamp < other,
'%r is not smaller than %r given %r' % (
timestamp, other, value))
self.assertTrue(timestamp < other.normal,
'%r is not smaller than %r given %r' % (
timestamp, other.normal, value))
self.assertTrue(timestamp < other.internal,
'%r is not smaller than %r given %r' % (
timestamp, other.internal, value))
self.assertTrue(timestamp < float(other),
'%r is not smaller than %r given %r' % (
timestamp, float(other), value))
self.assertTrue(timestamp < int(other),
'%r is not smaller than %r given %r' % (
timestamp, int(other), value))
def test_smaller_with_offset(self):
now = time.time()
newer = now + 1
test_values = (
9999999999.99999, '9999999999.99999', '9999999999.99999_ffff',
newer, '%f' % newer, '%f_0000ffff' % newer,
)
for offset in range(1, 1000, 100):
timestamp = utils.Timestamp(now, offset=offset)
for value in test_values:
other = utils.Timestamp(value)
self.assertNotEqual(timestamp, other) # sanity
self.assertTrue(timestamp < value,
'%r is not smaller than %r given %r' % (
timestamp, value, value))
self.assertTrue(timestamp < other,
'%r is not smaller than %r given %r' % (
timestamp, other, value))
self.assertTrue(timestamp < other.normal,
'%r is not smaller than %r given %r' % (
timestamp, other.normal, value))
self.assertTrue(timestamp < other.internal,
'%r is not smaller than %r given %r' % (
timestamp, other.internal, value))
self.assertTrue(timestamp < float(other),
'%r is not smaller than %r given %r' % (
timestamp, float(other), value))
self.assertTrue(timestamp < int(other),
'%r is not smaller than %r given %r' % (
timestamp, int(other), value))
def test_cmp_with_none(self):
self.assertGreater(utils.Timestamp(0), None)
self.assertGreater(utils.Timestamp(1.0), None)
self.assertGreater(utils.Timestamp(1.0, 42), None)
def test_ordering(self):
given = [
'1402444820.62590_000000000000000a',
'1402444820.62589_0000000000000001',
'1402444821.52589_0000000000000004',
'1402444920.62589_0000000000000004',
'1402444821.62589_000000000000000a',
'1402444821.72589_000000000000000a',
'1402444920.62589_0000000000000002',
'1402444820.62589_0000000000000002',
'1402444820.62589_000000000000000a',
'1402444820.62590_0000000000000004',
'1402444920.62589_000000000000000a',
'1402444820.62590_0000000000000002',
'1402444821.52589_0000000000000002',
'1402444821.52589_0000000000000000',
'1402444920.62589',
'1402444821.62589_0000000000000004',
'1402444821.72589_0000000000000001',
'1402444820.62590',
'1402444820.62590_0000000000000001',
'1402444820.62589_0000000000000004',
'1402444821.72589_0000000000000000',
'1402444821.52589_000000000000000a',
'1402444821.72589_0000000000000004',
'1402444821.62589',
'1402444821.52589_0000000000000001',
'1402444821.62589_0000000000000001',
'1402444821.62589_0000000000000002',
'1402444821.72589_0000000000000002',
'1402444820.62589',
'1402444920.62589_0000000000000001']
expected = [
'1402444820.62589',
'1402444820.62589_0000000000000001',
'1402444820.62589_0000000000000002',
'1402444820.62589_0000000000000004',
'1402444820.62589_000000000000000a',
'1402444820.62590',
'1402444820.62590_0000000000000001',
'1402444820.62590_0000000000000002',
'1402444820.62590_0000000000000004',
'1402444820.62590_000000000000000a',
'1402444821.52589',
'1402444821.52589_0000000000000001',
'1402444821.52589_0000000000000002',
'1402444821.52589_0000000000000004',
'1402444821.52589_000000000000000a',
'1402444821.62589',
'1402444821.62589_0000000000000001',
'1402444821.62589_0000000000000002',
'1402444821.62589_0000000000000004',
'1402444821.62589_000000000000000a',
'1402444821.72589',
'1402444821.72589_0000000000000001',
'1402444821.72589_0000000000000002',
'1402444821.72589_0000000000000004',
'1402444821.72589_000000000000000a',
'1402444920.62589',
'1402444920.62589_0000000000000001',
'1402444920.62589_0000000000000002',
'1402444920.62589_0000000000000004',
'1402444920.62589_000000000000000a',
]
# less visual version
"""
now = time.time()
given = [
utils.Timestamp(now + i, offset=offset).internal
for i in (0, 0.00001, 0.9, 1.0, 1.1, 100.0)
for offset in (0, 1, 2, 4, 10)
]
expected = [t for t in given]
random.shuffle(given)
"""
self.assertEqual(len(given), len(expected)) # sanity
timestamps = [utils.Timestamp(t) for t in given]
# our expected values don't include insignificant offsets
with mock.patch('swift.common.utils.FORCE_INTERNAL', new=False):
self.assertEqual(
[t.internal for t in sorted(timestamps)], expected)
# string sorting works as well
self.assertEqual(
sorted([t.internal for t in timestamps]), expected)
def test_hashable(self):
ts_0 = utils.Timestamp('1402444821.72589')
ts_0_also = utils.Timestamp('1402444821.72589')
self.assertEqual(ts_0, ts_0_also) # sanity
self.assertEqual(hash(ts_0), hash(ts_0_also))
d = {ts_0: 'whatever'}
self.assertIn(ts_0, d) # sanity
self.assertIn(ts_0_also, d)
def test_out_of_range_comparisons(self):
now = utils.Timestamp.now()
def check_is_later(val):
self.assertTrue(now != val)
self.assertFalse(now == val)
self.assertTrue(now <= val)
self.assertTrue(now < val)
self.assertTrue(val > now)
self.assertTrue(val >= now)
check_is_later(1e30)
check_is_later(1579753284000) # someone gave us ms instead of s!
check_is_later('1579753284000')
check_is_later(b'1e15')
check_is_later(u'1.e+10_f')
def check_is_earlier(val):
self.assertTrue(now != val)
self.assertFalse(now == val)
self.assertTrue(now >= val)
self.assertTrue(now > val)
self.assertTrue(val < now)
self.assertTrue(val <= now)
check_is_earlier(-1)
check_is_earlier(-0.1)
check_is_earlier('-9999999')
check_is_earlier(b'-9999.999')
check_is_earlier(u'-1234_5678')
def test_inversion(self):
ts = utils.Timestamp(0)
self.assertIsInstance(~ts, utils.Timestamp)
self.assertEqual((~ts).internal, '9999999999.99999')
ts = utils.Timestamp(123456.789)
self.assertIsInstance(~ts, utils.Timestamp)
self.assertEqual(ts.internal, '0000123456.78900')
self.assertEqual((~ts).internal, '9999876543.21099')
timestamps = sorted(utils.Timestamp(random.random() * 1e10)
for _ in range(20))
self.assertEqual([x.internal for x in timestamps],
sorted(x.internal for x in timestamps))
self.assertEqual([(~x).internal for x in reversed(timestamps)],
sorted((~x).internal for x in timestamps))
ts = utils.Timestamp.now()
self.assertGreater(~ts, ts) # NB: will break around 2128
ts = utils.Timestamp.now(offset=1)
with self.assertRaises(ValueError) as caught:
~ts
self.assertEqual(caught.exception.args[0],
'Cannot invert timestamps with offsets')
class TestTimestampEncoding(unittest.TestCase):
def setUp(self):
t0 = utils.Timestamp(0.0)
t1 = utils.Timestamp(997.9996)
t2 = utils.Timestamp(999)
t3 = utils.Timestamp(1000, 24)
t4 = utils.Timestamp(1001)
t5 = utils.Timestamp(1002.00040)
# encodings that are expected when explicit = False
self.non_explicit_encodings = (
('0000001000.00000_18', (t3, t3, t3)),
('0000001000.00000_18', (t3, t3, None)),
)
# mappings that are expected when explicit = True
self.explicit_encodings = (
('0000001000.00000_18+0+0', (t3, t3, t3)),
('0000001000.00000_18+0', (t3, t3, None)),
)
# mappings that are expected when explicit = True or False
self.encodings = (
('0000001000.00000_18+0+186a0', (t3, t3, t4)),
('0000001000.00000_18+186a0+186c8', (t3, t4, t5)),
('0000001000.00000_18-186a0+0', (t3, t2, t2)),
('0000001000.00000_18+0-186a0', (t3, t3, t2)),
('0000001000.00000_18-186a0-186c8', (t3, t2, t1)),
('0000001000.00000_18', (t3, None, None)),
('0000001000.00000_18+186a0', (t3, t4, None)),
('0000001000.00000_18-186a0', (t3, t2, None)),
('0000001000.00000_18', (t3, None, t1)),
('0000001000.00000_18-5f5e100', (t3, t0, None)),
('0000001000.00000_18+0-5f5e100', (t3, t3, t0)),
('0000001000.00000_18-5f5e100+5f45a60', (t3, t0, t2)),
)
# decodings that are expected when explicit = False
self.non_explicit_decodings = (
('0000001000.00000_18', (t3, t3, t3)),
('0000001000.00000_18+186a0', (t3, t4, t4)),
('0000001000.00000_18-186a0', (t3, t2, t2)),
('0000001000.00000_18+186a0', (t3, t4, t4)),
('0000001000.00000_18-186a0', (t3, t2, t2)),
('0000001000.00000_18-5f5e100', (t3, t0, t0)),
)
# decodings that are expected when explicit = True
self.explicit_decodings = (
('0000001000.00000_18+0+0', (t3, t3, t3)),
('0000001000.00000_18+0', (t3, t3, None)),
('0000001000.00000_18', (t3, None, None)),
('0000001000.00000_18+186a0', (t3, t4, None)),
('0000001000.00000_18-186a0', (t3, t2, None)),
('0000001000.00000_18-5f5e100', (t3, t0, None)),
)
# decodings that are expected when explicit = True or False
self.decodings = (
('0000001000.00000_18+0+186a0', (t3, t3, t4)),
('0000001000.00000_18+186a0+186c8', (t3, t4, t5)),
('0000001000.00000_18-186a0+0', (t3, t2, t2)),
('0000001000.00000_18+0-186a0', (t3, t3, t2)),
('0000001000.00000_18-186a0-186c8', (t3, t2, t1)),
('0000001000.00000_18-5f5e100+5f45a60', (t3, t0, t2)),
)
def _assertEqual(self, expected, actual, test):
self.assertEqual(expected, actual,
'Got %s but expected %s for parameters %s'
% (actual, expected, test))
def test_encoding(self):
for test in self.explicit_encodings:
actual = utils.encode_timestamps(test[1][0], test[1][1],
test[1][2], True)
self._assertEqual(test[0], actual, test[1])
for test in self.non_explicit_encodings:
actual = utils.encode_timestamps(test[1][0], test[1][1],
test[1][2], False)
self._assertEqual(test[0], actual, test[1])
for explicit in (True, False):
for test in self.encodings:
actual = utils.encode_timestamps(test[1][0], test[1][1],
test[1][2], explicit)
self._assertEqual(test[0], actual, test[1])
def test_decoding(self):
for test in self.explicit_decodings:
actual = utils.decode_timestamps(test[0], True)
self._assertEqual(test[1], actual, test[0])
for test in self.non_explicit_decodings:
actual = utils.decode_timestamps(test[0], False)
self._assertEqual(test[1], actual, test[0])
for explicit in (True, False):
for test in self.decodings:
actual = utils.decode_timestamps(test[0], explicit)
self._assertEqual(test[1], actual, test[0])
class TestUtils(unittest.TestCase):
"""Tests for swift.common.utils """

View File

View File

@ -0,0 +1,882 @@
# Copyright (c) 2010-2023 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for swift.common.utils.timestamp"""
import random
import time
import unittest
import mock
from swift.common.utils import timestamp
class TestTimestamp(unittest.TestCase):
"""Tests for swift.common.utils.timestamp.Timestamp"""
def test_invalid_input(self):
with self.assertRaises(ValueError):
timestamp.Timestamp(time.time(), offset=-1)
with self.assertRaises(ValueError):
timestamp.Timestamp('123.456_78_90')
def test_invalid_string_conversion(self):
t = timestamp.Timestamp.now()
self.assertRaises(TypeError, str, t)
def test_offset_limit(self):
t = 1417462430.78693
# can't have a offset above MAX_OFFSET
with self.assertRaises(ValueError):
timestamp.Timestamp(t, offset=timestamp.MAX_OFFSET + 1)
# exactly max offset is fine
ts = timestamp.Timestamp(t, offset=timestamp.MAX_OFFSET)
self.assertEqual(ts.internal, '1417462430.78693_ffffffffffffffff')
# but you can't offset it further
with self.assertRaises(ValueError):
timestamp.Timestamp(ts.internal, offset=1)
# unless you start below it
ts = timestamp.Timestamp(t, offset=timestamp.MAX_OFFSET - 1)
self.assertEqual(timestamp.Timestamp(ts.internal, offset=1),
'1417462430.78693_ffffffffffffffff')
def test_normal_format_no_offset(self):
expected = '1402436408.91203'
test_values = (
'1402436408.91203',
'1402436408.91203_00000000',
'1402436408.912030000',
'1402436408.912030000_0000000000000',
'000001402436408.912030000',
'000001402436408.912030000_0000000000',
1402436408.91203,
1402436408.912029,
1402436408.9120300000000000,
1402436408.91202999999999999,
timestamp.Timestamp(1402436408.91203),
timestamp.Timestamp(1402436408.91203, offset=0),
timestamp.Timestamp(1402436408.912029),
timestamp.Timestamp(1402436408.912029, offset=0),
timestamp.Timestamp('1402436408.91203'),
timestamp.Timestamp('1402436408.91203', offset=0),
timestamp.Timestamp('1402436408.91203_00000000'),
timestamp.Timestamp('1402436408.91203_00000000', offset=0),
)
for value in test_values:
ts = timestamp.Timestamp(value)
self.assertEqual(ts.normal, expected)
# timestamp instance can also compare to string or float
self.assertEqual(ts, expected)
self.assertEqual(ts, float(expected))
self.assertEqual(ts, timestamp.normalize_timestamp(expected))
def test_isoformat(self):
expected = '2014-06-10T22:47:32.054580'
test_values = (
'1402440452.05458',
'1402440452.054579',
'1402440452.05458_00000000',
'1402440452.054579_00000000',
'1402440452.054580000',
'1402440452.054579999',
'1402440452.054580000_0000000000000',
'1402440452.054579999_0000ff00',
'000001402440452.054580000',
'000001402440452.0545799',
'000001402440452.054580000_0000000000',
'000001402440452.054579999999_00000fffff',
1402440452.05458,
1402440452.054579,
1402440452.0545800000000000,
1402440452.054579999,
timestamp.Timestamp(1402440452.05458),
timestamp.Timestamp(1402440452.0545799),
timestamp.Timestamp(1402440452.05458, offset=0),
timestamp.Timestamp(1402440452.05457999999, offset=0),
timestamp.Timestamp(1402440452.05458, offset=100),
timestamp.Timestamp(1402440452.054579, offset=100),
timestamp.Timestamp('1402440452.05458'),
timestamp.Timestamp('1402440452.054579999'),
timestamp.Timestamp('1402440452.05458', offset=0),
timestamp.Timestamp('1402440452.054579', offset=0),
timestamp.Timestamp('1402440452.05458', offset=300),
timestamp.Timestamp('1402440452.05457999', offset=300),
timestamp.Timestamp('1402440452.05458_00000000'),
timestamp.Timestamp('1402440452.05457999_00000000'),
timestamp.Timestamp('1402440452.05458_00000000', offset=0),
timestamp.Timestamp('1402440452.05457999_00000aaa', offset=0),
timestamp.Timestamp('1402440452.05458_00000000', offset=400),
timestamp.Timestamp('1402440452.054579_0a', offset=400),
)
for value in test_values:
self.assertEqual(timestamp.Timestamp(value).isoformat, expected)
expected = '1970-01-01T00:00:00.000000'
test_values = (
'0',
'0000000000.00000',
'0000000000.00000_ffffffffffff',
0,
0.0,
)
for value in test_values:
self.assertEqual(timestamp.Timestamp(value).isoformat, expected)
def test_from_isoformat(self):
ts = timestamp.Timestamp.from_isoformat('2014-06-10T22:47:32.054580')
self.assertIsInstance(ts, timestamp.Timestamp)
self.assertEqual(1402440452.05458, float(ts))
self.assertEqual('2014-06-10T22:47:32.054580', ts.isoformat)
ts = timestamp.Timestamp.from_isoformat('1970-01-01T00:00:00.000000')
self.assertIsInstance(ts, timestamp.Timestamp)
self.assertEqual(0.0, float(ts))
self.assertEqual('1970-01-01T00:00:00.000000', ts.isoformat)
ts = timestamp.Timestamp(1402440452.05458)
self.assertIsInstance(ts, timestamp.Timestamp)
self.assertEqual(ts, timestamp.Timestamp.from_isoformat(ts.isoformat))
def test_ceil(self):
self.assertEqual(0.0, timestamp.Timestamp(0).ceil())
self.assertEqual(1.0, timestamp.Timestamp(0.00001).ceil())
self.assertEqual(1.0, timestamp.Timestamp(0.000001).ceil())
self.assertEqual(12345678.0, timestamp.Timestamp(12345678.0).ceil())
self.assertEqual(12345679.0,
timestamp.Timestamp(12345678.000001).ceil())
def test_not_equal(self):
ts = '1402436408.91203_0000000000000001'
test_values = (
timestamp.Timestamp('1402436408.91203_0000000000000002'),
timestamp.Timestamp('1402436408.91203'),
timestamp.Timestamp(1402436408.91203),
timestamp.Timestamp(1402436408.91204),
timestamp.Timestamp(1402436408.91203, offset=0),
timestamp.Timestamp(1402436408.91203, offset=2),
)
for value in test_values:
self.assertTrue(value != ts)
self.assertIs(True, timestamp.Timestamp(ts) == ts) # sanity
self.assertIs(False,
timestamp.Timestamp(ts) != timestamp.Timestamp(ts))
self.assertIs(False, timestamp.Timestamp(ts) != ts)
self.assertIs(False, timestamp.Timestamp(ts) is None)
self.assertIs(True, timestamp.Timestamp(ts) is not None)
def test_no_force_internal_no_offset(self):
"""Test that internal is the same as normal with no offset"""
with mock.patch('swift.common.utils.timestamp.FORCE_INTERNAL',
new=False):
self.assertEqual(timestamp.Timestamp(0).internal,
'0000000000.00000')
self.assertEqual(timestamp.Timestamp(1402437380.58186).internal,
'1402437380.58186')
self.assertEqual(timestamp.Timestamp(1402437380.581859).internal,
'1402437380.58186')
self.assertEqual(timestamp.Timestamp(0).internal,
timestamp.normalize_timestamp(0))
def test_no_force_internal_with_offset(self):
"""Test that internal always includes the offset if significant"""
with mock.patch('swift.common.utils.timestamp.FORCE_INTERNAL',
new=False):
self.assertEqual(timestamp.Timestamp(0, offset=1).internal,
'0000000000.00000_0000000000000001')
self.assertEqual(
timestamp.Timestamp(1402437380.58186, offset=16).internal,
'1402437380.58186_0000000000000010')
self.assertEqual(
timestamp.Timestamp(1402437380.581859, offset=240).internal,
'1402437380.58186_00000000000000f0')
self.assertEqual(
timestamp.Timestamp('1402437380.581859_00000001',
offset=240).internal,
'1402437380.58186_00000000000000f1')
def test_force_internal(self):
"""Test that internal always includes the offset if forced"""
with mock.patch('swift.common.utils.timestamp.FORCE_INTERNAL',
new=True):
self.assertEqual(timestamp.Timestamp(0).internal,
'0000000000.00000_0000000000000000')
self.assertEqual(timestamp.Timestamp(1402437380.58186).internal,
'1402437380.58186_0000000000000000')
self.assertEqual(timestamp.Timestamp(1402437380.581859).internal,
'1402437380.58186_0000000000000000')
self.assertEqual(timestamp.Timestamp(0, offset=1).internal,
'0000000000.00000_0000000000000001')
self.assertEqual(
timestamp.Timestamp(1402437380.58186, offset=16).internal,
'1402437380.58186_0000000000000010')
self.assertEqual(
timestamp.Timestamp(1402437380.581859, offset=16).internal,
'1402437380.58186_0000000000000010')
def test_internal_format_no_offset(self):
expected = '1402436408.91203_0000000000000000'
test_values = (
'1402436408.91203',
'1402436408.91203_00000000',
'1402436408.912030000',
'1402436408.912030000_0000000000000',
'000001402436408.912030000',
'000001402436408.912030000_0000000000',
1402436408.91203,
1402436408.9120300000000000,
1402436408.912029,
1402436408.912029999999999999,
timestamp.Timestamp(1402436408.91203),
timestamp.Timestamp(1402436408.91203, offset=0),
timestamp.Timestamp(1402436408.912029),
timestamp.Timestamp(1402436408.91202999999999999, offset=0),
timestamp.Timestamp('1402436408.91203'),
timestamp.Timestamp('1402436408.91203', offset=0),
timestamp.Timestamp('1402436408.912029'),
timestamp.Timestamp('1402436408.912029', offset=0),
timestamp.Timestamp('1402436408.912029999999999'),
timestamp.Timestamp('1402436408.912029999999999', offset=0),
)
for value in test_values:
# timestamp instance is always equivalent
self.assertEqual(timestamp.Timestamp(value), expected)
if timestamp.FORCE_INTERNAL:
# the FORCE_INTERNAL flag makes the internal format always
# include the offset portion of the timestamp even when it's
# not significant and would be bad during upgrades
self.assertEqual(timestamp.Timestamp(value).internal, expected)
else:
# unless we FORCE_INTERNAL, when there's no offset the
# internal format is equivalent to the normalized format
self.assertEqual(timestamp.Timestamp(value).internal,
'1402436408.91203')
def test_internal_format_with_offset(self):
expected = '1402436408.91203_00000000000000f0'
test_values = (
'1402436408.91203_000000f0',
u'1402436408.91203_000000f0',
b'1402436408.91203_000000f0',
'1402436408.912030000_0000000000f0',
'1402436408.912029_000000f0',
'1402436408.91202999999_0000000000f0',
'000001402436408.912030000_000000000f0',
'000001402436408.9120299999_000000000f0',
timestamp.Timestamp(1402436408.91203, offset=240),
timestamp.Timestamp(1402436408.912029, offset=240),
timestamp.Timestamp('1402436408.91203', offset=240),
timestamp.Timestamp('1402436408.91203_00000000', offset=240),
timestamp.Timestamp('1402436408.91203_0000000f', offset=225),
timestamp.Timestamp('1402436408.9120299999', offset=240),
timestamp.Timestamp('1402436408.9120299999_00000000', offset=240),
timestamp.Timestamp('1402436408.9120299999_00000010', offset=224),
)
for value in test_values:
ts = timestamp.Timestamp(value)
self.assertEqual(ts.internal, expected)
# can compare with offset if the string is internalized
self.assertEqual(ts, expected)
# if comparison value only includes the normalized portion and the
# timestamp includes an offset, it is considered greater
normal = timestamp.Timestamp(expected).normal
self.assertTrue(ts > normal,
'%r is not bigger than %r given %r' % (
ts, normal, value))
self.assertTrue(ts > float(normal),
'%r is not bigger than %f given %r' % (
ts, float(normal), value))
def test_short_format_with_offset(self):
expected = '1402436408.91203_f0'
ts = timestamp.Timestamp(1402436408.91203, 0xf0)
self.assertEqual(expected, ts.short)
expected = '1402436408.91203'
ts = timestamp.Timestamp(1402436408.91203)
self.assertEqual(expected, ts.short)
def test_raw(self):
expected = 140243640891203
ts = timestamp.Timestamp(1402436408.91203)
self.assertEqual(expected, ts.raw)
# 'raw' does not include offset
ts = timestamp.Timestamp(1402436408.91203, 0xf0)
self.assertEqual(expected, ts.raw)
def test_delta(self):
def _assertWithinBounds(expected, timestamp):
tolerance = 0.00001
minimum = expected - tolerance
maximum = expected + tolerance
self.assertTrue(float(timestamp) > minimum)
self.assertTrue(float(timestamp) < maximum)
ts = timestamp.Timestamp(1402436408.91203, delta=100)
_assertWithinBounds(1402436408.91303, ts)
self.assertEqual(140243640891303, ts.raw)
ts = timestamp.Timestamp(1402436408.91203, delta=-100)
_assertWithinBounds(1402436408.91103, ts)
self.assertEqual(140243640891103, ts.raw)
ts = timestamp.Timestamp(1402436408.91203, delta=0)
_assertWithinBounds(1402436408.91203, ts)
self.assertEqual(140243640891203, ts.raw)
# delta is independent of offset
ts = timestamp.Timestamp(1402436408.91203, offset=42, delta=100)
self.assertEqual(140243640891303, ts.raw)
self.assertEqual(42, ts.offset)
# cannot go negative
self.assertRaises(ValueError, timestamp.Timestamp, 1402436408.91203,
delta=-140243640891203)
def test_int(self):
expected = 1402437965
test_values = (
'1402437965.91203',
'1402437965.91203_00000000',
'1402437965.912030000',
'1402437965.912030000_0000000000000',
'000001402437965.912030000',
'000001402437965.912030000_0000000000',
1402437965.91203,
1402437965.9120300000000000,
1402437965.912029,
1402437965.912029999999999999,
timestamp.Timestamp(1402437965.91203),
timestamp.Timestamp(1402437965.91203, offset=0),
timestamp.Timestamp(1402437965.91203, offset=500),
timestamp.Timestamp(1402437965.912029),
timestamp.Timestamp(1402437965.91202999999999999, offset=0),
timestamp.Timestamp(1402437965.91202999999999999, offset=300),
timestamp.Timestamp('1402437965.91203'),
timestamp.Timestamp('1402437965.91203', offset=0),
timestamp.Timestamp('1402437965.91203', offset=400),
timestamp.Timestamp('1402437965.912029'),
timestamp.Timestamp('1402437965.912029', offset=0),
timestamp.Timestamp('1402437965.912029', offset=200),
timestamp.Timestamp('1402437965.912029999999999'),
timestamp.Timestamp('1402437965.912029999999999', offset=0),
timestamp.Timestamp('1402437965.912029999999999', offset=100),
)
for value in test_values:
ts = timestamp.Timestamp(value)
self.assertEqual(int(ts), expected)
self.assertTrue(ts > expected)
def test_float(self):
expected = 1402438115.91203
test_values = (
'1402438115.91203',
'1402438115.91203_00000000',
'1402438115.912030000',
'1402438115.912030000_0000000000000',
'000001402438115.912030000',
'000001402438115.912030000_0000000000',
1402438115.91203,
1402438115.9120300000000000,
1402438115.912029,
1402438115.912029999999999999,
timestamp.Timestamp(1402438115.91203),
timestamp.Timestamp(1402438115.91203, offset=0),
timestamp.Timestamp(1402438115.91203, offset=500),
timestamp.Timestamp(1402438115.912029),
timestamp.Timestamp(1402438115.91202999999999999, offset=0),
timestamp.Timestamp(1402438115.91202999999999999, offset=300),
timestamp.Timestamp('1402438115.91203'),
timestamp.Timestamp('1402438115.91203', offset=0),
timestamp.Timestamp('1402438115.91203', offset=400),
timestamp.Timestamp('1402438115.912029'),
timestamp.Timestamp('1402438115.912029', offset=0),
timestamp.Timestamp('1402438115.912029', offset=200),
timestamp.Timestamp('1402438115.912029999999999'),
timestamp.Timestamp('1402438115.912029999999999', offset=0),
timestamp.Timestamp('1402438115.912029999999999', offset=100),
)
tolerance = 0.00001
minimum = expected - tolerance
maximum = expected + tolerance
for value in test_values:
ts = timestamp.Timestamp(value)
self.assertTrue(float(ts) > minimum,
'%f is not bigger than %f given %r' % (
ts, minimum, value))
self.assertTrue(float(ts) < maximum,
'%f is not smaller than %f given %r' % (
ts, maximum, value))
# direct comparison of timestamp works too
self.assertTrue(ts > minimum,
'%s is not bigger than %f given %r' % (
ts.normal, minimum, value))
self.assertTrue(ts < maximum,
'%s is not smaller than %f given %r' % (
ts.normal, maximum, value))
# ... even against strings
self.assertTrue(ts > '%f' % minimum,
'%s is not bigger than %s given %r' % (
ts.normal, minimum, value))
self.assertTrue(ts < '%f' % maximum,
'%s is not smaller than %s given %r' % (
ts.normal, maximum, value))
def test_false(self):
self.assertFalse(timestamp.Timestamp(0))
self.assertFalse(timestamp.Timestamp(0, offset=0))
self.assertFalse(timestamp.Timestamp('0'))
self.assertFalse(timestamp.Timestamp('0', offset=0))
self.assertFalse(timestamp.Timestamp(0.0))
self.assertFalse(timestamp.Timestamp(0.0, offset=0))
self.assertFalse(timestamp.Timestamp('0.0'))
self.assertFalse(timestamp.Timestamp('0.0', offset=0))
self.assertFalse(timestamp.Timestamp(00000000.00000000))
self.assertFalse(timestamp.Timestamp(00000000.00000000, offset=0))
self.assertFalse(timestamp.Timestamp('00000000.00000000'))
self.assertFalse(timestamp.Timestamp('00000000.00000000', offset=0))
def test_true(self):
self.assertTrue(timestamp.Timestamp(1))
self.assertTrue(timestamp.Timestamp(1, offset=1))
self.assertTrue(timestamp.Timestamp(0, offset=1))
self.assertTrue(timestamp.Timestamp('1'))
self.assertTrue(timestamp.Timestamp('1', offset=1))
self.assertTrue(timestamp.Timestamp('0', offset=1))
self.assertTrue(timestamp.Timestamp(1.1))
self.assertTrue(timestamp.Timestamp(1.1, offset=1))
self.assertTrue(timestamp.Timestamp(0.0, offset=1))
self.assertTrue(timestamp.Timestamp('1.1'))
self.assertTrue(timestamp.Timestamp('1.1', offset=1))
self.assertTrue(timestamp.Timestamp('0.0', offset=1))
self.assertTrue(timestamp.Timestamp(11111111.11111111))
self.assertTrue(timestamp.Timestamp(11111111.11111111, offset=1))
self.assertTrue(timestamp.Timestamp(00000000.00000000, offset=1))
self.assertTrue(timestamp.Timestamp('11111111.11111111'))
self.assertTrue(timestamp.Timestamp('11111111.11111111', offset=1))
self.assertTrue(timestamp.Timestamp('00000000.00000000', offset=1))
def test_greater_no_offset(self):
now = time.time()
older = now - 1
ts = timestamp.Timestamp(now)
test_values = (
0, '0', 0.0, '0.0', '0000.0000', '000.000_000',
1, '1', 1.1, '1.1', '1111.1111', '111.111_111',
1402443112.213252, '1402443112.213252', '1402443112.213252_ffff',
older, '%f' % older, '%f_0000ffff' % older,
)
for value in test_values:
other = timestamp.Timestamp(value)
self.assertNotEqual(ts, other) # sanity
self.assertTrue(ts > value,
'%r is not greater than %r given %r' % (
ts, value, value))
self.assertTrue(ts > other,
'%r is not greater than %r given %r' % (
ts, other, value))
self.assertTrue(ts > other.normal,
'%r is not greater than %r given %r' % (
ts, other.normal, value))
self.assertTrue(ts > other.internal,
'%r is not greater than %r given %r' % (
ts, other.internal, value))
self.assertTrue(ts > float(other),
'%r is not greater than %r given %r' % (
ts, float(other), value))
self.assertTrue(ts > int(other),
'%r is not greater than %r given %r' % (
ts, int(other), value))
def _test_greater_with_offset(self, now, test_values):
for offset in range(1, 1000, 100):
ts = timestamp.Timestamp(now, offset=offset)
for value in test_values:
other = timestamp.Timestamp(value)
self.assertNotEqual(ts, other) # sanity
self.assertTrue(ts > value,
'%r is not greater than %r given %r' % (
ts, value, value))
self.assertTrue(ts > other,
'%r is not greater than %r given %r' % (
ts, other, value))
self.assertTrue(ts > other.normal,
'%r is not greater than %r given %r' % (
ts, other.normal, value))
self.assertTrue(ts > other.internal,
'%r is not greater than %r given %r' % (
ts, other.internal, value))
self.assertTrue(ts > float(other),
'%r is not greater than %r given %r' % (
ts, float(other), value))
self.assertTrue(ts > int(other),
'%r is not greater than %r given %r' % (
ts, int(other), value))
def test_greater_with_offset(self):
# Part 1: use the natural time of the Python. This is deliciously
# unpredictable, but completely legitimate and realistic. Finds bugs!
now = time.time()
older = now - 1
test_values = (
0, '0', 0.0, '0.0', '0000.0000', '000.000_000',
1, '1', 1.1, '1.1', '1111.1111', '111.111_111',
1402443346.935174, '1402443346.93517', '1402443346.935169_ffff',
older, now,
)
self._test_greater_with_offset(now, test_values)
# Part 2: Same as above, but with fixed time values that reproduce
# specific corner cases.
now = 1519830570.6949348
older = now - 1
test_values = (
0, '0', 0.0, '0.0', '0000.0000', '000.000_000',
1, '1', 1.1, '1.1', '1111.1111', '111.111_111',
1402443346.935174, '1402443346.93517', '1402443346.935169_ffff',
older, now,
)
self._test_greater_with_offset(now, test_values)
# Part 3: The '%f' problem. Timestamps cannot be converted to %f
# strings, then back to timestamps, then compared with originals.
# You can only "import" a floating point representation once.
now = 1519830570.6949348
now = float('%f' % now)
older = now - 1
test_values = (
0, '0', 0.0, '0.0', '0000.0000', '000.000_000',
1, '1', 1.1, '1.1', '1111.1111', '111.111_111',
older, '%f' % older, '%f_0000ffff' % older,
now, '%f' % now, '%s_00000000' % now,
)
self._test_greater_with_offset(now, test_values)
def test_smaller_no_offset(self):
now = time.time()
newer = now + 1
ts = timestamp.Timestamp(now)
test_values = (
9999999999.99999, '9999999999.99999', '9999999999.99999_ffff',
newer, '%f' % newer, '%f_0000ffff' % newer,
)
for value in test_values:
other = timestamp.Timestamp(value)
self.assertNotEqual(ts, other) # sanity
self.assertTrue(ts < value,
'%r is not smaller than %r given %r' % (
ts, value, value))
self.assertTrue(ts < other,
'%r is not smaller than %r given %r' % (
ts, other, value))
self.assertTrue(ts < other.normal,
'%r is not smaller than %r given %r' % (
ts, other.normal, value))
self.assertTrue(ts < other.internal,
'%r is not smaller than %r given %r' % (
ts, other.internal, value))
self.assertTrue(ts < float(other),
'%r is not smaller than %r given %r' % (
ts, float(other), value))
self.assertTrue(ts < int(other),
'%r is not smaller than %r given %r' % (
ts, int(other), value))
def test_smaller_with_offset(self):
now = time.time()
newer = now + 1
test_values = (
9999999999.99999, '9999999999.99999', '9999999999.99999_ffff',
newer, '%f' % newer, '%f_0000ffff' % newer,
)
for offset in range(1, 1000, 100):
ts = timestamp.Timestamp(now, offset=offset)
for value in test_values:
other = timestamp.Timestamp(value)
self.assertNotEqual(ts, other) # sanity
self.assertTrue(ts < value,
'%r is not smaller than %r given %r' % (
ts, value, value))
self.assertTrue(ts < other,
'%r is not smaller than %r given %r' % (
ts, other, value))
self.assertTrue(ts < other.normal,
'%r is not smaller than %r given %r' % (
ts, other.normal, value))
self.assertTrue(ts < other.internal,
'%r is not smaller than %r given %r' % (
ts, other.internal, value))
self.assertTrue(ts < float(other),
'%r is not smaller than %r given %r' % (
ts, float(other), value))
self.assertTrue(ts < int(other),
'%r is not smaller than %r given %r' % (
ts, int(other), value))
def test_cmp_with_none(self):
self.assertGreater(timestamp.Timestamp(0), None)
self.assertGreater(timestamp.Timestamp(1.0), None)
self.assertGreater(timestamp.Timestamp(1.0, 42), None)
def test_ordering(self):
given = [
'1402444820.62590_000000000000000a',
'1402444820.62589_0000000000000001',
'1402444821.52589_0000000000000004',
'1402444920.62589_0000000000000004',
'1402444821.62589_000000000000000a',
'1402444821.72589_000000000000000a',
'1402444920.62589_0000000000000002',
'1402444820.62589_0000000000000002',
'1402444820.62589_000000000000000a',
'1402444820.62590_0000000000000004',
'1402444920.62589_000000000000000a',
'1402444820.62590_0000000000000002',
'1402444821.52589_0000000000000002',
'1402444821.52589_0000000000000000',
'1402444920.62589',
'1402444821.62589_0000000000000004',
'1402444821.72589_0000000000000001',
'1402444820.62590',
'1402444820.62590_0000000000000001',
'1402444820.62589_0000000000000004',
'1402444821.72589_0000000000000000',
'1402444821.52589_000000000000000a',
'1402444821.72589_0000000000000004',
'1402444821.62589',
'1402444821.52589_0000000000000001',
'1402444821.62589_0000000000000001',
'1402444821.62589_0000000000000002',
'1402444821.72589_0000000000000002',
'1402444820.62589',
'1402444920.62589_0000000000000001']
expected = [
'1402444820.62589',
'1402444820.62589_0000000000000001',
'1402444820.62589_0000000000000002',
'1402444820.62589_0000000000000004',
'1402444820.62589_000000000000000a',
'1402444820.62590',
'1402444820.62590_0000000000000001',
'1402444820.62590_0000000000000002',
'1402444820.62590_0000000000000004',
'1402444820.62590_000000000000000a',
'1402444821.52589',
'1402444821.52589_0000000000000001',
'1402444821.52589_0000000000000002',
'1402444821.52589_0000000000000004',
'1402444821.52589_000000000000000a',
'1402444821.62589',
'1402444821.62589_0000000000000001',
'1402444821.62589_0000000000000002',
'1402444821.62589_0000000000000004',
'1402444821.62589_000000000000000a',
'1402444821.72589',
'1402444821.72589_0000000000000001',
'1402444821.72589_0000000000000002',
'1402444821.72589_0000000000000004',
'1402444821.72589_000000000000000a',
'1402444920.62589',
'1402444920.62589_0000000000000001',
'1402444920.62589_0000000000000002',
'1402444920.62589_0000000000000004',
'1402444920.62589_000000000000000a',
]
# less visual version
"""
now = time.time()
given = [
timestamp.Timestamp(now + i, offset=offset).internal
for i in (0, 0.00001, 0.9, 1.0, 1.1, 100.0)
for offset in (0, 1, 2, 4, 10)
]
expected = [t for t in given]
random.shuffle(given)
"""
self.assertEqual(len(given), len(expected)) # sanity
timestamps = [timestamp.Timestamp(t) for t in given]
# our expected values don't include insignificant offsets
with mock.patch('swift.common.utils.timestamp.FORCE_INTERNAL',
new=False):
self.assertEqual(
[t.internal for t in sorted(timestamps)], expected)
# string sorting works as well
self.assertEqual(
sorted([t.internal for t in timestamps]), expected)
def test_hashable(self):
ts_0 = timestamp.Timestamp('1402444821.72589')
ts_0_also = timestamp.Timestamp('1402444821.72589')
self.assertEqual(ts_0, ts_0_also) # sanity
self.assertEqual(hash(ts_0), hash(ts_0_also))
d = {ts_0: 'whatever'}
self.assertIn(ts_0, d) # sanity
self.assertIn(ts_0_also, d)
def test_out_of_range_comparisons(self):
now = timestamp.Timestamp.now()
def check_is_later(val):
self.assertTrue(now != val)
self.assertFalse(now == val)
self.assertTrue(now <= val)
self.assertTrue(now < val)
self.assertTrue(val > now)
self.assertTrue(val >= now)
check_is_later(1e30)
check_is_later(1579753284000) # someone gave us ms instead of s!
check_is_later('1579753284000')
check_is_later(b'1e15')
check_is_later(u'1.e+10_f')
def check_is_earlier(val):
self.assertTrue(now != val)
self.assertFalse(now == val)
self.assertTrue(now >= val)
self.assertTrue(now > val)
self.assertTrue(val < now)
self.assertTrue(val <= now)
check_is_earlier(-1)
check_is_earlier(-0.1)
check_is_earlier('-9999999')
check_is_earlier(b'-9999.999')
check_is_earlier(u'-1234_5678')
def test_inversion(self):
ts = timestamp.Timestamp(0)
self.assertIsInstance(~ts, timestamp.Timestamp)
self.assertEqual((~ts).internal, '9999999999.99999')
ts = timestamp.Timestamp(123456.789)
self.assertIsInstance(~ts, timestamp.Timestamp)
self.assertEqual(ts.internal, '0000123456.78900')
self.assertEqual((~ts).internal, '9999876543.21099')
timestamps = sorted(timestamp.Timestamp(random.random() * 1e10)
for _ in range(20))
self.assertEqual([x.internal for x in timestamps],
sorted(x.internal for x in timestamps))
self.assertEqual([(~x).internal for x in reversed(timestamps)],
sorted((~x).internal for x in timestamps))
ts = timestamp.Timestamp.now()
self.assertGreater(~ts, ts) # NB: will break around 2128
ts = timestamp.Timestamp.now(offset=1)
with self.assertRaises(ValueError) as caught:
~ts
self.assertEqual(caught.exception.args[0],
'Cannot invert timestamps with offsets')
class TestTimestampEncoding(unittest.TestCase):
def setUp(self):
t0 = timestamp.Timestamp(0.0)
t1 = timestamp.Timestamp(997.9996)
t2 = timestamp.Timestamp(999)
t3 = timestamp.Timestamp(1000, 24)
t4 = timestamp.Timestamp(1001)
t5 = timestamp.Timestamp(1002.00040)
# encodings that are expected when explicit = False
self.non_explicit_encodings = (
('0000001000.00000_18', (t3, t3, t3)),
('0000001000.00000_18', (t3, t3, None)),
)
# mappings that are expected when explicit = True
self.explicit_encodings = (
('0000001000.00000_18+0+0', (t3, t3, t3)),
('0000001000.00000_18+0', (t3, t3, None)),
)
# mappings that are expected when explicit = True or False
self.encodings = (
('0000001000.00000_18+0+186a0', (t3, t3, t4)),
('0000001000.00000_18+186a0+186c8', (t3, t4, t5)),
('0000001000.00000_18-186a0+0', (t3, t2, t2)),
('0000001000.00000_18+0-186a0', (t3, t3, t2)),
('0000001000.00000_18-186a0-186c8', (t3, t2, t1)),
('0000001000.00000_18', (t3, None, None)),
('0000001000.00000_18+186a0', (t3, t4, None)),
('0000001000.00000_18-186a0', (t3, t2, None)),
('0000001000.00000_18', (t3, None, t1)),
('0000001000.00000_18-5f5e100', (t3, t0, None)),
('0000001000.00000_18+0-5f5e100', (t3, t3, t0)),
('0000001000.00000_18-5f5e100+5f45a60', (t3, t0, t2)),
)
# decodings that are expected when explicit = False
self.non_explicit_decodings = (
('0000001000.00000_18', (t3, t3, t3)),
('0000001000.00000_18+186a0', (t3, t4, t4)),
('0000001000.00000_18-186a0', (t3, t2, t2)),
('0000001000.00000_18+186a0', (t3, t4, t4)),
('0000001000.00000_18-186a0', (t3, t2, t2)),
('0000001000.00000_18-5f5e100', (t3, t0, t0)),
)
# decodings that are expected when explicit = True
self.explicit_decodings = (
('0000001000.00000_18+0+0', (t3, t3, t3)),
('0000001000.00000_18+0', (t3, t3, None)),
('0000001000.00000_18', (t3, None, None)),
('0000001000.00000_18+186a0', (t3, t4, None)),
('0000001000.00000_18-186a0', (t3, t2, None)),
('0000001000.00000_18-5f5e100', (t3, t0, None)),
)
# decodings that are expected when explicit = True or False
self.decodings = (
('0000001000.00000_18+0+186a0', (t3, t3, t4)),
('0000001000.00000_18+186a0+186c8', (t3, t4, t5)),
('0000001000.00000_18-186a0+0', (t3, t2, t2)),
('0000001000.00000_18+0-186a0', (t3, t3, t2)),
('0000001000.00000_18-186a0-186c8', (t3, t2, t1)),
('0000001000.00000_18-5f5e100+5f45a60', (t3, t0, t2)),
)
def _assertEqual(self, expected, actual, test):
self.assertEqual(expected, actual,
'Got %s but expected %s for parameters %s'
% (actual, expected, test))
def test_encoding(self):
for test in self.explicit_encodings:
actual = timestamp.encode_timestamps(test[1][0], test[1][1],
test[1][2], True)
self._assertEqual(test[0], actual, test[1])
for test in self.non_explicit_encodings:
actual = timestamp.encode_timestamps(test[1][0], test[1][1],
test[1][2], False)
self._assertEqual(test[0], actual, test[1])
for explicit in (True, False):
for test in self.encodings:
actual = timestamp.encode_timestamps(test[1][0], test[1][1],
test[1][2], explicit)
self._assertEqual(test[0], actual, test[1])
def test_decoding(self):
for test in self.explicit_decodings:
actual = timestamp.decode_timestamps(test[0], True)
self._assertEqual(test[1], actual, test[0])
for test in self.non_explicit_decodings:
actual = timestamp.decode_timestamps(test[0], False)
self._assertEqual(test[1], actual, test[0])
for explicit in (True, False):
for test in self.decodings:
actual = timestamp.decode_timestamps(test[0], explicit)
self._assertEqual(test[1], actual, test[0])