blob: 92e32f2c7263410e4e55ef4490b98a32353b1ff7 [file] [log] [blame]
// Copyright 2021 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MISSIVE_CLIENT_REPORT_QUEUE_PROVIDER_H_
#define MISSIVE_CLIENT_REPORT_QUEUE_PROVIDER_H_
#include <memory>
#include <utility>
#include <base/callback.h>
#include <base/feature_list.h>
#include <base/memory/ref_counted.h>
#include <base/memory/scoped_refptr.h>
#include <base/task/sequenced_task_runner.h>
#include "missive/client/report_queue.h"
#include "missive/client/report_queue_configuration.h"
#include "missive/proto/record_constants.pb.h"
#include "missive/storage/storage_module_interface.h"
#include "missive/util/shared_queue.h"
#include "missive/util/status.h"
#include "missive/util/statusor.h"
namespace reporting {
// ReportQueueProvider acts a single point for instantiating
// |reporting::ReportQueue|s. By performing initialization atomically it ensures
// that all ReportQueues are created with the same global settings.
//
// In order to utilize the ReportQueueProvider the EncryptedReportingPipeline
// feature must be turned on using --enable-features=EncryptedReportingPipeline.
//
// ReportQueueProvider is a singleton which can be accessed through
// |ReportQueueProvider::GetInstance|. This static method must be implemented
// for a specific configuration - Chrome (for ChromeOS and for other OSes),
// other ChromeOS executables.
//
// Example Usage:
// void SendMessage(google::protobuf::ImportantMessage important_message,
// reporting::ReportQueue::EnqueueCallback done_cb) {
// // Create configuration.
// auto config_result = reporting::ReportQueueConfiguration::Create(...);
// // Bail out if configuration failed to create.
// if (!config_result.ok()) {
// std::move(done_cb).Run(config_result.status());
// return;
// }
// // Asynchronously create ReportingQueue.
// base::ThreadPool::PostTask(
// FROM_HERE,
// base::BindOnce(
// [](google::protobuf::ImportantMessage important_message,
// reporting::ReportQueue::EnqueueCallback done_cb,
// std::unique_ptr<reporting::ReportQueueConfiguration> config) {
// // Asynchronously create ReportingQueue.
// reporting::ReportQueueProvider::CreateQueue(
// std::move(config),
// base::BindOnce(
// [](base::StringPiece data,
// reporting::ReportQueue::EnqueueCallback
// done_cb, reporting::StatusOr<std::unique_ptr<
// reporting::ReportQueue>>
// report_queue_result) {
// // Bail out if queue failed to create.
// if (!report_queue_result.ok()) {
// std::move(done_cb).Run(report_queue_result.status());
// return;
// }
// // Queue created successfully, enqueue the message.
// report_queue_result.ValueOrDie()->Enqueue(
// important_message, std::move(done_cb));
// },
// important_message, std::move(done_cb)));
// },
// important_message, std::move(done_cb),
// std::move(config_result.ValueOrDie())))
// }
class ReportQueueProvider {
public:
using CreateReportQueueResponse = StatusOr<std::unique_ptr<ReportQueue>>;
// The response will come back utilizing the ReportQueueProvider's thread. It
// is likely that within Chromium you will want to the response to come back
// on your own thread. The simplest way to achieve that is to pass a
// base::BindPostTask rather than a base::OnceCallback. Another way to achieve
// the same result is to utilize base::OnceCallback, and capture the response
// and forward it to your own thread. We maintain base::OnceCallback here for
// use in ChromiumOS.
using CreateReportQueueCallback =
base::OnceCallback<void(CreateReportQueueResponse)>;
using InitCompleteCallback = base::OnceCallback<void(Status)>;
using OnStorageModuleCreatedCallback =
base::OnceCallback<void(StatusOr<scoped_refptr<StorageModuleInterface>>)>;
using StorageModuleCreateCallback =
base::RepeatingCallback<void(OnStorageModuleCreatedCallback)>;
// Callback triggered with updated report queue config after it has been
// configured with appropriate DM tokens after it is retrieved. Typically,
// this is when we go ahead and create the report queue.
using ReportQueueConfiguredCallback = base::OnceCallback<void(
StatusOr<std::unique_ptr<ReportQueueConfiguration>>)>;
explicit ReportQueueProvider(StorageModuleCreateCallback storage_create_cb);
ReportQueueProvider(const ReportQueueProvider& other) = delete;
ReportQueueProvider& operator=(const ReportQueueProvider& other) = delete;
virtual ~ReportQueueProvider();
// Asynchronously creates a queue based on the configuration. In the process
// singleton ReportQueueProvider is potentially created and retrieved
// internally, and then started to initialize (if initialization fails,
// |CreateQueue| will return with error, but next attempt may succeed).
// Returns with the callback handing ownership to the caller (unless there is
// an error, and then it gets the error status).
static void CreateQueue(std::unique_ptr<ReportQueueConfiguration> config,
CreateReportQueueCallback queue_cb);
// Synchronously creates a speculative queue based on the configuration.
// Returns result as a smart pointer to ReportQueue with the on-thread
// deleter.
static StatusOr<std::unique_ptr<ReportQueue, base::OnTaskRunnerDeleter>>
CreateSpeculativeQueue(std::unique_ptr<ReportQueueConfiguration> config);
// Instantiates ReportQueueProvider singleton based on the overall process
// state and will refer to StorageModuleInterface and optional Uploader
// accordingly.
static ReportQueueProvider* GetInstance();
static bool IsEncryptedReportingPipelineEnabled();
static const base::Feature kEncryptedReportingPipeline;
protected:
// Accessors.
scoped_refptr<StorageModuleInterface> storage();
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner();
class InitializationStateTracker
: public base::RefCountedThreadSafe<InitializationStateTracker> {
public:
using ReleaseLeaderCallback = base::OnceCallback<void(bool)>;
using LeaderPromotionRequestCallback =
base::OnceCallback<void(StatusOr<ReleaseLeaderCallback>)>;
using GetInitStateCallback = base::OnceCallback<void(bool)>;
static scoped_refptr<InitializationStateTracker> Create();
// Will call |get_init_state_cb| with |is_initialized_| value.
void GetInitState(GetInitStateCallback get_init_state_cb);
// Will promote one initializer to leader at a time. Will deny
// initialization requests if the provider is already initialized. If
// there are no errors will return a ReleaseLeaderCallback for releasing the
// initializing leadership.
//
// Error code responses:
// RESOURCE_EXHAUSTED - Returned when a promotion is requested when there is
// already a leader.
// FAILED_PRECONDITION - Returned when a promotion is requested when
// provider is already initialized.
void RequestLeaderPromotion(
LeaderPromotionRequestCallback promo_request_cb);
private:
friend class base::RefCountedThreadSafe<InitializationStateTracker>;
InitializationStateTracker();
virtual ~InitializationStateTracker();
void OnIsInitializedRequest(GetInitStateCallback get_init_state_cb);
void OnLeaderPromotionRequest(
LeaderPromotionRequestCallback promo_request_cb);
void ReleaseLeader(bool initialization_successful);
void OnLeaderRelease(bool initialization_successful);
bool has_promoted_initializing_context_{false};
bool is_initialized_{false};
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner_;
SEQUENCE_CHECKER(sequence_checker_);
};
protected:
// Storage module creator (can be substituted for testing purposes).
StorageModuleCreateCallback storage_create_cb_;
private:
// Holds the creation request for a ReportQueue.
class CreateReportQueueRequest {
public:
CreateReportQueueRequest(std::unique_ptr<ReportQueueConfiguration> config,
CreateReportQueueCallback create_cb);
~CreateReportQueueRequest();
CreateReportQueueRequest(CreateReportQueueRequest&& other);
std::unique_ptr<ReportQueueConfiguration> config();
CreateReportQueueCallback create_cb();
private:
std::unique_ptr<ReportQueueConfiguration> config_;
CreateReportQueueCallback create_cb_;
};
class InitializingContext;
// Finalizes provider, if the initialization process succeeded.
// May to be overridden by subclass to make more updates to the provider.
virtual void OnInitCompleted();
// Creates and initializes queue implementation. Returns status in case of
// error.
virtual void CreateNewQueue(std::unique_ptr<ReportQueueConfiguration> config,
CreateReportQueueCallback cb);
virtual StatusOr<std::unique_ptr<ReportQueue, base::OnTaskRunnerDeleter>>
CreateNewSpeculativeQueue();
// Configures a given report queue config with appropriate DM tokens after its
// retrieval so it can be used for downstream processing while building a
// report queue, and triggers the corresponding completion callback with the
// updated config.
virtual void ConfigureReportQueue(
std::unique_ptr<ReportQueueConfiguration> report_queue_config,
ReportQueueConfiguredCallback completion_cb) = 0;
void OnPushComplete();
void OnInitState(bool provider_configured);
void OnInitializationComplete(Status init_status);
void ClearRequestQueue(base::queue<CreateReportQueueRequest> failed_requests);
void BuildRequestQueue(StatusOr<CreateReportQueueRequest> pop_result);
// Queue for storing creation requests while the provider is
// initializing.
scoped_refptr<SharedQueue<CreateReportQueueRequest>> create_request_queue_;
scoped_refptr<InitializationStateTracker> init_state_tracker_;
// Storage module associated with the provider. It serves all queues created
// by it. Protected by sequenced_task_runner_.
scoped_refptr<StorageModuleInterface> storage_;
// Task runner used for guarding the provider elements.
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner_;
SEQUENCE_CHECKER(sequence_checker_);
};
} // namespace reporting
#endif // MISSIVE_CLIENT_REPORT_QUEUE_PROVIDER_H_