blob: 5208ade65101e04fcaf4f96609ebac9fdf1de929 [file] [log] [blame]
# Copyright 2021 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unit tests for xz_auto.py."""
import os
from pathlib import Path
import unittest
from unittest import mock
from chromite.lib import cros_build_lib
from chromite.lib import cros_test_lib
from chromite.lib import osutils
from chromite.scripts import xz_auto
DIR = Path(__file__).resolve().parent
def FindXzAutoLocation():
"""Figures out where the xz_auto binary is."""
return DIR / "xz_auto"
class XzAutoTests(cros_test_lib.MockTempDirTestCase):
"""Various tests for xz_auto."""
TEST_FILE_CONTENTS = (b"", b"some random file contents")
def DisablePixzForCurrentTest(self) -> None:
"""Disables the use of pixz for the current test."""
# This will be cleaned up by cros_test_lib, so no need to addCleanup.
os.environ[xz_auto.PIXZ_DISABLE_VAR] = "1"
def testPixzArgParsingSeemsToWork(self) -> None:
"""Tests our detection of file names in pixz commandlines."""
self.assertEqual(
xz_auto.ParsePixzArgs(["to_compress.txt"]),
([], "to_compress.txt", None),
)
self.assertEqual(
xz_auto.ParsePixzArgs(["to_compress.txt", "compressed.txt"]),
([], "to_compress.txt", "compressed.txt"),
)
self.assertEqual(
xz_auto.ParsePixzArgs(
["to_compress.txt", "-c", "compressed.txt", "-9"]
),
(["-c", "-9"], "to_compress.txt", "compressed.txt"),
)
self.assertEqual(
xz_auto.ParsePixzArgs(
["-t", "to_compress.txt", "-c", "compressed.txt", "-p", "2"]
),
(["-t", "-c", "-p", "2"], "to_compress.txt", "compressed.txt"),
)
self.assertEqual(
xz_auto.ParsePixzArgs(
["-tcp2", "to_compress.txt", "compressed.txt"]
),
(["-t", "-c", "-p", "2"], "to_compress.txt", "compressed.txt"),
)
@unittest.skipIf(not xz_auto.HasPixz(), "need pixz for this test")
@mock.patch.object(xz_auto, "Execvp")
def testPixzCommandCreationSelectsPixzIfAvailable(
self, execvp_mock
) -> None:
"""Tests that we actually execute pixz when we intend to."""
class ExecvpStopError(Exception):
"""Convenient way to halt execution."""
def execvp_side_effect(argv) -> None:
"""Does testing of our execvp calls."""
self.assertEqual(argv[0], "pixz")
raise ExecvpStopError()
execvp_mock.side_effect = execvp_side_effect
with self.assertRaises(ExecvpStopError):
xz_auto.ExecCompressCommand(stdout=False, argv=[])
with self.assertRaises(ExecvpStopError):
xz_auto.ExecDecompressCommand(stdout=False, argv=[])
def _TestFileCompressionImpl(self, test_empty_file=True) -> None:
"""Tests that compressing a file with xz_auto WAI."""
xz_auto_script = str(FindXzAutoLocation())
test_file_contents = self.TEST_FILE_CONTENTS
if not test_empty_file:
test_file_contents = (x for x in test_file_contents if x)
for file_contents in test_file_contents:
file_location = os.path.join(self.tempdir, "file.txt")
osutils.WriteFile(file_location, file_contents, mode="wb")
cros_build_lib.run(
[
xz_auto_script,
file_location,
],
)
xz_location = file_location + ".xz"
self.assertExists(xz_location)
self.assertNotExists(file_location)
cros_build_lib.run(
[
xz_auto_script,
"--decompress",
xz_location,
]
)
self.assertNotExists(xz_location)
self.assertExists(file_location)
self.assertEqual(
osutils.ReadFile(file_location, mode="rb"),
file_contents,
)
def _TestStdoutCompressionImpl(self) -> None:
"""Tests that compressing stdstreams with xz_auto WAI."""
xz_auto_script = str(FindXzAutoLocation())
for file_contents in self.TEST_FILE_CONTENTS:
run_result = cros_build_lib.run(
[
xz_auto_script,
"-c",
],
capture_output=True,
input=file_contents,
)
compressed_file = run_result.stdout
self.assertNotEqual(compressed_file, file_contents)
run_result = cros_build_lib.run(
[
xz_auto_script,
"--decompress",
"-c",
],
input=compressed_file,
capture_output=True,
)
uncompressed_file = run_result.stdout
self.assertEqual(file_contents, uncompressed_file)
def _TestStdoutCompressionFromFileImpl(self) -> None:
"""Tests that compression of a file & outputting to stdout works.
Pixz has some semi-weird behavior here (b/202735786).
"""
xz_auto_script = str(FindXzAutoLocation())
for file_contents in self.TEST_FILE_CONTENTS:
file_location = os.path.join(self.tempdir, "file.txt")
osutils.WriteFile(file_location, file_contents, mode="wb")
run_result = cros_build_lib.run(
[
xz_auto_script,
"-c",
file_location,
],
capture_output=True,
)
compressed_file = run_result.stdout
self.assertExists(file_location)
self.assertNotEqual(compressed_file, file_contents)
run_result = cros_build_lib.run(
[
xz_auto_script,
"--decompress",
"-c",
],
capture_output=True,
input=compressed_file,
)
uncompressed_file = run_result.stdout
self.assertEqual(file_contents, uncompressed_file)
@unittest.skipIf(not xz_auto.HasPixz(), "need pixz for this test")
def testFileCompressionWithPixzWorks(self) -> None:
"""Tests that compressing a file with pixz WAI."""
self._TestFileCompressionImpl()
# We fall back to `xz` with small files. Make sure we actually cover the
# pixz case, too. We disable testing of empty inputs, since pixz breaks
# with those.
#
# cros_test_lib will clean this var up at the end of the test.
os.environ[xz_auto.XZ_DISABLE_VAR] = "1"
self._TestFileCompressionImpl(test_empty_file=False)
@unittest.skipIf(not xz_auto.HasPixz(), "need pixz for this test")
def testStdoutCompressionWithPixzWorks(self) -> None:
"""Tests that compressing `stdout` with pixz WAI."""
self._TestStdoutCompressionImpl()
@unittest.skipIf(not xz_auto.HasPixz(), "need pixz for this test")
def testStdoutCompressionFromFileWithPixzWorks(self) -> None:
"""Tests that compressing from a file to stdout with pixz WAI."""
self._TestStdoutCompressionFromFileImpl()
def testFileCompressionWithXzWorks(self) -> None:
"""Tests that compressing a file with pixz WAI."""
self.DisablePixzForCurrentTest()
self._TestFileCompressionImpl()
def testStdoutCompressionWithXzWorks(self) -> None:
"""Tests that compressing `stdout` with pixz WAI."""
self.DisablePixzForCurrentTest()
self._TestStdoutCompressionImpl()
def testStdoutCompressionFromFileWithXzWorks(self) -> None:
"""Tests that compressing from a file to stdout WAI."""
self.DisablePixzForCurrentTest()
self._TestStdoutCompressionFromFileImpl()