blob: ef16de848ad0f940f99f39d5ec973c0dc1b2778a [file] [log] [blame] [edit]
// Copyright 2022 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef FACED_UTIL_QUEUEING_STREAM_H_
#define FACED_UTIL_QUEUEING_STREAM_H_
#include <cstddef>
#include <deque>
#include <memory>
#include <utility>
#include <base/functional/bind.h>
#include <base/memory/scoped_refptr.h>
#include "faced/util/stream.h"
#include "faced/util/task.h"
namespace faced {
// A Stream that allows synchronous, non-blocking writes, queueing elements if
// required.
template <typename T>
class QueueingStream {
public:
// Create a QueueingStream, with an internal queue of at most
// `max_queue_size` elements.
//
// A `max_queue_size` of zero is supported; if specified, written
// frames will be dropped unless there is a pending read of the
// frame when a `Write` takes place.
explicit QueueingStream(size_t max_queue_size);
// Close and destroy the stream.
//
// This is safe to call even if a reader is still present on the
// stream.
~QueueingStream();
// Return a StreamReader that reads from this queue.
//
// May only be called once.
typename std::unique_ptr<StreamReader<T>> GetReader();
// Write the given item to the Stream.
//
// Returns true if the write was successful, or false if the stream
// is closed (and hence `item` was discarded).
bool Write(T item);
// Close the Stream.
//
// Any items already on the queue will continue to be available by
// the reader. However, once the queue is empty, all future reads
// will immediately be cancelled.
void Close();
private:
// State shared by the reader and writer.
struct State : public base::RefCounted<State> {
// Maximum number of elements in `queue`.
size_t max_queue_size GUARDED_BY_CONTEXT(sequence_checker) = 0;
// If true, the stream is closed, and no new writes will be accepted.
bool closed GUARDED_BY_CONTEXT(sequence_checker) = false;
// The queue and any pending callback.
//
// Invariant: `pending_callback` is non-null only if `queue` is empty.
// That is, we should never have both items in the queue and a pending
// callback.
typename StreamReader<T>::ReadCallback pending_callback
GUARDED_BY_CONTEXT(sequence_checker);
std::deque<T> queue GUARDED_BY_CONTEXT(sequence_checker);
SEQUENCE_CHECKER(sequence_checker);
};
// `StreamReader` implementation for this Stream object.
class Reader final : public StreamReader<T> {
public:
explicit Reader(scoped_refptr<State> state);
~Reader();
// `StreamReader<T>` implementation.
void Read(typename StreamReader<T>::ReadCallback callback) override;
void Close() override;
private:
scoped_refptr<State> state_;
};
bool reader_created_ = false;
scoped_refptr<State> state_;
};
//
// Implementation details follow.
//
template <typename T>
QueueingStream<T>::QueueingStream(size_t max_queue_size) {
state_ = base::MakeRefCounted<State>();
state_->max_queue_size = max_queue_size;
DETACH_FROM_SEQUENCE(state_->sequence_checker);
}
template <typename T>
QueueingStream<T>::~QueueingStream() {
Close();
}
template <typename T>
bool QueueingStream<T>::Write(T item) {
DCHECK_CALLED_ON_VALID_SEQUENCE(state_->sequence_checker);
// If the reader has been closed, simply drop the item.
if (state_->closed) {
return false;
}
// If there is a pending callback, directly dispatch the item to it.
if (!state_->pending_callback.is_null()) {
// Invariant: `pending_callback` is only non-null if the queue is empty.
DCHECK(state_->queue.empty());
PostToCurrentSequence(base::BindOnce(
std::move(state_->pending_callback),
StreamValue<T>{.value = std::move(item), .expedite = false}));
return true;
}
// If we are not queueing items and the reader wasn't ready, simply drop the
// item.
if (state_->max_queue_size == 0) {
return true;
}
// Ensure there is sufficient space in the queue.
while (state_->queue.size() >= state_->max_queue_size) {
state_->queue.pop_front();
}
// Enqueue the item.
state_->queue.push_back(std::move(item));
return true;
}
template <typename T>
void QueueingStream<T>::Close() {
DCHECK_CALLED_ON_VALID_SEQUENCE(state_->sequence_checker);
// Mark the stream as closed.
state_->closed = true;
// No new elements will ever be added, so abort any pending reads.
if (!state_->pending_callback.is_null()) {
PostToCurrentSequence(
base::BindOnce(std::move(state_->pending_callback), StreamValue<T>{}));
}
}
template <typename T>
typename std::unique_ptr<StreamReader<T>> QueueingStream<T>::GetReader() {
DCHECK_CALLED_ON_VALID_SEQUENCE(state_->sequence_checker);
CHECK(!reader_created_) << "GetReader() incorrectly called more than once "
"on a single QueueingStream.";
reader_created_ = true;
return std::make_unique<QueueingStream::Reader>(state_);
}
template <typename T>
QueueingStream<T>::Reader::Reader(scoped_refptr<State> state) : state_(state) {}
template <typename T>
QueueingStream<T>::Reader::~Reader() {
Close();
}
template <typename T>
void QueueingStream<T>::Reader::Read(
typename StreamReader<T>::ReadCallback callback) {
DCHECK_CALLED_ON_VALID_SEQUENCE(state_->sequence_checker);
// Ensure there is no existing pending read.
CHECK(state_->pending_callback.is_null())
<< "Attempted to call Read() while an existing Read() operation was "
"still in progress.";
// If there is already an item on the queue, immediately dispatch it.
if (!state_->queue.empty()) {
StreamValue<T> result;
// Fetch the first item in the queue.
result.value = std::move(state_->queue.front());
state_->queue.pop_front();
// Warn the reader they need to process items faster if there are still
// items in the queue.
result.expedite = (state_->queue.size() >= 1);
PostToCurrentSequence(
base::BindOnce(std::move(callback), std::move(result)));
return;
}
// If the stream is closed, nothing new is ever going to be added, so
// immediately call the callback.
if (state_->queue.empty() && state_->closed) {
PostToCurrentSequence(
base::BindOnce(std::move(callback), StreamValue<T>{}));
return;
}
// Otherwise, wait for the next item to arrive.
state_->pending_callback = std::move(callback);
}
template <typename T>
void QueueingStream<T>::Reader::Close() {
DCHECK_CALLED_ON_VALID_SEQUENCE(state_->sequence_checker);
// Close and clear the queue.
state_->closed = true;
state_->queue.clear();
// Abort any pending reads.
if (!state_->pending_callback.is_null()) {
PostToCurrentSequence(
base::BindOnce(std::move(state_->pending_callback), StreamValue<T>{}));
}
}
} // namespace faced
#endif // FACED_UTIL_QUEUEING_STREAM_H_