sirenia: Add EventLoop implementation.
BUG=b:168933287
TEST=cargo test
Change-Id: I38c254f1fc3970a3f2eef9612b20f465684d8c50
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform2/+/2451360
Commit-Queue: Allen Webb <allenwebb@google.com>
Tested-by: Allen Webb <allenwebb@google.com>
Reviewed-by: Daniel Verkamp <dverkamp@chromium.org>
diff --git a/sirenia/src/lib.rs b/sirenia/src/lib.rs
index 9a79b4c..c1c9082 100644
--- a/sirenia/src/lib.rs
+++ b/sirenia/src/lib.rs
@@ -8,6 +8,7 @@
pub mod build_info; // This is generated by build.rs.
pub mod cli;
pub mod communication;
+pub mod linux;
pub mod sandbox;
pub mod to_sys_util;
pub mod transport;
diff --git a/sirenia/src/linux/events.rs b/sirenia/src/linux/events.rs
new file mode 100644
index 0000000..db57daa
--- /dev/null
+++ b/sirenia/src/linux/events.rs
@@ -0,0 +1,293 @@
+// Copyright 2020 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// This is a rework of crosvm/devices/src/utils/event_loop.rs for single threaded use.
+// Notable changes:
+// * FailHandles were removed
+// * The Weak references to callbacks were upgraded to ownership. This enables functionality
+// like socket servers where the callback struct is owned by the event_loop and is dropped when
+// the fd is removed from the event loop.
+// * EventLoop::start(...) was split into EventMultiplexer::new() and
+// EventMultiplexer::run_once(). The initialization was put in EventMultiplexer::new(), and the
+// thread and loop were removed replaced with a single wait call in
+// EventMultiplexer::run_once().
+// * To make this work with a single thread without mutexes, Mutators were introduced as the
+// return type for on_event(). The mutator enables actions like removing a fd from the
+// EventMultiplexer on a recoverable error, or adding a new EventSource when a Listener accepts
+// a new stream.
+
+use std::boxed::Box;
+use std::collections::BTreeMap;
+use std::fmt::{self, Display};
+use std::os::unix::io::{AsRawFd, RawFd};
+use std::result::Result as StdResult;
+
+use sys_util::{error, warn, Error as SysError, PollContext, PollToken, WatchingEvents};
+
+#[derive(Debug)]
+pub enum Error {
+ CreatePollContext(SysError),
+ PollContextAddFd(SysError),
+ PollContextDeleteFd(SysError),
+ PollContextWait(SysError),
+ OnEvent(String),
+ OnMutate(String),
+}
+
+impl Display for Error {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ use self::Error::*;
+
+ match self {
+ CreatePollContext(e) => write!(f, "failed to create poll context: {}", e),
+ PollContextAddFd(e) => write!(f, "failed to add fd to poll context: {}", e),
+ PollContextDeleteFd(e) => write!(f, "failed to delete fd from poll context: {}", e),
+ PollContextWait(e) => {
+ write!(f, "failed to wait for events using the poll context: {}", e)
+ }
+ OnEvent(s) => write!(f, "event failed: {}", s),
+ OnMutate(s) => write!(f, "mutate failed: {}", s),
+ }
+ }
+}
+
+pub type Result<T> = std::result::Result<T, Error>;
+
+/// Fd is a wrapper of RawFd. It implements AsRawFd trait and PollToken trait for RawFd.
+/// It does not own the fd, thus won't close the fd when dropped.
+struct Fd(pub RawFd);
+impl AsRawFd for Fd {
+ fn as_raw_fd(&self) -> RawFd {
+ self.0
+ }
+}
+
+impl PollToken for Fd {
+ fn as_raw_token(&self) -> u64 {
+ self.0 as u64
+ }
+
+ fn from_raw_token(data: u64) -> Self {
+ Fd(data as RawFd)
+ }
+}
+
+pub struct EventMultiplexer {
+ poll_ctx: PollContext<Fd>,
+ handlers: BTreeMap<RawFd, Box<dyn EventSource>>,
+}
+
+pub trait Mutator {
+ fn mutate(&mut self, event_loop: &mut EventMultiplexer) -> std::result::Result<(), String>;
+}
+
+/// Interface for event handler.
+pub trait EventSource: AsRawFd {
+ /// Callback to be executed when the event loop encounters an event for this handler.
+ fn on_event(&mut self) -> std::result::Result<Option<Box<dyn Mutator>>, String>;
+}
+
+/// Additional abstraction on top of PollContext to make it possible to multiplex listeners,
+/// streams, (anything with AsRawFd) on a single thread.
+impl EventMultiplexer {
+ /// Initialize the EventMultiplexer.
+ pub fn new() -> Result<EventMultiplexer> {
+ let handlers: BTreeMap<RawFd, Box<dyn EventSource>> = BTreeMap::new();
+ let poll_ctx: PollContext<Fd> = PollContext::new().map_err(Error::CreatePollContext)?;
+
+ Ok(EventMultiplexer { poll_ctx, handlers })
+ }
+
+ /// Wait until there are events to process. Then, process them. If an error is returned, there
+ /// may still events to process.
+ pub fn run_once(&mut self) -> Result<()> {
+ let mut to_remove: Vec<RawFd> = Vec::new();
+ let mut to_read: Vec<RawFd> = Vec::new();
+ for event in self.poll_ctx.wait().map_err(Error::PollContextWait)?.iter() {
+ if event.hungup() {
+ &mut to_remove
+ } else {
+ &mut to_read
+ }
+ .push(event.token().as_raw_fd());
+ }
+
+ for fd in to_read {
+ let mutator: Option<Box<dyn Mutator>> = match self.handlers.get_mut(&fd) {
+ Some(cb) => cb.on_event().map_err(Error::OnEvent)?,
+ None => {
+ warn!("callback for fd {} already removed", fd);
+ continue;
+ }
+ };
+
+ if let Some(mut m) = mutator {
+ m.mutate(self).map_err(Error::OnMutate)?;
+ }
+ }
+
+ for fd in to_remove {
+ self.remove_event_for_fd(&Fd(fd))
+ .map_err(|err| {
+ error!("failed to remove event fd: {:?}", err);
+ })
+ .ok();
+ }
+
+ Ok(())
+ }
+
+ /// Add a new event to multiplexer. The handler will be invoked when `event` happens on `fd`.
+ pub fn add_event(&mut self, handler: Box<dyn EventSource>) -> Result<()> {
+ let fd = handler.as_raw_fd();
+ self.handlers.insert(fd, handler);
+ // This might fail due to epoll syscall. Check epoll_ctl(2).
+ self.poll_ctx
+ .add_fd_with_events(&Fd(fd), WatchingEvents::empty().set_read(), Fd(fd))
+ .map_err(Error::PollContextAddFd)
+ }
+
+ /// Stops listening for events for this `fd`. This function returns an error if it fails, or the
+ /// removed EventSource if it succeeds.
+ ///
+ /// EventMultiplexer does not guarantee all events for `fd` is handled.
+ pub fn remove_event_for_fd(&mut self, fd: &dyn AsRawFd) -> Result<Box<dyn EventSource>> {
+ // This might fail due to epoll syscall. Check epoll_ctl(2).
+ self.poll_ctx
+ .delete(fd)
+ .map_err(Error::PollContextDeleteFd)?;
+ Ok(self.handlers.remove(&fd.as_raw_fd()).unwrap())
+ }
+
+ /// Returns true if there are no event sources registered.
+ pub fn is_empty(&self) -> bool {
+ self.handlers.is_empty()
+ }
+}
+
+/// Adds the specified EventSource from the EventMultiplexer when the mutator is executed.
+pub struct AddEventSourceMutator(pub Option<Box<dyn EventSource>>);
+
+impl Mutator for AddEventSourceMutator {
+ fn mutate(&mut self, event_loop: &mut EventMultiplexer) -> StdResult<(), String> {
+ match std::mem::replace(&mut self.0, None) {
+ Some(b) => event_loop
+ .add_event(b)
+ .map_err(|e| format!("failed to add fd: {:?}", e)),
+ None => Err("AddHandlerMutator::mutate called for empty fd".to_string()),
+ }
+ }
+}
+
+/// Removes the specified RawFd from the EventMultiplexer when the mutator is executed.
+pub struct RemoveFdMutator(pub RawFd);
+
+impl Mutator for RemoveFdMutator {
+ fn mutate(&mut self, event_loop: &mut EventMultiplexer) -> StdResult<(), String> {
+ match event_loop.remove_event_for_fd(self) {
+ Ok(_) => Ok(()),
+ Err(e) => Err(format!("failed to remove fd: {:?}", e)),
+ }
+ }
+}
+
+impl AsRawFd for RemoveFdMutator {
+ fn as_raw_fd(&self) -> RawFd {
+ self.0
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use std::fs::File;
+ use std::rc::Rc;
+ use std::sync::Mutex;
+
+ use std::io::{Read, Write};
+ use std::ops::{AddAssign, Deref};
+ use sys_util::{pipe, EventFd};
+
+ struct EventMultiplexerTestHandler {
+ val: Rc<Mutex<u8>>,
+ evt: File,
+ }
+
+ impl AsRawFd for EventMultiplexerTestHandler {
+ fn as_raw_fd(&self) -> i32 {
+ self.evt.as_raw_fd()
+ }
+ }
+
+ impl EventSource for EventMultiplexerTestHandler {
+ fn on_event(&mut self) -> std::result::Result<Option<Box<dyn Mutator>>, String> {
+ let mut buf: [u8; 1] = [0; 1];
+ self.evt.read_exact(&mut buf).unwrap();
+ self.val.lock().unwrap().add_assign(1);
+ Ok(None)
+ }
+ }
+
+ #[test]
+ fn event_multiplexer_test() {
+ let mut l = EventMultiplexer::new().unwrap();
+ let (r, mut w) = pipe(false /*close_on_exec*/).unwrap();
+ let counter: Rc<Mutex<u8>> = Rc::new(Mutex::new(0));
+ let h = EventMultiplexerTestHandler {
+ val: Rc::clone(&counter),
+ evt: r,
+ };
+ l.add_event(Box::new(h)).unwrap();
+
+ // Check write.
+ let buf: [u8; 1] = [1; 1];
+ w.write_all(&buf).unwrap();
+ l.run_once().unwrap();
+ assert_eq!(*counter.lock().unwrap().deref(), 1);
+
+ // Check hangup.
+ drop(w);
+ l.run_once().unwrap();
+ assert!(l.handlers.is_empty());
+ }
+
+ struct MutatorTestHandler(EventFd);
+
+ impl AsRawFd for MutatorTestHandler {
+ fn as_raw_fd(&self) -> i32 {
+ self.0.as_raw_fd()
+ }
+ }
+
+ impl EventSource for MutatorTestHandler {
+ fn on_event(&mut self) -> std::result::Result<Option<Box<dyn Mutator>>, String> {
+ Ok(None)
+ }
+ }
+
+ #[test]
+ fn add_event_source_mutator_test() {
+ let mut l = EventMultiplexer::new().unwrap();
+ let h = MutatorTestHandler(EventFd::new().unwrap());
+
+ assert!(l.handlers.is_empty());
+ AddEventSourceMutator(Some(Box::new(h)))
+ .mutate(&mut l)
+ .unwrap();
+ assert!(!l.handlers.is_empty());
+ }
+
+ #[test]
+ fn remove_fd_mutator_test() {
+ let mut l = EventMultiplexer::new().unwrap();
+ let h = MutatorTestHandler(EventFd::new().unwrap());
+ let mut m = RemoveFdMutator(h.as_raw_fd());
+ l.add_event(Box::new(h)).unwrap();
+
+ assert!(!l.handlers.is_empty());
+ m.mutate(&mut l).unwrap();
+ assert!(l.handlers.is_empty());
+ }
+}
diff --git a/sirenia/src/linux/mod.rs b/sirenia/src/linux/mod.rs
new file mode 100644
index 0000000..d797629
--- /dev/null
+++ b/sirenia/src/linux/mod.rs
@@ -0,0 +1,7 @@
+// Copyright 2020 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+//! A module for Linux specific functionality like epoll and syslog handling.
+
+pub mod events;