// Copyright 2021 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//
#include "missive/scheduler/scheduler.h"

#include <memory>
#include <utility>

#include <base/bind.h>
#include <base/check.h>
#include <base/sequence_checker.h>
#include <base/sequenced_task_runner.h>
#include <base/task/thread_pool.h>
#include <base/threading/sequenced_task_runner_handle.h>

#include "missive/util/shared_queue.h"
#include "missive/util/status.h"
#include "missive/util/statusor.h"
#include "missive/util/task_runner_context.h"

namespace reporting {
namespace {

enum TaskLimit {
  NORMAL = 5,
  REDUCED = 2,
  OFF = 0,
};

}  // namespace

using CompleteJobResponse = Status;
using CompleteJobCallback = base::OnceCallback<void(CompleteJobResponse)>;
using Notification = Scheduler::SchedulerObserver::Notification;

Scheduler::Job::Job(std::unique_ptr<JobDelegate> job_response_delegate)
    : job_response_delegate_(std::move(job_response_delegate)) {}

void Scheduler::Job::Start(base::OnceCallback<void(Status)> complete_cb) {
  DCHECK(complete_cb);
  // std::atomic<T>::compare_exchange_strong expects an lvalue for the expected
  // state.
  JobState expected_job_state = JobState::NOT_RUNNING;
  if (!job_state_.compare_exchange_strong(expected_job_state,
                                          /*desired=*/JobState::RUNNING)) {
    std::move(complete_cb)
        .Run(Status(
            error::UNAVAILABLE,
            "Job can only be started when it is in the NOT_RUNNING state."));
    return;
  }

  complete_cb_ = std::move(complete_cb);
  StartImpl();
}

Status Scheduler::Job::Cancel(Status status) {
  if (status.ok()) {
    return Status(error::INVALID_ARGUMENT,
                  "Job cannot be cancelled with an OK Status");
  }

  // std::atomic<T>::compare_exchange_strong expects an lvalue for the expected
  // state.
  JobState expected_job_state = JobState::NOT_RUNNING;
  if (!job_state_.compare_exchange_strong(expected_job_state,
                                          JobState::CANCELLED)) {
    return Status(error::UNAVAILABLE,
                  "Job cannot be cancelled after it has started.");
  }

  return job_response_delegate_->Cancel(status);
}

Scheduler::Job::JobState Scheduler::Job::GetJobState() const {
  return job_state_;
}

void Scheduler::Job::Finish(Status status) {
  if (!status.ok()) {
    job_state_ = JobState::CANCELLED;
    std::move(complete_cb_).Run(job_response_delegate_->Cancel(status));
    return;
  }
  job_state_ = JobState::COMPLETED;
  std::move(complete_cb_).Run(job_response_delegate_->Complete());
}

class Scheduler::JobContext : public TaskRunnerContext<CompleteJobResponse> {
 public:
  JobContext(std::unique_ptr<Job> job,
             std::unique_ptr<JobBlocker> job_blocker,
             CompleteJobCallback job_completion_callback,
             scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner)
      : TaskRunnerContext<CompleteJobResponse>(
            std::move(job_completion_callback), sequenced_task_runner),
        job_(std::move(job)),
        job_blocker_(std::move(job_blocker)) {}

 private:
  ~JobContext() override { DCHECK(job_); }

  void OnStart() override {
    if (job_ == nullptr) {
      Status(error::INVALID_ARGUMENT, "Provided Job was null");
      return;
    }
    if (job_blocker_ == nullptr) {
      Complete(
          Status(error::INTERNAL, "Unable to process request at this time."));
      return;
    }
    job_->Start(base::BindOnce(&JobContext::Complete, base::Unretained(this)));
  }

  void Complete(Status status) {
    Schedule(&Scheduler::JobContext::Response, base::Unretained(this), status);
  }

  std::unique_ptr<Scheduler::Job> job_;

