From 55fac9f5ead6d30996400e8597c969b675c5a22b Mon Sep 17 00:00:00 2001 From: Divy Srivastava Date: Tue, 19 Dec 2023 18:07:22 +0530 Subject: [PATCH] fix(node): child_process IPC on Windows (#21597) This PR implements the child_process IPC pipe between parent and child. The implementation uses Windows named pipes created by parent and passes the inheritable file handle to the child. I've also replace parts of the initial implementation which passed the raw parent fd to JS with resource ids instead. This way no file handle is exposed to the JS land (both parent and child). `IpcJsonStreamResource` can stream upto 800MB/s of JSON data on Win 11 AMD Ryzen 7 16GB (without `memchr` vectorization) --- Cargo.lock | 2 + cli/args/mod.rs | 4 +- cli/worker.rs | 4 +- ext/node/Cargo.toml | 4 + ext/node/lib.rs | 2 +- ext/node/ops/ipc.rs | 180 +++++++++++-------- ext/node/polyfills/internal/child_process.ts | 5 +- runtime/Cargo.toml | 1 + runtime/ops/process.rs | 147 +++++++++++++-- runtime/worker_bootstrap.rs | 2 +- 10 files changed, 258 insertions(+), 93 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d6fe5d2966..384e84579b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1543,6 +1543,7 @@ dependencies = [ "typenum", "url", "winapi", + "windows-sys 0.48.0", "x25519-dalek", "x509-parser", ] @@ -1635,6 +1636,7 @@ dependencies = [ "uuid", "which", "winapi", + "windows-sys 0.48.0", "winres", ] diff --git a/cli/args/mod.rs b/cli/args/mod.rs index 187d3d604c..0c1bd6e0a1 100644 --- a/cli/args/mod.rs +++ b/cli/args/mod.rs @@ -917,12 +917,12 @@ impl CliOptions { .map(Some) } - pub fn node_ipc_fd(&self) -> Option { + pub fn node_ipc_fd(&self) -> Option { let maybe_node_channel_fd = std::env::var("DENO_CHANNEL_FD").ok(); if let Some(node_channel_fd) = maybe_node_channel_fd { // Remove so that child processes don't inherit this environment variable. std::env::remove_var("DENO_CHANNEL_FD"); - node_channel_fd.parse::().ok() + node_channel_fd.parse::().ok() } else { None } diff --git a/cli/worker.rs b/cli/worker.rs index 2f00165812..bc611a05ce 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -124,7 +124,7 @@ struct SharedWorkerState { maybe_inspector_server: Option>, maybe_lockfile: Option>>, feature_checker: Arc, - node_ipc: Option, + node_ipc: Option, } impl SharedWorkerState { @@ -404,7 +404,7 @@ impl CliMainWorkerFactory { maybe_lockfile: Option>>, feature_checker: Arc, options: CliMainWorkerOptions, - node_ipc: Option, + node_ipc: Option, ) -> Self { Self { shared: Arc::new(SharedWorkerState { diff --git a/ext/node/Cargo.toml b/ext/node/Cargo.toml index c7ca63097c..b72449f5c5 100644 --- a/ext/node/Cargo.toml +++ b/ext/node/Cargo.toml @@ -74,3 +74,7 @@ url.workspace = true winapi.workspace = true x25519-dalek = "2.0.0" x509-parser = "0.15.0" + +[target.'cfg(windows)'.dependencies] +windows-sys.workspace = true +winapi = { workspace = true, features = ["consoleapi"] } diff --git a/ext/node/lib.rs b/ext/node/lib.rs index 42a7527913..0922c69861 100644 --- a/ext/node/lib.rs +++ b/ext/node/lib.rs @@ -31,6 +31,7 @@ mod polyfill; mod resolution; pub use ops::ipc::ChildPipeFd; +pub use ops::ipc::IpcJsonStreamResource; pub use ops::v8::VM_CONTEXT_INDEX; pub use package_json::PackageJson; pub use path::PathClean; @@ -307,7 +308,6 @@ deno_core::extension!(deno_node, ops::require::op_require_break_on_next_statement, ops::util::op_node_guess_handle_type, ops::crypto::op_node_create_private_key, - ops::ipc::op_node_ipc_pipe, ops::ipc::op_node_child_ipc_pipe, ops::ipc::op_node_ipc_write, ops::ipc::op_node_ipc_read, diff --git a/ext/node/ops/ipc.rs b/ext/node/ops/ipc.rs index afaccc49f0..9b2b27c71d 100644 --- a/ext/node/ops/ipc.rs +++ b/ext/node/ops/ipc.rs @@ -1,20 +1,17 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -#[cfg(unix)] -pub use unix::*; +pub use impl_::*; -#[cfg(windows)] -pub use windows::*; +pub struct ChildPipeFd(pub i64); -pub struct ChildPipeFd(pub i32); - -#[cfg(unix)] -mod unix { +mod impl_ { use std::cell::RefCell; use std::future::Future; use std::io; use std::mem; + #[cfg(unix)] use std::os::fd::FromRawFd; + #[cfg(unix)] use std::os::fd::RawFd; use std::pin::Pin; use std::rc::Rc; @@ -35,18 +32,16 @@ mod unix { use tokio::io::AsyncBufRead; use tokio::io::AsyncWriteExt; use tokio::io::BufReader; + + #[cfg(unix)] use tokio::net::unix::OwnedReadHalf; + #[cfg(unix)] use tokio::net::unix::OwnedWriteHalf; + #[cfg(unix)] use tokio::net::UnixStream; - #[op2(fast)] - #[smi] - pub fn op_node_ipc_pipe( - state: &mut OpState, - #[smi] fd: i32, - ) -> Result { - Ok(state.resource_table.add(IpcJsonStreamResource::new(fd)?)) - } + #[cfg(windows)] + type NamedPipeClient = tokio::net::windows::named_pipe::NamedPipeClient; // Open IPC pipe from bootstrap options. #[op2] @@ -97,9 +92,12 @@ mod unix { Ok(msgs) } - struct IpcJsonStreamResource { + pub struct IpcJsonStreamResource { read_half: AsyncRefCell, + #[cfg(unix)] write_half: AsyncRefCell, + #[cfg(windows)] + write_half: AsyncRefCell>, cancel: Rc, } @@ -109,14 +107,35 @@ mod unix { } } + #[cfg(unix)] + fn pipe(stream: RawFd) -> Result<(OwnedReadHalf, OwnedWriteHalf), io::Error> { + // Safety: The fd is part of a pair of connected sockets create by child process + // implementation. + let unix_stream = UnixStream::from_std(unsafe { + std::os::unix::net::UnixStream::from_raw_fd(stream) + })?; + Ok(unix_stream.into_split()) + } + + #[cfg(windows)] + fn pipe( + handle: i64, + ) -> Result< + ( + tokio::io::ReadHalf, + tokio::io::WriteHalf, + ), + io::Error, + > { + // Safety: We cannot use `get_osfhandle` because Deno statically links to msvcrt. It is not guaranteed that the + // fd handle map will be the same. + let pipe = unsafe { NamedPipeClient::from_raw_handle(handle as _)? }; + Ok(tokio::io::split(pipe)) + } + impl IpcJsonStreamResource { - fn new(stream: RawFd) -> Result { - // Safety: The fd is part of a pair of connected sockets create by child process - // implementation. - let unix_stream = UnixStream::from_std(unsafe { - std::os::unix::net::UnixStream::from_raw_fd(stream) - })?; - let (read_half, write_half) = unix_stream.into_split(); + pub fn new(stream: i64) -> Result { + let (read_half, write_half) = pipe(stream as _)?; Ok(Self { read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), write_half: AsyncRefCell::new(write_half), @@ -124,8 +143,9 @@ mod unix { }) } + #[cfg(unix)] #[cfg(test)] - fn from_unix_stream(stream: UnixStream) -> Self { + fn from_stream(stream: UnixStream) -> Self { let (read_half, write_half) = stream.into_split(); Self { read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), @@ -134,6 +154,17 @@ mod unix { } } + #[cfg(windows)] + #[cfg(test)] + fn from_stream(pipe: NamedPipeClient) -> Self { + let (read_half, write_half) = tokio::io::split(pipe); + Self { + read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), + write_half: AsyncRefCell::new(write_half), + cancel: Default::default(), + } + } + async fn write_msg( self: Rc, msg: serde_json::Value, @@ -172,11 +203,15 @@ mod unix { // // `\n` is used as a delimiter between messages. struct IpcJsonStream { + #[cfg(unix)] pipe: BufReader, + #[cfg(windows)] + pipe: BufReader>, buffer: Vec, } impl IpcJsonStream { + #[cfg(unix)] fn new(pipe: OwnedReadHalf) -> Self { Self { pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe), @@ -184,6 +219,14 @@ mod unix { } } + #[cfg(windows)] + fn new(pipe: tokio::io::ReadHalf) -> Self { + Self { + pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe), + buffer: Vec::with_capacity(INITIAL_CAPACITY), + } + } + async fn read_msg(&mut self) -> Result { let mut json = None; let nread = @@ -252,7 +295,6 @@ mod unix { std::task::Poll::Ready(t) => t?, std::task::Poll::Pending => return std::task::Poll::Pending, }; - if let Some(i) = memchr(b'\n', available) { if *read == 0 { // Fast path: parse and put into the json slot directly. @@ -366,6 +408,35 @@ mod unix { use deno_core::RcRef; use std::rc::Rc; + #[cfg(unix)] + pub async fn pair() -> (Rc, tokio::net::UnixStream) { + let (a, b) = tokio::net::UnixStream::pair().unwrap(); + + /* Similar to how ops would use the resource */ + let a = Rc::new(IpcJsonStreamResource::from_stream(a)); + (a, b) + } + + #[cfg(windows)] + pub async fn pair() -> ( + Rc, + tokio::net::windows::named_pipe::NamedPipeServer, + ) { + use tokio::net::windows::named_pipe::ClientOptions; + use tokio::net::windows::named_pipe::ServerOptions; + + let name = + format!(r"\\.\pipe\deno-named-pipe-test-{}", rand::random::()); + + let server = ServerOptions::new().create(name.clone()).unwrap(); + let client = ClientOptions::new().open(name).unwrap(); + + server.connect().await.unwrap(); + /* Similar to how ops would use the resource */ + let client = Rc::new(IpcJsonStreamResource::from_stream(client)); + (client, server) + } + #[tokio::test] async fn bench_ipc() -> Result<(), Box> { // A simple round trip benchmark for quick dev feedback. @@ -375,7 +446,7 @@ mod unix { return Ok(()); } - let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let (ipc, mut fd2) = pair().await; let child = tokio::spawn(async move { use tokio::io::AsyncWriteExt; @@ -389,8 +460,6 @@ mod unix { Ok::<_, std::io::Error>(()) }); - let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); - let start = std::time::Instant::now(); let mut bytes = 0; @@ -416,21 +485,20 @@ mod unix { #[tokio::test] async fn unix_ipc_json() -> Result<(), Box> { - let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let (ipc, mut fd2) = pair().await; let child = tokio::spawn(async move { use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; - let mut buf = [0u8; 1024]; - let n = fd2.read(&mut buf).await?; - assert_eq!(&buf[..n], b"\"hello\"\n"); + const EXPECTED: &[u8] = b"\"hello\"\n"; + let mut buf = [0u8; EXPECTED.len()]; + let n = fd2.read_exact(&mut buf).await?; + assert_eq!(&buf[..n], EXPECTED); fd2.write_all(b"\"world\"\n").await?; + Ok::<_, std::io::Error>(()) }); - /* Similar to how ops would use the resource */ - let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); - ipc.clone().write_msg(json!("hello")).await?; let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; @@ -444,19 +512,19 @@ mod unix { #[tokio::test] async fn unix_ipc_json_multi() -> Result<(), Box> { - let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let (ipc, mut fd2) = pair().await; let child = tokio::spawn(async move { use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; - let mut buf = [0u8; 1024]; - let n = fd2.read(&mut buf).await?; - assert_eq!(&buf[..n], b"\"hello\"\n\"world\"\n"); + const EXPECTED: &[u8] = b"\"hello\"\n\"world\"\n"; + let mut buf = [0u8; EXPECTED.len()]; + let n = fd2.read_exact(&mut buf).await?; + assert_eq!(&buf[..n], EXPECTED); fd2.write_all(b"\"foo\"\n\"bar\"\n").await?; Ok::<_, std::io::Error>(()) }); - let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); ipc.clone().write_msg(json!("hello")).await?; ipc.clone().write_msg(json!("world")).await?; @@ -471,13 +539,12 @@ mod unix { #[tokio::test] async fn unix_ipc_json_invalid() -> Result<(), Box> { - let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let (ipc, mut fd2) = pair().await; let child = tokio::spawn(async move { tokio::io::AsyncWriteExt::write_all(&mut fd2, b"\n\n").await?; Ok::<_, std::io::Error>(()) }); - let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; let _err = ipc.read_msg().await.unwrap_err(); @@ -499,30 +566,3 @@ mod unix { } } } - -#[cfg(windows)] -mod windows { - use deno_core::error::AnyError; - use deno_core::op2; - - #[op2(fast)] - pub fn op_node_ipc_pipe() -> Result<(), AnyError> { - Err(deno_core::error::not_supported()) - } - - #[op2(fast)] - #[smi] - pub fn op_node_child_ipc_pipe() -> Result { - Ok(-1) - } - - #[op2(async)] - pub async fn op_node_ipc_write() -> Result<(), AnyError> { - Err(deno_core::error::not_supported()) - } - - #[op2(async)] - pub async fn op_node_ipc_read() -> Result<(), AnyError> { - Err(deno_core::error::not_supported()) - } -} diff --git a/ext/node/polyfills/internal/child_process.ts b/ext/node/polyfills/internal/child_process.ts index b9bf133969..0e93e22d3d 100644 --- a/ext/node/polyfills/internal/child_process.ts +++ b/ext/node/polyfills/internal/child_process.ts @@ -45,7 +45,6 @@ import { getValidatedPath } from "ext:deno_node/internal/fs/utils.mjs"; import process from "node:process"; const core = globalThis.__bootstrap.core; -const ops = core.ops; export function mapValues( record: Readonly>, @@ -1069,9 +1068,7 @@ function toDenoArgs(args: string[]): string[] { return denoArgs; } -export function setupChannel(target, channel) { - const ipc = ops.op_node_ipc_pipe(channel); - +export function setupChannel(target, ipc) { async function readLoop() { try { while (true) { diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index e47d2b59e2..ad9586390a 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -122,6 +122,7 @@ which = "4.2.5" fwdansi.workspace = true winapi = { workspace = true, features = ["commapi", "knownfolders", "mswsock", "objbase", "psapi", "shlobj", "tlhelp32", "winbase", "winerror", "winuser", "winsock2"] } ntapi = "0.4.0" +windows-sys.workspace = true [target.'cfg(unix)'.dependencies] nix.workspace = true diff --git a/runtime/ops/process.rs b/runtime/ops/process.rs index 6f89e55294..e8dc9c6908 100644 --- a/runtime/ops/process.rs +++ b/runtime/ops/process.rs @@ -141,7 +141,6 @@ pub struct SpawnArgs { uid: Option, #[cfg(windows)] windows_raw_arguments: bool, - #[cfg(unix)] ipc: Option, #[serde(flatten)] @@ -207,12 +206,7 @@ pub struct SpawnOutput { stderr: Option, } -type CreateCommand = ( - std::process::Command, - // TODO(@littledivy): Ideally this would return Option but we are dealing with file descriptors - // all the way until setupChannel which makes it easier to share code between parent and child fork. - Option, -); +type CreateCommand = (std::process::Command, Option); fn create_command( state: &mut OpState, @@ -337,17 +331,144 @@ fn create_command( }); /* One end returned to parent process (this) */ - let pipe_fd = Some(fd1); + let pipe_rid = Some( + state + .resource_table + .add(deno_node::IpcJsonStreamResource::new(fd1 as _)?), + ); /* The other end passed to child process via DENO_CHANNEL_FD */ command.env("DENO_CHANNEL_FD", format!("{}", ipc)); - return Ok((command, pipe_fd)); + return Ok((command, pipe_rid)); } Ok((command, None)) } + #[cfg(windows)] + // Safety: We setup a windows named pipe and pass one end to the child process. + unsafe { + use windows_sys::Win32::Foundation::CloseHandle; + use windows_sys::Win32::Foundation::DuplicateHandle; + use windows_sys::Win32::Foundation::DUPLICATE_SAME_ACCESS; + use windows_sys::Win32::Foundation::ERROR_ACCESS_DENIED; + use windows_sys::Win32::Foundation::ERROR_PIPE_CONNECTED; + use windows_sys::Win32::Foundation::GENERIC_READ; + use windows_sys::Win32::Foundation::GENERIC_WRITE; + use windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE; + use windows_sys::Win32::Security::SECURITY_ATTRIBUTES; + use windows_sys::Win32::Storage::FileSystem::CreateFileW; + use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_FIRST_PIPE_INSTANCE; + use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_OVERLAPPED; + use windows_sys::Win32::Storage::FileSystem::OPEN_EXISTING; + use windows_sys::Win32::Storage::FileSystem::PIPE_ACCESS_DUPLEX; + use windows_sys::Win32::System::Pipes::ConnectNamedPipe; + use windows_sys::Win32::System::Pipes::CreateNamedPipeW; + use windows_sys::Win32::System::Pipes::PIPE_READMODE_BYTE; + use windows_sys::Win32::System::Pipes::PIPE_TYPE_BYTE; + use windows_sys::Win32::System::Threading::GetCurrentProcess; + + use std::io; + use std::os::windows::ffi::OsStrExt; + use std::path::Path; + use std::ptr; + + if let Some(ipc) = args.ipc { + if ipc < 0 { + return Ok((command, None)); + } + + let (path, hd1) = loop { + let name = format!("\\\\.\\pipe\\{}", uuid::Uuid::new_v4()); + let mut path = Path::new(&name) + .as_os_str() + .encode_wide() + .collect::>(); + path.push(0); + + let hd1 = CreateNamedPipeW( + path.as_ptr(), + PIPE_ACCESS_DUPLEX + | FILE_FLAG_FIRST_PIPE_INSTANCE + | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE, + 1, + 65536, + 65536, + 0, + std::ptr::null_mut(), + ); + + if hd1 == INVALID_HANDLE_VALUE { + let err = io::Error::last_os_error(); + /* If the pipe name is already in use, try again. */ + if err.raw_os_error() == Some(ERROR_ACCESS_DENIED as i32) { + continue; + } + + return Err(err.into()); + } + + break (path, hd1); + }; + + /* Create child pipe handle. */ + let s = SECURITY_ATTRIBUTES { + nLength: std::mem::size_of::() as u32, + lpSecurityDescriptor: ptr::null_mut(), + bInheritHandle: 1, + }; + let mut hd2 = CreateFileW( + path.as_ptr(), + GENERIC_READ | GENERIC_WRITE, + 0, + &s, + OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, + 0, + ); + if hd2 == INVALID_HANDLE_VALUE { + return Err(io::Error::last_os_error().into()); + } + + // Will not block because we have create the pair. + if ConnectNamedPipe(hd1, ptr::null_mut()) == 0 { + let err = std::io::Error::last_os_error(); + if err.raw_os_error() != Some(ERROR_PIPE_CONNECTED as i32) { + CloseHandle(hd2); + return Err(err.into()); + } + } + + // Duplicating the handle to allow the child process to use it. + if DuplicateHandle( + GetCurrentProcess(), + hd2, + GetCurrentProcess(), + &mut hd2, + 0, + 1, + DUPLICATE_SAME_ACCESS, + ) == 0 + { + return Err(std::io::Error::last_os_error().into()); + } + + /* One end returned to parent process (this) */ + let pipe_fd = Some( + state + .resource_table + .add(deno_node::IpcJsonStreamResource::new(hd1 as i64)?), + ); + + /* The other end passed to child process via DENO_CHANNEL_FD */ + command.env("DENO_CHANNEL_FD", format!("{}", hd2 as i64)); + + return Ok((command, pipe_fd)); + } + } + #[cfg(not(unix))] return Ok((command, None)); } @@ -360,13 +481,13 @@ struct Child { stdin_rid: Option, stdout_rid: Option, stderr_rid: Option, - pipe_fd: Option, + pipe_fd: Option, } fn spawn_child( state: &mut OpState, command: std::process::Command, - pipe_fd: Option, + pipe_fd: Option, ) -> Result { let mut command = tokio::process::Command::from(command); // TODO(@crowlkats): allow detaching processes. @@ -459,8 +580,8 @@ fn op_spawn_child( #[serde] args: SpawnArgs, #[string] api_name: String, ) -> Result { - let (command, pipe_fd) = create_command(state, args, &api_name)?; - spawn_child(state, command, pipe_fd) + let (command, pipe_rid) = create_command(state, args, &api_name)?; + spawn_child(state, command, pipe_rid) } #[op2(async)] diff --git a/runtime/worker_bootstrap.rs b/runtime/worker_bootstrap.rs index 97d66158b6..8f7f05888e 100644 --- a/runtime/worker_bootstrap.rs +++ b/runtime/worker_bootstrap.rs @@ -59,7 +59,7 @@ pub struct BootstrapOptions { pub inspect: bool, pub has_node_modules_dir: bool, pub maybe_binary_npm_command_name: Option, - pub node_ipc_fd: Option, + pub node_ipc_fd: Option, } impl Default for BootstrapOptions {