From 58f0547e09090ea103279a4e04dbb6afeb3be55f Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Wed, 24 Oct 2018 01:32:55 +0200 Subject: [PATCH] Refactor eager_{read,write,accept}_tcp into separate functions --- src/resources.rs | 150 ++++++++++++++++++++++++++--------------------- 1 file changed, 84 insertions(+), 66 deletions(-) diff --git a/src/resources.rs b/src/resources.rs index b6449c37eb..41f1359c30 100644 --- a/src/resources.rs +++ b/src/resources.rs @@ -206,10 +206,37 @@ where Either::A(tokio_io::io::read(resource, buf)).into() } +#[cfg(not(windows))] +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd}; + +#[cfg(not(windows))] +fn eager_read_tcp>( + tcp_stream: &TcpStream, + resource: Resource, + mut buf: T, +) -> EagerRead { + // Unforunately we can't just call read() on tokio::net::TcpStream + let fd = (*tcp_stream).as_raw_fd(); + let mut std_tcp_stream = unsafe { std::net::TcpStream::from_raw_fd(fd) }; + let read_result = std_tcp_stream.read(buf.as_mut()); + // std_tcp_stream will close when it gets dropped. Thus... + let _ = std_tcp_stream.into_raw_fd(); + match read_result { + Ok(nread) => Either::B(futures::future::ok((resource, buf, nread))), + Err(err) => { + if err.kind() == std::io::ErrorKind::WouldBlock { + Either::A(tokio_io::io::read(resource, buf)) + } else { + Either::B(futures::future::err(err)) + } + } + } +} + // This is an optimization that Tokio should do. // Attempt to call read() on the main thread. #[cfg(not(windows))] -pub fn eager_read(resource: Resource, mut buf: T) -> EagerRead +pub fn eager_read(resource: Resource, buf: T) -> EagerRead where T: AsMut<[u8]>, { @@ -219,25 +246,7 @@ where None => panic!("bad rid"), Some(repr) => match repr { Repr::TcpStream(ref mut tcp_stream) => { - // Unforunately we can't just call read() on tokio::net::TcpStream - use std::os::unix::io::AsRawFd; - use std::os::unix::io::FromRawFd; - use std::os::unix::io::IntoRawFd; - let mut std_tcp_stream = - unsafe { std::net::TcpStream::from_raw_fd(tcp_stream.as_raw_fd()) }; - let read_result = std_tcp_stream.read(buf.as_mut()); - // std_tcp_stream will close when it gets dropped. Thus... - let _ = std_tcp_stream.into_raw_fd(); - match read_result { - Ok(nread) => Either::B(futures::future::ok((resource, buf, nread))), - Err(err) => { - if err.kind() == std::io::ErrorKind::WouldBlock { - Either::A(tokio_io::io::read(resource, buf)) - } else { - Either::B(futures::future::err(err)) - } - } - } + eager_read_tcp(tcp_stream, resource, buf) } _ => Either::A(tokio_io::io::read(resource, buf)), }, @@ -255,6 +264,29 @@ where Either::A(tokio_write::write(resource, buf)).into() } +#[cfg(not(windows))] +fn eager_write_tcp>( + tcp_stream: &TcpStream, + resource: Resource, + buf: T, +) -> EagerWrite { + let fd = (*tcp_stream).as_raw_fd(); + let mut std_tcp_stream = unsafe { std::net::TcpStream::from_raw_fd(fd) }; + let write_result = std_tcp_stream.write(buf.as_ref()); + // std_tcp_stream will close when it gets dropped. Thus... + let _ = std_tcp_stream.into_raw_fd(); + match write_result { + Ok(nwrite) => Either::B(futures::future::ok((resource, buf, nwrite))), + Err(err) => { + if err.kind() == std::io::ErrorKind::WouldBlock { + Either::A(tokio_write::write(resource, buf)) + } else { + Either::B(futures::future::err(err)) + } + } + } +} + // This is an optimization that Tokio should do. // Attempt to call write() on the main thread. #[cfg(not(windows))] @@ -268,25 +300,7 @@ where None => panic!("bad rid"), Some(repr) => match repr { Repr::TcpStream(ref mut tcp_stream) => { - // Unforunately we can't just call write() on tokio::net::TcpStream - use std::os::unix::io::AsRawFd; - use std::os::unix::io::FromRawFd; - use std::os::unix::io::IntoRawFd; - let mut std_tcp_stream = - unsafe { std::net::TcpStream::from_raw_fd(tcp_stream.as_raw_fd()) }; - let write_result = std_tcp_stream.write(buf.as_ref()); - // std_tcp_stream will close when it gets dropped. Thus... - let _ = std_tcp_stream.into_raw_fd(); - match write_result { - Ok(nwrite) => Either::B(futures::future::ok((resource, buf, nwrite))), - Err(err) => { - if err.kind() == std::io::ErrorKind::WouldBlock { - Either::A(tokio_write::write(resource, buf)) - } else { - Either::B(futures::future::err(err)) - } - } - } + eager_write_tcp(tcp_stream, resource, buf) } _ => Either::A(tokio_write::write(resource, buf)), }, @@ -303,6 +317,35 @@ pub fn eager_accept(resource: Resource) -> EagerAccept { Either::A(tokio_util::accept(resource)).into() } +#[cfg(not(windows))] +fn eager_accept_tcp( + tcp_listener: &tokio::net::TcpListener, + resource: Resource, +) -> EagerAccept { + let fd = (*tcp_listener).as_raw_fd(); + let std_listener = unsafe { std::net::TcpListener::from_raw_fd(fd) }; + let result = std_listener.accept(); + // std_listener will close when it gets dropped. Thus... + let _ = std_listener.into_raw_fd(); + match result { + Ok((std_stream, addr)) => { + let result = tokio::net::TcpStream::from_std( + std_stream, + &tokio::reactor::Handle::default(), + ); + let tokio_stream = result.unwrap(); + Either::B(futures::future::ok((tokio_stream, addr))) + } + Err(err) => { + if err.kind() == std::io::ErrorKind::WouldBlock { + Either::A(tokio_util::accept(resource)) + } else { + Either::B(futures::future::err(err)) + } + } + } +} + // This is an optimization that Tokio should do. // Attempt to call write() on the main thread. #[cfg(not(windows))] @@ -312,33 +355,8 @@ pub fn eager_accept(resource: Resource) -> EagerAccept { match maybe_repr { None => panic!("bad rid"), Some(repr) => match repr { - Repr::TcpListener(ref mut listener) => { - // Unforunately we can't just call write() on tokio::net::TcpStream - use std::os::unix::io::AsRawFd; - use std::os::unix::io::FromRawFd; - use std::os::unix::io::IntoRawFd; - let mut std_listener = - unsafe { std::net::TcpListener::from_raw_fd(listener.as_raw_fd()) }; - let result = std_listener.accept(); - // std_listener will close when it gets dropped. Thus... - let _ = std_listener.into_raw_fd(); - match result { - Ok((std_stream, addr)) => { - let result = tokio::net::TcpStream::from_std( - std_stream, - &tokio::reactor::Handle::default(), - ); - let tokio_stream = result.unwrap(); - Either::B(futures::future::ok((tokio_stream, addr))) - } - Err(err) => { - if err.kind() == std::io::ErrorKind::WouldBlock { - Either::A(tokio_util::accept(resource)) - } else { - Either::B(futures::future::err(err)) - } - } - } + Repr::TcpListener(ref mut tcp_listener) => { + eager_accept_tcp(tcp_listener, resource) } _ => Either::A(tokio_util::accept(resource)), },