  // Will be held until Response() is called, the semaphore will be released
  // once it is destroyed.
  std::unique_ptr<Scheduler::JobBlocker> job_blocker_;
};

class Scheduler::JobBlocker {
 public:
  JobBlocker(scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner,
             base::OnceCallback<void()> release_cb)
      : sequenced_task_runner_(sequenced_task_runner),
        release_cb_(std::move(release_cb)) {}

  ~JobBlocker() {
    sequenced_task_runner_->PostTask(
        FROM_HERE, base::BindOnce(
                       [](base::OnceCallback<void()> release_cb) {
                         std::move(release_cb).Run();
                       },
                       std::move(release_cb_)));
  }

  JobBlocker(const JobBlocker&) = delete;
  JobBlocker& operator=(const JobBlocker&) = delete;

 private:
  scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner_;
  base::OnceCallback<void()> release_cb_;
};

class Scheduler::JobSemaphore {
 public:
  JobSemaphore(scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner,
               TaskLimit task_limit)
      : sequenced_task_runner_(sequenced_task_runner), task_limit_(task_limit) {
    DETACH_FROM_SEQUENCE(sequence_checker_);
  }

  ~JobSemaphore() {
    if (running_jobs_ > 0) {
      LOG(ERROR) << "JobSemaphore destructing with active jobs.";
    }
  }

  JobSemaphore(const JobSemaphore&) = delete;
  JobSemaphore& operator=(const JobSemaphore&) = delete;

  StatusOr<std::unique_ptr<JobBlocker>> AcquireJobBlocker() {
    DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    if (running_jobs_ >= task_limit_) {
      return Status(error::RESOURCE_EXHAUSTED, "Currently at job limit");
    }
    running_jobs_++;

    return std::make_unique<JobBlocker>(
        sequenced_task_runner_,
        base::BindOnce(&JobSemaphore::Release, weak_ptr_factory_.GetWeakPtr()));
  }

  void UpdateTaskLimit(TaskLimit task_limit) {
    DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    task_limit_ = task_limit;
  }

  bool IsAcceptingJobs() const { return task_limit_ != TaskLimit::OFF; }

 private:
  void Release() {
    DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    running_jobs_--;
  }

  SEQUENCE_CHECKER(sequence_checker_);

  scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner_;

  std::atomic<TaskLimit> task_limit_{TaskLimit::OFF};
  uint32_t running_jobs_{0};

