| // Copyright 2021 The ChromiumOS Authors |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #ifndef MISSIVE_CLIENT_REPORT_QUEUE_IMPL_H_ |
| #define MISSIVE_CLIENT_REPORT_QUEUE_IMPL_H_ |
| |
| #include <memory> |
| #include <queue> |
| #include <string> |
| #include <utility> |
| |
| #include <base/callback.h> |
| #include <base/memory/ref_counted.h> |
| #include <base/memory/scoped_refptr.h> |
| #include <base/memory/weak_ptr.h> |
| #include <base/sequence_checker.h> |
| #include <base/task/sequenced_task_runner.h> |
| #include <base/values.h> |
| #include <google/protobuf/message_lite.h> |
| |
| #include "missive/client/report_queue.h" |
| #include "missive/client/report_queue_configuration.h" |
| #include "missive/proto/record.pb.h" |
| #include "missive/proto/record_constants.pb.h" |
| #include "missive/storage/storage_module_interface.h" |
| #include "missive/util/status.h" |
| #include "missive/util/statusor.h" |
| |
| namespace reporting { |
| |
| // A |ReportQueueImpl| is configured with a |ReportQueueConfiguration|. A |
| // |ReportQueueImpl| allows a user to |Enqueue| a message for delivery to a |
| // handler specified by the |Destination| held by the provided |
| // |ReportQueueConfiguration|. |ReportQueueImpl| handles scheduling storage and |
| // delivery. |
| // |
| // ReportQueues are not meant to be created directly, instead use the |
| // reporting::ReportQueueProvider::CreateQueue(...) function. See the |
| // comments for reporting::ReportingClient for example usage. |
| // |
| // Enqueue can also be used with a |base::Value| or |std::string|. |
| class ReportQueueImpl : public ReportQueue { |
| public: |
| // Factory |
| static void Create( |
| std::unique_ptr<ReportQueueConfiguration> config, |
| scoped_refptr<StorageModuleInterface> storage, |
| base::OnceCallback<void(StatusOr<std::unique_ptr<ReportQueue>>)> cb); |
| |
| ReportQueueImpl(const ReportQueueImpl& other) = delete; |
| ReportQueueImpl& operator=(const ReportQueueImpl& other) = delete; |
| ~ReportQueueImpl() override; |
| |
| void Flush(Priority priority, FlushCallback callback) override; |
| |
| // Dummy implementation for a regular queue. |
| [[nodiscard]] base::OnceCallback<void(StatusOr<std::unique_ptr<ReportQueue>>)> |
| PrepareToAttachActualQueue() const override; |
| |
| protected: |
| ReportQueueImpl(std::unique_ptr<ReportQueueConfiguration> config, |
| scoped_refptr<StorageModuleInterface> storage); |
| |
| private: |
| void AddProducedRecord(RecordProducer record_producer, |
| Priority priority, |
| EnqueueCallback callback) const override; |
| |
| const std::unique_ptr<ReportQueueConfiguration> config_; |
| const scoped_refptr<StorageModuleInterface> storage_; |
| }; |
| |
| class SpeculativeReportQueueImpl : public ReportQueue { |
| public: |
| // Factory method returns a smart pointer with on-thread deleter. |
| static std::unique_ptr<SpeculativeReportQueueImpl, base::OnTaskRunnerDeleter> |
| Create(); |
| |
| SpeculativeReportQueueImpl(const SpeculativeReportQueueImpl& other) = delete; |
| SpeculativeReportQueueImpl& operator=( |
| const SpeculativeReportQueueImpl& other) = delete; |
| ~SpeculativeReportQueueImpl() override; |
| |
| // Forwards |Flush| to |ReportQueue|, if already created. |
| // Returns with failure otherwise. |
| void Flush(Priority priority, FlushCallback callback) override; |
| |
| // Provides a callback to attach initialized actual queue to the speculative |
| // queue. |
| [[nodiscard]] base::OnceCallback<void(StatusOr<std::unique_ptr<ReportQueue>>)> |
| PrepareToAttachActualQueue() const override; |
| |
| // Substitutes actual queue to the speculative, when ready. |
| // Initiates processesing of all pending records. |
| void AttachActualQueue(std::unique_ptr<ReportQueue> actual_queue); |
| |
| private: |
| // Moveable, non-copyable struct holding a pending record producer for the |
| // |pending_record_producers_| queue below. |
| struct PendingRecordProducer { |
| PendingRecordProducer(RecordProducer producer, Priority priority); |
| PendingRecordProducer(PendingRecordProducer&& other); |
| PendingRecordProducer& operator=(PendingRecordProducer&& other); |
| ~PendingRecordProducer(); |
| |
| RecordProducer record_producer; |
| Priority record_priority; |
| }; |
| |
| // Private constructor, used by the factory method only. |
| explicit SpeculativeReportQueueImpl( |
| scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner); |
| |
| // Forwards |AddProducedRecord| to |ReportQueue|, if already created. |
| // Records the record internally otherwise. |
| void AddProducedRecord(RecordProducer record_producer, |
| Priority priority, |
| EnqueueCallback callback) const override; |
| |
| // Enqueues head of the |pending_record_producers_| and reapplies for the rest |
| // of it. |
| void EnqueuePendingRecordProducers(EnqueueCallback callback) const; |
| |
| // Optionally enqueues |record_producer| (owned) to actual queue, if ready. |
| // Otherwise adds it to the end of |pending_record_producers_|. |
| void MaybeEnqueueRecordProducer(Priority priority, |
| EnqueueCallback callback, |
| RecordProducer record_producer) const; |
| |
| // Task runner that protects |report_queue_| and |pending_record_producers_| |
| // and allows to synchronize the initialization. |
| const scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner_; |
| SEQUENCE_CHECKER(sequence_checker_); |
| |
| // Actual |ReportQueue|, once created. |
| std::unique_ptr<ReportQueue> report_queue_ |
| GUARDED_BY_CONTEXT(sequence_checker_); |
| |
| // Queue of the pending record producers, collected before actual queue has |
| // been created. Declared 'mutable', because it is accessed by 'const' |
| // methods. |
| mutable std::queue<PendingRecordProducer> pending_record_producers_ |
| GUARDED_BY_CONTEXT(sequence_checker_); |
| |
| // Weak pointer factory. |
| base::WeakPtrFactory<SpeculativeReportQueueImpl> weak_ptr_factory_{this}; |
| }; |
| |
| } // namespace reporting |
| |
| #endif // MISSIVE_CLIENT_REPORT_QUEUE_IMPL_H_ |