blob: 178ad36594fe66d5b1734702a87dcd260de85ce7 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# This file is heavily based off of LUCI util.py. It's been adopted to remove
# AppEngine-ism.
"""Mixed bag of utilities."""
from __future__ import print_function
from datetime import datetime
from datetime import date
from datetime import time
from datetime import timedelta
from email import utils as email_utils
import binascii
import functools
import hashlib
import numbers
import os
import sys
import threading
from google.protobuf import timestamp_pb2
import six
from six.moves import urllib
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
DATETIME_FORMAT = u'%Y-%m-%d %H:%M:%S'
DATE_FORMAT = u'%Y-%m-%d'
VALID_DATETIME_FORMATS = ('%Y-%m-%d', '%Y-%m-%d %H:%M', '%Y-%m-%d %H:%M:%S')
# UTC datetime corresponding to zero Unix timestamp.
EPOCH = datetime.utcfromtimestamp(0)
# Module to run task queue tasks on by default. Used by get_task_queue_host
# function. Can be changed by 'set_task_queue_module' function.
_task_queue_module = 'backend'
def should_disable_ui_routes():
return os.environ.get('LUCI_DISABLE_UI_ROUTES', '0') == '1'
def get_request_as_int(request, key, default, min_value, max_value):
"""Returns a request value as int."""
value = request.params.get(key, '')
try:
value = int(value)
except ValueError:
return default
return min(max_value, max(min_value, value))
def parse_datetime(text):
"""Converts text to datetime.datetime instance or None."""
for f in VALID_DATETIME_FORMATS:
try:
return datetime.strptime(text, f)
except ValueError:
continue
return None
def parse_rfc3339_datetime(value):
"""Parses RFC 3339 datetime string (as used in Timestamp proto JSON encoding).
Keeps only microsecond precision (dropping nanoseconds).
Examples of the input:
2017-08-17T04:21:32.722952943Z
1972-01-01T10:00:20.021-05:00
Returns:
datetime.datetime in UTC (regardless of timezone of the original string).
Raises:
ValueError on errors.
"""
# Adapted from protobuf/internal/well_known_types.py Timestamp.FromJsonString.
# We can't use the original, since it's marked as internal. Also instantiating
# proto messages here to parse a string would been odd.
timezone_offset = value.find('Z')
if timezone_offset == -1:
timezone_offset = value.find('+')
if timezone_offset == -1:
timezone_offset = value.rfind('-')
if timezone_offset == -1:
raise ValueError('Failed to parse timestamp: missing valid timezone offset')
time_value = value[0:timezone_offset]
# Parse datetime and nanos.
point_position = time_value.find('.')
if point_position == -1:
second_value = time_value
nano_value = ''
else:
second_value = time_value[:point_position]
nano_value = time_value[point_position + 1:]
date_object = datetime.strptime(second_value, '%Y-%m-%dT%H:%M:%S')
td = date_object - EPOCH
seconds = td.seconds + td.days * 86400
if len(nano_value) > 9:
raise ValueError(
'Failed to parse timestamp: nanos %r more than 9 fractional digits' %
nano_value)
if nano_value:
nanos = round(float('0.' + nano_value) * 1e9)
else:
nanos = 0
# Parse timezone offsets.
if value[timezone_offset] == 'Z':
if len(value) != timezone_offset + 1:
raise ValueError(
'Failed to parse timestamp: invalid trailing data %r' % value)
else:
timezone = value[timezone_offset:]
pos = timezone.find(':')
if pos == -1:
raise ValueError('Invalid timezone offset value: %r' % timezone)
if timezone[0] == '+':
seconds -= (int(timezone[1:pos]) * 60 + int(timezone[pos + 1:])) * 60
else:
seconds += (int(timezone[1:pos]) * 60 + int(timezone[pos + 1:])) * 60
return timestamp_to_datetime(int(seconds) * 1e6 + int(nanos) // 1e3)
def TimestampToDatetime(input_time):
"""Converts seconds in google.protobuf.Timestamp to readable format.
Args:
input_time: google.protobuf.Timestamp instance.
Returns:
datetime.datetime instance corresponding to input_time.
"""
if input_time and input_time.seconds != 0:
return datetime.fromtimestamp(input_time.seconds)
else:
return None
def DatetimeToTimestamp(input_date, end_of_day=False):
"""Converts datetime.date object to Timestamp instance.
Args:
input_date: datetime.date instance to be converted.
end_of_day: Boolean indicating whether the Timestamp correponds to the
end of the date in input_date.
Returns:
A Timestamp instance corresponding to the specific date.
"""
assert isinstance(input_date, date)
if end_of_day:
datetime_instance = datetime.combine(input_date, time(23, 59))
else:
datetime_instance = datetime.combine(input_date, time(0, 0))
micros = datetime_to_timestamp(datetime_instance)
return timestamp_pb2.Timestamp(seconds=micros // (1000 * 1000))
def constant_time_equals(a, b):
"""Compares two strings in constant time regardless of theirs content."""
if len(a) != len(b):
return False
result = 0
for x, y in zip(a, b):
result |= ord(x) ^ ord(y)
return result == 0
def to_units(number):
"""Convert a string to numbers."""
UNITS = ('', 'k', 'm', 'g', 't', 'p', 'e', 'z', 'y')
unit = 0
while number >= 1024.:
unit += 1
number = number // 1024.
if unit == len(UNITS) - 1:
break
if unit:
return '%.2f%s' % (number, UNITS[unit])
return '%d' % number
def validate_root_service_url(url):
"""Raises ValueError if the URL doesn't look like https://<host>."""
schemes = ('https', 'http')
parsed = urllib.parse.urlparse(url)
if parsed.scheme not in schemes:
raise ValueError('unsupported protocol %r' % str(parsed.scheme))
if not parsed.netloc:
raise ValueError('missing hostname')
stripped = urllib.parse.urlunparse((parsed[0], parsed[1], '', '', '', ''))
if stripped != url:
raise ValueError('expecting root host URL, e.g. %r)' % str(stripped))
### Time
def utcnow():
"""Returns datetime.utcnow(), used for testing.
Use this function so it can be mocked everywhere.
"""
return datetime.utcnow()
def time_time():
"""Returns the equivalent of time.time() as mocked if applicable."""
return (utcnow() - EPOCH).total_seconds()
def milliseconds_since_epoch(now):
"""Returns the number of milliseconds since unix epoch as an int."""
now = now or utcnow()
return int(round((now - EPOCH).total_seconds() * 1000.))
def datetime_to_rfc2822(dt):
"""datetime -> string value for Last-Modified header as defined by RFC2822."""
if not isinstance(dt, datetime):
raise TypeError(
'Expecting datetime object, got %s instead' % type(dt).__name__)
assert dt.tzinfo is None, 'Expecting UTC timestamp: %s' % dt
return email_utils.formatdate(datetime_to_timestamp(dt) // 1000000)
def datetime_to_timestamp(value):
"""Converts UTC datetime to integer timestamp in microseconds since epoch."""
if not isinstance(value, datetime):
raise ValueError(
'Expecting datetime object, got %s instead' % type(value).__name__)
if value.tzinfo is not None:
raise ValueError('Only UTC datetime is supported')
dt = value - EPOCH
return dt.microseconds + 1000 * 1000 * (dt.seconds + 24 * 3600 * dt.days)
def timestamp_to_datetime(value):
"""Converts integer timestamp in microseconds since epoch to UTC datetime."""
if not isinstance(value, numbers.Real):
raise ValueError(
'Expecting a number, got %s instead' % type(value).__name__)
return EPOCH + timedelta(microseconds=value)
### Cache
class _Cache(object):
"""Holds state of a cache for cache_with_expiration and cache decorators.
May call func more than once.
Thread- and NDB tasklet-safe.
"""
def __init__(self, func, expiration_sec):
self.func = func
self.expiration_sec = expiration_sec
self.lock = threading.Lock()
self.value = None
self.value_is_set = False
self.expires = None
def get_value(self):
"""Returns a cached value refreshing it if it has expired."""
with self.lock:
if self.value_is_set and (not self.expires or time_time() < self.expires):
return self.value
new_value = self.func()
with self.lock:
self.value = new_value
self.value_is_set = True
if self.expiration_sec:
self.expires = time_time() + self.expiration_sec
return self.value
def clear(self):
"""Clears stored cached value."""
with self.lock:
self.value = None
self.value_is_set = False
self.expires = None
def get_wrapper(self):
"""Returns a callable object that can be used in place of |func|.
It's basically self.get_value, updated by functools.wraps to look more like
original function.
"""
# functools.wraps doesn't like 'instancemethod', use lambda as a proxy.
# pylint: disable=W0108
wrapper = functools.wraps(self.func)(lambda: self.get_value())
wrapper.__parent_cache__ = self
return wrapper
def cache(func):
"""Decorator that implements permanent cache of a zero-parameter function."""
return _Cache(func, None).get_wrapper()
def cache_with_expiration(expiration_sec):
"""Decorator that implements in-memory cache for a zero-parameter function."""
def decorator(func):
return _Cache(func, expiration_sec).get_wrapper()
return decorator
def clear_cache(func):
"""Given a function decorated with @cache, resets cached value."""
func.__parent_cache__.clear()
## General
def get_token_fingerprint(blob):
"""Given a blob with a token returns first 16 bytes of its SHA256 as hex.
It can be used to identify this particular token in logs without revealing it.
"""
assert isinstance(blob, six.string_types)
if isinstance(blob, six.text_type):
blob = blob.encode('ascii', 'ignore')
return binascii.hexlify(hashlib.sha256(blob).digest()[:16])
## Hacks
def fix_protobuf_package():
"""Modifies 'google' package to include path to 'google.protobuf' package.
Prefer our own proto package on the server. Note that this functions is not
used on the Swarming bot nor any other client.
"""
# google.__path__[0] will be google_appengine/google.
import google
if len(google.__path__) > 1:
return
# We do not mind what 'google' get used, inject protobuf in there.
path = os.path.join(THIS_DIR, 'third_party', 'protobuf', 'google')
google.__path__.append(path)
# six is needed for oauth2client and webtest (local testing).
six_path = os.path.join(THIS_DIR, 'third_party', 'six')
if six_path not in sys.path:
sys.path.insert(0, six_path)
def import_jinja2():
"""Remove any existing jinja2 package and add ours."""
for i in sys.path[:]:
if os.path.basename(i) == 'jinja2':
sys.path.remove(i)
sys.path.append(os.path.join(THIS_DIR, 'third_party'))