# blob: db63111414f15eeb2f68f606bffade14b6ff9bca [file] [log] [blame]
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import gzip
import hashlib
import json
import md5
import os

import requests
# ==================== Documents digests
def calculate_digest(doc):
    """
    Calculates digests for given document.

    Fields that are removed before hashing:
      * PJL / PostScript documents (recognized by a marker within the first
        64 bytes): every line starting with one of the prefixes listed in
        `volatile_prefixes` is replaced with an empty line,
      * LIDIL documents (recognized by their 8-byte header): the bytes at
        the two job id offsets are cut out, provided the document is long
        enough to contain both.
    Any other document is hashed unchanged.

    @param doc: document's content (a byte string)

    @returns calculated digests as a string of hexadecimals (MD5)

    """
    header = doc[0:64]
    if b'\x1B%-12345X@PJL' in header or b'%!PS-Adobe' in header:
        # PJL or Postscript or PJL with encapsulated Postscript
        # Split by newline character and blank out problematic lines; the
        # lines are blanked rather than dropped, so the number of newline
        # characters in the hashed content stays the same.
        volatile_prefixes = (
            b'@PJL SET ',
            b'@PJL COMMENT',
            b'@PJL JOB NAME',
            b'trailer << ',
            b'%%Title:',
            b'%%For:',
        )
        doc = b'\n'.join(
            b'' if line.startswith(volatile_prefixes) else line
            for line in doc.split(b'\n'))
    elif doc[0:8] == b'\x24\x01\x00\x00\x07\x00\x00\x00':
        # LIDIL
        LIDIL_JOBID_1_OFF = 2348  # first job id, offset from the beginning
        LIDIL_JOBID_2_OFF = 2339  # second job id, offset from the end
        nd = len(doc)
        if nd > LIDIL_JOBID_1_OFF + LIDIL_JOBID_2_OFF + 2:
            # remove the second JOB ID (at the end); it is cut first so that
            # the offset of the first JOB ID is not shifted
            doc = doc[:(nd-LIDIL_JOBID_2_OFF)] + doc[(nd-LIDIL_JOBID_2_OFF+2):]
            # remove the first JOB ID (at the beginning)
            # NOTE(review): this cuts a single byte at LIDIL_JOBID_1_OFF+1,
            # while two bytes are cut for the second id. Kept exactly as it
            # was because previously stored digests depend on it - confirm
            # against the LIDIL format before changing.
            doc = doc[:LIDIL_JOBID_1_OFF+1] + doc[LIDIL_JOBID_1_OFF+2:]
    # Calculates hash; hashlib.md5 gives the same digests as the legacy
    # (Python 2 only) md5 module and is also available in Python 3.
    return hashlib.md5(doc).hexdigest()
def parse_digests_file(path_digests):
    """
    Parses digests from file.

    Every line of the file is split on whitespace: the first column is
    taken as a ppd filename and the second one as its digest. Lines with
    fewer than two columns are skipped and any further columns are ignored.
    When a filename occurs more than once, the last occurrence wins.

    @param path_digests: a path to a file with digests

    @returns a dictionary with digests indexed by ppd filenames or an empty
            dictionary if the given file does not exist; both keys and
            values are native strings

    """
    digests = dict()
    if not os.path.isfile(path_digests):
        return digests
    with open(path_digests, 'rb') as file_digests:
        lines = file_digests.read().splitlines()
    for line in lines:
        cols = line.split()
        if len(cols) < 2:
            continue
        name, digest = cols[0], cols[1]
        # The file is read as bytes. In Python 2 these already are native
        # strings; in Python 3 they must be decoded, otherwise the digests
        # would never compare equal to the hexadecimal strings produced by
        # hashing and could not be handled as text by callers.
        if not isinstance(name, str):
            name = name.decode('utf-8')
            digest = digest.decode('utf-8')
        digests[name] = digest
    return digests
