| # Copyright 2024 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Parse and publish telemetry.""" |
| |
| import dataclasses |
| import datetime |
| import enum |
| import functools |
| import json |
| import logging |
| import os |
| import socket |
| import time |
| from typing import ( |
| Any, |
| Callable, |
| Dict, |
| Iterable, |
| List, |
| Optional, |
| TYPE_CHECKING, |
| Union, |
| ) |
| import urllib.error |
| import urllib.request |
| |
| from chromite.third_party.google.protobuf import json_format |
| from chromite.third_party.google.protobuf import message as proto_msg |
| from chromite.third_party.opentelemetry.sdk import resources |
| |
| # Required due to incomplete proto support in chromite. This proto usage is not |
| # tied to the Build API, so delegating the proto handling to api/ does not make |
| # sense. When proto is better supported in chromite, the protos could live |
| # somewhere else instead. |
| from chromite.api.gen.chromite.telemetry import clientanalytics_pb2 |
| from chromite.api.gen.chromite.telemetry import trace_span_pb2 |
| from chromite.lib import cros_build_lib |
| from chromite.lib import locking |
| from chromite.lib import osutils |
| from chromite.lib import path_util |
| from chromite.lib.telemetry import trace |
| from chromite.utils.telemetry import detector |
| from chromite.utils.telemetry import utils |
| |
| |
| if TYPE_CHECKING: |
| from pathlib import Path |
| |
tracer = trace.get_tracer(__name__)

# Clearcut endpoint the spans are POSTed to (see ClearcutPublisher).
_DEFAULT_ENDPOINT = "https://play.googleapis.com/log"
# Default HTTP request timeout, in seconds.
_DEFAULT_TIMEOUT = 15
# NOTE(review): name has a typo (missing "F") and the constant is unused in
# this module — rename to _DEFAULT_MAX_WAIT_SECS if nothing else imports it.
_DEAULT_MAX_WAIT_SECS = 20 * 60
# Maximum number of spans to upload in a single batch.
_DEFAULT_MAX_BATCH_SIZE = 20000
# Preallocated in Clearcut proto to Build.
_LOG_SOURCE = 2044
# Preallocated in Clearcut proto to Python clients.
_CLIENT_TYPE = 33

# How long to keep telemetry before deleting, in seconds (one week).
_TELEMETRY_PURGE_AGE = 7 * 24 * 60 * 60
| |
| |
class Error(Exception):
    """Base error class for all errors raised by this module."""
| |
| |
class ParseSpanError(Error):
    """Error parsing a json-encoded span into a TraceSpan."""
| |
| |
class PublishError(Error):
    """An error encountered while publishing spans to the endpoint."""
| |
| |
@functools.lru_cache
def _get_telemetry_dir() -> "Path":
    """Get the base telemetry log directory."""
    return path_util.get_log_dir() / "telemetry"
| |
| |
@functools.lru_cache
def _get_publisher_file() -> "Path":
    """Get the publisher PID file (holds the active publisher's PID)."""
    return _get_telemetry_dir() / ".telemetry_publisher_pid"
| |
| |
@functools.lru_cache
def _get_next_publish_ts_file() -> "Path":
    """Get the file recording the earliest timestamp of the next publish."""
    return _get_telemetry_dir() / ".telemetry_next_publish_ts"
| |
| |
@functools.lru_cache
def _has_internet() -> bool:
    """Check for a working internet connection.

    Tries a TCP connection to a Google public DNS server as a quick
    internet-works check. The result is cached for the process lifetime.

    Returns:
        True if the connection succeeded, False otherwise.
    """
    try:
        # Use a per-connection timeout and close the socket when done rather
        # than calling socket.setdefaulttimeout(), which mutates process-wide
        # state (affecting every socket created later, e.g. urllib requests),
        # and rather than leaking the connected socket.
        with socket.create_connection(("8.8.8.8", 53), timeout=3):
            pass
        return True
    except OSError as e:
        # socket.error is an alias of OSError; this also covers timeouts.
        logging.debug("No internet detected.")
        logging.exception(e)
        return False
