blob: 034e1ec3b3a41addd5a6356e0af95a3d55598c2b [file] [log] [blame]
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import base64
import collections
import copy
import json
import logging
import re
import socket
import time
import httplib2
import oauth2client.client
import six
from six.moves import http_client as httplib
from googleapiclient import errors
from infra_libs.ts_mon.common import http_metrics
# TODO(nxia): upgrade oauth2client to 4.1.2.
oauth2client_util_imported = False
from oauth2client import util
oauth2client_util_imported = True
except ImportError:
# default timeout for http requests, in seconds
class AuthError(Exception):
class DelegateServiceAccountCredentials(
"""Authorizes an HTTP client with a service account for which we are an actor.
This class uses the IAM API to sign a JWT with the private key of another
service account for which we have the "Service Account Actor" role.
MAX_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
def __init__(self, http, service_account_email, scopes, project='-'):
http: An httplib2.Http object that is authorized by another
oauth2client.client.OAuth2Credentials with credentials that have the
service account actor role on the service_account_email.
service_account_email: The email address of the service account for which
to obtain an access token.
scopes: The desired scopes for the token.
project: The cloud project to which service_account_email belongs. The
default of '-' makes the IAM API figure it out for us.
if not oauth2client_util_imported:
raise AssertionError('Failed to import oauth2client.util.')
super(DelegateServiceAccountCredentials, self).__init__(None)
self._service_account_email = service_account_email
self._scopes = util.scopes_to_string(scopes)
self._http = http
self._name = 'projects/%s/serviceAccounts/%s' % (
project, service_account_email)
def sign_blob(self, blob):
response, content = self._http.request(
self._SIGN_BLOB_URL % self._name,
body=json.dumps({'bytesToSign': base64.b64encode(blob)}),
headers={'Content-Type': 'application/json'})
if response.status != 200:
raise AuthError('Failed to sign blob as %s: %d %s' % (
self._service_account_email, response.status, response.reason))
data = json.loads(content)
return data['keyId'], data['signature']
def _generate_assertion(self):
# This is copied with small modifications from
# oauth2client.service_account._ServiceAccountCredentials.
header = {
'alg': 'RS256',
'typ': 'JWT',
now = int(time.time())
payload = {
'aud': self.token_uri,
'scope': self._scopes,
'iat': now,
'exp': now + self.MAX_TOKEN_LIFETIME_SECS,
'iss': self._service_account_email,
assertion_input = (
self._urlsafe_b64encode(header) + b'.' +
# Sign the assertion.
_, rsa_bytes = self.sign_blob(assertion_input)
signature = rsa_bytes.rstrip(b'=')
return assertion_input + b'.' + signature
def _urlsafe_b64encode(self, data):
# Copied verbatim from oauth2client.service_account.
return base64.urlsafe_b64encode(
json.dumps(data, separators=(',', ':')).encode('UTF-8')).rstrip(b'=')
class RetriableHttp(object):
"""A httplib2.Http object that retries on failure."""
def __init__(self, http, max_tries=5, backoff_time=1,
http: an httplib2.Http instance
max_tries: a number of maximum tries
backoff_time: a number of seconds to sleep between retries
retrying_statuses_fn: a function that returns True if a given status
should be retried
self._http = http
self._max_tries = max_tries
self._backoff_time = backoff_time
self._retrying_statuses_fn = retrying_statuses_fn or \
def request(self, uri, method='GET', body=None, *args, **kwargs):
for i in range(1, self._max_tries + 1):
response, content = self._http.request(uri, method, body, *args,
if self._retrying_statuses_fn(response.status):'RetriableHttp: attempt %d receiving status %d, %s',
i, response.status,
'final attempt' if i == self._max_tries else \
'will retry')
except (ValueError, errors.Error,
socket.timeout, socket.error, socket.herror, socket.gaierror,
httplib2.HttpLib2Error) as error:'RetriableHttp: attempt %d received exception: %s, %s',
i, error, 'final attempt' if i == self._max_tries else \
'will retry')
if i == self._max_tries:
return response, content
def __getattr__(self, name):
return getattr(self._http, name)
def __setattr__(self, name, value):
if name in ('request', '_http', '_max_tries', '_backoff_time',
self.__dict__[name] = value
setattr(self._http, name, value)
class InstrumentedHttp(httplib2.Http):
"""A httplib2.Http object that reports ts_mon metrics about its requests."""
def __init__(self, name, time_fn=time.time, timeout=DEFAULT_TIMEOUT,
name: An identifier for the HTTP requests made by this object.
time_fn: Function returning the current time in seconds. Use for testing
purposes only.
super(InstrumentedHttp, self).__init__(timeout=timeout, **kwargs)
self.fields = {'name': name, 'client': 'httplib2'}
self.time_fn = time_fn
def _update_metrics(self, status, start_time):
status_fields = {'status': status}
duration_msec = (self.time_fn() - start_time) * 1000
http_metrics.durations.add(duration_msec, fields=self.fields)
def request(self, uri, method="GET", body=None, *args, **kwargs):
request_bytes = 0
if body is not None:
request_bytes = len(body)
http_metrics.request_bytes.add(request_bytes, fields=self.fields)
start_time = self.time_fn()
response, content = super(InstrumentedHttp, self).request(
uri, method, body, *args, **kwargs)
except socket.timeout:
self._update_metrics(http_metrics.STATUS_TIMEOUT, start_time)
except (socket.error, socket.herror, socket.gaierror):
self._update_metrics(http_metrics.STATUS_ERROR, start_time)
except (httplib.HTTPException, httplib2.HttpLib2Error) as ex:
status = http_metrics.STATUS_EXCEPTION
if 'Deadline exceeded while waiting for HTTP response' in str(ex):
# Raised on Appengine (gae_override/
status = http_metrics.STATUS_TIMEOUT
self._update_metrics(status, start_time)
http_metrics.response_bytes.add(len(content), fields=self.fields)
self._update_metrics(response.status, start_time)
return response, content
class HttpMock(object):
"""Mock of httplib2.Http"""
HttpCall = collections.namedtuple('HttpCall', ('uri', 'method', 'body',
def __init__(self, uris):
uris(dict): list of (uri, headers, body). `uri` is a regexp for
matching the requested uri, (headers, body) gives the values returned
by the mock. Uris are tested in the order from `uris`.
`headers` is a dict mapping headers to value. The 'status' key is
mandatory. `body` is a string.
Ex: [('.*', {'status': 200}, 'nicely done.')]
self._uris = []
self.requests_made = []
for value in uris:
if not isinstance(value, (list, tuple)) or len(value) != 3:
raise ValueError("'uris' must be a sequence of (uri, headers, body)")
uri, headers, body = value
compiled_uri = re.compile(uri)
if not isinstance(headers, dict):
raise TypeError("'headers' must be a dict")
if not 'status' in headers:
raise ValueError("'headers' must have 'status' as a key")
new_headers = copy.copy(headers)
new_headers['status'] = int(new_headers['status'])
if not isinstance(body, six.string_types):
raise TypeError("'body' must be a string, got %s" % type(body))
self._uris.append((compiled_uri, new_headers, body))
# pylint: disable=unused-argument
def request(self, uri,
self.requests_made.append(self.HttpCall(uri, method, body, headers))
headers = None
body = None
for candidate in self._uris:
if candidate[0].match(uri):
_, headers, body = candidate
if not headers:
raise AssertionError("Unexpected request to %s" % uri)
return httplib2.Response(headers), body