// Copyright 2015 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <brillo/streams/stream.h>

#include <algorithm>

#include <base/bind.h>
#include <brillo/message_loops/message_loop.h>
#include <brillo/pointer_utils.h>
#include <brillo/streams/stream_errors.h>
#include <brillo/streams/stream_utils.h>

namespace brillo {

bool Stream::TruncateBlocking(ErrorPtr* error) {
  return SetSizeBlocking(GetPosition(), error);
}

bool Stream::SetPosition(uint64_t position, ErrorPtr* error) {
  if (!stream_utils::CheckInt64Overflow(FROM_HERE, position, 0, error))
    return false;
  return Seek(position, Whence::FROM_BEGIN, nullptr, error);
}

bool Stream::ReadAsync(void* buffer,
                       size_t size_to_read,
                       const base::Callback<void(size_t)>& success_callback,
                       const ErrorCallback& error_callback,
                       ErrorPtr* error) {
  if (is_async_read_pending_) {
    Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
                 errors::stream::kOperationNotSupported,
                 "Another asynchronous operation is still pending");
    return false;
  }

  auto callback = base::Bind(&Stream::IgnoreEOSCallback, success_callback);
  // If we can read some data right away non-blocking we should still run the
  // callback from the main loop, so we pass true here for force_async_callback.
  return ReadAsyncImpl(buffer, size_to_read, callback, error_callback, error,
                       true);
}

bool Stream::ReadAllAsync(void* buffer,
                          size_t size_to_read,
                          const base::Closure& success_callback,
                          const ErrorCallback& error_callback,
                          ErrorPtr* error) {
  if (is_async_read_pending_) {
    Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
                 errors::stream::kOperationNotSupported,
                 "Another asynchronous operation is still pending");
    return false;
  }

  auto callback =
      base::Bind(&Stream::ReadAllAsyncCallback, weak_ptr_factory_.GetWeakPtr(),
                 buffer, size_to_read, success_callback, error_callback);
  return ReadAsyncImpl(buffer, size_to_read, callback, error_callback, error,
                       true);
}

bool Stream::ReadBlocking(void* buffer,
                          size_t size_to_read,
                          size_t* size_read,
                          ErrorPtr* error) {
  for (;;) {
    bool eos = false;
    if (!ReadNonBlocking(buffer, size_to_read, size_read, &eos, error))
      return false;

    if (*size_read > 0 || eos)
      break;

    if (!WaitForDataBlocking(AccessMode::READ, base::TimeDelta::Max(), nullptr,
                             error)) {
      return false;
    }
  }
  return true;
}

bool Stream::ReadAllBlocking(void* buffer,
                             size_t size_to_read,
                             ErrorPtr* error) {
  while (size_to_read > 0) {
    size_t size_read = 0;
    if (!ReadBlocking(buffer, size_to_read, &size_read, error))
      return false;

    if (size_read == 0)
      return stream_utils::ErrorReadPastEndOfStream(FROM_HERE, error);

    size_to_read -= size_read;
    buffer = AdvancePointer(buffer, size_read);
  }
  return true;
}

bool Stream::WriteAsync(const void* buffer,
                        size_t size_to_write,
                        const base::Callback<void(size_t)>& success_callback,
                        const ErrorCallback& error_callback,
                        ErrorPtr* error) {
  if (is_async_write_pending_) {
    Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
                 errors::stream::kOperationNotSupported,
                 "Another asynchronous operation is still pending");
    return false;
  }
  // If we can read some data right away non-blocking we should still run the
  // callback from the main loop, so we pass true here for force_async_callback.
  return WriteAsyncImpl(buffer, size_to_write, success_callback, error_callback,
                        error, true);
}

bool Stream::WriteAllAsync(const void* buffer,
                           size_t size_to_write,
                           const base::Closure& success_callback,
                           const ErrorCallback& error_callback,
                           ErrorPtr* error) {
  if (is_async_write_pending_) {
    Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
                 errors::stream::kOperationNotSupported,
                 "Another asynchronous operation is still pending");
    return false;
  }

  auto callback =
      base::Bind(&Stream::WriteAllAsyncCallback, weak_ptr_factory_.GetWeakPtr(),
                 buffer, size_to_write, success_callback, error_callback);
  return WriteAsyncImpl(buffer, size_to_write, callback, error_callback, error,
                        true);
}

