| Add support for Unix Sockets |
| |
| * add support for Unix Sockets to Stream/RefinedTcpStream. |
| * add a 'pub use' for ClientConnection and Stream |
| * convert ClientConnection::new to take Into<Stream> instead of two |
| RefinedTcpStreams. |
| * add a default SocketAddr value for if the stream returns an error. |
| |
| --- a/src/client.rs |
| +++ b/src/client.rs |
| @@ -9,7 +9,7 @@ use std::str::FromStr; |
| |
| use common::{HTTPVersion, Method}; |
| use util::{SequentialReader, SequentialReaderBuilder, SequentialWriterBuilder}; |
| -use util::RefinedTcpStream; |
| +use util::{RefinedTcpStream, Stream}; |
| |
| use Request; |
| |
| @@ -17,7 +17,7 @@ use Request; |
| /// and return Request objects. |
| pub struct ClientConnection { |
| // address of the client |
| - remote_addr: IoResult<SocketAddr>, |
| + remote_addr: SocketAddr, |
| |
| // sequence of Readers to the stream, so that the data is not read in |
| // the wrong order |
| @@ -50,10 +50,11 @@ enum ReadError { |
| |
| impl ClientConnection { |
| /// Creates a new ClientConnection that takes ownership of the TcpStream. |
| - pub fn new(write_socket: RefinedTcpStream, mut read_socket: RefinedTcpStream) |
| - -> ClientConnection |
| + pub fn new<S>(stream: S) -> ClientConnection |
| + where S: Into<Stream> |
| { |
| - let remote_addr = read_socket.peer_addr(); |
| + let (mut read_socket, write_socket) = RefinedTcpStream::new(stream); |
| + let remote_addr = read_socket.peer_addr().unwrap_or(SocketAddr::from(([0,0,0,0], 0))); |
| let secure = read_socket.secure(); |
| |
| let mut source = SequentialReaderBuilder::new(BufReader::with_capacity(1024, read_socket)); |
| @@ -145,7 +146,7 @@ impl ClientConnection { |
| |
| // building the next reader |
| let request = try!(::request::new_request(self.secure, method, path, version.clone(), |
| - headers, self.remote_addr.as_ref().unwrap().clone(), data_source, writer) |
| + headers, self.remote_addr, data_source, writer) |
| .map_err(|e| { |
| use request; |
| match e { |
| --- a/src/lib.rs |
| +++ b/src/lib.rs |
| @@ -116,8 +116,9 @@ use std::net::{ToSocketAddrs, TcpStream, Shutdown}; |
| use std::time::Duration; |
| use std::sync::atomic::Ordering::Relaxed; |
| |
| -use client::ClientConnection; |
| +pub use client::ClientConnection; |
| use util::MessagesQueue; |
| +pub use util::Stream; |
| |
| pub use common::{Header, HeaderField, HTTPVersion, Method, StatusCode}; |
| pub use request::{Request, ReadWrite}; |
| @@ -282,28 +283,23 @@ impl Server { |
| while !inside_close_trigger.load(Relaxed) { |
| let new_client = match server.accept() { |
| Ok((sock, _)) => { |
| - use util::RefinedTcpStream; |
| - let (read_closable, write_closable) = match ssl { |
| - None => { |
| - RefinedTcpStream::new(sock) |
| - }, |
| + let stream = match ssl { |
| + None => sock, |
| #[cfg(feature = "ssl")] |
| Some(ref ssl) => { |
| let ssl = openssl::ssl::Ssl::new(ssl).expect("Couldn't create ssl"); |
| // trying to apply SSL over the connection |
| // if an error occurs, we just close the socket and resume listening |
| - let sock = match ssl.accept(sock) { |
| + match ssl.accept(sock) { |
| Ok(s) => s, |
| Err(_) => continue |
| - }; |
| - |
| - RefinedTcpStream::new(sock) |
| + } |
| }, |
| #[cfg(not(feature = "ssl"))] |
| Some(_) => unreachable!(), |
| }; |
| |
| - Ok(ClientConnection::new(write_closable, read_closable)) |
| + Ok(ClientConnection::new(stream)) |
| }, |
| Err(e) => Err(e), |
| }; |
| --- a/src/util/mod.rs |
| +++ b/src/util/mod.rs |
| @@ -1,7 +1,7 @@ |
| pub use self::custom_stream::CustomStream; |
| pub use self::equal_reader::EqualReader; |
| pub use self::messages_queue::MessagesQueue; |
| -pub use self::refined_tcp_stream::RefinedTcpStream; |
| +pub use self::refined_tcp_stream::{RefinedTcpStream, Stream}; |
| pub use self::sequential::{SequentialReaderBuilder, SequentialReader}; |
| pub use self::sequential::{SequentialWriterBuilder, SequentialWriter}; |
| pub use self::task_pool::TaskPool; |
| --- a/src/util/refined_tcp_stream.rs |
| +++ b/src/util/refined_tcp_stream.rs |
| @@ -1,6 +1,7 @@ |
| use std::io::{Read, Write}; |
| use std::io::Result as IoResult; |
| use std::net::{SocketAddr, TcpStream, Shutdown}; |
| +use std::os::unix::net::UnixStream; |
| |
| #[cfg(feature = "ssl")] |
| use std::sync::{Arc, Mutex}; |
| @@ -17,6 +18,7 @@ pub enum Stream { |
| Http(TcpStream), |
| #[cfg(feature = "ssl")] |
| Https(Arc<Mutex<SslStream<TcpStream>>>), |
| + Unix(UnixStream), |
| } |
| |
| impl From<TcpStream> for Stream { |
| @@ -34,6 +36,13 @@ impl From<SslStream<TcpStream>> for Stream { |
| } |
| } |
| |
| +impl From<UnixStream> for Stream { |
| + #[inline] |
| + fn from(stream: UnixStream) -> Stream { |
| + Stream::Unix(stream) |
| + } |
| +} |
| + |
| impl RefinedTcpStream { |
| pub fn new<S>(stream: S) -> (RefinedTcpStream, RefinedTcpStream) |
| where S: Into<Stream> |
| @@ -44,6 +53,7 @@ impl RefinedTcpStream { |
| Stream::Http(ref stream) => Stream::Http(stream.try_clone().unwrap()), |
| #[cfg(feature = "ssl")] |
| Stream::Https(ref stream) => Stream::Https(stream.clone()), |
| + Stream::Unix(ref stream) => Stream::Unix(stream.try_clone().unwrap()), |
| }; |
| |
| let read = RefinedTcpStream { |
| @@ -68,6 +78,7 @@ impl RefinedTcpStream { |
| Stream::Http(_) => false, |
| #[cfg(feature = "ssl")] |
| Stream::Https(_) => true, |
| + Stream::Unix(_) => false, |
| } |
| } |
| |
| @@ -76,6 +87,7 @@ impl RefinedTcpStream { |
| Stream::Http(ref mut stream) => stream.peer_addr(), |
| #[cfg(feature = "ssl")] |
| Stream::Https(ref mut stream) => stream.lock().unwrap().get_ref().peer_addr(), |
| + Stream::Unix(_) => Err(std::io::Error::new(std::io::ErrorKind::Other, "Peer addresses are not supported for Unix sockets")), |
| } |
| } |
| } |
| @@ -88,6 +100,7 @@ impl Drop for RefinedTcpStream { |
| Stream::Http(ref mut stream) => stream.shutdown(Shutdown::Read).ok(), |
| #[cfg(feature = "ssl")] |
| Stream::Https(ref mut stream) => stream.lock().unwrap().get_mut().shutdown(Shutdown::Read).ok(), |
| + Stream::Unix(ref mut stream) => stream.shutdown(Shutdown::Read).ok(), |
| }; |
| } |
| |
| @@ -97,6 +110,7 @@ impl Drop for RefinedTcpStream { |
| Stream::Http(ref mut stream) => stream.shutdown(Shutdown::Write).ok(), |
| #[cfg(feature = "ssl")] |
| Stream::Https(ref mut stream) => stream.lock().unwrap().get_mut().shutdown(Shutdown::Write).ok(), |
| + Stream::Unix(ref mut stream) => stream.shutdown(Shutdown::Write).ok(), |
| }; |
| } |
| } |
| @@ -108,6 +122,7 @@ impl Read for RefinedTcpStream { |
| Stream::Http(ref mut stream) => stream.read(buf), |
| #[cfg(feature = "ssl")] |
| Stream::Https(ref mut stream) => stream.lock().unwrap().read(buf), |
| + Stream::Unix(ref mut stream) => stream.read(buf), |
| } |
| } |
| } |
| @@ -118,6 +133,7 @@ impl Write for RefinedTcpStream { |
| Stream::Http(ref mut stream) => stream.write(buf), |
| #[cfg(feature = "ssl")] |
| Stream::Https(ref mut stream) => stream.lock().unwrap().write(buf), |
| + Stream::Unix(ref mut stream) => stream.write(buf), |
| } |
| } |
| |
| @@ -126,6 +142,7 @@ impl Write for RefinedTcpStream { |
| Stream::Http(ref mut stream) => stream.flush(), |
| #[cfg(feature = "ssl")] |
| Stream::Https(ref mut stream) => stream.lock().unwrap().flush(), |
| + Stream::Unix(ref mut stream) => stream.flush(), |
| } |
| } |
| } |