mirror of
https://github.com/denoland/deno.git
synced 2025-03-03 17:34:47 -05:00
fix(io): create_named_pipe parallelism (#22597)
Investigating https://github.com/denoland/deno/issues/22574 Unable to reproduce with a unit test, but assuming that it's a name collision or create pipe/open pipe race, and adding some additional diagnostics.
This commit is contained in:
parent
eaad94687b
commit
d722de886b
2 changed files with 102 additions and 23 deletions
|
@ -25,5 +25,5 @@ tokio.workspace = true
|
||||||
os_pipe.workspace = true
|
os_pipe.workspace = true
|
||||||
|
|
||||||
[target.'cfg(windows)'.dependencies]
|
[target.'cfg(windows)'.dependencies]
|
||||||
winapi = { workspace = true, features = ["winbase", "processenv"] }
|
winapi = { workspace = true, features = ["winbase", "processenv", "errhandlingapi"] }
|
||||||
rand.workspace = true
|
rand.workspace = true
|
||||||
|
|
|
@ -3,7 +3,11 @@ use rand::thread_rng;
|
||||||
use rand::RngCore;
|
use rand::RngCore;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::os::windows::io::RawHandle;
|
use std::os::windows::io::RawHandle;
|
||||||
|
use std::sync::atomic::AtomicU32;
|
||||||
|
use std::sync::atomic::Ordering;
|
||||||
|
use std::time::Duration;
|
||||||
use winapi::shared::minwindef::DWORD;
|
use winapi::shared::minwindef::DWORD;
|
||||||
|
use winapi::um::errhandlingapi::GetLastError;
|
||||||
use winapi::um::fileapi::CreateFileA;
|
use winapi::um::fileapi::CreateFileA;
|
||||||
use winapi::um::fileapi::OPEN_EXISTING;
|
use winapi::um::fileapi::OPEN_EXISTING;
|
||||||
use winapi::um::handleapi::CloseHandle;
|
use winapi::um::handleapi::CloseHandle;
|
||||||
|
@ -15,7 +19,6 @@ use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
|
||||||
use winapi::um::winbase::PIPE_ACCESS_DUPLEX;
|
use winapi::um::winbase::PIPE_ACCESS_DUPLEX;
|
||||||
use winapi::um::winbase::PIPE_READMODE_BYTE;
|
use winapi::um::winbase::PIPE_READMODE_BYTE;
|
||||||
use winapi::um::winbase::PIPE_TYPE_BYTE;
|
use winapi::um::winbase::PIPE_TYPE_BYTE;
|
||||||
use winapi::um::winnt::FILE_ATTRIBUTE_NORMAL;
|
|
||||||
use winapi::um::winnt::GENERIC_READ;
|
use winapi::um::winnt::GENERIC_READ;
|
||||||
use winapi::um::winnt::GENERIC_WRITE;
|
use winapi::um::winnt::GENERIC_WRITE;
|
||||||
|
|
||||||
|
@ -28,10 +31,23 @@ use winapi::um::winnt::GENERIC_WRITE;
|
||||||
/// well as offering a complex NTAPI solution if we decide to try to make these pipes truely
|
/// well as offering a complex NTAPI solution if we decide to try to make these pipes truely
|
||||||
/// anonymous: https://stackoverflow.com/questions/60645/overlapped-i-o-on-anonymous-pipe
|
/// anonymous: https://stackoverflow.com/questions/60645/overlapped-i-o-on-anonymous-pipe
|
||||||
pub fn create_named_pipe() -> io::Result<(RawHandle, RawHandle)> {
|
pub fn create_named_pipe() -> io::Result<(RawHandle, RawHandle)> {
|
||||||
|
// Silently retry up to 10 times.
|
||||||
|
for _ in 0..10 {
|
||||||
|
if let Ok(res) = create_named_pipe_inner() {
|
||||||
|
return Ok(res);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
create_named_pipe_inner()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_named_pipe_inner() -> io::Result<(RawHandle, RawHandle)> {
|
||||||
|
static NEXT_ID: AtomicU32 = AtomicU32::new(0);
|
||||||
|
// Create an extremely-likely-unique pipe name from randomness, identity and a serial counter.
|
||||||
let pipe_name = format!(
|
let pipe_name = format!(
|
||||||
r#"\\.\pipe\deno_pipe_{:x}_{:x}\0"#,
|
r#"\\.\pipe\deno_pipe_{:x}.{:x}.{:x}\0"#,
|
||||||
|
thread_rng().next_u64(),
|
||||||
std::process::id(),
|
std::process::id(),
|
||||||
thread_rng().next_u64()
|
NEXT_ID.fetch_add(1, Ordering::SeqCst),
|
||||||
);
|
);
|
||||||
|
|
||||||
// Create security attributes to make the pipe handles non-inheritable
|
// Create security attributes to make the pipe handles non-inheritable
|
||||||
|
@ -62,32 +78,58 @@ pub fn create_named_pipe() -> io::Result<(RawHandle, RawHandle)> {
|
||||||
};
|
};
|
||||||
|
|
||||||
if server_handle == INVALID_HANDLE_VALUE {
|
if server_handle == INVALID_HANDLE_VALUE {
|
||||||
|
// This should not happen, so we would like to get some better diagnostics here.
|
||||||
|
// SAFETY: Printing last error for diagnostics
|
||||||
|
unsafe {
|
||||||
|
eprintln!("*** Unexpected server pipe failure: {:x}", GetLastError());
|
||||||
|
}
|
||||||
return Err(io::Error::last_os_error());
|
return Err(io::Error::last_os_error());
|
||||||
}
|
}
|
||||||
|
|
||||||
// SAFETY: Create the pipe client with non-inheritable handle
|
// The pipe might not be ready yet in rare cases, so we loop for a bit
|
||||||
let client_handle = unsafe {
|
for i in 0..10 {
|
||||||
CreateFileA(
|
// SAFETY: Create the pipe client with non-inheritable handle
|
||||||
pipe_name.as_ptr() as *const i8,
|
let client_handle = unsafe {
|
||||||
GENERIC_READ | GENERIC_WRITE | FILE_FLAG_OVERLAPPED,
|
CreateFileA(
|
||||||
0,
|
pipe_name.as_ptr() as *const i8,
|
||||||
&mut security_attributes,
|
GENERIC_READ | GENERIC_WRITE,
|
||||||
OPEN_EXISTING,
|
0,
|
||||||
FILE_ATTRIBUTE_NORMAL,
|
&mut security_attributes,
|
||||||
std::ptr::null_mut(),
|
OPEN_EXISTING,
|
||||||
)
|
FILE_FLAG_OVERLAPPED,
|
||||||
};
|
std::ptr::null_mut(),
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
if client_handle == INVALID_HANDLE_VALUE {
|
// There is a very rare case where the pipe is not ready to open. If we get `ERROR_PATH_NOT_FOUND`,
|
||||||
let err = io::Error::last_os_error();
|
// we spin and try again in 1-10ms.
|
||||||
// SAFETY: Close the handles if we failed
|
if client_handle == INVALID_HANDLE_VALUE {
|
||||||
unsafe {
|
// SAFETY: Getting last error for diagnostics
|
||||||
CloseHandle(server_handle);
|
let error = unsafe { GetLastError() };
|
||||||
|
if error == winapi::shared::winerror::ERROR_FILE_NOT_FOUND
|
||||||
|
|| error == winapi::shared::winerror::ERROR_PATH_NOT_FOUND
|
||||||
|
{
|
||||||
|
// Exponential backoff, but don't sleep longer than 10ms
|
||||||
|
eprintln!("*** Unexpected client pipe not found failure: {:x}", error);
|
||||||
|
std::thread::sleep(Duration::from_millis(10.min(2_u64.pow(i) + 1)));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// This should not happen, so we would like to get some better diagnostics here.
|
||||||
|
eprintln!("*** Unexpected client pipe failure: {:x}", error);
|
||||||
|
let err = io::Error::last_os_error();
|
||||||
|
// SAFETY: Close the handles if we failed
|
||||||
|
unsafe {
|
||||||
|
CloseHandle(server_handle);
|
||||||
|
}
|
||||||
|
return Err(err);
|
||||||
}
|
}
|
||||||
return Err(err);
|
|
||||||
|
return Ok((server_handle, client_handle));
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok((server_handle, client_handle))
|
// We failed to open the pipe despite sleeping
|
||||||
|
Err(std::io::ErrorKind::NotFound.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -97,6 +139,8 @@ mod tests {
|
||||||
use std::io::Read;
|
use std::io::Read;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use std::os::windows::io::FromRawHandle;
|
use std::os::windows::io::FromRawHandle;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::sync::Barrier;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn make_named_pipe() {
|
fn make_named_pipe() {
|
||||||
|
@ -112,4 +156,39 @@ mod tests {
|
||||||
client.read_exact(&mut buf).unwrap();
|
client.read_exact(&mut buf).unwrap();
|
||||||
assert_eq!(&buf, b"hello");
|
assert_eq!(&buf, b"hello");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn make_many_named_pipes_serial() {
|
||||||
|
let mut handles = vec![];
|
||||||
|
for _ in 0..100 {
|
||||||
|
let (server, client) = create_named_pipe().unwrap();
|
||||||
|
// SAFETY: For testing
|
||||||
|
let server = unsafe { File::from_raw_handle(server) };
|
||||||
|
// SAFETY: For testing
|
||||||
|
let client = unsafe { File::from_raw_handle(client) };
|
||||||
|
handles.push((server, client))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn make_many_named_pipes_parallel() {
|
||||||
|
let mut handles = vec![];
|
||||||
|
let barrier = Arc::new(Barrier::new(50));
|
||||||
|
for _ in 0..50 {
|
||||||
|
let barrier = barrier.clone();
|
||||||
|
handles.push(std::thread::spawn(move || {
|
||||||
|
barrier.wait();
|
||||||
|
let (server, client) = create_named_pipe().unwrap();
|
||||||
|
// SAFETY: For testing
|
||||||
|
let server = unsafe { File::from_raw_handle(server) };
|
||||||
|
// SAFETY: For testing
|
||||||
|
let client = unsafe { File::from_raw_handle(client) };
|
||||||
|
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||||
|
drop((server, client));
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
for handle in handles.drain(..) {
|
||||||
|
handle.join().unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue