blob: 234e3eae9ad81ec133c8dc019359936b00036b0e [file] [log] [blame]
// Copyright 2021 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "missive/client/report_queue_impl.h"
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include <base/functional/bind.h>
#include <base/functional/callback.h>
#include <base/json/json_writer.h>
#include <base/memory/ptr_util.h>
#include <base/memory/scoped_refptr.h>
#include <base/notreached.h>
#include <base/sequence_checker.h>
#include <base/strings/strcat.h>
#include <base/strings/string_number_conversions.h>
#include <base/task/bind_post_task.h>
#include <base/task/task_traits.h>
#include <base/task/thread_pool.h>
#include <base/time/time.h>
#include "missive/analytics/metrics.h"
#include "missive/client/report_queue_configuration.h"
#include "missive/proto/record.pb.h"
#include "missive/proto/record_constants.pb.h"
#include "missive/storage/storage_module_interface.h"
#include "missive/util/status.h"
#include "missive/util/statusor.h"
namespace reporting {
namespace {
// UTC time of 2122-01-01T00:00:00Z since Unix epoch 1970-01-01T00:00:00Z in
// microseconds.
constexpr int64_t kTime2122 = 4'796'668'800'000'000;
// Calls |record_producer|, checks the result and in case of success, forwards
// it to the storage. In production code should be invoked asynchronously, on a
// thread pool (no synchronization expected).
void AddRecordToStorage(scoped_refptr<StorageModuleInterface> storage,
Priority priority,
std::string dm_token,
Destination destination,
int64_t reserved_space,
ReportQueue::RecordProducer record_producer,
StorageModuleInterface::EnqueueCallback callback) {
// Generate record data.
auto record_result = std::move(record_producer).Run();
if (!record_result.ok()) {
std::move(callback).Run(record_result.status());
return;
}
// Augment data.
Record record;
*record.mutable_data() = std::move(record_result.ValueOrDie());
record.set_destination(destination);
if (reserved_space > 0L) {
record.set_reserved_space(reserved_space);
}
// |record| with no DM token is assumed to be associated with device DM token
if (!dm_token.empty()) {
*record.mutable_dm_token() = std::move(dm_token);
}
// Calculate timestamp in microseconds - to match Spanner expectations.
const int64_t time_since_epoch_us =
base::Time::Now().ToJavaTime() * base::Time::kMicrosecondsPerMillisecond;
if (time_since_epoch_us > kTime2122) {
// Unusual timestamp. Reject the record even though the record is good
// otherwise, because we can't obtain a reasonable timestamp. We have this
// code block here because server very occasionally detects very large
// timestamps. The reason could come from occasional irregular system time.
// Filtering out irregular timestamps here should address the problem
// without leaving timestamp-related bugs in the ERP undiscovered (should
// there be any).
analytics::Metrics::SendBoolToUMA(
"Platform.Missive.UnusualEnqueueTimestamp", true);
std::move(callback).Run(Status(
error::FAILED_PRECONDITION,
base::StrCat(
{"Abnormal system timestamp obtained. Microseconds since epoch: ",
base::NumberToString(time_since_epoch_us)})));
return;
}
record.set_timestamp_us(time_since_epoch_us);
if (!record_result.ok()) {
std::move(callback).Run(record_result.status());
return;
}
// Add resulting Record to the storage.
storage->AddRecord(priority, std::move(record), std::move(callback));
}
} // namespace
void ReportQueueImpl::Create(
std::unique_ptr<ReportQueueConfiguration> config,
scoped_refptr<StorageModuleInterface> storage,
base::OnceCallback<void(StatusOr<std::unique_ptr<ReportQueue>>)> cb) {
std::move(cb).Run(base::WrapUnique<ReportQueueImpl>(
new ReportQueueImpl(std::move(config), storage)));
}
ReportQueueImpl::ReportQueueImpl(
std::unique_ptr<ReportQueueConfiguration> config,
scoped_refptr<StorageModuleInterface> storage)
: config_(std::move(config)), storage_(storage) {}
ReportQueueImpl::~ReportQueueImpl() = default;
void ReportQueueImpl::AddProducedRecord(RecordProducer record_producer,
Priority priority,
EnqueueCallback callback) const {
const Status status = config_->CheckPolicy();
if (!status.ok()) {
std::move(callback).Run(status);
return;
}
if (priority == Priority::UNDEFINED_PRIORITY) {
std::move(callback).Run(
Status(error::INVALID_ARGUMENT, "Priority must be defined"));
return;
}
// Execute |record_producer| on arbitrary thread, analyze the result and send
// it to the Storage, returning with the callback.
base::ThreadPool::PostTask(
FROM_HERE, {base::TaskPriority::BEST_EFFORT},
base::BindOnce(&AddRecordToStorage, storage_, priority,
config_->dm_token(), config_->destination(),
config_->reserved_space(), std::move(record_producer),
std::move(callback)));
}
void ReportQueueImpl::Flush(Priority priority, FlushCallback callback) {
storage_->Flush(priority, std::move(callback));
}
base::OnceCallback<void(StatusOr<std::unique_ptr<ReportQueue>>)>
ReportQueueImpl::PrepareToAttachActualQueue() const {
NOTREACHED();
return base::BindOnce(
[](StatusOr<std::unique_ptr<ReportQueue>>) { NOTREACHED(); });
}
// Implementation of SpeculativeReportQueueImpl::PendingRecordProducer
SpeculativeReportQueueImpl::PendingRecordProducer::PendingRecordProducer(
RecordProducer producer, EnqueueCallback callback, Priority priority)
: record_producer(std::move(producer)),
record_callback(std::move(callback)),
record_priority(priority) {}
SpeculativeReportQueueImpl::PendingRecordProducer::PendingRecordProducer(
PendingRecordProducer&& other)
: record_producer(std::move(other.record_producer)),
record_callback(std::move(other.record_callback)),
record_priority(other.record_priority) {}
SpeculativeReportQueueImpl::PendingRecordProducer::~PendingRecordProducer() =
default;
SpeculativeReportQueueImpl::PendingRecordProducer&
SpeculativeReportQueueImpl::PendingRecordProducer::operator=(
PendingRecordProducer&& other) {
record_producer = std::move(other.record_producer);
record_callback = std::move(other.record_callback);
record_priority = other.record_priority;
return *this;
}
// static
std::unique_ptr<SpeculativeReportQueueImpl, base::OnTaskRunnerDeleter>
SpeculativeReportQueueImpl::Create() {
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner =
base::ThreadPool::CreateSequencedTaskRunner(
{base::TaskPriority::BEST_EFFORT, base::MayBlock()});
return std::unique_ptr<SpeculativeReportQueueImpl, base::OnTaskRunnerDeleter>(
new SpeculativeReportQueueImpl(sequenced_task_runner),
base::OnTaskRunnerDeleter(sequenced_task_runner));
}
SpeculativeReportQueueImpl::SpeculativeReportQueueImpl(
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner)
: sequenced_task_runner_(sequenced_task_runner) {
DETACH_FROM_SEQUENCE(sequence_checker_);
}
SpeculativeReportQueueImpl::~SpeculativeReportQueueImpl() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
PurgePendingProducers(
Status(error::DATA_LOSS, "The queue is being destructed"));
}
void SpeculativeReportQueueImpl::Flush(Priority priority,
FlushCallback callback) {
sequenced_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(
[](Priority priority, FlushCallback callback,
base::WeakPtr<SpeculativeReportQueueImpl> self) {
if (!self) {
std::move(callback).Run(
Status(error::UNAVAILABLE, "Queue has been destructed"));
return;
}
DCHECK_CALLED_ON_VALID_SEQUENCE(self->sequence_checker_);
if (!self->actual_report_queue_.has_value()) {
std::move(callback).Run(Status(error::FAILED_PRECONDITION,
"ReportQueue is not ready yet."));
return;
}
const std::unique_ptr<ReportQueue>& report_queue =
self->actual_report_queue_.value();
report_queue->Flush(priority, std::move(callback));
},
priority, std::move(callback), weak_ptr_factory_.GetWeakPtr()));
}
void SpeculativeReportQueueImpl::AddProducedRecord(
RecordProducer record_producer,
Priority priority,
EnqueueCallback callback) const {
// Invoke producer on a thread pool, then enqueue record on sequenced task
// runner.
sequenced_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&SpeculativeReportQueueImpl::MaybeEnqueueRecordProducer,
weak_ptr_factory_.GetWeakPtr(), priority,
std::move(callback), std::move(record_producer)));
}
void SpeculativeReportQueueImpl::MaybeEnqueueRecordProducer(
Priority priority,
EnqueueCallback callback,
RecordProducer record_producer) const {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!actual_report_queue_.has_value()) {
// Queue is not ready yet, store the record in the memory queue.
pending_record_producers_.emplace(std::move(record_producer),
std::move(callback), priority);
return;
}
// Queue is ready. If memory queue is empty, just forward the record.
if (pending_record_producers_.empty()) {
const std::unique_ptr<ReportQueue>& report_queue =
actual_report_queue_.value();
report_queue->AddProducedRecord(std::move(record_producer), priority,
std::move(callback));
return;
}
// If memory queue is not empty, attach the new record at the
// end and initiate enqueuing of everything from there.
pending_record_producers_.emplace(std::move(record_producer),
std::move(callback), priority);
EnqueuePendingRecordProducers();
}
void SpeculativeReportQueueImpl::EnqueuePendingRecordProducers() const {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
CHECK(actual_report_queue_.has_value());
if (pending_record_producers_.empty()) {
return;
}
const std::unique_ptr<ReportQueue>& report_queue =
actual_report_queue_.value();
auto head = std::move(pending_record_producers_.front());
pending_record_producers_.pop();
if (pending_record_producers_.empty()) {
// Last of the pending records.
report_queue->AddProducedRecord(std::move(head.record_producer),
head.record_priority,
std::move(head.record_callback));
return;
}
report_queue->AddProducedRecord(
std::move(head.record_producer), head.record_priority,
base::BindPostTask(
sequenced_task_runner_,
base::BindOnce(
[](base::WeakPtr<const SpeculativeReportQueueImpl> self,
EnqueueCallback callback, Status status) {
if (!status.ok()) {
std::move(callback).Run(status);
return;
}
if (!self) {
std::move(callback).Run(
Status(error::UNAVAILABLE, "Queue has been destructed"));
return;
}
std::move(callback).Run(status);
self->EnqueuePendingRecordProducers();
},
weak_ptr_factory_.GetWeakPtr(),
std::move(head.record_callback))));
}
base::OnceCallback<void(StatusOr<std::unique_ptr<ReportQueue>>)>
SpeculativeReportQueueImpl::PrepareToAttachActualQueue() const {
return base::BindPostTask(
sequenced_task_runner_,
base::BindOnce(&SpeculativeReportQueueImpl::AttachActualQueue,
weak_ptr_factory_.GetMutableWeakPtr()));
}
void SpeculativeReportQueueImpl::AttachActualQueue(
StatusOr<std::unique_ptr<ReportQueue>> status_or_actual_queue) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (actual_report_queue_.has_value()) {
// Already attached, do nothing.
return;
}
if (!status_or_actual_queue.ok()) {
// Failed to create actual queue.
// Flush all pending records with this status.
PurgePendingProducers(status_or_actual_queue.status());
return;
}
// Actual report queue succeeded, store it (never to change later).
actual_report_queue_ = std::move(status_or_actual_queue.ValueOrDie());
EnqueuePendingRecordProducers();
}
void SpeculativeReportQueueImpl::PurgePendingProducers(Status status) const {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
while (!pending_record_producers_.empty()) {
auto head = std::move(pending_record_producers_.front());
pending_record_producers_.pop();
std::move(head.record_callback).Run(status);
}
}
} // namespace reporting