blob: db57daab255845ae245f1135ca1a673016fe55e1 [file] [log] [blame]
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// This is a rework of crosvm/devices/src/utils/event_loop.rs for single threaded use.
// Notable changes:
// * FailHandles were removed
// * The Weak references to callbacks were upgraded to ownership. This enables functionality
// like socket servers where the callback struct is owned by the event_loop and is dropped when
// the fd is removed from the event loop.
// * EventLoop::start(...) was split into EventMultiplexer::new() and
// EventMultiplexer::run_once(). The initialization was put in EventMultiplexer::new(), and the
// thread and loop were removed replaced with a single wait call in
// EventMultiplexer::run_once().
// * To make this work with a single thread without mutexes, Mutators were introduced as the
// return type for on_event(). The mutator enables actions like removing a fd from the
// EventMultiplexer on a recoverable error, or adding a new EventSource when a Listener accepts
// a new stream.
use std::boxed::Box;
use std::collections::BTreeMap;
use std::fmt::{self, Display};
use std::os::unix::io::{AsRawFd, RawFd};
use std::result::Result as StdResult;
use sys_util::{error, warn, Error as SysError, PollContext, PollToken, WatchingEvents};
#[derive(Debug)]
pub enum Error {
CreatePollContext(SysError),
PollContextAddFd(SysError),
PollContextDeleteFd(SysError),
PollContextWait(SysError),
OnEvent(String),
OnMutate(String),
}
impl Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
use self::Error::*;
match self {
CreatePollContext(e) => write!(f, "failed to create poll context: {}", e),
PollContextAddFd(e) => write!(f, "failed to add fd to poll context: {}", e),
PollContextDeleteFd(e) => write!(f, "failed to delete fd from poll context: {}", e),
PollContextWait(e) => {
write!(f, "failed to wait for events using the poll context: {}", e)
}
OnEvent(s) => write!(f, "event failed: {}", s),
OnMutate(s) => write!(f, "mutate failed: {}", s),
}
}
}
pub type Result<T> = std::result::Result<T, Error>;
/// Fd is a wrapper of RawFd. It implements AsRawFd trait and PollToken trait for RawFd.
/// It does not own the fd, thus won't close the fd when dropped.
struct Fd(pub RawFd);
impl AsRawFd for Fd {
fn as_raw_fd(&self) -> RawFd {
self.0
}
}
impl PollToken for Fd {
fn as_raw_token(&self) -> u64 {
self.0 as u64
}
fn from_raw_token(data: u64) -> Self {
Fd(data as RawFd)
}
}
pub struct EventMultiplexer {
poll_ctx: PollContext<Fd>,
handlers: BTreeMap<RawFd, Box<dyn EventSource>>,
}
pub trait Mutator {
fn mutate(&mut self, event_loop: &mut EventMultiplexer) -> std::result::Result<(), String>;
}
/// Interface for event handler.
pub trait EventSource: AsRawFd {
/// Callback to be executed when the event loop encounters an event for this handler.
fn on_event(&mut self) -> std::result::Result<Option<Box<dyn Mutator>>, String>;
}
/// Additional abstraction on top of PollContext to make it possible to multiplex listeners,
/// streams, (anything with AsRawFd) on a single thread.
impl EventMultiplexer {
/// Initialize the EventMultiplexer.
pub fn new() -> Result<EventMultiplexer> {
let handlers: BTreeMap<RawFd, Box<dyn EventSource>> = BTreeMap::new();
let poll_ctx: PollContext<Fd> = PollContext::new().map_err(Error::CreatePollContext)?;
Ok(EventMultiplexer { poll_ctx, handlers })
}
/// Wait until there are events to process. Then, process them. If an error is returned, there
/// may still events to process.
pub fn run_once(&mut self) -> Result<()> {
let mut to_remove: Vec<RawFd> = Vec::new();
let mut to_read: Vec<RawFd> = Vec::new();
for event in self.poll_ctx.wait().map_err(Error::PollContextWait)?.iter() {
if event.hungup() {
&mut to_remove
} else {
&mut to_read
}
.push(event.token().as_raw_fd());
}
for fd in to_read {
let mutator: Option<Box<dyn Mutator>> = match self.handlers.get_mut(&fd) {
Some(cb) => cb.on_event().map_err(Error::OnEvent)?,
None => {
warn!("callback for fd {} already removed", fd);
continue;
}
};
if let Some(mut m) = mutator {
m.mutate(self).map_err(Error::OnMutate)?;
}
}
for fd in to_remove {
self.remove_event_for_fd(&Fd(fd))
.map_err(|err| {
error!("failed to remove event fd: {:?}", err);
})
.ok();
}
Ok(())
}
/// Add a new event to multiplexer. The handler will be invoked when `event` happens on `fd`.
pub fn add_event(&mut self, handler: Box<dyn EventSource>) -> Result<()> {
let fd = handler.as_raw_fd();
self.handlers.insert(fd, handler);
// This might fail due to epoll syscall. Check epoll_ctl(2).
self.poll_ctx
.add_fd_with_events(&Fd(fd), WatchingEvents::empty().set_read(), Fd(fd))
.map_err(Error::PollContextAddFd)
}
/// Stops listening for events for this `fd`. This function returns an error if it fails, or the
/// removed EventSource if it succeeds.
///
/// EventMultiplexer does not guarantee all events for `fd` is handled.
pub fn remove_event_for_fd(&mut self, fd: &dyn AsRawFd) -> Result<Box<dyn EventSource>> {
// This might fail due to epoll syscall. Check epoll_ctl(2).
self.poll_ctx
.delete(fd)
.map_err(Error::PollContextDeleteFd)?;
Ok(self.handlers.remove(&fd.as_raw_fd()).unwrap())
}
/// Returns true if there are no event sources registered.
pub fn is_empty(&self) -> bool {
self.handlers.is_empty()
}
}
/// Adds the specified EventSource from the EventMultiplexer when the mutator is executed.
pub struct AddEventSourceMutator(pub Option<Box<dyn EventSource>>);
impl Mutator for AddEventSourceMutator {
fn mutate(&mut self, event_loop: &mut EventMultiplexer) -> StdResult<(), String> {
match std::mem::replace(&mut self.0, None) {
Some(b) => event_loop
.add_event(b)
.map_err(|e| format!("failed to add fd: {:?}", e)),
None => Err("AddHandlerMutator::mutate called for empty fd".to_string()),
}
}
}
/// Removes the specified RawFd from the EventMultiplexer when the mutator is executed.
pub struct RemoveFdMutator(pub RawFd);
impl Mutator for RemoveFdMutator {
fn mutate(&mut self, event_loop: &mut EventMultiplexer) -> StdResult<(), String> {
match event_loop.remove_event_for_fd(self) {
Ok(_) => Ok(()),
Err(e) => Err(format!("failed to remove fd: {:?}", e)),
}
}
}
impl AsRawFd for RemoveFdMutator {
fn as_raw_fd(&self) -> RawFd {
self.0
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs::File;
use std::rc::Rc;
use std::sync::Mutex;
use std::io::{Read, Write};
use std::ops::{AddAssign, Deref};
use sys_util::{pipe, EventFd};
struct EventMultiplexerTestHandler {
val: Rc<Mutex<u8>>,
evt: File,
}
impl AsRawFd for EventMultiplexerTestHandler {
fn as_raw_fd(&self) -> i32 {
self.evt.as_raw_fd()
}
}
impl EventSource for EventMultiplexerTestHandler {
fn on_event(&mut self) -> std::result::Result<Option<Box<dyn Mutator>>, String> {
let mut buf: [u8; 1] = [0; 1];
self.evt.read_exact(&mut buf).unwrap();
self.val.lock().unwrap().add_assign(1);
Ok(None)
}
}
#[test]
fn event_multiplexer_test() {
let mut l = EventMultiplexer::new().unwrap();
let (r, mut w) = pipe(false /*close_on_exec*/).unwrap();
let counter: Rc<Mutex<u8>> = Rc::new(Mutex::new(0));
let h = EventMultiplexerTestHandler {
val: Rc::clone(&counter),
evt: r,
};
l.add_event(Box::new(h)).unwrap();
// Check write.
let buf: [u8; 1] = [1; 1];
w.write_all(&buf).unwrap();
l.run_once().unwrap();
assert_eq!(*counter.lock().unwrap().deref(), 1);
// Check hangup.
drop(w);
l.run_once().unwrap();
assert!(l.handlers.is_empty());
}
struct MutatorTestHandler(EventFd);
impl AsRawFd for MutatorTestHandler {
fn as_raw_fd(&self) -> i32 {
self.0.as_raw_fd()
}
}
impl EventSource for MutatorTestHandler {
fn on_event(&mut self) -> std::result::Result<Option<Box<dyn Mutator>>, String> {
Ok(None)
}
}
#[test]
fn add_event_source_mutator_test() {
let mut l = EventMultiplexer::new().unwrap();
let h = MutatorTestHandler(EventFd::new().unwrap());
assert!(l.handlers.is_empty());
AddEventSourceMutator(Some(Box::new(h)))
.mutate(&mut l)
.unwrap();
assert!(!l.handlers.is_empty());
}
#[test]
fn remove_fd_mutator_test() {
let mut l = EventMultiplexer::new().unwrap();
let h = MutatorTestHandler(EventFd::new().unwrap());
let mut m = RemoveFdMutator(h.as_raw_fd());
l.add_event(Box::new(h)).unwrap();
assert!(!l.handlers.is_empty());
m.mutate(&mut l).unwrap();
assert!(l.handlers.is_empty());
}
}