| # Copyright 2018 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import dbus |
| import logging |
| import os |
| import shutil
| import subprocess
| |
| from autotest_lib.client.bin import test |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.client.common_lib import file_utils |
| from autotest_lib.client.common_lib.cros import dbus_send |
| from autotest_lib.client.cros import debugd_util |
| |
| import archiver |
| import helpers |
| import fake_printer |
| import multithreaded_processor |
| |
| # Timeout (in seconds) for printing a single document
| _FAKE_PRINTER_TIMEOUT = 200 |
| |
| # Prefix for CUPS printer name |
| _FAKE_PRINTER_ID = 'FakePrinter' |
| |
| # First port number to use; this test uses consecutive port numbers,
| # a different one for every PPD file
| _FIRST_PORT_NUMBER = 9000 |
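| # E.g. the PPD file with index 0 is tested on port 9000, the PPD file with
| # index 1 on port 9001, and so on (see _thread_test_PPD below).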
| |
| # Values are from platform/system_api/dbus/debugd/dbus-constants.h. |
| _CUPS_SUCCESS = 0 |
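|
| # A typical invocation goes through the autotest runner, e.g. (assuming a
| # standard autotest setup; the exact test/control-file name passed on the
| # command line is an assumption):
| #   test_that <DUT_IP> platform_PrinterPpds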
| |
| class platform_PrinterPpds(test.test): |
| """ |
| This test gets a list of PPD files and a list of test documents. It tries
| to add a printer using each PPD file and to print all test documents on
| every printer created this way. Because the number of PPD files to test
| can be large (more than 3,000), PPD files are tested simultaneously in
| many threads.
| |
| """ |
| version = 2 |
| |
| |
| def _get_filenames_from_PPD_indexes(self): |
| """ |
| Returns all PPD filenames from the SCS server.
| |
| @returns a list of PPD filenames without duplicates |
| |
| """ |
| # extracts PPD filenames from all 20 index files (in parallel) |
| outputs = self._processor.run(helpers.get_filenames_from_PPD_index, 20) |
| # joins obtained lists and performs deduplication |
| ppd_files = set() |
| for output in outputs: |
| ppd_files.update(output) |
| return list(ppd_files) |
| |
| |
| def _load_component(self, component): |
| """ |
| Downloads a filter component via the D-Bus API.
| |
| @param component: name of component |
| |
| @raises error.TestFail if the component is not loaded.
| |
| """ |
| logging.info('download component: %s', component)
| res = dbus_send.dbus_send( |
| 'org.chromium.ComponentUpdaterService', |
| 'org.chromium.ComponentUpdaterService', |
| '/org/chromium/ComponentUpdaterService', |
| 'LoadComponent', |
| timeout_seconds=20, |
| user='root', |
| args=[dbus.String(component)]) |
| if res.response == '': |
| message = 'Component %s could not be loaded.' % component |
| raise error.TestFail(message) |
| |
| |
| def _delete_component(self, component): |
| """ |
| Deletes a filter component via the D-Bus API.
| |
| @param component: name of component |
| |
| """ |
| logging.info('delete component: %s', component)
| dbus_send.dbus_send( |
| 'org.chromium.ComponentUpdaterService', |
| 'org.chromium.ComponentUpdaterService', |
| '/org/chromium/ComponentUpdaterService', |
| 'UnloadComponent', |
| timeout_seconds=20, |
| user='root', |
| args=[dbus.String(component)]) |
| |
| |
| def _calculate_full_path(self, path): |
| """ |
| Converts a path given as a parameter to an absolute path.
| |
| @param path: a path set in configuration (relative, absolute or None) |
| |
| @returns absolute path or None if the input parameter was None |
| |
| """ |
| if path is None or os.path.isabs(path): |
| return path |
| path_current = os.path.dirname(os.path.realpath(__file__)) |
| return os.path.join(path_current, path) |
| |
| |
| def initialize( |
| self, path_docs, |
| path_ppds=None, path_digests=None, threads_count=8): |
| """ |
| @param path_docs: path to local directory with documents to print |
| @param path_ppds: path to local directory with PPD files to test; |
| if None is set then all PPD files from the SCS server are |
| downloaded and tested |
| @param path_digests: path to local directory with digests files for |
| test documents; if None is set then content of printed |
| documents is not verified |
| @param threads_count: number of threads to use |
| |
| """ |
| # This object is used for running tasks in many threads simultaneously |
| self._processor = multithreaded_processor.MultithreadedProcessor( |
| threads_count) |
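| # MultithreadedProcessor.run(f, n) is assumed to call f(task_id) for
| # task_id in range(n) on the worker threads and to return a list of the
| # n results ordered by task_id (run_once relies on that ordering).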
| |
| # Unpack archives with all PPD files
| path_archive = self._calculate_full_path('ppds_all.tar.xz')
| path_target_dir = self._calculate_full_path('.')
| file_utils.rm_dir_if_exists(os.path.join(path_target_dir, 'ppds_all'))
| subprocess.check_call(['tar', 'xJf', path_archive, '-C', path_target_dir])
| path_archive = self._calculate_full_path('ppds_100.tar.xz')
| file_utils.rm_dir_if_exists(os.path.join(path_target_dir, 'ppds_100'))
| subprocess.check_call(['tar', 'xJf', path_archive, '-C', path_target_dir])
| |
| # Calculates locations of test documents, PPD files and digests files |
| self._location_of_test_docs = self._calculate_full_path(path_docs) |
| self._location_of_PPD_files = self._calculate_full_path(path_ppds) |
| location_of_digests_files = self._calculate_full_path(path_digests) |
| |
| # Reads list of test documents |
| self._docs = helpers.list_entries_from_directory( |
| path=self._location_of_test_docs, |
| with_suffixes=('.pdf',),
| nonempty_results=True, |
| include_directories=False) |
| |
| # Get list of PPD files ... |
| if self._location_of_PPD_files is None: |
| # ... from the SCS server |
| self._ppds = self._get_filenames_from_PPD_indexes() |
| else: |
| # ... from the given local directory |
| self._ppds = helpers.list_entries_from_directory( |
| path=self._location_of_PPD_files, |
| with_suffixes=('.ppd', '.ppd.gz'),
| nonempty_results=True, |
| include_directories=False) |
| self._ppds.sort() |
| |
| # Load digests files |
| self._digests = dict() |
| for doc_name in self._docs: |
| if location_of_digests_files is None: |
| self._digests[doc_name] = dict() |
| else: |
| digests_name = doc_name + '.digests' |
| path = os.path.join(location_of_digests_files, digests_name) |
| self._digests[doc_name] = helpers.parse_digests_file(path) |
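| # At this point self._digests[doc_name] maps a PPD filename to the
| # expected digest of the printer output for that document; an empty
| # dict means that verification is skipped for that document.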
| |
| # Load components required by some printers (Epson and Star) |
| self._loaded_components = [] |
| for component in ('epson-inkjet-printer-escpr', 'star-cups-driver'): |
| self._load_component(component) |
| self._loaded_components.append(component) |
| |
| |
| def cleanup(self): |
| """ |
| Cleanup: unloads all loaded components and deletes directories with
| unpacked PPD files.
| |
| """ |
| # Delete components loaded during initialization
| if hasattr(self, '_loaded_components'): |
| for component in self._loaded_components: |
| self._delete_component(component) |
| |
| # Delete directories with PPD files |
| path_ppds = self._calculate_full_path('ppds_100') |
| file_utils.rm_dir_if_exists(path_ppds) |
| path_ppds = self._calculate_full_path('ppds_all') |
| file_utils.rm_dir_if_exists(path_ppds) |
| |
| |
| def run_once(self, path_outputs=None): |
| """ |
| This is the main test function. It runs the testing procedure for |
| every PPD file. Tests are run simultaneously in many threads. |
| |
| @param path_outputs: if not None, raw outputs sent to printers are
| dumped here; if the directory already exists, it is deleted and
| recreated
| |
| @raises error.TestFail if at least one of the tests failed |
| |
| """ |
| # Set directory for output documents |
| self._path_output_directory = self._calculate_full_path(path_outputs) |
| if self._path_output_directory is not None: |
| # Delete the whole directory if it already exists
| file_utils.rm_dir_if_exists(self._path_output_directory) |
| # Create archivers |
| self._archivers = dict() |
| for doc_name in self._docs: |
| path_for_archiver = os.path.join(self._path_output_directory, |
| doc_name) |
| self._archivers[doc_name] = archiver.Archiver(path_for_archiver, |
| self._ppds, 50) |
| # A place for new digests |
| self._new_digests = dict() |
| for doc_name in self._docs: |
| self._new_digests[doc_name] = dict() |
| |
| # Runs tests for all PPD files (in parallel) |
| outputs = self._processor.run(self._thread_test_PPD, len(self._ppds)) |
| |
| # Analyze test outputs, log a summary report and build a list
| # of PPD filenames that failed
| failures = [] |
| for i, output in enumerate(outputs): |
| ppd_file = self._ppds[i] |
| if output is not True:
| failures.append(ppd_file) |
| else: |
| output = 'OK' |
| line = "%s: %s" % (ppd_file, output) |
| logging.info(line) |
| |
| # Calculate digests files for output documents (if dumped) |
| if self._path_output_directory is not None: |
| # Load the list of PPDs to omit
| blacklist = helpers.load_blacklist()
| blacklist += failures
| # Generate a digests file for each output directory
| for doc_name in self._docs: |
| path = os.path.join(self._path_output_directory, |
| doc_name + '.digests') |
| helpers.save_digests_file(path, self._new_digests[doc_name], |
| blacklist) |
| |
| # Raises an exception if at least one test failed |
| if failures:
| failures.sort()
| raise error.TestFail(
| 'Test failed for %d PPD files: %s'
| % (len(failures), ', '.join(failures)))
| |
| |
| def _thread_test_PPD(self, task_id): |
| """ |
| Runs the test procedure for a single PPD file.
|
| It retrieves the assigned PPD file and runs the test procedure for it.
|
| @param task_id: an index of the PPD file in self._ppds
|
| @returns True if the test passed, or a string describing the error
| if the test failed
| |
| """ |
| # Gets content of the PPD file |
| try: |
| ppd_file = self._ppds[task_id] |
| if self._location_of_PPD_files is None: |
| # Downloads PPD file from the SCS server |
| ppd_content = helpers.download_PPD_file(ppd_file) |
| else: |
| # Reads PPD file from local filesystem |
| path_ppd = os.path.join(self._location_of_PPD_files, ppd_file) |
| with open(path_ppd, 'rb') as ppd_file_descriptor: |
| ppd_content = ppd_file_descriptor.read() |
| except BaseException as e: |
| return 'MISSING PPD: ' + str(e) |
| |
| # Runs the test procedure |
| try: |
| port = _FIRST_PORT_NUMBER + task_id |
| self._PPD_test_procedure(ppd_file, ppd_content, port) |
| except BaseException as e: |
| return 'FAIL: ' + str(e) |
| |
| return True |
| |
| |
| def _PPD_test_procedure(self, ppd_name, ppd_content, port): |
| """ |
| Test procedure for single PPD file. |
| |
| It tries to run the following steps: |
| 1. Starts an instance of FakePrinter |
| 2. Configures CUPS printer |
| 3. Sends tests documents to the CUPS printer |
| 4. Fetches the raw document from the FakePrinter |
| 5. Dumps the raw document if self._path_output_directory is set |
| 6. Verifies raw document's digest if the digest is available |
| 7. Removes CUPS printer and stops FakePrinter |
| If the test fails this method raises an exception.
|
| @param ppd_name: a name of the PPD file
| @param ppd_content: the content of the PPD file
| @param port: a port for the fake printer
|
| @raises Exception when the test fails
| |
| """ |
| # Save the PPD file for an external pipeline |
| if self._path_output_directory is not None: |
| path_ppd = '/tmp/' + ppd_name |
| with open(path_ppd, 'wb') as file_ppd: |
| file_ppd.write(ppd_content) |
| if path_ppd.endswith('.gz'):
| subprocess.call(['gzip', '-d', path_ppd])
| path_ppd = path_ppd[:-3]  # strip the '.gz' suffix
| |
| try: |
| # Starts the fake printer |
| with fake_printer.FakePrinter(port) as printer: |
| |
| # Add a CUPS printer manually with the given PPD file
| cups_printer_id = '%s_at_%05d' % (_FAKE_PRINTER_ID, port)
| result = debugd_util.iface().CupsAddManuallyConfiguredPrinter(
| cups_printer_id,
| 'socket://127.0.0.1:%d' % port,
| dbus.ByteArray(ppd_content))
| if result != _CUPS_SUCCESS:
| raise Exception('Could not set up a printer with the given'
| ' PPD file; error code: %d' % result)
| |
| # Prints all test documents |
| try: |
| for doc_name in self._docs: |
| # Full path to the test document |
| path_doc = os.path.join( |
| self._location_of_test_docs, doc_name) |
| # Sends test document to printer |
| job_name = 'job_%05d' % port
| argv = ['lp', '-d', cups_printer_id, '-t', job_name, path_doc]
| subprocess.check_call(argv)
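| # The call above is roughly equivalent to running the following
| # shell command on the device (printer id and job name are
| # illustrative; they depend on the port):
| #   lp -d FakePrinter_at_09000 -t job_09000 document.pdf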
| # Gets the output document from the fake printer |
| doc = printer.fetch_document(_FAKE_PRINTER_TIMEOUT) |
| digest = helpers.calculate_digest(doc) |
| # Dumps output document to the output directory (if set) |
| if self._path_output_directory is not None: |
| self._archivers[doc_name].save_file( |
| ppd_name, '.out', doc, apply_gzip=True) |
| # Set new digest |
| self._new_digests[doc_name][ppd_name] = digest |
| # Rerun the pipeline and dump the intermediate outputs
| path_pipeline = '/tmp/cups_%s' % job_name |
| path_temp_workdir = '/tmp/cups_dir_%s' % job_name |
| if os.path.isfile(path_pipeline): |
| self._rerun_whole_pipeline(path_pipeline, |
| path_temp_workdir, path_ppd, ppd_name, |
| path_doc, doc_name, doc) |
| # Check document's digest (if known) |
| if ppd_name in self._digests[doc_name]:
| digest_expected = self._digests[doc_name][ppd_name]
| if digest_expected != digest:
| message = "Document's digest does not match"
| raise Exception(message)
| else: |
| # Simple validation: the output must not be (almost) empty
| if len(doc) < 16:
| raise Exception('Empty or too short output')
| |
| finally: |
| # Remove the CUPS printer
| debugd_util.iface().CupsRemovePrinter(cups_printer_id) |
| |
| # The fake printer is stopped at the end of the "with" statement
| finally: |
| # Finalize archivers and clean up
| if self._path_output_directory is not None: |
| for doc_name in self._docs: |
| self._archivers[doc_name].finalize_prefix(ppd_name) |
| os.remove(path_ppd) |
| |
| |
| def _rerun_whole_pipeline( |
| self, path_pipeline, path_temp_workdir, path_ppd, ppd_name, |
| path_doc, doc_name, doc): |
| """ |
| Reruns the whole pipeline outside the CUPS server.
| |
| Reruns a printing pipeline dumped from CUPS. All intermediate outputs |
| are dumped and archived for future analysis. |
| |
| @param path_pipeline: a path to file with pipeline dumped from CUPS |
| @param path_temp_workdir: a temporary directory to use as the working
| directory; if it already exists, it is deleted first
| @param path_ppd: a path to the PPD file
| @param ppd_name: a filename prefix used by archivers
| @param path_doc: a path to the input (printed) document
| @param doc_name: a document name, used to select the proper archiver
| @param doc: the output produced by CUPS (for comparison)
| |
| """ |
| shutil.rmtree(path_temp_workdir, ignore_errors=True) |
| # Edit the pipeline script: remove the TMPDIR and HOME variables
| with open(path_pipeline, 'rb') as file_pipeline: |
| pipeline = file_pipeline.readlines() |
| with open(path_pipeline, 'wb') as file_pipeline: |
| for line in pipeline: |
| if line.startswith('export TMPDIR='): |
| continue |
| if line.startswith('export HOME='): |
| continue |
| file_pipeline.write(line) |
| # Create the working directory and run the pipeline
| os.mkdir(path_temp_workdir)
| path_home_and_tmp_dir = os.path.join(path_temp_workdir, 'tmp')
| os.mkdir(path_home_and_tmp_dir) |
| my_env = os.environ.copy() |
| my_env["TEST_PPD"] = path_ppd |
| my_env["TEST_DOCUMENT"] = path_doc |
| my_env["HOME"] = my_env["TMPDIR"] = path_home_and_tmp_dir |
| argv = ['/bin/bash', '-e', path_pipeline] |
| ret = subprocess.Popen(argv, cwd=path_temp_workdir, env=my_env).wait() |
| # Archives the script and all intermediate files |
| self._archivers[doc_name].move_file(ppd_name, '.sh', path_pipeline) |
| # Walk over consecutive intermediate outputs; after the loop, i is the
| # index of the last produced output
| i = 0
| while os.path.isfile(os.path.join(path_temp_workdir, "%d.doc" % (i+1))):
| i += 1
| self._archivers[doc_name].copy_file(ppd_name, ".err%d" % i,
| os.path.join(path_temp_workdir, "%d.err" % i))
| self._archivers[doc_name].copy_file(ppd_name, ".out%d" % i,
| os.path.join(path_temp_workdir, "%d.doc" % i), True)
| # Reads last output (to compare it with the output produced by CUPS) |
| filename_doc = os.path.join(path_temp_workdir, "%d.doc" % i) |
| with open(filename_doc, 'rb') as last_file: |
| content_digest = helpers.calculate_digest(last_file.read()) |
| shutil.rmtree(path_temp_workdir, ignore_errors=True) |
| # Validation
| if content_digest != helpers.calculate_digest(doc):
| raise Exception("The output returned by the pipeline differs"
| " from the output produced by CUPS")
| if ret != 0:
| raise Exception("The pipeline script returned a non-zero code: %d"
| % ret)