| # Copyright 2018 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import gzip |
| import os |
| import shutil |
| import subprocess |
| import threading |
| |
| class Archiver(): |
| """ |
| An instance of this class stores set of files in given directory on local |
| filesystem. Stored files are automatically compressed and organized into |
| tar.xz archives based on their filenames prefixes. It is a very useful tool |
| when one has to deal with many files with similar content that are generated |
| continuously. Packing similar files together into tar.xz archive can |
| singificantly reduce amount of required disk space (even for gzipped files). |
| As a parameter, the constructor takes set of filenames prefixes. These |
| prefixes are automatically clustered into archives by their common prefixes |
| (yes, prefixes of prefixes). These archives are automatically created, when |
| all files assigned to the given set of prefixes is added to Archiver object. |
| Methods provided by this class are synchronized and can be called from |
| different Python threads. |
| |
| """ |
| |
| def _split_names_by_prefixes( |
| self, names, max_names_per_prefix, prefix_length=0): |
| """ |
| Recursive function used to split given set of names into groups by |
| common prefixes. It tries to find configuration with minimum number of |
| groups (prefixes) where the number of elements (names) in each group is |
| not larger than given parameter. |
| |
| @param names: list of names to split into groups (names MUST BE sorted |
| and unique). |
| @param max_names_per_prefix: maximum number of names assigned to |
| group (single prefix). |
| @param prefix_length: current length of the prefix (for recursive |
| calls); all elements in the list given as the parameter 'names' |
| MUST HAVE the same prefix with this length. |
| @returns dictionary with prefixes (each one represents single group) and |
| size (a number of names in the group). |
| |
| """ |
| assert max_names_per_prefix > 1 |
| # Returns the current prefix if the group is small enough |
| if len(names) <= max_names_per_prefix: |
| return { names[0][0:prefix_length] : len(names) } |
| # Increases prefix_length until a difference is found: |
| # - elements in 'names' are sorted and unique |
| # - elements in 'names' have a common prefix with a length of |
| # 'prefix_length' characters |
| while ( len(names[0]) > prefix_length and |
| names[0][prefix_length] == names[-1][prefix_length] ): |
| prefix_length += 1 |
| # Checks for special case, when the first name == prefix |
| if len(names[0]) == prefix_length: |
| return { names[0][0:prefix_length] : len(names) } |
| # Calculates resultant list of prefixes |
| results = dict() |
| i_begin = 0 |
| # Calculates all prefixes (groups) using recursion: |
| # - 'prefix_length' points to the first character that differentiates |
| # elements from the 'names' list |
| while i_begin < len(names): |
| char = names[i_begin][prefix_length] |
| i_end = i_begin + 1 |
| while i_end < len(names) and char == names[i_end][prefix_length]: |
| i_end += 1 |
| results.update(self._split_names_by_prefixes(names[i_begin:i_end], |
| max_names_per_prefix, prefix_length+1)) |
| i_begin = i_end |
| return results |
| |
| |
| def __init__(self, path_directory, prefixes, max_prefixes_per_archive): |
| """ |
| Constructor. |
| |
| @param path_directory: directory where files and archives are stored. |
| It is created if not exists. |
| @param prefixes: a set of allowed filenames prefixes. |
| @param max_prefixes_per_archive: maximum number of filenames prefixes |
| assigned to single group (archive). |
| |
| """ |
| self._lock = threading.Lock() |
| self._path_directory = path_directory |
| if not os.path.exists(self._path_directory): |
| os.makedirs(self._path_directory) |
| |
| prefixes = sorted(set(prefixes)) |
| self._archives_names = self._split_names_by_prefixes(prefixes, |
| max_prefixes_per_archive) |
| self._filenames_prefixes = dict() |
| prefixes.reverse() |
| for ap, fc in sorted(self._archives_names.iteritems()): |
| self._archives_names[ap] = [fc, []] |
| while fc > 0: |
| self._filenames_prefixes[prefixes.pop()] = [ap, set()] |
| fc -= 1 |
| |
| |
| def save_file(self, prefix, name, content, apply_gzip=False): |
| """ |
| Add a new file with given content to the archive. |
| |
| @param prefix: prefix of filename that the new file will be saved with |
| @param name: the rest of the filename of the new file; in summary, the |
| resultant filename of the new file will be prefix+name |
| @param content: a content of the file |
| @param apply_gzip: if true, the added file will be gzipped, the suffix |
| .gz will be added to its resultant filename |
| |
| """ |
| if apply_gzip: |
| name += ".gz" |
| path_target = os.path.join(self._path_directory, prefix + name) |
| |
| with self._lock: |
| assert prefix in self._filenames_prefixes |
| assert self._filenames_prefixes[prefix][1] is not None |
| assert name not in self._filenames_prefixes[prefix][1] |
| self._filenames_prefixes[prefix][1].add(name) |
| |
| if apply_gzip: |
| file_target = gzip.GzipFile(path_target, 'wb', 9, None, 0) |
| else: |
| file_target = open(path_target, 'wb') |
| with file_target: |
| file_target.write(content) |
| |
| |
| def copy_file(self, prefix, name, path_file, apply_gzip=False): |
| """ |
| Add a new file to the archive. The file is copied from given location. |
| |
| @param prefix: prefix of filename that the new file will be saved with |
| @param name: the rest of the filename of the new file; in summary, the |
| resultant filename of the new file will be prefix+name |
| @param path_file: path to the source file |
| @param apply_gzip: if true, the added file will be gzipped, the suffix |
| .gz will be added to its resultant filename |
| |
| """ |
| with open(path_file, 'rb') as file_source: |
| content = file_source.read() |
| self.save_file(prefix, name, content, apply_gzip) |
| |
| |
| def move_file(self, prefix, name, path_file, apply_gzip=False): |
| """ |
| Add a new file to the archive. The file is moved, i.e. an original |
| file is deleted. |
| |
| @param prefix: prefix of filename that the new file will be saved with |
| @param name: the rest of the filename of the new file; in summary, the |
| resultant filename of the new file will be prefix+name |
| @param path_file: path to the source file, it will be deleted |
| @param apply_gzip: if true, the added file will be gzipped, the suffix |
| .gz will be added to its resultant filename |
| |
| """ |
| if apply_gzip: |
| self.copy_file(prefix, name, path_file, apply_gzip) |
| os.remove(path_file) |
| else: |
| path_target = os.path.join(self._path_directory, prefix + name) |
| with self._lock: |
| assert prefix in self._filenames_prefixes |
| assert self._filenames_prefixes[prefix][1] is not None |
| assert name not in self._filenames_prefixes[prefix][1] |
| self._filenames_prefixes[prefix][1].add(name) |
| shutil.move(path_file, path_target) |
| |
| |
| def finalize_prefix(self, prefix): |
| """ |
| This method is called to mark that there is no more files to add with |
| given prefix. This method creates a tar archive when the last prefix |
| assigned to the corresponding group is finalized. This method must be |
| called for all prefixes given to the constructor. |
| |
| @param prefix: prefix to finalize, no more files with this prefix can |
| be added to the archive |
| |
| """ |
| with self._lock: |
| assert prefix in self._filenames_prefixes |
| assert self._filenames_prefixes[prefix][1] is not None |
| |
| filenames = [] |
| for name in sorted(self._filenames_prefixes[prefix][1]): |
| filenames.append(prefix + name) |
| self._filenames_prefixes[prefix][1] = None |
| archive_name = self._filenames_prefixes[prefix][0] |
| |
| self._archives_names[archive_name][0] -= 1 |
| self._archives_names[archive_name][1] += filenames |
| if self._archives_names[archive_name][0] == 0: |
| archive_is_complete = True |
| filenames = self._archives_names[archive_name][1] |
| else: |
| archive_is_complete = False |
| |
| if archive_is_complete and len(filenames) > 0: |
| argv = ['tar', 'cJf', 'archive_' + archive_name + '.tar.xz'] |
| argv += filenames |
| process_tar = subprocess.Popen(argv, cwd=self._path_directory) |
| if process_tar.wait() != 0: |
| raise Exception("Process 'tar cJf' failed!") |
| for filename in filenames: |
| os.remove(os.path.join(self._path_directory, filename)) |