1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-21 13:00:36 -05:00

fix(node): Rework node:child_process IPC (#24763)

Fixes https://github.com/denoland/deno/issues/24756. Fixes
https://github.com/denoland/deno/issues/24796.

This also gets vitest working when using
[`--pool=forks`](https://vitest.dev/guide/improving-performance#pool)
(which is the default as of vitest 2.0). Ref
https://github.com/denoland/deno/issues/23882.

---

This PR resolves a handful of issues with child_process IPC. In
particular:

- We didn't support sending typed array views over IPC
- Opening an IPC channel resulted in the event loop never exiting
- Sending a `null` over IPC would terminate the channel
- There was some UB in the read implementation (transmuting an `&[u8]`
to `&mut [u8]`)
- The `send` method wasn't returning anything, so there was no way to
signal backpressure (this also resulted in the benchmark
`child_process_ipc.mjs` being misleading, as it tried to respect
backpressure. That gave node much worse results at larger message sizes,
and gave us much worse results at smaller message sizes).
- We weren't setting up the `channel` property on the `process` global
(or on the `ChildProcess` object), and also didn't have a way to
ref/unref the channel
- Calling `kill` multiple times (or disconnecting the channel, then
calling kill) would throw an error
- Node couldn't spawn a deno subprocess and communicate with it over IPC
This commit is contained in:
Nathan Whitaker 2024-07-30 16:13:24 -07:00 committed by GitHub
parent 3659781f88
commit cd59fc53a5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 897 additions and 178 deletions

5
Cargo.lock generated
View file

@ -1770,6 +1770,7 @@ dependencies = [
"libz-sys", "libz-sys",
"md-5", "md-5",
"md4", "md4",
"memchr",
"node_resolver", "node_resolver",
"num-bigint", "num-bigint",
"num-bigint-dig", "num-bigint-dig",
@ -4152,9 +4153,9 @@ dependencies = [
[[package]] [[package]]
name = "memchr" name = "memchr"
version = "2.7.2" version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
[[package]] [[package]]
name = "memmap2" name = "memmap2"

View file

@ -1068,10 +1068,10 @@ impl CliOptions {
} }
pub fn node_ipc_fd(&self) -> Option<i64> { pub fn node_ipc_fd(&self) -> Option<i64> {
let maybe_node_channel_fd = std::env::var("DENO_CHANNEL_FD").ok(); let maybe_node_channel_fd = std::env::var("NODE_CHANNEL_FD").ok();
if let Some(node_channel_fd) = maybe_node_channel_fd { if let Some(node_channel_fd) = maybe_node_channel_fd {
// Remove so that child processes don't inherit this environment variable. // Remove so that child processes don't inherit this environment variable.
std::env::remove_var("DENO_CHANNEL_FD"); std::env::remove_var("NODE_CHANNEL_FD");
node_channel_fd.parse::<i64>().ok() node_channel_fd.parse::<i64>().ok()
} else { } else {
None None

View file

@ -57,6 +57,7 @@ libc.workspace = true
libz-sys.workspace = true libz-sys.workspace = true
md-5 = { version = "0.10.5", features = ["oid"] } md-5 = { version = "0.10.5", features = ["oid"] }
md4 = "0.10.2" md4 = "0.10.2"
memchr = "2.7.4"
node_resolver.workspace = true node_resolver.workspace = true
num-bigint.workspace = true num-bigint.workspace = true
num-bigint-dig = "0.8.2" num-bigint-dig = "0.8.2"

View file

@ -5,10 +5,20 @@ import { setImmediate } from "node:timers";
if (process.env.CHILD) { if (process.env.CHILD) {
const len = +process.env.CHILD; const len = +process.env.CHILD;
const msg = ".".repeat(len); const msg = ".".repeat(len);
let waiting = false;
const send = () => { const send = () => {
while (process.send(msg)); while (
process.send(msg, undefined, undefined, (_e) => {
if (waiting) {
waiting = false;
setImmediate(send);
}
})
);
// Wait: backlog of unsent messages exceeds threshold // Wait: backlog of unsent messages exceeds threshold
setImmediate(send); // once the message is sent, the callback will be called
// and we'll resume
waiting = true;
}; };
send(); send();
} else { } else {

View file

@ -30,6 +30,7 @@ pub use deno_package_json::PackageJson;
pub use node_resolver::PathClean; pub use node_resolver::PathClean;
pub use ops::ipc::ChildPipeFd; pub use ops::ipc::ChildPipeFd;
pub use ops::ipc::IpcJsonStreamResource; pub use ops::ipc::IpcJsonStreamResource;
pub use ops::ipc::IpcRefTracker;
use ops::vm; use ops::vm;
pub use ops::vm::create_v8_context; pub use ops::vm::create_v8_context;
pub use ops::vm::init_global_template; pub use ops::vm::init_global_template;
@ -380,6 +381,8 @@ deno_core::extension!(deno_node,
ops::ipc::op_node_child_ipc_pipe, ops::ipc::op_node_child_ipc_pipe,
ops::ipc::op_node_ipc_write, ops::ipc::op_node_ipc_write,
ops::ipc::op_node_ipc_read, ops::ipc::op_node_ipc_read,
ops::ipc::op_node_ipc_ref,
ops::ipc::op_node_ipc_unref,
ops::process::op_node_process_kill, ops::process::op_node_process_kill,
ops::process::op_process_abort, ops::process::op_process_abort,
], ],

View file

@ -15,23 +15,33 @@ mod impl_ {
use std::os::fd::RawFd; use std::os::fd::RawFd;
use std::pin::Pin; use std::pin::Pin;
use std::rc::Rc; use std::rc::Rc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicUsize;
use std::task::ready;
use std::task::Context; use std::task::Context;
use std::task::Poll; use std::task::Poll;
use deno_core::error::bad_resource_id; use deno_core::error::bad_resource_id;
use deno_core::error::AnyError; use deno_core::error::AnyError;
use deno_core::op2; use deno_core::op2;
use deno_core::serde;
use deno_core::serde::Serializer;
use deno_core::serde_json; use deno_core::serde_json;
use deno_core::v8;
use deno_core::AsyncRefCell; use deno_core::AsyncRefCell;
use deno_core::CancelFuture; use deno_core::CancelFuture;
use deno_core::CancelHandle; use deno_core::CancelHandle;
use deno_core::ExternalOpsTracker;
use deno_core::OpState; use deno_core::OpState;
use deno_core::RcRef; use deno_core::RcRef;
use deno_core::ResourceId; use deno_core::ResourceId;
use deno_core::ToV8;
use memchr::memchr;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use tokio::io::AsyncBufRead; use serde::Serialize;
use tokio::io::AsyncRead;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::io::BufReader; use tokio::io::ReadBuf;
#[cfg(unix)] #[cfg(unix)]
use tokio::net::unix::OwnedReadHalf; use tokio::net::unix::OwnedReadHalf;
@ -43,6 +53,116 @@ mod impl_ {
#[cfg(windows)] #[cfg(windows)]
type NamedPipeClient = tokio::net::windows::named_pipe::NamedPipeClient; type NamedPipeClient = tokio::net::windows::named_pipe::NamedPipeClient;
/// Wrapper around v8 value that implements Serialize.
struct SerializeWrapper<'a, 'b>(
RefCell<&'b mut v8::HandleScope<'a>>,
v8::Local<'a, v8::Value>,
);
impl<'a, 'b> Serialize for SerializeWrapper<'a, 'b> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serialize_v8_value(*self.0.borrow_mut(), self.1, serializer)
}
}
/// Serialize a v8 value directly into a serde serializer.
/// This allows us to go from v8 values to JSON without having to
/// deserialize into a `serde_json::Value` and then reserialize to JSON
fn serialize_v8_value<'a, S: Serializer>(
scope: &mut v8::HandleScope<'a>,
value: v8::Local<'a, v8::Value>,
ser: S,
) -> Result<S::Ok, S::Error> {
use serde::ser::Error;
if value.is_null_or_undefined() {
ser.serialize_unit()
} else if value.is_number() || value.is_number_object() {
let num_value = value.number_value(scope).unwrap();
if (num_value as i64 as f64) == num_value {
ser.serialize_i64(num_value as i64)
} else {
ser.serialize_f64(num_value)
}
} else if value.is_string() {
let str = deno_core::serde_v8::to_utf8(value.try_into().unwrap(), scope);
ser.serialize_str(&str)
} else if value.is_string_object() {
let str =
deno_core::serde_v8::to_utf8(value.to_string(scope).unwrap(), scope);
ser.serialize_str(&str)
} else if value.is_boolean() {
ser.serialize_bool(value.is_true())
} else if value.is_boolean_object() {
ser.serialize_bool(value.boolean_value(scope))
} else if value.is_array() {
use serde::ser::SerializeSeq;
let array = v8::Local::<v8::Array>::try_from(value).unwrap();
let length = array.length();
let mut seq = ser.serialize_seq(Some(length as usize))?;
for i in 0..length {
let element = array.get_index(scope, i).unwrap();
seq
.serialize_element(&SerializeWrapper(RefCell::new(scope), element))?;
}
seq.end()
} else if value.is_object() {
use serde::ser::SerializeMap;
if value.is_array_buffer_view() {
let buffer = v8::Local::<v8::ArrayBufferView>::try_from(value).unwrap();
let mut buf = vec![0u8; buffer.byte_length()];
let copied = buffer.copy_contents(&mut buf);
assert_eq!(copied, buf.len());
return ser.serialize_bytes(&buf);
}
let object = value.to_object(scope).unwrap();
// node uses `JSON.stringify`, so to match its behavior (and allow serializing custom objects)
// we need to respect the `toJSON` method if it exists.
let to_json_key = v8::String::new_from_utf8(
scope,
b"toJSON",
v8::NewStringType::Internalized,
)
.unwrap()
.into();
if let Some(to_json) = object.get(scope, to_json_key) {
if to_json.is_function() {
let to_json = v8::Local::<v8::Function>::try_from(to_json).unwrap();
let json_value = to_json.call(scope, object.into(), &[]).unwrap();
return serialize_v8_value(scope, json_value, ser);
}
}
let keys = object
.get_own_property_names(
scope,
v8::GetPropertyNamesArgs {
..Default::default()
},
)
.unwrap();
let num_keys = keys.length();
let mut map = ser.serialize_map(Some(num_keys as usize))?;
for i in 0..num_keys {
let key = keys.get_index(scope, i).unwrap();
let key_str = key.to_rust_string_lossy(scope);
let value = object.get(scope, key).unwrap();
map.serialize_entry(
&key_str,
&SerializeWrapper(RefCell::new(scope), value),
)?;
}
map.end()
} else {
// TODO(nathanwhit): better error message
Err(S::Error::custom(deno_core::error::type_error(
"Unsupported type",
)))
}
}
// Open IPC pipe from bootstrap options. // Open IPC pipe from bootstrap options.
#[op2] #[op2]
#[smi] #[smi]
@ -53,25 +173,66 @@ mod impl_ {
Some(child_pipe_fd) => child_pipe_fd.0, Some(child_pipe_fd) => child_pipe_fd.0,
None => return Ok(None), None => return Ok(None),
}; };
let ref_tracker = IpcRefTracker::new(state.external_ops_tracker.clone());
Ok(Some( Ok(Some(
state.resource_table.add(IpcJsonStreamResource::new(fd)?), state
.resource_table
.add(IpcJsonStreamResource::new(fd, ref_tracker)?),
)) ))
} }
#[op2(async)] #[op2(async)]
pub async fn op_node_ipc_write( pub fn op_node_ipc_write<'a>(
scope: &mut v8::HandleScope<'a>,
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId, #[smi] rid: ResourceId,
#[serde] value: serde_json::Value, value: v8::Local<'a, v8::Value>,
) -> Result<(), AnyError> { // using an array as an "out parameter".
// index 0 is a boolean indicating whether the queue is under the limit.
//
// ideally we would just return `Result<(impl Future, bool), ..>`, but that's not
// supported by `op2` currently.
queue_ok: v8::Local<'a, v8::Array>,
) -> Result<impl Future<Output = Result<(), AnyError>>, AnyError> {
let mut serialized = Vec::with_capacity(64);
let mut ser = serde_json::Serializer::new(&mut serialized);
serialize_v8_value(scope, value, &mut ser).map_err(|e| {
deno_core::error::type_error(format!(
"failed to serialize json value: {e}"
))
})?;
serialized.push(b'\n');
let stream = state let stream = state
.borrow() .borrow()
.resource_table .resource_table
.get::<IpcJsonStreamResource>(rid) .get::<IpcJsonStreamResource>(rid)
.map_err(|_| bad_resource_id())?; .map_err(|_| bad_resource_id())?;
stream.write_msg(value).await?; let old = stream
Ok(()) .queued_bytes
.fetch_add(serialized.len(), std::sync::atomic::Ordering::Relaxed);
if old + serialized.len() > 2 * INITIAL_CAPACITY {
// sending messages too fast
let v = false.to_v8(scope)?;
queue_ok.set_index(scope, 0, v);
}
Ok(async move {
stream.clone().write_msg_bytes(&serialized).await?;
stream
.queued_bytes
.fetch_sub(serialized.len(), std::sync::atomic::Ordering::Relaxed);
Ok(())
})
}
/// Value signaling that the other end ipc channel has closed.
///
/// Node reserves objects of this form (`{ "cmd": "NODE_<something>"`)
/// for internal use, so we use it here as well to avoid breaking anyone.
fn stop_sentinel() -> serde_json::Value {
serde_json::json!({
"cmd": "NODE_CLOSE"
})
} }
#[op2(async)] #[op2(async)]
@ -89,7 +250,92 @@ mod impl_ {
let cancel = stream.cancel.clone(); let cancel = stream.cancel.clone();
let mut stream = RcRef::map(stream, |r| &r.read_half).borrow_mut().await; let mut stream = RcRef::map(stream, |r| &r.read_half).borrow_mut().await;
let msgs = stream.read_msg().or_cancel(cancel).await??; let msgs = stream.read_msg().or_cancel(cancel).await??;
Ok(msgs) if let Some(msg) = msgs {
Ok(msg)
} else {
Ok(stop_sentinel())
}
}
#[op2(fast)]
pub fn op_node_ipc_ref(state: &mut OpState, #[smi] rid: ResourceId) {
let stream = state
.resource_table
.get::<IpcJsonStreamResource>(rid)
.expect("Invalid resource ID");
stream.ref_tracker.ref_();
}
#[op2(fast)]
pub fn op_node_ipc_unref(state: &mut OpState, #[smi] rid: ResourceId) {
let stream = state
.resource_table
.get::<IpcJsonStreamResource>(rid)
.expect("Invalid resource ID");
stream.ref_tracker.unref();
}
/// Tracks whether the IPC resources is currently
/// refed, and allows refing/unrefing it.
pub struct IpcRefTracker {
refed: AtomicBool,
tracker: OpsTracker,
}
/// A little wrapper so we don't have to get an
/// `ExternalOpsTracker` for tests. When we aren't
/// cfg(test), this will get optimized out.
enum OpsTracker {
External(ExternalOpsTracker),
#[cfg(test)]
Test,
}
impl OpsTracker {
fn ref_(&self) {
match self {
Self::External(tracker) => tracker.ref_op(),
#[cfg(test)]
Self::Test => {}
}
}
fn unref(&self) {
match self {
Self::External(tracker) => tracker.unref_op(),
#[cfg(test)]
Self::Test => {}
}
}
}
impl IpcRefTracker {
pub fn new(tracker: ExternalOpsTracker) -> Self {
Self {
refed: AtomicBool::new(false),
tracker: OpsTracker::External(tracker),
}
}
#[cfg(test)]
fn new_test() -> Self {
Self {
refed: AtomicBool::new(false),
tracker: OpsTracker::Test,
}
}
fn ref_(&self) {
if !self.refed.swap(true, std::sync::atomic::Ordering::AcqRel) {
self.tracker.ref_();
}
}
fn unref(&self) {
if self.refed.swap(false, std::sync::atomic::Ordering::AcqRel) {
self.tracker.unref();
}
}
} }
pub struct IpcJsonStreamResource { pub struct IpcJsonStreamResource {
@ -99,6 +345,8 @@ mod impl_ {
#[cfg(windows)] #[cfg(windows)]
write_half: AsyncRefCell<tokio::io::WriteHalf<NamedPipeClient>>, write_half: AsyncRefCell<tokio::io::WriteHalf<NamedPipeClient>>,
cancel: Rc<CancelHandle>, cancel: Rc<CancelHandle>,
queued_bytes: AtomicUsize,
ref_tracker: IpcRefTracker,
} }
impl deno_core::Resource for IpcJsonStreamResource { impl deno_core::Resource for IpcJsonStreamResource {
@ -134,64 +382,56 @@ mod impl_ {
} }
impl IpcJsonStreamResource { impl IpcJsonStreamResource {
pub fn new(stream: i64) -> Result<Self, std::io::Error> { pub fn new(
stream: i64,
ref_tracker: IpcRefTracker,
) -> Result<Self, std::io::Error> {
let (read_half, write_half) = pipe(stream as _)?; let (read_half, write_half) = pipe(stream as _)?;
Ok(Self { Ok(Self {
read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
write_half: AsyncRefCell::new(write_half), write_half: AsyncRefCell::new(write_half),
cancel: Default::default(), cancel: Default::default(),
queued_bytes: Default::default(),
ref_tracker,
}) })
} }
#[cfg(unix)] #[cfg(all(unix, test))]
#[cfg(test)] fn from_stream(stream: UnixStream, ref_tracker: IpcRefTracker) -> Self {
fn from_stream(stream: UnixStream) -> Self {
let (read_half, write_half) = stream.into_split(); let (read_half, write_half) = stream.into_split();
Self { Self {
read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
write_half: AsyncRefCell::new(write_half), write_half: AsyncRefCell::new(write_half),
cancel: Default::default(), cancel: Default::default(),
queued_bytes: Default::default(),
ref_tracker,
} }
} }
#[cfg(windows)] #[cfg(all(windows, test))]
#[cfg(test)] fn from_stream(pipe: NamedPipeClient, ref_tracker: IpcRefTracker) -> Self {
fn from_stream(pipe: NamedPipeClient) -> Self {
let (read_half, write_half) = tokio::io::split(pipe); let (read_half, write_half) = tokio::io::split(pipe);
Self { Self {
read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
write_half: AsyncRefCell::new(write_half), write_half: AsyncRefCell::new(write_half),
cancel: Default::default(), cancel: Default::default(),
queued_bytes: Default::default(),
ref_tracker,
} }
} }
async fn write_msg( /// writes _newline terminated_ JSON message to the IPC pipe.
async fn write_msg_bytes(
self: Rc<Self>, self: Rc<Self>,
msg: serde_json::Value, msg: &[u8],
) -> Result<(), AnyError> { ) -> Result<(), AnyError> {
let mut write_half = let mut write_half =
RcRef::map(self, |r| &r.write_half).borrow_mut().await; RcRef::map(self, |r| &r.write_half).borrow_mut().await;
// Perf note: We do not benefit from writev here because write_half.write_all(msg).await?;
// we are always allocating a buffer for serialization anyways.
let mut buf = Vec::new();
serde_json::to_writer(&mut buf, &msg)?;
buf.push(b'\n');
write_half.write_all(&buf).await?;
Ok(()) Ok(())
} }
} }
#[inline]
fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> {
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
// Safety: haystack of valid length. neon_memchr can handle unaligned
// data.
return unsafe { neon::neon_memchr(haystack, needle, haystack.len()) };
#[cfg(not(all(target_os = "macos", target_arch = "aarch64")))]
return haystack.iter().position(|&b| b == needle);
}
// Initial capacity of the buffered reader and the JSON backing buffer. // Initial capacity of the buffered reader and the JSON backing buffer.
// //
// This is a tradeoff between memory usage and performance on large messages. // This is a tradeoff between memory usage and performance on large messages.
@ -199,41 +439,91 @@ mod impl_ {
// 64kb has been chosen after benchmarking 64 to 66536 << 6 - 1 bytes per message. // 64kb has been chosen after benchmarking 64 to 66536 << 6 - 1 bytes per message.
const INITIAL_CAPACITY: usize = 1024 * 64; const INITIAL_CAPACITY: usize = 1024 * 64;
/// A buffer for reading from the IPC pipe.
/// Similar to the internal buffer of `tokio::io::BufReader`.
///
/// This exists to provide buffered reading while granting mutable access
/// to the internal buffer (which isn't exposed through `tokio::io::BufReader`
/// or the `AsyncBufRead` trait). `simd_json` requires mutable access to an input
/// buffer for parsing, so this allows us to use the read buffer directly as the
/// input buffer without a copy (provided the message fits).
struct ReadBuffer {
buffer: Box<[u8]>,
pos: usize,
cap: usize,
}
impl ReadBuffer {
fn new() -> Self {
Self {
buffer: vec![0; INITIAL_CAPACITY].into_boxed_slice(),
pos: 0,
cap: 0,
}
}
fn get_mut(&mut self) -> &mut [u8] {
&mut self.buffer
}
fn available_mut(&mut self) -> &mut [u8] {
&mut self.buffer[self.pos..self.cap]
}
fn consume(&mut self, n: usize) {
self.pos = std::cmp::min(self.pos + n, self.cap);
}
fn needs_fill(&self) -> bool {
self.pos >= self.cap
}
}
// JSON serialization stream over IPC pipe. // JSON serialization stream over IPC pipe.
// //
// `\n` is used as a delimiter between messages. // `\n` is used as a delimiter between messages.
struct IpcJsonStream { struct IpcJsonStream {
#[cfg(unix)] #[cfg(unix)]
pipe: BufReader<OwnedReadHalf>, pipe: OwnedReadHalf,
#[cfg(windows)] #[cfg(windows)]
pipe: BufReader<tokio::io::ReadHalf<NamedPipeClient>>, pipe: tokio::io::ReadHalf<NamedPipeClient>,
buffer: Vec<u8>, buffer: Vec<u8>,
read_buffer: ReadBuffer,
} }
impl IpcJsonStream { impl IpcJsonStream {
#[cfg(unix)] #[cfg(unix)]
fn new(pipe: OwnedReadHalf) -> Self { fn new(pipe: OwnedReadHalf) -> Self {
Self { Self {
pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe), pipe,
buffer: Vec::with_capacity(INITIAL_CAPACITY), buffer: Vec::with_capacity(INITIAL_CAPACITY),
read_buffer: ReadBuffer::new(),
} }
} }
#[cfg(windows)] #[cfg(windows)]
fn new(pipe: tokio::io::ReadHalf<NamedPipeClient>) -> Self { fn new(pipe: tokio::io::ReadHalf<NamedPipeClient>) -> Self {
Self { Self {
pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe), pipe,
buffer: Vec::with_capacity(INITIAL_CAPACITY), buffer: Vec::with_capacity(INITIAL_CAPACITY),
read_buffer: ReadBuffer::new(),
} }
} }
async fn read_msg(&mut self) -> Result<serde_json::Value, AnyError> { async fn read_msg(
&mut self,
) -> Result<Option<serde_json::Value>, AnyError> {
let mut json = None; let mut json = None;
let nread = let nread = read_msg_inner(
read_msg_inner(&mut self.pipe, &mut self.buffer, &mut json).await?; &mut self.pipe,
&mut self.buffer,
&mut json,
&mut self.read_buffer,
)
.await?;
if nread == 0 { if nread == 0 {
// EOF. // EOF.
return Ok(serde_json::Value::Null); return Ok(None);
} }
let json = match json { let json = match json {
@ -250,7 +540,7 @@ mod impl_ {
self.buffer.set_len(0); self.buffer.set_len(0);
} }
Ok(json) Ok(Some(json))
} }
} }
@ -263,6 +553,7 @@ mod impl_ {
// The number of bytes appended to buf. This can be less than buf.len() if // The number of bytes appended to buf. This can be less than buf.len() if
// the buffer was not empty when the operation was started. // the buffer was not empty when the operation was started.
read: usize, read: usize,
read_buffer: &'a mut ReadBuffer,
} }
} }
@ -270,43 +561,41 @@ mod impl_ {
reader: &'a mut R, reader: &'a mut R,
buf: &'a mut Vec<u8>, buf: &'a mut Vec<u8>,
json: &'a mut Option<serde_json::Value>, json: &'a mut Option<serde_json::Value>,
read_buffer: &'a mut ReadBuffer,
) -> ReadMsgInner<'a, R> ) -> ReadMsgInner<'a, R>
where where
R: AsyncBufRead + ?Sized + Unpin, R: AsyncRead + ?Sized + Unpin,
{ {
ReadMsgInner { ReadMsgInner {
reader, reader,
buf, buf,
json, json,
read: 0, read: 0,
read_buffer,
} }
} }
fn read_msg_internal<R: AsyncBufRead + ?Sized>( fn read_msg_internal<R: AsyncRead + ?Sized>(
mut reader: Pin<&mut R>, mut reader: Pin<&mut R>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &mut Vec<u8>, buf: &mut Vec<u8>,
read_buffer: &mut ReadBuffer,
json: &mut Option<serde_json::Value>, json: &mut Option<serde_json::Value>,
read: &mut usize, read: &mut usize,
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
loop { loop {
let (done, used) = { let (done, used) = {
let available = match reader.as_mut().poll_fill_buf(cx) { // effectively a tiny `poll_fill_buf`, but allows us to get a mutable reference to the buffer.
std::task::Poll::Ready(t) => t?, if read_buffer.needs_fill() {
std::task::Poll::Pending => return std::task::Poll::Pending, let mut read_buf = ReadBuf::new(read_buffer.get_mut());
}; ready!(reader.as_mut().poll_read(cx, &mut read_buf))?;
read_buffer.cap = read_buf.filled().len();
read_buffer.pos = 0;
}
let available = read_buffer.available_mut();
if let Some(i) = memchr(b'\n', available) { if let Some(i) = memchr(b'\n', available) {
if *read == 0 { if *read == 0 {
// Fast path: parse and put into the json slot directly. // Fast path: parse and put into the json slot directly.
//
// Safety: It is ok to overwrite the contents because
// we don't need to copy it into the buffer and the length will be reset.
let available = unsafe {
std::slice::from_raw_parts_mut(
available.as_ptr() as *mut u8,
available.len(),
)
};
json.replace( json.replace(
simd_json::from_slice(&mut available[..i + 1]) simd_json::from_slice(&mut available[..i + 1])
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?, .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
@ -323,7 +612,7 @@ mod impl_ {
} }
}; };
reader.as_mut().consume(used); read_buffer.consume(used);
*read += used; *read += used;
if done || used == 0 { if done || used == 0 {
return Poll::Ready(Ok(mem::replace(read, 0))); return Poll::Ready(Ok(mem::replace(read, 0)));
@ -331,81 +620,30 @@ mod impl_ {
} }
} }
impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadMsgInner<'_, R> { impl<R: AsyncRead + ?Sized + Unpin> Future for ReadMsgInner<'_, R> {
type Output = io::Result<usize>; type Output = io::Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project(); let me = self.project();
read_msg_internal(Pin::new(*me.reader), cx, me.buf, me.json, me.read) read_msg_internal(
} Pin::new(*me.reader),
} cx,
me.buf,
#[cfg(all(target_os = "macos", target_arch = "aarch64"))] me.read_buffer,
mod neon { me.json,
use std::arch::aarch64::*; me.read,
)
pub unsafe fn neon_memchr(
str: &[u8],
c: u8,
length: usize,
) -> Option<usize> {
let end = str.as_ptr().wrapping_add(length);
// Alignment handling
let mut ptr = str.as_ptr();
while ptr < end && (ptr as usize) & 0xF != 0 {
if *ptr == c {
return Some(ptr as usize - str.as_ptr() as usize);
}
ptr = ptr.wrapping_add(1);
}
let search_char = vdupq_n_u8(c);
while ptr.wrapping_add(16) <= end {
let chunk = vld1q_u8(ptr);
let comparison = vceqq_u8(chunk, search_char);
// Check first 64 bits
let result0 = vgetq_lane_u64(vreinterpretq_u64_u8(comparison), 0);
if result0 != 0 {
return Some(
(ptr as usize - str.as_ptr() as usize)
+ result0.trailing_zeros() as usize / 8,
);
}
// Check second 64 bits
let result1 = vgetq_lane_u64(vreinterpretq_u64_u8(comparison), 1);
if result1 != 0 {
return Some(
(ptr as usize - str.as_ptr() as usize)
+ 8
+ result1.trailing_zeros() as usize / 8,
);
}
ptr = ptr.wrapping_add(16);
}
// Handle remaining unaligned characters
while ptr < end {
if *ptr == c {
return Some(ptr as usize - str.as_ptr() as usize);
}
ptr = ptr.wrapping_add(1);
}
None
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::IpcJsonStreamResource; use super::IpcJsonStreamResource;
use deno_core::serde_json;
use deno_core::serde_json::json; use deno_core::serde_json::json;
use deno_core::v8;
use deno_core::JsRuntime;
use deno_core::RcRef; use deno_core::RcRef;
use deno_core::RuntimeOptions;
use std::rc::Rc; use std::rc::Rc;
#[allow(clippy::unused_async)] #[allow(clippy::unused_async)]
@ -414,7 +652,10 @@ mod impl_ {
let (a, b) = tokio::net::UnixStream::pair().unwrap(); let (a, b) = tokio::net::UnixStream::pair().unwrap();
/* Similar to how ops would use the resource */ /* Similar to how ops would use the resource */
let a = Rc::new(IpcJsonStreamResource::from_stream(a)); let a = Rc::new(IpcJsonStreamResource::from_stream(
a,
super::IpcRefTracker::new_test(),
));
(a, b) (a, b)
} }
@ -434,7 +675,10 @@ mod impl_ {
server.connect().await.unwrap(); server.connect().await.unwrap();
/* Similar to how ops would use the resource */ /* Similar to how ops would use the resource */
let client = Rc::new(IpcJsonStreamResource::from_stream(client)); let client = Rc::new(IpcJsonStreamResource::from_stream(
client,
super::IpcRefTracker::new_test(),
));
(client, server) (client, server)
} }
@ -467,10 +711,9 @@ mod impl_ {
let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await;
loop { loop {
let msgs = ipc.read_msg().await?; let Some(msgs) = ipc.read_msg().await? else {
if msgs == serde_json::Value::Null {
break; break;
} };
bytes += msgs.as_str().unwrap().len(); bytes += msgs.as_str().unwrap().len();
if start.elapsed().as_secs() > 5 { if start.elapsed().as_secs() > 5 {
break; break;
@ -501,10 +744,13 @@ mod impl_ {
Ok::<_, std::io::Error>(()) Ok::<_, std::io::Error>(())
}); });
ipc.clone().write_msg(json!("hello")).await?; ipc
.clone()
.write_msg_bytes(&json_to_bytes(json!("hello")))
.await?;
let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await;
let msgs = ipc.read_msg().await?; let msgs = ipc.read_msg().await?.unwrap();
assert_eq!(msgs, json!("world")); assert_eq!(msgs, json!("world"));
child.await??; child.await??;
@ -512,6 +758,12 @@ mod impl_ {
Ok(()) Ok(())
} }
fn json_to_bytes(v: deno_core::serde_json::Value) -> Vec<u8> {
let mut buf = deno_core::serde_json::to_vec(&v).unwrap();
buf.push(b'\n');
buf
}
#[tokio::test] #[tokio::test]
async fn unix_ipc_json_multi() -> Result<(), Box<dyn std::error::Error>> { async fn unix_ipc_json_multi() -> Result<(), Box<dyn std::error::Error>> {
let (ipc, mut fd2) = pair().await; let (ipc, mut fd2) = pair().await;
@ -527,11 +779,17 @@ mod impl_ {
Ok::<_, std::io::Error>(()) Ok::<_, std::io::Error>(())
}); });
ipc.clone().write_msg(json!("hello")).await?; ipc
ipc.clone().write_msg(json!("world")).await?; .clone()
.write_msg_bytes(&json_to_bytes(json!("hello")))
.await?;
ipc
.clone()
.write_msg_bytes(&json_to_bytes(json!("world")))
.await?;
let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await;
let msgs = ipc.read_msg().await?; let msgs = ipc.read_msg().await?.unwrap();
assert_eq!(msgs, json!("foo")); assert_eq!(msgs, json!("foo"));
child.await??; child.await??;
@ -566,5 +824,58 @@ mod impl_ {
let empty = b""; let empty = b"";
assert_eq!(super::memchr(b'\n', empty), None); assert_eq!(super::memchr(b'\n', empty), None);
} }
fn wrap_expr(s: &str) -> String {
format!("(function () {{ return {s}; }})()")
}
fn serialize_js_to_json(runtime: &mut JsRuntime, js: String) -> String {
let val = runtime.execute_script("", js).unwrap();
let scope = &mut runtime.handle_scope();
let val = v8::Local::new(scope, val);
let mut buf = Vec::new();
let mut ser = deno_core::serde_json::Serializer::new(&mut buf);
super::serialize_v8_value(scope, val, &mut ser).unwrap();
String::from_utf8(buf).unwrap()
}
#[test]
fn ipc_serialization() {
let mut runtime = JsRuntime::new(RuntimeOptions::default());
let cases = [
("'hello'", "\"hello\""),
("1", "1"),
("1.5", "1.5"),
("Number.NaN", "null"),
("Infinity", "null"),
("Number.MAX_SAFE_INTEGER", &(2i64.pow(53) - 1).to_string()),
(
"Number.MIN_SAFE_INTEGER",
&(-(2i64.pow(53) - 1)).to_string(),
),
("[1, 2, 3]", "[1,2,3]"),
("new Uint8Array([1,2,3])", "[1,2,3]"),
(
"{ a: 1.5, b: { c: new ArrayBuffer(5) }}",
r#"{"a":1.5,"b":{"c":{}}}"#,
),
("new Number(1)", "1"),
("new Boolean(true)", "true"),
("true", "true"),
(r#"new String("foo")"#, "\"foo\""),
("null", "null"),
(
r#"{ a: "field", toJSON() { return "custom"; } }"#,
"\"custom\"",
),
];
for (input, expect) in cases {
let js = wrap_expr(input);
let actual = serialize_js_to_json(&mut runtime, js);
assert_eq!(actual, expect);
}
}
} }
} }

View file

@ -140,8 +140,7 @@ mod tests {
#[test] #[test]
fn test_run_in_this_context() { fn test_run_in_this_context() {
let platform = v8::new_default_platform(0, false).make_shared(); let platform = v8::new_default_platform(0, false).make_shared();
v8::V8::initialize_platform(platform); deno_core::JsRuntime::init_platform(Some(platform));
v8::V8::initialize();
let isolate = &mut v8::Isolate::new(Default::default()); let isolate = &mut v8::Isolate::new(Default::default());

View file

@ -115,7 +115,8 @@ export function fork(
// more // more
const v8Flags: string[] = []; const v8Flags: string[] = [];
if (Array.isArray(execArgv)) { if (Array.isArray(execArgv)) {
for (let index = 0; index < execArgv.length; index++) { let index = 0;
while (index < execArgv.length) {
const flag = execArgv[index]; const flag = execArgv[index];
if (flag.startsWith("--max-old-space-size")) { if (flag.startsWith("--max-old-space-size")) {
execArgv.splice(index, 1); execArgv.splice(index, 1);
@ -123,6 +124,16 @@ export function fork(
} else if (flag.startsWith("--enable-source-maps")) { } else if (flag.startsWith("--enable-source-maps")) {
// https://github.com/denoland/deno/issues/21750 // https://github.com/denoland/deno/issues/21750
execArgv.splice(index, 1); execArgv.splice(index, 1);
} else if (flag.startsWith("-C") || flag.startsWith("--conditions")) {
let rm = 1;
if (flag.indexOf("=") === -1) {
// --conditions foo
// so remove the next argument as well.
rm = 2;
}
execArgv.splice(index, rm);
} else {
index++;
} }
} }
} }
@ -825,7 +836,17 @@ export function execFileSync(
function setupChildProcessIpcChannel() { function setupChildProcessIpcChannel() {
const fd = op_node_child_ipc_pipe(); const fd = op_node_child_ipc_pipe();
if (typeof fd != "number" || fd < 0) return; if (typeof fd != "number" || fd < 0) return;
setupChannel(process, fd); const control = setupChannel(process, fd);
process.on("newListener", (name: string) => {
if (name === "message" || name === "disconnect") {
control.refCounted();
}
});
process.on("removeListener", (name: string) => {
if (name === "message" || name === "disconnect") {
control.unrefCounted();
}
});
} }
internals.__setupChildProcessIpcChannel = setupChildProcessIpcChannel; internals.__setupChildProcessIpcChannel = setupChildProcessIpcChannel;

View file

@ -7,7 +7,12 @@
// deno-lint-ignore-file prefer-primordials // deno-lint-ignore-file prefer-primordials
import { core, internals } from "ext:core/mod.js"; import { core, internals } from "ext:core/mod.js";
import { op_node_ipc_read, op_node_ipc_write } from "ext:core/ops"; import {
op_node_ipc_read,
op_node_ipc_ref,
op_node_ipc_unref,
op_node_ipc_write,
} from "ext:core/ops";
import { import {
ArrayIsArray, ArrayIsArray,
ArrayPrototypeFilter, ArrayPrototypeFilter,
@ -17,13 +22,14 @@ import {
ArrayPrototypeSort, ArrayPrototypeSort,
ArrayPrototypeUnshift, ArrayPrototypeUnshift,
ObjectHasOwn, ObjectHasOwn,
StringPrototypeStartsWith,
StringPrototypeToUpperCase, StringPrototypeToUpperCase,
} from "ext:deno_node/internal/primordials.mjs"; } from "ext:deno_node/internal/primordials.mjs";
import { assert } from "ext:deno_node/_util/asserts.ts"; import { assert } from "ext:deno_node/_util/asserts.ts";
import { EventEmitter } from "node:events"; import { EventEmitter } from "node:events";
import { os } from "ext:deno_node/internal_binding/constants.ts"; import { os } from "ext:deno_node/internal_binding/constants.ts";
import { notImplemented, warnNotImplemented } from "ext:deno_node/_utils.ts"; import { notImplemented } from "ext:deno_node/_utils.ts";
import { Readable, Stream, Writable } from "node:stream"; import { Readable, Stream, Writable } from "node:stream";
import { isWindows } from "ext:deno_node/_util/os.ts"; import { isWindows } from "ext:deno_node/_util/os.ts";
import { nextTick } from "ext:deno_node/_next_tick.ts"; import { nextTick } from "ext:deno_node/_next_tick.ts";
@ -31,6 +37,7 @@ import {
AbortError, AbortError,
ERR_INVALID_ARG_TYPE, ERR_INVALID_ARG_TYPE,
ERR_INVALID_ARG_VALUE, ERR_INVALID_ARG_VALUE,
ERR_IPC_CHANNEL_CLOSED,
ERR_UNKNOWN_SIGNAL, ERR_UNKNOWN_SIGNAL,
} from "ext:deno_node/internal/errors.ts"; } from "ext:deno_node/internal/errors.ts";
import { Buffer } from "node:buffer"; import { Buffer } from "node:buffer";
@ -46,6 +53,7 @@ import {
import { kEmptyObject } from "ext:deno_node/internal/util.mjs"; import { kEmptyObject } from "ext:deno_node/internal/util.mjs";
import { getValidatedPath } from "ext:deno_node/internal/fs/utils.mjs"; import { getValidatedPath } from "ext:deno_node/internal/fs/utils.mjs";
import process from "node:process"; import process from "node:process";
import { StringPrototypeSlice } from "ext:deno_node/internal/primordials.mjs";
export function mapValues<T, O>( export function mapValues<T, O>(
record: Readonly<Record<string, T>>, record: Readonly<Record<string, T>>,
@ -97,6 +105,19 @@ export function stdioStringToArray(
return options; return options;
} }
const kClosesNeeded = Symbol("_closesNeeded");
const kClosesReceived = Symbol("_closesReceived");
// We only want to emit a close event for the child process when all of
// the writable streams have closed. The value of `child[kClosesNeeded]` should be 1 +
// the number of opened writable streams (note this excludes `stdin`).
function maybeClose(child: ChildProcess) {
child[kClosesReceived]++;
if (child[kClosesNeeded] === child[kClosesReceived]) {
child.emit("close", child.exitCode, child.signalCode);
}
}
export class ChildProcess extends EventEmitter { export class ChildProcess extends EventEmitter {
/** /**
* The exit code of the child process. This property will be `null` until the child process exits. * The exit code of the child process. This property will be `null` until the child process exits.
@ -152,8 +173,13 @@ export class ChildProcess extends EventEmitter {
null, null,
]; ];
disconnect?: () => void;
#process!: Deno.ChildProcess; #process!: Deno.ChildProcess;
#spawned = Promise.withResolvers<void>(); #spawned = Promise.withResolvers<void>();
[kClosesNeeded] = 1;
[kClosesReceived] = 0;
canDisconnect = false;
constructor( constructor(
command: string, command: string,
@ -218,13 +244,23 @@ export class ChildProcess extends EventEmitter {
if (stdout === "pipe") { if (stdout === "pipe") {
assert(this.#process.stdout); assert(this.#process.stdout);
this[kClosesNeeded]++;
this.stdout = Readable.fromWeb(this.#process.stdout); this.stdout = Readable.fromWeb(this.#process.stdout);
this.stdout.on("close", () => {
maybeClose(this);
});
} }
if (stderr === "pipe") { if (stderr === "pipe") {
assert(this.#process.stderr); assert(this.#process.stderr);
this[kClosesNeeded]++;
this.stderr = Readable.fromWeb(this.#process.stderr); this.stderr = Readable.fromWeb(this.#process.stderr);
this.stderr.on("close", () => {
maybeClose(this);
});
} }
// TODO(nathanwhit): once we impl > 3 stdio pipes make sure we also listen for their
// close events (like above)
this.stdio[0] = this.stdin; this.stdio[0] = this.stdin;
this.stdio[1] = this.stdout; this.stdio[1] = this.stdout;
@ -259,6 +295,10 @@ export class ChildProcess extends EventEmitter {
const pipeFd = internals.getPipeFd(this.#process); const pipeFd = internals.getPipeFd(this.#process);
if (typeof pipeFd == "number") { if (typeof pipeFd == "number") {
setupChannel(this, pipeFd); setupChannel(this, pipeFd);
this[kClosesNeeded]++;
this.on("disconnect", () => {
maybeClose(this);
});
} }
(async () => { (async () => {
@ -271,7 +311,7 @@ export class ChildProcess extends EventEmitter {
this.emit("exit", exitCode, signalCode); this.emit("exit", exitCode, signalCode);
await this.#_waitForChildStreamsToClose(); await this.#_waitForChildStreamsToClose();
this.#closePipes(); this.#closePipes();
this.emit("close", exitCode, signalCode); maybeClose(this);
}); });
})(); })();
} catch (err) { } catch (err) {
@ -304,7 +344,7 @@ export class ChildProcess extends EventEmitter {
} }
/* Cancel any pending IPC I/O */ /* Cancel any pending IPC I/O */
if (this.implementsDisconnect) { if (this.canDisconnect) {
this.disconnect?.(); this.disconnect?.();
} }
@ -321,10 +361,6 @@ export class ChildProcess extends EventEmitter {
this.#process.unref(); this.#process.unref();
} }
disconnect() {
warnNotImplemented("ChildProcess.prototype.disconnect");
}
async #_waitForChildStreamsToClose() { async #_waitForChildStreamsToClose() {
const promises = [] as Array<Promise<void>>; const promises = [] as Array<Promise<void>>;
// Don't close parent process stdin if that's passed through // Don't close parent process stdin if that's passed through
@ -359,6 +395,16 @@ export class ChildProcess extends EventEmitter {
assert(this.stdin); assert(this.stdin);
this.stdin.destroy(); this.stdin.destroy();
} }
/// TODO(nathanwhit): for some reason when the child process exits
/// and the child end of the named pipe closes, reads still just return `Pending`
/// instead of returning that 0 bytes were read (to signal the pipe died).
/// For now, just forcibly disconnect, but in theory I think we could miss messages
/// that haven't been read yet.
if (Deno.build.os === "windows") {
if (this.canDisconnect) {
this.disconnect?.();
}
}
} }
} }
@ -1099,18 +1145,109 @@ function toDenoArgs(args: string[]): string[] {
return denoArgs; return denoArgs;
} }
export function setupChannel(target, ipc) { const kControlDisconnect = Symbol("kControlDisconnect");
const kPendingMessages = Symbol("kPendingMessages");
// controls refcounting for the IPC channel
class Control extends EventEmitter {
#channel: number;
#refs: number = 0;
#refExplicitlySet = false;
#connected = true;
[kPendingMessages] = [];
constructor(channel: number) {
super();
this.#channel = channel;
}
#ref() {
if (this.#connected) {
op_node_ipc_ref(this.#channel);
}
}
#unref() {
if (this.#connected) {
op_node_ipc_unref(this.#channel);
}
}
[kControlDisconnect]() {
this.#unref();
this.#connected = false;
}
refCounted() {
if (++this.#refs === 1 && !this.#refExplicitlySet) {
this.#ref();
}
}
unrefCounted() {
if (--this.#refs === 0 && !this.#refExplicitlySet) {
this.#unref();
this.emit("unref");
}
}
ref() {
this.#refExplicitlySet = true;
this.#ref();
}
unref() {
this.#refExplicitlySet = false;
this.#unref();
}
}
type InternalMessage = {
cmd: `NODE_${string}`;
};
// deno-lint-ignore no-explicit-any
function isInternal(msg: any): msg is InternalMessage {
if (msg && typeof msg === "object") {
const cmd = msg["cmd"];
if (typeof cmd === "string") {
return StringPrototypeStartsWith(cmd, "NODE_");
}
}
return false;
}
function internalCmdName(msg: InternalMessage): string {
return StringPrototypeSlice(msg.cmd, 5);
}
// deno-lint-ignore no-explicit-any
export function setupChannel(target: any, ipc: number) {
const control = new Control(ipc);
target.channel = control;
async function readLoop() { async function readLoop() {
try { try {
while (true) { while (true) {
if (!target.connected || target.killed) { if (!target.connected || target.killed) {
return; return;
} }
const msg = await op_node_ipc_read(ipc); const prom = op_node_ipc_read(ipc);
if (msg == null) { // there will always be a pending read promise,
// Channel closed. // but it shouldn't keep the event loop from exiting
target.disconnect(); core.unrefOpPromise(prom);
return; const msg = await prom;
if (isInternal(msg)) {
const cmd = internalCmdName(msg);
if (cmd === "CLOSE") {
// Channel closed.
target.disconnect();
return;
} else {
// TODO(nathanwhit): once we add support for sending
// handles, if we want to support deno-node IPC interop,
// we'll need to handle the NODE_HANDLE_* messages here.
continue;
}
} }
process.nextTick(handleMessage, msg); process.nextTick(handleMessage, msg);
@ -1126,9 +1263,29 @@ export function setupChannel(target, ipc) {
} }
function handleMessage(msg) { function handleMessage(msg) {
target.emit("message", msg); if (!target.channel) {
return;
}
if (target.listenerCount("message") !== 0) {
target.emit("message", msg);
return;
}
ArrayPrototypePush(target.channel[kPendingMessages], msg);
} }
target.on("newListener", () => {
nextTick(() => {
if (!target.channel || !target.listenerCount("message")) {
return;
}
for (const msg of target.channel[kPendingMessages]) {
target.emit("message", msg);
}
target.channel[kPendingMessages] = [];
});
});
target.send = function (message, handle, options, callback) { target.send = function (message, handle, options, callback) {
if (typeof handle === "function") { if (typeof handle === "function") {
callback = handle; callback = handle;
@ -1151,32 +1308,55 @@ export function setupChannel(target, ipc) {
notImplemented("ChildProcess.send with handle"); notImplemented("ChildProcess.send with handle");
} }
op_node_ipc_write(ipc, message) if (!target.connected) {
const err = new ERR_IPC_CHANNEL_CLOSED();
if (typeof callback === "function") {
console.error("ChildProcess.send with callback");
process.nextTick(callback, err);
} else {
nextTick(() => target.emit("error", err));
}
return false;
}
// signals whether the queue is within the limit.
// if false, the sender should slow down.
// this acts as a backpressure mechanism.
const queueOk = [true];
control.refCounted();
op_node_ipc_write(ipc, message, queueOk)
.then(() => { .then(() => {
control.unrefCounted();
if (callback) { if (callback) {
process.nextTick(callback, null); process.nextTick(callback, null);
} }
}); });
return queueOk[0];
}; };
target.connected = true; target.connected = true;
target.disconnect = function () { target.disconnect = function () {
if (!this.connected) { if (!target.connected) {
this.emit("error", new Error("IPC channel is already disconnected")); target.emit("error", new Error("IPC channel is already disconnected"));
return; return;
} }
this.connected = false; target.connected = false;
target.canDisconnect = false;
control[kControlDisconnect]();
process.nextTick(() => { process.nextTick(() => {
target.channel = null;
core.close(ipc); core.close(ipc);
target.emit("disconnect"); target.emit("disconnect");
}); });
}; };
target.implementsDisconnect = true; target.canDisconnect = true;
// Start reading messages from the channel. // Start reading messages from the channel.
readLoop(); readLoop();
return control;
} }
export default { export default {

View file

@ -345,14 +345,15 @@ fn create_command(
}); });
/* One end returned to parent process (this) */ /* One end returned to parent process (this) */
let pipe_rid = Some( let pipe_rid = Some(state.resource_table.add(
state deno_node::IpcJsonStreamResource::new(
.resource_table fd1 as _,
.add(deno_node::IpcJsonStreamResource::new(fd1 as _)?), deno_node::IpcRefTracker::new(state.external_ops_tracker.clone()),
); )?,
));
/* The other end passed to child process via DENO_CHANNEL_FD */ /* The other end passed to child process via NODE_CHANNEL_FD */
command.env("DENO_CHANNEL_FD", format!("{}", ipc)); command.env("NODE_CHANNEL_FD", format!("{}", ipc));
return Ok((command, pipe_rid)); return Ok((command, pipe_rid));
} }
@ -470,14 +471,15 @@ fn create_command(
} }
/* One end returned to parent process (this) */ /* One end returned to parent process (this) */
let pipe_fd = Some( let pipe_fd = Some(state.resource_table.add(
state deno_node::IpcJsonStreamResource::new(
.resource_table hd1 as i64,
.add(deno_node::IpcJsonStreamResource::new(hd1 as i64)?), deno_node::IpcRefTracker::new(state.external_ops_tracker.clone()),
); )?,
));
/* The other end passed to child process via DENO_CHANNEL_FD */ /* The other end passed to child process via NODE_CHANNEL_FD */
command.env("DENO_CHANNEL_FD", format!("{}", hd2 as i64)); command.env("NODE_CHANNEL_FD", format!("{}", hd2 as i64));
return Ok((command, pipe_fd)); return Ok((command, pipe_fd));
} }

View file

@ -9,8 +9,10 @@ import {
assertNotStrictEquals, assertNotStrictEquals,
assertStrictEquals, assertStrictEquals,
assertStringIncludes, assertStringIncludes,
assertThrows,
} from "@std/assert"; } from "@std/assert";
import * as path from "@std/path"; import * as path from "@std/path";
import { setTimeout } from "node:timers";
const { spawn, spawnSync, execFile, execFileSync, ChildProcess } = CP; const { spawn, spawnSync, execFile, execFileSync, ChildProcess } = CP;
@ -63,6 +65,7 @@ Deno.test("[node/child_process disconnect] the method exists", async () => {
const deferred = withTimeout<void>(); const deferred = withTimeout<void>();
const childProcess = spawn(Deno.execPath(), ["--help"], { const childProcess = spawn(Deno.execPath(), ["--help"], {
env: { NO_COLOR: "true" }, env: { NO_COLOR: "true" },
stdio: ["pipe", "pipe", "pipe", "ipc"],
}); });
try { try {
childProcess.disconnect(); childProcess.disconnect();
@ -855,3 +858,191 @@ Deno.test(
assertEquals(output.stderr, null); assertEquals(output.stderr, null);
}, },
); );
Deno.test(
async function ipcSerialization() {
const timeout = withTimeout<void>();
const script = `
if (typeof process.send !== "function") {
console.error("process.send is not a function");
process.exit(1);
}
class BigIntWrapper {
constructor(value) {
this.value = value;
}
toJSON() {
return this.value.toString();
}
}
const makeSab = (arr) => {
const sab = new SharedArrayBuffer(arr.length);
const buf = new Uint8Array(sab);
for (let i = 0; i < arr.length; i++) {
buf[i] = arr[i];
}
return buf;
};
const inputs = [
"foo",
{
foo: "bar",
},
42,
true,
null,
new Uint8Array([1, 2, 3]),
{
foo: new Uint8Array([1, 2, 3]),
bar: makeSab([4, 5, 6]),
},
[1, { foo: 2 }, [3, 4]],
new BigIntWrapper(42n),
];
for (const input of inputs) {
process.send(input);
}
`;
const file = await Deno.makeTempFile();
await Deno.writeTextFile(file, script);
const child = CP.fork(file, [], {
stdio: ["inherit", "inherit", "inherit", "ipc"],
});
const expect = [
"foo",
{
foo: "bar",
},
42,
true,
null,
[1, 2, 3],
{
foo: [1, 2, 3],
bar: [4, 5, 6],
},
[1, { foo: 2 }, [3, 4]],
"42",
];
let i = 0;
child.on("message", (message) => {
assertEquals(message, expect[i]);
i++;
});
child.on("close", () => timeout.resolve());
await timeout.promise;
assertEquals(i, expect.length);
},
);
Deno.test(async function childProcessExitsGracefully() {
const testdataDir = path.join(
path.dirname(path.fromFileUrl(import.meta.url)),
"testdata",
);
const script = path.join(
testdataDir,
"node_modules",
"foo",
"index.js",
);
const p = Promise.withResolvers<void>();
const cp = CP.fork(script, [], {
cwd: testdataDir,
stdio: ["inherit", "inherit", "inherit", "ipc"],
});
cp.on("close", () => p.resolve());
await p.promise;
});
Deno.test(async function killMultipleTimesNoError() {
const loop = `
while (true) {
await new Promise((resolve) => setTimeout(resolve, 10000));
}
`;
const timeout = withTimeout<void>();
const file = await Deno.makeTempFile();
await Deno.writeTextFile(file, loop);
const child = CP.fork(file, [], {
stdio: ["inherit", "inherit", "inherit", "ipc"],
});
child.on("close", () => {
timeout.resolve();
});
child.kill();
child.kill();
// explicitly calling disconnect after kill should throw
assertThrows(() => child.disconnect());
await timeout.promise;
});
// Make sure that you receive messages sent before a "message" event listener is set up
Deno.test(async function bufferMessagesIfNoListener() {
const code = `
process.on("message", (_) => {
process.channel.unref();
});
process.send("hello");
process.send("world");
console.error("sent messages");
`;
const file = await Deno.makeTempFile();
await Deno.writeTextFile(file, code);
const timeout = withTimeout<void>();
const child = CP.fork(file, [], {
stdio: ["inherit", "inherit", "pipe", "ipc"],
});
let got = 0;
child.on("message", (message) => {
if (got++ === 0) {
assertEquals(message, "hello");
} else {
assertEquals(message, "world");
}
});
child.on("close", () => {
timeout.resolve();
});
let stderr = "";
child.stderr?.on("data", (data) => {
stderr += data;
if (stderr.includes("sent messages")) {
// now that we've set up the listeners, and the child
// has sent the messages, we can let it exit
child.send("ready");
}
});
await timeout.promise;
assertEquals(got, 2);
});
Deno.test(async function sendAfterClosedThrows() {
const code = ``;
const file = await Deno.makeTempFile();
await Deno.writeTextFile(file, code);
const timeout = withTimeout<void>();
const child = CP.fork(file, [], {
stdio: ["inherit", "inherit", "inherit", "ipc"],
});
child.on("error", (err) => {
assert("code" in err);
assertEquals(err.code, "ERR_IPC_CHANNEL_CLOSED");
timeout.resolve();
});
child.on("close", () => {
child.send("ready");
});
await timeout.promise;
});