blob: 97dc2f2ebbc0a4642d1d52fa431c60a5c91a44d4 [file] [log] [blame]
// Copyright 2021 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "missive/scheduler/upload_job.h"
#include <string>
#include <utility>
#include <vector>
#include <base/run_loop.h>
#include <base/memory/scoped_refptr.h>
#include <base/test/task_environment.h>
#include <chromeos/dbus/service_constants.h>
#include <dbus/bus.h>
#include <dbus/message.h>
#include <dbus/mock_bus.h>
#include <dbus/mock_object_proxy.h>
#include <dbus/object_path.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "missive/proto/interface.pb.h"
#include "missive/proto/record.pb.h"
#include "missive/util/test_support_callbacks.h"
using ::testing::_;
using ::testing::Invoke;
using ::testing::Return;
using ::testing::WithArgs;
namespace reporting {
namespace {
class UploadClientProducer : public UploadClient {
public:
static scoped_refptr<UploadClient> CreateForTests(
scoped_refptr<dbus::Bus> bus, dbus::ObjectProxy* chrome_proxy) {
return UploadClient::Create(bus, chrome_proxy);
}
};
class TestRecordUploader {
public:
explicit TestRecordUploader(std::vector<EncryptedRecord> records)
: records_(std::move(records)),
sequenced_task_runner_(base::ThreadPool::CreateSequencedTaskRunner(
{base::TaskPriority::BEST_EFFORT})) {
DETACH_FROM_SEQUENCE(sequence_checker_);
}
void StartUpload(
StatusOr<std::unique_ptr<UploaderInterface>> uploader_interface) {
EXPECT_TRUE(uploader_interface.ok());
uploader_interface_ = std::move(uploader_interface.ValueOrDie());
PostNextUpload(/*next=*/true);
}
private:
void Upload(bool send_next_record) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!send_next_record || records_.empty()) {
uploader_interface_->Completed(Status::StatusOK());
uploader_interface_.reset(); // Do not need it anymore.
return;
}
uploader_interface_->ProcessRecord(
records_.front(), base::BindOnce(&TestRecordUploader::PostNextUpload,
base::Unretained(this)));
records_.erase(records_.begin());
}
void PostNextUpload(bool next) {
sequenced_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&TestRecordUploader::Upload,
base::Unretained(this), next));
}
std::vector<EncryptedRecord> records_;
std::unique_ptr<UploaderInterface> uploader_interface_;
// To protect |records_| running uploads on sequence.
SEQUENCE_CHECKER(sequence_checker_);
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner_;
};
class UploadJobTest : public ::testing::Test {
protected:
void SetUp() override {
dbus_task_runner_ = base::ThreadPool::CreateSequencedTaskRunner(
{base::TaskPriority::BEST_EFFORT, base::MayBlock()});
test::TestEvent<scoped_refptr<dbus::MockBus>> dbus_waiter;
dbus_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&UploadJobTest::CreateMockDBus, dbus_waiter.cb()));
mock_bus_ = dbus_waiter.result();
EXPECT_CALL(*mock_bus_, GetDBusTaskRunner())
.WillRepeatedly(Return(dbus_task_runner_.get()));
EXPECT_CALL(*mock_bus_, GetOriginTaskRunner())
.WillRepeatedly(Return(dbus_task_runner_.get()));
// We actually want AssertOnOriginThread and AssertOnDBusThread to work
// properly (actually assert they are on dbus_thread_). If the unit tests
// end up creating calls on the wrong thread, the unit test will just hang
// anyways, and it's easier to debug if we make the program crash at that
// point. Since these are ON_CALLs, VerifyAndClearMockExpectations doesn't
// clear them.
ON_CALL(*mock_bus_, AssertOnOriginThread())
.WillByDefault(Invoke(this, &UploadJobTest::AssertOnDBusThread));
ON_CALL(*mock_bus_, AssertOnDBusThread())
.WillByDefault(Invoke(this, &UploadJobTest::AssertOnDBusThread));
mock_chrome_proxy_ = new dbus::MockObjectProxy(
mock_bus_.get(), chromeos::kChromeReportingServiceName,
dbus::ObjectPath(chromeos::kChromeReportingServicePath));
upload_client_ = UploadClientProducer::CreateForTests(
mock_bus_, mock_chrome_proxy_.get());
}
void TearDown() override {
// Let everything ongoing to finish.
task_environment_.RunUntilIdle();
}
static void CreateMockDBus(
base::OnceCallback<void(scoped_refptr<dbus::MockBus>)> ready_cb) {
dbus::Bus::Options options;
options.bus_type = dbus::Bus::SYSTEM;
std::move(ready_cb).Run(
base::WrapRefCounted<dbus::MockBus>(new dbus::MockBus(options)));
}
void AssertOnDBusThread() {
ASSERT_TRUE(dbus_task_runner_->RunsTasksInCurrentSequence());
}
base::test::TaskEnvironment task_environment_;
scoped_refptr<base::SequencedTaskRunner> dbus_task_runner_;
scoped_refptr<dbus::MockBus> mock_bus_;
scoped_refptr<dbus::MockObjectProxy> mock_chrome_proxy_;
scoped_refptr<UploadClient> upload_client_;
};
TEST_F(UploadJobTest, UploadsRecords) {
const constexpr char kTestData[] = "TEST_DATA";
const int64_t kGenerationId = 1701;
const Priority kPriority = Priority::SLOW_BATCH;
std::vector<EncryptedRecord> records;
for (size_t seq_id = 0; seq_id < 10; seq_id++) {
records.emplace_back();
EncryptedRecord& encrypted_record = records.back();
encrypted_record.set_encrypted_wrapped_record(kTestData);
SequencingInformation* sequencing_information =
encrypted_record.mutable_sequencing_information();
sequencing_information->set_sequencing_id(seq_id);
sequencing_information->set_generation_id(kGenerationId);
sequencing_information->set_priority(kPriority);
}
std::unique_ptr<dbus::Response> dbus_response = dbus::Response::CreateEmpty();
dbus::Response* dbus_response_ptr = dbus_response.get();
// Create a copy of the records to ensure they are passed correctly.
const std::vector<EncryptedRecord> expected_records(records);
EXPECT_CALL(*mock_chrome_proxy_, DoCallMethod(_, _, _))
.WillOnce(WithArgs<0, 2>(Invoke(
[&expected_records, dbus_response_ptr](
dbus::MethodCall* call,
base::OnceCallback<void(dbus::Response * response)>* response) {
ASSERT_EQ(call->GetInterface(),
chromeos::kChromeReportingServiceInterface);
ASSERT_EQ(
call->GetMember(),
chromeos::kChromeReportingServiceUploadEncryptedRecordMethod);
// Read the request.
dbus::MessageReader reader(call);
UploadEncryptedRecordRequest request;
ASSERT_TRUE(reader.PopArrayOfBytesAsProto(&request));
ASSERT_EQ(request.encrypted_record_size(), expected_records.size());
for (size_t i = 0; i < request.encrypted_record_size(); i++) {
std::string expected_serialized;
expected_records[i].SerializeToString(&expected_serialized);
std::string requested_serialized;
request.encrypted_record(i).SerializeToString(
&requested_serialized);
EXPECT_STREQ(requested_serialized.c_str(),
expected_serialized.c_str());
}
UploadEncryptedRecordResponse upload_response;
upload_response.mutable_status()->set_code(error::OK);
ASSERT_TRUE(dbus::MessageWriter(dbus_response_ptr)
.AppendProtoAsArrayOfBytes(upload_response));
std::move(*response).Run(dbus_response_ptr);
})));
TestRecordUploader record_uploader(std::move(records));
auto job_result =
UploadJob::Create(upload_client_, false,
base::BindOnce(&TestRecordUploader::StartUpload,
base::Unretained(&record_uploader)));
ASSERT_TRUE(job_result.ok()) << job_result.status();
Scheduler::Job::SmartPtr<Scheduler::Job> job =
std::move(job_result.ValueOrDie());
test::TestEvent<Status> uploaded;
job->Start(uploaded.cb());
const Status status = uploaded.result();
EXPECT_OK(status) << status;
// Let everything finish before record_uploader destructs.
task_environment_.RunUntilIdle();
}
} // namespace
} // namespace reporting