blob: 013b9e01e84f637f3a88b158d1f6d6be329979c5 [file] [log] [blame] [edit]
#!/usr/bin/env python3
# Copyright 2019 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# pylint: disable=import-error
"""Extracts `perf-data` and `perfetto-data` from a feedback logs archive.
Example:
perf_data_extract.py system_logs.zip /tmp
"""
import argparse
import base64
import os
from pathlib import Path
import re
import subprocess
import sys
import tempfile
from typing import List, Optional
import zipfile
def get_perf_data(zip_file: str):
"""Extract the blobs of perf-data/perfetto-data from a zip archive.
Args:
zip_file: file path to system logs archive.
Returns:
(perf-data, perfetto-data)
"""
perf_prefix = "perf-data="
perf_data = None
perfetto_prefix = "perfetto-data="
perfetto_data = None
# Some blobs contain extra junk in the form "<TAG: NN>" (e.g., "<UID: 2>")
# which we need to filter out.
# TODO(skyostil): Figure out what is causing this corruption.
junk_tag = re.compile(r"<[A-Z0-9: ]*>")
blob_start = "<base64>:"
def read_next_blob(log, line):
while line:
if blob_start in line:
blob = line[line.index(blob_start) + len(blob_start) :].strip()
blob = re.sub(junk_tag, "", blob)
# These blobs are often truncated, so trim them down further
# down to the closest valid base64-encoded length.
if len(blob) * 3 % 4:
blob = blob[: -(len(blob) * 3 % 4)] + "=="
return base64.b64decode(blob)
line = log.readline()
with tempfile.TemporaryDirectory() as tmp_dir:
with zipfile.ZipFile(zip_file) as z:
z.extractall(path=tmp_dir)
log_path = os.path.join(tmp_dir, "system_logs.txt")
with open(log_path, encoding="utf-8") as log:
while line := log.readline():
if line.startswith(perf_prefix):
perf_data = read_next_blob(log, line)
elif line.startswith(perfetto_prefix):
perfetto_data = read_next_blob(log, line)
return (perf_data, perfetto_data)
def parse_args(argv):
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"system_logs_zip", type=Path, help="System logs zip file."
)
parser.add_argument(
"out",
type=Path,
default=os.curdir,
help="Output path (defaults to current dir).",
)
return parser.parse_args(argv)
# The two decompression functions below can decompress a possibly truncated
# compressed bitstream. If truncation is detected, the decompressor will print a
# warning to stderr and return the data recovered so far.
# TODO(skyostil): Find out where and why this data is getting truncated.
def decompress_lzma(lzma_data):
"""Best-effort decompresses potentially truncated lzma data."""
ret = subprocess.run(
["xz", "-dc", "-"], input=lzma_data, check=False, stdout=subprocess.PIPE
)
return ret.stdout
def decompress_zstd(zstd_data):
"""Best-effort decompresses potentially truncated zstd data."""
ret = subprocess.run(
["zstd", "-dc", "-"],
input=zstd_data,
check=False,
stdout=subprocess.PIPE,
)
return ret.stdout
def main(argv: Optional[List[str]] = None) -> Optional[int]:
opts = parse_args(argv)
perf_data, perfetto_data = get_perf_data(opts.system_logs_zip)
if not perf_data and not perfetto_data:
sys.exit("Error: perf-data/perfetto-data not found in the system logs")
if perf_data:
out = os.path.join(opts.out, "feedback_perf.data")
print("Writing perf data to", out)
perf_data = decompress_lzma(perf_data)
with open(out, "wb") as f:
f.write(perf_data)
if perfetto_data:
out = os.path.join(opts.out, "feedback.perfetto-trace")
print("Writing perfetto data to", out)
perfetto_data = decompress_zstd(perfetto_data)
with open(out, "wb") as f:
f.write(perfetto_data)
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))