blob: 1e62d804a3703fcdd0896bca5d3275f9da2f7f3d [file] [log] [blame]
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import json
import md5
import os
import requests
# ==================== Documents digests
def calculate_digest(doc):
Calculates digests for given document.
@param doc: document's content
@returns calculated digests as a string of hexadecimals
if ( doc[0:64].find(b'\x1B%-12345X@PJL') >= 0
or doc[0:64].find('%!PS-Adobe') >= 0 ):
# PJL or Postscript or PJL with encapsulated Postscript
# Split by newline character and filter out problematic lines
lines = doc.split('\n')
for i, line in enumerate(lines):
if ( line.startswith('@PJL SET ')
or line.startswith('@PJL COMMENT')
or line.startswith('@PJL JOB NAME')
or line.startswith('trailer << ')
or line.startswith('%%Title:')
or line.startswith('%%For:') ):
lines[i] = ''
doc = '\n'.join(lines)
elif doc[0:8] == b'\x24\x01\x00\x00\x07\x00\x00\x00':
LIDIL_JOBID_1_OFF = 2348 # first job id, offset from the beginning
LIDIL_JOBID_2_OFF = 2339 # second job id, offset from the end
nd = len(doc)
# remove the second JOB ID (at the end)
doc = doc[:(nd-LIDIL_JOBID_2_OFF)] + doc[(nd-LIDIL_JOBID_2_OFF+2):]
# remove the first JOB ID (at the beginning)
doc = doc[:LIDIL_JOBID_1_OFF+1] + doc[LIDIL_JOBID_1_OFF+2:]
# Calculates hash
def parse_digests_file(path_digests, blacklist):
Parses digests from file.
@param path_digests: a path to a file with digests
@param blacklist: list of keys to omit
@returns a dictionary with digests indexed by ppd filenames or an empty
dictionary if the given file does not exist
digests = dict()
blacklist = set(blacklist)
if os.path.isfile(path_digests):
with open(path_digests, 'rb') as file_digests:
lines =
for line in lines:
cols = line.split()
if len(cols) >= 2 and cols[0] not in blacklist:
digests[cols[0]] = cols[1]
return digests
def save_digests_file(path_digests, digests, blacklist):
Saves list of digests to file.
@param digests: dictionary with digests (keys are names)
@param blacklist: list of keys to ignore
@return a content of digests file
digests_content = ''
names = sorted(set(digests.keys()).difference(blacklist))
for name in names:
digest = digests[name]
assert name.find('\t') < 0 and name.find('\n') < 0
assert digest.find('\t') < 0 and digest.find('\n') < 0
digests_content += name + '\t' + digest + '\n'
with open(path_digests, 'wb') as file_digests:
def load_blacklist(path_blacklist):
Loads blacklist of outputs to omit.
Raw outputs generated by some PPD files cannot be verified by digests,
because they contain variables like date/time, job id or other non-static
parameters. This routine returns list of blacklisted ppds.
@param path_blacklist: a path to the file with the list of blacklisted
PPD files
@returns a list of ppds to ignore during verification of digests
with open(path_blacklist) as file_blacklist:
lines = file_blacklist.readlines()
blacklist = []
for entry in lines:
entry = entry.strip()
if entry != '':
return blacklist
# ===================== PPD files on the SCS server
def get_filenames_from_PPD_index(task_id):
It downloads an index file from the SCS server and extracts names
of PPD files from it.
@param task_id: an order number of an index file to process; this is
an integer from the interval [0..20)
@returns a list of PPD filenames (may contain duplicates)
# calculates a URL of the index file
url_metadata = ''
url_ppd_index = url_metadata + ('index-%02d.json' % task_id)
# donwloads and parses the index file
request = requests.get(url_ppd_index)
entries = json.loads(request.content)
# extracts PPD filenames (the second element in each index entry)
output = []
for entry in entries:
# returns a list of extracted filenames
return output
def download_PPD_file(ppd_file):
It downloads a PPD file from the SCS server.
@param ppd_file: a filename of PPD file (neither path nor URL)
@returns content of the PPD file
url_ppds = ''
request = requests.get(url_ppds + ppd_file)
return request.content
# ==================== Local filesystem
def list_entries_from_directory(
with_suffixes=None, nonempty_results=False,
include_files=True, include_directories=True ):
It returns all filenames from given directory. Results may be filtered
by filenames suffixes or entries types.
@param path: a path to directory to list files from
@param with_suffixes: if set, only entries with given suffixes are
returned; it must be a tuple
@param nonempty_results: if True then Exception is raised if there is no
@param include_files: if False, then regular files and links are omitted
@param include_directories: if False, directories are omitted
@returns a nonempty list of entries meeting given criteria
@raises Exception if no matching filenames were found and
nonempty_results is set to True
# lists all files from the directory and filter them by given criteria
list_of_files = []
for filename in os.listdir(path):
path_entry = os.path.join(path, filename)
# check type
if os.path.isfile(path_entry):
if not include_files:
elif os.path.isdir(path_entry):
if not include_directories:
# check suffix
if with_suffixes is not None:
if not filename.endswith(with_suffixes):
# throws exception if no files were found
if nonempty_results and len(list_of_files) == 0:
message = 'Directory %s does not contain any ' % path
message += 'entries meeting the criteria'
raise Exception(message)
# returns a non-empty list
return list_of_files