blob: 8a6de071ac1437d8df6ba5fbe4d974d83b2fa287 [file] [log] [blame] [edit]
// Copyright 2021 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "missive/storage/storage.h"
#include <cstdint>
#include <optional>
#include <utility>
#include <vector>
#include <base/barrier_closure.h>
#include <base/check.h>
#include <base/containers/adapters.h>
#include <base/files/file.h>
#include <base/files/file_enumerator.h>
#include <base/files/file_path.h>
#include <base/files/file_util.h>
#include <base/files/platform_file.h>
#include <base/functional/bind.h>
#include <base/functional/callback.h>
#include <base/logging.h>
#include <base/memory/scoped_refptr.h>
#include <base/sequence_checker.h>
#include <base/strings/strcat.h>
#include <base/strings/string_number_conversions.h>
#include <base/task/bind_post_task.h>
#include <base/task/sequenced_task_runner.h>
#include <base/task/task_runner.h>
#include <base/task/task_traits.h>
#include <base/task/thread_pool.h>
#include <base/threading/thread.h>
#include <base/time/time.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include "missive/analytics/metrics.h"
#include "missive/compression/compression_module.h"
#include "missive/encryption/encryption_module_interface.h"
#include "missive/encryption/primitives.h"
#include "missive/encryption/verification.h"
#include "missive/proto/record.pb.h"
#include "missive/proto/record_constants.pb.h"
#include "missive/resources/resource_manager.h"
#include "missive/storage/storage_base.h"
#include "missive/storage/storage_configuration.h"
#include "missive/storage/storage_queue.h"
#include "missive/storage/storage_uploader_interface.h"
#include "missive/util/file.h"
#include "missive/util/status.h"
#include "missive/util/status_macros.h"
#include "missive/util/statusor.h"
#include "missive/util/task_runner_context.h"
namespace reporting {
// Factory method: constructs a `Storage` object and asynchronously initializes
// it. Initialization sets up encryption (when the encryption module is
// enabled) and then creates one `StorageQueue` per entry returned by
// `options.ProduceQueuesOptionsList()`.
//
// `completion_cb` receives the initialized storage on success, or the first
// non-OK status recorded while creating/adding the queues.
void Storage::Create(
const StorageOptions& options,
UploaderInterface::AsyncStartUploaderCb async_start_upload_cb,
scoped_refptr<QueuesContainer> queues_container,
scoped_refptr<EncryptionModuleInterface> encryption_module,
scoped_refptr<CompressionModule> compression_module,
base::OnceCallback<void(StatusOr<scoped_refptr<StorageInterface>>)>
completion_cb) {
// Context that drives initialization of the Storage object and populates all
// the queues. It runs on the Storage's own sequenced task runner and goes
// through these steps: OnStart -> (EncryptionSetUp) -> InitAllQueues ->
// AddQueue (once per queue) -> Response.
class StorageInitContext
: public TaskRunnerContext<StatusOr<scoped_refptr<StorageInterface>>> {
public:
// `queues_options` lists the (priority, queue options) pairs to create
// queues for; `storage` is the object being initialized; `callback` is handed
// to the base context for delivering the final result.
StorageInitContext(
const StorageOptions::QueuesOptionsList& queues_options,
scoped_refptr<Storage> storage,
base::OnceCallback<void(StatusOr<scoped_refptr<StorageInterface>>)>
callback)
: TaskRunnerContext<StatusOr<scoped_refptr<StorageInterface>>>(
std::move(callback),
storage->sequenced_task_runner_), // Same runner as the Storage!
queues_options_(queues_options),
storage_(std::move(storage)) {}
private:
// Context can only be deleted by calling Response method.
// By that time every queue must have been accounted for (`count_` is 0).
~StorageInitContext() override {
DCHECK_CALLED_ON_VALID_SEQUENCE(storage_->sequence_checker_);
DCHECK_EQ(count_, 0u);
}
// Entry point of the context. Decides whether encryption set-up is needed
// before the queues can be created.
void OnStart() override {
CheckOnValidSequence();
// If encryption is not enabled, proceed with the queues.
if (!storage_->encryption_module_->is_enabled()) {
InitAllQueues();
return;
}
// Encryption is enabled. Locate the latest signed_encryption_key file
// with matching key signature after deserialization.
const auto download_key_result =
storage_->key_in_storage_->DownloadKeyFile();
if (!download_key_result.ok()) {
// Key not found or corrupt. Proceed with encryption setup.
// Key will be downloaded during setup.
EncryptionSetUp(download_key_result.status());
return;
}
// Key found, verified and downloaded. Hand it to the encryption module;
// the result is reported to `EncryptionSetUp`, bound via
// `BindPostTaskToCurrentDefault`. `base::Unretained(this)` relies on the
// context staying alive until `Response` is called.
storage_->encryption_module_->UpdateAsymmetricKey(
download_key_result.ValueOrDie().first,
download_key_result.ValueOrDie().second,
base::BindPostTaskToCurrentDefault(base::BindOnce(
&StorageInitContext::EncryptionSetUp, base::Unretained(this))));
}
// Called with the outcome of obtaining/installing the encryption key.
// A non-OK `status` does not fail initialization: a periodic key request is
// started instead, and queue creation proceeds in both cases.
void EncryptionSetUp(Status status) {
CheckOnValidSequence();
if (status.ok()) {
// Encryption key has been found and set up. Must be available now.
DCHECK(storage_->encryption_module_->has_encryption_key());
} else {
LOG(WARNING)
<< "Encryption is enabled, but the key is not available yet, "
"status="
<< status;
// Start a task in the background which periodically requests the
// encryption key if we need it.
storage_->key_delivery_->StartPeriodicKeyUpdate(
storage_->options_.key_check_period());
}
InitAllQueues();
}
// Starts asynchronous creation of every queue listed in `queues_options_`.
// `count_` tracks how many `AddQueue` callbacks are still outstanding.
void InitAllQueues() {
CheckOnValidSequence();
// Construct all queues.
DCHECK_CALLED_ON_VALID_SEQUENCE(storage_->sequence_checker_);
count_ = queues_options_.size();
if (count_ == 0) {
// Nothing to create: initialization is complete.
// Note: `storage_` is a const member, so `std::move` here yields a copy and
// `storage_` remains valid for the destructor's sequence check.
Response(std::move(storage_));
return;
}
// Create one queue per entry in `queues_options_`; each entry is a pair of
// priority (`first`) and queue options (`second`). Every creation result is
// delivered to `AddQueue`.
for (const auto& queue_options : queues_options_) {
StorageQueue::Create(
GenerationGuid(),
/*options=*/queue_options.second,
// Note: the callbacks below are attached to the Queue and do not
// outlive Storage, so they cannot refer to `storage_` itself!
base::BindRepeating(&QueueUploaderInterface::AsyncProvideUploader,
/*priority=*/queue_options.first,
storage_->async_start_upload_cb_,
storage_->encryption_module_),
// `queues_container_` refers a weak pointer only, so that its
// callback does not hold a reference to it.
base::BindPostTask(
storage_->sequenced_task_runner_,
base::BindRepeating(&QueuesContainer::GetDegradationCandidates,
storage_->queues_container_->GetWeakPtr(),
/*priority=*/queue_options.first)),
storage_->encryption_module_, storage_->compression_module_,
base::BindRepeating(&StorageQueue::MaybeBackoffAndReInit),
base::BindPostTaskToCurrentDefault(base::BindOnce(
&StorageInitContext::AddQueue, base::Unretained(this),
/*priority=*/queue_options.first)));
}
}
// Receives the result of creating the queue for `priority`. On success the
// queue is added to the queues container; on failure the error is logged.
// Only the first non-OK status is retained in `final_status_`. Once the last
// outstanding queue has reported, the context responds with either that
// status or the initialized storage.
void AddQueue(Priority priority,
StatusOr<scoped_refptr<StorageQueue>> storage_queue_result) {
CheckOnValidSequence();
DCHECK_CALLED_ON_VALID_SEQUENCE(storage_->sequence_checker_);
if (storage_queue_result.ok()) {
auto add_status = storage_->queues_container_->AddQueue(
priority, storage_queue_result.ValueOrDie());
if (final_status_.ok()) {
final_status_ = add_status;
}
} else {
LOG(ERROR) << "Could not create queue, priority=" << priority
<< ", status=" << storage_queue_result.status();
if (final_status_.ok()) {
final_status_ = storage_queue_result.status();
}
}
DCHECK_GT(count_, 0u);
// Wait until all the queues have reported before responding.
if (--count_ > 0u) {
return;
}
if (!final_status_.ok()) {
Response(final_status_);
return;
}
// As above, `storage_` is const, so this passes a copy of the reference.
Response(std::move(storage_));
}
// Options of the queues to be created, as (priority, options) pairs.
const StorageOptions::QueuesOptionsList queues_options_;
// Storage object being initialized.
const scoped_refptr<Storage> storage_;
// Number of queues that have not yet reported to `AddQueue`.
size_t count_ GUARDED_BY_CONTEXT(storage_->sequence_checker_) = 0;
// First non-OK status encountered while adding queues (OK if none).
Status final_status_;
};
// Create Storage object.
// Cannot use base::MakeRefCounted<Storage>, because constructor is private.
scoped_refptr<Storage> storage = base::WrapRefCounted(
new Storage(options, queues_container, encryption_module,
compression_module, std::move(async_start_upload_cb)));
// Asynchronously run initialization.
Start<StorageInitContext>(options.ProduceQueuesOptionsList(),
std::move(storage), std::move(completion_cb));
}
// Constructs the Storage object; invoked from `Storage::Create`, which then
// runs the asynchronous initialization.
//
// - `options`: storage configuration; also supplies the signature
//   verification public key and the directory used by `key_in_storage_`.
// - `queues_container`: container of the queues; its sequenced task runner
//   becomes the Storage's task runner.
// - `encryption_module`, `compression_module`: retained for later use.
// - `async_start_upload_cb`: retained, and also passed to `KeyDelivery`.
Storage::Storage(const StorageOptions& options,
scoped_refptr<QueuesContainer> queues_container,
scoped_refptr<EncryptionModuleInterface> encryption_module,
scoped_refptr<CompressionModule> compression_module,
UploaderInterface::AsyncStartUploaderCb async_start_upload_cb)
: options_(options),
sequenced_task_runner_(queues_container->sequenced_task_runner()),
queues_container_(queues_container),
encryption_module_(encryption_module),
key_delivery_(
KeyDelivery::Create(encryption_module, async_start_upload_cb)),
compression_module_(compression_module),
key_in_storage_(std::make_unique<KeyInStorage>(
options.signature_verification_public_key(), options.directory())),
async_start_upload_cb_(async_start_upload_cb) {
// The constructor is not bound to the Storage sequence; detach the checker
// so that it is not tied to the constructing sequence.
// NOTE(review): presumably it re-binds on the first checked call made on
// `sequenced_task_runner_` - confirm against base::SequenceChecker docs.
DETACH_FROM_SEQUENCE(sequence_checker_);
}
// Writes `record` into the queue that serves `priority`.
//
// The queue is looked up asynchronously (see `AsyncGetQueueAndProceed`). If
// encryption is enabled but no encryption key is available yet, the write is
// postponed until `key_delivery_` reports the outcome of a key request:
// on success the record is written, on failure `completion_cb` receives the
// error status. `completion_cb` also receives the lookup error if the queue
// cannot be found.
void Storage::Write(Priority priority,
                    Record record,
                    base::OnceCallback<void(Status)> completion_cb) {
  // Action to run once the queue for `priority` has been located.
  auto write_action = base::BindOnce(
      [](scoped_refptr<Storage> self, Priority priority, Record record,
         scoped_refptr<StorageQueue> queue,
         base::OnceCallback<void(Status)> completion_cb) {
        const bool key_missing =
            self->encryption_module_->is_enabled() &&
            !self->encryption_module_->has_encryption_key();
        if (!key_missing) {
          // Either encryption is off or the key is present: write right away.
          queue->Write(std::move(record), std::move(completion_cb));
          return;
        }
        // Key was not found at startup time. Note that if the key is
        // outdated, we still can't use it, and won't load it now. So this
        // path can only be taken after Storage is initialized and before the
        // first successful delivery of a key. Once the key request completes,
        // the write is resumed (or failed with the request's status).
        KeyDelivery::RequestCallback resume_write = base::BindOnce(
            [](scoped_refptr<StorageQueue> queue, Record record,
               base::OnceCallback<void(Status)> completion_cb,
               Status status) {
              if (status.ok()) {
                queue->Write(std::move(record), std::move(completion_cb));
                return;
              }
              std::move(completion_cb).Run(status);
            },
            queue, std::move(record), std::move(completion_cb));
        self->key_delivery_->Request(std::move(resume_write));
      },
      base::WrapRefCounted(this), priority, std::move(record));

  AsyncGetQueueAndProceed(priority, std::move(write_action),
                          std::move(completion_cb));
}
// Confirms `sequence_information` on the queue that serves its priority.
// `force` is passed through to `StorageQueue::Confirm` unchanged.
// `completion_cb` receives the queue's result, or the lookup error if the
// queue for that priority cannot be found.
void Storage::Confirm(SequenceInformation sequence_information,
                      bool force,
                      base::OnceCallback<void(Status)> completion_cb) {
  // Read the priority up front: `sequence_information` is moved into the
  // bound action below.
  const Priority target_priority = sequence_information.priority();

  auto confirm_action = base::BindOnce(
      [](SequenceInformation seq_info, bool force,
         scoped_refptr<StorageQueue> queue,
         base::OnceCallback<void(Status)> done_cb) {
        queue->Confirm(std::move(seq_info), force, std::move(done_cb));
      },
      std::move(sequence_information), force);

  AsyncGetQueueAndProceed(target_priority, std::move(confirm_action),
                          std::move(completion_cb));
}
// Flushes the queue that serves `priority`. `completion_cb` receives the
// queue's result, or the lookup error if no such queue can be found.
void Storage::Flush(Priority priority,
                    base::OnceCallback<void(Status)> completion_cb) {
  auto flush_action =
      base::BindOnce([](scoped_refptr<StorageQueue> queue,
                        base::OnceCallback<void(Status)> done_cb) {
        queue->Flush(std::move(done_cb));
      });

  AsyncGetQueueAndProceed(priority, std::move(flush_action),
                          std::move(completion_cb));
}
// Processes a newly received signed encryption key:
//   1. Verifies the key signature; on failure the error is reported to
//      `key_delivery_` and nothing else happens.
//   2. Hands the key to the encryption module; the outcome (OK or error) is
//      reported to `key_delivery_`.
//   3. Persists the whole signed key to storage on a thread-pool task that is
//      allowed to block. Persistence failures are only logged.
void Storage::UpdateEncryptionKey(SignedEncryptionInfo signed_encryption_key) {
  // Verify received key signature. Bail out if failed.
  const auto signature_verification_status =
      key_in_storage_->VerifySignature(signed_encryption_key);
  if (!signature_verification_status.ok()) {
    LOG(WARNING) << "Key failed verification, status="
                 << signature_verification_status;
    key_delivery_->OnCompletion(signature_verification_status);
    return;
  }

  // Assign the received key to encryption module. Whatever the outcome, it is
  // forwarded to `key_delivery_` (an OK status signals a successful update).
  encryption_module_->UpdateAsymmetricKey(
      signed_encryption_key.public_asymmetric_key(),
      signed_encryption_key.public_key_id(),
      base::BindOnce(
          [](scoped_refptr<Storage> storage, Status status) {
            LOG_IF(WARNING, !status.ok())
                << "Encryption key update failed, status=" << status;
            storage->key_delivery_->OnCompletion(status);
          },
          base::WrapRefCounted(this)));

  // Serialize whole signed_encryption_key to a new file, discard the old
  // one(s). Do it on a thread which may block doing file operations.
  base::ThreadPool::PostTask(
      FROM_HERE, {base::TaskPriority::BEST_EFFORT, base::MayBlock()},
      base::BindOnce(
          [](SignedEncryptionInfo signed_encryption_key,
             scoped_refptr<Storage> storage) {
            const Status status =
                storage->key_in_storage_->UploadKeyFile(signed_encryption_key);
            // Include the status so that the failure can be diagnosed.
            LOG_IF(ERROR, !status.ok())
                << "Failed to upload the new encryption key, status="
                << status;
          },
          std::move(signed_encryption_key), base::WrapRefCounted(this)));
}
// Posts a task to the Storage task runner that looks up the queue serving
// `priority` in `queues_container_`.
//   - If the queue is found, `queue_action` is run with the queue and
//     `completion_cb`.
//   - Otherwise `completion_cb` is run with the lookup error status and
//     `queue_action` is dropped without being run.
void Storage::AsyncGetQueueAndProceed(
    Priority priority,
    base::OnceCallback<void(scoped_refptr<StorageQueue>,
                            base::OnceCallback<void(Status)>)> queue_action,
    base::OnceCallback<void(Status)> completion_cb) {
  using CompletionCb = base::OnceCallback<void(Status)>;
  using QueueAction =
      base::OnceCallback<void(scoped_refptr<StorageQueue>, CompletionCb)>;

  auto lookup_and_run = base::BindOnce(
      [](scoped_refptr<QueuesContainer> queues_container, Priority priority,
         QueueAction queue_action, CompletionCb completion_cb) {
        // Attempt to get queue by priority on the Storage task runner.
        auto queue_result =
            queues_container->GetQueue(priority, GenerationGuid());
        if (queue_result.ok()) {
          // Queue found, execute the action (it should relocate on
          // queue thread soon, to not block Storage task runner).
          std::move(queue_action)
              .Run(queue_result.ValueOrDie(), std::move(completion_cb));
          return;
        }
        // Queue not found, abort.
        std::move(completion_cb).Run(queue_result.status());
      },
      queues_container_, priority, std::move(queue_action),
      std::move(completion_cb));

  sequenced_task_runner_->PostTask(FROM_HERE, std::move(lookup_and_run));
}
// Registers `callback` with the queues container. `callback` must be non-null.
//
// Registration happens asynchronously on the Storage task runner. The posted
// task binds `queues_container_` itself, i.e. it holds an added reference to
// the container until the registration has run. Per the original author's
// note, QueuesContainer owns a reference to each StorageQueue, so the
// registered callback is guaranteed to be called when the Storage is being
// destructed.
void Storage::RegisterCompletionCallback(base::OnceClosure callback) {
  DCHECK(callback);
  auto register_task =
      base::BindOnce(&QueuesContainer::RegisterCompletionCallback,
                     queues_container_, std::move(callback));
  sequenced_task_runner_->PostTask(FROM_HERE, std::move(register_task));
}
} // namespace reporting