bool Stream::WriteBlocking(const void* buffer,
                           size_t size_to_write,
                           size_t* size_written,
                           ErrorPtr* error) {
  for (;;) {
    if (!WriteNonBlocking(buffer, size_to_write, size_written, error))
      return false;

    if (*size_written > 0 || size_to_write == 0)
      break;

    if (!WaitForDataBlocking(AccessMode::WRITE, base::TimeDelta::Max(), nullptr,
                             error)) {
      return false;
    }
  }
  return true;
}

bool Stream::WriteAllBlocking(const void* buffer,
                              size_t size_to_write,
                              ErrorPtr* error) {
  while (size_to_write > 0) {
    size_t size_written = 0;
    if (!WriteBlocking(buffer, size_to_write, &size_written, error))
      return false;

    if (size_written == 0) {
      Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
                   errors::stream::kPartialData,
                   "Failed to write all the data");
      return false;
    }
    size_to_write -= size_written;
    buffer = AdvancePointer(buffer, size_written);
  }
  return true;
}

bool Stream::FlushAsync(const base::Closure& success_callback,
                        const ErrorCallback& error_callback,
                        ErrorPtr* /* error */) {
  auto callback =
      base::Bind(&Stream::FlushAsyncCallback, weak_ptr_factory_.GetWeakPtr(),
                 success_callback, error_callback);
  MessageLoop::current()->PostTask(FROM_HERE, callback);
  return true;
}

void Stream::IgnoreEOSCallback(
    const base::Callback<void(size_t)>& success_callback,
    size_t bytes,
    bool /* eos */) {
  success_callback.Run(bytes);
}

bool Stream::ReadAsyncImpl(
    void* buffer,
    size_t size_to_read,
    const base::Callback<void(size_t, bool)>& success_callback,
    const ErrorCallback& error_callback,
    ErrorPtr* error,
    bool force_async_callback) {
  CHECK(!is_async_read_pending_);
  // We set this value to true early in the function so calling others will
  // prevent us from calling WaitForData() to make calls to
  // ReadAsync() fail while we run WaitForData().
  is_async_read_pending_ = true;

  size_t read = 0;
  bool eos = false;
  if (!ReadNonBlocking(buffer, size_to_read, &read, &eos, error))
    return false;

  if (read > 0 || eos) {
    if (force_async_callback) {
      MessageLoop::current()->PostTask(
          FROM_HERE, base::BindOnce(&Stream::OnReadAsyncDone,
                                    weak_ptr_factory_.GetWeakPtr(),
                                    success_callback, read, eos));
    } else {
      is_async_read_pending_ = false;
      success_callback.Run(read, eos);
    }
    return true;
  }

  is_async_read_pending_ = WaitForData(
      AccessMode::READ,
      base::Bind(&Stream::OnReadAvailable, weak_ptr_factory_.GetWeakPtr(),
                 buffer, size_to_read, success_callback, error_callback),
      error);
  return is_async_read_pending_;
}

void Stream::OnReadAsyncDone(
    const base::Callback<void(size_t, bool)>& success_callback,
    size_t bytes_read,
    bool eos) {
  is_async_read_pending_ = false;
  success_callback.Run(bytes_read, eos);
}

void Stream::OnReadAvailable(
    void* buffer,
    size_t size_to_read,
    const base::Callback<void(size_t, bool)>& success_callback,
    const ErrorCallback& error_callback,
    AccessMode mode) {
  CHECK(stream_utils::IsReadAccessMode(mode));
  CHECK(is_async_read_pending_);
  is_async_read_pending_ = false;
  ErrorPtr error;
  // Just reschedule the read operation but don't need to run the callback from
  // the main loop since we are already running on a callback.
  if (!ReadAsyncImpl(buffer, size_to_read, success_callback, error_callback,
                     &error, false)) {
    error_callback.Run(error.get());
  }
}

