From 0050857f511e886e4af0a811fef16a43eaeb6e52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Fri, 17 Jan 2025 12:30:14 +0000 Subject: [PATCH] refactor: add 'deno_process' crate (#27680) Untangled the whole `runtime/ops/process.rs` from `ext/node/` and moved to a separate `ext/process` crate. --- Cargo.lock | 30 +- Cargo.toml | 1 + cli/args/mod.rs | 2 +- cli/lib/worker.rs | 2 +- cli/npm/byonm.rs | 2 +- cli/npm/installer/common/lifecycle_scripts.rs | 4 +- cli/npm/managed.rs | 2 +- cli/npm/mod.rs | 2 +- ext/node/Cargo.toml | 2 +- ext/node/lib.rs | 2 - ext/node/ops/ipc.rs | 544 +---------------- ext/node/polyfills/child_process.ts | 2 +- ext/node/polyfills/internal/child_process.ts | 2 +- {runtime/js => ext/process}/40_process.js | 0 ext/process/Cargo.toml | 41 ++ ext/process/README.md | 3 + ext/process/ipc.rs | 558 ++++++++++++++++++ runtime/ops/process.rs => ext/process/lib.rs | 25 +- runtime/Cargo.toml | 2 + runtime/js/90_deno_ns.js | 2 +- runtime/lib.rs | 3 +- runtime/ops/mod.rs | 1 - runtime/ops/tty.rs | 5 +- runtime/shared.rs | 1 - runtime/snapshot.rs | 2 +- runtime/web_worker.rs | 8 +- runtime/worker.rs | 8 +- tools/core_import_map.json | 2 +- 28 files changed, 676 insertions(+), 582 deletions(-) rename {runtime/js => ext/process}/40_process.js (100%) create mode 100644 ext/process/Cargo.toml create mode 100644 ext/process/README.md create mode 100644 ext/process/ipc.rs rename runtime/ops/process.rs => ext/process/lib.rs (98%) diff --git a/Cargo.lock b/Cargo.lock index 3e571a2c2a..392ff5ef46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2031,6 +2031,7 @@ dependencies = [ "deno_package_json", "deno_path_util", "deno_permissions", + "deno_process", "deno_whoami", "der", "digest", @@ -2068,7 +2069,6 @@ dependencies = [ "p384", "path-clean", "pbkdf2", - "pin-project-lite", "pkcs8", "rand", "regex", @@ -2238,6 +2238,33 @@ dependencies = [ "winapi", ] +[[package]] +name = "deno_process" +version = "0.1.0" +dependencies = [ + "deno_core", + "deno_error", + "deno_fs", + "deno_io", + "deno_os", + "deno_path_util", + "deno_permissions", + "libc", + "log", + "memchr", + "nix", + "pin-project-lite", + "rand", + "serde", + "simd-json", + "tempfile", + "thiserror 2.0.3", + "tokio", + "which", + "winapi", + "windows-sys 0.59.0", +] + [[package]] name = "deno_resolver" version = "0.17.0" @@ -2290,6 +2317,7 @@ dependencies = [ "deno_os", "deno_path_util", "deno_permissions", + "deno_process", "deno_resolver", "deno_telemetry", "deno_terminal 0.2.0", diff --git a/Cargo.toml b/Cargo.toml index 46318bb828..42c2970e05 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -86,6 +86,7 @@ deno_napi = { version = "0.117.0", path = "./ext/napi" } deno_net = { version = "0.178.0", path = "./ext/net" } deno_node = { version = "0.124.0", path = "./ext/node" } deno_os = { version = "0.3.0", path = "./ext/os" } +deno_process = { version = "0.1.0", path = "./ext/process" } deno_telemetry = { version = "0.8.0", path = "./ext/telemetry" } deno_tls = { version = "0.173.0", path = "./ext/tls" } deno_url = { version = "0.186.0", path = "./ext/url" } diff --git a/cli/args/mod.rs b/cli/args/mod.rs index 29b493046f..f77eedc594 100644 --- a/cli/args/mod.rs +++ b/cli/args/mod.rs @@ -719,7 +719,7 @@ pub enum NpmProcessStateKind { } static NPM_PROCESS_STATE: Lazy> = Lazy::new(|| { - use deno_runtime::ops::process::NPM_RESOLUTION_STATE_FD_ENV_VAR_NAME; + use deno_runtime::deno_process::NPM_RESOLUTION_STATE_FD_ENV_VAR_NAME; let fd = std::env::var(NPM_RESOLUTION_STATE_FD_ENV_VAR_NAME).ok()?; std::env::remove_var(NPM_RESOLUTION_STATE_FD_ENV_VAR_NAME); let fd = fd.parse::().ok()?; diff --git a/cli/lib/worker.rs b/cli/lib/worker.rs index 7c9071d0ba..180d3eef8c 100644 --- a/cli/lib/worker.rs +++ b/cli/lib/worker.rs @@ -25,12 +25,12 @@ use deno_runtime::deno_node::NodeExtInitServices; use deno_runtime::deno_node::NodeRequireLoader; use deno_runtime::deno_node::NodeResolver; use deno_runtime::deno_permissions::PermissionsContainer; +use deno_runtime::deno_process::NpmProcessStateProviderRc; use deno_runtime::deno_telemetry::OtelConfig; use deno_runtime::deno_tls::RootCertStoreProvider; use deno_runtime::deno_web::BlobStore; use deno_runtime::fmt_errors::format_js_error; use deno_runtime::inspector_server::InspectorServer; -use deno_runtime::ops::process::NpmProcessStateProviderRc; use deno_runtime::ops::worker_host::CreateWebWorkerCb; use deno_runtime::web_worker::WebWorker; use deno_runtime::web_worker::WebWorkerOptions; diff --git a/cli/npm/byonm.rs b/cli/npm/byonm.rs index 8dc498bb04..d52b222074 100644 --- a/cli/npm/byonm.rs +++ b/cli/npm/byonm.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use deno_core::serde_json; use deno_resolver::npm::ByonmNpmResolver; use deno_resolver::npm::ByonmNpmResolverCreateOptions; -use deno_runtime::ops::process::NpmProcessStateProvider; +use deno_runtime::deno_process::NpmProcessStateProvider; use crate::args::NpmProcessState; use crate::args::NpmProcessStateKind; diff --git a/cli/npm/installer/common/lifecycle_scripts.rs b/cli/npm/installer/common/lifecycle_scripts.rs index a0d821cdfc..3238b8d023 100644 --- a/cli/npm/installer/common/lifecycle_scripts.rs +++ b/cli/npm/installer/common/lifecycle_scripts.rs @@ -240,7 +240,7 @@ impl<'a> LifecycleScripts<'a> { // However, if we concurrently run scripts in the future we will // have to have multiple temp files. let temp_file_fd = - deno_runtime::ops::process::npm_process_state_tempfile( + deno_runtime::deno_process::npm_process_state_tempfile( process_state.as_bytes(), ) .map_err(LifecycleScriptsError::CreateNpmProcessState)?; @@ -248,7 +248,7 @@ impl<'a> LifecycleScripts<'a> { let _temp_file = unsafe { std::fs::File::from_raw_io_handle(temp_file_fd) }; // make sure the file gets closed env_vars.insert( - deno_runtime::ops::process::NPM_RESOLUTION_STATE_FD_ENV_VAR_NAME + deno_runtime::deno_process::NPM_RESOLUTION_STATE_FD_ENV_VAR_NAME .to_string(), (temp_file_fd as usize).to_string(), ); diff --git a/cli/npm/managed.rs b/cli/npm/managed.rs index 4122c881f1..049e3541db 100644 --- a/cli/npm/managed.rs +++ b/cli/npm/managed.rs @@ -14,7 +14,7 @@ use deno_npm::resolution::ValidSerializedNpmResolutionSnapshot; use deno_resolver::npm::managed::ManagedNpmResolverCreateOptions; use deno_resolver::npm::managed::NpmResolutionCell; use deno_resolver::npm::ManagedNpmResolverRc; -use deno_runtime::ops::process::NpmProcessStateProvider; +use deno_runtime::deno_process::NpmProcessStateProvider; use thiserror::Error; use super::CliNpmRegistryInfoProvider; diff --git a/cli/npm/mod.rs b/cli/npm/mod.rs index fc0916cc18..a2cbd81d5b 100644 --- a/cli/npm/mod.rs +++ b/cli/npm/mod.rs @@ -12,7 +12,7 @@ use deno_core::url::Url; use deno_error::JsErrorBox; use deno_npm::npm_rc::ResolvedNpmRc; use deno_npm::registry::NpmPackageInfo; -use deno_runtime::ops::process::NpmProcessStateProviderRc; +use deno_runtime::deno_process::NpmProcessStateProviderRc; use deno_semver::package::PackageNv; use deno_semver::package::PackageReq; use http::HeaderName; diff --git a/ext/node/Cargo.toml b/ext/node/Cargo.toml index e9226163d6..d4152ce58c 100644 --- a/ext/node/Cargo.toml +++ b/ext/node/Cargo.toml @@ -38,6 +38,7 @@ deno_net.workspace = true deno_package_json.workspace = true deno_path_util.workspace = true deno_permissions.workspace = true +deno_process.workspace = true deno_whoami = "0.1.0" der = { version = "0.7.9", features = ["derive"] } digest = { version = "0.10.5", features = ["core-api", "std"] } @@ -75,7 +76,6 @@ p256.workspace = true p384.workspace = true path-clean = "=0.1.0" pbkdf2 = "0.12.1" -pin-project-lite = "0.2.13" pkcs8 = { version = "0.10.2", features = ["std", "pkcs5", "encryption"] } rand.workspace = true regex.workspace = true diff --git a/ext/node/lib.rs b/ext/node/lib.rs index 325cec6f5b..702a01e447 100644 --- a/ext/node/lib.rs +++ b/ext/node/lib.rs @@ -31,8 +31,6 @@ pub use deno_package_json::PackageJson; use deno_permissions::PermissionCheckError; pub use node_resolver::PathClean; pub use ops::ipc::ChildPipeFd; -pub use ops::ipc::IpcJsonStreamResource; -pub use ops::ipc::IpcRefTracker; use ops::vm; pub use ops::vm::create_v8_context; pub use ops::vm::init_global_template; diff --git a/ext/node/ops/ipc.rs b/ext/node/ops/ipc.rs index cf5e1e97ef..0213295c5a 100644 --- a/ext/node/ops/ipc.rs +++ b/ext/node/ops/ipc.rs @@ -8,38 +8,24 @@ mod impl_ { use std::cell::RefCell; use std::future::Future; use std::io; - use std::mem; - use std::pin::Pin; use std::rc::Rc; - use std::sync::atomic::AtomicBool; - use std::sync::atomic::AtomicUsize; - use std::task::ready; - use std::task::Context; - use std::task::Poll; use deno_core::op2; use deno_core::serde; use deno_core::serde::Serializer; use deno_core::serde_json; use deno_core::v8; - use deno_core::AsyncRefCell; use deno_core::CancelFuture; - use deno_core::CancelHandle; - use deno_core::ExternalOpsTracker; use deno_core::OpState; use deno_core::RcRef; use deno_core::ResourceId; use deno_core::ToV8; use deno_error::JsErrorBox; - use deno_io::BiPipe; - use deno_io::BiPipeRead; - use deno_io::BiPipeWrite; - use memchr::memchr; - use pin_project_lite::pin_project; + use deno_process::ipc::IpcJsonStreamError; + pub use deno_process::ipc::IpcJsonStreamResource; + pub use deno_process::ipc::IpcRefTracker; + pub use deno_process::ipc::INITIAL_CAPACITY; use serde::Serialize; - use tokio::io::AsyncRead; - use tokio::io::AsyncWriteExt; - use tokio::io::ReadBuf; /// Wrapper around v8 value that implements Serialize. struct SerializeWrapper<'a, 'b>( @@ -289,534 +275,12 @@ mod impl_ { stream.ref_tracker.unref(); } - /// Tracks whether the IPC resources is currently - /// refed, and allows refing/unrefing it. - pub struct IpcRefTracker { - refed: AtomicBool, - tracker: OpsTracker, - } - - /// A little wrapper so we don't have to get an - /// `ExternalOpsTracker` for tests. When we aren't - /// cfg(test), this will get optimized out. - enum OpsTracker { - External(ExternalOpsTracker), - #[cfg(test)] - Test, - } - - impl OpsTracker { - fn ref_(&self) { - match self { - Self::External(tracker) => tracker.ref_op(), - #[cfg(test)] - Self::Test => {} - } - } - - fn unref(&self) { - match self { - Self::External(tracker) => tracker.unref_op(), - #[cfg(test)] - Self::Test => {} - } - } - } - - impl IpcRefTracker { - pub fn new(tracker: ExternalOpsTracker) -> Self { - Self { - refed: AtomicBool::new(false), - tracker: OpsTracker::External(tracker), - } - } - - #[cfg(test)] - fn new_test() -> Self { - Self { - refed: AtomicBool::new(false), - tracker: OpsTracker::Test, - } - } - - fn ref_(&self) { - if !self.refed.swap(true, std::sync::atomic::Ordering::AcqRel) { - self.tracker.ref_(); - } - } - - fn unref(&self) { - if self.refed.swap(false, std::sync::atomic::Ordering::AcqRel) { - self.tracker.unref(); - } - } - } - - pub struct IpcJsonStreamResource { - read_half: AsyncRefCell, - write_half: AsyncRefCell, - cancel: Rc, - queued_bytes: AtomicUsize, - ref_tracker: IpcRefTracker, - } - - impl deno_core::Resource for IpcJsonStreamResource { - fn close(self: Rc) { - self.cancel.cancel(); - } - } - - impl IpcJsonStreamResource { - pub fn new( - stream: i64, - ref_tracker: IpcRefTracker, - ) -> Result { - let (read_half, write_half) = BiPipe::from_raw(stream as _)?.split(); - Ok(Self { - read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), - write_half: AsyncRefCell::new(write_half), - cancel: Default::default(), - queued_bytes: Default::default(), - ref_tracker, - }) - } - - #[cfg(all(unix, test))] - fn from_stream( - stream: tokio::net::UnixStream, - ref_tracker: IpcRefTracker, - ) -> Self { - let (read_half, write_half) = stream.into_split(); - Self { - read_half: AsyncRefCell::new(IpcJsonStream::new(read_half.into())), - write_half: AsyncRefCell::new(write_half.into()), - cancel: Default::default(), - queued_bytes: Default::default(), - ref_tracker, - } - } - - #[cfg(all(windows, test))] - fn from_stream( - pipe: tokio::net::windows::named_pipe::NamedPipeClient, - ref_tracker: IpcRefTracker, - ) -> Self { - let (read_half, write_half) = tokio::io::split(pipe); - Self { - read_half: AsyncRefCell::new(IpcJsonStream::new(read_half.into())), - write_half: AsyncRefCell::new(write_half.into()), - cancel: Default::default(), - queued_bytes: Default::default(), - ref_tracker, - } - } - - /// writes _newline terminated_ JSON message to the IPC pipe. - async fn write_msg_bytes( - self: Rc, - msg: &[u8], - ) -> Result<(), io::Error> { - let mut write_half = - RcRef::map(self, |r| &r.write_half).borrow_mut().await; - write_half.write_all(msg).await?; - Ok(()) - } - } - - // Initial capacity of the buffered reader and the JSON backing buffer. - // - // This is a tradeoff between memory usage and performance on large messages. - // - // 64kb has been chosen after benchmarking 64 to 66536 << 6 - 1 bytes per message. - const INITIAL_CAPACITY: usize = 1024 * 64; - - /// A buffer for reading from the IPC pipe. - /// Similar to the internal buffer of `tokio::io::BufReader`. - /// - /// This exists to provide buffered reading while granting mutable access - /// to the internal buffer (which isn't exposed through `tokio::io::BufReader` - /// or the `AsyncBufRead` trait). `simd_json` requires mutable access to an input - /// buffer for parsing, so this allows us to use the read buffer directly as the - /// input buffer without a copy (provided the message fits). - struct ReadBuffer { - buffer: Box<[u8]>, - pos: usize, - cap: usize, - } - - impl ReadBuffer { - fn new() -> Self { - Self { - buffer: vec![0; INITIAL_CAPACITY].into_boxed_slice(), - pos: 0, - cap: 0, - } - } - - fn get_mut(&mut self) -> &mut [u8] { - &mut self.buffer - } - - fn available_mut(&mut self) -> &mut [u8] { - &mut self.buffer[self.pos..self.cap] - } - - fn consume(&mut self, n: usize) { - self.pos = std::cmp::min(self.pos + n, self.cap); - } - - fn needs_fill(&self) -> bool { - self.pos >= self.cap - } - } - - #[derive(Debug, thiserror::Error, deno_error::JsError)] - pub enum IpcJsonStreamError { - #[class(inherit)] - #[error("{0}")] - Io(#[source] std::io::Error), - #[class(generic)] - #[error("{0}")] - SimdJson(#[source] simd_json::Error), - } - - // JSON serialization stream over IPC pipe. - // - // `\n` is used as a delimiter between messages. - struct IpcJsonStream { - pipe: BiPipeRead, - buffer: Vec, - read_buffer: ReadBuffer, - } - - impl IpcJsonStream { - fn new(pipe: BiPipeRead) -> Self { - Self { - pipe, - buffer: Vec::with_capacity(INITIAL_CAPACITY), - read_buffer: ReadBuffer::new(), - } - } - - async fn read_msg( - &mut self, - ) -> Result, IpcJsonStreamError> { - let mut json = None; - let nread = read_msg_inner( - &mut self.pipe, - &mut self.buffer, - &mut json, - &mut self.read_buffer, - ) - .await - .map_err(IpcJsonStreamError::Io)?; - if nread == 0 { - // EOF. - return Ok(None); - } - - let json = match json { - Some(v) => v, - None => { - // Took more than a single read and some buffering. - simd_json::from_slice(&mut self.buffer[..nread]) - .map_err(IpcJsonStreamError::SimdJson)? - } - }; - - // Safety: Same as `Vec::clear` but without the `drop_in_place` for - // each element (nop for u8). Capacity remains the same. - unsafe { - self.buffer.set_len(0); - } - - Ok(Some(json)) - } - } - - pin_project! { - #[must_use = "futures do nothing unless you `.await` or poll them"] - struct ReadMsgInner<'a, R: ?Sized> { - reader: &'a mut R, - buf: &'a mut Vec, - json: &'a mut Option, - // The number of bytes appended to buf. This can be less than buf.len() if - // the buffer was not empty when the operation was started. - read: usize, - read_buffer: &'a mut ReadBuffer, - } - } - - fn read_msg_inner<'a, R>( - reader: &'a mut R, - buf: &'a mut Vec, - json: &'a mut Option, - read_buffer: &'a mut ReadBuffer, - ) -> ReadMsgInner<'a, R> - where - R: AsyncRead + ?Sized + Unpin, - { - ReadMsgInner { - reader, - buf, - json, - read: 0, - read_buffer, - } - } - - fn read_msg_internal( - mut reader: Pin<&mut R>, - cx: &mut Context<'_>, - buf: &mut Vec, - read_buffer: &mut ReadBuffer, - json: &mut Option, - read: &mut usize, - ) -> Poll> { - loop { - let (done, used) = { - // effectively a tiny `poll_fill_buf`, but allows us to get a mutable reference to the buffer. - if read_buffer.needs_fill() { - let mut read_buf = ReadBuf::new(read_buffer.get_mut()); - ready!(reader.as_mut().poll_read(cx, &mut read_buf))?; - read_buffer.cap = read_buf.filled().len(); - read_buffer.pos = 0; - } - let available = read_buffer.available_mut(); - if let Some(i) = memchr(b'\n', available) { - if *read == 0 { - // Fast path: parse and put into the json slot directly. - json.replace( - simd_json::from_slice(&mut available[..i + 1]) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?, - ); - } else { - // This is not the first read, so we have to copy the data - // to make it contiguous. - buf.extend_from_slice(&available[..=i]); - } - (true, i + 1) - } else { - buf.extend_from_slice(available); - (false, available.len()) - } - }; - - read_buffer.consume(used); - *read += used; - if done || used == 0 { - return Poll::Ready(Ok(mem::replace(read, 0))); - } - } - } - - impl Future for ReadMsgInner<'_, R> { - type Output = io::Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let me = self.project(); - read_msg_internal( - Pin::new(*me.reader), - cx, - me.buf, - me.read_buffer, - me.json, - me.read, - ) - } - } - #[cfg(test)] mod tests { - use std::rc::Rc; - - use deno_core::serde_json::json; use deno_core::v8; use deno_core::JsRuntime; - use deno_core::RcRef; use deno_core::RuntimeOptions; - use super::IpcJsonStreamResource; - - #[allow(clippy::unused_async)] - #[cfg(unix)] - pub async fn pair() -> (Rc, tokio::net::UnixStream) { - let (a, b) = tokio::net::UnixStream::pair().unwrap(); - - /* Similar to how ops would use the resource */ - let a = Rc::new(IpcJsonStreamResource::from_stream( - a, - super::IpcRefTracker::new_test(), - )); - (a, b) - } - - #[cfg(windows)] - pub async fn pair() -> ( - Rc, - tokio::net::windows::named_pipe::NamedPipeServer, - ) { - use tokio::net::windows::named_pipe::ClientOptions; - use tokio::net::windows::named_pipe::ServerOptions; - - let name = - format!(r"\\.\pipe\deno-named-pipe-test-{}", rand::random::()); - - let server = ServerOptions::new().create(name.clone()).unwrap(); - let client = ClientOptions::new().open(name).unwrap(); - - server.connect().await.unwrap(); - /* Similar to how ops would use the resource */ - let client = Rc::new(IpcJsonStreamResource::from_stream( - client, - super::IpcRefTracker::new_test(), - )); - (client, server) - } - - #[allow(clippy::print_stdout)] - #[tokio::test] - async fn bench_ipc() -> Result<(), Box> { - // A simple round trip benchmark for quick dev feedback. - // - // Only ran when the env var is set. - if std::env::var_os("BENCH_IPC_DENO").is_none() { - return Ok(()); - } - - let (ipc, mut fd2) = pair().await; - let child = tokio::spawn(async move { - use tokio::io::AsyncWriteExt; - - let size = 1024 * 1024; - - let stri = "x".repeat(size); - let data = format!("\"{}\"\n", stri); - for _ in 0..100 { - fd2.write_all(data.as_bytes()).await?; - } - Ok::<_, std::io::Error>(()) - }); - - let start = std::time::Instant::now(); - let mut bytes = 0; - - let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; - loop { - let Some(msgs) = ipc.read_msg().await? else { - break; - }; - bytes += msgs.as_str().unwrap().len(); - if start.elapsed().as_secs() > 5 { - break; - } - } - let elapsed = start.elapsed(); - let mb = bytes as f64 / 1024.0 / 1024.0; - println!("{} mb/s", mb / elapsed.as_secs_f64()); - - child.await??; - - Ok(()) - } - - #[tokio::test] - async fn unix_ipc_json() -> Result<(), Box> { - let (ipc, mut fd2) = pair().await; - let child = tokio::spawn(async move { - use tokio::io::AsyncReadExt; - use tokio::io::AsyncWriteExt; - - const EXPECTED: &[u8] = b"\"hello\"\n"; - let mut buf = [0u8; EXPECTED.len()]; - let n = fd2.read_exact(&mut buf).await?; - assert_eq!(&buf[..n], EXPECTED); - fd2.write_all(b"\"world\"\n").await?; - - Ok::<_, std::io::Error>(()) - }); - - ipc - .clone() - .write_msg_bytes(&json_to_bytes(json!("hello"))) - .await?; - - let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; - let msgs = ipc.read_msg().await?.unwrap(); - assert_eq!(msgs, json!("world")); - - child.await??; - - Ok(()) - } - - fn json_to_bytes(v: deno_core::serde_json::Value) -> Vec { - let mut buf = deno_core::serde_json::to_vec(&v).unwrap(); - buf.push(b'\n'); - buf - } - - #[tokio::test] - async fn unix_ipc_json_multi() -> Result<(), Box> { - let (ipc, mut fd2) = pair().await; - let child = tokio::spawn(async move { - use tokio::io::AsyncReadExt; - use tokio::io::AsyncWriteExt; - - const EXPECTED: &[u8] = b"\"hello\"\n\"world\"\n"; - let mut buf = [0u8; EXPECTED.len()]; - let n = fd2.read_exact(&mut buf).await?; - assert_eq!(&buf[..n], EXPECTED); - fd2.write_all(b"\"foo\"\n\"bar\"\n").await?; - Ok::<_, std::io::Error>(()) - }); - - ipc - .clone() - .write_msg_bytes(&json_to_bytes(json!("hello"))) - .await?; - ipc - .clone() - .write_msg_bytes(&json_to_bytes(json!("world"))) - .await?; - - let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; - let msgs = ipc.read_msg().await?.unwrap(); - assert_eq!(msgs, json!("foo")); - - child.await??; - - Ok(()) - } - - #[tokio::test] - async fn unix_ipc_json_invalid() -> Result<(), Box> { - let (ipc, mut fd2) = pair().await; - let child = tokio::spawn(async move { - tokio::io::AsyncWriteExt::write_all(&mut fd2, b"\n\n").await?; - Ok::<_, std::io::Error>(()) - }); - - let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; - let _err = ipc.read_msg().await.unwrap_err(); - - child.await??; - - Ok(()) - } - - #[test] - fn memchr() { - let str = b"hello world"; - assert_eq!(super::memchr(b'h', str), Some(0)); - assert_eq!(super::memchr(b'w', str), Some(6)); - assert_eq!(super::memchr(b'd', str), Some(10)); - assert_eq!(super::memchr(b'x', str), None); - - let empty = b""; - assert_eq!(super::memchr(b'\n', empty), None); - } - fn wrap_expr(s: &str) -> String { format!("(function () {{ return {s}; }})()") } diff --git a/ext/node/polyfills/child_process.ts b/ext/node/polyfills/child_process.ts index 184b29bd2b..2bd8614275 100644 --- a/ext/node/polyfills/child_process.ts +++ b/ext/node/polyfills/child_process.ts @@ -53,7 +53,7 @@ import { convertToValidSignal, kEmptyObject, } from "ext:deno_node/internal/util.mjs"; -import { kNeedsNpmProcessState } from "ext:runtime/40_process.js"; +import { kNeedsNpmProcessState } from "ext:deno_process/40_process.js"; const MAX_BUFFER = 1024 * 1024; diff --git a/ext/node/polyfills/internal/child_process.ts b/ext/node/polyfills/internal/child_process.ts index 569ee7d328..9c9e084787 100644 --- a/ext/node/polyfills/internal/child_process.ts +++ b/ext/node/polyfills/internal/child_process.ts @@ -61,7 +61,7 @@ import { kExtraStdio, kIpc, kNeedsNpmProcessState, -} from "ext:runtime/40_process.js"; +} from "ext:deno_process/40_process.js"; export function mapValues( record: Readonly>, diff --git a/runtime/js/40_process.js b/ext/process/40_process.js similarity index 100% rename from runtime/js/40_process.js rename to ext/process/40_process.js diff --git a/ext/process/Cargo.toml b/ext/process/Cargo.toml new file mode 100644 index 0000000000..26f7a41a5e --- /dev/null +++ b/ext/process/Cargo.toml @@ -0,0 +1,41 @@ +# Copyright 2018-2025 the Deno authors. MIT license. + +[package] +name = "deno_process" +version = "0.1.0" +authors.workspace = true +edition.workspace = true +license.workspace = true +readme = "README.md" +repository.workspace = true +description = "Subprocess APIs for Deno" + +[lib] +path = "lib.rs" + +[dependencies] +deno_core.workspace = true +deno_error.workspace = true +deno_fs.workspace = true +deno_io.workspace = true +deno_os.workspace = true +deno_path_util.workspace = true +deno_permissions.workspace = true +libc.workspace = true +log.workspace = true +memchr = "2.7.4" +pin-project-lite = "0.2.13" +rand.workspace = true +serde.workspace = true +simd-json = "0.14.0" +tempfile.workspace = true +thiserror.workspace = true +tokio.workspace = true +which.workspace = true + +[target.'cfg(unix)'.dependencies] +nix = { workspace = true, features = ["signal", "process"] } + +[target.'cfg(windows)'.dependencies] +winapi = { workspace = true, features = [] } +windows-sys.workspace = true diff --git a/ext/process/README.md b/ext/process/README.md new file mode 100644 index 0000000000..ac39ab83d6 --- /dev/null +++ b/ext/process/README.md @@ -0,0 +1,3 @@ +# deno_process + +This crate implements subprocess APIs for Deno diff --git a/ext/process/ipc.rs b/ext/process/ipc.rs new file mode 100644 index 0000000000..3728943457 --- /dev/null +++ b/ext/process/ipc.rs @@ -0,0 +1,558 @@ +// Copyright 2018-2025 the Deno authors. MIT license. + +#![allow(unused)] + +use std::cell::RefCell; +use std::future::Future; +use std::io; +use std::mem; +use std::pin::Pin; +use std::rc::Rc; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::AtomicUsize; +use std::task::ready; +use std::task::Context; +use std::task::Poll; + +use deno_core::serde; +use deno_core::serde_json; +use deno_core::AsyncRefCell; +use deno_core::CancelHandle; +use deno_core::ExternalOpsTracker; +use deno_core::RcRef; +use deno_io::BiPipe; +use deno_io::BiPipeRead; +use deno_io::BiPipeWrite; +use memchr::memchr; +use pin_project_lite::pin_project; +use tokio::io::AsyncRead; +use tokio::io::AsyncWriteExt; +use tokio::io::ReadBuf; + +/// Tracks whether the IPC resources is currently +/// refed, and allows refing/unrefing it. +pub struct IpcRefTracker { + refed: AtomicBool, + tracker: OpsTracker, +} + +/// A little wrapper so we don't have to get an +/// `ExternalOpsTracker` for tests. When we aren't +/// cfg(test), this will get optimized out. +enum OpsTracker { + External(ExternalOpsTracker), + #[cfg(test)] + Test, +} + +impl OpsTracker { + fn ref_(&self) { + match self { + Self::External(tracker) => tracker.ref_op(), + #[cfg(test)] + Self::Test => {} + } + } + + fn unref(&self) { + match self { + Self::External(tracker) => tracker.unref_op(), + #[cfg(test)] + Self::Test => {} + } + } +} + +impl IpcRefTracker { + pub fn new(tracker: ExternalOpsTracker) -> Self { + Self { + refed: AtomicBool::new(false), + tracker: OpsTracker::External(tracker), + } + } + + #[cfg(test)] + fn new_test() -> Self { + Self { + refed: AtomicBool::new(false), + tracker: OpsTracker::Test, + } + } + + pub fn ref_(&self) { + if !self.refed.swap(true, std::sync::atomic::Ordering::AcqRel) { + self.tracker.ref_(); + } + } + + pub fn unref(&self) { + if self.refed.swap(false, std::sync::atomic::Ordering::AcqRel) { + self.tracker.unref(); + } + } +} + +pub struct IpcJsonStreamResource { + pub read_half: AsyncRefCell, + pub write_half: AsyncRefCell, + pub cancel: Rc, + pub queued_bytes: AtomicUsize, + pub ref_tracker: IpcRefTracker, +} + +impl deno_core::Resource for IpcJsonStreamResource { + fn close(self: Rc) { + self.cancel.cancel(); + } +} + +impl IpcJsonStreamResource { + pub fn new( + stream: i64, + ref_tracker: IpcRefTracker, + ) -> Result { + let (read_half, write_half) = BiPipe::from_raw(stream as _)?.split(); + Ok(Self { + read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), + write_half: AsyncRefCell::new(write_half), + cancel: Default::default(), + queued_bytes: Default::default(), + ref_tracker, + }) + } + + #[cfg(all(unix, test))] + pub fn from_stream( + stream: tokio::net::UnixStream, + ref_tracker: IpcRefTracker, + ) -> Self { + let (read_half, write_half) = stream.into_split(); + Self { + read_half: AsyncRefCell::new(IpcJsonStream::new(read_half.into())), + write_half: AsyncRefCell::new(write_half.into()), + cancel: Default::default(), + queued_bytes: Default::default(), + ref_tracker, + } + } + + #[cfg(all(windows, test))] + pub fn from_stream( + pipe: tokio::net::windows::named_pipe::NamedPipeClient, + ref_tracker: IpcRefTracker, + ) -> Self { + let (read_half, write_half) = tokio::io::split(pipe); + Self { + read_half: AsyncRefCell::new(IpcJsonStream::new(read_half.into())), + write_half: AsyncRefCell::new(write_half.into()), + cancel: Default::default(), + queued_bytes: Default::default(), + ref_tracker, + } + } + + /// writes _newline terminated_ JSON message to the IPC pipe. + pub async fn write_msg_bytes( + self: Rc, + msg: &[u8], + ) -> Result<(), io::Error> { + let mut write_half = RcRef::map(self, |r| &r.write_half).borrow_mut().await; + write_half.write_all(msg).await?; + Ok(()) + } +} + +// Initial capacity of the buffered reader and the JSON backing buffer. +// +// This is a tradeoff between memory usage and performance on large messages. +// +// 64kb has been chosen after benchmarking 64 to 66536 << 6 - 1 bytes per message. +pub const INITIAL_CAPACITY: usize = 1024 * 64; + +/// A buffer for reading from the IPC pipe. +/// Similar to the internal buffer of `tokio::io::BufReader`. +/// +/// This exists to provide buffered reading while granting mutable access +/// to the internal buffer (which isn't exposed through `tokio::io::BufReader` +/// or the `AsyncBufRead` trait). `simd_json` requires mutable access to an input +/// buffer for parsing, so this allows us to use the read buffer directly as the +/// input buffer without a copy (provided the message fits). +struct ReadBuffer { + buffer: Box<[u8]>, + pos: usize, + cap: usize, +} + +impl ReadBuffer { + fn new() -> Self { + Self { + buffer: vec![0; INITIAL_CAPACITY].into_boxed_slice(), + pos: 0, + cap: 0, + } + } + + fn get_mut(&mut self) -> &mut [u8] { + &mut self.buffer + } + + fn available_mut(&mut self) -> &mut [u8] { + &mut self.buffer[self.pos..self.cap] + } + + fn consume(&mut self, n: usize) { + self.pos = std::cmp::min(self.pos + n, self.cap); + } + + fn needs_fill(&self) -> bool { + self.pos >= self.cap + } +} + +#[derive(Debug, thiserror::Error, deno_error::JsError)] +pub enum IpcJsonStreamError { + #[class(inherit)] + #[error("{0}")] + Io(#[source] std::io::Error), + #[class(generic)] + #[error("{0}")] + SimdJson(#[source] simd_json::Error), +} + +// JSON serialization stream over IPC pipe. +// +// `\n` is used as a delimiter between messages. +pub struct IpcJsonStream { + pipe: BiPipeRead, + buffer: Vec, + read_buffer: ReadBuffer, +} + +impl IpcJsonStream { + fn new(pipe: BiPipeRead) -> Self { + Self { + pipe, + buffer: Vec::with_capacity(INITIAL_CAPACITY), + read_buffer: ReadBuffer::new(), + } + } + + pub async fn read_msg( + &mut self, + ) -> Result, IpcJsonStreamError> { + let mut json = None; + let nread = read_msg_inner( + &mut self.pipe, + &mut self.buffer, + &mut json, + &mut self.read_buffer, + ) + .await + .map_err(IpcJsonStreamError::Io)?; + if nread == 0 { + // EOF. + return Ok(None); + } + + let json = match json { + Some(v) => v, + None => { + // Took more than a single read and some buffering. + simd_json::from_slice(&mut self.buffer[..nread]) + .map_err(IpcJsonStreamError::SimdJson)? + } + }; + + // Safety: Same as `Vec::clear` but without the `drop_in_place` for + // each element (nop for u8). Capacity remains the same. + unsafe { + self.buffer.set_len(0); + } + + Ok(Some(json)) + } +} + +pin_project! { + #[must_use = "futures do nothing unless you `.await` or poll them"] + struct ReadMsgInner<'a, R: ?Sized> { + reader: &'a mut R, + buf: &'a mut Vec, + json: &'a mut Option, + // The number of bytes appended to buf. This can be less than buf.len() if + // the buffer was not empty when the operation was started. + read: usize, + read_buffer: &'a mut ReadBuffer, + } +} + +fn read_msg_inner<'a, R>( + reader: &'a mut R, + buf: &'a mut Vec, + json: &'a mut Option, + read_buffer: &'a mut ReadBuffer, +) -> ReadMsgInner<'a, R> +where + R: AsyncRead + ?Sized + Unpin, +{ + ReadMsgInner { + reader, + buf, + json, + read: 0, + read_buffer, + } +} + +fn read_msg_internal( + mut reader: Pin<&mut R>, + cx: &mut Context<'_>, + buf: &mut Vec, + read_buffer: &mut ReadBuffer, + json: &mut Option, + read: &mut usize, +) -> Poll> { + loop { + let (done, used) = { + // effectively a tiny `poll_fill_buf`, but allows us to get a mutable reference to the buffer. + if read_buffer.needs_fill() { + let mut read_buf = ReadBuf::new(read_buffer.get_mut()); + ready!(reader.as_mut().poll_read(cx, &mut read_buf))?; + read_buffer.cap = read_buf.filled().len(); + read_buffer.pos = 0; + } + let available = read_buffer.available_mut(); + if let Some(i) = memchr(b'\n', available) { + if *read == 0 { + // Fast path: parse and put into the json slot directly. + json.replace( + simd_json::from_slice(&mut available[..i + 1]) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?, + ); + } else { + // This is not the first read, so we have to copy the data + // to make it contiguous. + buf.extend_from_slice(&available[..=i]); + } + (true, i + 1) + } else { + buf.extend_from_slice(available); + (false, available.len()) + } + }; + + read_buffer.consume(used); + *read += used; + if done || used == 0 { + return Poll::Ready(Ok(mem::replace(read, 0))); + } + } +} + +impl Future for ReadMsgInner<'_, R> { + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); + read_msg_internal( + Pin::new(*me.reader), + cx, + me.buf, + me.read_buffer, + me.json, + me.read, + ) + } +} + +#[cfg(test)] +mod tests { + use std::rc::Rc; + + use deno_core::serde_json::json; + use deno_core::v8; + use deno_core::JsRuntime; + use deno_core::RcRef; + use deno_core::RuntimeOptions; + + use super::IpcJsonStreamResource; + + #[allow(clippy::unused_async)] + #[cfg(unix)] + pub async fn pair() -> (Rc, tokio::net::UnixStream) { + let (a, b) = tokio::net::UnixStream::pair().unwrap(); + + /* Similar to how ops would use the resource */ + let a = Rc::new(IpcJsonStreamResource::from_stream( + a, + super::IpcRefTracker::new_test(), + )); + (a, b) + } + + #[cfg(windows)] + pub async fn pair() -> ( + Rc, + tokio::net::windows::named_pipe::NamedPipeServer, + ) { + use tokio::net::windows::named_pipe::ClientOptions; + use tokio::net::windows::named_pipe::ServerOptions; + + let name = + format!(r"\\.\pipe\deno-named-pipe-test-{}", rand::random::()); + + let server = ServerOptions::new().create(name.clone()).unwrap(); + let client = ClientOptions::new().open(name).unwrap(); + + server.connect().await.unwrap(); + /* Similar to how ops would use the resource */ + let client = Rc::new(IpcJsonStreamResource::from_stream( + client, + super::IpcRefTracker::new_test(), + )); + (client, server) + } + + #[allow(clippy::print_stdout)] + #[tokio::test] + async fn bench_ipc() -> Result<(), Box> { + // A simple round trip benchmark for quick dev feedback. + // + // Only ran when the env var is set. + if std::env::var_os("BENCH_IPC_DENO").is_none() { + return Ok(()); + } + + let (ipc, mut fd2) = pair().await; + let child = tokio::spawn(async move { + use tokio::io::AsyncWriteExt; + + let size = 1024 * 1024; + + let stri = "x".repeat(size); + let data = format!("\"{}\"\n", stri); + for _ in 0..100 { + fd2.write_all(data.as_bytes()).await?; + } + Ok::<_, std::io::Error>(()) + }); + + let start = std::time::Instant::now(); + let mut bytes = 0; + + let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; + loop { + let Some(msgs) = ipc.read_msg().await? else { + break; + }; + bytes += msgs.as_str().unwrap().len(); + if start.elapsed().as_secs() > 5 { + break; + } + } + let elapsed = start.elapsed(); + let mb = bytes as f64 / 1024.0 / 1024.0; + println!("{} mb/s", mb / elapsed.as_secs_f64()); + + child.await??; + + Ok(()) + } + + #[tokio::test] + async fn unix_ipc_json() -> Result<(), Box> { + let (ipc, mut fd2) = pair().await; + let child = tokio::spawn(async move { + use tokio::io::AsyncReadExt; + use tokio::io::AsyncWriteExt; + + const EXPECTED: &[u8] = b"\"hello\"\n"; + let mut buf = [0u8; EXPECTED.len()]; + let n = fd2.read_exact(&mut buf).await?; + assert_eq!(&buf[..n], EXPECTED); + fd2.write_all(b"\"world\"\n").await?; + + Ok::<_, std::io::Error>(()) + }); + + ipc + .clone() + .write_msg_bytes(&json_to_bytes(json!("hello"))) + .await?; + + let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; + let msgs = ipc.read_msg().await?.unwrap(); + assert_eq!(msgs, json!("world")); + + child.await??; + + Ok(()) + } + + fn json_to_bytes(v: deno_core::serde_json::Value) -> Vec { + let mut buf = deno_core::serde_json::to_vec(&v).unwrap(); + buf.push(b'\n'); + buf + } + + #[tokio::test] + async fn unix_ipc_json_multi() -> Result<(), Box> { + let (ipc, mut fd2) = pair().await; + let child = tokio::spawn(async move { + use tokio::io::AsyncReadExt; + use tokio::io::AsyncWriteExt; + + const EXPECTED: &[u8] = b"\"hello\"\n\"world\"\n"; + let mut buf = [0u8; EXPECTED.len()]; + let n = fd2.read_exact(&mut buf).await?; + assert_eq!(&buf[..n], EXPECTED); + fd2.write_all(b"\"foo\"\n\"bar\"\n").await?; + Ok::<_, std::io::Error>(()) + }); + + ipc + .clone() + .write_msg_bytes(&json_to_bytes(json!("hello"))) + .await?; + ipc + .clone() + .write_msg_bytes(&json_to_bytes(json!("world"))) + .await?; + + let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; + let msgs = ipc.read_msg().await?.unwrap(); + assert_eq!(msgs, json!("foo")); + + child.await??; + + Ok(()) + } + + #[tokio::test] + async fn unix_ipc_json_invalid() -> Result<(), Box> { + let (ipc, mut fd2) = pair().await; + let child = tokio::spawn(async move { + tokio::io::AsyncWriteExt::write_all(&mut fd2, b"\n\n").await?; + Ok::<_, std::io::Error>(()) + }); + + let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; + let _err = ipc.read_msg().await.unwrap_err(); + + child.await??; + + Ok(()) + } + + #[test] + fn memchr() { + let str = b"hello world"; + assert_eq!(super::memchr(b'h', str), Some(0)); + assert_eq!(super::memchr(b'w', str), Some(6)); + assert_eq!(super::memchr(b'd', str), Some(10)); + assert_eq!(super::memchr(b'x', str), None); + + let empty = b""; + assert_eq!(super::memchr(b'\n', empty), None); + } +} diff --git a/runtime/ops/process.rs b/ext/process/lib.rs similarity index 98% rename from runtime/ops/process.rs rename to ext/process/lib.rs index fc32f7e066..24985a8048 100644 --- a/runtime/ops/process.rs +++ b/ext/process/lib.rs @@ -38,6 +38,10 @@ use serde::Deserialize; use serde::Serialize; use tokio::process::Command; +pub mod ipc; +use ipc::IpcJsonStreamResource; +use ipc::IpcRefTracker; + pub const UNSTABLE_FEATURE_NAME: &str = "process"; #[derive(Copy, Clone, Eq, PartialEq, Deserialize)] @@ -153,6 +157,7 @@ deno_core::extension!( deprecated::op_run_status, deprecated::op_kill, ], + esm = ["40_process.js"], options = { get_npm_process_state: Option }, state = |state, options| { state.put::(options.get_npm_process_state.unwrap_or(deno_fs::sync::MaybeArc::new(EmptyNpmProcessStateProvider))); @@ -462,13 +467,10 @@ fn create_command( fds_to_dup.push((ipc_fd2, ipc)); fds_to_close.push(ipc_fd2); /* One end returned to parent process (this) */ - let pipe_rid = - state - .resource_table - .add(deno_node::IpcJsonStreamResource::new( - ipc_fd1 as _, - deno_node::IpcRefTracker::new(state.external_ops_tracker.clone()), - )?); + let pipe_rid = state.resource_table.add(IpcJsonStreamResource::new( + ipc_fd1 as _, + IpcRefTracker::new(state.external_ops_tracker.clone()), + )?); /* The other end passed to child process via NODE_CHANNEL_FD */ command.env("NODE_CHANNEL_FD", format!("{}", ipc)); ipc_rid = Some(pipe_rid); @@ -532,12 +534,11 @@ fn create_command( let (hd1, hd2) = deno_io::bi_pipe_pair_raw()?; /* One end returned to parent process (this) */ - let pipe_rid = Some(state.resource_table.add( - deno_node::IpcJsonStreamResource::new( + let pipe_rid = + Some(state.resource_table.add(IpcJsonStreamResource::new( hd1 as i64, - deno_node::IpcRefTracker::new(state.external_ops_tracker.clone()), - )?, - )); + IpcRefTracker::new(state.external_ops_tracker.clone()), + )?)); /* The other end passed to child process via NODE_CHANNEL_FD */ command.env("NODE_CHANNEL_FD", format!("{}", hd2 as i64)); diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index b56b4a2b50..b87d4cfbdf 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -60,6 +60,7 @@ deno_kv.workspace = true deno_tls.workspace = true deno_url.workspace = true deno_web.workspace = true +deno_process.workspace = true deno_webgpu.workspace = true deno_webidl.workspace = true deno_websocket.workspace = true @@ -93,6 +94,7 @@ deno_node.workspace = true deno_os.workspace = true deno_path_util.workspace = true deno_permissions.workspace = true +deno_process.workspace = true deno_resolver.workspace = true deno_telemetry.workspace = true deno_terminal.workspace = true diff --git a/runtime/js/90_deno_ns.js b/runtime/js/90_deno_ns.js index b241028077..5aaf0614dc 100644 --- a/runtime/js/90_deno_ns.js +++ b/runtime/js/90_deno_ns.js @@ -23,7 +23,7 @@ import * as io from "ext:deno_io/12_io.js"; import * as fs from "ext:deno_fs/30_fs.js"; import * as os from "ext:deno_os/30_os.js"; import * as fsEvents from "ext:runtime/40_fs_events.js"; -import * as process from "ext:runtime/40_process.js"; +import * as process from "ext:deno_process/40_process.js"; import * as signals from "ext:deno_os/40_signals.js"; import * as tty from "ext:runtime/40_tty.js"; import * as kv from "ext:deno_kv/01_db.ts"; diff --git a/runtime/lib.rs b/runtime/lib.rs index 65d3e88bae..c83fe5d60b 100644 --- a/runtime/lib.rs +++ b/runtime/lib.rs @@ -18,6 +18,7 @@ pub use deno_net; pub use deno_node; pub use deno_os; pub use deno_permissions; +pub use deno_process; pub use deno_telemetry; pub use deno_terminal::colors; pub use deno_tls; @@ -115,7 +116,7 @@ pub static UNSTABLE_GRANULAR_FLAGS: &[UnstableGranularFlag] = &[ }, // TODO(bartlomieju): consider removing it UnstableGranularFlag { - name: ops::process::UNSTABLE_FEATURE_NAME, + name: deno_process::UNSTABLE_FEATURE_NAME, help_text: "Enable unstable process APIs", show_in_help: false, id: 10, diff --git a/runtime/ops/mod.rs b/runtime/ops/mod.rs index d131e9aab5..04065ff2f8 100644 --- a/runtime/ops/mod.rs +++ b/runtime/ops/mod.rs @@ -4,7 +4,6 @@ pub mod bootstrap; pub mod fs_events; pub mod http; pub mod permissions; -pub mod process; pub mod runtime; pub mod tty; pub mod web_worker; diff --git a/runtime/ops/tty.rs b/runtime/ops/tty.rs index d9912839b8..a929b7d18a 100644 --- a/runtime/ops/tty.rs +++ b/runtime/ops/tty.rs @@ -50,14 +50,13 @@ impl TtyModeStore { } } +#[cfg(unix)] +use deno_process::JsNixError; #[cfg(windows)] use winapi::shared::minwindef::DWORD; #[cfg(windows)] use winapi::um::wincon; -#[cfg(unix)] -use crate::ops::process::JsNixError; - deno_core::extension!( deno_tty, ops = [op_set_raw, op_console_size, op_read_line_prompt], diff --git a/runtime/shared.rs b/runtime/shared.rs index 0e747b0565..ecf2088fe1 100644 --- a/runtime/shared.rs +++ b/runtime/shared.rs @@ -43,7 +43,6 @@ extension!(runtime, "10_permissions.js", "11_workers.js", "40_fs_events.js", - "40_process.js", "40_tty.js", "41_prompt.js", "90_deno_ns.js", diff --git a/runtime/snapshot.rs b/runtime/snapshot.rs index 08ea17a986..eec8579e59 100644 --- a/runtime/snapshot.rs +++ b/runtime/snapshot.rs @@ -311,6 +311,7 @@ pub fn create_runtime_snapshot( deno_io::deno_io::init_ops_and_esm(Default::default()), deno_fs::deno_fs::init_ops_and_esm::(fs.clone()), deno_os::deno_os::init_ops_and_esm(Default::default()), + deno_process::deno_process::init_ops_and_esm(Default::default()), deno_node::deno_node::init_ops_and_esm::< Permissions, DenoInNpmPackageChecker, @@ -325,7 +326,6 @@ pub fn create_runtime_snapshot( ), ops::fs_events::deno_fs_events::init_ops(), ops::permissions::deno_permissions::init_ops(), - ops::process::deno_process::init_ops(None), ops::tty::deno_tty::init_ops(), ops::http::deno_http_runtime::init_ops(), ops::bootstrap::deno_bootstrap::init_ops(Some(snapshot_options)), diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index e4ea42a2f7..bb769c46a9 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -44,6 +44,7 @@ use deno_kv::dynamic::MultiBackendDbHandler; use deno_node::ExtNodeSys; use deno_node::NodeExtInitServices; use deno_permissions::PermissionsContainer; +use deno_process::NpmProcessStateProviderRc; use deno_terminal::colors; use deno_tls::RootCertStoreProvider; use deno_tls::TlsKeys; @@ -59,7 +60,6 @@ use node_resolver::NpmPackageFolderResolver; use crate::inspector_server::InspectorServer; use crate::ops; -use crate::ops::process::NpmProcessStateProviderRc; use crate::shared::maybe_transpile_source; use crate::shared::runtime; use crate::tokio_util::create_and_run_current_thread; @@ -529,6 +529,9 @@ impl WebWorker { services.fs.clone(), ), deno_os::deno_os_worker::init_ops_and_esm(), + deno_process::deno_process::init_ops_and_esm( + services.npm_process_state_provider, + ), deno_node::deno_node::init_ops_and_esm::< PermissionsContainer, TInNpmPackageChecker, @@ -543,9 +546,6 @@ impl WebWorker { ), ops::fs_events::deno_fs_events::init_ops_and_esm(), ops::permissions::deno_permissions::init_ops_and_esm(), - ops::process::deno_process::init_ops_and_esm( - services.npm_process_state_provider, - ), ops::tty::deno_tty::init_ops_and_esm(), ops::http::deno_http_runtime::init_ops_and_esm(), ops::bootstrap::deno_bootstrap::init_ops_and_esm( diff --git a/runtime/worker.rs b/runtime/worker.rs index 426383a19e..72eb54ec47 100644 --- a/runtime/worker.rs +++ b/runtime/worker.rs @@ -40,6 +40,7 @@ use deno_node::ExtNodeSys; use deno_node::NodeExtInitServices; use deno_os::ExitCode; use deno_permissions::PermissionsContainer; +use deno_process::NpmProcessStateProviderRc; use deno_tls::RootCertStoreProvider; use deno_tls::TlsKeys; use deno_web::BlobStore; @@ -51,7 +52,6 @@ use crate::code_cache::CodeCache; use crate::code_cache::CodeCacheType; use crate::inspector_server::InspectorServer; use crate::ops; -use crate::ops::process::NpmProcessStateProviderRc; use crate::shared::maybe_transpile_source; use crate::shared::runtime; use crate::BootstrapOptions; @@ -428,6 +428,9 @@ impl MainWorker { services.fs.clone(), ), deno_os::deno_os::init_ops_and_esm(exit_code.clone()), + deno_process::deno_process::init_ops_and_esm( + services.npm_process_state_provider, + ), deno_node::deno_node::init_ops_and_esm::< PermissionsContainer, TInNpmPackageChecker, @@ -442,9 +445,6 @@ impl MainWorker { ), ops::fs_events::deno_fs_events::init_ops_and_esm(), ops::permissions::deno_permissions::init_ops_and_esm(), - ops::process::deno_process::init_ops_and_esm( - services.npm_process_state_provider, - ), ops::tty::deno_tty::init_ops_and_esm(), ops::http::deno_http_runtime::init_ops_and_esm(), ops::bootstrap::deno_bootstrap::init_ops_and_esm( diff --git a/tools/core_import_map.json b/tools/core_import_map.json index 0176f47e23..935c7179a1 100644 --- a/tools/core_import_map.json +++ b/tools/core_import_map.json @@ -244,7 +244,7 @@ "ext:runtime/11_workers.js": "../runtime/js/11_workers.js", "ext:deno_os/30_os.js": "../ext/os/30_os.js", "ext:runtime/40_fs_events.js": "../runtime/js/40_fs_events.js", - "ext:runtime/40_process.js": "../runtime/js/40_process.js", + "ext:deno_process/40_process.js": "../ext/process/40_process.js", "ext:deno_os/40_signals.js": "../ext/os/40_signals.js", "ext:runtime/40_tty.js": "../runtime/js/40_tty.js", "ext:runtime/41_prompt.js": "../runtime/js/41_prompt.js",