| # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import logging |
| import os |
| import zipfile |
| |
| from autotest_lib.client.bin import test |
| from autotest_lib.client.common_lib import autotemp, error |
| from autotest_lib.client.cros.cros_disks import CrosDisksTester |
| from autotest_lib.client.cros.cros_disks import VirtualFilesystemImage |
| from autotest_lib.client.cros.cros_disks import DefaultFilesystemTestContent |
| from collections import deque |
| |
| |
| class CrosDisksArchiveTester(CrosDisksTester): |
| """A tester to verify archive support in CrosDisks. |
| """ |
| def __init__(self, test): |
| super(CrosDisksArchiveTester, self).__init__(test) |
| |
| def find_all_files(self, root_dir): |
| """Returns all files under a directory and its sub-directories. |
| |
| This is a generator that performs a breadth-first-search of |
| all files under a specified directory and its sub-directories. |
| |
| Args: |
| root_dir: The root directory where the search starts from. |
| Yields: |
| Path of any found file relative to the root directory. |
| """ |
| dirs_to_explore = deque(['']) |
| while len(dirs_to_explore) > 0: |
| current_dir = dirs_to_explore.popleft() |
| for path in os.listdir(os.path.join(root_dir, current_dir)): |
| expanded_path = os.path.join(root_dir, current_dir, path) |
| relative_path = os.path.join(current_dir, path) |
| if os.path.isdir(expanded_path): |
| dirs_to_explore.append(relative_path) |
| else: |
| yield relative_path |
| |
| def make_zip_archive(self, archive_path, root_dir, |
| compression=zipfile.ZIP_DEFLATED): |
| """Archives a specified directory into a ZIP file. |
| |
| The created ZIP file contains all files and sub-directories |
| under the specified root directory, but not the root directory |
| itself. |
| |
| Args: |
| archive_path: Path of the output archive. |
| root_dir: The root directory to archive. |
| compression: The ZIP compression method. |
| """ |
| # ZipFile in Python 2.6 does not work with the 'with' statement. |
| archive = zipfile.ZipFile(archive_path, 'w', compression) |
| for path in self.find_all_files(root_dir): |
| archive.write(os.path.join(root_dir, path), path) |
| archive.close() |
| |
| def test_zip_archive(self): |
| # Create the ZIP file content in a temporary directory. |
| zip_dir = autotemp.tempdir(unique_id='CrosDisks') |
| test_content = DefaultFilesystemTestContent() |
| if not test_content.create(zip_dir.name): |
| raise error.TestFail("Failed to create archive test content") |
| |
| # Create a FAT-formatted virtual filesystem image containing a ZIP |
| # file to help stimulate mounting a ZIP file on a removable drive. |
| with VirtualFilesystemImage( |
| block_size=1024, |
| block_count=65536, |
| filesystem_type='vfat', |
| mkfs_options=[ '-F', '32', '-n', 'ZIPTEST' ]) as image: |
| image.format() |
| image.mount(options=['sync']) |
| # Create the ZIP file on the virtual filesystem image. |
| archive_name = 'test.zip' |
| archive_path = os.path.join(image.mount_dir, archive_name) |
| self.make_zip_archive(archive_path, zip_dir.name) |
| image.unmount() |
| |
| # Mount the virtual filesystem image via CrosDisks. |
| device_file = image.loop_device |
| self.cros_disks.mount(device_file, '', |
| [ "ro", "nodev", "noexec", "nosuid" ]) |
| result = self.cros_disks.expect_mount_completion({ |
| 'status': 0, |
| 'source_path': device_file |
| }) |
| |
| # Mount the ZIP file on the mounted filesystem via CrosDisks. |
| archive_path = os.path.join(result['mount_path'], archive_name) |
| expected_mount_path = os.path.join('/media/archive', archive_name) |
| self.cros_disks.mount(archive_path) |
| result = self.cros_disks.expect_mount_completion({ |
| 'status': 0, |
| 'source_path': archive_path, |
| 'mount_path': expected_mount_path |
| }) |
| |
| # Verify the content of the mounted ZIP file. |
| if not test_content.verify(expected_mount_path): |
| raise error.TestFail("Failed to verify filesystem test content") |
| |
| self.cros_disks.unmount(expected_mount_path, ['force']) |
| self.cros_disks.unmount(device_file, ['force']) |
| |
| def get_tests(self): |
| return [self.test_zip_archive] |
| |
| |
| class platform_CrosDisksArchive(test.test): |
| version = 1 |
| |
| def run_once(self, *args, **kwargs): |
| tester = CrosDisksArchiveTester(self) |
| tester.run(*args, **kwargs) |