blob: 38a483e04e2b3adb14828b4b9b57e0c4034efdba [file] [log] [blame]
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Classes representing the monitoring interface for tasks or devices."""
import json
import logging
import socket
import httplib2
from googleapiclient import errors
from infra_libs import httplib2_utils
from infra_libs.ts_mon.common import pb_to_popo
try: # pragma: no cover
from oauth2client import gce
except ImportError: # pragma: no cover
from oauth2client.contrib import gce
from oauth2client.client import GoogleCredentials
from oauth2client.file import Storage
# Special string that can be passed through as the credentials path to use the
# default Appengine or GCE service account.
APPENGINE_CREDENTIALS = ':appengine'
GCE_CREDENTIALS = ':gce'
class CredentialFactory(object):
"""Base class for things that can create OAuth2Credentials."""
@classmethod
def from_string(cls, path):
"""Creates an appropriate subclass from a file path or magic string."""
if path == APPENGINE_CREDENTIALS:
return AppengineCredentials()
if path == GCE_CREDENTIALS:
return GCECredentials()
return FileCredentials(path)
def create(self, scopes):
raise NotImplementedError
class GCECredentials(CredentialFactory):
def create(self, scopes):
return gce.AppAssertionCredentials(scopes)
class AppengineCredentials(CredentialFactory):
def create(self, scopes): # pragma: no cover
# This import doesn't work outside appengine, so delay it until it's used.
from oauth2client import appengine
return appengine.AppAssertionCredentials(scopes)
class FileCredentials(CredentialFactory):
def __init__(self, path):
self.path = path
def create(self, scopes):
with open(self.path, 'r') as fh:
data = json.load(fh)
if data.get('type', None):
credentials = GoogleCredentials.from_stream(self.path)
credentials = credentials.create_scoped(scopes)
return credentials
return Storage(self.path).get()
class DelegateServiceAccountCredentials(CredentialFactory):
IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
def __init__(self, service_account_email, base):
self.base = base
self.service_account_email = service_account_email
def create(self, scopes):
logging.info('Delegating to service account %s', self.service_account_email)
http = httplib2_utils.InstrumentedHttp('actor-credentials')
http = self.base.create([self.IAM_SCOPE]).authorize(http)
return httplib2_utils.DelegateServiceAccountCredentials(
http, self.service_account_email, scopes)
class Monitor(object):
"""Abstract base class encapsulating the ability to collect and send metrics.
This is a singleton class. There should only be one instance of a Monitor at
a time. It will be created and initialized by process_argparse_options. It
must exist in order for any metrics to be sent, although both Targets and
Metrics may be initialized before the underlying Monitor. If it does not exist
at the time that a Metric is sent, an exception will be raised.
send() can be either synchronous or asynchronous. If synchronous, it needs to
make the HTTP request, wait for a response and return None.
If asynchronous, send() should start the request and immediately return some
object which is later passed to wait() once all requests have been started.
"""
_SCOPES = []
def send(self, metric_pb):
raise NotImplementedError()
def wait(self, state): # pragma: no cover
pass
class HttpsMonitor(Monitor):
_SCOPES = ['https://www.googleapis.com/auth/prodxmon']
def __init__(self, endpoint, credential_factory, http=None, ca_certs=None):
self._endpoint = endpoint
credentials = credential_factory.create(self._SCOPES)
if http is None:
http = httplib2_utils.RetriableHttp(
httplib2_utils.InstrumentedHttp('acq-mon-api', ca_certs=ca_certs))
self._http = credentials.authorize(http)
def encode_to_json(self, metric_pb):
return json.dumps({'payload': pb_to_popo.convert(metric_pb)})
def send(self, metric_pb):
body = self.encode_to_json(metric_pb)
try:
resp, content = self._http.request(self._endpoint,
method='POST',
body=body,
headers={'Content-Type': 'application/json'})
if resp.status != 200:
logging.warning('HttpsMonitor.send received status %d: %s', resp.status,
content)
except (ValueError, errors.Error,
socket.timeout, socket.error, socket.herror, socket.gaierror,
httplib2.HttpLib2Error):
logging.exception('HttpsMonitor.send failed')
class DebugMonitor(Monitor):
"""Class which writes metrics to logs or a local file for debugging."""
def __init__(self, filepath=None):
if filepath is None:
self._fh = None
else:
self._fh = open(filepath, 'a')
def send(self, metric_pb):
text = str(metric_pb)
logging.info('Flushing monitoring metrics:\n%s', text)
if self._fh is not None:
self._fh.write(text + '\n\n')
self._fh.flush()
class NullMonitor(Monitor):
"""Class that doesn't send metrics anywhere."""
def send(self, metric_pb):
pass