blob: 38dc36c709b5d46701cff0a292770784548abd8a [file] [log] [blame] [edit]
// Copyright 2021 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "missive/scheduler/enqueue_job.h"
#include <memory>
#include <utility>
#include <base/strings/strcat.h>
#include <base/strings/string_number_conversions.h>
#include <base/task/bind_post_task.h>
#include <base/task/sequenced_task_runner.h>
#include <base/task/task_traits.h>
#include <base/task/thread_pool.h>
#include "missive/proto/interface.pb.h"
#include "missive/scheduler/scheduler.h"
#include "missive/storage/storage_module_interface.h"
#include "missive/util/status.h"
namespace reporting {
EnqueueJob::EnqueueResponseDelegate::EnqueueResponseDelegate(
scoped_refptr<HealthModule> health_module,
std::unique_ptr<
brillo::dbus_utils::DBusMethodResponse<EnqueueRecordResponse>> response)
: task_runner_(base::SequencedTaskRunner::GetCurrentDefault()),
health_module_(health_module),
response_(std::move(response)) {
CHECK(task_runner_);
CHECK(response_);
}
Status EnqueueJob::EnqueueResponseDelegate::Complete() {
return SendResponse(Status::StatusOK());
}
Status EnqueueJob::EnqueueResponseDelegate::Cancel(Status status) {
return SendResponse(status);
}
Status EnqueueJob::EnqueueResponseDelegate::SendResponse(Status status) {
EnqueueRecordResponse response_body;
status.SaveTo(response_body.mutable_status());
auto response_cb = base::BindPostTask(
task_runner_, base::BindOnce(&brillo::dbus_utils::DBusMethodResponse<
EnqueueRecordResponse>::Return,
std::move(response_)));
if (!health_module_->is_debugging()) {
std::move(response_cb).Run(std::move(response_body));
return Status::StatusOK();
}
health_module_->GetHealthData(
base::BindPostTaskToCurrentDefault(base::BindOnce(
[](base::OnceCallback<void(const EnqueueRecordResponse&)> response_cb,
EnqueueRecordResponse response_body, ERPHealthData health_data) {
*response_body.mutable_health_data() = std::move(health_data);
std::move(response_cb).Run(std::move(response_body));
},
std::move(response_cb), std::move(response_body))));
return Status::StatusOK();
}
// static
Scheduler::Job::SmartPtr<EnqueueJob> EnqueueJob::Create(
scoped_refptr<StorageModuleInterface> storage_module,
scoped_refptr<HealthModule> health_module,
EnqueueRecordRequest request,
std::unique_ptr<EnqueueResponseDelegate> delegate) {
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner =
base::ThreadPool::CreateSequencedTaskRunner(
{base::TaskPriority::BEST_EFFORT, base::MayBlock()});
return std::unique_ptr<EnqueueJob, base::OnTaskRunnerDeleter>(
new EnqueueJob(storage_module, health_module, sequenced_task_runner,
std::move(request), std::move(delegate)),
base::OnTaskRunnerDeleter(sequenced_task_runner));
}
EnqueueJob::EnqueueJob(
scoped_refptr<StorageModuleInterface> storage_module,
scoped_refptr<HealthModule> health_module,
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner,
EnqueueRecordRequest request,
std::unique_ptr<EnqueueResponseDelegate> delegate)
: Job(std::move(delegate), sequenced_task_runner),
storage_module_(storage_module),
health_module_(health_module),
request_(std::move(request)) {}
void EnqueueJob::StartImpl() {
if (request_.has_health_data_logging_enabled()) {
health_module_->set_debugging(request_.health_data_logging_enabled());
}
storage_module_->AddRecord(
request_.priority(), std::move(request_.record()),
base::BindPostTask(
sequenced_task_runner(),
base::BindOnce(&EnqueueJob::Finish, weak_ptr_factory_.GetWeakPtr())));
}
} // namespace reporting