| |
| |
def can_publish() -> bool:
    """Check if publishing is possible.

    Publishing is allowed when the recorded next-publish timestamp (if any)
    has passed and an internet connection is available.
    """
    next_publish = _get_next_publish_ts_file()
    if not next_publish.exists():
        # No timestamp recorded, we can publish if we have internet access.
        # The internet check is mostly meant to avoid attempting to publish when
        # the network is disabled, e.g. the network sandbox used by cros
        # build-image, but generally avoiding trying to publish when it's doomed
        # to fail is nice.
        return _has_internet()

    next_publish_lock = locking.FileLock(next_publish, locktype=locking.FLOCK)
    next_publish_ts = None
    with next_publish_lock.read_lock():
        # Re-check under the lock; the file may have been removed meanwhile.
        if next_publish.exists():
            try:
                next_publish_ts = float(next_publish.read_text())
            except Exception as e:
                # Unreadable/corrupt timestamp; log and treat as no throttle.
                logging.error(e)

    logging.debug("next_publish_ts: %s", next_publish_ts)
    logging.debug("current time: %s", time.time())
    if next_publish_ts and time.time() < next_publish_ts:
        logging.debug("Too soon to publish again.")
        return False

    # The next publish timestamp has passed, we can publish if we have internet.
    return _has_internet()
| |
| |
@tracer.start_as_current_span("chromite.lib.telemetry_publisher.publish")
def publish() -> None:
    """Parse telemetry from files and publish a batch.

    Serializes concurrent publishers via a write lock on the PID file,
    queues spans from publishable files, uploads them, records each file's
    outcome, and writes out the earliest time the next publish may occur.

    Raises:
        PublishError: When the upload or response parsing fails.
    """
    publisher_file = _get_publisher_file()
    next_publish = _get_next_publish_ts_file()
    publisher_lock = locking.FileLock(publisher_file, locktype=locking.FLOCK)
    next_publish_lock = locking.FileLock(next_publish, locktype=locking.FLOCK)

    if not can_publish():
        # Short circuit publisher file lock when we can't publish anyway.
        return

    publisher = ClearcutPublisher()

    logging.debug("Acquiring lock.")
    with publisher_lock.write_lock():
        if not can_publish():
            # Double check we weren't waiting on a now-completed publisher.
            return

        # Log our PID. Overwrites any stale PID from a previous run.
        publisher_file.write_text(str(os.getpid()))

        logging.debug("Parsing files.")
        pending_files = _parse_files(publisher)

        # Record batch stats on the current trace span.
        span = trace.get_current_span()
        span.set_attribute("file_count", len(pending_files))
        span.set_attribute("span_count", publisher.queue_len)
        logging.debug(
            "Publishing %s files containing %s spans.",
            len(pending_files),
            publisher.queue_len,
        )

        # Do the publishing.
        try:
            publisher.publish()
        except PublishError:
            # Mark the files failed and drop the PID file before propagating.
            _post_publish_failure_actions(pending_files)
            publisher_file.unlink()
            raise

        # Write out the next publish TS (the server-provided throttle).
        with next_publish_lock.write_lock():
            next_publish.write_text(str(publisher.next_publish_ts))

        logging.debug("Next request: %s", publisher.next_request_dt.isoformat())
        # NOTE(review): if _post_publish_actions raises, the PID file is left
        # behind; the next run overwrites it above.
        _post_publish_actions(pending_files)

        # Drop the PID file and we're done.
        publisher_file.unlink()

    logging.notice("Publish complete.")
| |
| |
@tracer.start_as_current_span("chromite.lib.telemetry_publisher._parse_files")
def _parse_files(publisher: "ClearcutPublisher") -> List["TelemetryFile"]:
    """Parse relevant files from the telemetry log dir and queue their spans.

    Args:
        publisher: The publisher whose queue the parsed spans are added to.

    Returns:
        The files whose spans were queued; their publish outcome still needs
        to be recorded after the publish attempt.
    """
    pending_files = []
    for current in _get_telemetry_files():
        logging.debug("Processing: %s", current)

        if not current.is_publishable:
            # Already handled, failed, or still in progress.
            continue

        if not current.spans:
            # This should be redundant since is_publishable checks for the
            # in-progress file, but just in case there's a race condition...
            continue

        try:
            queued = publisher.queue(current.spans)
        except ParseSpanError as e:
            # Mark the file as unparsable so it isn't retried every run.
            logging.warning(e)
            current.parsing_failed()
            continue

        if queued:
            logging.debug(
                "Queued: %s with %s spans", current, len(current.spans)
            )
            pending_files.append(current)
        else:
            # The batch is full; stop and leave the rest for the next run.
            # NOTE(review): a single file with more spans than the max batch
            # size can never be queued and stops processing here every run —
            # confirm files cannot exceed the batch size.
            break

    return pending_files
