blob: 33477787dd4a9e678158cb2302eb107490e27269 [file] [log] [blame]
// Copyright 2021 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MISSIVE_CLIENT_REPORT_QUEUE_H_
#define MISSIVE_CLIENT_REPORT_QUEUE_H_
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include <base/callback.h>
#include <base/values.h>
#include <google/protobuf/message_lite.h>
#include <missive/proto/record.pb.h>
#include <missive/proto/record_constants.pb.h>
#include <missive/util/status.h>
#include <missive/util/statusor.h>
namespace reporting {
// A |ReportQueue| is not meant to be created directly, instead it is
// instantiated by |ReportQueueProvider|. |ReportQueue| allows a user
// to |Enqueue| a message for delivery to a handler specified by the
// |Destination| held by the provided |ReportQueueConfiguration|.
// |ReportQueue| implementation handles scheduling storage and
// delivery.
// Enqueue can also be used with a |base::Value| or |std::string|.
//
// Example Usage:
// void SendMessage(google::protobuf::ImportantMessage important_message,
// reporting::ReportQueue::EnqueueCallback done_cb) {
// // Create configuration.
// auto config_result = reporting::ReportQueueConfiguration::Create(...);
// // Bail out if configuration failed to create.
// if (!config_result.ok()) {
// std::move(done_cb).Run(config_result.status());
// return;
// }
// // Asynchronously instantiate ReportQueue.
// base::ThreadPool::PostTask(
// FROM_HERE,
// base::BindOnce(
// [](google::protobuf::ImportantMessage important_message,
// reporting::ReportQueue::EnqueueCallback done_cb,
// std::unique_ptr<reporting::ReportQueueConfiguration> config) {
// reporting::ReportQueueProvider::CreateQueue(
// std::move(config),
// base::BindOnce(
// [](google::protobuf::ImportantMessage important_message,
// reporting::ReportQueue::EnqueueCallback done_cb,
// reporting::StatusOr<std::unique_ptr<
// reporting::ReportQueue>> report_queue_result) {
// // Bail out if queue failed to create.
// if (!report_queue_result.ok()) {
// std::move(done_cb).Run(report_queue_result.status());
// return;
// }
// // Queue created successfully, enqueue the message.
// report_queue_result.ValueOrDie()->Enqueue(
// std::move(important_message), std::move(done_cb));
// },
// std::move(important_message), std::move(done_cb)));
// },
// std::move(important_message), std::move(done_cb),
// std::move(config_result.ValueOrDie())));
// }
//
// |SpeculativeReportQueueImpl| is an extension to |ReportQueue| which allows
// to speculatively enqueue records before the actual |ReportQueue| is created
// (which may be delayed by inability to initialize |ReportClient|).
// Instantiated by |ReportQueueProvider| and can be used anywhere |ReportQueue|
// fits. Note however, that records enqueued before actual |ReportQueue|
// is ready may be lost, e.g. if the machine reboots, so for the records
// that need to be definiately recorded |ReportQueue| is preferable.
//
// Example Usage:
// void SendMessage(google::protobuf::LessImportantMessage
// less_important_message,
// reporting::ReportQueue::EnqueueCallback done_cb) {
// // Create configuration.
// auto config_result = reporting::ReportQueueConfiguration::Create(...);
// // Bail out if configuration failed to create.
// if (!config_result.ok()) {
// std::move(done_cb).Run(config_result.status());
// return;
// }
// // Synchronously instantiate SpeculativeReportQueueImpl, returning it as
// // ReportQueue still.
// auto report_queue_result =
// reporting::ReportQueueProvider::CreateSpeculativeQueue(
// std::move(config));
// if (!report_queue_result.ok()) {
// std::move(done_cb).Run(config_result.status());
// return;
// }
// // Enqueue event (store it in memory only until the actual queue is
// // created).
// report_queue_result.ValueOrDie()->Enqueue(
// std::move(less_important_message), std::move(done_cb));
// }
class ReportQueue {
public:
// An EnqueueCallback is called on the completion of any |Enqueue| call.
using EnqueueCallback = base::OnceCallback<void(Status)>;
// A FlushCallback is called on the completion of |Flush| call.
using FlushCallback = base::OnceCallback<void(Status)>;
virtual ~ReportQueue();
// Enqueue asynchronously stores and delivers a record. The |callback| will
// be called on any errors. If storage is successful |callback| will be called
// with an OK status.
//
// |priority| will Enqueue the record to the specified Priority queue.
//
// The current destinations have the following data requirements:
// (destination : requirement)
// UPLOAD_EVENTS : UploadEventsRequest
//
// |record| will be sent as a string with no conversion.
void Enqueue(base::StringPiece record,
Priority priority,
EnqueueCallback callback) const;
// |record| will be converted to a JSON string with base::JsonWriter::Write.
void Enqueue(const base::Value& record,
Priority priority,
EnqueueCallback callback) const;
// |record| will be converted to a string with SerializeToString(). The
// handler is responsible for converting the record back to a proto with a
// ParseFromString() call.
void Enqueue(const google::protobuf::MessageLite* record,
Priority priority,
EnqueueCallback callback) const;
// Initiates upload of collected records according to the priority.
// Called usually for a queue with an infinite or very large upload period.
// Multiple |Flush| calls can safely run in parallel.
// Returns error if cannot start upload.
virtual void Flush(Priority priority, FlushCallback callback) = 0;
// Prepares a callback to attach actual queue to the speculative.
// Implemented only in SpeculativeReportQueue, CHECKs in a regular one.
[[nodiscard]] virtual base::OnceCallback<
void(StatusOr<std::unique_ptr<ReportQueue>>)>
PrepareToAttachActualQueue() const = 0;
protected:
virtual void AddRecord(base::StringPiece record,
Priority priority,
EnqueueCallback callback) const = 0;
};
} // namespace reporting
#endif // MISSIVE_CLIENT_REPORT_QUEUE_H_