From 5193834cf23e3521b3afd3f5f54eb00daa23c88d Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Fri, 23 Feb 2024 11:11:15 -0700 Subject: [PATCH] refactor(cli): clean up test runner channels (#22422) Gets us closer to solving #20707. Rewrites the `TestEventSender`: - Allow for explicit creation of multiple streams. This will allow for one-std{out,err}-per-worker - All test events are received along with a worker ID, allowing for eventual, proper parallel threading of test events. In theory this should open up proper interleaving of test output, however that is left for a future PR. I had some plans for a better performing synchronization primitive, but the inter-thread communication is tricky. This does, however, speed up the processing of large numbers of tests 15-25% (possibly even more on 100,000+). Before ``` ok | 1000 passed | 0 failed (32ms) ok | 10000 passed | 0 failed (276ms) ``` After ``` ok | 1000 passed | 0 failed (25ms) ok | 10000 passed | 0 failed (230ms) ``` --- cli/lsp/testing/execution.rs | 30 +-- cli/tools/jupyter/mod.rs | 26 +- cli/tools/repl/mod.rs | 9 +- cli/tools/repl/session.rs | 5 +- cli/tools/test/channel.rs | 491 +++++++++++++++++++++++++++++++++++ cli/tools/test/mod.rs | 222 ++++------------ ext/io/lib.rs | 57 ++-- ext/io/pipe.rs | 161 +++++++++--- 8 files changed, 729 insertions(+), 272 deletions(-) create mode 100644 cli/tools/test/channel.rs diff --git a/cli/lsp/testing/execution.rs b/cli/lsp/testing/execution.rs index 11882e6af3..5fd1feb5aa 100644 --- a/cli/lsp/testing/execution.rs +++ b/cli/lsp/testing/execution.rs @@ -12,8 +12,8 @@ use crate::lsp::client::TestingNotification; use crate::lsp::config; use crate::lsp::logging::lsp_log; use crate::tools::test; +use crate::tools::test::create_test_event_channel; use crate::tools::test::FailFastTracker; -use crate::tools::test::TestEventSender; use deno_core::anyhow::anyhow; use deno_core::error::AnyError; @@ -35,7 +35,6 @@ use std::num::NonZeroUsize; use std::sync::Arc; use std::time::Duration; use std::time::Instant; -use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use tower_lsp::lsp_types as lsp; @@ -246,8 +245,7 @@ impl TestRun { unreachable!("Should always be Test subcommand."); }; - let (sender, mut receiver) = mpsc::unbounded_channel::(); - let sender = TestEventSender::new(sender); + let (test_event_sender_factory, mut receiver) = create_test_event_channel(); let fail_fast_tracker = FailFastTracker::new(fail_fast); let mut queue = self.queue.iter().collect::>(); @@ -263,7 +261,7 @@ impl TestRun { let specifier = specifier.clone(); let worker_factory = worker_factory.clone(); let permissions = permissions.clone(); - let mut sender = sender.clone(); + let worker_sender = test_event_sender_factory.worker(); let fail_fast_tracker = fail_fast_tracker.clone(); let lsp_filter = self.filters.get(&specifier); let filter = test::TestFilter { @@ -284,15 +282,16 @@ impl TestRun { if fail_fast_tracker.should_stop() { return Ok(()); } - let origin = specifier.to_string(); - let file_result = if token.is_cancelled() { + if token.is_cancelled() { Ok(()) } else { + // All JsErrors are handled by test_specifier and piped into the test + // channel. create_and_run_current_thread(test::test_specifier( worker_factory, permissions, specifier, - sender.clone(), + worker_sender, fail_fast_tracker, test::TestSpecifierOptions { filter, @@ -300,18 +299,7 @@ impl TestRun { trace_ops: false, }, )) - }; - if let Err(error) = file_result { - if error.is::() { - sender.send(test::TestEvent::UncaughtError( - origin, - Box::new(error.downcast::().unwrap()), - ))?; - } else { - return Err(error); - } } - Ok(()) }) }); @@ -333,7 +321,7 @@ impl TestRun { let mut tests_with_result = HashSet::new(); let mut used_only = false; - while let Some(event) = receiver.recv().await { + while let Some((_, event)) = receiver.recv().await { match event { test::TestEvent::Register(description) => { reporter.report_register(&description); @@ -352,7 +340,7 @@ impl TestRun { test::TestEvent::Wait(id) => { reporter.report_wait(tests.read().get(&id).unwrap()); } - test::TestEvent::Output(output) => { + test::TestEvent::Output(_, output) => { reporter.report_output(&output); } test::TestEvent::Result(id, result, elapsed) => { diff --git a/cli/tools/jupyter/mod.rs b/cli/tools/jupyter/mod.rs index ea58328bb0..cf1a44ea5c 100644 --- a/cli/tools/jupyter/mod.rs +++ b/cli/tools/jupyter/mod.rs @@ -5,6 +5,9 @@ use crate::args::JupyterFlags; use crate::ops; use crate::tools::jupyter::server::StdioMsg; use crate::tools::repl; +use crate::tools::test::create_single_test_event_channel; +use crate::tools::test::reporters::PrettyTestReporter; +use crate::tools::test::TestEventWorkerSender; use crate::util::logger; use crate::CliFactory; use deno_core::anyhow::Context; @@ -19,13 +22,8 @@ use deno_runtime::permissions::Permissions; use deno_runtime::permissions::PermissionsContainer; use deno_terminal::colors; use tokio::sync::mpsc; -use tokio::sync::mpsc::unbounded_channel; use tokio::sync::mpsc::UnboundedSender; -use super::test::reporters::PrettyTestReporter; -use super::test::TestEvent; -use super::test::TestEventSender; - mod install; pub(crate) mod jupyter_msg; pub(crate) mod server; @@ -79,11 +77,13 @@ pub async fn kernel( connection_filepath ) })?; - let (test_event_sender, test_event_receiver) = - unbounded_channel::(); - let test_event_sender = TestEventSender::new(test_event_sender); - let stdout = StdioPipe::File(test_event_sender.stdout()); - let stderr = StdioPipe::File(test_event_sender.stderr()); + let (worker, test_event_receiver) = create_single_test_event_channel(); + let TestEventWorkerSender { + sender: test_event_sender, + stdout, + stderr, + } = worker; + let mut worker = worker_factory .create_custom_worker( main_module.clone(), @@ -94,9 +94,9 @@ pub async fn kernel( ], // FIXME(nayeemrmn): Test output capturing currently doesn't work. Stdio { - stdin: StdioPipe::Inherit, - stdout, - stderr, + stdin: StdioPipe::inherit(), + stdout: StdioPipe::file(stdout), + stderr: StdioPipe::file(stderr), }, ) .await?; diff --git a/cli/tools/repl/mod.rs b/cli/tools/repl/mod.rs index e40c6362a1..98519b60dc 100644 --- a/cli/tools/repl/mod.rs +++ b/cli/tools/repl/mod.rs @@ -16,7 +16,6 @@ use deno_core::unsync::spawn_blocking; use deno_runtime::permissions::Permissions; use deno_runtime::permissions::PermissionsContainer; use rustyline::error::ReadlineError; -use tokio::sync::mpsc::unbounded_channel; mod channel; mod editor; @@ -32,8 +31,7 @@ pub use session::EvaluationOutput; pub use session::ReplSession; pub use session::REPL_INTERNALS_NAME; -use super::test::TestEvent; -use super::test::TestEventSender; +use super::test::create_single_test_event_channel; struct Repl { session: ReplSession, @@ -168,9 +166,8 @@ pub async fn run(flags: Flags, repl_flags: ReplFlags) -> Result { .deno_dir() .ok() .and_then(|dir| dir.repl_history_file_path()); - let (test_event_sender, test_event_receiver) = - unbounded_channel::(); - let test_event_sender = TestEventSender::new(test_event_sender); + let (worker, test_event_receiver) = create_single_test_event_channel(); + let test_event_sender = worker.sender; let mut worker = worker_factory .create_custom_worker( main_module.clone(), diff --git a/cli/tools/repl/session.rs b/cli/tools/repl/session.rs index a52eb095f6..65e27136f0 100644 --- a/cli/tools/repl/session.rs +++ b/cli/tools/repl/session.rs @@ -14,6 +14,7 @@ use crate::tools::test::reporters::TestReporter; use crate::tools::test::run_tests_for_worker; use crate::tools::test::worker_has_tests; use crate::tools::test::TestEvent; +use crate::tools::test::TestEventReceiver; use crate::tools::test::TestEventSender; use deno_ast::diagnostics::Diagnostic; @@ -183,7 +184,7 @@ pub struct ReplSession { test_reporter_factory: Box Box>, test_event_sender: TestEventSender, /// This is only optional because it's temporarily taken when evaluating. - test_event_receiver: Option>, + test_event_receiver: Option, jsx: ReplJsxState, experimental_decorators: bool, } @@ -196,7 +197,7 @@ impl ReplSession { mut worker: MainWorker, main_module: ModuleSpecifier, test_event_sender: TestEventSender, - test_event_receiver: tokio::sync::mpsc::UnboundedReceiver, + test_event_receiver: TestEventReceiver, ) -> Result { let language_server = ReplLanguageServer::new_initialized().await?; let mut session = worker.create_inspector_session().await; diff --git a/cli/tools/test/channel.rs b/cli/tools/test/channel.rs new file mode 100644 index 0000000000..07fee65214 --- /dev/null +++ b/cli/tools/test/channel.rs @@ -0,0 +1,491 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. + +use super::TestEvent; +use super::TestStdioStream; +use deno_core::futures::future::poll_fn; +use deno_core::parking_lot; +use deno_core::parking_lot::lock_api::RawMutex; +use deno_runtime::deno_io::pipe; +use deno_runtime::deno_io::AsyncPipeRead; +use deno_runtime::deno_io::PipeRead; +use deno_runtime::deno_io::PipeWrite; +use std::fmt::Display; +use std::io::Write; +use std::pin::Pin; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use tokio::io::AsyncRead; +use tokio::io::AsyncReadExt; +use tokio::io::ReadBuf; +use tokio::sync::mpsc::error::SendError; +use tokio::sync::mpsc::UnboundedReceiver; +use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::mpsc::WeakUnboundedSender; + +/// 8-byte sync marker that is unlikely to appear in normal output. Equivalent +/// to the string `"\u{200B}\0\u{200B}\0"`. +const SYNC_MARKER: &[u8; 8] = &[226, 128, 139, 0, 226, 128, 139, 0]; + +const BUFFER_SIZE: usize = 4096; + +/// The test channel has been closed and cannot be used to send further messages. +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub struct ChannelClosedError; + +impl std::error::Error for ChannelClosedError {} + +impl Display for ChannelClosedError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("Test channel closed") + } +} + +impl From> for ChannelClosedError { + fn from(_: SendError) -> Self { + Self + } +} + +#[repr(transparent)] +struct SendMutex(*const parking_lot::RawMutex); +impl Drop for SendMutex { + fn drop(&mut self) { + // SAFETY: We know this was locked by the sender + unsafe { + (*self.0).unlock(); + } + } +} + +// SAFETY: This is a mutex, so it's safe to send a pointer to it +unsafe impl Send for SendMutex {} + +/// Create a [`TestEventSenderFactory`] and [`TestEventReceiver`] pair. The [`TestEventSenderFactory`] may be +/// used to create [`TestEventSender`]s and stdio streams for multiple workers in the system. The [`TestEventReceiver`] +/// will be kept alive until the final [`TestEventSender`] is dropped. +pub fn create_test_event_channel() -> (TestEventSenderFactory, TestEventReceiver) +{ + let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); + ( + TestEventSenderFactory { + sender, + worker_id: Default::default(), + }, + TestEventReceiver { receiver }, + ) +} + +/// Create a [`TestEventWorkerSender`] and [`TestEventReceiver`] pair.The [`TestEventReceiver`] +/// will be kept alive until the [`TestEventSender`] is dropped. +pub fn create_single_test_event_channel( +) -> (TestEventWorkerSender, TestEventReceiver) { + let (factory, receiver) = create_test_event_channel(); + (factory.worker(), receiver) +} + +/// Polls for the next [`TestEvent`] from any worker. Events from multiple worker +/// streams may be interleaved. +pub struct TestEventReceiver { + receiver: UnboundedReceiver<(usize, TestEvent)>, +} + +impl TestEventReceiver { + /// Receive a single test event, or `None` if no workers are alive. + pub async fn recv(&mut self) -> Option<(usize, TestEvent)> { + self.receiver.recv().await + } +} + +struct TestStream { + id: usize, + which: TestStdioStream, + read_opt: Option, + sender: UnboundedSender<(usize, TestEvent)>, +} + +impl TestStream { + fn new( + id: usize, + which: TestStdioStream, + pipe_reader: PipeRead, + sender: UnboundedSender<(usize, TestEvent)>, + ) -> std::io::Result { + // This may fail if the tokio runtime is shutting down + let read_opt = Some(pipe_reader.into_async()?); + Ok(Self { + id, + which, + read_opt, + sender, + }) + } + + /// Send a buffer to the test event channel. If the channel no longer exists, shut down the stream + /// because we can't do anything. + #[must_use = "If this returns false, don't keep reading because we cannot send"] + fn send(&mut self, buffer: Vec) -> bool { + if buffer.is_empty() { + true + } else if self + .sender + .send((self.id, TestEvent::Output(self.which, buffer))) + .is_err() + { + self.read_opt.take(); + false + } else { + true + } + } + + fn is_alive(&self) -> bool { + self.read_opt.is_some() + } + + /// Attempt to read from a given stream, pushing all of the data in it into the given + /// [`UnboundedSender`] before returning. + async fn pipe(&mut self) { + let mut buffer = [0_u8; BUFFER_SIZE]; + let mut buf = ReadBuf::new(&mut buffer); + let res = { + // No more stream, so just return. + let Some(stream) = &mut self.read_opt else { + return; + }; + poll_fn(|cx| Pin::new(&mut *stream).poll_read(cx, &mut buf)).await + }; + match res { + Ok(_) => { + let buf = buf.filled().to_vec(); + if buf.is_empty() { + // The buffer may return empty in EOF conditions and never return an error, + // so we need to treat this as EOF + self.read_opt.take(); + } else { + // Attempt to send the buffer, marking as not alive if the channel is closed + _ = self.send(buf); + } + } + Err(_) => { + // Stream errored, so just return and mark this stream as not alive. + _ = self.send(buf.filled().to_vec()); + self.read_opt.take(); + } + } + } + + /// Read and "block" until the sync markers have been read. + async fn read_until_sync_marker(&mut self) { + let Some(file) = &mut self.read_opt else { + return; + }; + let mut flush = Vec::with_capacity(BUFFER_SIZE); + loop { + let mut buffer = [0_u8; BUFFER_SIZE]; + match file.read(&mut buffer).await { + Err(_) | Ok(0) => { + // EOF or error, just return. We make no guarantees about unflushed data at shutdown. + self.read_opt.take(); + return; + } + Ok(read) => { + flush.extend(&buffer[0..read]); + if flush.ends_with(SYNC_MARKER) { + flush.truncate(flush.len() - SYNC_MARKER.len()); + // Try to send our flushed buffer. If the channel is closed, this stream will + // be marked as not alive. + _ = self.send(flush); + return; + } + } + } + } + } +} + +/// A factory for creating [`TestEventSender`]s. This factory must be dropped +/// before the [`TestEventReceiver`] will complete. +pub struct TestEventSenderFactory { + sender: UnboundedSender<(usize, TestEvent)>, + worker_id: AtomicUsize, +} + +impl TestEventSenderFactory { + /// Create a [`TestEventWorkerSender`], along with a stdout/stderr stream. + pub fn worker(&self) -> TestEventWorkerSender { + let id = self.worker_id.fetch_add(1, Ordering::AcqRel); + let (stdout_reader, mut stdout_writer) = pipe().unwrap(); + let (stderr_reader, mut stderr_writer) = pipe().unwrap(); + let (sync_sender, mut sync_receiver) = + tokio::sync::mpsc::unbounded_channel::(); + let stdout = stdout_writer.try_clone().unwrap(); + let stderr = stderr_writer.try_clone().unwrap(); + let sender = self.sender.clone(); + + // Each worker spawns its own output monitoring and serialization task. This task will + // poll the stdout/stderr streams and interleave that data with `TestEvents` generated + // by the test runner worker. + // + // Note that this _must_ be a separate thread! Flushing requires locking coördination + // on two threads and if we're blocking-locked on the mutex we've sent down the sync_receiver, + // there's no way for us to process the actual flush operation here. + // + // Creating a mini-runtime to flush the stdout/stderr is the easiest way to do this, but + // there's no reason we couldn't do it with non-blocking I/O, other than the difficulty + // of setting up an I/O reactor in Windows. + std::thread::spawn(move || { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_io() + .build() + .unwrap(); + runtime.block_on(tokio::task::unconstrained(async move { + let mut test_stdout = TestStream::new( + id, + TestStdioStream::Stdout, + stdout_reader, + sender.clone(), + )?; + let mut test_stderr = + TestStream::new(id, TestStdioStream::Stderr, stderr_reader, sender)?; + + // This function will be woken whenever a stream or the receiver is ready + loop { + tokio::select! { + _ = test_stdout.pipe(), if test_stdout.is_alive() => {}, + _ = test_stderr.pipe(), if test_stdout.is_alive() => {}, + recv = sync_receiver.recv() => { + match recv { + // If the channel closed, we assume that all important data from the streams was synced, + // so we just end this task immediately. + None => { break }, + Some(mutex) => { + // If we fail to write the sync marker for flush (likely in the case where the runtime is shutting down), + // we instead just release the mutex and bail. + let success = stdout_writer.write_all(SYNC_MARKER).is_ok() + && stderr_writer.write_all(SYNC_MARKER).is_ok(); + if success { + for stream in [&mut test_stdout, &mut test_stderr] { + stream.read_until_sync_marker().await; + } + } + drop(mutex); + } + } + } + } + } + + Ok::<_, std::io::Error>(()) + }))?; + + Ok::<_, std::io::Error>(()) + }); + + let sender = TestEventSender { + id, + ref_count: Default::default(), + sender: self.sender.clone(), + sync_sender, + }; + + TestEventWorkerSender { + sender, + stdout, + stderr, + } + } + + /// A [`TestEventWeakSender`] has a unique ID, but will not keep the [`TestEventReceiver`] alive. + /// This may be useful to add a `SIGINT` or other break handler to tests that isn't part of a + /// specific test, but handles the overall orchestration of running tests: + /// + /// ```nocompile + /// let mut cancel_sender = test_event_sender_factory.weak_sender(); + /// let sigint_handler_handle = spawn(async move { + /// signal::ctrl_c().await.unwrap(); + /// cancel_sender.send(TestEvent::Sigint).ok(); + /// }); + /// ``` + pub fn weak_sender(&self) -> TestEventWeakSender { + TestEventWeakSender { + id: self.worker_id.fetch_add(1, Ordering::AcqRel), + sender: self.sender.downgrade(), + } + } +} + +pub struct TestEventWeakSender { + pub id: usize, + sender: WeakUnboundedSender<(usize, TestEvent)>, +} + +impl TestEventWeakSender { + pub fn send(&mut self, message: TestEvent) -> Result<(), ChannelClosedError> { + Ok( + self + .sender + .upgrade() + .ok_or(ChannelClosedError)? + .send((self.id, message))?, + ) + } +} + +pub struct TestEventWorkerSender { + pub sender: TestEventSender, + pub stdout: PipeWrite, + pub stderr: PipeWrite, +} + +/// Sends messages from a given worker into the test stream. If multiple clones of +/// this sender are kept alive, the worker is kept alive. +/// +/// Any unflushed bytes in the stdout or stderr stream associated with this sender +/// are not guaranteed to be sent on drop unless flush is explicitly called. +pub struct TestEventSender { + pub id: usize, + ref_count: Arc<()>, + sender: UnboundedSender<(usize, TestEvent)>, + sync_sender: UnboundedSender, +} + +impl Clone for TestEventSender { + fn clone(&self) -> Self { + Self { + id: self.id, + ref_count: self.ref_count.clone(), + sender: self.sender.clone(), + sync_sender: self.sync_sender.clone(), + } + } +} + +impl TestEventSender { + pub fn send(&mut self, message: TestEvent) -> Result<(), ChannelClosedError> { + // Certain messages require us to ensure that all output has been drained to ensure proper + // interleaving of messages. + if message.requires_stdio_sync() { + self.flush()?; + } + Ok(self.sender.send((self.id, message))?) + } + + /// Ensure that all output has been fully flushed by writing a sync marker into the + /// stdout and stderr streams and waiting for it on the other side. + pub fn flush(&mut self) -> Result<(), ChannelClosedError> { + let mutex = parking_lot::RawMutex::INIT; + mutex.lock(); + self.sync_sender.send(SendMutex(&mutex as _))?; + mutex.lock(); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use deno_core::unsync::spawn; + use deno_core::unsync::spawn_blocking; + + /// Test that output is correctly interleaved with messages. + #[tokio::test] + async fn spawn_worker() { + test_util::timeout!(60); + let (mut worker, mut receiver) = create_single_test_event_channel(); + + let recv_handle = spawn(async move { + let mut queue = vec![]; + while let Some((_, message)) = receiver.recv().await { + let msg_str = format!("{message:?}"); + if msg_str.len() > 50 { + eprintln!("message = {}...", &msg_str[..50]); + } else { + eprintln!("message = {}", msg_str); + } + queue.push(message); + } + eprintln!("done"); + queue + }); + let send_handle = spawn_blocking(move || { + worker.stdout.write_all(&[1; 100_000]).unwrap(); + eprintln!("Wrote bytes"); + worker.sender.send(TestEvent::StepWait(1)).unwrap(); + eprintln!("Sent"); + worker.stdout.write_all(&[2; 100_000]).unwrap(); + eprintln!("Wrote bytes 2"); + worker.sender.flush().unwrap(); + eprintln!("Done"); + }); + send_handle.await.unwrap(); + let messages = recv_handle.await.unwrap(); + + let mut expected = 1; + let mut count = 0; + for message in messages { + match message { + TestEvent::Output(_, vec) => { + assert_eq!(vec[0], expected); + count += vec.len(); + } + TestEvent::StepWait(_) => { + assert_eq!(count, 100_000); + count = 0; + expected = 2; + } + _ => unreachable!(), + } + } + assert_eq!(expected, 2); + assert_eq!(count, 100_000); + } + + /// Test that flushing a large number of times doesn't hang. + #[tokio::test] + async fn test_flush_lots() { + test_util::timeout!(60); + let (mut worker, mut receiver) = create_single_test_event_channel(); + let recv_handle = spawn(async move { + let mut queue = vec![]; + while let Some((_, message)) = receiver.recv().await { + assert!(!matches!(message, TestEvent::Output(..))); + queue.push(message); + } + eprintln!("Receiver closed"); + queue + }); + let send_handle = spawn_blocking(move || { + for _ in 0..100000 { + worker.sender.send(TestEvent::StepWait(1)).unwrap(); + } + eprintln!("Sent all messages"); + }); + send_handle.await.unwrap(); + let messages = recv_handle.await.unwrap(); + assert_eq!(messages.len(), 100000); + } + + /// Ensure nothing panics if we're racing the runtime shutdown. + #[test] + fn test_runtime_shutdown() { + test_util::timeout!(60); + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + runtime.block_on(async { + let (mut worker, mut receiver) = create_single_test_event_channel(); + tokio::task::spawn(async move { + loop { + if receiver.recv().await.is_none() { + break; + } + } + }); + tokio::task::spawn(async move { + _ = worker.sender.send(TestEvent::Sigint); + }); + }); + } +} diff --git a/cli/tools/test/mod.rs b/cli/tools/test/mod.rs index b088cf7a32..e98df4671f 100644 --- a/cli/tools/test/mod.rs +++ b/cli/tools/test/mod.rs @@ -37,7 +37,6 @@ use deno_core::futures::stream; use deno_core::futures::FutureExt; use deno_core::futures::StreamExt; use deno_core::located_script_name; -use deno_core::parking_lot::Mutex; use deno_core::serde_v8; use deno_core::stats::RuntimeActivity; use deno_core::stats::RuntimeActivityDiff; @@ -71,7 +70,6 @@ use std::collections::HashMap; use std::collections::HashSet; use std::fmt::Write as _; use std::future::poll_fn; -use std::io::Read; use std::io::Write; use std::num::NonZeroUsize; use std::path::Path; @@ -84,14 +82,16 @@ use std::time::Duration; use std::time::Instant; use std::time::SystemTime; use tokio::signal; -use tokio::sync::mpsc::unbounded_channel; -use tokio::sync::mpsc::UnboundedReceiver; -use tokio::sync::mpsc::UnboundedSender; -use tokio::sync::mpsc::WeakUnboundedSender; +mod channel; pub mod fmt; pub mod reporters; +pub use channel::create_single_test_event_channel; +pub use channel::create_test_event_channel; +pub use channel::TestEventReceiver; +pub use channel::TestEventSender; +pub use channel::TestEventWorkerSender; use fmt::format_sanitizer_diff; pub use fmt::format_test_error; use reporters::CompoundTestReporter; @@ -329,13 +329,19 @@ pub struct TestPlan { pub used_only: bool, } +#[derive(Debug, Copy, Clone, Eq, PartialEq, Deserialize)] +pub enum TestStdioStream { + Stdout, + Stderr, +} + #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "camelCase")] pub enum TestEvent { Register(TestDescription), Plan(TestPlan), Wait(usize), - Output(Vec), + Output(TestStdioStream, Vec), Result(usize, TestResult, u64), UncaughtError(String, Box), StepRegister(TestStepDescription), @@ -345,6 +351,21 @@ pub enum TestEvent { Sigint, } +impl TestEvent { + // Certain messages require us to ensure that all output has been drained to ensure proper + // interleaving of output messages. + pub fn requires_stdio_sync(&self) -> bool { + matches!( + self, + TestEvent::Result(..) + | TestEvent::StepWait(..) + | TestEvent::StepResult(..) + | TestEvent::UncaughtError(..) + | TestEvent::ForceEndReport + ) + } +} + #[derive(Debug, Clone, Deserialize)] pub struct TestSummary { pub total: usize, @@ -432,7 +453,7 @@ pub async fn test_specifier( worker_factory: Arc, permissions: Permissions, specifier: ModuleSpecifier, - mut sender: TestEventSender, + mut worker_sender: TestEventWorkerSender, fail_fast_tracker: FailFastTracker, options: TestSpecifierOptions, ) -> Result<(), AnyError> { @@ -440,7 +461,9 @@ pub async fn test_specifier( worker_factory, permissions, specifier.clone(), - &sender, + &worker_sender.sender, + StdioPipe::file(worker_sender.stdout), + StdioPipe::file(worker_sender.stderr), fail_fast_tracker, options, ) @@ -449,7 +472,7 @@ pub async fn test_specifier( Ok(()) => Ok(()), Err(error) => { if error.is::() { - sender.send(TestEvent::UncaughtError( + worker_sender.sender.send(TestEvent::UncaughtError( specifier.to_string(), Box::new(error.downcast::().unwrap()), ))?; @@ -463,26 +486,27 @@ pub async fn test_specifier( /// Test a single specifier as documentation containing test programs, an executable test module or /// both. +#[allow(clippy::too_many_arguments)] async fn test_specifier_inner( worker_factory: Arc, permissions: Permissions, specifier: ModuleSpecifier, sender: &TestEventSender, + stdout: StdioPipe, + stderr: StdioPipe, fail_fast_tracker: FailFastTracker, options: TestSpecifierOptions, ) -> Result<(), AnyError> { if fail_fast_tracker.should_stop() { return Ok(()); } - let stdout = StdioPipe::File(sender.stdout()); - let stderr = StdioPipe::File(sender.stderr()); let mut worker = worker_factory .create_custom_worker( specifier.clone(), PermissionsContainer::new(permissions), vec![ops::testing::deno_test::init_ops(sender.clone())], Stdio { - stdin: StdioPipe::Inherit, + stdin: StdioPipe::inherit(), stdout, stderr, }, @@ -1062,14 +1086,13 @@ async fn test_specifiers( specifiers }; - let (sender, receiver) = unbounded_channel::(); - let sender = TestEventSender::new(sender); + let (test_event_sender_factory, receiver) = create_test_event_channel(); let concurrent_jobs = options.concurrent_jobs; - let sender_ = sender.downgrade(); + let mut cancel_sender = test_event_sender_factory.weak_sender(); let sigint_handler_handle = spawn(async move { signal::ctrl_c().await.unwrap(); - sender_.upgrade().map(|s| s.send(TestEvent::Sigint).ok()); + cancel_sender.send(TestEvent::Sigint).ok(); }); HAS_TEST_RUN_SIGINT_HANDLER.store(true, Ordering::Relaxed); let reporter = get_test_reporter(&options); @@ -1078,7 +1101,7 @@ async fn test_specifiers( let join_handles = specifiers.into_iter().map(move |specifier| { let worker_factory = worker_factory.clone(); let permissions = permissions.clone(); - let sender = sender.clone(); + let worker_sender = test_event_sender_factory.worker(); let fail_fast_tracker = fail_fast_tracker.clone(); let specifier_options = options.specifier.clone(); spawn_blocking(move || { @@ -1086,12 +1109,13 @@ async fn test_specifiers( worker_factory, permissions, specifier, - sender.clone(), + worker_sender, fail_fast_tracker, specifier_options, )) }) }); + let join_stream = stream::iter(join_handles) .buffer_unordered(concurrent_jobs.get()) .collect::, tokio::task::JoinError>>>(); @@ -1111,9 +1135,9 @@ async fn test_specifiers( /// Gives receiver back in case it was ended with `TestEvent::ForceEndReport`. pub async fn report_tests( - mut receiver: UnboundedReceiver, + mut receiver: TestEventReceiver, mut reporter: Box, -) -> (Result<(), AnyError>, UnboundedReceiver) { +) -> (Result<(), AnyError>, TestEventReceiver) { let mut tests = IndexMap::new(); let mut test_steps = IndexMap::new(); let mut tests_started = HashSet::new(); @@ -1123,7 +1147,7 @@ pub async fn report_tests( let mut used_only = false; let mut failed = false; - while let Some(event) = receiver.recv().await { + while let Some((_, event)) = receiver.recv().await { match event { TestEvent::Register(description) => { reporter.report_register(&description); @@ -1144,7 +1168,7 @@ pub async fn report_tests( reporter.report_wait(tests.get(&id).unwrap()); } } - TestEvent::Output(output) => { + TestEvent::Output(_, output) => { reporter.report_output(&output); } TestEvent::Result(id, result, elapsed) => { @@ -1629,158 +1653,6 @@ impl FailFastTracker { } } -#[derive(Clone)] -pub struct TestEventSender { - sender: UnboundedSender, - stdout_writer: TestOutputPipe, - stderr_writer: TestOutputPipe, -} - -impl TestEventSender { - pub fn new(sender: UnboundedSender) -> Self { - Self { - stdout_writer: TestOutputPipe::new(sender.clone()), - stderr_writer: TestOutputPipe::new(sender.clone()), - sender, - } - } - - pub fn stdout(&self) -> std::fs::File { - self.stdout_writer.as_file() - } - - pub fn stderr(&self) -> std::fs::File { - self.stderr_writer.as_file() - } - - pub fn send(&mut self, message: TestEvent) -> Result<(), AnyError> { - // for any event that finishes collecting output, we need to - // ensure that the collected stdout and stderr pipes are flushed - if matches!( - message, - TestEvent::Result(_, _, _) - | TestEvent::StepWait(_) - | TestEvent::StepResult(_, _, _) - | TestEvent::UncaughtError(_, _) - ) { - self.flush_stdout_and_stderr()?; - } - - self.sender.send(message)?; - Ok(()) - } - - fn downgrade(&self) -> WeakUnboundedSender { - self.sender.downgrade() - } - - fn flush_stdout_and_stderr(&mut self) -> Result<(), AnyError> { - self.stdout_writer.flush()?; - self.stderr_writer.flush()?; - - Ok(()) - } -} - -// use a string that if it ends up in the output won't affect how things are displayed -const ZERO_WIDTH_SPACE: &str = "\u{200B}"; - -struct TestOutputPipe { - writer: os_pipe::PipeWriter, - state: Arc>>>, -} - -impl Clone for TestOutputPipe { - fn clone(&self) -> Self { - Self { - writer: self.writer.try_clone().unwrap(), - state: self.state.clone(), - } - } -} - -impl TestOutputPipe { - pub fn new(sender: UnboundedSender) -> Self { - let (reader, writer) = os_pipe::pipe().unwrap(); - let state = Arc::new(Mutex::new(None)); - - start_output_redirect_thread(reader, sender, state.clone()); - - Self { writer, state } - } - - pub fn flush(&mut self) -> Result<(), AnyError> { - // We want to wake up the other thread and have it respond back - // that it's done clearing out its pipe before returning. - let (sender, receiver) = std::sync::mpsc::channel(); - if let Some(sender) = self.state.lock().replace(sender) { - let _ = sender.send(()); // just in case - } - // Bit of a hack to send a zero width space in order to wake - // the thread up. It seems that sending zero bytes here does - // not work on windows. - self.writer.write_all(ZERO_WIDTH_SPACE.as_bytes())?; - self.writer.flush()?; - // ignore the error as it might have been picked up and closed - let _ = receiver.recv(); - - Ok(()) - } - - pub fn as_file(&self) -> std::fs::File { - pipe_writer_to_file(self.writer.try_clone().unwrap()) - } -} - -#[cfg(windows)] -fn pipe_writer_to_file(writer: os_pipe::PipeWriter) -> std::fs::File { - use std::os::windows::prelude::FromRawHandle; - use std::os::windows::prelude::IntoRawHandle; - // SAFETY: Requires consuming ownership of the provided handle - unsafe { std::fs::File::from_raw_handle(writer.into_raw_handle()) } -} - -#[cfg(unix)] -fn pipe_writer_to_file(writer: os_pipe::PipeWriter) -> std::fs::File { - use std::os::unix::io::FromRawFd; - use std::os::unix::io::IntoRawFd; - // SAFETY: Requires consuming ownership of the provided handle - unsafe { std::fs::File::from_raw_fd(writer.into_raw_fd()) } -} - -fn start_output_redirect_thread( - mut pipe_reader: os_pipe::PipeReader, - sender: UnboundedSender, - flush_state: Arc>>>, -) { - spawn_blocking(move || loop { - let mut buffer = [0; 512]; - let size = match pipe_reader.read(&mut buffer) { - Ok(0) | Err(_) => break, - Ok(size) => size, - }; - let oneshot_sender = flush_state.lock().take(); - let mut data = &buffer[0..size]; - if data.ends_with(ZERO_WIDTH_SPACE.as_bytes()) { - data = &data[0..data.len() - ZERO_WIDTH_SPACE.len()]; - } - - if !data.is_empty() - && sender - .send(TestEvent::Output(buffer[0..size].to_vec())) - .is_err() - { - break; - } - - // Always respond back if this was set. Ideally we would also check to - // ensure the pipe reader is empty before sending back this response. - if let Some(sender) = oneshot_sender { - let _ignore = sender.send(()); - } - }); -} - #[cfg(test)] mod inner_test { use std::path::Path; diff --git a/ext/io/lib.rs b/ext/io/lib.rs index e0d649e0a4..8d80eec256 100644 --- a/ext/io/lib.rs +++ b/ext/io/lib.rs @@ -110,36 +110,36 @@ deno_core::extension!(deno_io, let t = &mut state.resource_table; let rid = t.add(fs::FileResource::new( - Rc::new(match stdio.stdin { - StdioPipe::Inherit => StdFileResourceInner::new( + Rc::new(match stdio.stdin.pipe { + StdioPipeInner::Inherit => StdFileResourceInner::new( StdFileResourceKind::Stdin, STDIN_HANDLE.try_clone().unwrap(), ), - StdioPipe::File(pipe) => StdFileResourceInner::file(pipe), + StdioPipeInner::File(pipe) => StdFileResourceInner::file(pipe), }), "stdin".to_string(), )); assert_eq!(rid, 0, "stdin must have ResourceId 0"); let rid = t.add(FileResource::new( - Rc::new(match stdio.stdout { - StdioPipe::Inherit => StdFileResourceInner::new( + Rc::new(match stdio.stdout.pipe { + StdioPipeInner::Inherit => StdFileResourceInner::new( StdFileResourceKind::Stdout, STDOUT_HANDLE.try_clone().unwrap(), ), - StdioPipe::File(pipe) => StdFileResourceInner::file(pipe), + StdioPipeInner::File(pipe) => StdFileResourceInner::file(pipe), }), "stdout".to_string(), )); assert_eq!(rid, 1, "stdout must have ResourceId 1"); let rid = t.add(FileResource::new( - Rc::new(match stdio.stderr { - StdioPipe::Inherit => StdFileResourceInner::new( + Rc::new(match stdio.stderr.pipe { + StdioPipeInner::Inherit => StdFileResourceInner::new( StdFileResourceKind::Stderr, STDERR_HANDLE.try_clone().unwrap(), ), - StdioPipe::File(pipe) => StdFileResourceInner::file(pipe), + StdioPipeInner::File(pipe) => StdFileResourceInner::file(pipe), }), "stderr".to_string(), )); @@ -148,22 +148,41 @@ deno_core::extension!(deno_io, }, ); -pub enum StdioPipe { +#[derive(Default)] +pub struct StdioPipe { + pipe: StdioPipeInner, +} + +impl StdioPipe { + pub const fn inherit() -> Self { + StdioPipe { + pipe: StdioPipeInner::Inherit, + } + } + + pub fn file(f: impl Into) -> Self { + StdioPipe { + pipe: StdioPipeInner::File(f.into()), + } + } +} + +#[derive(Default)] +enum StdioPipeInner { + #[default] Inherit, File(StdFile), } -impl Default for StdioPipe { - fn default() -> Self { - Self::Inherit - } -} - impl Clone for StdioPipe { fn clone(&self) -> Self { - match self { - StdioPipe::Inherit => StdioPipe::Inherit, - StdioPipe::File(pipe) => StdioPipe::File(pipe.try_clone().unwrap()), + match &self.pipe { + StdioPipeInner::Inherit => Self { + pipe: StdioPipeInner::Inherit, + }, + StdioPipeInner::File(pipe) => Self { + pipe: StdioPipeInner::File(pipe.try_clone().unwrap()), + }, } } } diff --git a/ext/io/pipe.rs b/ext/io/pipe.rs index 0cad7b1f66..70788f7520 100644 --- a/ext/io/pipe.rs +++ b/ext/io/pipe.rs @@ -1,6 +1,7 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. use std::io; use std::pin::Pin; +use std::process::Stdio; // The synchronous read end of a unidirectional pipe. pub struct PipeRead { @@ -35,32 +36,50 @@ pub struct AsyncPipeWrite { } impl PipeRead { + /// Converts this sync reader into an async reader. May fail if the Tokio runtime is + /// unavailable. #[cfg(windows)] - pub fn into_async(self) -> AsyncPipeRead { + pub fn into_async(self) -> io::Result { let owned: std::os::windows::io::OwnedHandle = self.file.into(); let stdout = std::process::ChildStdout::from(owned); - AsyncPipeRead { - read: tokio::process::ChildStdout::from_std(stdout).unwrap(), - } + Ok(AsyncPipeRead { + read: tokio::process::ChildStdout::from_std(stdout)?, + }) } + + /// Converts this sync reader into an async reader. May fail if the Tokio runtime is + /// unavailable. #[cfg(not(windows))] - pub fn into_async(self) -> AsyncPipeRead { - AsyncPipeRead { - read: tokio::net::unix::pipe::Receiver::from_file(self.file).unwrap(), - } + pub fn into_async(self) -> io::Result { + Ok(AsyncPipeRead { + read: tokio::net::unix::pipe::Receiver::from_file(self.file)?, + }) + } + + /// Creates a new [`PipeRead`] instance that shares the same underlying file handle + /// as the existing [`PipeRead`] instance. + pub fn try_clone(&self) -> io::Result { + Ok(Self { + file: self.file.try_clone()?, + }) } } impl AsyncPipeRead { + /// Converts this async reader into an sync reader. May fail if the Tokio runtime is + /// unavailable. #[cfg(windows)] - pub fn into_sync(self) -> PipeRead { - let owned = self.read.into_owned_handle().unwrap(); - PipeRead { file: owned.into() } + pub fn into_sync(self) -> io::Result { + let owned = self.read.into_owned_handle()?; + Ok(PipeRead { file: owned.into() }) } + + /// Converts this async reader into an sync reader. May fail if the Tokio runtime is + /// unavailable. #[cfg(not(windows))] - pub fn into_sync(self) -> PipeRead { - let file = self.read.into_nonblocking_fd().unwrap().into(); - PipeRead { file } + pub fn into_sync(self) -> io::Result { + let file = self.read.into_nonblocking_fd()?.into(); + Ok(PipeRead { file }) } } @@ -88,32 +107,50 @@ impl tokio::io::AsyncRead for AsyncPipeRead { } impl PipeWrite { + /// Converts this sync writer into an async writer. May fail if the Tokio runtime is + /// unavailable. #[cfg(windows)] - pub fn into_async(self) -> AsyncPipeWrite { + pub fn into_async(self) -> io::Result { let owned: std::os::windows::io::OwnedHandle = self.file.into(); let stdin = std::process::ChildStdin::from(owned); - AsyncPipeWrite { - write: tokio::process::ChildStdin::from_std(stdin).unwrap(), - } + Ok(AsyncPipeWrite { + write: tokio::process::ChildStdin::from_std(stdin)?, + }) } + + /// Converts this sync writer into an async writer. May fail if the Tokio runtime is + /// unavailable. #[cfg(not(windows))] - pub fn into_async(self) -> AsyncPipeWrite { - AsyncPipeWrite { - write: tokio::net::unix::pipe::Sender::from_file(self.file).unwrap(), - } + pub fn into_async(self) -> io::Result { + Ok(AsyncPipeWrite { + write: tokio::net::unix::pipe::Sender::from_file(self.file)?, + }) + } + + /// Creates a new [`PipeWrite`] instance that shares the same underlying file handle + /// as the existing [`PipeWrite`] instance. + pub fn try_clone(&self) -> io::Result { + Ok(Self { + file: self.file.try_clone()?, + }) } } impl AsyncPipeWrite { + /// Converts this async writer into an sync writer. May fail if the Tokio runtime is + /// unavailable. #[cfg(windows)] - pub fn into_sync(self) -> PipeWrite { - let owned = self.write.into_owned_handle().unwrap(); - PipeWrite { file: owned.into() } + pub fn into_sync(self) -> io::Result { + let owned = self.write.into_owned_handle()?; + Ok(PipeWrite { file: owned.into() }) } + + /// Converts this async writer into an sync writer. May fail if the Tokio runtime is + /// unavailable. #[cfg(not(windows))] - pub fn into_sync(self) -> PipeWrite { - let file = self.write.into_nonblocking_fd().unwrap().into(); - PipeWrite { file } + pub fn into_sync(self) -> io::Result { + let file = self.write.into_nonblocking_fd()?.into(); + Ok(PipeWrite { file }) } } @@ -172,6 +209,58 @@ impl tokio::io::AsyncWrite for AsyncPipeWrite { } } +impl From for Stdio { + fn from(val: PipeRead) -> Self { + Stdio::from(val.file) + } +} + +impl From for Stdio { + fn from(val: PipeWrite) -> Self { + Stdio::from(val.file) + } +} + +impl From for std::fs::File { + fn from(val: PipeRead) -> Self { + val.file + } +} + +impl From for std::fs::File { + fn from(val: PipeWrite) -> Self { + val.file + } +} + +#[cfg(not(windows))] +impl From for std::os::unix::io::OwnedFd { + fn from(val: PipeRead) -> Self { + val.file.into() + } +} + +#[cfg(not(windows))] +impl From for std::os::unix::io::OwnedFd { + fn from(val: PipeWrite) -> Self { + val.file.into() + } +} + +#[cfg(windows)] +impl From for std::os::windows::io::OwnedHandle { + fn from(val: PipeRead) -> Self { + val.file.into() + } +} + +#[cfg(windows)] +impl From for std::os::windows::io::OwnedHandle { + fn from(val: PipeWrite) -> Self { + val.file.into() + } +} + /// Create a unidirectional pipe pair that starts off as a pair of synchronous file handles, /// but either side may be promoted to an async-capable reader/writer. /// @@ -228,8 +317,8 @@ mod tests { #[tokio::test] async fn test_async_pipe() { let (read, write) = pipe().unwrap(); - let mut read = read.into_async(); - let mut write = write.into_async(); + let mut read = read.into_async().unwrap(); + let mut write = write.into_async().unwrap(); write.write_all(b"hello").await.unwrap(); let mut buf: [u8; 5] = Default::default(); @@ -248,8 +337,8 @@ mod tests { read.read_exact(&mut buf).unwrap(); assert_eq!(&buf, b"hello"); - let mut read = read.into_async(); - let mut write = write.into_async(); + let mut read = read.into_async().unwrap(); + let mut write = write.into_async().unwrap(); // Async write.write_all(b"hello").await.unwrap(); @@ -257,8 +346,8 @@ mod tests { read.read_exact(&mut buf).await.unwrap(); assert_eq!(&buf, b"hello"); - let mut read = read.into_sync(); - let mut write = write.into_sync(); + let mut read = read.into_sync().unwrap(); + let mut write = write.into_sync().unwrap(); // Sync write.write_all(b"hello").unwrap(); @@ -270,8 +359,8 @@ mod tests { #[tokio::test] async fn test_async_pipe_is_nonblocking() { let (read, write) = pipe().unwrap(); - let mut read = read.into_async(); - let mut write = write.into_async(); + let mut read = read.into_async().unwrap(); + let mut write = write.into_async().unwrap(); let a = tokio::spawn(async move { let mut buf: [u8; 5] = Default::default();