blob: 2db7bc4eaf436ff90f8054dceb9bab28069a2c7e [file] [log] [blame] [edit]
// Copyright 2023 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "missive/storage/storage.h"
#include <string_view>
#include <utility>
#include <base/barrier_closure.h>
#include <base/check.h>
#include <base/containers/adapters.h>
#include <base/files/file.h>
#include <base/files/file_enumerator.h>
#include <base/files/file_path.h>
#include <base/files/file_util.h>
#include <base/files/platform_file.h>
#include <base/functional/bind.h>
#include <base/functional/callback.h>
#include <base/functional/callback_forward.h>
#include <base/functional/callback_helpers.h>
#include <base/logging.h>
#include <base/notreached.h>
#include <base/strings/strcat.h>
#include <base/strings/string_number_conversions.h>
#include <base/task/bind_post_task.h>
#include <base/task/sequenced_task_runner.h>
#include <base/task/task_runner.h>
#include <base/task/task_traits.h>
#include <base/task/thread_pool.h>
#include <base/threading/thread.h>
#include <base/memory/scoped_refptr.h>
#include <base/sequence_checker.h>
#include <base/time/time.h>
#include <base/types/expected.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include "missive/analytics/metrics.h"
#include "missive/encryption/encryption_module_interface.h"
#include "missive/health/health_module.h"
#include "missive/proto/health.pb.h"
#include "missive/proto/record.pb.h"
#include "missive/proto/record_constants.pb.h"
#include "missive/storage/key_delivery.h"
#include "missive/storage/storage_base.h"
#include "missive/storage/storage_configuration.h"
#include "missive/storage/storage_queue.h"
#include "missive/storage/storage_util.h"
#include "missive/util/status.h"
#include "missive/util/statusor.h"
#include "missive/util/task_runner_context.h"
// Temporary replacement for `Priority_Name` that does
// not work in certain CQ.
// TODO(b/294756107): Remove this function once fixed.
#include "missive/proto/priority_name.h"
namespace reporting {
constexpr std::string_view kUmaDeleteEmptyMultigenerationQueueDirectories =
"Platform.Missive.DeleteEmptyMultigenerationQueueDirectories";
// Context for creating a single queue. Upon success, calls the callback with
// the GenerationGuid passed into the context, otherwise error status.
class CreateQueueContext : public TaskRunnerContext<Status> {
public:
CreateQueueContext(
Priority priority,
QueueOptions queue_options,
scoped_refptr<Storage> storage,
GenerationGuid generation_guid,
base::OnceCallback<void(scoped_refptr<reporting::StorageQueue>,
base::OnceCallback<void(reporting::Status)>)>
queue_created_cb,
base::OnceCallback<void(Status)> completion_cb)
: TaskRunnerContext<Status>(
std::move(completion_cb),
storage->sequenced_task_runner_), // Same runner as the Storage!
queue_options_(queue_options),
storage_(storage),
generation_guid_(generation_guid),
priority_(priority),
queue_created_cb_(std::move(queue_created_cb)) {}
CreateQueueContext(const CreateQueueContext&) = delete;
CreateQueueContext& operator=(const CreateQueueContext&) = delete;
private:
void OnStart() override {
CheckOnValidSequence();
DCHECK_CALLED_ON_VALID_SEQUENCE(storage_->sequence_checker_);
// Set the extension of the queue directory name
queue_options_.set_subdirectory_extension(generation_guid_);
// Construct the queue
InitQueue(priority_, queue_options_);
}
void InitQueue(Priority priority, QueueOptions queue_options) {
CheckOnValidSequence();
// Instantiate queue.
storage_queue_ = StorageQueue::Create({
.generation_guid = generation_guid_,
.options = queue_options,
// Note: the callback below belongs to the Queue and does not
// outlive Storage, so it cannot refer to `storage_` itself!
.async_start_upload_cb = base::BindRepeating(
&QueueUploaderInterface::AsyncProvideUploader, priority,
storage_->health_module_, storage_->async_start_upload_cb_,
storage_->encryption_module_),
// `queues_container_` refers a weak pointer only, so that its
// callback does not hold a reference to it.
.degradation_candidates_cb = base::BindPostTask(
storage_->sequenced_task_runner_,
base::BindRepeating(&QueuesContainer::GetDegradationCandidates,
storage_->queues_container_->GetWeakPtr(),
priority)),
.disable_queue_cb = base::BindPostTask(
storage_->sequenced_task_runner_,
base::BindRepeating(&QueuesContainer::DisableQueue,
storage_->queues_container_->GetWeakPtr(),
priority)),
.disconnect_queue_cb = base::BindPostTask(
storage_->sequenced_task_runner_,
base::BindRepeating(&QueuesContainer::DisconnectQueue,
storage_->queues_container_->GetWeakPtr(),
priority)),
.encryption_module = storage_->encryption_module_,
.compression_module = storage_->compression_module_,
.uma_id = Priority_Name_Substitute(priority),
});
// Add queue to the container.
const auto added_status =
storage_->queues_container_->AddQueue(priority, storage_queue_);
if (added_status.ok()) {
// The queue has been added. Once it is initialized, we will resume at
// `Initialized` and invoke the `queue_created_cb_` (if successful).
// Asynchronously run initialization.
storage_queue_->Init(
/*init_retry_cb=*/base::BindRepeating(
&StorageQueue::MaybeBackoffAndReInit),
/*initialized_cb=*/base::BindPostTaskToCurrentDefault(base::BindOnce(
&CreateQueueContext::Initialized, base::Unretained(this),
/*priority=*/priority)));
return;
}
// The queue failed to add. It could happen because the same priority and
// guid were being added in parallel (could only happen when new
// multi-generation queues are needed for `Write` operation).
// We will check whether this is the case, and return that queue instead.
const auto query_result =
storage_->queues_container_->GetQueue(priority, generation_guid_);
if (!query_result.has_value()) {
// No pre-recorded queue either.
Response(added_status);
return;
}
// Substitute and use prior queue from now on.
storage_queue_ = query_result.value();
// Schedule `Initialized` to be invoked when initialization is done (or
// immediately, if the queue is already initialized).
storage_queue_->OnInit(base::BindPostTaskToCurrentDefault(base::BindOnce(
&CreateQueueContext::Initialized, base::Unretained(this), priority)));
}
void Initialized(Priority priority, Status initialization_result) {
CheckOnValidSequence();
DCHECK_CALLED_ON_VALID_SEQUENCE(storage_->sequence_checker_);
if (!initialization_result.ok()) {
LOG(ERROR) << "Could not initialize queue for generation_guid="
<< generation_guid_ << " priority=" << priority
<< ", error=" << initialization_result;
Response(initialization_result);
return;
}
// Report success.
std::move(queue_created_cb_)
.Run(storage_queue_,
base::BindPostTaskToCurrentDefault(base::BindOnce(
&CreateQueueContext::Response, base::Unretained(this))));
}
QueueOptions queue_options_;
scoped_refptr<StorageQueue> storage_queue_;
const scoped_refptr<Storage> storage_;
const GenerationGuid generation_guid_;
const Priority priority_;
base::OnceCallback<void(scoped_refptr<reporting::StorageQueue>,
base::OnceCallback<void(reporting::Status)>)>
queue_created_cb_;
};
void Storage::Create(
const Storage::Settings& settings,
base::OnceCallback<void(StatusOr<scoped_refptr<Storage>>)> completion_cb) {
// Initializes Storage object and populates all the queues by reading the
// storage directory and parsing queue directory names. Deletes directories
// that do not following the queue directory name format.
class StorageInitContext
: public TaskRunnerContext<StatusOr<scoped_refptr<Storage>>> {
public:
StorageInitContext(
scoped_refptr<Storage> storage,
base::OnceCallback<void(StatusOr<scoped_refptr<Storage>>)> callback)
: TaskRunnerContext<StatusOr<scoped_refptr<Storage>>>(
std::move(callback),
storage->sequenced_task_runner_), // Same runner as the Storage!
storage_(std::move(storage)) {}
private:
// Context can only be deleted by calling Response method.
~StorageInitContext() override {
DCHECK_CALLED_ON_VALID_SEQUENCE(storage_->sequence_checker_);
CHECK_EQ(count_, 0u);
}
void OnStart() override {
CheckOnValidSequence();
const bool executed_without_error =
StorageDirectory::DeleteEmptyMultigenerationQueueDirectories(
storage_->options_.directory());
const auto res = analytics::Metrics::SendBoolToUMA(
/*name=*/kUmaDeleteEmptyMultigenerationQueueDirectories.data(),
executed_without_error);
LOG_IF(ERROR, !res) << "SendEnumToUMA failure, "
<< kUmaDeleteEmptyMultigenerationQueueDirectories
<< " " << executed_without_error;
// Get the information we need to create queues
DCHECK_CALLED_ON_VALID_SEQUENCE(storage_->sequence_checker_);
queue_parameters_ = StorageDirectory::FindQueueDirectories(
storage_->options_.directory(),
storage_->options_.ProduceQueuesOptionsList());
// If encryption is not enabled, proceed with the queues.
if (!storage_->encryption_module_->is_enabled()) {
InitAllQueues();
return;
}
// Encryption is enabled. Locate the latest signed_encryption_key
// file with matching key signature after deserialization.
const auto download_key_result =
storage_->key_in_storage_->DownloadKeyFile();
if (!download_key_result.has_value()) {
// Key not found or corrupt. Proceed with encryption setup.
// Key will be downloaded during setup.
EncryptionSetUp(download_key_result.error());
return;
}
// Key found, verified and downloaded.
storage_->encryption_module_->UpdateAsymmetricKey(
download_key_result.value().first, download_key_result.value().second,
base::BindPostTaskToCurrentDefault(base::BindOnce(
&StorageInitContext::EncryptionSetUp, base::Unretained(this))));
}
void EncryptionSetUp(Status status) {
CheckOnValidSequence();
if (status.ok()) {
// Encryption key has been found and set up. Must be available now.
CHECK(storage_->encryption_module_->has_encryption_key());
// Enable periodic updates of the key.
storage_->key_delivery_->StartPeriodicKeyUpdate();
} else {
LOG(WARNING)
<< "Encryption is enabled, but the key is not available yet, "
"status="
<< status;
}
InitAllQueues();
}
void InitAllQueues() {
CheckOnValidSequence();
DCHECK_CALLED_ON_VALID_SEQUENCE(storage_->sequence_checker_);
count_ = queue_parameters_.size();
if (count_ == 0) {
Response(std::move(storage_));
return;
}
// Create queues the queue directories we found in the storage directory
for (const auto& [priority, generation_guid] : queue_parameters_) {
Start<CreateQueueContext>(
// Don't transfer ownership of `storage_` via std::move() since
// we need to return `storage_` in the response
priority, storage_->options_.ProduceQueueOptions(priority),
storage_, generation_guid,
base::BindOnce(&StorageInitContext::QueueCreated,
base::Unretained(this)),
base::BindPostTaskToCurrentDefault(
base::BindOnce(&StorageInitContext::RespondIfAllQueuesCreated,
base::Unretained(this))));
}
}
void QueueCreated(scoped_refptr<StorageQueue> created_queue,
base::OnceCallback<void(Status)> completion_cb) {
CheckOnValidSequence();
std::move(completion_cb).Run(Status::StatusOK());
}
void RespondIfAllQueuesCreated(Status status) {
CheckOnValidSequence();
DCHECK_CALLED_ON_VALID_SEQUENCE(storage_->sequence_checker_);
if (!status.ok()) {
LOG(ERROR) << "Failed to create queue during Storage creation, error="
<< status;
final_status_ = status;
}
CHECK_GT(count_, 0u);
if (--count_ > 0u) {
return;
}
if (!final_status_.ok()) {
Response(base::unexpected(final_status_));
return;
}
Response(std::move(storage_));
}
StorageOptions::QueuesOptionsList queues_options_
GUARDED_BY_CONTEXT(storage_->sequence_checker_);
const scoped_refptr<Storage> storage_;
size_t count_ GUARDED_BY_CONTEXT(storage_->sequence_checker_) = 0;
Status final_status_ GUARDED_BY_CONTEXT(storage_->sequence_checker_) =
Status::StatusOK();
// Stores necessary fields for creating queues. Populated by parsing queue
// directory names.
StorageDirectory::Set queue_parameters_
GUARDED_BY_CONTEXT(storage_->sequence_checker_);
};
// Create Storage object.
// Cannot use base::MakeRefCounted<Storage>, because constructor is
// private.
auto storage = base::WrapRefCounted(new Storage(settings));
// Asynchronously run initialization.
Start<StorageInitContext>(std::move(storage), std::move(completion_cb));
}
Storage::Storage(const Storage::Settings& settings)
: options_(settings.options),
sequenced_task_runner_(
settings.queues_container->sequenced_task_runner()),
server_configuration_controller_(
settings.server_configuration_controller),
health_module_(settings.health_module),
encryption_module_(settings.encryption_module),
key_delivery_(KeyDelivery::Create(options_.key_check_period(),
settings.encryption_module,
settings.async_start_upload_cb)),
compression_module_(settings.compression_module),
key_in_storage_(std::make_unique<KeyInStorage>(
settings.options.signature_verification_public_key(),
settings.signature_verification_dev_flag,
settings.options.directory())),
async_start_upload_cb_(settings.async_start_upload_cb),
queues_container_(settings.queues_container) {
DETACH_FROM_SEQUENCE(sequence_checker_);
}
Storage::~Storage() = default;
void Storage::Write(Priority priority,
Record record,
base::OnceCallback<void(Status)> completion_cb) {
// Ensure everything is executed on Storage's sequenced task runner
sequenced_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(
[](scoped_refptr<Storage> self, Priority priority, Record record,
base::OnceCallback<void(Status)> completion_cb) {
// Check if the destination is blocked by the current configuration
// file provided by the server, this file has already been
// fetched and validated on the browser code.
if (self->server_configuration_controller_->IsDestinationBlocked(
record.destination())) {
// If the health module is enabled then we generate a record and
// move `recorder` into local variable, so that it gets
// destructed.
if (auto blocked_recorder = self->health_module_->NewRecorder()) {
auto* const blocked_record =
blocked_recorder->mutable_blocked_record_call();
blocked_record->set_priority(priority);
blocked_record->set_destination(record.destination());
// Move `blocked_recorder` into local variable, so that it
// destructs. After that it is no longer necessary anyway,
// but being destructed here, it will be included in
// health history and attached to write response request
// and thus immediately visible on Chrome.
const auto finished_recording = std::move(blocked_recorder);
}
// Since we are blocking this record we are not adding it to the
// storage, we are just returning.
std::move(completion_cb)
.Run(Status(error::CANCELLED,
"Record blocked by destination."));
return;
}
// Provide health module recorded, if debugging is enabled.
if (auto recorder = self->health_module_->NewRecorder()) {
auto* const enqueue_record =
recorder->mutable_enqueue_record_call();
enqueue_record->set_priority(priority);
enqueue_record->set_destination(record.destination());
completion_cb = base::BindOnce(
[](HealthModule::Recorder recorder,
base::OnceCallback<void(Status)> completion_cb,
Status status) {
if (recorder) {
if (!status.ok()) {
status.SaveTo(recorder->mutable_enqueue_record_call()
->mutable_status());
}
// Move `recorder` into local variable, so that it
// destructs. After that it is no longer necessary anyway,
// but being destructed here, it will be included in
// health history and attached to write response request
// and thus immediately visible on Chrome.
const auto finished_recording = std::move(recorder);
}
std::move(completion_cb).Run(status);
},
std::move(recorder), std::move(completion_cb));
}
const DMtoken& dm_token = record.dm_token();
// Provide health module recorded, if debugging is enabled.
auto recorder = self->health_module_->NewRecorder();
if (recorder) {
auto* const queue_action_record =
recorder->mutable_storage_queue_action();
queue_action_record->set_priority(priority);
queue_action_record
->mutable_storage_enqueue(); // Expected enqueue action.
}
// Callback that writes to the queue.
auto write_queue_action =
base::BindOnce(&Storage::WriteToQueue, self, std::move(record),
std::move(recorder));
GenerationGuid generation_guid;
if (self->options_.is_multi_generational(priority)) {
// Get or create the generation guid associated with the dm token
// and priority in this record.
StatusOr<GenerationGuid> generation_guid_result =
self->queues_container_->GetOrCreateGenerationGuid(dm_token,
priority);
if (!generation_guid_result.has_value()) {
// This should never happen. We should always be able to create
// a generation guid if one doesn't exist.
NOTREACHED_NORETURN();
}
generation_guid = generation_guid_result.value();
}
// Find the queue for this generation guid + priority and write to
// it.
auto queue_result = self->TryGetQueue(priority, generation_guid);
if (queue_result.has_value()) {
// The queue we need already exists, so we can write to it.
std::move(write_queue_action)
.Run(std::move(queue_result.value()),
std::move(completion_cb));
return;
}
// We don't have a queue for this priority + generation guid, so
// create one, and then let the context execute the write
// via `write_queue_action`. Note that we can end up in a race
// with another `Write` of the same `priority` and
// `generation_guid`, and in that case only one queue will survive
// and be used.
Start<CreateQueueContext>(
priority, self->options_.ProduceQueueOptions(priority), self,
generation_guid, std::move(write_queue_action),
std::move(completion_cb));
},
base::WrapRefCounted(this), priority, std::move(record),
std::move(completion_cb)));
}
void Storage::WriteToQueue(Record record,
HealthModule::Recorder recorder,
scoped_refptr<StorageQueue> queue,
base::OnceCallback<void(Status)> completion_cb) {
if (encryption_module_->is_enabled() &&
!encryption_module_->has_encryption_key()) {
// Key was not found at startup time. Note that if the key
// is outdated, we still can use it, and won't load it now.
// So this processing can only happen after Storage is
// initialized (until the first successful delivery of a
// key). After that we will resume the write into the queue.
KeyDelivery::RequestCallback action = base::BindOnce(
[](scoped_refptr<StorageQueue> queue, Record record,
HealthModule::Recorder recorder,
base::OnceCallback<void(Status)> completion_cb, Status status) {
if (!status.ok()) {
if (recorder) {
status.SaveTo(
recorder->mutable_storage_queue_action()->mutable_status());
// Move `recorder` into local variable, so that it destructs.
// After that it is no longer necessary anyway, but being
// destructed here, it will be included in health history and
// attached to write response request and thus immediately
// visible on Chrome.
const auto finished_recording = std::move(recorder);
}
std::move(completion_cb).Run(status);
return;
}
queue->Write(std::move(record), std::move(recorder),
std::move(completion_cb));
},
queue, std::move(record), std::move(recorder),
std::move(completion_cb));
key_delivery_->Request(std::move(action));
return;
}
// Otherwise we can write into the queue right away.
queue->Write(std::move(record), std::move(recorder),
std::move(completion_cb));
}
void Storage::Confirm(SequenceInformation sequence_information,
bool force,
base::OnceCallback<void(Status)> completion_cb) {
// Subtle bug: sequence_information is moved instead of copied, so we need
// to extract fields from it, or else those fields will be empty when
// sequence_information is consumed by std::move
const GenerationGuid generation_guid = sequence_information.generation_guid();
const Priority priority = sequence_information.priority();
if (auto recorder = health_module_->NewRecorder()) {
auto* const record = recorder->mutable_confirm_record_upload_call();
record->set_priority(sequence_information.priority());
record->set_sequencing_id(sequence_information.sequencing_id());
record->set_force_confirm(force);
completion_cb = base::BindOnce(
[](HealthModule::Recorder recorder,
base::OnceCallback<void(Status)> completion_cb, Status status) {
if (recorder) {
if (!status.ok()) {
status.SaveTo(recorder->mutable_confirm_record_upload_call()
->mutable_status());
}
// Move `recorder` into local variable, so that it destructs.
// After that it is no longer necessary anyway, but being
// destructed here, it will be included in health history and
// attached to write response request and thus immediately visible
// on Chrome.
const auto finished_recording = std::move(recorder);
}
std::move(completion_cb).Run(status);
},
std::move(recorder), std::move(completion_cb));
}
auto recorder = health_module_->NewRecorder();
if (recorder) {
auto* const queue_action_record = recorder->mutable_storage_queue_action();
queue_action_record->set_priority(sequence_information.priority());
queue_action_record->mutable_storage_dequeue(); // expected dequeue action
}
// Prepare an async confirmation action to be directed to the queue.
auto queue_confirm_action = base::BindOnce(
[](SequenceInformation sequence_information, bool force,
HealthModule::Recorder recorder, scoped_refptr<StorageQueue> queue,
base::OnceCallback<void(Status)> completion_cb) {
queue->Confirm(std::move(sequence_information), force,
std::move(recorder), std::move(completion_cb));
},
std::move(sequence_information), force, std::move(recorder));
// Locate or create a queue, pass it to the action callback.
sequenced_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(
[](scoped_refptr<Storage> self, Priority priority,
StatusOr<GenerationGuid> generation_guid,
base::OnceCallback<void(scoped_refptr<StorageQueue>,
base::OnceCallback<void(Status)>)>
queue_action,
base::OnceCallback<void(Status)> completion_cb) {
auto queue_result = self->TryGetQueue(priority, generation_guid);
if (!queue_result.has_value()) {
std::move(completion_cb).Run(queue_result.error());
return;
}
// Queue found, execute the action (it should relocate on
// queue thread soon, to not block Storage task runner).
std::move(queue_action)
.Run(queue_result.value(), std::move(completion_cb));
},
base::WrapRefCounted(this), priority, std::move(generation_guid),
std::move(queue_confirm_action), std::move(completion_cb)));
}
class FlushContext : public TaskRunnerContext<Status> {
public:
FlushContext(scoped_refptr<Storage> storage,
Priority priority,
base::OnceCallback<void(Status)> callback)
: TaskRunnerContext<Status>(
std::move(callback),
storage->sequenced_task_runner_), // Same runner as the Storage!
storage_(storage),
priority_(priority) {}
FlushContext(const FlushContext&) = delete;
FlushContext& operator=(const FlushContext&) = delete;
private:
// Context can only be deleted by calling Response method.
~FlushContext() override {
DCHECK_CALLED_ON_VALID_SEQUENCE(storage_->sequence_checker_);
CHECK_EQ(count_, 0u);
}
void OnStart() override {
DCHECK_CALLED_ON_VALID_SEQUENCE(storage_->sequence_checker_);
// Flush each queue
count_ = storage_->queues_container_->RunActionOnAllQueues(
priority_,
base::BindRepeating(
[](FlushContext* context, scoped_refptr<StorageQueue> queue) {
queue->Flush(base::BindPostTaskToCurrentDefault(base::BindOnce(
&FlushContext::RespondIfAllQueuesAreFlush,
base::Unretained(context), queue->generation_guid())));
},
base::Unretained(this)));
}
void RespondIfAllQueuesAreFlush(GenerationGuid generation_guid,
Status status) {
CheckOnValidSequence();
DCHECK_CALLED_ON_VALID_SEQUENCE(storage_->sequence_checker_);
if (!status.ok()) {
if (final_status_.ok()) {
final_status_ = status;
}
LOG(ERROR) << "Failed to flush queue with priority = " << priority_
<< " generation_guid=" << generation_guid
<< ", error=" << status.error_message();
}
CHECK_GT(count_, 0u);
if (--count_ > 0u) {
return;
}
Response(final_status_);
}
Status final_status_ GUARDED_BY_CONTEXT(storage_->sequence_checker_) =
Status::StatusOK();
const scoped_refptr<Storage> storage_;
size_t count_ GUARDED_BY_CONTEXT(storage_->sequence_checker_) = 0;
const Priority priority_;
};
void Storage::Flush(Priority priority,
base::OnceCallback<void(Status)> completion_cb) {
if (auto recorder = health_module_->NewRecorder()) {
recorder->mutable_flush_priority_call()->set_priority(priority);
completion_cb = base::BindOnce(
[](HealthModule::Recorder recorder,
base::OnceCallback<void(Status)> completion_cb, Status status) {
if (recorder) {
if (!status.ok()) {
status.SaveTo(
recorder->mutable_flush_priority_call()->mutable_status());
}
// Move `recorder` into local variable, so that it destructs.
// After that it is no longer necessary anyway, but being
// destructed here, it will be included in health history and
// attached to write response request and thus immediately visible
// on Chrome.
const auto finished_recording = std::move(recorder);
}
std::move(completion_cb).Run(status);
},
std::move(recorder), std::move(completion_cb));
}
// If key is not available, there is nothing to flush, but we need to
// request the key instead.
if (encryption_module_->is_enabled() &&
!encryption_module_->has_encryption_key()) {
key_delivery_->Request(std::move(completion_cb));
return;
}
Start<FlushContext>(base::WrapRefCounted(this), priority,
std::move(completion_cb));
}
void Storage::UpdateEncryptionKey(SignedEncryptionInfo signed_encryption_key) {
// Verify received key signature. Bail out if failed.
const auto signature_verification_status =
key_in_storage_->VerifySignature(signed_encryption_key);
if (!signature_verification_status.ok()) {
LOG(WARNING) << "Key failed verification, status="
<< signature_verification_status;
key_delivery_->OnKeyUpdateResult(signature_verification_status);
return;
}
// Assign the received key to encryption module.
encryption_module_->UpdateAsymmetricKey(
signed_encryption_key.public_asymmetric_key(),
signed_encryption_key.public_key_id(),
base::BindOnce(
[](scoped_refptr<Storage> storage, Status status) {
if (!status.ok()) {
LOG(WARNING) << "Encryption key update failed, status=" << status;
storage->key_delivery_->OnKeyUpdateResult(status);
return;
}
// Encryption key updated successfully.
storage->key_delivery_->OnKeyUpdateResult(Status::StatusOK());
},
base::WrapRefCounted(this)));
// Serialize whole signed_encryption_key to a new file, discard the old
// one(s). Do it on a thread which may block doing file operations.
base::ThreadPool::PostTask(
FROM_HERE, {base::TaskPriority::BEST_EFFORT, base::MayBlock()},
base::BindOnce(
[](SignedEncryptionInfo signed_encryption_key,
scoped_refptr<Storage> storage) {
const Status status =
storage->key_in_storage_->UploadKeyFile(signed_encryption_key);
LOG_IF(ERROR, !status.ok())
<< "Failed to upload the new encription key.";
},
std::move(signed_encryption_key), base::WrapRefCounted(this)));
}
StatusOr<scoped_refptr<StorageQueue>> Storage::TryGetQueue(
Priority priority, StatusOr<GenerationGuid> generation_guid) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!generation_guid.has_value()) {
return base::unexpected(std::move(generation_guid).error());
}
// Attempt to get queue by priority and generation_guid on
// the Storage task runner.
auto queue_result =
queues_container_->GetQueue(priority, generation_guid.value());
if (!queue_result.has_value()) {
// Queue not found, abort.
return base::unexpected(std::move(queue_result).error());
}
// Queue found, return it.
return std::move(queue_result).value();
}
void Storage::RegisterCompletionCallback(base::OnceClosure callback) {
// Although this is an asynchronous action, note that Storage cannot be
// destructed until the callback is registered - StorageQueue is held by
// added reference here. Thus, the callback being registered is guaranteed
// to be called when the Storage is being destructed.
CHECK(callback);
sequenced_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&QueuesContainer::RegisterCompletionCallback,
queues_container_, std::move(callback)));
}
} // namespace reporting