From df09fbad92c4d48e58ed34000000f01ec4812e48 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Sat, 22 Sep 2018 04:42:07 -0400 Subject: [PATCH] Use the thread pool for blocking I/O --- BUILD.gn | 1 + src/handlers.rs | 166 +++++++++++++++++++++++++++++------------------- src/main.rs | 1 + 3 files changed, 104 insertions(+), 64 deletions(-) diff --git a/BUILD.gn b/BUILD.gn index f92112e19a..b901b1764b 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -55,6 +55,7 @@ main_extern = [ "$rust_build:rand", "$rust_build:tokio", "$rust_build:tokio_executor", + "$rust_build:tokio_threadpool", "$rust_build:url", "$rust_build:remove_dir_all", "$rust_build:dirs", diff --git a/src/handlers.rs b/src/handlers.rs index 42becc0b09..43097918db 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -10,7 +10,9 @@ use msg; use flatbuffers::FlatBufferBuilder; use futures; +use futures::future::poll_fn; use futures::sync::oneshot; +use futures::Poll; use hyper; use hyper::rt::{Future, Stream}; use hyper::Client; @@ -20,10 +22,12 @@ use std::fs; #[cfg(any(unix))] use std::os::unix::fs::PermissionsExt; use std::path::Path; +use std::path::PathBuf; use std::sync::Arc; use std::time::UNIX_EPOCH; use std::time::{Duration, Instant}; use tokio::timer::Delay; +use tokio_threadpool; type OpResult = DenoResult; @@ -91,8 +95,6 @@ pub fn msg_from_js(state: Arc, bytes: &[u8]) -> (bool, Box) { let buf = if is_sync || buf.len() > 0 { buf } else { - // async RPCs that return empty still need to - // send a message back to signal completion. let builder = &mut FlatBufferBuilder::new(); serialize_response( cmd_id, @@ -402,23 +404,61 @@ where (delay_task, cancel_tx) } +// This is just type conversion. Implement From trait? +// See https://github.com/tokio-rs/tokio/blob/ffd73a64e7ec497622b7f939e38017afe7124dc4/tokio-fs/src/lib.rs#L76-L85 +fn convert_blocking(f: F) -> Poll +where + F: FnOnce() -> DenoResult, +{ + use futures::Async::*; + match tokio_threadpool::blocking(f) { + Ok(Ready(Ok(v))) => Ok(v.into()), + Ok(Ready(Err(err))) => Err(err), + Ok(NotReady) => Ok(NotReady), + Err(_) => panic!("blocking error"), + } +} + +// TODO Do not use macro for the blocking function.. We should instead be able +// to do this with a normal function, but there seems to some type system +// issues. The type of this function should be something like this: +// fn blocking(is_sync: bool, f: F) -> Box +// where F: FnOnce() -> DenoResult +macro_rules! blocking { + ($is_sync:expr,$fn:expr) => { + if $is_sync { + // If synchronous, execute the function immediately on the main thread. + Box::new(futures::future::result($fn())) + } else { + // Otherwise dispatch to thread pool. + Box::new(poll_fn(move || convert_blocking($fn))) + } + }; +} + fn handle_make_temp_dir(state: Arc, base: &msg::Base) -> Box { let base = Box::new(*base); let msg = base.msg_as_make_temp_dir().unwrap(); let cmd_id = base.cmd_id(); - let dir = msg.dir(); - let prefix = msg.prefix(); - let suffix = msg.suffix(); if !state.flags.allow_write { return odd_future(permission_denied()); } - // TODO Use blocking() here. - Box::new(futures::future::result(|| -> OpResult { + + let dir = msg.dir().map(PathBuf::from); + let prefix = msg.prefix().map(String::from); + let suffix = msg.suffix().map(String::from); + + blocking!(base.sync(), || -> OpResult { // TODO(piscisaureus): use byte vector for paths, not a string. // See https://github.com/denoland/isolate/issues/627. // We can't assume that paths are always valid utf8 strings. - let path = deno_fs::make_temp_dir(dir.map(Path::new), prefix, suffix)?; + let path = deno_fs::make_temp_dir( + // Converting Option to Option<&str> + dir.as_ref().map(|x| &**x), + prefix.as_ref().map(|x| &**x), + suffix.as_ref().map(|x| &**x), + )?; let builder = &mut FlatBufferBuilder::new(); let path_off = builder.create_string(path.to_str().unwrap()); let msg = msg::MakeTempDirRes::create( @@ -437,59 +477,56 @@ fn handle_make_temp_dir(state: Arc, base: &msg::Base) -> Box { ..Default::default() }, )) - }())) + }) } fn handle_mkdir(state: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_mkdir().unwrap(); let mode = msg.mode(); - let path = msg.path().unwrap(); + let path = String::from(msg.path().unwrap()); if !state.flags.allow_write { return odd_future(permission_denied()); } - // TODO Use tokio_threadpool. - Box::new(futures::future::result(|| -> OpResult { + + blocking!(base.sync(), || { debug!("handle_mkdir {}", path); - deno_fs::mkdir(Path::new(path), mode)?; + deno_fs::mkdir(Path::new(&path), mode)?; Ok(empty_buf()) - }())) + }) } fn handle_remove(state: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_remove().unwrap(); - let path = msg.path().unwrap(); + let path = PathBuf::from(msg.path().unwrap()); let recursive = msg.recursive(); - if !state.flags.allow_write { return odd_future(permission_denied()); } - // TODO Use tokio_threadpool. - Box::new(futures::future::result(|| -> OpResult { - debug!("handle_remove {}", path); - let path_ = Path::new(&path); - let metadata = fs::metadata(&path_)?; + blocking!(base.sync(), || { + debug!("handle_remove {}", path.display()); + let metadata = fs::metadata(&path)?; if metadata.is_file() { - fs::remove_file(&path_)?; + fs::remove_file(&path)?; } else { if recursive { - remove_dir_all(&path_)?; + remove_dir_all(&path)?; } else { - fs::remove_dir(&path_)?; + fs::remove_dir(&path)?; } } Ok(empty_buf()) - }())) + }) } // Prototype https://github.com/denoland/isolate/blob/golang/os.go#L171-L184 fn handle_read_file(_config: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_read_file().unwrap(); let cmd_id = base.cmd_id(); - let filename = String::from(msg.filename().unwrap()); - Box::new(futures::future::result(|| -> OpResult { - debug!("handle_read_file {}", filename); - let vec = fs::read(Path::new(&filename))?; + let filename = PathBuf::from(msg.filename().unwrap()); + debug!("handle_read_file {}", filename.display()); + blocking!(base.sync(), || { + let vec = fs::read(&filename)?; // Build the response message. memcpy data into msg. // TODO(ry) zero-copy. let builder = &mut FlatBufferBuilder::new(); @@ -510,7 +547,7 @@ fn handle_read_file(_config: Arc, base: &msg::Base) -> Box { ..Default::default() }, )) - }())) + }) } macro_rules! to_seconds { @@ -536,17 +573,16 @@ fn get_mode(_perm: fs::Permissions) -> u32 { fn handle_stat(_config: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_stat().unwrap(); let cmd_id = base.cmd_id(); - let filename = String::from(msg.filename().unwrap()); + let filename = PathBuf::from(msg.filename().unwrap()); let lstat = msg.lstat(); - Box::new(futures::future::result(|| -> OpResult { + blocking!(base.sync(), || { let builder = &mut FlatBufferBuilder::new(); - debug!("handle_stat {} {}", filename, lstat); - let path = Path::new(&filename); + debug!("handle_stat {} {}", filename.display(), lstat); let metadata = if lstat { - fs::symlink_metadata(path)? + fs::symlink_metadata(&filename)? } else { - fs::metadata(path)? + fs::metadata(&filename)? }; let msg = msg::StatRes::create( @@ -573,24 +609,25 @@ fn handle_stat(_config: Arc, base: &msg::Base) -> Box { ..Default::default() }, )) - }())) + }) } fn handle_write_file(state: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_write_file().unwrap(); - let filename = String::from(msg.filename().unwrap()); - let data = msg.data().unwrap(); - let perm = msg.perm(); if !state.flags.allow_write { return odd_future(permission_denied()); } - Box::new(futures::future::result(|| -> OpResult { + let filename = String::from(msg.filename().unwrap()); + let data = Vec::from(msg.data().unwrap()); + let perm = msg.perm(); + + blocking!(base.sync(), || -> OpResult { debug!("handle_write_file {}", filename); - deno_fs::write_file(Path::new(&filename), data, perm)?; + deno_fs::write_file(Path::new(&filename), data.as_slice(), perm)?; Ok(empty_buf()) - }())) + }) } fn remove_timer(state: Arc, timer_id: u32) { @@ -654,13 +691,13 @@ fn handle_rename(state: Arc, base: &msg::Base) -> Box { return odd_future(permission_denied()); } let msg = base.msg_as_rename().unwrap(); - let oldpath = String::from(msg.oldpath().unwrap()); - let newpath = String::from(msg.newpath().unwrap()); - Box::new(futures::future::result(|| -> OpResult { - debug!("handle_rename {} {}", oldpath, newpath); - fs::rename(Path::new(&oldpath), Path::new(&newpath))?; + let oldpath = PathBuf::from(msg.oldpath().unwrap()); + let newpath = PathBuf::from(msg.newpath().unwrap()); + blocking!(base.sync(), || -> OpResult { + debug!("handle_rename {} {}", oldpath.display(), newpath.display()); + fs::rename(&oldpath, &newpath)?; Ok(empty_buf()) - }())) + }) } fn handle_symlink(state: Arc, base: &msg::Base) -> Box { @@ -670,26 +707,27 @@ fn handle_symlink(state: Arc, base: &msg::Base) -> Box { // TODO Use type for Windows. if cfg!(windows) { return odd_future(not_implemented()); - } else { - let msg = base.msg_as_symlink().unwrap(); - let oldname = String::from(msg.oldname().unwrap()); - let newname = String::from(msg.newname().unwrap()); - Box::new(futures::future::result(|| -> OpResult { - debug!("handle_symlink {} {}", oldname, newname); - #[cfg(any(unix))] - std::os::unix::fs::symlink(Path::new(&oldname), Path::new(&newname))?; - Ok(empty_buf()) - }())) } + + let msg = base.msg_as_symlink().unwrap(); + let oldname = PathBuf::from(msg.oldname().unwrap()); + let newname = PathBuf::from(msg.newname().unwrap()); + blocking!(base.sync(), || -> OpResult { + debug!("handle_symlink {} {}", oldname.display(), newname.display()); + #[cfg(any(unix))] + std::os::unix::fs::symlink(&oldname, &newname)?; + Ok(empty_buf()) + }) } fn handle_read_link(_state: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_readlink().unwrap(); let cmd_id = base.cmd_id(); - let name = String::from(msg.name().unwrap()); - Box::new(futures::future::result(|| -> OpResult { - debug!("handle_read_link {}", name); - let path = fs::read_link(Path::new(&name))?; + let name = PathBuf::from(msg.name().unwrap()); + + blocking!(base.sync(), || -> OpResult { + debug!("handle_read_link {}", name.display()); + let path = fs::read_link(&name)?; let builder = &mut FlatBufferBuilder::new(); let path_off = builder.create_string(path.to_str().unwrap()); let msg = msg::ReadlinkRes::create( @@ -708,5 +746,5 @@ fn handle_read_link(_state: Arc, base: &msg::Base) -> Box { ..Default::default() }, )) - }())) + }) } diff --git a/src/main.rs b/src/main.rs index b60f805137..cc762f1aef 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,7 @@ extern crate rand; extern crate tempfile; extern crate tokio; extern crate tokio_executor; +extern crate tokio_threadpool; extern crate url; #[macro_use] extern crate lazy_static;