blob: 23bf499861cc2cbe38c8e2e36e0fe9d50dac0630 [file] [log] [blame] [edit]
// Copyright 2022 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "faced/util/queueing_stream.h"
#include <string>
#include <tuple>
#include <utility>
#include <base/test/bind.h>
#include <base/test/task_environment.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "faced/util/blocking_future.h"
#include "faced/util/stream.h"
namespace faced {
namespace {
using testing::Eq;
using testing::Optional;
using testing::Pointee;
class QueuedStreamTest : public ::testing::Test {
protected:
// A fake task environment, required for Stream implementations
// to dispatch tasks.
base::test::TaskEnvironment task_environment_{
base::test::TaskEnvironment::TimeSource::MOCK_TIME};
};
// Read from the given StreamReader object, blocking until an item arrives.
template <typename T>
StreamValue<T> BlockingRead(StreamReader<T>& reader) {
BlockingFuture<StreamValue<T>> future;
reader.Read(future.PromiseCallback());
future.Wait();
return std::move(future.value());
}
TEST_F(QueuedStreamTest, WriteRead) {
QueueingStream<int> stream(/*max_queue_size=*/1);
std::unique_ptr<StreamReader<int>> reader = stream.GetReader();
// Write an item.
stream.Write(42);
// Ensure we can read it.
EXPECT_EQ(BlockingRead(*reader).value, 42);
}
TEST_F(QueuedStreamTest, QueueOrder) {
QueueingStream<int> stream(/*max_queue_size=*/10);
std::unique_ptr<StreamReader<int>> reader = stream.GetReader();
// Write items.
stream.Write(1);
stream.Write(2);
stream.Write(3);
// Ensure they are read in FIFO order.
EXPECT_EQ(BlockingRead(*reader).value, 1);
EXPECT_EQ(BlockingRead(*reader).value, 2);
EXPECT_EQ(BlockingRead(*reader).value, 3);
}
TEST_F(QueuedStreamTest, FiniteQueueSize) {
QueueingStream<int> stream(/*max_queue_size=*/3);
std::unique_ptr<StreamReader<int>> reader = stream.GetReader();
// Write enough items to exceed the queue size.
for (int i = 0; i < 10; i++) {
stream.Write(i);
}
// The queue should drop the earliest items, and return to the reader the last
// 3.
EXPECT_EQ(BlockingRead(*reader).value, 7);
EXPECT_EQ(BlockingRead(*reader).value, 8);
EXPECT_EQ(BlockingRead(*reader).value, 9);
}
TEST_F(QueuedStreamTest, ReaderCloseDiscardsQueue) {
QueueingStream<int> stream(/*max_queue_size=*/3);
std::unique_ptr<StreamReader<int>> reader = stream.GetReader();
// Write some items into the queue.
for (int i = 0; i < 10; i++) {
stream.Write(i);
}
// Close the reader.
reader->Close();
// Items in the queue should be dropped.
EXPECT_EQ(BlockingRead(*reader).value, std::nullopt);
}
TEST_F(QueuedStreamTest, WriterClosePreservesQueue) {
QueueingStream<int> stream(/*max_queue_size=*/3);
std::unique_ptr<StreamReader<int>> reader = stream.GetReader();
// Write some items into the queue.
stream.Write(1);
stream.Write(2);
stream.Write(3);
// Close the writer.
stream.Close();
// Items in the queue should be preserved in order.
EXPECT_EQ(BlockingRead(*reader).value, 1);
EXPECT_EQ(BlockingRead(*reader).value, 2);
EXPECT_EQ(BlockingRead(*reader).value, 3);
}
TEST_F(QueuedStreamTest, ReadBeforeWrite) {
QueueingStream<int> stream(/*max_queue_size=*/3);
std::unique_ptr<StreamReader<int>> reader = stream.GetReader();
// Start a read.
BlockingFuture<StreamValue<int>> future;
reader->Read(future.PromiseCallback());
// Next, write to the stream.
stream.Write(42);
// Ensure the value is received.
EXPECT_EQ(future.Wait().value, 42);
}
TEST_F(QueuedStreamTest, ReadCancelledOnReaderClose) {
QueueingStream<int> stream(/*max_queue_size=*/3);
std::unique_ptr<StreamReader<int>> reader = stream.GetReader();
// Start a read.
BlockingFuture<StreamValue<int>> future;
reader->Read(future.PromiseCallback());
// Close the reader.
reader->Close();
// Ensure the callback is called with a nullopt arg.
EXPECT_THAT(future.Wait().value, std::nullopt);
}
TEST_F(QueuedStreamTest, ReadCancelledOnWriterClose) {
QueueingStream<int> stream(/*max_queue_size=*/3);
std::unique_ptr<StreamReader<int>> reader = stream.GetReader();
// Start a read.
BlockingFuture<StreamValue<int>> future;
reader->Read(future.PromiseCallback());
// Close the writer.
stream.Close();
// Ensure the callback is called.
EXPECT_THAT(future.Wait().value, std::nullopt);
}
TEST_F(QueuedStreamTest, WriterDeleteDoesNotAffectReader) {
auto stream = std::make_unique<QueueingStream<int>>(/*max_queue_size=*/3);
std::unique_ptr<StreamReader<int>> reader = stream->GetReader();
// Start a read.
BlockingFuture<StreamValue<int>> future;
reader->Read(future.PromiseCallback());
// Delete the writer.
stream.reset();
// Ensure the pending callback is called.
EXPECT_EQ(future.Wait().value, std::nullopt);
// Additional reads should just return immediately with no value.
EXPECT_EQ(BlockingRead(*reader).value, std::nullopt);
}
TEST_F(QueuedStreamTest, ExpeditedFlagSet) {
QueueingStream<int> stream(/*max_queue_size=*/3);
std::unique_ptr<StreamReader<int>> reader = stream.GetReader();
// If nothing else is on the queue, the expedited flag should be clear.
stream.Write(0);
EXPECT_FALSE(BlockingRead(*reader).expedite);
// If multiple items are on the queue, the flag should be set until
// we reach the last item.
stream.Write(1);
stream.Write(2);
stream.Write(3);
EXPECT_TRUE(BlockingRead(*reader).expedite);
EXPECT_TRUE(BlockingRead(*reader).expedite);
EXPECT_FALSE(BlockingRead(*reader).expedite);
// If the read is pending when a write comes in, the expedited flag
// should be clear.
BlockingFuture<StreamValue<int>> future;
reader->Read(future.PromiseCallback());
stream.Write(4);
EXPECT_FALSE(future.Wait().expedite);
}
TEST_F(QueuedStreamTest, NewReadInReadcallback) {
QueueingStream<int> stream(/*max_queue_size=*/3);
std::unique_ptr<StreamReader<int>> reader = stream.GetReader();
// Start a read. On completion, start a new read from within the callback.
BlockingFuture<StreamValue<int>> future;
reader->Read(base::BindLambdaForTesting(
[&future, &reader](StreamValue<int> result) mutable {
// Ensure we got the first item.
EXPECT_EQ(result.value, 1);
// Start another read.
reader->Read(future.PromiseCallback());
}));
// Enqueue some items.
stream.Write(1);
stream.Write(2);
// Expect a second read to arrive.
EXPECT_EQ(future.Wait().value, 2);
}
TEST_F(QueuedStreamTest, DeleteReaderInReadCallback) {
QueueingStream<int> stream(/*max_queue_size=*/3);
std::unique_ptr<StreamReader<int>> reader = stream.GetReader();
// Start a read. On completion, delete the reader from within the callback.
BlockingFuture<void> future;
reader->Read(base::BindLambdaForTesting(
[&future, &reader](StreamValue<int> result) mutable {
// Ensure we got the correct value.
EXPECT_EQ(result.value, 1);
// Delete the reader.
reader.reset();
future.PromiseCallback().Run();
}));
// Write an item, and wait for it to be processed.
stream.Write(1);
future.Wait();
// Ensure the reader was deleted.
EXPECT_EQ(reader.get(), nullptr);
}
TEST_F(QueuedStreamTest, EnqueueMoveOnlyItem) {
QueueingStream<std::unique_ptr<int>> stream(/*max_queue_size=*/3);
std::unique_ptr<StreamReader<std::unique_ptr<int>>> reader =
stream.GetReader();
// Enqueue a move-only item, and make sure we can read it again.
stream.Write(std::make_unique<int>(1));
EXPECT_THAT(BlockingRead(*reader).value, Optional(Pointee(1)));
}
} // namespace
} // namespace faced