1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-21 21:50:00 -05:00

replace impl Future with poll_fn for net.rs, process.rs, tls.rs (#4158)

This commit is contained in:
Bartek Iwańczuk 2020-02-27 21:08:21 +01:00 committed by GitHub
parent ff4b7b0921
commit fa5f3aa600
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 72 additions and 175 deletions

View file

@ -5,13 +5,12 @@ use crate::op_error::OpError;
use crate::resolve_addr::resolve_addr;
use crate::state::State;
use deno_core::*;
use futures::future::poll_fn;
use futures::future::FutureExt;
use std;
use std::convert::From;
use std::future::Future;
use std::net::Shutdown;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio;
@ -28,55 +27,6 @@ pub fn init(i: &mut Isolate, s: &State) {
i.register_op("op_send", s.stateful_json_op(op_send));
}
#[derive(Debug, PartialEq)]
enum AcceptState {
Pending,
Done,
}
/// A future representing state of accepting a TCP connection.
pub struct Accept<'a> {
accept_state: AcceptState,
rid: ResourceId,
state: &'a State,
}
impl Future for Accept<'_> {
type Output = Result<(TcpStream, SocketAddr), OpError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
if inner.accept_state == AcceptState::Done {
panic!("poll Accept after it's done");
}
let mut state = inner.state.borrow_mut();
let listener_resource = state
.resource_table
.get_mut::<TcpListenerResource>(inner.rid)
.ok_or_else(|| OpError::other("Listener has been closed".to_string()))?;
let listener = &mut listener_resource.listener;
match listener.poll_accept(cx).map_err(OpError::from) {
Poll::Ready(Ok((stream, addr))) => {
listener_resource.untrack_task();
inner.accept_state = AcceptState::Done;
Poll::Ready(Ok((stream, addr)))
}
Poll::Pending => {
listener_resource.track_task(cx)?;
Poll::Pending
}
Poll::Ready(Err(e)) => {
listener_resource.untrack_task();
inner.accept_state = AcceptState::Done;
Poll::Ready(Err(e))
}
}
}
}
#[derive(Deserialize)]
struct AcceptArgs {
rid: i32,
@ -98,12 +48,32 @@ fn op_accept(
.ok_or_else(OpError::bad_resource)?;
}
let state = state.clone();
let op = async move {
let accept_fut = Accept {
accept_state: AcceptState::Pending,
rid,
state: &state_,
};
let accept_fut = poll_fn(|cx| {
let resource_table = &mut state.borrow_mut().resource_table;
let listener_resource = resource_table
.get_mut::<TcpListenerResource>(rid)
.ok_or_else(|| {
OpError::other("Listener has been closed".to_string())
})?;
let listener = &mut listener_resource.listener;
match listener.poll_accept(cx).map_err(OpError::from) {
Poll::Ready(Ok((stream, addr))) => {
listener_resource.untrack_task();
Poll::Ready(Ok((stream, addr)))
}
Poll::Pending => {
listener_resource.track_task(cx)?;
Poll::Pending
}
Poll::Ready(Err(e)) => {
listener_resource.untrack_task();
Poll::Ready(Err(e))
}
}
});
let (tcp_stream, _socket_addr) = accept_fut.await?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
@ -129,31 +99,6 @@ fn op_accept(
Ok(JsonOp::Async(op.boxed_local()))
}
pub struct Receive<'a> {
state: &'a State,
rid: ResourceId,
buf: ZeroCopyBuf,
}
impl Future for Receive<'_> {
type Output = Result<(usize, SocketAddr), OpError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
let mut state = inner.state.borrow_mut();
let resource = state
.resource_table
.get_mut::<UdpSocketResource>(inner.rid)
.ok_or_else(|| OpError::other("Socket has been closed".to_string()))?;
let socket = &mut resource.socket;
socket
.poll_recv_from(cx, &mut inner.buf)
.map_err(OpError::from)
}
}
#[derive(Deserialize)]
struct ReceiveArgs {
rid: i32,
@ -165,7 +110,7 @@ fn op_receive(
zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
assert!(zero_copy.is_some());
let buf = zero_copy.unwrap();
let mut buf = zero_copy.unwrap();
let args: ReceiveArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
@ -173,11 +118,14 @@ fn op_receive(
let state_ = state.clone();
let op = async move {
let receive_fut = Receive {
state: &state_,
rid,
buf,
};
let receive_fut = poll_fn(|cx| {
let resource_table = &mut state_.borrow_mut().resource_table;
let resource = resource_table
.get_mut::<UdpSocketResource>(rid)
.ok_or_else(|| OpError::other("Socket has been closed".to_string()))?;
let socket = &mut resource.socket;
socket.poll_recv_from(cx, &mut buf).map_err(OpError::from)
});
let (size, remote_addr) = receive_fut.await?;
Ok(json!({
"size": size,

View file

@ -6,15 +6,11 @@ use crate::signal::kill;
use crate::state::State;
use deno_core::*;
use futures;
use futures::future::poll_fn;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
use futures::TryFutureExt;
use std;
use std::convert::From;
use std::future::Future;
use std::pin::Pin;
use std::process::ExitStatus;
use std::task::Context;
use std::task::Poll;
use tokio::process::Command;
#[cfg(unix)]
@ -172,26 +168,6 @@ fn op_run(
})))
}
pub struct ChildStatus {
rid: ResourceId,
state: State,
}
impl Future for ChildStatus {
type Output = Result<ExitStatus, OpError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
let mut state = inner.state.borrow_mut();
let child_resource = state
.resource_table
.get_mut::<ChildResource>(inner.rid)
.ok_or_else(OpError::bad_resource)?;
let child = &mut child_resource.child;
child.map_err(OpError::from).poll_unpin(cx)
}
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct RunStatusArgs {
@ -207,14 +183,19 @@ fn op_run_status(
let rid = args.rid as u32;
state.check_run()?;
let future = ChildStatus {
rid,
state: state.clone(),
};
let state = state.clone();
let future = async move {
let run_status = future.await?;
let run_status = poll_fn(|cx| {
let resource_table = &mut state.borrow_mut().resource_table;
let child_resource = resource_table
.get_mut::<ChildResource>(rid)
.ok_or_else(OpError::bad_resource)?;
let child = &mut child_resource.child;
child.map_err(OpError::from).poll_unpin(cx)
})
.await?;
let code = run_status.code();
#[cfg(unix)]

View file

@ -5,15 +5,14 @@ use crate::op_error::OpError;
use crate::resolve_addr::resolve_addr;
use crate::state::State;
use deno_core::*;
use futures::future::poll_fn;
use futures::future::FutureExt;
use std;
use std::convert::From;
use std::fs::File;
use std::future::Future;
use std::io::BufReader;
use std::net::SocketAddr;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
@ -265,55 +264,6 @@ fn op_listen_tls(
})))
}
#[derive(Debug, PartialEq)]
enum AcceptTlsState {
Pending,
Done,
}
/// A future representing state of accepting a TLS connection.
pub struct AcceptTls {
accept_state: AcceptTlsState,
rid: ResourceId,
state: State,
}
impl Future for AcceptTls {
type Output = Result<(TcpStream, SocketAddr), OpError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
if inner.accept_state == AcceptTlsState::Done {
panic!("poll AcceptTls after it's done");
}
let mut state = inner.state.borrow_mut();
let listener_resource = state
.resource_table
.get_mut::<TlsListenerResource>(inner.rid)
.ok_or_else(|| OpError::other("Listener has been closed".to_string()))?;
let listener = &mut listener_resource.listener;
match listener.poll_accept(cx).map_err(OpError::from) {
Poll::Ready(Ok((stream, addr))) => {
listener_resource.untrack_task();
inner.accept_state = AcceptTlsState::Done;
Poll::Ready(Ok((stream, addr)))
}
Poll::Pending => {
listener_resource.track_task(cx)?;
Poll::Pending
}
Poll::Ready(Err(e)) => {
listener_resource.untrack_task();
inner.accept_state = AcceptTlsState::Done;
Poll::Ready(Err(e))
}
}
}
}
#[derive(Deserialize)]
struct AcceptTlsArgs {
rid: i32,
@ -328,11 +278,29 @@ fn op_accept_tls(
let rid = args.rid as u32;
let state = state.clone();
let op = async move {
let accept_fut = AcceptTls {
accept_state: AcceptTlsState::Pending,
rid,
state: state.clone(),
};
let accept_fut = poll_fn(|cx| {
let resource_table = &mut state.borrow_mut().resource_table;
let listener_resource = resource_table
.get_mut::<TlsListenerResource>(rid)
.ok_or_else(|| {
OpError::other("Listener has been closed".to_string())
})?;
let listener = &mut listener_resource.listener;
match listener.poll_accept(cx).map_err(OpError::from) {
Poll::Ready(Ok((stream, addr))) => {
listener_resource.untrack_task();
Poll::Ready(Ok((stream, addr)))
}
Poll::Pending => {
listener_resource.track_task(cx)?;
Poll::Pending
}
Poll::Ready(Err(e)) => {
listener_resource.untrack_task();
Poll::Ready(Err(e))
}
}
});
let (tcp_stream, _socket_addr) = accept_fut.await?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;