| |
| |
@tracer.start_as_current_span(
    "chromite.lib.telemetry_publisher._post_publish_actions"
)
def _post_publish_actions(pending_files: List["TelemetryFile"]) -> None:
    """Post-publish actions for all published files.

    Marks the just-published files, purges telemetry files older than the
    retention window, and cleans out old publisher logs.

    Args:
        pending_files: The files whose spans were just published.
    """
    # Mark the just published files as published.
    for file in pending_files:
        file.publishing_succeeded()

    # Clean out old telemetry files.
    # We need to convert it to a list because deletions can cause problems for
    # the rglob iterator.
    for file in list(_get_telemetry_files()):
        file.delete(age=_TELEMETRY_PURGE_AGE)

    # Clean out old publisher logs.
    publisher_logs = _get_telemetry_dir() / ".publisher_logs"
    if not publisher_logs.is_dir():
        # No logs written yet; iterdir() would raise FileNotFoundError on a
        # missing directory.
        return
    cutoff = time.time() - _TELEMETRY_PURGE_AGE
    for file in publisher_logs.iterdir():
        if file.stat().st_mtime < cutoff:
            try:
                osutils.SafeUnlink(file, sudo=True)
            except cros_build_lib.RunCommandError as e:
                # Best effort cleanup; log and continue with the others.
                logging.warning("Error deleting %s: %s", file, e)
| |
| |
| def _post_publish_failure_actions(pending_files: Iterable["TelemetryFile"]): |
| """Anything that needs to be done on a publishing failure.""" |
| for file in pending_files: |
| file.publishing_failed() |
| |
| |
def _get_telemetry_files() -> Iterable["TelemetryFile"]:
    """Yield a TelemetryFile for each telemetry trace file on disk."""
    trace_paths = _get_telemetry_dir().rglob("*.otel.traces.json")
    yield from map(TelemetryFile, trace_paths)
| |
| |
class TelemetryFile:
    """A single on-disk telemetry trace file plus its status metadata.

    Status (published / publish-failed / parse-failed / in-progress) is
    tracked via hidden sibling marker files next to the trace file, e.g.
    `.<name>.published`.
    """

    def __init__(self, path: "Path") -> None:
        self._path = path

    def __str__(self) -> str:
        return str(self._path)

    @functools.cached_property
    def spans(self) -> List[str]:
        """Get the spans from the file, one json-encoded span per line."""
        return [
            x.strip() for x in self._path.read_text().splitlines() if x.strip()
        ]

    # Metadata file properties used to track the status of the telemetry.
    def _metadata_file(self, metadata_type: str) -> "Path":
        """Get the hidden sibling marker file for |metadata_type|."""
        return self._path.with_name(f".{self._path.name}.{metadata_type}")

    @property
    def _published_file(self) -> "Path":
        return self._metadata_file("published")

    @property
    def _publish_failed_file(self) -> "Path":
        return self._metadata_file("publish-failed")

    @property
    def _parse_failed_file(self) -> "Path":
        return self._metadata_file("parse-failed")

    @property
    def _in_progress_file(self) -> "Path":
        return self._metadata_file("in-progress")

    # Telemetry status properties.
    @property
    def is_published(self) -> bool:
        """Whether the file's spans have already been published."""
        return self._published_file.exists()

    @property
    def is_failed_publishing(self) -> bool:
        """Whether a previous publish attempt failed."""
        return self._publish_failed_file.exists()

    @property
    def is_failed_parsing(self) -> bool:
        """Whether a previous parse attempt failed."""
        return self._parse_failed_file.exists()

    @property
    def is_pending(self) -> bool:
        """Whether the in-progress marker exists (file may still be written)."""
        return self._in_progress_file.exists()

    @property
    def is_publishable(self) -> bool:
        """Whether the file exists and has no status markers set."""
        return self._path.exists() and not (
            self.is_published
            or self.is_pending
            or self.is_failed_publishing
            or self.is_failed_parsing
        )

    # Actions performed on the various results.
    def parsing_failed(self) -> None:
        """To be called when the file could not be parsed."""
        self._parse_failed_file.touch()

    def publishing_failed(self) -> None:
        """To be called on failing to publish."""
        # TODO: Add retry mechanism to accommodate external failures: network
        # flakes, clearcut outages, etc.
        self._publish_failed_file.touch()

    def publishing_succeeded(self) -> None:
        """To be called on successfully being published."""
        self._published_file.touch()

    def _is_younger_than(self, age: int) -> bool:
        """Check whether the file was modified less than |age| seconds ago."""
        return (time.time() - self._path.stat().st_mtime) < age

    def delete(self, age: int):
        """Delete the telemetry and relevant metadata if older than |age|.

        Also prunes now-empty parent directories below the telemetry dir.

        Args:
            age: The age in seconds to serve as the cutoff for keeping the
                file. A falsy age deletes unconditionally.
        """
        if age and self._is_younger_than(age):
            return

        def _delete(f: "Path"):
            # Best-effort sudo removal of |f|; log failures and move on.
            if not f.exists():
                return
            try:
                osutils.SafeUnlink(f, sudo=True)
            except cros_build_lib.RunCommandError as e:
                # Deletion failed (e.g. sudo unlink error); best effort only.
                logging.warning("Unable to delete %s:", f)
                logging.warning(e)

        # Delete the file itself plus all metadata files.
        _delete(self._path)
        _delete(self._publish_failed_file)
        _delete(self._published_file)
        _delete(self._parse_failed_file)
        _delete(self._in_progress_file)

        # Try to clear out empty parent directories.
        # Path.parents iterates nearest-first, so this walks upward and stops
        # once it reaches the telemetry dir (or a non-empty directory).
        for parent in self._path.parents:
            if _get_telemetry_dir() not in parent.parents:
                # At or above telemetry dir.
                break

            try:
                parent.rmdir()
            except OSError:
                # It's not empty.
                break
