blob: ab3663b7e4b6106363221b3cc72694c25b789961 [file] [log] [blame]
Add support for Unix Sockets
* add support for Unix Sockets to Stream/RefinedTcpStream.
* add a 'pub use' for ClientConnection and Stream
* convert ClientConnection::new to take Into<Stream> instead of two
RefinedTcpStreams.
* add a default SocketAddr value for if the stream returns an error.
--- a/src/client.rs
+++ b/src/client.rs
@@ -9,7 +9,7 @@ use std::str::FromStr;
use common::{HTTPVersion, Method};
use util::{SequentialReader, SequentialReaderBuilder, SequentialWriterBuilder};
-use util::RefinedTcpStream;
+use util::{RefinedTcpStream, Stream};
use Request;
@@ -17,7 +17,7 @@ use Request;
/// and return Request objects.
pub struct ClientConnection {
// address of the client
- remote_addr: IoResult<SocketAddr>,
+ remote_addr: SocketAddr,
// sequence of Readers to the stream, so that the data is not read in
// the wrong order
@@ -50,10 +50,11 @@ enum ReadError {
impl ClientConnection {
/// Creates a new ClientConnection that takes ownership of the TcpStream.
- pub fn new(write_socket: RefinedTcpStream, mut read_socket: RefinedTcpStream)
- -> ClientConnection
+ pub fn new<S>(stream: S) -> ClientConnection
+ where S: Into<Stream>
{
- let remote_addr = read_socket.peer_addr();
+ let (mut read_socket, write_socket) = RefinedTcpStream::new(stream);
+ let remote_addr = read_socket.peer_addr().unwrap_or(SocketAddr::from(([0,0,0,0], 0)));
let secure = read_socket.secure();
let mut source = SequentialReaderBuilder::new(BufReader::with_capacity(1024, read_socket));
@@ -145,7 +146,7 @@ impl ClientConnection {
// building the next reader
let request = try!(::request::new_request(self.secure, method, path, version.clone(),
- headers, self.remote_addr.as_ref().unwrap().clone(), data_source, writer)
+ headers, self.remote_addr, data_source, writer)
.map_err(|e| {
use request;
match e {
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -116,8 +116,9 @@ use std::net::{ToSocketAddrs, TcpStream, Shutdown};
use std::time::Duration;
use std::sync::atomic::Ordering::Relaxed;
-use client::ClientConnection;
+pub use client::ClientConnection;
use util::MessagesQueue;
+pub use util::Stream;
pub use common::{Header, HeaderField, HTTPVersion, Method, StatusCode};
pub use request::{Request, ReadWrite};
@@ -282,28 +283,23 @@ impl Server {
while !inside_close_trigger.load(Relaxed) {
let new_client = match server.accept() {
Ok((sock, _)) => {
- use util::RefinedTcpStream;
- let (read_closable, write_closable) = match ssl {
- None => {
- RefinedTcpStream::new(sock)
- },
+ let stream = match ssl {
+ None => sock,
#[cfg(feature = "ssl")]
Some(ref ssl) => {
let ssl = openssl::ssl::Ssl::new(ssl).expect("Couldn't create ssl");
// trying to apply SSL over the connection
// if an error occurs, we just close the socket and resume listening
- let sock = match ssl.accept(sock) {
+ match ssl.accept(sock) {
Ok(s) => s,
Err(_) => continue
- };
-
- RefinedTcpStream::new(sock)
+ }
},
#[cfg(not(feature = "ssl"))]
Some(_) => unreachable!(),
};
- Ok(ClientConnection::new(write_closable, read_closable))
+ Ok(ClientConnection::new(stream))
},
Err(e) => Err(e),
};
--- a/src/util/mod.rs
+++ b/src/util/mod.rs
@@ -1,7 +1,7 @@
pub use self::custom_stream::CustomStream;
pub use self::equal_reader::EqualReader;
pub use self::messages_queue::MessagesQueue;
-pub use self::refined_tcp_stream::RefinedTcpStream;
+pub use self::refined_tcp_stream::{RefinedTcpStream, Stream};
pub use self::sequential::{SequentialReaderBuilder, SequentialReader};
pub use self::sequential::{SequentialWriterBuilder, SequentialWriter};
pub use self::task_pool::TaskPool;
--- a/src/util/refined_tcp_stream.rs
+++ b/src/util/refined_tcp_stream.rs
@@ -1,6 +1,7 @@
use std::io::{Read, Write};
use std::io::Result as IoResult;
use std::net::{SocketAddr, TcpStream, Shutdown};
+use std::os::unix::net::UnixStream;
#[cfg(feature = "ssl")]
use std::sync::{Arc, Mutex};
@@ -17,6 +18,7 @@ pub enum Stream {
Http(TcpStream),
#[cfg(feature = "ssl")]
Https(Arc<Mutex<SslStream<TcpStream>>>),
+ Unix(UnixStream),
}
impl From<TcpStream> for Stream {
@@ -34,6 +36,13 @@ impl From<SslStream<TcpStream>> for Stream {
}
}
+impl From<UnixStream> for Stream {
+ #[inline]
+ fn from(stream: UnixStream) -> Stream {
+ Stream::Unix(stream)
+ }
+}
+
impl RefinedTcpStream {
pub fn new<S>(stream: S) -> (RefinedTcpStream, RefinedTcpStream)
where S: Into<Stream>
@@ -44,6 +53,7 @@ impl RefinedTcpStream {
Stream::Http(ref stream) => Stream::Http(stream.try_clone().unwrap()),
#[cfg(feature = "ssl")]
Stream::Https(ref stream) => Stream::Https(stream.clone()),
+ Stream::Unix(ref stream) => Stream::Unix(stream.try_clone().unwrap()),
};
let read = RefinedTcpStream {
@@ -68,6 +78,7 @@ impl RefinedTcpStream {
Stream::Http(_) => false,
#[cfg(feature = "ssl")]
Stream::Https(_) => true,
+ Stream::Unix(_) => false,
}
}
@@ -76,6 +87,7 @@ impl RefinedTcpStream {
Stream::Http(ref mut stream) => stream.peer_addr(),
#[cfg(feature = "ssl")]
Stream::Https(ref mut stream) => stream.lock().unwrap().get_ref().peer_addr(),
+ Stream::Unix(_) => Err(std::io::Error::new(std::io::ErrorKind::Other, "Peer addresses are not supported for Unix sockets")),
}
}
}
@@ -88,6 +100,7 @@ impl Drop for RefinedTcpStream {
Stream::Http(ref mut stream) => stream.shutdown(Shutdown::Read).ok(),
#[cfg(feature = "ssl")]
Stream::Https(ref mut stream) => stream.lock().unwrap().get_mut().shutdown(Shutdown::Read).ok(),
+ Stream::Unix(ref mut stream) => stream.shutdown(Shutdown::Read).ok(),
};
}
@@ -97,6 +110,7 @@ impl Drop for RefinedTcpStream {
Stream::Http(ref mut stream) => stream.shutdown(Shutdown::Write).ok(),
#[cfg(feature = "ssl")]
Stream::Https(ref mut stream) => stream.lock().unwrap().get_mut().shutdown(Shutdown::Write).ok(),
+ Stream::Unix(ref mut stream) => stream.shutdown(Shutdown::Write).ok(),
};
}
}
@@ -108,6 +122,7 @@ impl Read for RefinedTcpStream {
Stream::Http(ref mut stream) => stream.read(buf),
#[cfg(feature = "ssl")]
Stream::Https(ref mut stream) => stream.lock().unwrap().read(buf),
+ Stream::Unix(ref mut stream) => stream.read(buf),
}
}
}
@@ -118,6 +133,7 @@ impl Write for RefinedTcpStream {
Stream::Http(ref mut stream) => stream.write(buf),
#[cfg(feature = "ssl")]
Stream::Https(ref mut stream) => stream.lock().unwrap().write(buf),
+ Stream::Unix(ref mut stream) => stream.write(buf),
}
}
@@ -126,6 +142,7 @@ impl Write for RefinedTcpStream {
Stream::Http(ref mut stream) => stream.flush(),
#[cfg(feature = "ssl")]
Stream::Https(ref mut stream) => stream.lock().unwrap().flush(),
+ Stream::Unix(ref mut stream) => stream.flush(),
}
}
}