lib: Delete image_extractor*
This is not used anywhere.
BUG=None
TEST=CQ passes
Change-Id: If41cfb1345c1cfbf0725f5f05fb86b7469d4d2dc
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crostestutils/+/1841587
Tested-by: Amin Hassani <ahassani@chromium.org>
Reviewed-by: Achuith Bhandarkar <achuith@chromium.org>
Commit-Queue: Amin Hassani <ahassani@chromium.org>
diff --git a/lib/image_extractor.py b/lib/image_extractor.py
deleted file mode 100644
index eb4f81e..0000000
--- a/lib/image_extractor.py
+++ /dev/null
@@ -1,127 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Module containing class to extract the latest image for a build."""
-
-import distutils.version
-import logging
-import os
-import re
-import shutil
-import zipfile
-import zlib
-
-from chromite.lib import cros_build_lib
-
-
-class ImageExtractor(object):
- """Class used to get the latest image for the board."""
- # The default image to extract.
- IMAGE_TO_EXTRACT = 'chromiumos_test_image.bin'
- # Archive directory in the src tree to keep latest archived image after
- # we've unzipped them.
- SRC_ARCHIVE_DIR = 'latest_image'
-
- def __init__(self, archive_dir, image_to_extract=None):
- """Initializes a extractor for the archive_dir."""
- self.archive = archive_dir
- if not image_to_extract:
- image_to_extract = self.IMAGE_TO_EXTRACT
- self.image_to_extract = image_to_extract
-
- def ValidateZip(self, zip_image):
- """Validate that a zipped image is not corrupt.
-
- Args:
- zip_image:
-
- Returns:
- True if valid, else False.
- """
-
- try:
- # These two lines will either return the name of the first bad file
- # inside the zip, or raise an exception if it doesn't look like a valid
- # zip at all.
- zf = zipfile.ZipFile(zip_image)
- return zf.testzip() == None
- except (zipfile.BadZipfile, zlib.error):
- return False
-
- def GetLatestImage(self, target_version):
- """Gets the last image archived for the board.
-
- Args:
- target_version: The version that is being tested. The archive
- directory may be being populated with the results of this version
- while we're running so we shouldn't use it as the last image archived.
- """
- logging.info('Searching for previously generated images in %s ... ',
- self.archive)
- if os.path.exists(self.archive):
- my_re = re.compile(r'R\d+-(\d+)\.(\d+)\.(\d+).*')
- filelist = []
- target_lv = distutils.version.LooseVersion(target_version)
- for filename in os.listdir(self.archive):
- lv = distutils.version.LooseVersion(filename)
- if my_re.match(filename):
- zip_image = os.path.join(self.archive, filename, 'image.zip')
- if lv < target_lv and os.path.exists(zip_image):
- if self.ValidateZip(zip_image):
- filelist.append(lv)
- else:
- logging.error('Version in archive dir is corrupt: %s', filename)
-
- elif not filename.startswith(target_version):
- logging.error('Version in archive dir is too new: %s', filename)
- if filelist:
- return os.path.join(self.archive, str(max(filelist)))
-
- logging.warn('Could not find a previously generated image on this host.')
- return None
-
- def UnzipImage(self, image_dir):
- """Unzips the image.zip from the image_dir and returns the image.
-
- This method unzips the specified image from the archive dir. In order to
- to save time, the image is cached under a subdirectory of the archive dir.
- If it is attempting to re-unzip the same image with the same version
- string, it uses the cached image. It determines the version string based
- on the last path parts of the image_dir.
-
- Args:
- image_dir: Directory with image to unzip.
-
- Returns:
- The path to the image.bin file after it has been unzipped. Returns None
- if the expected image is missing from the image.zip.
- """
- # Use the last 2 paths as the version_string path (may include board id).
- version_string = os.path.join(*image_dir.split(os.path.sep)[-2:])
- cached_dir = os.path.join(image_dir, ImageExtractor.SRC_ARCHIVE_DIR,
- version_string)
- cached_image = os.path.abspath(os.path.join(
- cached_dir, self.image_to_extract))
- # If we previously unzipped the image, we're done.
- if os.path.exists(cached_image):
- logging.info('Re-using image with version %s that we previously '
- 'unzipped to %s.', version_string, cached_image)
- else:
- # Cached image for version not found. Unzipping image from archive.
- if os.path.exists(cached_dir):
- logging.info('Removing previously archived images from %s',
- cached_dir)
- shutil.rmtree(cached_dir)
-
- os.makedirs(cached_dir)
- zip_path = os.path.join(image_dir, 'image.zip')
- logging.info('Unzipping image from %s to %s', zip_path, cached_dir)
- cros_build_lib.RunCommand(['unzip', '-d', cached_dir, zip_path],
- print_cmd=False)
- if not os.path.exists(cached_image):
- logging.warn('image.zip did not contain expected image.')
- return None
-
- return cached_image
diff --git a/lib/image_extractor_unittest.py b/lib/image_extractor_unittest.py
deleted file mode 100755
index 0a37cbd..0000000
--- a/lib/image_extractor_unittest.py
+++ /dev/null
@@ -1,194 +0,0 @@
-#!/usr/bin/python
-# -*- coding: utf-8 -*-
-# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Module containing unittests for the image_extractor module."""
-
-import logging
-import mox
-import os
-import shutil
-import sys
-import tempfile
-import unittest
-import zipfile
-
-import constants
-sys.path.append(constants.SOURCE_ROOT)
-
-import image_extractor
-
-
-class ImageExtractorTest(mox.MoxTestBase):
- """Testing all'm image extractors."""
- def setUp(self):
- super(ImageExtractorTest, self).setUp()
-
- self.work_dir = tempfile.mkdtemp('ImageExtractorTest')
- self.board = 'x86-generic-full'
- # Set constants to be easily testable.
- self.archive_dir = os.path.join(self.work_dir, 'archive', self.board)
- image_extractor.ImageExtractor.SRC_ARCHIVE_DIR = os.path.join(self.work_dir,
- 'src')
-
- # Our test object.
- self.test_extractor = image_extractor.ImageExtractor(self.archive_dir)
-
- # Convenience variables for testing.
- self.src_archive = image_extractor.ImageExtractor.SRC_ARCHIVE_DIR
- self.image = image_extractor.ImageExtractor.IMAGE_TO_EXTRACT
- self.mox.StubOutWithMock(logging, 'error')
-
- def tearDown(self):
- shutil.rmtree(self.work_dir)
-
- def _TouchImageZip(self, directory, add_image=True):
- """Creates a psuedo image.zip file to unzip."""
- if not os.path.exists(directory):
- os.makedirs(directory)
-
- with zipfile.ZipFile(os.path.join(directory, 'image.zip'), 'w') as z:
- # Zip file can't be empty so add a dummy file first.
- dummy_file = os.path.join(directory, 'TACOS_ARE_DELCIOUS.txt')
- with open(dummy_file, 'w') as f:
- f.write('tacos')
-
- z.write(dummy_file)
-
- if add_image:
- image_path = os.path.join(directory, self.image)
- archive_path = self.image
- # Create a dummy image file.
- with open(image_path, 'w') as f:
- f.write('ooga booga')
-
- z.write(image_path, archive_path)
-
- def CreateFakeArchiveDir(self, number_of_entries, add_build_number=False):
- """Creates a fake archive dir with specified number of entries."""
- # Create local board directory e.g. /var/archive/x86-generic-full.
- os.makedirs(self.archive_dir)
- # Create a latest file.
- open(os.path.join(self.archive_dir, 'LATEST'), 'w').close()
- version_s = 'R16-158.0.0-a1-b%d' if add_build_number else 'R16-158.0.%d-a1'
- # Create specified number of entries.
- for i in range(number_of_entries):
- new_dir = os.path.join(self.archive_dir, version_s % i)
- os.makedirs(new_dir)
- self._TouchImageZip(new_dir)
-
- def testGetLatestImageWithNoEntries(self):
- """Should return None if the directory has no entries."""
- self.CreateFakeArchiveDir(0)
- latest_image = self.test_extractor.GetLatestImage('R16-158.0.11-a1')
- self.assertEqual(latest_image, None)
-
- def testGetLatestImageWithOldEntry(self):
- """Compatibility testing. GetLatestImage should ignore old style entries.
- """
- self.CreateFakeArchiveDir(0)
- os.makedirs(os.path.join(self.archive_dir, '0-158.0.1-a1'))
- latest_image = self.test_extractor.GetLatestImage('R16-158.0.11-a1')
- self.assertEqual(latest_image, None)
-
- def testGetLatestImageWithBuildEntries(self):
- """The normal case with build#'s. Return the path to the highest entry.
-
- Test both ways to mix version strings with and without build numbers. We
- generate R16-158.0.0-a1-b[0-10] in the local archive and test again the
- target version R16-158.0.1-a1 and R16-158.0.0-a1-b11. These both should
- return R16-158.0.0-a1-b10.
- """
- self.CreateFakeArchiveDir(11, add_build_number=True)
- latest_image = self.test_extractor.GetLatestImage('R16-158.0.1-a1')
- self.assertEqual(os.path.basename(latest_image), 'R16-158.0.0-a1-b10')
- latest_image = self.test_extractor.GetLatestImage('R16-158.0.0-a1-b11')
- self.assertEqual(os.path.basename(latest_image), 'R16-158.0.0-a1-b10')
-
- def testGetLatestImageWithEntries(self):
- """The normal case. Return the path to the highest entry."""
- self.CreateFakeArchiveDir(11)
- # Throw in a bad directory for good measure.
- os.makedirs(os.path.join(self.archive_dir, '0-158.0.1-a1'))
- latest_image = self.test_extractor.GetLatestImage('R16-158.0.11-a1')
- self.assertEqual(os.path.basename(latest_image), 'R16-158.0.10-a1')
-
- def testGetLatestImageWithEntriesAndTarget(self):
- """The normal case but we pass in a target_version.
-
- Returns the path to the highest entry before target and spits out a
- logging error saying that 10 is too high.
- """
- self.CreateFakeArchiveDir(11)
- os.makedirs(os.path.join(self.archive_dir, 'R16-158.0.9-a1-b123'))
- logging.error(mox.IgnoreArg(), 'R16-158.0.10-a1')
- self.mox.ReplayAll()
- latest_image = self.test_extractor.GetLatestImage('R16-158.0.9-a1')
- self.assertEqual(os.path.basename(latest_image), 'R16-158.0.8-a1')
- self.mox.VerifyAll()
-
- def testUnzipImageArchiveAlready(self):
- """Ensure we create a new archive and delete the old one."""
- old_entry = os.path.join(self.src_archive, self.board, 'R16-158.0.0-a1')
- os.makedirs(old_entry)
- new_entry = os.path.join(self.src_archive, self.board, 'R16-158.0.1-a1')
- archived_image_dir = os.path.join(self.archive_dir, 'R16-158.0.1-a1')
- self._TouchImageZip(archived_image_dir)
-
- self.mox.ReplayAll()
- expected_image = os.path.join(new_entry, self.image)
- image_returned = self.test_extractor.UnzipImage(archived_image_dir)
- self.mox.VerifyAll()
- self.assertFalse(os.path.exists(old_entry))
- self.assertTrue(os.path.exists(new_entry))
- self.assertEqual(expected_image, image_returned)
-
- def testUnzipImageNoArchive(self):
- """Ensure we create a new archive with none before."""
- new_entry = os.path.join(self.src_archive, self.board, 'R16-158.0.1-a1')
- archived_image_dir = os.path.join(self.archive_dir, 'R16-158.0.1-a1')
- self._TouchImageZip(archived_image_dir)
-
- self.mox.ReplayAll()
- expected_image = os.path.join(new_entry, self.image)
- image_returned = self.test_extractor.UnzipImage(archived_image_dir)
- self.mox.VerifyAll()
- self.assertTrue(os.path.exists(new_entry))
- self.assertEqual(expected_image, image_returned)
-
- def testUnzipImageMissingImage(self):
- """Ensure that we return None when the expected image is missing."""
- new_entry = os.path.join(self.src_archive, self.board, 'R16-158.0.1-a1')
- archived_image_dir = os.path.join(self.archive_dir, 'R16-158.0.1-a1')
- # Don't actually add the image.bin!
- self._TouchImageZip(archived_image_dir, add_image=False)
-
- self.mox.ReplayAll()
- self.assertEqual(self.test_extractor.UnzipImage(archived_image_dir), None)
- self.mox.VerifyAll()
- self.assertTrue(os.path.exists(new_entry))
-
- def testBadZipImageArchive(self):
- """Ensure we ignore corrupt archives."""
-
- # Create valid archive followed by corrupt one.
- self.CreateFakeArchiveDir(2, add_build_number=True)
- bad_zip_name = os.path.join(
- self.archive_dir, 'R16-158.0.0-a1-b1', 'image.zip')
- with open(bad_zip_name, 'w') as f:
- f.write('oogabooga')
-
- # This is normally mox'd out to ensure it's never called, but we expect it.
- logging.error('Version in archive dir is corrupt: %s', 'R16-158.0.0-a1-b1')
-
- self.mox.ReplayAll()
-
- # Ensure we fine the first one (valid), not the second (corrupt)
- latest_image = self.test_extractor.GetLatestImage('R16-158.0.1-a1')
- self.assertEqual(os.path.basename(latest_image), 'R16-158.0.0-a1-b0')
-
-
-if __name__ == '__main__':
- unittest.main()