#!/usr/bin/env python3
# Copyright 2022 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
r"""Test different compression settings on binpkgs.
NB: The working directory can easily take 300 GB!
NB: It is safe to interrupt the test at anytime. It will safely resume.
The --url option can quickly fetch a remote binpkg store. It is not required
if you already have a packages/ directory to test against. This assumes the
remote tree is a binpkg repository -- i.e. there is a "packages" directory and
it contains all the packages.
$ ./eval_binpkg_compression --workdir eve-14665.0.0 \
--url gs://chromeos-dev-installer/board/eve/14665.0.0
$ portageq-amd64-generic envvar PORTAGE_BINHOST
$ ./eval_binpkg_compression --workdir amd64-generic-R102-14665.0.0-rc1 \
--url gs://chromeos-prebuilt/board/amd64-generic/full-R102-14665.0.0-rc1
$ portageq envvar PORTAGE_BINHOST
$ ./eval_binpkg_compression --workdir chroot-2022.04.03.163758 \
--url gs://chromeos-prebuilt/host/amd64/amd64-host/chroot-2022.04.03.163758
The table will be written to stdout and should be pasteable to spreadsheets.
"""

import argparse
import json
import math
import os
from pathlib import Path
import resource
import shutil
import subprocess
import sys
import time
from typing import List, Optional

# Terminal escape sequence to erase the current line after the cursor.
CSI_ERASE_LINE_AFTER = "\x1b[K"

# List of compression formats & commands & levels to test.
#
# NB: When adding or updating algorithms here, be careful with -0 & -1 levels.
# Not all support -0, and some will treat them as "no compression".
#
# NB: The order here is used when generating the table.
ALGOS = (
# NB: Single threaded bzip2 -9 has been our historical default baseline.
("bz2", ["bzip2"], (9,)),
# NB: Test multiple bzip2 implementations to compare speeds, not final size.
("bz2", ["lbzip2", "-n%(threads)i"], (1, 3, 6, 9)),
("bz2", ["pbzip2", "-p%(threads)i"], (1, 3, 6, 9)),
# NB: lz4 is always single threaded.
("lz4", ["lz4"], (1, 3, 6, 9)),
# NB: lzip is always single threaded.
("lzip", ["lzip"], (0, 3, 6, 9)),
("lzip", ["plzip", "-n%(threads)i"], (0, 3, 6, 9)),
# NB: lzop is always single threaded.
# NB: [2..6] levels are the same in lzop atm.
("lzop", ["lzop"], (1, 3, 9)),
("gz", ["gzip"], (1, 3, 6, 9)),
("gz", ["pigz", "-p%(threads)i"], (1, 3, 6, 9)),
# NB: Test multiple xz implementations to compare speeds, not final size.
("xz", ["xz", "-T%(threads)i"], (0, 3, 6, 9)),
("xz", ["pixz", "-t", "-p%(threads)i"], (0, 3, 6, 9)),
("zst", ["zstd", "-T%(threads)i", "--ultra"], (3, 6, 9, 19, 22)),
# NB: Z is an alias for 11 since -11 doesn't work (treated as -1 -1).
("brotli", ["brotli"], (0, 3, 6, 9, "Z")),
)


def print_status(*args, **kwargs):
    """Print a status-bar message.

    Output is flushed to stderr without a trailing newline.
    """
kwargs.setdefault("end", "")
kwargs.setdefault("file", sys.stderr)
kwargs.setdefault("flush", True)
print(*args, **kwargs)


def run_stats(output, *args, **kwargs):
    """Gather rusage data from a run() command & write them to |output|."""
kwargs.setdefault("check", True)
# print('+', ' '.join(str(x) for x in args[0]), end='', flush=True)
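    # getrusage(RUSAGE_CHILDREN) accumulates across every child we have ever
    # reaped, so fork first: the child's RUSAGE_CHILDREN then covers only the
    # single subprocess.run() below.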
pid = os.fork()
if pid == 0:
        # In the child: run the command & record stats for just this run.
start_time = time.time()
# check=True was set above.
# pylint: disable=subprocess-run-check
subprocess.run(*args, **kwargs)
stop_time = time.time()
# Save the stats into a JSON file for later processing
stats = resource.getrusage(resource.RUSAGE_CHILDREN)
        data = {
            x: str(getattr(stats, x))
            for x in dir(stats)
            if x.startswith("ru_")
        }
data["wall_clock"] = stop_time - start_time
with output.open("wb") as fp:
json.dump(data, fp)
# We forked above, so use _exit here.
# pylint: disable=protected-access
os._exit(0)
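    # Back in the parent: propagate any failure from the child.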
_, status = os.waitpid(pid, 0)
code = os.WEXITSTATUS(status)
if code:
sys.exit(code)


def compress_tars(
    workdir: Path,
    atoms: List[str],
    threads_compress: int = 0,
    threads_decompress: int = 0,
) -> None:
    """Compress the tars with a variety of algorithms & levels.

    Also gather cpu/timing stats.
    """
tar_dir = workdir / "tar"
def comp_cmd(cmd, threads, level=None):
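        """Build the compression command line for |threads| & |level|."""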
settings = {"threads": threads}
ret = [x % settings for x in cmd] + ["-c"]
if level is not None:
ret += [f"-{level}"]
return ret
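
    # Write to a single temp file & rename() into place so an interrupted
    # run never leaves a truncated output behind (this is what makes the
    # test safe to resume).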
temp_file = workdir / ".tmp"
numpkgs = len(atoms)
width = int(math.log10(numpkgs)) + 1
for i, atom in enumerate(atoms, start=1):
print_status(
f"\r[{i:{width}}/{numpkgs}] {atom:50}{CSI_ERASE_LINE_AFTER}"
)
tar_file = tar_dir / f"{atom}.tar"
for suffix, cmd, levels in ALGOS:
cmd_name = cmd[0]
print_status(f" {cmd_name}:")
for level in levels:
print_status(level)
out_dir = workdir / f"{cmd_name}-{level}"
out_file = out_dir / f"{atom}.tar.{suffix}"
# Compress the file to gather stats.
if not out_file.exists():
out_file.parent.mkdir(parents=True, exist_ok=True)
stats_file = out_dir / f"{atom}.compress.json"
with tar_file.open("rb") as stdin, temp_file.open(
"wb"
) as stdout:
run_stats(
stats_file,
comp_cmd(cmd, threads_compress, level=level),
stdin=stdin,
stdout=stdout,
)
temp_file.rename(out_file)
# Decompress the file to gather stats.
stats_file = out_dir / f"{atom}.decompress.json"
if not stats_file.exists():
with out_file.open("rb") as stdin:
run_stats(
stats_file,
comp_cmd(cmd + ["-d"], threads_decompress),
stdin=stdin,
stdout=subprocess.DEVNULL,
)
print_status(f"\r{CSI_ERASE_LINE_AFTER}")


def unpack_binpkgs(
    workdir: Path, atoms: List[str], threads_decompress: int = 0
) -> None:
    """Unpack each binpkg into an uncompressed tarball.

    This also pulls the archive out (i.e. strips the xpak) to provide easier
    baseline references.

    This creates the structure:
      tar/
        $CATEGORY/$PF.tar
      orig-data/
        $CATEGORY/$PF
    """
pkgdir = workdir / "packages"
tar_dir = workdir / "tar"
data_dir = workdir / "orig-data"
temp_file = workdir / ".tmp"
numpkgs = len(atoms)
width = int(math.log10(numpkgs)) + 1
for i, atom in enumerate(atoms, start=1):
print_status(
f"\r[{i:{width}}/{numpkgs}] unpacking {atom:50}",
CSI_ERASE_LINE_AFTER,
)
src_tbz2 = pkgdir / f"{atom}.tbz2"
# Copy the Gentoo binpkg & strip the xpak off.
data_file = data_dir / atom
if not data_file.exists():
print_status("extracting ")
shutil.copy(src_tbz2, temp_file)
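            # A tbz2 ends with the xpak metadata plus an 8-byte trailer: a
            # 4-byte big-endian xpak length & the literal b"STOP".  Dropping
            # the xpak & trailer leaves the plain compressed tar.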
with temp_file.open("rb") as fp:
size = temp_file.stat().st_size
fp.seek(size - 8)
xpak_len = int.from_bytes(fp.read(4), byteorder="big") + 8
data_len = size - xpak_len
os.truncate(temp_file, data_len)
data_file.parent.mkdir(parents=True, exist_ok=True)
temp_file.rename(data_file)
data_file.chmod(0o400)
# Decompress the binpkg to a tarball.
tar_file = tar_dir / f"{atom}.tar"
if not tar_file.exists():
print_status(f"decompressing-T{threads_decompress} ")
tar_file.parent.mkdir(parents=True, exist_ok=True)
            with data_file.open("rb") as stdin, temp_file.open(
                "wb"
            ) as stdout:
                run_stats(
                    Path(f"{tar_file}.decompress.json"),
                    ["pbzip2", "-dc", f"-p{threads_decompress}"],
                    stdin=stdin,
                    stdout=stdout,
                )
temp_file.rename(tar_file)
tar_file.chmod(0o400)
print_status(f"\r{CSI_ERASE_LINE_AFTER}")


def fetch_binpkgs(workdir: Path, url: str) -> List[str]:
    """Download the binpkgs & return a list of atoms.

    NB: "tbz2" does not mean the archive is using bzip2. It's a historical
    name that just means "Gentoo binpkg".

    This creates the structure:
      packages/
        Packages
        $CATEGORY/$PF.tbz2
    """
# gsutil wants this to exist first.
workdir.mkdir(parents=True, exist_ok=True)
if url:
url = url.rstrip("/")
if not url.endswith("/packages"):
url += "/packages"
print_status("Downloading archives\r")
subprocess.run(["gsutil", "-m", "cp", "-r", url, workdir], check=True)
pkgdir = workdir / "packages"
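    # An atom is $CATEGORY/$PF, i.e. the path minus the .tbz2 suffix.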
return [
str(x.relative_to(pkgdir).with_suffix(""))
for x in pkgdir.glob("**/*.tbz2")
]


class Rusage:
    """Object to hold rusage fields we care about."""
wall_clock = 0.0
ru_maxrss = 0
ru_stime = 0.0
ru_utime = 0.0


def gather_stats(
    workdir: Path, threads_compress: int = 0, threads_decompress: int = 0
) -> None:
    """Produce a table of data for this run."""
header = (
"algo",
"size (bytes)",
"size (GiB)",
"% 🗜",
"🗜 threads",
"🗜 rss (GiB)",
"🗜 wall ⏲ (min)",
"🗜 sys ⏲ (min)",
"🗜 user ⏲ (min)",
"⇫ threads",
"⇫ rss (GiB)",
"⇫ wall ⏲ (min)",
"⇫ sys ⏲ (min)",
"⇫ user ⏲ (min)",
)
def iter_algos():
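        """Yield every directory whose total size should be summarized."""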
for d in ("tar", "packages", "orig-data"):
yield workdir / d
for _, cmd, levels in ALGOS:
for level in levels:
yield workdir / f"{cmd[0]}-{level}"
stats = []
for algo_dir in iter_algos():
algo = algo_dir.name
print_status(f"\rsummarize {algo} {CSI_ERASE_LINE_AFTER}")
size = 0
compress_stats = Rusage()
decompress_stats = Rusage()
        for path in algo_dir.glob("**/*"):
            # Skip directories -- their st_size would skew the totals.
            if path.is_dir():
                continue
if path.name.endswith(".compress.json"):
data = json.loads(path.read_bytes())
compress_stats.wall_clock += float(data["wall_clock"])
compress_stats.ru_maxrss += int(data["ru_maxrss"])
compress_stats.ru_stime += float(data["ru_stime"])
compress_stats.ru_utime += float(data["ru_utime"])
continue
elif path.name.endswith(".decompress.json"):
data = json.loads(path.read_bytes())
decompress_stats.wall_clock += float(data["wall_clock"])
decompress_stats.ru_maxrss += int(data["ru_maxrss"])
decompress_stats.ru_stime += float(data["ru_stime"])
decompress_stats.ru_utime += float(data["ru_utime"])
continue
elif path.suffix in (".out", ".json"):
print(f"warning: ignoring {path}", file=sys.stderr)
continue
size += path.stat().st_size
stats.append(
[
algo,
size,
round(size / 1024 / 1024 / 1024, 2),
0,
threads_compress,
round(compress_stats.ru_maxrss / 1024 / 1024, 2),
round(compress_stats.wall_clock / 60, 2),
round(compress_stats.ru_stime / 60, 2),
round(compress_stats.ru_utime / 60, 2),
threads_decompress,
round(decompress_stats.ru_maxrss / 1024 / 1024, 2),
round(decompress_stats.wall_clock / 60, 2),
round(decompress_stats.ru_stime / 60, 2),
round(decompress_stats.ru_utime / 60, 2),
]
)
def print_row(row):
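        """Print one row, computing its size relative to the tar baseline."""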
row[3] = round((row[1] / total_size) * 100, 2)
print("\t".join(str(x) for x in row))
print_status(f"\r{CSI_ERASE_LINE_AFTER}")
print("\t".join(header))
total_size = stats[0][1]
for row in stats:
print_row(row)


def check_tools() -> bool:
    """Make sure tools we use are installed."""

    def _check(cmd) -> bool:
ret = bool(shutil.which(cmd))
if not ret:
print(f"missing {cmd}", file=sys.stderr)
return ret
ret = _check("gsutil")
for tool in (x[1][0] for x in ALGOS):
ret = _check(tool) and ret
return ret


def get_parser() -> argparse.ArgumentParser:
    """Get a command line parser."""
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"-w",
"--workdir",
type=Path,
required=True,
help="Base directory to write all work",
)
parser.add_argument("-u", "--url", help="GS URI to mirror")
parser.add_argument(
"-t",
"--threads-compress",
type=int,
default=0,
help="Number of compression threads",
)
parser.add_argument(
"-T",
"--threads-decompress",
type=int,
help="Number of decompression threads (defaults to -t)",
)
return parser


def main(argv: Optional[List[str]] = None) -> Optional[int]:
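    """Main entry point."""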
parser = get_parser()
opts = parser.parse_args(argv)
threads_compress = opts.threads_compress or os.cpu_count()
threads_decompress = opts.threads_decompress or threads_compress
if not check_tools():
sys.exit("please install tools first")
atoms = fetch_binpkgs(opts.workdir, opts.url)
unpack_binpkgs(opts.workdir, atoms, threads_decompress=threads_decompress)
compress_tars(
opts.workdir,
atoms,
threads_compress=threads_compress,
threads_decompress=threads_decompress,
)
gather_stats(
opts.workdir,
threads_compress=threads_compress,
threads_decompress=threads_decompress,
)


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))