blob: f078c4de062694b6b37e34aa889980a223c4948c [file] [log] [blame] [edit]
// Copyright 2024 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use anyhow::{ensure, Result};
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use crate::message::Message;
/// The writer interface for the intake queue.
///
/// This is supposed to be the sole input method for the queue, except for
/// modifying the queue directly.
/// The structure hides away any non-supported mechanisms of writing to the
/// queue that may otherwise be exposed by the underlying mechanism.
#[derive(Clone, Debug)]
pub struct Writer(Sender<Message>);
impl Writer {
pub async fn send(&self, message: Message) -> core::result::Result<(), SendError<Message>> {
self.0.send(message).await
}
}
/// A FIFO queue for all messages received by `reaper`.
///
/// The queue ensures that all messages regardless of which interface received
/// them are processed the same way.
///
/// # Example
/// ```
/// let mut in_queue = IntakeQueue::new(10);
///
/// let some_interface = SomeInterface::new(in_queue.clone_writer());
/// thread::spawn(move || some_interface.run());
///
/// while let Some(message) = in_queue.pop() {
/// println!("New message from {}", message.application_name)'
/// }
/// ```
#[derive(Debug)]
pub struct IntakeQueue {
reader: Receiver<Message>,
writer: Writer,
}
impl IntakeQueue {
/// Create a new queue with the specified `size`.
///
/// The queue will cache up to `size` messages non-blocking. After the
/// capacity is exhausted any further `push` or `send` calls will block
/// until there is at least 1 free capacity. This also means that a `size`
/// of `0` is invalid.
pub fn new(size: usize) -> Result<Self> {
ensure!(size > 0, "Queue size must be larger than 0");
let (writer, reader) = channel(size);
Ok(IntakeQueue {
reader,
writer: Writer(writer),
})
}
/// Get a clone of the writer for this queue.
///
/// The cloned writer still writes to the same queue, calling this function
/// doesn't lead to cloning the remaining data structures or creating a new
/// queue.
/// Cloned writers are useful in contexts where moving is required, e.g.
/// into other async/thread contexts to write into a single queue from
/// multiple places.
pub fn clone_writer(&self) -> Writer {
self.writer.clone()
}
/// Receive the next message from the queue.
///
/// Takes the current first message if it exists or will return one once
/// there is a message.
pub async fn next(&mut self) -> Option<Message> {
self.reader.recv().await
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
use tokio::time::timeout;
use crate::message::tests::new_test_message;
#[test]
fn invalid_queue_size() {
assert_eq!(
format!("{}", IntakeQueue::new(0).unwrap_err()),
"Queue size must be larger than 0"
);
}
#[tokio::test]
async fn read_empty() {
let mut queue = IntakeQueue::new(1).unwrap();
assert_eq!(
format!(
"{}",
timeout(Duration::from_millis(10), queue.next())
.await
.unwrap_err()
),
"deadline has elapsed"
);
}
#[tokio::test]
async fn async_read_one_message() {
let mut queue = IntakeQueue::new(1).unwrap();
queue.clone_writer().send(new_test_message()).await.unwrap();
assert_ne!(queue.next().await, None);
assert_eq!(
format!(
"{}",
timeout(Duration::from_millis(10), queue.next())
.await
.unwrap_err()
),
"deadline has elapsed"
);
}
#[tokio::test]
async fn reach_queue_capacity() {
let mut queue = IntakeQueue::new(1).unwrap();
let writer = queue.clone_writer();
writer.0.try_send(new_test_message()).unwrap();
assert!(writer.0.try_send(new_test_message()).is_err());
assert_ne!(queue.next().await, None);
writer.0.try_send(new_test_message()).unwrap();
}
}