  base::WeakPtrFactory<JobSemaphore> weak_ptr_factory_{this};
};

Scheduler::Scheduler()
    : sequenced_task_runner_(base::ThreadPool::CreateSequencedTaskRunner({})),
      job_semaphore_(std::make_unique<JobSemaphore>(sequenced_task_runner_,
                                                    TaskLimit::NORMAL)),
      jobs_queue_(SharedQueue<std::unique_ptr<Job>>::Create()) {}

Scheduler::~Scheduler() = default;

void Scheduler::AddObserver(SchedulerObserver* observer) {
  observers_.push_back(observer);
}

void Scheduler::NotifyObservers(Notification notification) {
  for (auto* observer : observers_) {
    observer->Notify(notification);
  }
}

void Scheduler::EnqueueJob(std::unique_ptr<Job> job) {
  if (!job_semaphore_->IsAcceptingJobs()) {
    NotifyObservers(Notification::REJECTED_JOB);
    Status cancel_status =
        job->Cancel(Status(error::RESOURCE_EXHAUSTED,
                           "Unable to process due to low system memory"));
    if (!cancel_status.ok()) {
      LOG(ERROR) << "Was unable to successfully cancel a job: "
                 << cancel_status;
    }
    return;
  }
  jobs_queue_->Push(std::move(job), base::BindOnce(&Scheduler::OnJobEnqueued,
                                                   base::Unretained(this)));
}

void Scheduler::OnJobEnqueued() {
  NotifyObservers(Notification::ACCEPTED_JOB);
  sequenced_task_runner_->PostTask(
      FROM_HERE, base::BindOnce(&Scheduler::StartJobs, base::Unretained(this)));
}

void Scheduler::StartJobs() {
  DCHECK(base::SequencedTaskRunnerHandle::IsSet());

  // Get JobBlockers and assign them to jobs until job_semaphore_ returns a
  // non-OK status.
  StatusOr<std::unique_ptr<JobBlocker>> blocker_result =
      job_semaphore_->AcquireJobBlocker();
  while (blocker_result.ok()) {
    jobs_queue_->Pop(base::BindOnce(&Scheduler::OnJobPop,
                                    base::Unretained(this),
                                    std::move(blocker_result.ValueOrDie())));

    blocker_result = job_semaphore_->AcquireJobBlocker();
  }
  NotifyObservers(Notification::BLOCKED_JOB);
}

void Scheduler::OnJobPop(std::unique_ptr<JobBlocker> job_blocker,
                         StatusOr<std::unique_ptr<Job>> job_result) {
  // job_result may be empty, if so just drop the request, releasing the
  // blocker.
  if (!job_result.ok()) {
    return;
  }

  auto completion_cb = base::BindOnce(
      [](scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner,
         base::OnceCallback<void()> start_jobs_cb,
         base::OnceCallback<void(Notification)> notify_observers,
         Status job_result) {
        sequenced_task_runner->PostTask(FROM_HERE, std::move(start_jobs_cb));

        if (!job_result.ok()) {
          std::move(notify_observers)
              .Run(Notification::UNSUCCESSFUL_COMPLETION);
          LOG(ERROR) << job_result;
          return;
        }
        std::move(notify_observers).Run(Notification::SUCCESSFUL_COMPLETION);
      },
      sequenced_task_runner_,
      base::BindOnce(&Scheduler::StartJobs, weak_ptr_factory_.GetWeakPtr()),
      base::BindOnce(&Scheduler::NotifyObservers,
                     weak_ptr_factory_.GetWeakPtr()));

  Start<JobContext>(std::move(job_result).ValueOrDie(), std::move(job_blocker),
                    std::move(completion_cb), sequenced_task_runner_);
  NotifyObservers(Notification::STARTED_JOB);
}

void Scheduler::ClearQueue() {
  jobs_queue_->Swap(
      base::queue<std::unique_ptr<Job>>(),
      base::BindOnce(&Scheduler::OnJobQueueSwap, base::Unretained(this)));
}

void Scheduler::OnJobQueueSwap(
    base::queue<std::unique_ptr<Job>> job_queue) const {
  while (!job_queue.empty()) {
    auto& job = job_queue.front();
    Status cancel_status =
        job->Cancel(Status(error::RESOURCE_EXHAUSTED,
                           "Unable to process due to low system memory"));
    if (!cancel_status.ok()) {
      LOG(ERROR) << "Was unable to successfully cancel a job: "
                 << cancel_status;
    }
    job_queue.pop();
  }
}

#ifdef MEMORY_PRESSURE_LEVEL_ENABLED
// TODO(1174889) Currently unused, once resourced implements
// MemoryPressureLevels update. Also initialize JobSemaphorePool at
// TaskLimit::OFF instead of NORMAL, so that it is off until we know the
// memory pressure level.
void Scheduler::UpdateMemoryPressureLevel(
    base::MemoryPressureLevel memory_pressure_level) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  switch (memory_pressure_level) {
    case base::MemoryPressureLevel::MEMORY_PRESSURE_LEVEL_NONE:
      job_semaphore_->UpdateTaskLimit(TaskLimit::NORMAL);
      StartJobs();
      return;
    case base::MemoryPressureLevel::MEMORY_PRESSURE_LEVEL_MODERATE:
      job_semaphore_->UpdateTaskLimit(TaskLimit::REDUCED);
      StartJobs();
      return;
    case base::MemoryPressureLevel::MEMORY_PRESSURE_LEVEL_CRITICAL:
      job_semaphore_->UpdateTaskLimit(TaskLimit::OFF);
      ClearQueue();
      return;
  }
}
#endif

}  // namespace reporting
