blob: 9ca600180a5f19e8752ed534316f0d0effdce42f [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Helpers for interacting with LUCI Logdog service."""
from __future__ import print_function
import base64
import json
import time
import urllib
from chromite.lib.protos import annotations_pb2
from chromite.lib import prpc
from chromite.cbuildbot import topology
class LogdogResponseException(Exception):
"""Exception got from invalid Logdog response."""
class LogdogTimeoutException(Exception):
"""Exception got from Logdog request taking too long."""
class LogdogClient(prpc.PRPCClient):
"""Logdog client to interact with the LUCI Logdog service."""
def _GetHost(self):
"""Get LUCI Logdog Server host from topology."""
return topology.topology.get(topology.LUCI_LOGDOG_HOST_KEY)
def LogsQueryAnnotations(self, master, builder, build_num, dryrun=False):
"""Logdog request to query for annotations pathname.
Args:
master: waterfall master to query.
builder: builder to query.
build_num: build number to query.
dryrun: Whether a dryrun.
Returns:
annotations pathname or None if no streams are available.
Raises:
LogdogResponseException if response is invalid.
"""
body = json.dumps({
'project': master,
'tags': {
'buildbot.master': master,
'buildbot.builder': builder,
'buildbot.buildnumber': str(build_num),
},
'content_type': 'text/x-chrome-infra-annotations; version=2',
})
resp = self.SendRequest('prpc/logdog.Logs', 'Query', body, dryrun=dryrun)
if 'streams' not in resp or len(resp['streams']) == 0:
return None
try:
return resp['streams'][0]['path']
except KeyError:
raise LogdogResponseException('Unable to retrieve logs path')
def LogsGet(self, project, path, state=True, index=0, byte_count=0,
log_count=0, dryrun=False):
"""Logdog Logs.Get request to retrieve logs.
Args:
project: Logdog project of stream.
path: Logdog path of stream.
state: Boolean indicating if state descriptor should be returned.
index: The initial log stream index to retrieve.
byte_count: A suggested maximum number of bytes to return. At least
one full log line will be returned.
log_count: The maximum number of logs to return.
dryrun: Whether a dryrun.
Returns:
Full Logs.Get response protobuf.
"""
body = json.dumps({
'project': project,
'path': path,
'state': bool(state),
'index': int(index),
'byte_count': int(byte_count),
'log_count': int(log_count),
})
return self.SendRequest('prpc/logdog.Logs', 'Get', body, dryrun=dryrun)
def LogsGetChunked(self, project, path, index=0, log_count=0,
chunk_bytes=0, allow_retries=True, dryrun=False):
"""Logdog Logs.Get wrapper to request logs in chunks.
Args:
project: Logdog project of stream.
path: Logdog path of stream.
index: The initial log stream index to retrieve.
log_count: The maximum number of logs to return.
chunk_bytes: A suggested maximum number of bytes to return per chunk.
At least one full log line will be returned with each chunk.
allow_retries: Whether or not retries should be attempted.
dryrun: Whether a dryrun.
Returns:
Generator of Logs.Get response protobufs, one per chunk.
"""
retry_count = 3 if allow_retries else 0
terminal_index = None
while True:
# Retrieve the next chunk.
resp = self.LogsGet(project, path, state=True,
index=index, byte_count=chunk_bytes,
log_count=log_count, dryrun=dryrun)
# Ensure logs came back.
logs = resp.get('logs', [])
count = len(logs)
if count == 0:
# Logs haven't yet be ingested, retry.
if retry_count:
time.sleep(5)
retry_count -= 1
continue
else:
terminal = '%d' % (terminal_index) if terminal_index else 'None'
raise LogdogTimeoutException(
'Request to retrieve %s %s took too long '
'(index=%d, byte_count=%d, log_count=%d, terminal=%s)' %
(project, path, index, chunk_bytes, log_count, terminal))
# Return the current chunk.
yield resp
# Track the total number of logs.
if log_count > 0:
log_count -= count
if log_count <= 0:
return
# Keep track of the next index to read from.
final_index = int(logs[-1].get('streamIndex', 0))
# Was this the last index in the stream?
state = resp.get('state', {})
terminal_index = int(state.get('terminalIndex', 0))
if terminal_index >= 0 and final_index >= terminal_index:
return
# Start reading at the next index.
index = final_index + 1
def ExtractLines(self, resp):
"""Generate log lines from a Logs.Get request.
Args:
resp: Logs.Get response protobuf.
Returns:
Generator of delimited log lines.
"""
for log in resp.get('logs', []):
if 'text' in log and 'lines' in log['text']:
for line in log['text']['lines']:
yield '%s%s' % (line.get('value', ''), line['delimiter'])
def GetLines(self, project, path, index=0, log_count=0, chunk_bytes=0,
allow_retries=True, dryrun=False):
"""Retrieve a Logdog stream as lines of text.
Args:
project: Logdog project of stream.
path: Logdog path of stream.
index: The initial log stream index to retrieve.
log_count: The maximum number of logs to return.
chunk_bytes: A suggested maximum number of bytes to retrieve from the
server each time.
allow_retries: Whether or not retries should be attempted.
dryrun: Whether a dryrun.
Returns:
Generator of delimited log lines.
"""
for resp in self.LogsGetChunked(project, path, index=index,
log_count=log_count,
chunk_bytes=chunk_bytes,
allow_retries=allow_retries,
dryrun=dryrun):
for line in self.ExtractLines(resp):
yield line
def LogsTail(self, project, path, dryrun=False):
"""Logdog Logs.Tail request to retrieve logs.
Args:
project: Logdog project of stream.
path: Logdog path of stream.
dryrun: Whether a dryrun.
Returns:
Full Logs.Tail response protobuf.
"""
body = json.dumps({
'project': project,
'path': path,
})
return self.SendRequest('prpc/logdog.Logs', 'Tail', body, dryrun=dryrun)
def GetAnnotations(self, master, builder, build_num, dryrun=False):
"""Retrieve Logdog annotations for a build.
Args:
master: waterfall master to query.
builder: builder to query.
build_num: build number to query.
dryrun: Whether a dryrun.
Returns:
(Parsed Logdog annotations protobuf, Logdog prefix string) tuple
or (None, None) if none is available.
Raises:
LogdogResponseException if response is invalid.
"""
# Determine path to annotations.
path = self.LogsQueryAnnotations(master, builder, build_num, dryrun=dryrun)
if path is None:
return None, None
(prefix, _) = path.split('/+/')
# Retrieve annotations datagram.
annotations_logs = self.LogsTail(master, path)
try:
datagram = annotations_logs['logs'][0]['datagram']['data']
except KeyError:
raise LogdogResponseException('Unable to retrieve annotation datagram')
# Parse annotations protobuf.
annotations = annotations_pb2.Step()
annotations.ParseFromString(base64.b64decode(datagram))
return annotations, prefix
def ConstructViewerURL(self, project, prefix, name):
"""Construct URL to view logs via Logdog.
Args:
project: Logdog project of log stream.
prefix: Logdog prefix name.
name: Logdog stream name.
Returns:
URL of logs viewer.
"""
stream = '%s/+/%s' % (prefix, name)
return '%(scheme)s://%(host)s/v/?s=%(query)s' % {
'scheme': self.GetScheme(),
'host': self.host,
'query': urllib.quote_plus('%s/%s' % (project, stream)),
}