| |
| |
class TraceSpanDataclassMixin:
    """Mixin to facilitate translating from otel span json to TraceSpan proto.

    This is a one way translation from opentelemetry's json-encoded spans to our
    TraceSpan proto, but the reverse case isn't supported (or needed).
    For example, `x.from_json(data).to_json() == data` CANNOT be asserted.
    """

    def _field_mapping(self) -> Dict[str, str]:
        """Get the {otel: TraceSpan} field name mapping.

        Used to map a field from the otel representation to the dataclass field.
        This needs only be populated for fields where the names differ.
        """
        return {}

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a dict."""

        def _dict_factory(values):
            """Dict factory to convert enums to their value."""
            return {
                k: v.value if isinstance(v, enum.Enum) else v for k, v in values
            }

        return dataclasses.asdict(self, dict_factory=_dict_factory)

    def from_dict(self, mapping: Dict[str, Any]) -> Dict[str, Any]:
        """Populate from an otel span dict.

        Args:
            mapping: The relevant portion of the parsed otel span data.
                NOTE: There are no guaranteed post conditions for the contents
                of |mapping|, so pass a copy if you want the original data
                intact.

        Returns:
            A dict containing the unused portion of |mapping|.
        """
        field_mapping = self._field_mapping()
        remaining = {}
        for k, v in mapping.items():
            # Translate the otel key to the dataclass field name when mapped.
            k_attr = field_mapping.get(k, k)
            if not hasattr(self, k_attr):
                # Return unconsumed fields.
                remaining[k] = v
                continue

            current = getattr(self, k_attr)
            current_type = type(current)
            # Exact type comparison: only assign when the json value's type
            # matches the field's current/default value's type.
            if current_type == type(v):
                # All the scalars.
                setattr(self, k_attr, v)
            elif hasattr(current_type, "from_span_value"):
                # Enums, create a new instance with the value.
                setattr(self, k_attr, current_type.from_span_value(v))
            elif isinstance(current, TraceSpanDataclassMixin):
                # A nested class.
                current.from_dict(v)
            # NOTE(review): values matching none of the branches above are
            # silently dropped (neither assigned nor returned in |remaining|).

        return remaining

    def to_json(self, indent: Optional[int] = None) -> str:
        """Dump to json."""
        return json.dumps(self.to_dict(), indent=indent)

    def from_json(self, content: str) -> None:
        """Parse an otel span json string."""
        self.from_dict(json.loads(content))

    def to_proto(self, message: "proto_msg.Message") -> None:
        """Populate a proto from this instance's fields."""
        json_format.ParseDict(
            self.to_dict(), message, ignore_unknown_fields=True
        )
