1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-22 15:10:44 -05:00

Move eager functions into eager_unix.rs

This commit is contained in:
Bert Belder 2018-10-24 03:12:21 +02:00
parent 58f0547e09
commit ae00df73a2
No known key found for this signature in database
GPG key ID: 7A77887B2E2ED461
3 changed files with 131 additions and 123 deletions

86
src/eager_unix.rs Normal file
View file

@ -0,0 +1,86 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
use resources::{EagerAccept, EagerRead, EagerWrite, Resource};
use tokio_util;
use tokio_write;
use futures::future::{self, Either};
use std;
use std::io::{ErrorKind, Read, Write};
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
use tokio;
use tokio::net::{TcpListener, TcpStream};
use tokio_io;
pub fn tcp_read<T: AsMut<[u8]>>(
tcp_stream: &TcpStream,
resource: Resource,
mut buf: T,
) -> EagerRead<Resource, T> {
// Unforunately we can't just call read() on tokio::net::TcpStream
let fd = (*tcp_stream).as_raw_fd();
let mut std_tcp_stream = unsafe { std::net::TcpStream::from_raw_fd(fd) };
let read_result = std_tcp_stream.read(buf.as_mut());
// std_tcp_stream will close when it gets dropped. Thus...
let _ = std_tcp_stream.into_raw_fd();
match read_result {
Ok(nread) => Either::B(future::ok((resource, buf, nread))),
Err(err) => {
if err.kind() == ErrorKind::WouldBlock {
Either::A(tokio_io::io::read(resource, buf))
} else {
Either::B(future::err(err))
}
}
}
}
pub fn tcp_write<T: AsRef<[u8]>>(
tcp_stream: &TcpStream,
resource: Resource,
buf: T,
) -> EagerWrite<Resource, T> {
let fd = (*tcp_stream).as_raw_fd();
let mut std_tcp_stream = unsafe { std::net::TcpStream::from_raw_fd(fd) };
let write_result = std_tcp_stream.write(buf.as_ref());
// std_tcp_stream will close when it gets dropped. Thus...
let _ = std_tcp_stream.into_raw_fd();
match write_result {
Ok(nwrite) => Either::B(future::ok((resource, buf, nwrite))),
Err(err) => {
if err.kind() == ErrorKind::WouldBlock {
Either::A(tokio_write::write(resource, buf))
} else {
Either::B(future::err(err))
}
}
}
}
pub fn tcp_accept(
tcp_listener: &TcpListener,
resource: Resource,
) -> EagerAccept {
let fd = (*tcp_listener).as_raw_fd();
let std_listener = unsafe { std::net::TcpListener::from_raw_fd(fd) };
let result = std_listener.accept();
// std_listener will close when it gets dropped. Thus...
let _ = std_listener.into_raw_fd();
match result {
Ok((std_stream, addr)) => {
let result = tokio::net::TcpStream::from_std(
std_stream,
&tokio::reactor::Handle::default(),
);
let tokio_stream = result.unwrap();
Either::B(future::ok((tokio_stream, addr)))
}
Err(err) => {
if err.kind() == ErrorKind::WouldBlock {
Either::A(tokio_util::accept(resource))
} else {
Either::B(future::err(err))
}
}
}
}

View file

@ -36,6 +36,9 @@ mod tokio_util;
mod tokio_write; mod tokio_write;
mod version; mod version;
#[cfg(unix)]
mod eager_unix;
use std::env; use std::env;
static LOGGER: Logger = Logger; static LOGGER: Logger = Logger;

View file