def save_digests_file(path_digests, digests, blacklist):
    """
    Saves list of digests to file.

    The file gets one line per saved digest, in the form
    "<name><TAB><digest><NEWLINE>", with lines sorted by name. Nothing is
    written if any name or digest to be saved fails validation.

    @param path_digests: a path to the output file (it is overwritten)
    @param digests: dictionary with digests (keys are names)
    @param blacklist: list of keys to ignore

    @raises AssertionError if a saved name or digest contains a tab or
            a newline character (they would corrupt the file format)

    """
    names = sorted(set(digests).difference(blacklist))
    lines = []
    for name in names:
        digest = digests[name]
        # Raised explicitly (instead of using an `assert` statement), so the
        # check is not stripped when Python runs with -O, while callers
        # still get the same exception type as before.
        if '\t' in name or '\n' in name:
            raise AssertionError(
                    'Name contains a tab or a newline: %r' % (name,))
        if '\t' in digest or '\n' in digest:
            raise AssertionError(
                    'Digest contains a tab or a newline: %r' % (digest,))
        lines.append(name + '\t' + digest + '\n')
    digests_content = ''.join(lines)
    # The file is opened in binary mode, so text has to be encoded first;
    # in Python 2 a native string already is a byte string.
    if not isinstance(digests_content, bytes):
        digests_content = digests_content.encode('utf-8')
    with open(path_digests, 'wb') as file_digests:
        file_digests.write(digests_content)
def load_blacklist():
    """
    Loads blacklist of outputs to omit.

    Raw outputs generated by some PPD files cannot be verified by digests,
    because they contain variables like date/time, job id or other non-static
    parameters. The names of such ppds are read from the file
    'digests_blacklist.txt' located in the same directory as this module,
    one entry per line; surrounding whitespace is stripped and empty lines
    are skipped.

    @returns a list of ppds to ignore during calculation of digests

    """
    module_dir = os.path.dirname(os.path.realpath(__file__))
    with open(os.path.join(module_dir, 'digests_blacklist.txt')) as handle:
        raw_entries = handle.readlines()
    stripped_entries = (raw.strip() for raw in raw_entries)
    return [name for name in stripped_entries if name != '']
# ===================== PPD files on the SCS server
def get_filenames_from_PPD_index(task_id):
    """
    It downloads an index file from the SCS server and extracts names
    of PPD files from it.

    @param task_id: an order number of an index file to process; this is
            an integer from the interval [0..20)

    @returns a list of PPD filenames (may contain duplicates)

    @raises requests.exceptions.HTTPError if the server answers with
            an error status (4xx or 5xx)

    """
    # calculates a URL of the index file
    url_metadata = 'https://www.gstatic.com/chromeos_printing/metadata_v2/'
    url_ppd_index = url_metadata + ('index-%02d.json' % task_id)
    # downloads the index file; an HTTP error is reported right away instead
    # of showing up later as a confusing failure while parsing an error page
    response = requests.get(url_ppd_index)
    response.raise_for_status()
    # parses the index file
    entries = json.loads(response.content)
    # extracts PPD filenames (the second element in each index entry)
    return [entry[1] for entry in entries]
def download_PPD_file(ppd_file):
    """
    It downloads a PPD file from the SCS server.

    @param ppd_file: a filename of PPD file (neither path nor URL)

    @returns content of the PPD file (raw body of the HTTP response)

    @raises requests.exceptions.HTTPError if the server answers with
            an error status (4xx or 5xx)

    """
    url_ppds = 'https://www.gstatic.com/chromeos_printing/ppds/'
    response = requests.get(url_ppds + ppd_file)
    # Without this check the body of an error page (e.g. for 404) would be
    # silently returned as if it was the content of the PPD file.
    response.raise_for_status()
    return response.content
# ==================== Local filesystem
def list_entries_from_directory(
        path,
        with_suffixes=None, nonempty_results=False,
        include_files=True, include_directories=True):
    """
    It lists names of entries from given directory. Results may be filtered
    by names suffixes or entries types. Entries that are neither regular
    files nor directories (as reported by os.path.isfile/os.path.isdir) are
    always omitted.

    @param path: a path to directory to list entries from
    @param with_suffixes: if set, only entries with given suffixes are
            returned; it must be a tuple
    @param nonempty_results: if True then Exception is raised if there is no
            results
    @param include_files: if False, then regular files and links are omitted
    @param include_directories: if False, directories are omitted

    @returns a list of names (not paths) of entries meeting given criteria;
            it may be empty only when nonempty_results is False

    @raises Exception if no matching entries were found and
            nonempty_results is set to True

    """
    matching = []
    for name in os.listdir(path):
        full_path = os.path.join(path, name)
        # decides by the type of the entry whether it is wanted at all
        if os.path.isfile(full_path):
            wanted = include_files
        elif os.path.isdir(full_path):
            wanted = include_directories
        else:
            wanted = False
        if not wanted:
            continue
        # filters by suffix, when requested
        if with_suffixes is not None and not name.endswith(with_suffixes):
            continue
        matching.append(name)
    # an empty result is an error only if the caller asked for it
    if nonempty_results and len(matching) == 0:
        message = 'Directory %s does not contain any ' % path
        message += 'entries meeting the criteria'
        raise Exception(message)
    return matching