| |
| |
@dataclasses.dataclass
class TelemetrySdk(TraceSpanDataclassMixin):
    """Telemetry SDK dataclass (populated from otel resource attributes)."""

    name: str = ""
    version: str = ""
    language: str = ""

    def _field_mapping(self) -> Dict[str, str]:
        # Map otel resource attribute keys to our field names.
        return {
            resources.TELEMETRY_SDK_NAME: "name",
            resources.TELEMETRY_SDK_VERSION: "version",
            resources.TELEMETRY_SDK_LANGUAGE: "language",
        }
| |
| |
@dataclasses.dataclass
class System(TraceSpanDataclassMixin):
    """System information (OS and hardware resource attributes)."""

    os_name: str = ""
    os_version: str = ""
    os_type: str = ""
    cpu: str = ""
    host_architecture: str = ""

    def _field_mapping(self) -> Dict[str, str]:
        # Map otel/detector resource attribute keys to our field names.
        return {
            detector.OS_NAME: "os_name",
            resources.OS_DESCRIPTION: "os_version",
            resources.OS_TYPE: "os_type",
            detector.CPU_NAME: "cpu",
            detector.CPU_ARCHITECTURE: "host_architecture",
        }
| |
| |
@dataclasses.dataclass
class Process(TraceSpanDataclassMixin):
    """Process dataclass (attributes of the traced process)."""

    # Stored as a string for the proto, even though the source value is an int.
    pid: str = ""
    executable_name: str = ""
    executable_path: str = ""
    command: str = ""
    command_args: List[str] = dataclasses.field(default_factory=list)
    owner_is_root: bool = False
    runtime_name: str = ""
    runtime_version: str = ""
    runtime_description: str = ""
    api_version: str = ""
    env: Dict[str, str] = dataclasses.field(default_factory=dict)

    def _field_mapping(self) -> Dict[str, str]:
        return {
            resources.PROCESS_EXECUTABLE_NAME: "executable_name",
            resources.PROCESS_EXECUTABLE_PATH: "executable_path",
            resources.PROCESS_COMMAND: "command",
            resources.PROCESS_COMMAND_ARGS: "command_args",
            resources.PROCESS_RUNTIME_NAME: "runtime_name",
            resources.PROCESS_RUNTIME_VERSION: "runtime_version",
            resources.PROCESS_RUNTIME_DESCRIPTION: "runtime_description",
            detector.PROCESS_RUNTIME_API_VERSION: "api_version",
        }

    def from_dict(self, mapping: Dict[str, Any]) -> Dict[str, Any]:
        # Coerce the pid to a string for the proto.
        self.pid = str(mapping.pop(resources.PROCESS_PID, ""))
        # Owner 0 is treated as root; a missing owner defaults to non-root.
        self.owner_is_root = mapping.pop(resources.PROCESS_OWNER, 1) == 0
        # Collect the keys first to avoid mutating |mapping| while iterating.
        env_keys = [k for k in mapping if k.startswith("process.env.")]
        self.env = {k[len("process.env.") :]: mapping.pop(k) for k in env_keys}
        return TraceSpanDataclassMixin.from_dict(self, mapping)
| |
| |
@dataclasses.dataclass
class Resource(TraceSpanDataclassMixin):
    """Resource dataclass (process + system + leftover attributes)."""

    process: Process = dataclasses.field(default_factory=Process)
    system: System = dataclasses.field(default_factory=System)
    # Any resource attributes not consumed by process or system.
    attributes: Dict[str, Any] = dataclasses.field(default_factory=dict)

    def from_dict(self, mapping: Dict[str, Any]) -> Dict[str, Any]:
        # Let process, then system, consume their attributes first.
        attrs = self.process.from_dict(mapping)
        attrs = self.system.from_dict(attrs)
        # Everything not already consumed.
        self.attributes = {**attrs}
        return {}
| |
| |
@dataclasses.dataclass
class InstrumentationScope(TraceSpanDataclassMixin):
    """InstrumentationScope dataclass (otel scope name and version)."""

    name: str = ""
    version: str = ""
