mirror of
https://github.com/denoland/deno.git
synced 2025-02-21 04:42:51 -05:00
fix(cli): remove possible deadlock in test channel (#22662)
The stderr stream could possibly starve the other bits of the output-redirecting event loop. Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
This commit is contained in:
parent
2b9c879146
commit
8f359181e6
1 changed files with 39 additions and 9 deletions
|
@ -5,16 +5,21 @@ use super::TestStdioStream;
|
||||||
use deno_core::futures::future::poll_fn;
|
use deno_core::futures::future::poll_fn;
|
||||||
use deno_core::parking_lot;
|
use deno_core::parking_lot;
|
||||||
use deno_core::parking_lot::lock_api::RawMutex;
|
use deno_core::parking_lot::lock_api::RawMutex;
|
||||||
|
use deno_core::parking_lot::lock_api::RawMutexTimed;
|
||||||
use deno_runtime::deno_io::pipe;
|
use deno_runtime::deno_io::pipe;
|
||||||
use deno_runtime::deno_io::AsyncPipeRead;
|
use deno_runtime::deno_io::AsyncPipeRead;
|
||||||
use deno_runtime::deno_io::PipeRead;
|
use deno_runtime::deno_io::PipeRead;
|
||||||
use deno_runtime::deno_io::PipeWrite;
|
use deno_runtime::deno_io::PipeWrite;
|
||||||
use std::fmt::Display;
|
use std::fmt::Display;
|
||||||
|
use std::future::Future;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::atomic::AtomicUsize;
|
use std::sync::atomic::AtomicUsize;
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::task::ready;
|
||||||
|
use std::task::Poll;
|
||||||
|
use std::time::Duration;
|
||||||
use tokio::io::AsyncRead;
|
use tokio::io::AsyncRead;
|
||||||
use tokio::io::AsyncReadExt;
|
use tokio::io::AsyncReadExt;
|
||||||
use tokio::io::ReadBuf;
|
use tokio::io::ReadBuf;
|
||||||
|
@ -143,17 +148,23 @@ impl TestStream {
|
||||||
self.read_opt.is_some()
|
self.read_opt.is_some()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Cancellation-safe.
|
||||||
|
#[inline]
|
||||||
|
fn pipe(&mut self) -> impl Future<Output = ()> + '_ {
|
||||||
|
poll_fn(|cx| self.poll_pipe(cx))
|
||||||
|
}
|
||||||
|
|
||||||
/// Attempt to read from a given stream, pushing all of the data in it into the given
|
/// Attempt to read from a given stream, pushing all of the data in it into the given
|
||||||
/// [`UnboundedSender`] before returning.
|
/// [`UnboundedSender`] before returning.
|
||||||
async fn pipe(&mut self) {
|
fn poll_pipe(&mut self, cx: &mut std::task::Context) -> Poll<()> {
|
||||||
let mut buffer = [0_u8; BUFFER_SIZE];
|
let mut buffer = [0_u8; BUFFER_SIZE];
|
||||||
let mut buf = ReadBuf::new(&mut buffer);
|
let mut buf = ReadBuf::new(&mut buffer);
|
||||||
let res = {
|
let res = {
|
||||||
// No more stream, so just return.
|
// No more stream, we shouldn't hit this case.
|
||||||
let Some(stream) = &mut self.read_opt else {
|
let Some(stream) = &mut self.read_opt else {
|
||||||
return;
|
unreachable!();
|
||||||
};
|
};
|
||||||
poll_fn(|cx| Pin::new(&mut *stream).poll_read(cx, &mut buf)).await
|
ready!(Pin::new(&mut *stream).poll_read(cx, &mut buf))
|
||||||
};
|
};
|
||||||
match res {
|
match res {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
|
@ -173,6 +184,7 @@ impl TestStream {
|
||||||
self.read_opt.take();
|
self.read_opt.take();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Poll::Ready(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Read and "block" until the sync markers have been read.
|
/// Read and "block" until the sync markers have been read.
|
||||||
|
@ -249,11 +261,21 @@ impl TestEventSenderFactory {
|
||||||
let mut test_stderr =
|
let mut test_stderr =
|
||||||
TestStream::new(id, TestStdioStream::Stderr, stderr_reader, sender)?;
|
TestStream::new(id, TestStdioStream::Stderr, stderr_reader, sender)?;
|
||||||
|
|
||||||
|
// This ensures that the stdout and stderr streams in the select! loop below cannot starve each
|
||||||
|
// other.
|
||||||
|
let mut alternate_stream_priority = false;
|
||||||
|
|
||||||
// This function will be woken whenever a stream or the receiver is ready
|
// This function will be woken whenever a stream or the receiver is ready
|
||||||
loop {
|
loop {
|
||||||
|
alternate_stream_priority = !alternate_stream_priority;
|
||||||
|
let (a, b) = if alternate_stream_priority {
|
||||||
|
(&mut test_stdout, &mut test_stderr)
|
||||||
|
} else {
|
||||||
|
(&mut test_stderr, &mut test_stdout)
|
||||||
|
};
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = test_stdout.pipe(), if test_stdout.is_alive() => {},
|
biased; // We actually want to poll the channel first
|
||||||
_ = test_stderr.pipe(), if test_stdout.is_alive() => {},
|
|
||||||
recv = sync_receiver.recv() => {
|
recv = sync_receiver.recv() => {
|
||||||
match recv {
|
match recv {
|
||||||
// If the channel closed, we assume that all important data from the streams was synced,
|
// If the channel closed, we assume that all important data from the streams was synced,
|
||||||
|
@ -273,6 +295,10 @@ impl TestEventSenderFactory {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Poll stdout first if `alternate_stream_priority` is true, otherwise poll stderr first.
|
||||||
|
// This is necessary because of the `biased` flag above to avoid starvation.
|
||||||
|
_ = a.pipe(), if a.is_alive() => {},
|
||||||
|
_ = b.pipe(), if b.is_alive() => {},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -377,7 +403,12 @@ impl TestEventSender {
|
||||||
let mutex = parking_lot::RawMutex::INIT;
|
let mutex = parking_lot::RawMutex::INIT;
|
||||||
mutex.lock();
|
mutex.lock();
|
||||||
self.sync_sender.send(SendMutex(&mutex as _))?;
|
self.sync_sender.send(SendMutex(&mutex as _))?;
|
||||||
mutex.lock();
|
if !mutex.try_lock_for(Duration::from_secs(30)) {
|
||||||
|
panic!(
|
||||||
|
"Test flush deadlock, sender closed = {}",
|
||||||
|
self.sync_sender.is_closed()
|
||||||
|
);
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -444,10 +475,9 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Test that flushing a large number of times doesn't hang.
|
/// Test that flushing a large number of times doesn't hang.
|
||||||
#[ignore]
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_flush_lots() {
|
async fn test_flush_lots() {
|
||||||
test_util::timeout!(60);
|
test_util::timeout!(240);
|
||||||
let (mut worker, mut receiver) = create_single_test_event_channel();
|
let (mut worker, mut receiver) = create_single_test_event_channel();
|
||||||
let recv_handle = spawn(async move {
|
let recv_handle = spawn(async move {
|
||||||
let mut queue = vec![];
|
let mut queue = vec![];
|
||||||
|
|
Loading…
Add table
Reference in a new issue