@ -8,18 +8,18 @@
// descriptors". This module implements a global resource table. Ops (AKA // descriptors". This module implements a global resource table. Ops (AKA
// handlers) look up resources by their integer id here. // handlers) look up resources by their integer id here.
#[cfg(unix)]
use eager_unix as eager;
use errors::DenoError; use errors::DenoError;
use tokio_util; use tokio_util;
use tokio_write; use tokio_write;
use futures; use futures;
use futures::future::Either; use futures::future::{Either, FutureResult};
use futures::future::FutureResult;
use futures::Poll; use futures::Poll;
use std; use std;
use std::collections::HashMap; use std::collections::HashMap;
use std::io::Error; use std::io::{Error, Read, Write};
use std::io::{Read, Write};
use std::net::{Shutdown, SocketAddr}; use std::net::{Shutdown, SocketAddr};
use std::sync::atomic::AtomicIsize; use std::sync::atomic::AtomicIsize;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
@ -194,161 +194,80 @@ pub fn lookup(rid: ResourceId) -> Option<Resource> {
table.get(&rid).map(|_| Resource { rid }) table.get(&rid).map(|_| Resource { rid })
} }
type EagerRead<R, T> = pub type EagerRead<R, T> =
Either<tokio_io::io::Read<R, T>, FutureResult<(R, T, usize), std::io::Error>>; Either<tokio_io::io::Read<R, T>, FutureResult<(R, T, usize), std::io::Error>>;
#[cfg(windows)] pub type EagerWrite<R, T> =
Either<tokio_write::Write<R, T>, FutureResult<(R, T, usize), std::io::Error>>;
pub type EagerAccept = Either<
tokio_util::Accept,
FutureResult<(tokio::net::TcpStream, std::net::SocketAddr), std::io::Error>,
>;
#[cfg(not(unix))]
#[allow(unused_mut)] #[allow(unused_mut)]
pub fn eager_read<T>(resource: Resource, mut buf: T) -> EagerRead<Resource, T> pub fn eager_read<T: AsMut<[u8]>>(
where
T: AsMut<[u8]>,
{
Either::A(tokio_io::io::read(resource, buf)).into()
}
#[cfg(not(windows))]
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
#[cfg(not(windows))]
fn eager_read_tcp<T: AsMut<[u8]>>(
tcp_stream: &TcpStream,
resource: Resource, resource: Resource,
mut buf: T, mut buf: T,
) -> EagerRead<Resource, T> { ) -> EagerRead<Resource, T> {
// Unforunately we can't just call read() on tokio::net::TcpStream Either::A(tokio_io::io::read(resource, buf)).into()
let fd = (*tcp_stream).as_raw_fd(); }
let mut std_tcp_stream = unsafe { std::net::TcpStream::from_raw_fd(fd) };
let read_result = std_tcp_stream.read(buf.as_mut()); #[cfg(not(unix))]
// std_tcp_stream will close when it gets dropped. Thus... pub fn eager_write<T: AsRef<[u8]>>(
let _ = std_tcp_stream.into_raw_fd(); resource: Resource,
match read_result { buf: T,
Ok(nread) => Either::B(futures::future::ok((resource, buf, nread))), ) -> EagerWrite<Resource, T> {
Err(err) => { Either::A(tokio_write::write(resource, buf)).into()
if err.kind() == std::io::ErrorKind::WouldBlock { }
Either::A(tokio_io::io::read(resource, buf))
} else { #[cfg(not(unix))]
Either::B(futures::future::err(err)) pub fn eager_accept(resource: Resource) -> EagerAccept {
} Either::A(tokio_util::accept(resource)).into()
}
}
} }
// This is an optimization that Tokio should do. // This is an optimization that Tokio should do.
// Attempt to call read() on the main thread. // Attempt to call read() on the main thread.
#[cfg(not(windows))] #[cfg(unix)]
pub fn eager_read<T>(resource: Resource, buf: T) -> EagerRead<Resource, T> pub fn eager_read<T: AsMut<[u8]>>(
where resource: Resource,
T: AsMut<[u8]>, buf: T,
{ ) -> EagerRead<Resource, T> {
let mut table = RESOURCE_TABLE.lock().unwrap(); let mut table = RESOURCE_TABLE.lock().unwrap();
let maybe_repr = table.get_mut(&resource.rid); let maybe_repr = table.get_mut(&resource.rid);
match maybe_repr { match maybe_repr {
None => panic!("bad rid"), None => panic!("bad rid"),
Some(repr) => match repr { Some(repr) => match repr {
Repr::TcpStream(ref mut tcp_stream) => { Repr::TcpStream(ref mut tcp_stream) => {
eager_read_tcp(tcp_stream, resource, buf) eager::tcp_read(tcp_stream, resource, buf)
} }
_ => Either::A(tokio_io::io::read(resource, buf)), _ => Either::A(tokio_io::io::read(resource, buf)),
}, },
} }
} }
type EagerWrite<R, T> = // This is an optimization that Tokio should do.
Either<tokio_write::Write<R, T>, FutureResult<(R, T, usize), std::io::Error>>; // Attempt to call write() on the main thread.
#[cfg(unix)]
#[cfg(windows)] pub fn eager_write<T: AsRef<[u8]>>(
pub fn eager_write<T>(resource: Resource, buf: T) -> EagerWrite<Resource, T>
where
T: AsRef<[u8]>,
{
Either::A(tokio_write::write(resource, buf)).into()
}
#[cfg(not(windows))]
fn eager_write_tcp<T: AsRef<[u8]>>(
tcp_stream: &TcpStream,
resource: Resource, resource: Resource,
buf: T, buf: T,
) -> EagerWrite<Resource, T> { ) -> EagerWrite<Resource, T> {
let fd = (*tcp_stream).as_raw_fd();
let mut std_tcp_stream = unsafe { std::net::TcpStream::from_raw_fd(fd) };
let write_result = std_tcp_stream.write(buf.as_ref());
// std_tcp_stream will close when it gets dropped. Thus...
let _ = std_tcp_stream.into_raw_fd();
match write_result {
Ok(nwrite) => Either::B(futures::future::ok((resource, buf, nwrite))),
Err(err) => {
if err.kind() == std::io::ErrorKind::WouldBlock {
Either::A(tokio_write::write(resource, buf))
} else {
Either::B(futures::future::err(err))
}
}
}
}
// This is an optimization that Tokio should do.
// Attempt to call write() on the main thread.
#[cfg(not(windows))]
pub fn eager_write<T>(resource: Resource, buf: T) -> EagerWrite<Resource, T>
where
T: AsRef<[u8]>,
{
let mut table = RESOURCE_TABLE.lock().unwrap(); let mut table = RESOURCE_TABLE.lock().unwrap();
let maybe_repr = table.get_mut(&resource.rid); let maybe_repr = table.get_mut(&resource.rid);
match maybe_repr { match maybe_repr {
None => panic!("bad rid"), None => panic!("bad rid"),
Some(repr) => match repr { Some(repr) => match repr {
Repr::TcpStream(ref mut tcp_stream) => { Repr::TcpStream(ref mut tcp_stream) => {
eager_write_tcp(tcp_stream, resource, buf) eager::tcp_write(tcp_stream, resource, buf)
} }
_ => Either::A(tokio_write::write(resource, buf)), _ => Either::A(tokio_write::write(resource, buf)),
}, },
} }
} }
type EagerAccept = Either< #[cfg(unix)]
tokio_util::Accept,
FutureResult<(TcpStream, SocketAddr), std::io::Error>,
>;
#[cfg(windows)]
pub fn eager_accept(resource: Resource) -> EagerAccept {
Either::A(tokio_util::accept(resource)).into()
}
#[cfg(not(windows))]
fn eager_accept_tcp(
tcp_listener: &tokio::net::TcpListener,
resource: Resource,
) -> EagerAccept {
let fd = (*tcp_listener).as_raw_fd();
let std_listener = unsafe { std::net::TcpListener::from_raw_fd(fd) };
let result = std_listener.accept();
// std_listener will close when it gets dropped. Thus...
let _ = std_listener.into_raw_fd();
match result {
Ok((std_stream, addr)) => {
let result = tokio::net::TcpStream::from_std(
std_stream,
&tokio::reactor::Handle::default(),
);
let tokio_stream = result.unwrap();
Either::B(futures::future::ok((tokio_stream, addr)))
}
Err(err) => {
if err.kind() == std::io::ErrorKind::WouldBlock {
Either::A(tokio_util::accept(resource))
} else {
Either::B(futures::future::err(err))
}
}
}
}
// This is an optimization that Tokio should do.
// Attempt to call write() on the main thread.
#[cfg(not(windows))]
pub fn eager_accept(resource: Resource) -> EagerAccept { pub fn eager_accept(resource: Resource) -> EagerAccept {
let mut table = RESOURCE_TABLE.lock().unwrap(); let mut table = RESOURCE_TABLE.lock().unwrap();
let maybe_repr = table.get_mut(&resource.rid); let maybe_repr = table.get_mut(&resource.rid);
@ -356,7 +275,7 @@ pub fn eager_accept(resource: Resource) -> EagerAccept {
None => panic!("bad rid"), None => panic!("bad rid"),
Some(repr) => match repr { Some(repr) => match repr {
Repr::TcpListener(ref mut tcp_listener) => { Repr::TcpListener(ref mut tcp_listener) => {
eager_accept_tcp(tcp_listener, resource) eager::tcp_accept(tcp_listener, resource)
} }
_ => Either::A(tokio_util::accept(resource)), _ => Either::A(tokio_util::accept(resource)),
}, },