| |
| |
class SpanKind(enum.Enum):
    """Span type."""

    SPAN_KIND_UNSPECIFIED = 0
    SPAN_KIND_INTERNAL = 1
    SPAN_KIND_SERVER = 2
    SPAN_KIND_CLIENT = 3

    @classmethod
    def from_span_value(cls, value):
        """Create an enum from the value from the json span value.

        The otel implementation uses str(self.kind), where self.kind is an
        otel SpanKind enum, producing `Enum.ValueName` strings; anything
        unrecognized maps to SPAN_KIND_UNSPECIFIED.
        """
        known = {
            "SpanKind.INTERNAL": cls.SPAN_KIND_INTERNAL,
            "SpanKind.SERVER": cls.SPAN_KIND_SERVER,
            "SpanKind.CLIENT": cls.SPAN_KIND_CLIENT,
        }
        return known.get(value, cls.SPAN_KIND_UNSPECIFIED)
| |
| |
@dataclasses.dataclass
class Event(TraceSpanDataclassMixin):
    """Event dataclass."""

    # Event time as milliseconds since the epoch (UTC).
    event_time_millis: int = 0
    name: str = ""
    attributes: Dict[str, Any] = dataclasses.field(default_factory=dict)

    def from_dict(self, mapping: Dict[str, Any]) -> Dict[str, Any]:
        # TODO(python3.11): Use fromisoformat instead of strptime and replace.
        # The otel timestamp is an ISO-8601 string with a trailing "Z" (UTC).
        start = datetime.datetime.strptime(
            mapping.pop("timestamp"), "%Y-%m-%dT%H:%M:%S.%fZ"
        ).replace(tzinfo=datetime.timezone.utc)
        self.event_time_millis = int(start.timestamp() * 1000)
        return TraceSpanDataclassMixin.from_dict(self, mapping)
| |
| |
@dataclasses.dataclass
class StackFrame(TraceSpanDataclassMixin):
    """StackFrame dataclass, a single frame within a StackTrace."""

    function_name: str = ""
    file_name: str = ""
    line_number: int = 0
    column_number: int = 0
| |
| |
@dataclasses.dataclass
class StackTrace(TraceSpanDataclassMixin):
    """StackTrace dataclass."""

    stack_frames: List[StackFrame] = dataclasses.field(default_factory=list)
    dropped_frames_count: int = 0
    stacktrace_hash: str = ""

    def from_dict(self, mapping: Dict[str, Any]) -> Dict[str, Any]:
        # Build each frame from its dict before the generic field handling.
        for frame in mapping.pop("stack_frames", []):
            stack_frame = StackFrame()
            stack_frame.from_dict(frame)
            self.stack_frames.append(stack_frame)

        return TraceSpanDataclassMixin.from_dict(self, mapping)
| |
| |
class StatusCode(enum.Enum):
    """Status code."""

    STATUS_CODE_UNSET = 0
    STATUS_CODE_OK = 1
    STATUS_CODE_ERROR = 2

    @classmethod
    def from_span_value(cls, value):
        """Create an enum from the value from the json span value.

        The otel implementation uses str(self.status_code.name), where
        self.status_code is an otel StatusCode enum, resulting in simple
        `ValueName` strings. Anything other than "ERROR" maps to OK.
        """
        return cls.STATUS_CODE_ERROR if value == "ERROR" else cls.STATUS_CODE_OK
| |
| |
@dataclasses.dataclass
class Status(TraceSpanDataclassMixin):
    """Status dataclass."""

    status_code: StatusCode = StatusCode.STATUS_CODE_UNSET
    message: str = ""
    stack_trace: StackTrace = dataclasses.field(default_factory=StackTrace)

    def _field_mapping(self) -> Dict[str, str]:
        # The otel json calls the status message "description".
        return {
            "description": "message",
        }
| |
| |
@dataclasses.dataclass
class Context(TraceSpanDataclassMixin):
    """Context dataclass holding the span's identifiers."""

    trace_id: str = ""
    span_id: str = ""
    trace_state: str = ""
| |
| |
@dataclasses.dataclass
class Link(TraceSpanDataclassMixin):
    """Link dataclass."""

    context: Context = dataclasses.field(default_factory=Context)
    # Anything the context didn't consume.
    attributes: Dict[str, Any] = dataclasses.field(default_factory=dict)

    def from_dict(self, mapping: Dict[str, Any]) -> Dict[str, Any]:
        # The context consumes its fields; the rest becomes attributes.
        attrs = self.context.from_dict(mapping)
        self.attributes = {**attrs}
        return {}
