blob: b39fcbcdf309885f045b735a828a862d424f859a [file] [log] [blame]
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A library for emitting traces and spans to Google Cloud trace."""
import collections
import contextlib
import errno
import functools
import json
import logging
import os
import random
import re
from chromite.third_party.google.protobuf import timestamp_pb2
from chromite.third_party.infra_libs import ts_mon
from chromite.lib import metrics
SPANS_LOG = '/var/log/trace/{pid}-{span_id}.json'
_SPAN_COUNT_METRIC = 'chromeos/trace/client/logged_count'
# --- Code for logging spans to a file for later processing. -------------------
def GetSpanLogFilePath(span):
"""Gets the path to write a span to.
Args:
span: The span to write.
"""
return SPANS_LOG.format(pid=os.getpid(), span_id=span.spanId)
def LogSpan(span):
"""Serializes and logs a Span to a file.
Args:
span: A Span instance to serialize.
"""
_RecordSpanMetrics(span)
try:
with open(GetSpanLogFilePath(span), 'w') as fh:
fh.write(json.dumps(span, default=serialize_span))
# Catch various configuration errors.
except OSError as error:
if error.errno == errno.EPERM:
logging.warning(
'Received permissions error while trying to open the span log file.')
return None
elif error.errno == errno.ENOENT:
logging.warning('/var/log/traces does not exist; skipping trace log.')
return None
else:
raise
def serialize_span(obj):
"""Helper function to provide serialization of Spans for `json.dump`."""
if isinstance(obj, Span):
return dict(obj)
else:
raise TypeError(f'Unable to serialize an object of type {type(obj)}')
def _RecordSpanMetrics(span):
"""Increments the count of spans logged.
Args:
span: The span to record.
"""
m = metrics.Counter(
_SPAN_COUNT_METRIC,
description='A count of spans logged by a client.',
field_spec=[ts_mon.StringField('name')])
m.increment(fields={'name': span.name})
# -- User-facing API -----------------------------------------------------------
class Span(collections.abc.Mapping):
"""An object corresponding to a cloud trace Span."""
__keys = ('name', 'spanId', 'parentSpanId', 'labels', 'startTime', 'endTime',
'status')
def __init__(self, name, spanId=None, labels=None, parentSpanId=None,
traceId=None):
"""Creates a Span object.
Args:
name: The name of the span
spanId: (optional) A 64-bit number as a string. If not provided, it will
be generated randomly with .GenerateSpanId().
labels: (optional) a dict<string, string> of key/values
traceId: (optional) A 32 hex digit string referring to the trace
containing this span. If not provided, a new trace will be created
with a random id.
parentSpanId: (optional) The spanId of the parent.
"""
# Visible attributes
self.name = name
self.spanId = spanId or Span.GenerateSpanId()
self.parentSpanId = parentSpanId
self.labels = labels or {}
self.startTime = None
self.endTime = None
# Non-visible attributes
self.traceId = traceId or Span.GenerateTraceId()
@staticmethod
def GenerateSpanId():
"""Returns a random 64-bit number as a string."""
return str(random.randint(0, 2**64))
@staticmethod
def GenerateTraceId():
"""Returns a random 128-bit number as a 32-byte hex string."""
id_number = random.randint(0, 2**128)
return '%0.32X' % id_number
def __enter__(self):
"""Enters the span context.
Side effect: Records the start time as a Timestamp.
"""
start = timestamp_pb2.Timestamp()
start.GetCurrentTime()
self.startTime = start.ToJsonString()
return self
def __exit__(self, _type, _value, _traceback):
"""Exits the span context.
Side-effect:
Record the end Timestamp.
"""
end = timestamp_pb2.Timestamp()
end.GetCurrentTime()
self.endTime = end.ToJsonString()
def __getitem__(self, key: str):
if not isinstance(key, str):
raise TypeError('Span keys must be of type str')
if key not in self.__keys:
raise KeyError(key)
val = getattr(self, key, None)
if val is not None:
return val
else:
raise KeyError(key)
def keys(self):
return (key for key in self.__keys if getattr(self, key, None) is not None)
def __len__(self) -> int:
return len(self.keys())
def __iter__(self):
return ((key, self[key]) for key in self.keys())
class SpanStack(object):
"""A stack of Span contexts."""
CLOUD_TRACE_CONTEXT_ENV = 'CLOUD_TRACE_CONTEXT'
CLOUD_TRACE_CONTEXT_PATTERN = re.compile(
r'(?P<traceId>[0-9A-Fa-f]+)'
r'/'
r'(?P<parentSpanId>\d+)'
r';o=(?P<options>\d+)'
)
def __init__(self, global_context=None, traceId=None, parentSpanId=None,
labels=None, enabled=True):
"""Initializes the Span.
global_context: (optional) A global context str, perhaps read from the
X-Cloud-Trace-Context header.
traceId: (optional) A 32 hex digit string referring to the trace
containing this span. If not provided, a new trace will be created
with a random id.
parentSpanId: (optional) The spanId of the parent.
labels: (optional) a dict<string, string> of key/values to attach to
each Span created, or None.
enabled: (optional) a bool indicating whether we should log the spans
to a file for later uploading by the cloud trace log consumer daemon.
"""
self.traceId = traceId
self.spans = []
self.last_span_id = parentSpanId
self.labels = labels
self.enabled = enabled
global_context = (global_context or
os.environ.get(self.CLOUD_TRACE_CONTEXT_ENV, ''))
context = SpanStack._ParseCloudTraceContext(global_context)
if traceId is None:
self.traceId = context.get('traceId')
if parentSpanId is None:
self.last_span_id = context.get('parentSpanId')
if context.get('options') == '0':
self.enabled = False
def _CreateSpan(self, name, **kwargs):
"""Creates a span instance, setting certain defaults.
Args:
name: The name of the span
**kwargs: The keyword arguments to configure the span with.
"""
kwargs.setdefault('traceId', self.traceId)
kwargs.setdefault('labels', self.labels)
kwargs.setdefault('parentSpanId', self.last_span_id)
span = Span(name, **kwargs)
if self.traceId is None:
self.traceId = span.traceId
return span
@contextlib.contextmanager
def Span(self, name, **kwargs):
"""Enter a new Span context contained within the top Span of the stack.
Args:
name: The name of the span to enter
**kwargs: The kwargs to construct the span with.
Side effect:
Appends the new span object to |spans|, and yields span while in its
context. Pops the span object when exiting the context.
Returns:
A contextmanager whose __enter__() returns the new Span.
"""
span = self._CreateSpan(name, **kwargs)
old_span_id, self.last_span_id = self.last_span_id, span.spanId
self.spans.append(span)
with span:
with self.EnvironmentContext():
yield span
self.spans.pop()
self.last_span_id = old_span_id
# Log each span to a file for later processing.
if self.enabled:
LogSpan(span)
# pylint: disable=docstring-misnamed-args
def Spanned(self, *span_args, **span_kwargs):
"""A decorator equivalent of 'with span_stack.Span(...)'
Args:
*span_args: *args to use with the .Span
**span_kwargs: **kwargs to use with the .Span
Returns:
A decorator to wrap the body of a function in a span block.
"""
def SpannedDecorator(f):
"""Wraps the body of |f| with a .Span block."""
@functools.wraps(f)
def inner(*args, **kwargs):
with self.Span(*span_args, **span_kwargs):
f(*args, **kwargs)
return inner
return SpannedDecorator
def _GetCloudTraceContextHeader(self):
"""Gets the Cloud Trace HTTP header context.
From the cloud trace doc explaining this (
https://cloud.google.com/trace/docs/support?hl=bg)
'X-Cloud-Trace-Context: TRACE_ID/SPAN_ID;o=TRACE_TRUE'
Where:
- TRACE_ID is a 32-character hex value representing a 128-bit number.
It should be unique between your requests, unless you intentionally
want to bundle the requests together. You can use UUIDs.
- SPAN_ID should be 0 for the first span in your trace. For
subsequent requests, set SPAN_ID to the span ID of the parent
request. See the description of TraceSpan (REST, RPC) for more
information about nested traces.
- TRACE_TRUE must be 1 to trace this request. Specify 0 to not trace
the request. For example, to force a trace with cURL:
curl 'http://www.example.com' --header 'X-Cloud-Trace-Context:
105445aa7843bc8bf206b120001000/0;o=1'
"""
if not self.traceId:
return ''
span_postfix = '/%s' % self.spans[-1].spanId if self.spans else ''
enabled = '1' if self.enabled else '0'
return '{trace_id}{span_postfix};o={enabled}'.format(
trace_id=self.traceId,
span_postfix=span_postfix,
enabled=enabled)
@contextlib.contextmanager
def EnvironmentContext(self):
"""Sets CLOUD_TRACE_CONTEXT to the value of X-Cloud-Trace-Context.
Cloud Trace uses an HTTP header to propagate trace context across RPC
boundaries. This method does the same across process boundaries using an
environment variable.
"""
old_value = os.environ.get(self.CLOUD_TRACE_CONTEXT_ENV)
try:
os.environ[self.CLOUD_TRACE_CONTEXT_ENV] = (
self._GetCloudTraceContextHeader())
yield
finally:
if old_value is not None:
os.environ[self.CLOUD_TRACE_CONTEXT_ENV] = old_value
elif self.CLOUD_TRACE_CONTEXT_ENV in os.environ:
del os.environ[self.CLOUD_TRACE_CONTEXT_ENV]
@staticmethod
def _ParseCloudTraceContext(context):
"""Sets current_span_id and trace_id from the |context|.
See _GetCloudTraceContextHeader.
Args:
context: The context variable, either from X-Cloud-Trace-Context
or from the CLOUD_TRACE_CONTEXT environment variable.
Returns:
A dictionary, which if the context string matches
CLOUD_TRACE_CONTEXT_PATTERN, contains the matched groups. If not matched,
returns an empty dictionary.
"""
m = SpanStack.CLOUD_TRACE_CONTEXT_PATTERN.match(context)
if m:
return m.groupdict()
else:
return {}