| # Copyright 2018 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
import hashlib
import json
| import os |
| import requests |
| |
# ==================== Document digests
| |
| def calculate_digest(doc): |
| """ |
    Calculates a digest of the given document.

    @param doc: document's content (bytes)

    @returns calculated digest as a string of hexadecimal digits
| |
| """ |
| |
    if ( doc[0:64].find(b'\x1B%-12345X@PJL') >= 0
            or doc[0:64].find(b'%!PS-Adobe') >= 0 ):
| # PJL or Postscript or PJL with encapsulated Postscript |
| # Split by newline character and filter out problematic lines |
        lines = doc.split(b'\n')
        for i, line in enumerate(lines):
            if ( line.startswith(b'@PJL SET ')
                    or line.startswith(b'@PJL COMMENT')
                    or line.startswith(b'@PJL JOB NAME')
                    or line.startswith(b'trailer << ')
                    or line.startswith(b'%%Title:')
                    or line.startswith(b'%%For:') ):
                lines[i] = b''
        doc = b'\n'.join(lines)
| elif doc[0:8] == b'\x24\x01\x00\x00\x07\x00\x00\x00': |
| # LIDIL |
| LIDIL_JOBID_1_OFF = 2348 # first job id, offset from the beginning |
| LIDIL_JOBID_2_OFF = 2339 # second job id, offset from the end |
| nd = len(doc) |
| if nd > LIDIL_JOBID_1_OFF + LIDIL_JOBID_2_OFF + 2: |
            # remove the second JOB ID (2 bytes at the end)
            doc = doc[:(nd-LIDIL_JOBID_2_OFF)] + doc[(nd-LIDIL_JOBID_2_OFF+2):]
            # remove the first JOB ID (2 bytes at the beginning)
            doc = doc[:LIDIL_JOBID_1_OFF] + doc[(LIDIL_JOBID_1_OFF+2):]
    # Calculates hash
    return hashlib.md5(doc).hexdigest()
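

# A minimal usage sketch for calculate_digest(); 'raw_output.ps' is a
# hypothetical file holding a captured printer output, not part of this
# module.
def _example_calculate_digest():
    with open('raw_output.ps', 'rb') as raw_file:
        doc = raw_file.read()
    # the digest is a hex string, e.g. 'd41d8cd98f00b204e9800998ecf8427e'
    return calculate_digest(doc)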
| |
| |
| def parse_digests_file(path_digests, blacklist): |
| """ |
    Parses digests from a file.

    @param path_digests: a path to a file with digests
    @param blacklist: list of keys to omit

    @returns a dictionary with digests indexed by PPD filenames, or an empty
            dictionary if the given file does not exist
| |
| """ |
| digests = dict() |
| blacklist = set(blacklist) |
| if os.path.isfile(path_digests): |
        with open(path_digests) as file_digests:
| lines = file_digests.read().splitlines() |
| for line in lines: |
| cols = line.split() |
| if len(cols) >= 2 and cols[0] not in blacklist: |
| digests[cols[0]] = cols[1] |
| return digests |
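

# An illustrative sketch of parse_digests_file(); 'digests.txt' and the PPD
# names are hypothetical. The expected file format is one whitespace-separated
# "name digest" pair per line.
def _example_parse_digests():
    digests = parse_digests_file('digests.txt', blacklist=['broken.ppd'])
    for name, digest in sorted(digests.items()):
        print('%s -> %s' % (name, digest))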
| |
| |
| def save_digests_file(path_digests, digests, blacklist): |
| """ |
    Saves digests to a file.

    @param path_digests: a path to the output file
    @param digests: dictionary with digests (keys are names)
    @param blacklist: list of keys to ignore
| |
| """ |
| digests_content = '' |
| names = sorted(set(digests.keys()).difference(blacklist)) |
| for name in names: |
| digest = digests[name] |
        assert '\t' not in name and '\n' not in name
        assert '\t' not in digest and '\n' not in digest
| digests_content += name + '\t' + digest + '\n' |
| |
    with open(path_digests, 'w') as file_digests:
| file_digests.write(digests_content) |
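

# A round-trip sketch combining save_digests_file() with parse_digests_file();
# the path and digest values are hypothetical.
def _example_digests_roundtrip():
    digests = {'printer_a.ppd': 'aabbccdd', 'printer_b.ppd': '11223344'}
    save_digests_file('digests.txt', digests, blacklist=[])
    assert parse_digests_file('digests.txt', blacklist=[]) == digests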
| |
| |
| def load_blacklist(path_blacklist): |
| """ |
    Loads a blacklist of outputs to omit.

    Raw outputs generated by some PPD files cannot be verified by digests,
    because they contain variables like date/time, job id or other non-static
    parameters. This routine returns a list of blacklisted PPDs.
| |
| @param path_blacklist: a path to the file with the list of blacklisted |
| PPD files |
| |
| @returns a list of ppds to ignore during verification of digests |
| |
| """ |
| with open(path_blacklist) as file_blacklist: |
| lines = file_blacklist.readlines() |
| |
| blacklist = [] |
| for entry in lines: |
| entry = entry.strip() |
| if entry != '': |
| blacklist.append(entry) |
| |
| return blacklist |
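

# A sketch tying load_blacklist() to the functions above; both paths are
# hypothetical. The blacklist file holds one PPD name per line.
def _example_parse_with_blacklist():
    blacklist = load_blacklist('blacklist.txt')
    # digests of blacklisted PPDs are skipped while parsing
    return parse_digests_file('digests.txt', blacklist)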
| |
| |
| # ===================== PPD files on the SCS server |
| |
| def get_filenames_from_PPD_index(task_id): |
| """ |
    Downloads an index file from the SCS server and extracts the names
    of PPD files from it.
| |
| @param task_id: an order number of an index file to process; this is |
| an integer from the interval [0..20) |
| |
| @returns a list of PPD filenames (may contain duplicates) |
| |
| """ |
| # calculates a URL of the index file |
| url_metadata = 'https://www.gstatic.com/chromeos_printing/metadata_v2/' |
| url_ppd_index = url_metadata + ('index-%02d.json' % task_id) |
    # downloads and parses the index file
| request = requests.get(url_ppd_index) |
| entries = json.loads(request.content) |
| # extracts PPD filenames (the second element in each index entry) |
| output = [] |
| for entry in entries: |
| output.append(entry[1]) |
| # returns a list of extracted filenames |
| return output |
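

# A sketch collecting unique PPD filenames from all index shards; the shard
# count of 20 matches the task_id interval documented above.
def _example_list_all_ppd_filenames():
    filenames = set()
    for task_id in range(20):
        filenames.update(get_filenames_from_PPD_index(task_id))
    return sorted(filenames)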
| |
| |
| def download_PPD_file(ppd_file): |
| """ |
    Downloads a PPD file from the SCS server.
| |
| @param ppd_file: a filename of PPD file (neither path nor URL) |
| |
    @returns content of the PPD file

    """
| url_ppds = 'https://www.gstatic.com/chromeos_printing/ppds/' |
| request = requests.get(url_ppds + ppd_file) |
| return request.content |
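

# A sketch saving a single PPD file to disk; the filename is hypothetical.
def _example_download_ppd():
    content = download_PPD_file('generic-postscript.ppd')
    with open('generic-postscript.ppd', 'wb') as ppd_file:
        ppd_file.write(content)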
| |
| |
| # ==================== Local filesystem |
| |
| def list_entries_from_directory( |
| path, |
| with_suffixes=None, nonempty_results=False, |
| include_files=True, include_directories=True ): |
| """ |
    Returns all entries from the given directory. Results may be filtered
    by filename suffixes or entry types.

    @param path: a path to the directory to list entries from
    @param with_suffixes: if set, only entries with given suffixes are
            returned; it must be a tuple
    @param nonempty_results: if True, an Exception is raised if there are
            no results
    @param include_files: if False, regular files and links are omitted
    @param include_directories: if False, directories are omitted

    @returns a list of entries meeting the given criteria

    @raises Exception if no matching entries were found and
            nonempty_results is set to True
| |
| """ |
    # lists all entries from the directory and filters them by given criteria
| list_of_files = [] |
| for filename in os.listdir(path): |
| path_entry = os.path.join(path, filename) |
| # check type |
| if os.path.isfile(path_entry): |
| if not include_files: |
| continue |
| elif os.path.isdir(path_entry): |
| if not include_directories: |
| continue |
| else: |
| continue |
| # check suffix |
| if with_suffixes is not None: |
| if not filename.endswith(with_suffixes): |
| continue |
| list_of_files.append(filename) |
    # raises an exception if no matching entries were found
| if nonempty_results and len(list_of_files) == 0: |
| message = 'Directory %s does not contain any ' % path |
| message += 'entries meeting the criteria' |
| raise Exception(message) |
    # returns the list of matching entries
| return list_of_files |
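

# A sketch using list_entries_from_directory() to find PPD files; the
# directory path and the suffixes are hypothetical.
def _example_list_ppd_files():
    return list_entries_from_directory(
            'ppds', with_suffixes=('.ppd', '.ppd.gz'),
            include_directories=False, nonempty_results=True)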