| |
| |
@dataclasses.dataclass
class TraceSpan(TraceSpanDataclassMixin):
    """Trace span dataclass, translated into the TraceSpan proto for upload."""

    name: str = ""
    context: Context = dataclasses.field(default_factory=Context)
    # Empty string (never None) when the span has no parent.
    parent_span_id: str = ""
    span_kind: SpanKind = SpanKind.SPAN_KIND_UNSPECIFIED
    # Start/end times as milliseconds since the epoch (UTC).
    start_time_millis: int = 0
    end_time_millis: int = 0
    attributes: Dict[str, Any] = dataclasses.field(default_factory=dict)
    events: List[Event] = dataclasses.field(default_factory=list)
    links: List[Link] = dataclasses.field(default_factory=list)
    status: Status = dataclasses.field(default_factory=Status)
    resource: Resource = dataclasses.field(default_factory=Resource)
    # TODO: Verify whether InstrumentationScope is ever added to the json.
    instrumentation_scope: InstrumentationScope = dataclasses.field(
        default_factory=InstrumentationScope
    )
    telemetry_sdk: TelemetrySdk = dataclasses.field(
        default_factory=TelemetrySdk
    )

    def _field_mapping(self) -> Dict[str, str]:
        # The otel json uses "kind" for the span kind.
        return {
            "kind": "span_kind",
        }

    def from_dict(self, mapping: Dict[str, Any]) -> Dict[str, Any]:
        # Force empty string when we get None.
        self.parent_span_id = mapping.pop("parent_id", "") or ""

        # TODO(python3.11): Use fromisoformat instead of strptime and replace.
        # The otel timestamps are ISO-8601 strings with a trailing "Z" (UTC).
        start = datetime.datetime.strptime(
            mapping.pop("start_time"), "%Y-%m-%dT%H:%M:%S.%fZ"
        ).replace(tzinfo=datetime.timezone.utc)
        end = datetime.datetime.strptime(
            mapping.pop("end_time"), "%Y-%m-%dT%H:%M:%S.%fZ"
        ).replace(tzinfo=datetime.timezone.utc)
        self.start_time_millis = int(start.timestamp() * 1000)
        self.end_time_millis = int(end.timestamp() * 1000)

        for event_data in mapping.pop("events", []):
            event = Event()
            event.from_dict(event_data)
            self.events.append(event)

        for link_data in mapping.pop("links", []):
            link = Link()
            link.from_dict(link_data)
            self.links.append(link)

        # TelemetrySdk populates from the resource attributes, so make sure we
        # allow it to consume those entries before populating the resource data.
        resource_attrs = mapping.pop("resource", {}).get("attributes", {})
        resource_attrs = self.telemetry_sdk.from_dict(resource_attrs)
        self.resource.from_dict(resource_attrs)

        TraceSpanDataclassMixin.from_dict(self, mapping)
        # All input is consumed; nothing remains for the caller.
        return {}

    @classmethod
    def parse(cls, span: str) -> "TraceSpan":
        """Create an instance from the json encoded string."""
        instance = cls()
        instance.from_json(span)
        return instance
