
Make writeSync, readSync, seekSync, openSync, isatty proper synchronous syscalls (#4762)

Ryan Dahl 2020-04-15 20:43:19 -04:00 committed by GitHub
parent 7cfd094359
commit fab0204cbf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 350 additions and 196 deletions


@@ -23,6 +23,7 @@ export enum ErrorKind {
   URIError = 20,
   TypeError = 21,
   Other = 22,
+  Busy = 23,
 }
 
 export function getErrorClass(kind: ErrorKind): { new (msg: string): Error } {
@@ -67,6 +68,8 @@ export function getErrorClass(kind: ErrorKind): { new (msg: string): Error } {
       return BadResource;
     case ErrorKind.Http:
       return Http;
+    case ErrorKind.Busy:
+      return Busy;
   }
 }
@@ -172,6 +175,12 @@
     this.name = "Http";
   }
 }
 
+class Busy extends Error {
+  constructor(msg: string) {
+    super(msg);
+    this.name = "Busy";
+  }
+}
+
 export const errors = {
   NotFound: NotFound,
@@ -191,4 +200,5 @@ export const errors = {
   UnexpectedEof: UnexpectedEof,
   BadResource: BadResource,
   Http: Http,
+  Busy: Busy,
 };


@@ -1663,6 +1663,7 @@ declare namespace Deno {
     UnexpectedEof: ErrorConstructor;
     BadResource: ErrorConstructor;
     Http: ErrorConstructor;
+    Busy: ErrorConstructor;
   };
 
   /** **UNSTABLE**: potentially want names to overlap more with browser.


@@ -57,3 +57,9 @@ unitTest({ perms: { read: false } }, async function readFilePerm(): Promise<
   }
   assert(caughtError);
 });
+
+unitTest({ perms: { read: true } }, function readFileSyncLoop(): void {
+  for (let i = 0; i < 256; i++) {
+    Deno.readFileSync("cli/tests/fixture.json");
+  }
+});


@@ -50,6 +50,7 @@ pub enum ErrorKind {
   /// if no better context is available.
   /// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Error
   Other = 22,
+  Busy = 23,
 }
 
 #[derive(Debug)]
@@ -103,6 +104,13 @@ impl OpError {
   pub fn invalid_utf8() -> OpError {
     Self::new(ErrorKind::InvalidData, "invalid utf8".to_string())
   }
+
+  pub fn resource_unavailable() -> OpError {
+    Self::new(
+      ErrorKind::Busy,
+      "resource is unavailable because it is in use by a promise".to_string(),
+    )
+  }
 }
 
 impl Error for OpError {}


@@ -14,7 +14,10 @@ use futures::future::FutureExt;
 use std::future::Future;
 use std::pin::Pin;
 
-pub type MinimalOp = dyn Future<Output = Result<i32, OpError>>;
+pub enum MinimalOp {
+  Sync(Result<i32, OpError>),
+  Async(Pin<Box<dyn Future<Output = Result<i32, OpError>>>>),
+}
 
 #[derive(Copy, Clone, Debug, PartialEq)]
 // This corresponds to RecordMinimal on the TS side.
@@ -113,7 +116,7 @@ fn test_parse_min_record() {
 
 pub fn minimal_op<D>(d: D) -> impl Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp
 where
-  D: Fn(i32, Option<ZeroCopyBuf>) -> Pin<Box<MinimalOp>>,
+  D: Fn(bool, i32, Option<ZeroCopyBuf>) -> MinimalOp,
 {
   move |control: &[u8], zero_copy: Option<ZeroCopyBuf>| {
     let mut record = match parse_min_record(control) {
@@ -131,14 +134,13 @@ where
     };
 
     let is_sync = record.promise_id == 0;
     let rid = record.arg;
-    let min_op = d(rid, zero_copy);
+    let min_op = d(is_sync, rid, zero_copy);
 
-    // Convert to CoreOp
-    let fut = async move {
-      match min_op.await {
+    match min_op {
+      MinimalOp::Sync(sync_result) => Op::Sync(match sync_result {
         Ok(r) => {
           record.result = r;
-          Ok(record.into())
+          record.into()
         }
         Err(err) => {
           let error_record = ErrorRecord {
@@ -147,20 +149,30 @@ where
             error_code: err.kind as i32,
             error_message: err.msg.as_bytes().to_owned(),
           };
-          Ok(error_record.into())
+          error_record.into()
         }
-      }
-    };
-
-    if is_sync {
-      // Warning! Possible deadlocks can occur if we try to wait for a future
-      // while in a future. The safe but expensive alternative is to use
-      // tokio_util::block_on.
-      // This block is only exercised for readSync and writeSync, which I think
-      // works since they're simple polling futures.
-      Op::Sync(futures::executor::block_on(fut).unwrap())
-    } else {
-      Op::Async(fut.boxed_local())
+      }),
+      MinimalOp::Async(min_fut) => {
+        // Convert to CoreOp
+        let core_fut = async move {
+          match min_fut.await {
+            Ok(r) => {
+              record.result = r;
+              Ok(record.into())
+            }
+            Err(err) => {
+              let error_record = ErrorRecord {
+                promise_id: record.promise_id,
+                arg: -1,
+                error_code: err.kind as i32,
+                error_message: err.msg.as_bytes().to_owned(),
+              };
+              Ok(error_record.into())
+            }
+          }
+        };
+        Op::Async(core_fut.boxed_local())
+      }
     }
   }
 }
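
The change above is the core of the commit: a minimal-op dispatcher no longer always hands back a boxed future; it returns MinimalOp::Sync when the call arrived with promise_id == 0 and MinimalOp::Async otherwise, so the sync path never goes near futures::executor::block_on. A small self-contained sketch of that shape, using String and a bare i32 as stand-ins for OpError and the record payload (not the actual deno_core types):

use futures::future::{FutureExt, LocalBoxFuture};

// Simplified stand-in for Result<i32, OpError>.
type OpResult = Result<i32, String>;

enum MinimalOp {
  Sync(OpResult),
  Async(LocalBoxFuture<'static, OpResult>),
}

// A dispatcher now receives `is_sync` (derived from promise_id == 0) and
// decides up front whether to produce an immediate result or a future.
fn fake_read_op(is_sync: bool, rid: i32) -> MinimalOp {
  if is_sync {
    // Blocking path: the result is computed right here, no executor involved.
    MinimalOp::Sync(Ok(rid))
  } else {
    // Non-blocking path: a boxed future polled later by the isolate.
    MinimalOp::Async(async move { Ok(rid) }.boxed_local())
  }
}

fn main() {
  match fake_read_op(true, 3) {
    MinimalOp::Sync(result) => println!("sync result: {:?}", result),
    MinimalOp::Async(_fut) => println!("queued as async op"),
  }
}

minimal_op then serializes the Sync result immediately and only boxes a future for the Async arm, which is exactly the match on min_op shown in the hunk.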


@@ -1,16 +1,17 @@
 // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
 // Some deserializer fields are only used on Unix and Windows build fails without it
 use super::dispatch_json::{blocking_json, Deserialize, JsonOp, Value};
+use super::io::std_file_resource;
 use super::io::{FileMetadata, StreamResource, StreamResourceHolder};
 use crate::fs::resolve_from_cwd;
 use crate::op_error::OpError;
 use crate::ops::dispatch_json::JsonResult;
 use crate::state::State;
-use deno_core::*;
+use deno_core::Isolate;
+use deno_core::ZeroCopyBuf;
 use futures::future::FutureExt;
 use std::convert::From;
 use std::env::{current_dir, set_current_dir, temp_dir};
-use std::io::SeekFrom;
 use std::path::{Path, PathBuf};
 use std::time::UNIX_EPOCH;
@@ -75,22 +76,19 @@ fn op_open(
   let path = resolve_from_cwd(Path::new(&args.path))?;
   let state_ = state.clone();
 
-  let mut open_options = if let Some(mode) = args.mode {
-    #[allow(unused_mut)]
-    let mut std_options = std::fs::OpenOptions::new();
+  let mut open_options = std::fs::OpenOptions::new();
+
+  if let Some(mode) = args.mode {
     // mode only used if creating the file on Unix
     // if not specified, defaults to 0o666
     #[cfg(unix)]
     {
       use std::os::unix::fs::OpenOptionsExt;
-      std_options.mode(mode & 0o777);
+      open_options.mode(mode & 0o777);
     }
     #[cfg(not(unix))]
     let _ = mode; // avoid unused warning
-    tokio::fs::OpenOptions::from(std_options)
-  } else {
-    tokio::fs::OpenOptions::new()
-  };
+  }
 
   if let Some(options) = args.options {
     if options.read {
@@ -165,23 +163,33 @@ fn op_open(
   let is_sync = args.promise_id.is_none();
 
-  let fut = async move {
-    let fs_file = open_options.open(path).await?;
+  if is_sync {
+    let std_file = open_options.open(path)?;
+    let tokio_file = tokio::fs::File::from_std(std_file);
     let mut state = state_.borrow_mut();
     let rid = state.resource_table.add(
       "fsFile",
-      Box::new(StreamResourceHolder::new(StreamResource::FsFile(
-        fs_file,
+      Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some((
+        tokio_file,
         FileMetadata::default(),
-      ))),
+      ))))),
     );
-    Ok(json!(rid))
-  };
-
-  if is_sync {
-    let buf = futures::executor::block_on(fut)?;
-    Ok(JsonOp::Sync(buf))
+    Ok(JsonOp::Sync(json!(rid)))
   } else {
+    let fut = async move {
+      let tokio_file = tokio::fs::OpenOptions::from(open_options)
+        .open(path)
+        .await?;
+      let mut state = state_.borrow_mut();
+      let rid = state.resource_table.add(
+        "fsFile",
+        Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some((
+          tokio_file,
+          FileMetadata::default(),
+        ))))),
+      );
+      Ok(json!(rid))
+    };
     Ok(JsonOp::Async(fut.boxed_local()))
   }
 }
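
Splitting op_open this way means the sync branch is an ordinary blocking std::fs open whose handle is wrapped as a tokio file only so it can share the FsFile resource slot with async opens. A rough sketch of just that branch, with a hypothetical helper name and the tokio 0.2-era from_std conversion assumed, as in the hunk above:

// Hypothetical helper mirroring the sync branch of op_open: build one
// std::fs::OpenOptions, open synchronously, then wrap the handle as a
// tokio::fs::File so it can live in the same resource slot as async opens.
fn open_sync_sketch(path: &str) -> std::io::Result<tokio::fs::File> {
  let mut open_options = std::fs::OpenOptions::new();
  open_options.read(true);
  #[cfg(unix)]
  {
    use std::os::unix::fs::OpenOptionsExt;
    // mode is only honored when the open actually creates the file.
    open_options.mode(0o666 & 0o777);
  }
  let std_file = open_options.open(path)?;
  Ok(tokio::fs::File::from_std(std_file))
}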
@@ -200,6 +208,7 @@ fn op_seek(
   args: Value,
   _zero_copy: Option<ZeroCopyBuf>,
 ) -> Result<JsonOp, OpError> {
+  use std::io::{Seek, SeekFrom};
   let args: SeekArgs = serde_json::from_value(args)?;
   let rid = args.rid as u32;
   let offset = args.offset;
@@ -217,29 +226,31 @@ fn op_seek(
     }
   };
 
-  let state = state.borrow();
-  let resource_holder = state
-    .resource_table
-    .get::<StreamResourceHolder>(rid)
-    .ok_or_else(OpError::bad_resource_id)?;
-
-  let tokio_file = match resource_holder.resource {
-    StreamResource::FsFile(ref file, _) => file,
-    _ => return Err(OpError::bad_resource_id()),
-  };
-  let mut file = futures::executor::block_on(tokio_file.try_clone())?;
-
+  let state = state.clone();
   let is_sync = args.promise_id.is_none();
-  let fut = async move {
-    debug!("op_seek {} {} {}", rid, offset, whence);
-    let pos = file.seek(seek_from).await?;
-    Ok(json!(pos))
-  };
 
   if is_sync {
-    let buf = futures::executor::block_on(fut)?;
-    Ok(JsonOp::Sync(buf))
+    let mut s = state.borrow_mut();
+    let pos = std_file_resource(&mut s.resource_table, rid, |r| match r {
+      Ok(std_file) => std_file.seek(seek_from).map_err(OpError::from),
+      Err(_) => Err(OpError::type_error(
+        "cannot seek on this type of resource".to_string(),
+      )),
+    })?;
+    Ok(JsonOp::Sync(json!(pos)))
   } else {
+    // TODO(ry) This is a fake async op. We need to use poll_fn,
+    // tokio::fs::File::start_seek and tokio::fs::File::poll_complete
+    let fut = async move {
+      let mut s = state.borrow_mut();
+      let pos = std_file_resource(&mut s.resource_table, rid, |r| match r {
+        Ok(std_file) => std_file.seek(seek_from).map_err(OpError::from),
+        Err(_) => Err(OpError::type_error(
+          "cannot seek on this type of resource".to_string(),
+        )),
+      })?;
+      Ok(json!(pos))
+    };
     Ok(JsonOp::Async(fut.boxed_local()))
   }
 }
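
On the sync side of op_seek the work reduces to std::io::Seek on the std::fs::File produced by std_file_resource. Stripped of the resource table, the underlying operation is just the standard library call; a tiny illustration using a hypothetical scratch file (the whence-to-SeekFrom mapping lives in code this hunk does not show and is assumed here):

use std::io::{Seek, SeekFrom, Write};

fn main() -> std::io::Result<()> {
  // Hypothetical scratch file used only for illustration.
  let mut file = std::fs::File::create("seek_sketch.tmp")?;
  file.write_all(b"hello world")?;
  // SeekFrom::Start / Current / End are the three positions the op's
  // `whence` argument selects between.
  let pos = file.seek(SeekFrom::Start(6))?;
  assert_eq!(pos, 6);
  std::fs::remove_file("seek_sketch.tmp")?;
  Ok(())
}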


@@ -27,12 +27,14 @@ use std::os::windows::io::FromRawHandle;
 extern crate winapi;
 
 lazy_static! {
-  /// Due to portability issues on Windows handle to stdout is created from raw file descriptor.
-  /// The caveat of that approach is fact that when this handle is dropped underlying
-  /// file descriptor is closed - that is highly not desirable in case of stdout.
-  /// That's why we store this global handle that is then cloned when obtaining stdio
-  /// for process. In turn when resource table is dropped storing reference to that handle,
-  /// the handle itself won't be closed (so Deno.core.print) will still work.
+  /// Due to portability issues on Windows handle to stdout is created from raw
+  /// file descriptor. The caveat of that approach is fact that when this
+  /// handle is dropped underlying file descriptor is closed - that is highly
+  /// not desirable in case of stdout. That's why we store this global handle
+  /// that is then cloned when obtaining stdio for process. In turn when
+  /// resource table is dropped storing reference to that handle, the handle
+  /// itself won't be closed (so Deno.core.print) will still work.
+  // TODO(ry) It should be possible to close stdout.
   static ref STDOUT_HANDLE: std::fs::File = {
     #[cfg(not(windows))]
     let stdout = unsafe { std::fs::File::from_raw_fd(1) };
@@ -42,9 +44,19 @@ lazy_static! {
         winapi::um::winbase::STD_OUTPUT_HANDLE,
       ))
     };
     stdout
   };
+  static ref STDERR_HANDLE: std::fs::File = {
+    #[cfg(not(windows))]
+    let stderr = unsafe { std::fs::File::from_raw_fd(2) };
+    #[cfg(windows)]
+    let stderr = unsafe {
+      std::fs::File::from_raw_handle(winapi::um::processenv::GetStdHandle(
+        winapi::um::winbase::STD_ERROR_HANDLE,
+      ))
+    };
+    stderr
+  };
 }
 
 pub fn init(i: &mut Isolate, s: &State) {
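
The doc comment above is the whole trick: a std::fs::File closes its descriptor on drop, so stdout and stderr are wrapped once in never-dropped globals and only clones are handed to the resource table. A Unix-only sketch of that idea, with std::mem::forget standing in for lazy_static's "never dropped":

// Unix-only sketch: fd 1 is wrapped in a std::fs::File once, clones are
// handed out, and the original is never dropped so the real descriptor
// stays open.
#[cfg(unix)]
fn main() -> std::io::Result<()> {
  use std::io::Write;
  use std::os::unix::io::FromRawFd;

  // Safety: assumes fd 1 is open and owned by the process for its lifetime.
  let stdout_handle = unsafe { std::fs::File::from_raw_fd(1) };
  let mut clone = stdout_handle.try_clone()?;
  clone.write_all(b"written through a cloned stdout handle\n")?;
  // Dropping `clone` closes only the duplicated descriptor; forgetting the
  // original keeps fd 1 itself open.
  std::mem::forget(stdout_handle);
  Ok(())
}

#[cfg(not(unix))]
fn main() {}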
@@ -67,14 +79,14 @@ pub fn get_stdio() -> (
     tokio::io::stdin(),
     TTYMetadata::default(),
   ));
-  let stdout = StreamResourceHolder::new(StreamResource::Stdout({
-    let stdout = STDOUT_HANDLE
-      .try_clone()
-      .expect("Unable to clone stdout handle");
-    tokio::fs::File::from_std(stdout)
-  }));
-  let stderr =
-    StreamResourceHolder::new(StreamResource::Stderr(tokio::io::stderr()));
+  let stdout = StreamResourceHolder::new(StreamResource::FsFile(Some({
+    let stdout = STDOUT_HANDLE.try_clone().unwrap();
+    (tokio::fs::File::from_std(stdout), FileMetadata::default())
+  })));
+  let stderr = StreamResourceHolder::new(StreamResource::FsFile(Some({
+    let stderr = STDERR_HANDLE.try_clone().unwrap();
+    (tokio::fs::File::from_std(stderr), FileMetadata::default())
+  })));
 
   (stdin, stdout, stderr)
 }
@@ -144,9 +156,7 @@ impl StreamResourceHolder {
 
 pub enum StreamResource {
   Stdin(tokio::io::Stdin, TTYMetadata),
-  Stdout(tokio::fs::File),
-  Stderr(tokio::io::Stderr),
-  FsFile(tokio::fs::File, FileMetadata),
+  FsFile(Option<(tokio::fs::File, FileMetadata)>),
   TcpStream(tokio::net::TcpStream),
   #[cfg(not(windows))]
   UnixStream(tokio::net::UnixStream),
@@ -182,7 +192,8 @@ impl DenoAsyncRead for StreamResource {
   ) -> Poll<Result<usize, OpError>> {
     use StreamResource::*;
     let f: &mut dyn UnpinAsyncRead = match self {
-      FsFile(f, _) => f,
+      FsFile(Some((f, _))) => f,
+      FsFile(None) => return Poll::Ready(Err(OpError::resource_unavailable())),
       Stdin(f, _) => f,
       TcpStream(f) => f,
       #[cfg(not(windows))]
@@ -201,43 +212,65 @@ impl DenoAsyncRead for StreamResource {
 
 pub fn op_read(
   state: &State,
+  is_sync: bool,
   rid: i32,
   zero_copy: Option<ZeroCopyBuf>,
-) -> Pin<Box<MinimalOp>> {
+) -> MinimalOp {
   debug!("read rid={}", rid);
   if zero_copy.is_none() {
-    return futures::future::err(no_buffer_specified()).boxed_local();
+    return MinimalOp::Sync(Err(no_buffer_specified()));
   }
   let state = state.clone();
   let mut buf = zero_copy.unwrap();
 
-  poll_fn(move |cx| {
-    let resource_table = &mut state.borrow_mut().resource_table;
-    let resource_holder = resource_table
-      .get_mut::<StreamResourceHolder>(rid as u32)
-      .ok_or_else(OpError::bad_resource_id)?;
-
-    let mut task_tracker_id: Option<usize> = None;
-    let nread = match resource_holder
-      .resource
-      .poll_read(cx, &mut buf.as_mut()[..])
-      .map_err(OpError::from)
-    {
-      Poll::Ready(t) => {
-        if let Some(id) = task_tracker_id {
-          resource_holder.untrack_task(id);
-        }
-        t
-      }
-      Poll::Pending => {
-        task_tracker_id.replace(resource_holder.track_task(cx)?);
-        return Poll::Pending;
-      }
-    }?;
-    Poll::Ready(Ok(nread as i32))
-  })
-  .boxed_local()
+  if is_sync {
+    MinimalOp::Sync({
+      // First we look up the rid in the resource table.
+      let resource_table = &mut state.borrow_mut().resource_table;
+      std_file_resource(resource_table, rid as u32, move |r| match r {
+        Ok(std_file) => {
+          use std::io::Read;
+          std_file
+            .read(&mut buf)
+            .map(|n: usize| n as i32)
+            .map_err(OpError::from)
+        }
+        Err(_) => Err(OpError::type_error(
+          "sync read not allowed on this resource".to_string(),
+        )),
+      })
+    })
+  } else {
+    MinimalOp::Async(
+      poll_fn(move |cx| {
+        let resource_table = &mut state.borrow_mut().resource_table;
+        let resource_holder = resource_table
+          .get_mut::<StreamResourceHolder>(rid as u32)
+          .ok_or_else(OpError::bad_resource_id)?;
+
+        let mut task_tracker_id: Option<usize> = None;
+        let nread = match resource_holder
+          .resource
+          .poll_read(cx, &mut buf.as_mut()[..])
+          .map_err(OpError::from)
+        {
+          Poll::Ready(t) => {
+            if let Some(id) = task_tracker_id {
+              resource_holder.untrack_task(id);
+            }
+            t
+          }
+          Poll::Pending => {
+            task_tracker_id.replace(resource_holder.track_task(cx)?);
+            return Poll::Pending;
+          }
+        }?;
+        Poll::Ready(Ok(nread as i32))
+      })
+      .boxed_local(),
+    )
+  }
 }
 
 /// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait
@@ -262,9 +295,8 @@ impl DenoAsyncWrite for StreamResource {
   ) -> Poll<Result<usize, OpError>> {
     use StreamResource::*;
     let f: &mut dyn UnpinAsyncWrite = match self {
-      FsFile(f, _) => f,
-      Stdout(f) => f,
-      Stderr(f) => f,
+      FsFile(Some((f, _))) => f,
+      FsFile(None) => return Poll::Pending,
       TcpStream(f) => f,
       #[cfg(not(windows))]
       UnixStream(f) => f,
@@ -281,9 +313,8 @@ impl DenoAsyncWrite for StreamResource {
   fn poll_flush(&mut self, cx: &mut Context) -> Poll<Result<(), OpError>> {
     use StreamResource::*;
     let f: &mut dyn UnpinAsyncWrite = match self {
-      FsFile(f, _) => f,
-      Stdout(f) => f,
-      Stderr(f) => f,
+      FsFile(Some((f, _))) => f,
+      FsFile(None) => return Poll::Pending,
       TcpStream(f) => f,
       #[cfg(not(windows))]
       UnixStream(f) => f,
@@ -304,41 +335,121 @@ impl DenoAsyncWrite for StreamResource {
 
 pub fn op_write(
   state: &State,
+  is_sync: bool,
   rid: i32,
   zero_copy: Option<ZeroCopyBuf>,
-) -> Pin<Box<MinimalOp>> {
+) -> MinimalOp {
   debug!("write rid={}", rid);
   if zero_copy.is_none() {
-    return futures::future::err(no_buffer_specified()).boxed_local();
+    return MinimalOp::Sync(Err(no_buffer_specified()));
   }
   let state = state.clone();
   let buf = zero_copy.unwrap();
 
-  async move {
-    let nwritten = poll_fn(|cx| {
+  if is_sync {
+    MinimalOp::Sync({
+      // First we look up the rid in the resource table.
       let resource_table = &mut state.borrow_mut().resource_table;
-      let resource_holder = resource_table
-        .get_mut::<StreamResourceHolder>(rid as u32)
-        .ok_or_else(OpError::bad_resource_id)?;
-      resource_holder.resource.poll_write(cx, &buf.as_ref()[..])
+      std_file_resource(resource_table, rid as u32, move |r| match r {
+        Ok(std_file) => {
+          use std::io::Write;
+          std_file
+            .write(&buf)
+            .map(|nwritten: usize| nwritten as i32)
+            .map_err(OpError::from)
+        }
+        Err(_) => Err(OpError::type_error(
+          "sync read not allowed on this resource".to_string(),
+        )),
+      })
     })
-    .await?;
+  } else {
+    MinimalOp::Async(
+      async move {
+        let nwritten = poll_fn(|cx| {
+          let resource_table = &mut state.borrow_mut().resource_table;
+          let resource_holder = resource_table
+            .get_mut::<StreamResourceHolder>(rid as u32)
+            .ok_or_else(OpError::bad_resource_id)?;
+          resource_holder.resource.poll_write(cx, &buf.as_ref()[..])
+        })
+        .await?;
 
-    // TODO(bartlomieju): this step was added during upgrade to Tokio 0.2
-    // and the reasons for the need to explicitly flush are not fully known.
-    // Figure out why it's needed and preferably remove it.
-    // https://github.com/denoland/deno/issues/3565
-    poll_fn(|cx| {
-      let resource_table = &mut state.borrow_mut().resource_table;
-      let resource_holder = resource_table
-        .get_mut::<StreamResourceHolder>(rid as u32)
-        .ok_or_else(OpError::bad_resource_id)?;
-      resource_holder.resource.poll_flush(cx)
-    })
-    .await?;
+        // TODO(bartlomieju): this step was added during upgrade to Tokio 0.2
+        // and the reasons for the need to explicitly flush are not fully known.
+        // Figure out why it's needed and preferably remove it.
+        // https://github.com/denoland/deno/issues/3565
+        poll_fn(|cx| {
+          let resource_table = &mut state.borrow_mut().resource_table;
+          let resource_holder = resource_table
+            .get_mut::<StreamResourceHolder>(rid as u32)
+            .ok_or_else(OpError::bad_resource_id)?;
+          resource_holder.resource.poll_flush(cx)
+        })
+        .await?;
 
-    Ok(nwritten as i32)
+        Ok(nwritten as i32)
+      }
+      .boxed_local(),
+    )
   }
-  .boxed_local()
 }
+
+/// Helper function for operating on a std::fs::File stored in the resource table.
+///
+/// We store file system file resources as tokio::fs::File, so this is a little
+/// utility function that gets a std::fs:File when you need to do blocking
+/// operations.
+///
+/// Returns ErrorKind::Busy if the resource is being used by another op.
+pub fn std_file_resource<F, T>(
+  resource_table: &mut ResourceTable,
+  rid: u32,
+  mut f: F,
+) -> Result<T, OpError>
+where
+  F: FnMut(
+    Result<&mut std::fs::File, &mut StreamResource>,
+  ) -> Result<T, OpError>,
+{
+  // First we look up the rid in the resource table.
+  let mut r = resource_table.get_mut::<StreamResourceHolder>(rid);
+  if let Some(ref mut resource_holder) = r {
+    // Sync write only works for FsFile. It doesn't make sense to do this
+    // for non-blocking sockets. So we error out if not FsFile.
+    match &mut resource_holder.resource {
+      StreamResource::FsFile(option_file_metadata) => {
+        // The object in the resource table is a tokio::fs::File - but in
+        // order to do a blocking write on it, we must turn it into a
+        // std::fs::File. Hopefully this code compiles down to nothing.
+        if let Some((tokio_file, metadata)) = option_file_metadata.take() {
+          match tokio_file.try_into_std() {
+            Ok(mut std_file) => {
+              let result = f(Ok(&mut std_file));
+              // Turn the std_file handle back into a tokio file, put it back
+              // in the resource table.
+              let tokio_file = tokio::fs::File::from_std(std_file);
+              resource_holder.resource =
+                StreamResource::FsFile(Some((tokio_file, metadata)));
+              // return the result.
+              result
+            }
+            Err(tokio_file) => {
+              // This function will return an error containing the file if
+              // some operation is in-flight.
+              resource_holder.resource =
+                StreamResource::FsFile(Some((tokio_file, metadata)));
+              Err(OpError::resource_unavailable())
+            }
+          }
+        } else {
+          Err(OpError::resource_unavailable())
+        }
+      }
+      _ => f(Err(&mut resource_holder.resource)),
+    }
+  } else {
+    Err(OpError::bad_resource_id())
+  }
 }
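
std_file_resource is what gives the sync ops their Busy semantics: the tokio file is taken out of its Option slot, converted to a std file for the blocking call, and put back afterwards; if try_into_std fails because an async operation is still in flight, the original handle is restored and the op fails with resource_unavailable(). A stripped-down sketch of that take/convert/put-back dance, using a bare Option<tokio::fs::File> in place of the resource table and String errors instead of OpError:

use std::io::Read;

// Stand-in for the resource-table slot: None means another op owns the file.
fn read_blocking(slot: &mut Option<tokio::fs::File>) -> Result<usize, String> {
  let tokio_file = slot
    .take()
    .ok_or_else(|| "resource is unavailable".to_string())?;
  match tokio_file.try_into_std() {
    Ok(mut std_file) => {
      let mut buf = [0u8; 64];
      let result = std_file.read(&mut buf).map_err(|e| e.to_string());
      // Convert back and restore the slot before returning the result.
      *slot = Some(tokio::fs::File::from_std(std_file));
      result
    }
    Err(tokio_file) => {
      // An operation is still in flight on this file; put it back untouched.
      *slot = Some(tokio_file);
      Err("resource is unavailable".to_string())
    }
  }
}

In the real helper the closure also receives Err(&mut StreamResource) for non-file resources, which is how op_isatty below still special-cases stdin.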


@@ -26,13 +26,15 @@ fn clone_file(rid: u32, state: &State) -> Result<std::fs::File, OpError> {
     .resource_table
     .get_mut::<StreamResourceHolder>(rid)
     .ok_or_else(OpError::bad_resource_id)?;
-  let file = match repr_holder.resource {
-    StreamResource::FsFile(ref mut file, _) => file,
-    _ => return Err(OpError::bad_resource_id()),
-  };
-  let tokio_file = futures::executor::block_on(file.try_clone())?;
-  let std_file = futures::executor::block_on(tokio_file.into_std());
-  Ok(std_file)
+  match repr_holder.resource {
+    StreamResource::FsFile(Some((ref mut file, _))) => {
+      let tokio_file = futures::executor::block_on(file.try_clone())?;
+      let std_file = futures::executor::block_on(tokio_file.into_std());
+      Ok(std_file)
+    }
+    StreamResource::FsFile(None) => Err(OpError::resource_unavailable()),
+    _ => Err(OpError::bad_resource_id()),
+  }
 }
 
 fn subprocess_stdio_map(s: &str) -> std::process::Stdio {


@@ -1,4 +1,5 @@
 use super::dispatch_json::JsonOp;
+use super::io::std_file_resource;
 use super::io::{StreamResource, StreamResourceHolder};
 use crate::op_error::OpError;
 use crate::ops::json_op;
@@ -73,13 +74,16 @@ pub fn op_set_raw(
     // For now, only stdin.
     let handle = match &resource_holder.unwrap().resource {
       StreamResource::Stdin(_, _) => std::io::stdin().as_raw_handle(),
-      StreamResource::FsFile(f, _) => {
+      StreamResource::FsFile(None) => {
+        return Err(OpError::resource_unavailable())
+      }
+      StreamResource::FsFile(Some((f, _))) => {
         let tokio_file = futures::executor::block_on(f.try_clone())?;
         let std_file = futures::executor::block_on(tokio_file.into_std());
         std_file.as_raw_handle()
       }
       _ => {
-        return Err(OpError::other("Not supported".to_owned()));
+        return Err(OpError::bad_resource_id());
       }
     };
@@ -122,11 +126,14 @@ pub fn op_set_raw(
       StreamResource::Stdin(_, ref mut metadata) => {
         (std::io::stdin().as_raw_fd(), &mut metadata.mode)
       }
-      StreamResource::FsFile(f, ref mut metadata) => {
+      StreamResource::FsFile(Some((f, ref mut metadata))) => {
        let tokio_file = futures::executor::block_on(f.try_clone())?;
         let std_file = futures::executor::block_on(tokio_file.into_std());
         (std_file.as_raw_fd(), &mut metadata.tty.mode)
       }
+      StreamResource::FsFile(None) => {
+        return Err(OpError::resource_unavailable())
+      }
       _ => {
         return Err(OpError::other("Not supported".to_owned()));
       }
@@ -165,13 +172,16 @@ pub fn op_set_raw(
       StreamResource::Stdin(_, ref mut metadata) => {
         (std::io::stdin().as_raw_fd(), &mut metadata.mode)
       }
-      StreamResource::FsFile(f, ref mut metadata) => {
+      StreamResource::FsFile(Some((f, ref mut metadata))) => {
         let tokio_file = futures::executor::block_on(f.try_clone())?;
         let std_file = futures::executor::block_on(tokio_file.into_std());
         (std_file.as_raw_fd(), &mut metadata.tty.mode)
       }
+      StreamResource::FsFile(None) => {
+        return Err(OpError::resource_unavailable());
+      }
       _ => {
-        return Err(OpError::other("Not supported".to_owned()));
+        return Err(OpError::bad_resource_id());
       }
     };
@@ -190,55 +200,36 @@ struct IsattyArgs {
 }
 
 pub fn op_isatty(
-  state_: &State,
+  state: &State,
   args: Value,
   _zero_copy: Option<ZeroCopyBuf>,
 ) -> Result<JsonOp, OpError> {
   let args: IsattyArgs = serde_json::from_value(args)?;
   let rid = args.rid;
 
-  let state = state_.borrow_mut();
-  if !state.resource_table.has(rid) {
-    return Err(OpError::bad_resource_id());
-  }
-
-  let resource_holder = state.resource_table.get::<StreamResourceHolder>(rid);
-  if resource_holder.is_none() {
-    return Ok(JsonOp::Sync(json!(false)));
-  }
-
-  match &resource_holder.unwrap().resource {
-    StreamResource::Stdin(_, _) => {
-      Ok(JsonOp::Sync(json!(atty::is(atty::Stream::Stdin))))
-    }
-    StreamResource::Stdout(_) => {
-      Ok(JsonOp::Sync(json!(atty::is(atty::Stream::Stdout))))
-    }
-    StreamResource::Stderr(_) => {
-      Ok(JsonOp::Sync(json!(atty::is(atty::Stream::Stderr))))
-    }
-    StreamResource::FsFile(f, _) => {
-      let tokio_file = futures::executor::block_on(f.try_clone())?;
-      let std_file = futures::executor::block_on(tokio_file.into_std());
-      #[cfg(windows)]
-      {
-        use winapi::um::consoleapi;
-        let handle = get_windows_handle(&std_file)?;
-        let mut test_mode: DWORD = 0;
-        // If I cannot get mode out of console, it is not a console.
-        let result =
-          unsafe { consoleapi::GetConsoleMode(handle, &mut test_mode) != 0 };
-        Ok(JsonOp::Sync(json!(result)))
-      }
-      #[cfg(unix)]
-      {
-        use std::os::unix::io::AsRawFd;
-        let raw_fd = std_file.as_raw_fd();
-        let result = unsafe { libc::isatty(raw_fd as libc::c_int) == 1 };
-        Ok(JsonOp::Sync(json!(result)))
-      }
-    }
-    _ => Ok(JsonOp::Sync(json!(false))),
-  }
+  let resource_table = &mut state.borrow_mut().resource_table;
+  let isatty: bool =
+    std_file_resource(resource_table, rid as u32, move |r| match r {
+      Ok(std_file) => {
+        #[cfg(windows)]
+        {
+          use winapi::um::consoleapi;
+          let handle = get_windows_handle(&std_file)?;
+          let mut test_mode: DWORD = 0;
+          // If I cannot get mode out of console, it is not a console.
+          Ok(unsafe { consoleapi::GetConsoleMode(handle, &mut test_mode) != 0 })
+        }
+        #[cfg(unix)]
+        {
+          use std::os::unix::io::AsRawFd;
+          let raw_fd = std_file.as_raw_fd();
+          Ok(unsafe { libc::isatty(raw_fd as libc::c_int) == 1 })
+        }
+      }
+      Err(StreamResource::FsFile(_)) => unreachable!(),
+      Err(StreamResource::Stdin(_, _)) => Ok(atty::is(atty::Stream::Stdin)),
+      _ => Ok(false),
+    })?;
+  Ok(JsonOp::Sync(json!(isatty)))
 }
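
On Unix the sync isatty check is a single libc::isatty call on the raw descriptor once std_file_resource has produced the std file. A minimal standalone version of that check (assumes the libc crate; uses stdin only for illustration):

#[cfg(unix)]
fn main() {
  use std::os::unix::io::AsRawFd;

  // Checking stdin here; the op applies the same test to whatever rid names.
  let raw_fd = std::io::stdin().as_raw_fd();
  let is_tty = unsafe { libc::isatty(raw_fd as libc::c_int) == 1 };
  println!("stdin isatty: {}", is_tty);
}

#[cfg(not(unix))]
fn main() {}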


@@ -145,15 +145,15 @@ impl State {
   pub fn stateful_minimal_op<D>(
     &self,
     dispatcher: D,
-  ) -> impl Fn(i32, Option<ZeroCopyBuf>) -> Pin<Box<MinimalOp>>
+  ) -> impl Fn(bool, i32, Option<ZeroCopyBuf>) -> MinimalOp
   where
-    D: Fn(&State, i32, Option<ZeroCopyBuf>) -> Pin<Box<MinimalOp>>,
+    D: Fn(&State, bool, i32, Option<ZeroCopyBuf>) -> MinimalOp,
   {
     let state = self.clone();
-
-    move |rid: i32, zero_copy: Option<ZeroCopyBuf>| -> Pin<Box<MinimalOp>> {
-      dispatcher(&state, rid, zero_copy)
-    }
+    move |is_sync: bool,
+          rid: i32,
+          zero_copy: Option<ZeroCopyBuf>|
+          -> MinimalOp { dispatcher(&state, is_sync, rid, zero_copy) }
   }
 
   /// This is a special function that provides `state` argument to dispatcher.
@@ -169,7 +169,6 @@
     D: Fn(&State, Value, Option<ZeroCopyBuf>) -> Result<JsonOp, OpError>,
   {
     let state = self.clone();
-
     move |args: Value,
           zero_copy: Option<ZeroCopyBuf>|
           -> Result<JsonOp, OpError> { dispatcher(&state, args, zero_copy) }


@@ -17,6 +17,9 @@ pub(crate) type PendingOpFuture =
 
 pub type OpResult<E> = Result<Op<E>, E>;
 
+// TODO(ry) Op::Async should be Op::Async(Pin<Box<dyn Future<Output = Buf>>>)
+// The error should be encoded in the Buf. Notice how Sync ops do not return a
+// result. The Sync and Async should be symmetrical!
 pub enum Op<E> {
   Sync(Buf),
   Async(OpAsyncFuture<E>),
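
Purely as a reading of that TODO, not code from this commit: the symmetrical shape it describes, with errors already encoded into the Buf so both variants carry the same payload, might look roughly like this.

use std::future::Future;
use std::pin::Pin;

type Buf = Box<[u8]>;

// Hypothetical symmetrical Op: Sync and Async both resolve to a Buf, with any
// error serialized into the Buf itself rather than a separate Result.
enum Op {
  Sync(Buf),
  Async(Pin<Box<dyn Future<Output = Buf>>>),
}

fn main() {
  let buf: Buf = vec![1u8, 2, 3].into_boxed_slice();
  match Op::Sync(buf) {
    Op::Sync(b) => println!("sync buf of {} bytes", b.len()),
    Op::Async(_) => println!("async op"),
  }
}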