sirenia: Add EventLoop implementation.

BUG=b:168933287
TEST=cargo test

Change-Id: I38c254f1fc3970a3f2eef9612b20f465684d8c50
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform2/+/2451360
Commit-Queue: Allen Webb <allenwebb@google.com>
Tested-by: Allen Webb <allenwebb@google.com>
Reviewed-by: Daniel Verkamp <dverkamp@chromium.org>
diff --git a/sirenia/src/lib.rs b/sirenia/src/lib.rs
index 9a79b4c..c1c9082 100644
--- a/sirenia/src/lib.rs
+++ b/sirenia/src/lib.rs
@@ -8,6 +8,7 @@
 pub mod build_info; // This is generated by build.rs.
 pub mod cli;
 pub mod communication;
+pub mod linux;
 pub mod sandbox;
 pub mod to_sys_util;
 pub mod transport;
diff --git a/sirenia/src/linux/events.rs b/sirenia/src/linux/events.rs
new file mode 100644
index 0000000..db57daa
--- /dev/null
+++ b/sirenia/src/linux/events.rs
@@ -0,0 +1,293 @@
+// Copyright 2020 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// This is a rework of crosvm/devices/src/utils/event_loop.rs for single threaded use.
+// Notable changes:
+//   * FailHandles were removed
+//   * The Weak references to callbacks were upgraded to ownership. This enables functionality
+//     like socket servers where the callback struct is owned by the event_loop and is dropped when
+//     the fd is removed from the event loop.
+//   * EventLoop::start(...) was split into EventMultiplexer::new() and
+//     EventMultiplexer::run_once(). The initialization was put in EventMultiplexer::new(), and the
+//     thread and loop were removed replaced with a single wait call in
+//     EventMultiplexer::run_once().
+//   * To make this work with a single thread without mutexes, Mutators were introduced as the
+//     return type for on_event(). The mutator enables actions like removing a fd from the
+//     EventMultiplexer on a recoverable error, or adding a new EventSource when a Listener accepts
+//     a new stream.
+
+use std::boxed::Box;
+use std::collections::BTreeMap;
+use std::fmt::{self, Display};
+use std::os::unix::io::{AsRawFd, RawFd};
+use std::result::Result as StdResult;
+
+use sys_util::{error, warn, Error as SysError, PollContext, PollToken, WatchingEvents};
+
+#[derive(Debug)]
+pub enum Error {
+    CreatePollContext(SysError),
+    PollContextAddFd(SysError),
+    PollContextDeleteFd(SysError),
+    PollContextWait(SysError),
+    OnEvent(String),
+    OnMutate(String),
+}
+
+impl Display for Error {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        use self::Error::*;
+
+        match self {
+            CreatePollContext(e) => write!(f, "failed to create poll context: {}", e),
+            PollContextAddFd(e) => write!(f, "failed to add fd to poll context: {}", e),
+            PollContextDeleteFd(e) => write!(f, "failed to delete fd from poll context: {}", e),
+            PollContextWait(e) => {
+                write!(f, "failed to wait for events using the poll context: {}", e)
+            }
+            OnEvent(s) => write!(f, "event failed: {}", s),
+            OnMutate(s) => write!(f, "mutate failed: {}", s),
+        }
+    }
+}
+
+pub type Result<T> = std::result::Result<T, Error>;
+
+/// Fd is a wrapper of RawFd. It implements AsRawFd trait and PollToken trait for RawFd.
+/// It does not own the fd, thus won't close the fd when dropped.
+struct Fd(pub RawFd);
+impl AsRawFd for Fd {
+    fn as_raw_fd(&self) -> RawFd {
+        self.0
+    }
+}
+
+impl PollToken for Fd {
+    fn as_raw_token(&self) -> u64 {
+        self.0 as u64
+    }
+
+    fn from_raw_token(data: u64) -> Self {
+        Fd(data as RawFd)
+    }
+}
+
+pub struct EventMultiplexer {
+    poll_ctx: PollContext<Fd>,
+    handlers: BTreeMap<RawFd, Box<dyn EventSource>>,
+}
+
+pub trait Mutator {
+    fn mutate(&mut self, event_loop: &mut EventMultiplexer) -> std::result::Result<(), String>;
+}
+
+/// Interface for event handler.
+pub trait EventSource: AsRawFd {
+    /// Callback to be executed when the event loop encounters an event for this handler.
+    fn on_event(&mut self) -> std::result::Result<Option<Box<dyn Mutator>>, String>;
+}
+
+/// Additional abstraction on top of PollContext to make it possible to multiplex listeners,
+/// streams, (anything with AsRawFd) on a single thread.
+impl EventMultiplexer {
+    /// Initialize the EventMultiplexer.
+    pub fn new() -> Result<EventMultiplexer> {
+        let handlers: BTreeMap<RawFd, Box<dyn EventSource>> = BTreeMap::new();
+        let poll_ctx: PollContext<Fd> = PollContext::new().map_err(Error::CreatePollContext)?;
+
+        Ok(EventMultiplexer { poll_ctx, handlers })
+    }
+
+    /// Wait until there are events to process. Then, process them. If an error is returned, there
+    /// may still events to process.
+    pub fn run_once(&mut self) -> Result<()> {
+        let mut to_remove: Vec<RawFd> = Vec::new();
+        let mut to_read: Vec<RawFd> = Vec::new();
+        for event in self.poll_ctx.wait().map_err(Error::PollContextWait)?.iter() {
+            if event.hungup() {
+                &mut to_remove
+            } else {
+                &mut to_read
+            }
+            .push(event.token().as_raw_fd());
+        }
+
+        for fd in to_read {
+            let mutator: Option<Box<dyn Mutator>> = match self.handlers.get_mut(&fd) {
+                Some(cb) => cb.on_event().map_err(Error::OnEvent)?,
+                None => {
+                    warn!("callback for fd {} already removed", fd);
+                    continue;
+                }
+            };
+
+            if let Some(mut m) = mutator {
+                m.mutate(self).map_err(Error::OnMutate)?;
+            }
+        }
+
+        for fd in to_remove {
+            self.remove_event_for_fd(&Fd(fd))
+                .map_err(|err| {
+                    error!("failed to remove event fd: {:?}", err);
+                })
+                .ok();
+        }
+
+        Ok(())
+    }
+
+    /// Add a new event to multiplexer. The handler will be invoked when `event` happens on `fd`.
+    pub fn add_event(&mut self, handler: Box<dyn EventSource>) -> Result<()> {
+        let fd = handler.as_raw_fd();
+        self.handlers.insert(fd, handler);
+        // This might fail due to epoll syscall. Check epoll_ctl(2).
+        self.poll_ctx
+            .add_fd_with_events(&Fd(fd), WatchingEvents::empty().set_read(), Fd(fd))
+            .map_err(Error::PollContextAddFd)
+    }
+
+    /// Stops listening for events for this `fd`. This function returns an error if it fails, or the
+    /// removed EventSource if it succeeds.
+    ///
+    /// EventMultiplexer does not guarantee all events for `fd` is handled.
+    pub fn remove_event_for_fd(&mut self, fd: &dyn AsRawFd) -> Result<Box<dyn EventSource>> {
+        // This might fail due to epoll syscall. Check epoll_ctl(2).
+        self.poll_ctx
+            .delete(fd)
+            .map_err(Error::PollContextDeleteFd)?;
+        Ok(self.handlers.remove(&fd.as_raw_fd()).unwrap())
+    }
+
+    /// Returns true if there are no event sources registered.
+    pub fn is_empty(&self) -> bool {
+        self.handlers.is_empty()
+    }
+}
+
+/// Adds the specified EventSource from the EventMultiplexer when the mutator is executed.
+pub struct AddEventSourceMutator(pub Option<Box<dyn EventSource>>);
+
+impl Mutator for AddEventSourceMutator {
+    fn mutate(&mut self, event_loop: &mut EventMultiplexer) -> StdResult<(), String> {
+        match std::mem::replace(&mut self.0, None) {
+            Some(b) => event_loop
+                .add_event(b)
+                .map_err(|e| format!("failed to add fd: {:?}", e)),
+            None => Err("AddHandlerMutator::mutate called for empty fd".to_string()),
+        }
+    }
+}
+
+/// Removes the specified RawFd from the EventMultiplexer when the mutator is executed.
+pub struct RemoveFdMutator(pub RawFd);
+
+impl Mutator for RemoveFdMutator {
+    fn mutate(&mut self, event_loop: &mut EventMultiplexer) -> StdResult<(), String> {
+        match event_loop.remove_event_for_fd(self) {
+            Ok(_) => Ok(()),
+            Err(e) => Err(format!("failed to remove fd: {:?}", e)),
+        }
+    }
+}
+
+impl AsRawFd for RemoveFdMutator {
+    fn as_raw_fd(&self) -> RawFd {
+        self.0
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use std::fs::File;
+    use std::rc::Rc;
+    use std::sync::Mutex;
+
+    use std::io::{Read, Write};
+    use std::ops::{AddAssign, Deref};
+    use sys_util::{pipe, EventFd};
+
+    struct EventMultiplexerTestHandler {
+        val: Rc<Mutex<u8>>,
+        evt: File,
+    }
+
+    impl AsRawFd for EventMultiplexerTestHandler {
+        fn as_raw_fd(&self) -> i32 {
+            self.evt.as_raw_fd()
+        }
+    }
+
+    impl EventSource for EventMultiplexerTestHandler {
+        fn on_event(&mut self) -> std::result::Result<Option<Box<dyn Mutator>>, String> {
+            let mut buf: [u8; 1] = [0; 1];
+            self.evt.read_exact(&mut buf).unwrap();
+            self.val.lock().unwrap().add_assign(1);
+            Ok(None)
+        }
+    }
+
+    #[test]
+    fn event_multiplexer_test() {
+        let mut l = EventMultiplexer::new().unwrap();
+        let (r, mut w) = pipe(false /*close_on_exec*/).unwrap();
+        let counter: Rc<Mutex<u8>> = Rc::new(Mutex::new(0));
+        let h = EventMultiplexerTestHandler {
+            val: Rc::clone(&counter),
+            evt: r,
+        };
+        l.add_event(Box::new(h)).unwrap();
+
+        // Check write.
+        let buf: [u8; 1] = [1; 1];
+        w.write_all(&buf).unwrap();
+        l.run_once().unwrap();
+        assert_eq!(*counter.lock().unwrap().deref(), 1);
+
+        // Check hangup.
+        drop(w);
+        l.run_once().unwrap();
+        assert!(l.handlers.is_empty());
+    }
+
+    struct MutatorTestHandler(EventFd);
+
+    impl AsRawFd for MutatorTestHandler {
+        fn as_raw_fd(&self) -> i32 {
+            self.0.as_raw_fd()
+        }
+    }
+
+    impl EventSource for MutatorTestHandler {
+        fn on_event(&mut self) -> std::result::Result<Option<Box<dyn Mutator>>, String> {
+            Ok(None)
+        }
+    }
+
+    #[test]
+    fn add_event_source_mutator_test() {
+        let mut l = EventMultiplexer::new().unwrap();
+        let h = MutatorTestHandler(EventFd::new().unwrap());
+
+        assert!(l.handlers.is_empty());
+        AddEventSourceMutator(Some(Box::new(h)))
+            .mutate(&mut l)
+            .unwrap();
+        assert!(!l.handlers.is_empty());
+    }
+
+    #[test]
+    fn remove_fd_mutator_test() {
+        let mut l = EventMultiplexer::new().unwrap();
+        let h = MutatorTestHandler(EventFd::new().unwrap());
+        let mut m = RemoveFdMutator(h.as_raw_fd());
+        l.add_event(Box::new(h)).unwrap();
+
+        assert!(!l.handlers.is_empty());
+        m.mutate(&mut l).unwrap();
+        assert!(l.handlers.is_empty());
+    }
+}
diff --git a/sirenia/src/linux/mod.rs b/sirenia/src/linux/mod.rs
new file mode 100644
index 0000000..d797629
--- /dev/null
+++ b/sirenia/src/linux/mod.rs
@@ -0,0 +1,7 @@
+// Copyright 2020 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+//! A module for Linux specific functionality like epoll and syslog handling.
+
+pub mod events;