| #!/usr/bin/env python3 |
| # Copyright 2022 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| r"""Test different compression settings on binpkgs. |
| |
| NB: The working directory can easily take 300 GB! |
| |
| NB: It is safe to interrupt the test at anytime. It will safely resume. |
| |
| The --url option can quickly fetch a remote binpkg store. It is not required |
| if you already have a packages/ directory to test against. This assumes the |
| remote tree is a binpkg repository -- i.e. there is a "packages" directory and |
| it contains all the packages. |
| |
| $ ./eval_binpkg_compression --workdir eve-14665.0.0 \ |
| --url gs://chromeos-dev-installer/board/eve/14665.0.0 |
| |
| $ portageq-amd64-generic envvar PORTAGE_BINHOST |
| $ ./eval_binpkg_compression --workdir amd64-generic-R102-14665.0.0-rc1 \ |
| --url gs://chromeos-prebuilt/board/amd64-generic/full-R102-14665.0.0-rc1 |
| |
| $ portageq envvar PORTAGE_BINHOST |
| $ ./eval_binpkg_compression --workdir chroot-2022.04.03.163758 \ |
| --url gs://chromeos-prebuilt/host/amd64/amd64-host/chroot-2022.04.03.163758 |
| |
| The table will be written to stdout and should be pasteable to spreadsheets. |
| """ |
| |
| import argparse |
| import json |
| import math |
| import os |
| from pathlib import Path |
| import resource |
| import shutil |
| import subprocess |
| import sys |
| import time |
| from typing import List, Optional |
| |
| |
| # Terminal escape sequence to erase the current line after the cursor. |
| CSI_ERASE_LINE_AFTER = "\x1b[K" |
| |
| |
| # List of compression formats & commands & levels to test. |
| # |
| # NB: When adding or updating algorithms here, be careful with -0 & -1 levels. |
| # Not all support -0, and some will treat them as "no compression". |
| # |
| # NB: The order here is used when generating the table. |
| ALGOS = ( |
| # NB: Single threaded bzip2 -9 has been our historical default baseline. |
| ("bz2", ["bzip2"], (9,)), |
| # NB: Test multiple bzip2 implementations to compare speeds, not final size. |
| ("bz2", ["lbzip2", "-n%(threads)i"], (1, 3, 6, 9)), |
| ("bz2", ["pbzip2", "-p%(threads)i"], (1, 3, 6, 9)), |
| # NB: lz4 is always single threaded. |
| ("lz4", ["lz4"], (1, 3, 6, 9)), |
| # NB: lzip is always single threaded. |
| ("lzip", ["lzip"], (0, 3, 6, 9)), |
| ("lzip", ["plzip", "-n%(threads)i"], (0, 3, 6, 9)), |
| # NB: lzop is always single threaded. |
| # NB: [2..6] levels are the same in lzop atm. |
| ("lzop", ["lzop"], (1, 3, 9)), |
| ("gz", ["gzip"], (1, 3, 6, 9)), |
| ("gz", ["pigz", "-p%(threads)i"], (1, 3, 6, 9)), |
| # NB: Test multiple xz implementations to compare speeds, not final size. |
| ("xz", ["xz", "-T%(threads)i"], (0, 3, 6, 9)), |
| ("xz", ["pixz", "-t", "-p%(threads)i"], (0, 3, 6, 9)), |
| ("zst", ["zstd", "-T%(threads)i", "--ultra"], (3, 6, 9, 19, 22)), |
| # NB: Z is an alias for 11 since -11 doesn't work (treated as -1 -1). |
| ("brotli", ["brotli"], (0, 3, 6, 9, "Z")), |
| ) |
| |
| |
| def print_status(*args, **kwargs): |
| """Print a status-bar message. |
| |
| Output is flushed to stderr w/out newline. |
| """ |
| kwargs.setdefault("end", "") |
| kwargs.setdefault("file", sys.stderr) |
| kwargs.setdefault("flush", True) |
| print(*args, **kwargs) |
| |
| |
| def run_stats(output, *args, **kwargs): |
| """Gather rusage data from a run() command & write them to |output|.""" |
| kwargs.setdefault("check", True) |
| |
| # print('+', ' '.join(str(x) for x in args[0]), end='', flush=True) |
| pid = os.fork() |
| if pid == 0: |
| # Fork a child to reset the rusage stats for this one run. |
| start_time = time.time() |
| # check=True was set above. |
| # pylint: disable=subprocess-run-check |
| subprocess.run(*args, **kwargs) |
| stop_time = time.time() |
| # Save the stats into a JSON file for later processing |
| stats = resource.getrusage(resource.RUSAGE_CHILDREN) |
| data = dict( |
| (x, str(getattr(stats, x))) |
| for x in dir(stats) |
| if x.startswith("ru_") |
| ) |
| data["wall_clock"] = stop_time - start_time |
| with output.open("wb") as fp: |
| json.dump(data, fp) |
| # We forked above, so use _exit here. |
| # pylint: disable=protected-access |
| os._exit(0) |
| |
| _, status = os.waitpid(pid, 0) |
| code = os.WEXITSTATUS(status) |
| if code: |
| sys.exit(code) |
| |
| |
| def compress_tars( |
| workdir: Path, |
| atoms: List[str], |
| threads_compress: int = 0, |
| threads_decompress: int = 0, |
| ) -> None: |
| """Compress the tars with a variety of algorithms & levels. |
| |
| Also gather cpu/timing stats. |
| """ |
| tar_dir = workdir / "tar" |
| |
| def comp_cmd(cmd, threads, level=None): |
| settings = {"threads": threads} |
| ret = [x % settings for x in cmd] + ["-c"] |
| if level is not None: |
| ret += [f"-{level}"] |
| return ret |
| |
| temp_file = workdir / ".tmp" |
| numpkgs = len(atoms) |
| width = int(math.log10(numpkgs)) + 1 |
| for i, atom in enumerate(atoms, start=1): |
| print_status( |
| f"\r[{i:{width}}/{numpkgs}] {atom:50}{CSI_ERASE_LINE_AFTER}" |
| ) |
| tar_file = tar_dir / f"{atom}.tar" |
| |
| for suffix, cmd, levels in ALGOS: |
| cmd_name = cmd[0] |
| print_status(f" {cmd_name}:") |
| for level in levels: |
| print_status(level) |
| out_dir = workdir / f"{cmd_name}-{level}" |
| out_file = out_dir / f"{atom}.tar.{suffix}" |
| |
| # Compress the file to gather stats. |
| if not out_file.exists(): |
| out_file.parent.mkdir(parents=True, exist_ok=True) |
| stats_file = out_dir / f"{atom}.compress.json" |
| with tar_file.open("rb") as stdin, temp_file.open( |
| "wb" |
| ) as stdout: |
| run_stats( |
| stats_file, |
| comp_cmd(cmd, threads_compress, level=level), |
| stdin=stdin, |
| stdout=stdout, |
| ) |
| temp_file.rename(out_file) |
| |
| # Decompress the file to gather stats. |
| stats_file = out_dir / f"{atom}.decompress.json" |
| if not stats_file.exists(): |
| with out_file.open("rb") as stdin: |
| run_stats( |
| stats_file, |
| comp_cmd(cmd + ["-d"], threads_decompress), |
| stdin=stdin, |
| stdout=subprocess.DEVNULL, |
| ) |
| |
| print_status(f"\r{CSI_ERASE_LINE_AFTER}") |
| |
| |
| def unpack_binpkgs( |
| workdir: Path, atoms: List[str], threads_decompress: int = 0 |
| ) -> None: |
| """Unpack the binpkgs & return a list of the uncompressed tarballs. |
| |
| This also pulls the archive out (i.e. strips the xpak) to provide easier |
| baseline references. |
| |
| This creates the structure: |
| tar/ |
| $CATEGORY/$PF.tar |
| orig-data/ |
| $CATEGORY/$PF |
| """ |
| pkgdir = workdir / "packages" |
| tar_dir = workdir / "tar" |
| data_dir = workdir / "orig-data" |
| |
| temp_file = workdir / ".tmp" |
| numpkgs = len(atoms) |
| width = int(math.log10(numpkgs)) + 1 |
| for i, atom in enumerate(atoms, start=1): |
| print_status( |
| f"\r[{i:{width}}/{numpkgs}] unpacking {atom:50}", |
| CSI_ERASE_LINE_AFTER, |
| ) |
| src_tbz2 = pkgdir / f"{atom}.tbz2" |
| |
| # Copy the Gentoo binpkg & strip the xpak off. |
| data_file = data_dir / atom |
| if not data_file.exists(): |
| print_status("extracting ") |
| shutil.copy(src_tbz2, temp_file) |
| with temp_file.open("rb") as fp: |
| size = temp_file.stat().st_size |
| fp.seek(size - 8) |
| xpak_len = int.from_bytes(fp.read(4), byteorder="big") + 8 |
| data_len = size - xpak_len |
| os.truncate(temp_file, data_len) |
| |
| data_file.parent.mkdir(parents=True, exist_ok=True) |
| temp_file.rename(data_file) |
| data_file.chmod(0o400) |
| |
| # Decompress the binpkg to a tarball. |
| tar_file = tar_dir / f"{atom}.tar" |
| if not tar_file.exists(): |
| print_status(f"decompressing-T{threads_decompress} ") |
| tar_file.parent.mkdir(parents=True, exist_ok=True) |
| run_stats( |
| Path(f"{tar_file}.decompress.json"), |
| ["pbzip2", "-dc", f"-p{threads_decompress}"], |
| stdin=data_file.open("rb"), |
| stdout=temp_file.open("wb"), |
| ) |
| temp_file.rename(tar_file) |
| tar_file.chmod(0o400) |
| |
| print_status(f"\r{CSI_ERASE_LINE_AFTER}") |
| |
| |
| def fetch_binpkgs(workdir: Path, url: str) -> List[str]: |
| """Download the binpkgs & return a list of atoms. |
| |
| NB: "tbz2" does not mean the archive is using bzip2. It's a historical name |
| that just means "Gentoo binpkg". |
| |
| This creates the structure: |
| packages/ |
| Packages |
| $CATEGORY/$PF.tbz2 |
| """ |
| # gsutil wants this to exist first. |
| workdir.mkdir(parents=True, exist_ok=True) |
| |
| if url: |
| url = url.rstrip("/") |
| if not url.endswith("/packages"): |
| url += "/packages" |
| print_status("Downloading archives\r") |
| subprocess.run(["gsutil", "-m", "cp", "-r", url, workdir], check=True) |
| |
| pkgdir = workdir / "packages" |
| return [ |
| str(x.relative_to(pkgdir).with_suffix("")) |
| for x in pkgdir.glob("**/*.tbz2") |
| ] |
| |
| |
| class Rusage: |
| """Object to hold rusage fields we care about.""" |
| |
| wall_clock = 0.0 |
| ru_maxrss = 0 |
| ru_stime = 0.0 |
| ru_utime = 0.0 |
| |
| |
| def gather_stats( |
| workdir: Path, threads_compress: int = 0, threads_decompress: int = 0 |
| ) -> None: |
| """Produce a table of data for this run.""" |
| header = ( |
| "algo", |
| "size (bytes)", |
| "size (GiB)", |
| "% 🗜", |
| "🗜 threads", |
| "🗜 rss (GiB)", |
| "🗜 wall ⏲ (min)", |
| "🗜 sys ⏲ (min)", |
| "🗜 user ⏲ (min)", |
| "⇫ threads", |
| "⇫ rss (GiB)", |
| "⇫ wall ⏲ (min)", |
| "⇫ sys ⏲ (min)", |
| "⇫ user ⏲ (min)", |
| ) |
| |
| def iter_algos(): |
| for d in ("tar", "packages", "orig-data"): |
| yield workdir / d |
| for _, cmd, levels in ALGOS: |
| for level in levels: |
| yield workdir / f"{cmd[0]}-{level}" |
| |
| stats = [] |
| for algo_dir in iter_algos(): |
| algo = algo_dir.name |
| print_status(f"\rsummarize {algo} {CSI_ERASE_LINE_AFTER}") |
| size = 0 |
| compress_stats = Rusage() |
| decompress_stats = Rusage() |
| for path in algo_dir.glob("**/*"): |
| if path.name.endswith(".compress.json"): |
| data = json.loads(path.read_bytes()) |
| compress_stats.wall_clock += float(data["wall_clock"]) |
| compress_stats.ru_maxrss += int(data["ru_maxrss"]) |
| compress_stats.ru_stime += float(data["ru_stime"]) |
| compress_stats.ru_utime += float(data["ru_utime"]) |
| continue |
| elif path.name.endswith(".decompress.json"): |
| data = json.loads(path.read_bytes()) |
| decompress_stats.wall_clock += float(data["wall_clock"]) |
| decompress_stats.ru_maxrss += int(data["ru_maxrss"]) |
| decompress_stats.ru_stime += float(data["ru_stime"]) |
| decompress_stats.ru_utime += float(data["ru_utime"]) |
| continue |
| elif path.suffix in (".out", ".json"): |
| print(f"warning: ignoring {path}", file=sys.stderr) |
| continue |
| |
| size += path.stat().st_size |
| |
| stats.append( |
| [ |
| algo, |
| size, |
| round(size / 1024 / 1024 / 1024, 2), |
| 0, |
| threads_compress, |
| round(compress_stats.ru_maxrss / 1024 / 1024, 2), |
| round(compress_stats.wall_clock / 60, 2), |
| round(compress_stats.ru_stime / 60, 2), |
| round(compress_stats.ru_utime / 60, 2), |
| threads_decompress, |
| round(decompress_stats.ru_maxrss / 1024 / 1024, 2), |
| round(decompress_stats.wall_clock / 60, 2), |
| round(decompress_stats.ru_stime / 60, 2), |
| round(decompress_stats.ru_utime / 60, 2), |
| ] |
| ) |
| |
| def print_row(row): |
| row[3] = round((row[1] / total_size) * 100, 2) |
| print("\t".join(str(x) for x in row)) |
| |
| print_status(f"\r{CSI_ERASE_LINE_AFTER}") |
| print("\t".join(header)) |
| total_size = stats[0][1] |
| for row in stats: |
| print_row(row) |
| |
| |
| def check_tools() -> bool: |
| """Make sure tools we use are installed.""" |
| |
| def _check(cmd) -> bool: |
| ret = bool(shutil.which(cmd)) |
| if not ret: |
| print(f"missing {cmd}", file=sys.stderr) |
| return ret |
| |
| ret = _check("gsutil") |
| for tool in (x[1][0] for x in ALGOS): |
| ret = _check(tool) and ret |
| return ret |
| |
| |
| def get_parser() -> argparse.ArgumentParser: |
| """Get a command line parser.""" |
| parser = argparse.ArgumentParser( |
| description=__doc__, |
| formatter_class=argparse.RawDescriptionHelpFormatter, |
| ) |
| parser.add_argument( |
| "-w", |
| "--workdir", |
| type=Path, |
| required=True, |
| help="Base directory to write all work", |
| ) |
| parser.add_argument("-u", "--url", help="GS URI to mirror") |
| parser.add_argument( |
| "-t", |
| "--threads-compress", |
| type=int, |
| default=0, |
| help="Number of compression threads", |
| ) |
| parser.add_argument( |
| "-T", |
| "--threads-decompress", |
| type=int, |
| help="Number of decompression threads (defaults to -t)", |
| ) |
| return parser |
| |
| |
| def main(argv: Optional[List[str]] = None) -> Optional[int]: |
| parser = get_parser() |
| opts = parser.parse_args(argv) |
| |
| threads_compress = opts.threads_compress or os.cpu_count() |
| threads_decompress = opts.threads_decompress or threads_compress |
| |
| if not check_tools(): |
| sys.exit("please install tools first") |
| |
| atoms = fetch_binpkgs(opts.workdir, opts.url) |
| unpack_binpkgs(opts.workdir, atoms, threads_decompress=threads_decompress) |
| compress_tars( |
| opts.workdir, |
| atoms, |
| threads_compress=threads_compress, |
| threads_decompress=threads_decompress, |
| ) |
| gather_stats( |
| opts.workdir, |
| threads_compress=threads_compress, |
| threads_decompress=threads_decompress, |
| ) |
| |
| |
| if __name__ == "__main__": |
| sys.exit(main(sys.argv[1:])) |