blob: e9abb530c5067b74b5494affe36033b5cfc173bf [file] [log] [blame]
// Copyright 2021 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <list>
#include <map>
#include <memory>
#include <string>
#include <base/callback.h>
#include <base/containers/flat_set.h>
#include <base/files/file.h>
#include <base/files/file_enumerator.h>
#include <base/files/file_path.h>
#include <base/memory/ref_counted.h>
#include <base/memory/ref_counted_delete_on_sequence.h>
#include <base/memory/scoped_refptr.h>
#include <base/optional.h>
#include <base/sequenced_task_runner.h>
#include <base/strings/string_piece.h>
#include <base/threading/thread.h>
#include <base/threading/thread_task_runner_handle.h>
#include <base/timer/timer.h>
#include "missive/compression/compression_module.h"
#include "missive/encryption/encryption_module_interface.h"
#include "missive/proto/record.pb.h"
#include "missive/storage/storage_configuration.h"
#include "missive/storage/storage_uploader_interface.h"
#include "missive/util/status.h"
#include "missive/util/statusor.h"
namespace reporting {
// Storage queue represents single queue of data to be collected and stored
// persistently. It allows to add whole data records as necessary,
// flush previously collected records and confirm records up to certain
// sequencing id to be eliminated.
class StorageQueue : public base::RefCountedDeleteOnSequence<StorageQueue> {
// Callback type for UploadInterface provider for this queue.
using AsyncStartUploaderCb = base::RepeatingCallback<void(
// Creates StorageQueue instance with the specified options, and returns it
// with the |completion_cb| callback. |async_start_upload_cb| is a factory
// callback that instantiates UploaderInterface every time the queue starts
// uploading records - periodically or immediately after Write (and in the
// near future - upon explicit Flush request).
static void Create(
const QueueOptions& options,
AsyncStartUploaderCb async_start_upload_cb,
scoped_refptr<EncryptionModuleInterface> encryption_module,
scoped_refptr<CompressionModule> compression_module,
// Wraps and serializes Record (taking ownership of it), encrypts and writes
// the resulting blob into the StorageQueue (the last file of it) with the
// next sequencing id assigned. The write is a non-blocking operation -
// caller can "fire and forget" it (|completion_cb| allows to verify that
// record has been successfully enqueued). If file is going to become too
// large, it is closed and new file is created.
// Helper methods: AssignLastFile, WriteHeaderAndBlock, OpenNewWriteableFile,
// WriteMetadata, DeleteOutdatedMetadata.
void Write(Record record, base::OnceCallback<void(Status)> completion_cb);
// Confirms acceptance of the records up to |sequencing_id| (inclusively).
// All records with sequencing ids <= this one can be removed from
// the StorageQueue, and can no longer be uploaded.
// If |force| is false (which is used in most cases), |sequencing_id| is
// only accepted if no higher ids were confirmed before; otherwise it is
// accepted unconditionally.
// Helper methods: RemoveConfirmedData.
void Confirm(base::Optional<int64_t> sequencing_id,
bool force,
base::OnceCallback<void(Status)> completion_cb);
// Initiates upload of collected records. Called periodically by timer, based
// on upload_period of the queue, and can also be called explicitly - for
// a queue with an infinite or very large upload period. Multiple |Flush|
// calls can safely run in parallel.
// Starts by calling |async_start_upload_cb_| that instantiates
// |UploaderInterface uploader|. Then repeatedly reads EncryptedRecord(s) one
// by one from the StorageQueue starting from |first_sequencing_id_|, handing
// each one over to |uploader|->ProcessRecord (keeping ownership of the
// buffer) and resuming after result callback returns 'true'. Only files that
// have been closed are included in reading; |Upload| makes sure to close the
// last writeable file and create a new one before starting to send records to
// the |uploader|. If some records are not available or corrupt,
// |uploader|->ProcessGap is called. If the monotonic order of sequencing is
// broken, INTERNAL error Status is reported. |Upload| can be stopped after
// any record by returning 'false' to |processed_cb| callback - in that case
// |Upload| will behave as if the end of data has been reached. While one or
// more |Upload|s are active, files can be added to the StorageQueue but
// cannot be deleted. If processing of the record takes significant time,
// |uploader| implementation should be offset to another thread to avoid
// locking StorageQueue. Helper methods: SwitchLastFileIfNotEmpty,
// CollectFilesForUpload.
void Flush();
// Test only: makes specified records fail on reading.
void TestInjectBlockReadErrors(std::initializer_list<int64_t> sequencing_ids);
// Access queue options.
const QueueOptions& options() const { return options_; }
StorageQueue(const StorageQueue& other) = delete;
StorageQueue& operator=(const StorageQueue& other) = delete;
virtual ~StorageQueue();
friend class base::RefCountedDeleteOnSequence<StorageQueue>;
friend class base::DeleteHelper<StorageQueue>;
// Private data structures for Read and Write (need access to the private
// StorageQueue fields).
class WriteContext;
class ReadContext;
class ConfirmContext;
// Private envelope class for single file in a StorageQueue.
class SingleFile : public base::RefCountedThreadSafe<SingleFile> {
// Factory method creates a SingleFile object for existing
// or new file (of zero size). In case of any error (e.g. insufficient disk
// space) returns status.
static StatusOr<scoped_refptr<SingleFile>> Create(
const base::FilePath& filename, int64_t size);
Status Open(bool read_only); // No-op if already opened.
void Close(); // No-op if not opened.
Status Delete();
// Attempts to read |size| bytes from position |pos| and returns
// reference to the data that were actually read (no more than |size|).
// End of file is indicated by empty data.
// |max_buffer_size| specifies the largest allowed buffer, which
// must accommodate the largest possible data block plus header and
// overhead.
// |expect_readonly| must match to is_readonly() (when set to false,
// the file is expected to be writeable; this only happens when scanning
// files restarting the queue).
StatusOr<base::StringPiece> Read(uint32_t pos,
uint32_t size,
size_t max_buffer_size,
bool expect_readonly = true);
// Appends data to the file.
StatusOr<uint32_t> Append(base::StringPiece data);
bool is_opened() const { return handle_.get() != nullptr; }
bool is_readonly() const {
return is_readonly_.value();
uint64_t size() const { return size_; }
std::string name() const { return filename_.MaybeAsASCII(); }
virtual ~SingleFile();
friend class base::RefCountedThreadSafe<SingleFile>;
// Private constructor, called by factory method only.
SingleFile(const base::FilePath& filename, int64_t size);
// Flag (valid for opened file only): true if file was opened for reading
// only, false otherwise.
base::Optional<bool> is_readonly_;
const base::FilePath filename_; // relative to the StorageQueue directory
uint64_t size_ = 0; // tracked internally rather than by filesystem
std::unique_ptr<base::File> handle_; // Set only when opened/created.
// When reading the file, this is the buffer and data positions.
// If the data is read sequentially, buffered portions are reused
// improving performance. When the sequential order is broken (e.g.
// we start reading the same file in parallel from different position),
// the buffer is reset.
size_t data_start_ = 0;
size_t data_end_ = 0;
uint64_t file_position_ = 0;
size_t buffer_size_ = 0;
std::unique_ptr<char[]> buffer_;
// Private constructor, to be called by Create factory method only.
StorageQueue(scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner,
const QueueOptions& options,
AsyncStartUploaderCb async_start_upload_cb,
scoped_refptr<EncryptionModuleInterface> encryption_module,
scoped_refptr<CompressionModule> compression_module);
// Initializes the object by enumerating files in the assigned directory
// and determines the sequencing information of the last record.
// Must be called once and only once after construction.
// Returns OK or error status, if anything failed to initialize.
// Called once, during initialization.
// Helper methods: EnumerateDataFiles, ScanLastFile, RestoreMetadata.
Status Init();
// Retrieves last record digest (does not exist at a generation start).
base::Optional<std::string> GetLastRecordDigest() const;
// Helper method for Init(): process single data file.
// Return sequencing_id from <prefix>.<sequencing_id> file name, or Status
// in case there is any error.
StatusOr<int64_t> AddDataFile(
const base::FilePath& full_name,
const base::FileEnumerator::FileInfo& file_info);
// Helper method for Init(): enumerates all data files in the directory.
// Valid file names are <prefix>.<sequencing_id>, any other names are ignored.
// Adds used data files to the set.
Status EnumerateDataFiles(base::flat_set<base::FilePath>* used_files_set);
// Helper method for Init(): scans the last file in StorageQueue, if there are
// files at all, and learns the latest sequencing id. Otherwise (if there
// are no files) sets it to 0.
Status ScanLastFile();
// Helper method for Write(): increments sequencing id and assigns last
// file to place record in. |size| parameter indicates the size of data that
// comprise the record expected to be appended; if appending the record will
// make the file too large, the current last file will be closed, and a new
// file will be created and assigned to be the last one.
StatusOr<scoped_refptr<SingleFile>> AssignLastFile(size_t size);
// Helper method for Write() and Read(): creates and opens a new empty
// writeable file, adding it to |files_|.
StatusOr<scoped_refptr<SingleFile>> OpenNewWriteableFile();
// Helper method for Write(): stores a file with metadata to match the
// incoming new record. Synchronously composes metadata to record, then
// asynchronously writes it into a file with next sequencing id and then
// notifies the Write operation that it can now complete. After that it
// asynchronously deletes all other files with lower sequencing id
// (multiple Writes can see the same files and attempt to delete them, and
// that is not an error).
Status WriteMetadata(base::StringPiece current_record_digest);
// Helper method for RestoreMetadata(): loads and verifies metadata file
// contents. If accepted, adds the file to the set.
Status ReadMetadata(const base::FilePath& meta_file_path,
size_t size,
int64_t sequencing_id,
base::flat_set<base::FilePath>* used_files_set);
// Helper method for Init(): locates file with metadata that matches the
// last sequencing id and loads metadata from it.
// Adds used metadata file to the set.
Status RestoreMetadata(base::flat_set<base::FilePath>* used_files_set);
// Delete all files except those listed in the set.
void DeleteUnusedFiles(
const base::flat_set<base::FilePath>& used_files_setused_files_set);
// Helper method for Write(): deletes meta files up to, but not including
// |sequencing_id_to_keep|. Any errors are ignored.
void DeleteOutdatedMetadata(int64_t sequencing_id_to_keep);
// Helper method for Write(): composes record header and writes it to the
// file, followed by data. Stores record digest in the queue, increments
// next sequencing id.
Status WriteHeaderAndBlock(base::StringPiece data,
base::StringPiece current_record_digest,
scoped_refptr<SingleFile> file);
// Helper method for Upload: if the last file is not empty (has at least one
// record), close it and create the new one, so that its records are also
// included in the reading.
Status SwitchLastFileIfNotEmpty();
// Helper method for Upload: collects and sets aside |files| in the
// StorageQueue that have data for the Upload (all files that have records
// with sequencing ids equal or higher than |sequencing_id|).
std::map<int64_t, scoped_refptr<SingleFile>> CollectFilesForUpload(
int64_t sequencing_id) const;
// Helper method for Confirm: Moves |first_sequencing_id_| to
// (|sequencing_id|+1) and removes files that only have records with seq
// ids below or equal to |sequencing_id| (below |first_sequencing_id_|).
Status RemoveConfirmedData(int64_t sequencing_id);
// Helper method to release all file instances held by the queue.
// Files on the disk remain as they were.
void ReleaseAllFileInstances();
// Helper method to retry upload if prior one failed or if some events below
// |next_sequencing_id| were not uploaded.
void CheckBackUpload(Status status, int64_t next_sequencing_id);
// Helper method called by periodic time to upload data.
void PeriodicUpload();
// Sequential task runner for all activities in this StorageQueue
// (must be first member in class).
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner_;
// Immutable options, stored at the time of creation.
const QueueOptions options_;
// Current generation id, unique per device and queue.
// Set up once during initialization by reading from the 'gen_id.NNNN' file
// matching the last sequencing id, or generated anew as a random number if no
// such file found (files do not match the id).
int64_t generation_id_ = 0;
// Digest of the last written record (loaded at queue initialization, absent
// if the new generation has just started, and no records where stored yet).
base::Optional<std::string> last_record_digest_;
// Queue of the write context instances in the order of creation, sequencing
// ids and record digests. Context is always removed from this queue before
// being destructed. We use std::list rather than std::queue, because
// if the write fails, it needs to be removed from the queue regardless of
// whether it is at the head, tail or middle.
std::list<WriteContext*> write_contexts_queue_;
// Next sequencing id to store (not assigned yet).
int64_t next_sequencing_id_ = 0;
// First sequencing id store still has (no records with lower
// sequencing id exist in store).
int64_t first_sequencing_id_ = 0;
// First unconfirmed sequencing id (no records with lower
// sequencing id will be ever uploaded). Set by the first
// Confirm call.
// If first_unconfirmed_sequencing_id_ < first_sequencing_id_,
// [first_unconfirmed_sequencing_id_, first_sequencing_id_) is a gap
// that cannot be filled in and is uploaded as such.
base::Optional<int64_t> first_unconfirmed_sequencing_id_;
// Latest metafile. May be null.
scoped_refptr<SingleFile> meta_file_;
// Ordered map of the files by ascending sequencing id.
std::map<int64_t, scoped_refptr<SingleFile>> files_;
// Counter of the Read operations. When not 0, none of the files_ can be
// deleted. Incremented by |ReadContext::OnStart|, decremented by
// |ReadContext::OnComplete|. Accessed by |RemoveConfirmedData|.
// All accesses take place on sequenced_task_runner_.
int32_t active_read_operations_ = 0;
// Upload timer (active only if options_.upload_period() is not 0).
base::RepeatingTimer upload_timer_;
// Upload provider callback.
const AsyncStartUploaderCb async_start_upload_cb_;
// Encryption module.
scoped_refptr<EncryptionModuleInterface> encryption_module_;
// Compression module.
scoped_refptr<CompressionModule> compression_module_;
// Test only: records specified to fail on reading.
base::flat_set<int64_t> test_injected_fail_sequencing_ids_;
// Weak pointer factory (must be last member in class).
base::WeakPtrFactory<StorageQueue> weakptr_factory_{this};
} // namespace reporting