| |
| |
class ClearcutPublisher:
    """Publish spans to the google http (Clearcut) endpoint."""

    def __init__(
        self,
        endpoint: str = _DEFAULT_ENDPOINT,
        timeout: int = _DEFAULT_TIMEOUT,
        max_batch_size: int = _DEFAULT_MAX_BATCH_SIZE,
        next_request_ts: Optional[Union[int, float]] = None,
        prefilter: Optional[Callable[[str], str]] = None,
    ) -> None:
        """Init.

        Args:
            endpoint: The URL log requests are POSTed to.
            timeout: Default request timeout in seconds.
            max_batch_size: Maximum number of spans per published batch.
            next_request_ts: Timestamp before which no request may be made.
            prefilter: Applied to each raw span string before parsing;
                defaults to the telemetry Anonymizer.
        """
        self._endpoint = endpoint
        self._timeout = timeout
        # Naive local datetimes are used consistently throughout this class.
        self._next_request_dt = (
            datetime.datetime.fromtimestamp(next_request_ts)
            if next_request_ts
            else datetime.datetime.now()
        )
        self._queue: List[TraceSpan] = []
        self._max_batch_size = max_batch_size
        self._prefilter = prefilter or utils.Anonymizer()

    @property
    def wait_time(self) -> float:
        """Get the wait time in seconds until the next publish (>= 0)."""
        wait_delta = self.next_request_dt - datetime.datetime.now()
        wait_time = wait_delta.total_seconds()

        return wait_time if wait_time > 0 else 0

    @property
    def next_request_dt(self) -> datetime.datetime:
        """Get the next request datetime."""
        return self._next_request_dt

    @property
    def next_publish_ts(self) -> float:
        """Get the timestamp the next publish can be made."""
        return self.next_request_dt.timestamp()

    @property
    def queue_len(self) -> int:
        """Get the number of items in the queue."""
        return len(self._queue)

    @tracer.start_as_current_span(
        "chromite.lib.telemetry_publisher.ClearcutPublisher.publish"
    )
    def publish(self, timeout: Optional[int] = None) -> None:
        """Publish a batch.

        Pops up to max_batch_size spans off the queue, uploads them, and
        records the server-provided wait time before the next request.

        Args:
            timeout: Request timeout in seconds; defaults to the instance's
                timeout.

        Raises:
            PublishError: On upload or response-parsing failures.
        """
        # Split off one batch; anything beyond the batch size stays queued.
        spans = self._queue[: self._max_batch_size]
        self._queue = self._queue[self._max_batch_size :]

        if not spans:
            # Skip publishing nothing.
            self._next_request_dt = datetime.datetime.now()
            return

        log_request = self._prepare_request_body(spans)
        log_response = self._do_publish_request(log_request, timeout)

        # The response tells us how long to wait before the next request.
        now = datetime.datetime.now()
        delta = datetime.timedelta(
            milliseconds=log_response.next_request_wait_millis
        )
        self._next_request_dt = now + delta

    def queue(self, spans: Iterable[str]) -> bool:
        """Add spans to the queue if not above max batch size.

        Args:
            spans: The raw, json-encoded span strings to queue.

        Returns:
            True if the spans were queued, False if they didn't fit.

        Raises:
            ParseSpanError: When any span fails to parse.
        """
        try:
            # Prefilter (anonymize) each raw span string before parsing it.
            parsed = [TraceSpan.parse(self._prefilter(x)) for x in spans]
        except Exception as e:  # pylint: disable=broad-except
            # We don't want a single malformed file to interrupt the publishing
            # process, so catch Exception and raise a ParseError instead.
            logging.warning("Error parsing a span:")
            logging.warning(spans)
            raise ParseSpanError(
                "Unable to parse a span, see logs for details."
            ) from e

        if self._can_queue(len(parsed)):
            self._queue.extend(parsed)
            return True

        return False

    def _can_queue(self, count: int) -> bool:
        """Check if |count| spans can be published in the batch."""
        return self._max_batch_size - self.queue_len >= count

    def _prepare_request_body(
        self, spans: Iterable[TraceSpan]
    ) -> clientanalytics_pb2.LogRequest:
        """Build the LogRequest proto carrying |spans|."""
        log_request = clientanalytics_pb2.LogRequest()
        log_request.request_time_ms = int(time.time() * 1000)
        log_request.client_info.client_type = _CLIENT_TYPE
        log_request.log_source = _LOG_SOURCE

        for span in spans:
            # Each span becomes one log event wrapping a serialized TraceSpan.
            trace_span = trace_span_pb2.TraceSpan()
            span.to_proto(trace_span)
            log_event = log_request.log_event.add()
            log_event.event_time_ms = int(time.time() * 1000)
            log_event.source_extension = trace_span.SerializeToString()

        return log_request

    def _do_publish_request(
        self,
        log_request: clientanalytics_pb2.LogRequest,
        timeout: Optional[int] = None,
    ) -> clientanalytics_pb2.LogResponse:
        """POST |log_request| to the endpoint and parse the LogResponse.

        Raises:
            PublishError: On network errors or an unparsable response.
        """
        req = urllib.request.Request(
            self._endpoint,
            data=log_request.SerializeToString(),
            method="POST",
        )

        try:
            with urllib.request.urlopen(
                req, timeout=timeout or self._timeout
            ) as f:
                response = f.read()
        except (urllib.error.URLError, socket.timeout) as e:
            logging.exception(e)
            raise PublishError(
                f"Encountered an error while publishing: {e}"
            ) from e

        logging.debug("Response:")
        logging.debug(response)

        log_response = clientanalytics_pb2.LogResponse()
        try:
            log_response.ParseFromString(response)
        except proto_msg.DecodeError as e:
            logging.warning("could not decode data into proto: %s", e)
            raise PublishError(f"Unable to decode proto: {e}") from e

        return log_response