| #!/usr/bin/env python3 |
| # Copyright 2022 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| r"""Test different compression settings on binpkgs. |
| |
| NB: The working directory can easily take 300 GB! |
| |
| NB: It is safe to interrupt the test at any time. It will safely resume. |
| |
| The --url option can quickly fetch a remote binpkg store. It is not required |
| if you already have a packages/ directory to test against. This assumes the |
| remote tree is a binpkg repository -- i.e. there is a "packages" directory and |
| it contains all the packages. |
| |
| $ ./eval_binpkg_compression --workdir eve-14665.0.0 \ |
| --url gs://chromeos-dev-installer/board/eve/14665.0.0 |
| |
| $ portageq-amd64-generic envvar PORTAGE_BINHOST |
| $ ./eval_binpkg_compression --workdir amd64-generic-R102-14665.0.0-rc1 \ |
| --url gs://chromeos-prebuilt/board/amd64-generic/full-R102-14665.0.0-rc1 |
| |
| $ portageq envvar PORTAGE_BINHOST |
| $ ./eval_binpkg_compression --workdir chroot-2022.04.03.163758 \ |
| --url gs://chromeos-prebuilt/host/amd64/amd64-host/chroot-2022.04.03.163758 |
| |
| The table will be written to stdout and should be pasteable to spreadsheets. |
| """ |
| |
| import argparse |
| import json |
| import math |
| import os |
| from pathlib import Path |
| import resource |
| import shutil |
| import subprocess |
| import sys |
| import time |
| from typing import List, Optional |
| |
| |
# Terminal escape sequence to erase the current line after the cursor.
# Appended to status messages so leftovers from a longer, earlier message are
# wiped when the line is redrawn.
CSI_ERASE_LINE_AFTER = '\x1b[K'


# List of compression formats & commands & levels to test.
#
# Each entry is a (suffix, command, levels) tuple:
#   suffix: Extension appended to the compressed output ($PF.tar.<suffix>).
#   command: The argv list to run.  %(threads)i is replaced with the requested
#       thread count.  command[0] also names the per-level output directory
#       (<command[0]>-<level>) and is the tool looked up in $PATH.
#   levels: Compression levels to try; each is passed to the tool as -<level>.
#
# NB: When adding or updating algorithms here, be careful with -0 & -1 levels.
# Not all support -0, and some will treat them as "no compression".
#
# NB: The order here is used when generating the table.
ALGOS = (
    # NB: Single threaded bzip2 -9 has been our historical default baseline.
    ('bz2', ['bzip2'], (9,)),
    # NB: Test multiple bzip2 implementations to compare speeds, not final size.
    ('bz2', ['lbzip2', '-n%(threads)i'], (1, 3, 6, 9)),
    ('bz2', ['pbzip2', '-p%(threads)i'], (1, 3, 6, 9)),
    # NB: lz4 is always single threaded.
    ('lz4', ['lz4'], (1, 3, 6, 9)),
    # NB: lzip is always single threaded.
    ('lzip', ['lzip'], (0, 3, 6, 9)),
    ('lzip', ['plzip', '-n%(threads)i'], (0, 3, 6, 9)),
    # NB: lzop is always single threaded.
    # NB: [2..6] levels are the same in lzop atm.
    ('lzop', ['lzop'], (1, 3, 9)),
    ('gz', ['gzip'], (1, 3, 6, 9)),
    ('gz', ['pigz', '-p%(threads)i'], (1, 3, 6, 9)),
    # NB: Test multiple xz implementations to compare speeds, not final size.
    ('xz', ['xz', '-T%(threads)i'], (0, 3, 6, 9)),
    ('xz', ['pixz', '-t', '-p%(threads)i'], (0, 3, 6, 9)),
    ('zst', ['zstd', '-T%(threads)i', '--ultra'], (3, 6, 9, 19, 22)),
    # NB: Z is an alias for 11 since -11 doesn't work (treated as -1 -1).
    ('brotli', ['brotli'], (0, 3, 6, 9, 'Z')),
)
| |
| |
def print_status(*args, **kwargs):
    """Emit a status-bar style message.

    Behaves like print(), except that by default nothing is appended to the
    message (no newline), it goes to stderr, and the stream is flushed right
    away.  Any of these defaults can be overridden through |kwargs|.
    """
    status_defaults = {'end': '', 'file': sys.stderr, 'flush': True}
    # Caller supplied settings win over the status-bar defaults.
    print(*args, **{**status_defaults, **kwargs})
| |
| |
def run_stats(output, *args, **kwargs):
    """Run a command & write its rusage + wall clock stats to |output|.

    The command is launched from a forked helper process so that
    RUSAGE_CHILDREN only covers this one command instead of everything this
    script has run so far.

    Args:
        output: Path of the JSON file to write.  It holds every ru_* field of
            the rusage result (stringified) plus a float "wall_clock" with the
            elapsed seconds.
        *args: Passed through to subprocess.run().
        **kwargs: Passed through to subprocess.run(); check defaults to True.

    Raises:
        SystemExit: If the helper process failed or was killed by a signal.
    """
    kwargs.setdefault('check', True)

    pid = os.fork()
    if pid == 0:
        # Fork a child to reset the rusage stats for this one run.
        #
        # Nothing may escape this block: an exception that propagated would
        # make the child unwind through (and keep executing) the caller's code.
        code = 1
        try:
            # Use the monotonic clock so system clock adjustments cannot skew
            # the measurement.
            start_time = time.monotonic()
            # check=True was set above.
            # pylint: disable=subprocess-run-check
            subprocess.run(*args, **kwargs)
            stop_time = time.monotonic()
            # Save the stats into a JSON file for later processing.
            stats = resource.getrusage(resource.RUSAGE_CHILDREN)
            data = {x: str(getattr(stats, x))
                    for x in dir(stats) if x.startswith('ru_')}
            data['wall_clock'] = stop_time - start_time
            with output.open('w') as fp:
                json.dump(data, fp)
            code = 0
        except Exception as e:  # pylint: disable=broad-except
            print(f'\nerror: {e}', file=sys.stderr, flush=True)
        finally:
            # We forked above, so use _exit here.  This also terminates the
            # child on KeyboardInterrupt & friends.
            # pylint: disable=protected-access
            os._exit(code)

    _, status = os.waitpid(pid, 0)
    # WEXITSTATUS() is 0 for a signaled child, so check for signals first or a
    # killed helper (e.g. by the OOM killer) would look like a success.
    if os.WIFSIGNALED(status):
        sys.exit(128 + os.WTERMSIG(status))
    code = os.WEXITSTATUS(status)
    if code:
        sys.exit(code)
| |
| |
def compress_tars(
        workdir: Path,
        atoms: List[str],
        threads_compress: int = 0,
        threads_decompress: int = 0) -> None:
    """Compress the tars with a variety of algorithms & levels.

    Also gather cpu/timing stats.

    For every atom and every (tool, level) pair in ALGOS this creates:
      $TOOL-$LEVEL/
        $CATEGORY/$PF.tar.$SUFFIX
        $CATEGORY/$PF.compress.json
        $CATEGORY/$PF.decompress.json

    Work that already exists on disk is skipped, so an interrupted run resumes
    where it stopped.

    Args:
        workdir: Base directory; the inputs are read from its tar/ subdir.
        atoms: The $CATEGORY/$PF names to process.  May be empty.
        threads_compress: Thread count substituted into compression commands.
        threads_decompress: Thread count substituted into decompression
            commands.
    """
    tar_dir = workdir / 'tar'

    def comp_cmd(cmd, threads, level=None):
        """Fill in the thread count, write to stdout, and add the level."""
        settings = {'threads': threads}
        ret = [x % settings for x in cmd] + ['-c']
        if level is not None:
            ret.append(f'-{level}')
        return ret

    # Output always lands here first and is renamed into place once complete,
    # so a partially written file never looks like a finished result.
    temp_file = workdir / '.tmp'
    numpkgs = len(atoms)
    # Number of digits in the counter.  Unlike log10(), this copes with an
    # empty list of atoms.
    width = len(str(numpkgs))
    for i, atom in enumerate(atoms, start=1):
        print_status(f'\r[{i:{width}}/{numpkgs}] {atom:50}{CSI_ERASE_LINE_AFTER}')
        tar_file = tar_dir / f'{atom}.tar'

        for suffix, cmd, levels in ALGOS:
            cmd_name = cmd[0]
            print_status(f' {cmd_name}:')
            for level in levels:
                print_status(level)
                out_dir = workdir / f'{cmd_name}-{level}'
                out_file = out_dir / f'{atom}.tar.{suffix}'

                # Compress the file to gather stats.
                if not out_file.exists():
                    out_file.parent.mkdir(parents=True, exist_ok=True)
                    stats_file = out_dir / f'{atom}.compress.json'
                    with tar_file.open('rb') as stdin, \
                            temp_file.open('wb') as stdout:
                        run_stats(stats_file,
                                  comp_cmd(cmd, threads_compress, level=level),
                                  stdin=stdin, stdout=stdout)
                    temp_file.rename(out_file)

                # Decompress the file to gather stats.
                stats_file = out_dir / f'{atom}.decompress.json'
                if not stats_file.exists():
                    with out_file.open('rb') as stdin:
                        run_stats(stats_file,
                                  comp_cmd(cmd + ['-d'], threads_decompress),
                                  stdin=stdin, stdout=subprocess.DEVNULL)

    print_status(f'\r{CSI_ERASE_LINE_AFTER}')
| |
| |
def unpack_binpkgs(
        workdir: Path,
        atoms: List[str],
        threads_decompress: int = 0) -> None:
    """Unpack the binpkgs into uncompressed tarballs.

    This also pulls the archive out (i.e. strips the xpak) to provide easier
    baseline references.

    This creates the structure:
      tar/
        $CATEGORY/$PF.tar
        $CATEGORY/$PF.tar.decompress.json
      orig-data/
        $CATEGORY/$PF

    Files that already exist are left alone, so an interrupted run resumes
    where it stopped.

    Args:
        workdir: Base directory holding packages/ and receiving the output.
        atoms: The $CATEGORY/$PF names to process.  May be empty.
        threads_decompress: Thread count passed to pbzip2.
    """
    pkgdir = workdir / 'packages'
    tar_dir = workdir / 'tar'
    data_dir = workdir / 'orig-data'

    # Output always lands here first and is renamed into place once complete.
    temp_file = workdir / '.tmp'
    numpkgs = len(atoms)
    # Number of digits in the counter.  Unlike log10(), this copes with an
    # empty list of atoms.
    width = len(str(numpkgs))
    for i, atom in enumerate(atoms, start=1):
        print_status(
            f'\r[{i:{width}}/{numpkgs}] unpacking {atom:50} {CSI_ERASE_LINE_AFTER}')
        src_tbz2 = pkgdir / f'{atom}.tbz2'

        # Copy the Gentoo binpkg & strip the xpak off.
        data_file = data_dir / atom
        if not data_file.exists():
            print_status('extracting ')
            shutil.copy(src_tbz2, temp_file)
            with temp_file.open('rb') as fp:
                size = temp_file.stat().st_size
                # The file ends with a 4 byte big endian xpak length followed
                # by 4 more bytes (presumably the end marker; it is not
                # validated here), hence the 8 byte adjustments.
                fp.seek(size - 8)
                xpak_len = int.from_bytes(fp.read(4), byteorder='big') + 8
            os.truncate(temp_file, size - xpak_len)

            data_file.parent.mkdir(parents=True, exist_ok=True)
            temp_file.rename(data_file)
            data_file.chmod(0o400)

        # Decompress the binpkg to a tarball.
        tar_file = tar_dir / f'{atom}.tar'
        if not tar_file.exists():
            print_status(f'decompressing-T{threads_decompress} ')
            tar_file.parent.mkdir(parents=True, exist_ok=True)
            # Close both handles as soon as the command is done instead of
            # leaking one pair per package.
            with data_file.open('rb') as stdin, \
                    temp_file.open('wb') as stdout:
                run_stats(
                    Path(f'{tar_file}.decompress.json'),
                    ['pbzip2', '-dc', f'-p{threads_decompress}'],
                    stdin=stdin,
                    stdout=stdout)
            temp_file.rename(tar_file)
            tar_file.chmod(0o400)

    print_status(f'\r{CSI_ERASE_LINE_AFTER}')
| |
| |
def fetch_binpkgs(workdir: Path, url: str) -> List[str]:
    """Mirror the remote binpkgs (if requested) & list the local atoms.

    NB: A "tbz2" file is simply a Gentoo binpkg.  The name is historical and
    does not mean the archive is compressed with bzip2.

    The resulting layout is:
      packages/
        Packages
        $CATEGORY/$PF.tbz2

    Args:
        workdir: Base directory that holds (or will hold) packages/.
        url: Remote location to copy with gsutil.  Nothing is downloaded when
            this is empty/None.

    Returns:
        The path of every *.tbz2 file below packages/, relative to it and
        with the extension removed.
    """
    # gsutil wants this to exist first.
    workdir.mkdir(parents=True, exist_ok=True)

    if url:
        remote = url.rstrip('/')
        if not remote.endswith('/packages'):
            remote = remote + '/packages'
        print_status('Downloading archives\r')
        gsutil_cmd = ['gsutil', '-m', 'cp', '-r', remote, workdir]
        subprocess.run(gsutil_cmd, check=True)

    pkgdir = workdir / 'packages'
    atoms = []
    for binpkg in pkgdir.glob('**/*.tbz2'):
        relative = binpkg.relative_to(pkgdir)
        atoms.append(str(relative.with_suffix('')))
    return atoms
| |
| |
class Rusage:
    """Running totals of the resource usage fields that get reported.

    The class attributes are the zero starting values; adding to a field on an
    instance gives that instance its own total.
    """

    # Values taken from the like-named rusage fields.
    ru_maxrss: int = 0
    ru_stime: float = 0.0
    ru_utime: float = 0.0
    # Elapsed real time.
    wall_clock: float = 0.0
| |
| |
def _accumulate_rusage(total, path: Path) -> None:
    """Add the stats saved in the JSON file |path| onto the |total| object."""
    data = json.loads(path.read_bytes())
    total.wall_clock += float(data['wall_clock'])
    total.ru_maxrss += int(data['ru_maxrss'])
    total.ru_stime += float(data['ru_stime'])
    total.ru_utime += float(data['ru_utime'])


def gather_stats(
        workdir: Path,
        threads_compress: int = 0,
        threads_decompress: int = 0) -> None:
    """Produce a table of data for this run.

    One tab separated row is printed to stdout for tar/, packages/, orig-data/
    and every $TOOL-$LEVEL directory from ALGOS, after a header row.  The size
    percentage is relative to the tar/ directory (the uncompressed data).

    Args:
        workdir: Base directory holding all the results.
        threads_compress: Value to report in the compression threads column.
        threads_decompress: Value to report in the decompression threads
            column.
    """
    header = (
        'algo',
        'size (bytes)',
        'size (GiB)',
        '% 🗜',
        '🗜 threads',
        '🗜 rss (GiB)',
        '🗜 wall ⏲ (min)',
        '🗜 sys ⏲ (min)',
        '🗜 user ⏲ (min)',
        '⇫ threads',
        '⇫ rss (GiB)',
        '⇫ wall ⏲ (min)',
        '⇫ sys ⏲ (min)',
        '⇫ user ⏲ (min)',
    )

    def iter_algos():
        """Yield every result directory in table order (tar/ comes first)."""
        for d in ('tar', 'packages', 'orig-data'):
            yield workdir / d
        for _, cmd, levels in ALGOS:
            for level in levels:
                yield workdir / f'{cmd[0]}-{level}'

    def usage_columns(threads, usage):
        """Return the threads/rss/wall/sys/user columns for one direction."""
        return [
            threads,
            # NOTE(review): this is the sum of the per-package peaks, and the
            # GiB conversion assumes ru_maxrss is in KiB (as on Linux).
            round(usage.ru_maxrss / 1024 / 1024, 2),
            round(usage.wall_clock / 60, 2),
            round(usage.ru_stime / 60, 2),
            round(usage.ru_utime / 60, 2),
        ]

    results = []
    for algo_dir in iter_algos():
        algo = algo_dir.name
        print_status(f'\rsummarize {algo} {CSI_ERASE_LINE_AFTER}')
        size = 0
        compress_stats = Rusage()
        decompress_stats = Rusage()
        for path in algo_dir.glob('**/*'):
            if path.name.endswith('.compress.json'):
                _accumulate_rusage(compress_stats, path)
            elif path.name.endswith('.decompress.json'):
                _accumulate_rusage(decompress_stats, path)
            elif path.suffix in ('.out', '.json'):
                print(f'warning: ignoring {path}', file=sys.stderr)
            elif path.is_file():
                # The glob also yields the category directories; their own
                # st_size is not payload data, so only count regular files.
                size += path.stat().st_size
        results.append((algo, size, compress_stats, decompress_stats))

    print_status(f'\r{CSI_ERASE_LINE_AFTER}')
    print('\t'.join(header))
    # iter_algos() always yields tar/ first, so this is the uncompressed size.
    total_size = results[0][1]
    for algo, size, compress_stats, decompress_stats in results:
        # Avoid dividing by zero when there is no data at all.
        percent = round((size / total_size) * 100, 2) if total_size else 0
        row = [algo, size, round(size / 1024 / 1024 / 1024, 2), percent]
        row += usage_columns(threads_compress, compress_stats)
        row += usage_columns(threads_decompress, decompress_stats)
        print('\t'.join(str(x) for x in row))
| |
| |
def check_tools() -> bool:
    """Verify every external program this script runs can be found.

    Each missing program is reported on stderr.

    Returns:
        True if gsutil and the first command word of every ALGOS entry were
        all found, False otherwise.
    """
    wanted = ['gsutil']
    wanted.extend(cmd[0] for _suffix, cmd, _levels in ALGOS)

    # Look everything up before reporting so all missing tools get listed.
    absent = [tool for tool in wanted if not shutil.which(tool)]
    for tool in absent:
        print(f'missing {tool}', file=sys.stderr)
    return not absent
| |
| |
def get_parser() -> argparse.ArgumentParser:
    """Build the command line parser for this script.

    The module docstring is used verbatim as the help description.
    """
    # One (flags, settings) pair per command line option.
    option_table = (
        (('-w', '--workdir'),
         {'type': Path, 'required': True,
          'help': 'Base directory to write all work'}),
        (('-u', '--url'),
         {'help': 'GS URI to mirror'}),
        (('-t', '--threads-compress'),
         {'type': int, 'default': 0,
          'help': 'Number of compression threads'}),
        (('-T', '--threads-decompress'),
         {'type': int,
          'help': 'Number of decompression threads (defaults to -t)'}),
    )

    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)
    return parser
| |
| |
def main(argv: Optional[List[str]] = None) -> Optional[int]:
    """Fetch, unpack, and compress the binpkgs, then print the stats table.

    Args:
        argv: The command line arguments (without the program name).

    Raises:
        SystemExit: If a required tool is missing or the arguments are bad.
    """
    parser = get_parser()
    opts = parser.parse_args(argv)

    # A thread count of 0 means "use every CPU".  os.cpu_count() returns None
    # when the count cannot be determined, so fall back to a single thread
    # rather than failing later when the commands are formatted.
    threads_compress = opts.threads_compress or os.cpu_count() or 1
    threads_decompress = opts.threads_decompress or threads_compress

    if not check_tools():
        sys.exit('please install tools first')

    atoms = fetch_binpkgs(opts.workdir, opts.url)
    unpack_binpkgs(opts.workdir, atoms, threads_decompress=threads_decompress)
    compress_tars(opts.workdir, atoms, threads_compress=threads_compress,
                  threads_decompress=threads_decompress)
    gather_stats(opts.workdir, threads_compress=threads_compress,
                 threads_decompress=threads_decompress)
| |
| |
# Only run when executed as a script; whatever main() returns is handed to
# sys.exit() as the exit status.
if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))