# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# This file is heavily based off of LUCI It's been adopted to remove
# AppEngine-ism.
"""Mixed bag of utilities."""
from datetime import datetime
from datetime import date
from datetime import time
from datetime import timedelta
from email import utils as email_utils
import binascii
import functools
import hashlib
import numbers
import os
import sys
import threading
import urllib.parse
from import timestamp_pb2
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
DATETIME_FORMAT = u'%Y-%m-%d %H:%M:%S'
DATE_FORMAT = u'%Y-%m-%d'
VALID_DATETIME_FORMATS = ('%Y-%m-%d', '%Y-%m-%d %H:%M', '%Y-%m-%d %H:%M:%S')
# UTC datetime corresponding to zero Unix timestamp.
EPOCH = datetime.utcfromtimestamp(0)
# Module to run task queue tasks on by default. Used by get_task_queue_host
# function. Can be changed by 'set_task_queue_module' function.
_task_queue_module = 'backend'
def should_disable_ui_routes():
return os.environ.get('LUCI_DISABLE_UI_ROUTES', '0') == '1'
def get_request_as_int(request, key, default, min_value, max_value):
"""Returns a request value as int."""
value = request.params.get(key, '')
value = int(value)
except ValueError:
return default
return min(max_value, max(min_value, value))
def parse_datetime(text):
"""Converts text to datetime.datetime instance or None."""
return datetime.strptime(text, f)
except ValueError:
return None
def parse_rfc3339_datetime(value):
"""Parses RFC 3339 datetime string (as used in Timestamp proto JSON encoding).
Keeps only microsecond precision (dropping nanoseconds).
Examples of the input:
datetime.datetime in UTC (regardless of timezone of the original string).
ValueError on errors.
# Adapted from protobuf/internal/ Timestamp.FromJsonString.
# We can't use the original, since it's marked as internal. Also instantiating
# proto messages here to parse a string would been odd.
timezone_offset = value.find('Z')
if timezone_offset == -1:
timezone_offset = value.find('+')
if timezone_offset == -1:
timezone_offset = value.rfind('-')
if timezone_offset == -1:
raise ValueError('Failed to parse timestamp: missing valid timezone offset')
time_value = value[0:timezone_offset]
# Parse datetime and nanos.
point_position = time_value.find('.')
if point_position == -1:
second_value = time_value
nano_value = ''
second_value = time_value[:point_position]
nano_value = time_value[point_position + 1:]
date_object = datetime.strptime(second_value, '%Y-%m-%dT%H:%M:%S')
td = date_object - EPOCH
seconds = td.seconds + td.days * 86400
if len(nano_value) > 9:
raise ValueError(
'Failed to parse timestamp: nanos %r more than 9 fractional digits' %
if nano_value:
nanos = round(float('0.' + nano_value) * 1e9)
nanos = 0
# Parse timezone offsets.
if value[timezone_offset] == 'Z':
if len(value) != timezone_offset + 1:
raise ValueError(
'Failed to parse timestamp: invalid trailing data %r' % value)
timezone = value[timezone_offset:]
pos = timezone.find(':')
if pos == -1:
raise ValueError('Invalid timezone offset value: %r' % timezone)
if timezone[0] == '+':
seconds -= (int(timezone[1:pos]) * 60 + int(timezone[pos + 1:])) * 60
seconds += (int(timezone[1:pos]) * 60 + int(timezone[pos + 1:])) * 60
return timestamp_to_datetime(int(seconds) * 1e6 + int(nanos) // 1e3)
def TimestampToDatetime(input_time):
"""Converts seconds in google.protobuf.Timestamp to readable format.
input_time: google.protobuf.Timestamp instance.
datetime.datetime instance corresponding to input_time.
if input_time and input_time.seconds != 0:
return datetime.fromtimestamp(input_time.seconds)
return None
def DatetimeToTimestamp(input_date, end_of_day=False):
"""Converts object to Timestamp instance.
input_date: instance to be converted.
end_of_day: Boolean indicating whether the Timestamp correponds to the
end of the date in input_date.
A Timestamp instance corresponding to the specific date.
assert isinstance(input_date, date)
if end_of_day:
datetime_instance = datetime.combine(input_date, time(23, 59))
datetime_instance = datetime.combine(input_date, time(0, 0))
micros = datetime_to_timestamp(datetime_instance)
return timestamp_pb2.Timestamp(seconds=micros // (1000 * 1000))
def constant_time_equals(a, b):
"""Compares two strings in constant time regardless of theirs content."""
if len(a) != len(b):
return False
result = 0
for x, y in zip(a, b):
result |= ord(x) ^ ord(y)
return result == 0
def to_units(number):
"""Convert a string to numbers."""
UNITS = ('', 'k', 'm', 'g', 't', 'p', 'e', 'z', 'y')
unit = 0
while number >= 1024.:
unit += 1
number = number // 1024.
if unit == len(UNITS) - 1:
if unit:
return '%.2f%s' % (number, UNITS[unit])
return '%d' % number
def validate_root_service_url(url):
"""Raises ValueError if the URL doesn't look like https://<host>."""
schemes = ('https', 'http')
parsed = urllib.parse.urlparse(url)
if parsed.scheme not in schemes:
raise ValueError('unsupported protocol %r' % str(parsed.scheme))
if not parsed.netloc:
raise ValueError('missing hostname')
stripped = urllib.parse.urlunparse((parsed[0], parsed[1], '', '', '', ''))
if stripped != url:
raise ValueError('expecting root host URL, e.g. %r)' % str(stripped))
### Time
def utcnow():
"""Returns datetime.utcnow(), used for testing.
Use this function so it can be mocked everywhere.
return datetime.utcnow()
def time_time():
"""Returns the equivalent of time.time() as mocked if applicable."""
return (utcnow() - EPOCH).total_seconds()
def milliseconds_since_epoch(now):
"""Returns the number of milliseconds since unix epoch as an int."""
now = now or utcnow()
return int(round((now - EPOCH).total_seconds() * 1000.))
def datetime_to_rfc2822(dt):
"""datetime -> string value for Last-Modified header as defined by RFC2822."""
if not isinstance(dt, datetime):
raise TypeError(
'Expecting datetime object, got %s instead' % type(dt).__name__)
assert dt.tzinfo is None, 'Expecting UTC timestamp: %s' % dt
return email_utils.formatdate(datetime_to_timestamp(dt) // 1000000)
def datetime_to_timestamp(value):
"""Converts UTC datetime to integer timestamp in microseconds since epoch."""
if not isinstance(value, datetime):
raise ValueError(
'Expecting datetime object, got %s instead' % type(value).__name__)
if value.tzinfo is not None:
raise ValueError('Only UTC datetime is supported')
dt = value - EPOCH
return dt.microseconds + 1000 * 1000 * (dt.seconds + 24 * 3600 * dt.days)
def timestamp_to_datetime(value):
"""Converts integer timestamp in microseconds since epoch to UTC datetime."""
if not isinstance(value, numbers.Real):
raise ValueError(
'Expecting a number, got %s instead' % type(value).__name__)
return EPOCH + timedelta(microseconds=value)
### Cache
class _Cache(object):
"""Holds state of a cache for cache_with_expiration and cache decorators.
May call func more than once.
Thread- and NDB tasklet-safe.
def __init__(self, func, expiration_sec):
self.func = func
self.expiration_sec = expiration_sec
self.lock = threading.Lock()
self.value = None
self.value_is_set = False
self.expires = None
def get_value(self):
"""Returns a cached value refreshing it if it has expired."""
with self.lock:
if self.value_is_set and (not self.expires or time_time() < self.expires):
return self.value
new_value = self.func()
with self.lock:
self.value = new_value
self.value_is_set = True
if self.expiration_sec:
self.expires = time_time() + self.expiration_sec
return self.value
def clear(self):
"""Clears stored cached value."""
with self.lock:
self.value = None
self.value_is_set = False
self.expires = None
def get_wrapper(self):
"""Returns a callable object that can be used in place of |func|.
It's basically self.get_value, updated by functools.wraps to look more like
original function.
# functools.wraps doesn't like 'instancemethod', use lambda as a proxy.
# pylint: disable=W0108
wrapper = functools.wraps(self.func)(lambda: self.get_value())
wrapper.__parent_cache__ = self
return wrapper
def cache(func):
"""Decorator that implements permanent cache of a zero-parameter function."""
return _Cache(func, None).get_wrapper()
def cache_with_expiration(expiration_sec):
"""Decorator that implements in-memory cache for a zero-parameter function."""
def decorator(func):
return _Cache(func, expiration_sec).get_wrapper()
return decorator
def clear_cache(func):
"""Given a function decorated with @cache, resets cached value."""
## General
def get_token_fingerprint(blob):
"""Given a blob with a token returns first 16 bytes of its SHA256 as hex.
It can be used to identify this particular token in logs without revealing it.
assert isinstance(blob, str)
if isinstance(blob, str):
blob = blob.encode('ascii', 'ignore')
return binascii.hexlify(hashlib.sha256(blob).digest()[:16])