bool Stream::WriteAsyncImpl(
    const void* buffer,
    size_t size_to_write,
    const base::Callback<void(size_t)>& success_callback,
    const ErrorCallback& error_callback,
    ErrorPtr* error,
    bool force_async_callback) {
  CHECK(!is_async_write_pending_);
  // We set this value to true early in the function so calling others will
  // prevent us from calling WaitForData() to make calls to
  // ReadAsync() fail while we run WaitForData().
  is_async_write_pending_ = true;

  size_t written = 0;
  if (!WriteNonBlocking(buffer, size_to_write, &written, error))
    return false;

  if (written > 0) {
    if (force_async_callback) {
      MessageLoop::current()->PostTask(
          FROM_HERE, base::BindOnce(&Stream::OnWriteAsyncDone,
                                    weak_ptr_factory_.GetWeakPtr(),
                                    success_callback, written));
    } else {
      is_async_write_pending_ = false;
      success_callback.Run(written);
    }
    return true;
  }
  is_async_write_pending_ = WaitForData(
      AccessMode::WRITE,
      base::Bind(&Stream::OnWriteAvailable, weak_ptr_factory_.GetWeakPtr(),
                 buffer, size_to_write, success_callback, error_callback),
      error);
  return is_async_write_pending_;
}

void Stream::OnWriteAsyncDone(
    const base::Callback<void(size_t)>& success_callback, size_t size_written) {
  is_async_write_pending_ = false;
  success_callback.Run(size_written);
}

void Stream::OnWriteAvailable(
    const void* buffer,
    size_t size,
    const base::Callback<void(size_t)>& success_callback,
    const ErrorCallback& error_callback,
    AccessMode mode) {
  CHECK(stream_utils::IsWriteAccessMode(mode));
  CHECK(is_async_write_pending_);
  is_async_write_pending_ = false;
  ErrorPtr error;
  // Just reschedule the read operation but don't need to run the callback from
  // the main loop since we are already running on a callback.
  if (!WriteAsyncImpl(buffer, size, success_callback, error_callback, &error,
                      false)) {
    error_callback.Run(error.get());
  }
}

void Stream::ReadAllAsyncCallback(void* buffer,
                                  size_t size_to_read,
                                  const base::Closure& success_callback,
                                  const ErrorCallback& error_callback,
                                  size_t size_read,
                                  bool eos) {
  ErrorPtr error;
  size_to_read -= size_read;
  if (size_to_read != 0 && eos) {
    stream_utils::ErrorReadPastEndOfStream(FROM_HERE, &error);
    error_callback.Run(error.get());
    return;
  }

  if (size_to_read) {
    buffer = AdvancePointer(buffer, size_read);
    auto callback = base::Bind(&Stream::ReadAllAsyncCallback,
                               weak_ptr_factory_.GetWeakPtr(), buffer,
                               size_to_read, success_callback, error_callback);
    if (!ReadAsyncImpl(buffer, size_to_read, callback, error_callback, &error,
                       false)) {
      error_callback.Run(error.get());
    }
  } else {
    success_callback.Run();
  }
}

void Stream::WriteAllAsyncCallback(const void* buffer,
                                   size_t size_to_write,
                                   const base::Closure& success_callback,
                                   const ErrorCallback& error_callback,
                                   size_t size_written) {
  ErrorPtr error;
  if (size_to_write != 0 && size_written == 0) {
    Error::AddTo(&error, FROM_HERE, errors::stream::kDomain,
                 errors::stream::kPartialData, "Failed to write all the data");
    error_callback.Run(error.get());
    return;
  }
  size_to_write -= size_written;
  if (size_to_write) {
    buffer = AdvancePointer(buffer, size_written);
    auto callback = base::Bind(&Stream::WriteAllAsyncCallback,
                               weak_ptr_factory_.GetWeakPtr(), buffer,
                               size_to_write, success_callback, error_callback);
    if (!WriteAsyncImpl(buffer, size_to_write, callback, error_callback, &error,
                        false)) {
      error_callback.Run(error.get());
    }
  } else {
    success_callback.Run();
  }
}

void Stream::FlushAsyncCallback(const base::Closure& success_callback,
                                const ErrorCallback& error_callback) {
  ErrorPtr error;
  if (FlushBlocking(&error)) {
    success_callback.Run();
  } else {
    error_callback.Run(error.get());
  }
}

void Stream::CancelPendingAsyncOperations() {
  weak_ptr_factory_.InvalidateWeakPtrs();
  is_async_read_pending_ = false;
  is_async_write_pending_ = false;
}

}  // namespace brillo
