blob: 6c6415359ab2491740a1ea2442ef1634f21ffc29 [file] [log] [blame]
// Copyright 2021 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <memory>
#include <utility>
#include <base/bind.h>
#include <base/check_op.h>
#include <base/run_loop.h>
#include <base/memory/weak_ptr.h>
#include <base/test/task_environment.h>
#include <base/threading/sequenced_task_runner_handle.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "missive/scheduler/scheduler.h"
using ::testing::_;
using ::testing::Invoke;
using ::testing::StrictMock;
using ::testing::WithArgs;
namespace reporting {
namespace {
class TestCallbackWaiter {
public:
TestCallbackWaiter() : run_loop_(std::make_unique<base::RunLoop>()) {}
virtual void Signal() { run_loop_->Quit(); }
void Complete() { Signal(); }
void Wait() { run_loop_->Run(); }
protected:
std::unique_ptr<base::RunLoop> run_loop_;
};
class FakeJob : public Scheduler::Job {
public:
using StartCallback = base::OnceCallback<void()>;
using ReportCompletionCallback = base::OnceCallback<Status()>;
using CancelCallback = base::OnceCallback<Status(Status)>;
class FakeJobDelegate : public Scheduler::Job::JobDelegate {
public:
FakeJobDelegate(ReportCompletionCallback report_completion_callback,
CancelCallback cancel_callback)
: report_completion_callback_(std::move(report_completion_callback)),
cancel_callback_(std::move(cancel_callback)) {}
private:
Status Complete() override {
return std::move(report_completion_callback_).Run();
}
Status Cancel(Status status) override {
return std::move(cancel_callback_).Run(status);
}
ReportCompletionCallback report_completion_callback_;
CancelCallback cancel_callback_;
};
static SmartPtr<FakeJob> Create(
std::unique_ptr<FakeJobDelegate> fake_job_delegate) {
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner =
base::ThreadPool::CreateSequencedTaskRunner(
{base::TaskPriority::BEST_EFFORT, base::MayBlock()});
return std::unique_ptr<FakeJob, base::OnTaskRunnerDeleter>(
new FakeJob(std::move(fake_job_delegate), sequenced_task_runner),
base::OnTaskRunnerDeleter(sequenced_task_runner));
}
void SetFinishStatus(Status status) { finish_status_ = status; }
protected:
void StartImpl() override {
DCHECK(base::SequencedTaskRunnerHandle::IsSet());
sequenced_task_runner()->PostTask(
FROM_HERE,
base::BindOnce(&FakeJob::Finish, weak_ptr_factory_.GetWeakPtr(),
finish_status_));
}
private:
FakeJob(std::unique_ptr<FakeJobDelegate> fake_job_delegate,
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner)
: Job(std::move(fake_job_delegate), sequenced_task_runner) {}
Status finish_status_{Status::StatusOK()};
base::WeakPtrFactory<FakeJob> weak_ptr_factory_{this};
};
class JobTest : public ::testing::Test {
public:
JobTest() = default;
void SetUp() override {
report_completion_callback_ = base::BindRepeating(
[](std::atomic<size_t>* completion_counter,
TestCallbackWaiter* complete_waiter) {
*completion_counter += 1u;
complete_waiter->Signal();
return Status::StatusOK();
},
&completion_counter_, &complete_waiter_);
cancel_callback_ = base::BindRepeating(
[](std::atomic<size_t>* cancel_counter,
TestCallbackWaiter* complete_waiter, Status status) {
EXPECT_TRUE(!status.ok());
*cancel_counter += 1u;
complete_waiter->Signal();
return Status::StatusOK();
},
&cancel_counter_, &complete_waiter_);
}
protected:
base::test::TaskEnvironment task_environment_;
std::atomic<size_t> completion_counter_{0};
std::atomic<size_t> cancel_counter_{0};
TestCallbackWaiter complete_waiter_;
base::RepeatingCallback<Status()> report_completion_callback_;
base::RepeatingCallback<Status(Status)> cancel_callback_;
};
TEST_F(JobTest, WillStartOnceWithOKStatusAndReportCompletion) {
auto delegate = std::make_unique<FakeJob::FakeJobDelegate>(
report_completion_callback_, cancel_callback_);
auto job = FakeJob::Create(std::move(delegate));
TestCallbackWaiter waiter;
job->Start(base::BindOnce(
[](TestCallbackWaiter* waiter, Status status) {
EXPECT_TRUE(status.ok());
waiter->Signal();
},
&waiter));
complete_waiter_.Wait();
waiter.Wait();
// The job should have finished successfully.
EXPECT_EQ(completion_counter_, 1u);
EXPECT_EQ(cancel_counter_, 0u);
EXPECT_EQ(job->GetJobState(), Scheduler::Job::JobState::COMPLETED);
// Now that the job has completed successfully, it shouldn't be startable, or
// cancellable.
TestCallbackWaiter waiter2;
job->Start(base::BindOnce(
[](TestCallbackWaiter* waiter, Status status) {
EXPECT_TRUE(!status.ok());
waiter->Signal();
},
&waiter2));
waiter2.Wait();
// Nothing should have changed from before.
EXPECT_EQ(completion_counter_, 1u);
EXPECT_EQ(cancel_counter_, 0u);
EXPECT_EQ(job->GetJobState(), Scheduler::Job::JobState::COMPLETED);
EXPECT_FALSE(job->Cancel(Status(error::INTERNAL, "Failing for tests")).ok());
// Nothing should have changed from before.
EXPECT_EQ(completion_counter_, 1u);
EXPECT_EQ(cancel_counter_, 0u);
EXPECT_EQ(job->GetJobState(), Scheduler::Job::JobState::COMPLETED);
}
TEST_F(JobTest, CancelsWhenJobFails) {
auto job = FakeJob::Create(std::make_unique<FakeJob::FakeJobDelegate>(
report_completion_callback_, cancel_callback_));
job->SetFinishStatus(Status(error::INTERNAL, "Failing for tests"));
TestCallbackWaiter waiter;
job->Start(base::BindOnce(
[](TestCallbackWaiter* waiter, Status status) {
EXPECT_TRUE(status.ok());
waiter->Signal();
},
&waiter));
complete_waiter_.Wait();
waiter.Wait();
// The job should have finished successfully.
EXPECT_EQ(completion_counter_, 0u);
EXPECT_EQ(cancel_counter_, 1u);
EXPECT_EQ(job->GetJobState(), Scheduler::Job::JobState::CANCELLED);
}
TEST_F(JobTest, WillNotStartWithNonOKStatusAndCancels) {
auto job = FakeJob::Create(std::make_unique<FakeJob::FakeJobDelegate>(
report_completion_callback_, cancel_callback_));
EXPECT_TRUE(job->Cancel(Status(error::INTERNAL, "Failing For Tests")).ok());
TestCallbackWaiter waiter;
job->Start(base::BindOnce(
[](TestCallbackWaiter* waiter, Status status) {
EXPECT_TRUE(!status.ok());
waiter->Signal();
},
&waiter));
waiter.Wait();
}
class TestCallbackWaiterWithCounter : public TestCallbackWaiter {
public:
explicit TestCallbackWaiterWithCounter(size_t counter_limit)
: counter_limit_(counter_limit) {
DCHECK_GT(counter_limit_, 0u);
}
void Signal() override {
size_t old_limit = counter_limit_.fetch_sub(1u);
DCHECK_GT(old_limit, 0u);
if (old_limit == 1u) {
run_loop_->Quit();
}
}
private:
std::atomic<size_t> counter_limit_;
};
class TestSchedulerObserver : public Scheduler::SchedulerObserver {
public:
void Notify(Notification notification) override {
switch (notification) {
case (Notification::ACCEPTED_JOB):
accepted_jobs_ += 1u;
break;
case (Notification::REJECTED_JOB):
rejected_jobs_ += 1u;
break;
case (Notification::BLOCKED_JOB):
blocked_jobs_ += 1u;
break;
case (Notification::STARTED_JOB):
started_jobs_ += 1u;
break;
case (Notification::SUCCESSFUL_COMPLETION):
successful_jobs_ += 1u;
break;
case (Notification::UNSUCCESSFUL_COMPLETION):
unsuccessful_jobs_ += 1u;
break;
case (Notification::MEMORY_PRESSURE_CANCELLATION):
memory_pressure_cancelled_jobs_ += 1u;
break;
}
}
std::atomic<size_t> accepted_jobs_{0u};
std::atomic<size_t> rejected_jobs_{0u};
std::atomic<size_t> blocked_jobs_{0u};
std::atomic<size_t> started_jobs_{0u};
std::atomic<size_t> successful_jobs_{0u};
std::atomic<size_t> unsuccessful_jobs_{0u};
std::atomic<size_t> memory_pressure_cancelled_jobs_{0u};
};
class SchedulerTest : public ::testing::Test {
public:
SchedulerTest() = default;
void SetUp() override { scheduler_.AddObserver(&scheduler_observer_); }
void TearDown() override {
// Let everything ongoing to finish.
task_environment_.RunUntilIdle();
}
protected:
base::test::TaskEnvironment task_environment_{};
Scheduler scheduler_;
TestSchedulerObserver scheduler_observer_;
};
TEST_F(SchedulerTest, SchedulesAndRunsJobs) {
// Many tests rely on "half" of jobs failing. For this reason kNumJobs should
// be even.
const size_t kNumJobs = 10u;
TestCallbackWaiterWithCounter complete_waiter{kNumJobs};
std::atomic<size_t> completion_counter = 0;
base::RepeatingCallback<Status()> report_completion_callback =
base::BindRepeating(
[](std::atomic<size_t>* counter,
TestCallbackWaiterWithCounter* waiter) {
*counter += 1;
waiter->Signal();
return Status::StatusOK();
},
&completion_counter, &complete_waiter);
std::atomic<size_t> cancel_counter = 0;
base::RepeatingCallback<Status(Status)> cancel_callback = base::BindRepeating(
[](std::atomic<size_t>* counter, TestCallbackWaiterWithCounter* waiter,
Status status) {
*counter += 1;
waiter->Signal();
return Status(error::INTERNAL, "Failing for tests");
},
&cancel_counter, &complete_waiter);
for (size_t i = 0; i < kNumJobs; i++) {
auto sequenced_task_runner = base::ThreadPool::CreateSequencedTaskRunner(
{base::TaskPriority::BEST_EFFORT, base::MayBlock()});
sequenced_task_runner->PostTask(
FROM_HERE,
base::BindOnce(
[](size_t i, Scheduler* scheduler,
base::RepeatingCallback<Status()> report_completion_callback,
base::RepeatingCallback<Status(Status)> cancel_callback) {
auto job =
FakeJob::Create(std::make_unique<FakeJob::FakeJobDelegate>(
report_completion_callback, cancel_callback));
if (i % 2u == 0) {
job->SetFinishStatus(
Status(error::INTERNAL, "Failing for tests"));
}
scheduler->EnqueueJob(std::move(job));
},
i, base::Unretained(&scheduler_), report_completion_callback,
cancel_callback));
}
complete_waiter.Wait();
task_environment_.RunUntilIdle();
ASSERT_EQ(scheduler_observer_.accepted_jobs_, kNumJobs);
// We should have at least kNumJobs * 2 blocks.
EXPECT_GE(scheduler_observer_.blocked_jobs_, kNumJobs * 2);
// We should have at least kNumJobs started.
EXPECT_EQ(scheduler_observer_.started_jobs_, kNumJobs);
// Half the jobs should complete successfully.
EXPECT_EQ(scheduler_observer_.successful_jobs_, kNumJobs / 2u);
// Half the jobs should complete unsuccessfully.
EXPECT_EQ(scheduler_observer_.unsuccessful_jobs_, kNumJobs / 2u);
// TODO(1174889) Once memory pressure is enabled, update tests to cause memory
// pressure issues and ensure jobs are cancelled. At that time we can also
// test rejected jobs.
EXPECT_EQ(scheduler_observer_.rejected_jobs_, 0u);
// Half the jobs should have been cancelled, while the other half should have
// completed successfully.
EXPECT_EQ(completion_counter, kNumJobs / 2u);
EXPECT_EQ(cancel_counter, kNumJobs / 2u);
}
// TODO(b/193577465): Add test for Scheduler been destructed before all jobs
// have been run. This might require changes in Scheduler itself.
} // namespace
} // namespace reporting