mirror of
https://github.com/denoland/deno.git
synced 2025-01-21 21:50:00 -05:00
fix(ext/websocket): pass on uncaught errors in idleTimeout (#21846)
Fixes https://github.com/denoland/deno/issues/21840 The problem was hard to reproduce as its a race condition. I've added a test that reproduces the problem 1/10 tries. We should move the idleTimeout handling to Rust (maybe even built into fastwebsocket).
This commit is contained in:
parent
19c10c0246
commit
6db631a432
3 changed files with 83 additions and 2 deletions
|
@ -405,3 +405,33 @@ Deno.test(
|
||||||
await Promise.all([deferred.promise, server.finished]);
|
await Promise.all([deferred.promise, server.finished]);
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
|
Deno.test(
|
||||||
|
{ sanitizeOps: false },
|
||||||
|
async function websocketServerGetsGhosted() {
|
||||||
|
const ac = new AbortController();
|
||||||
|
const listeningDeferred = Promise.withResolvers<void>();
|
||||||
|
|
||||||
|
const server = Deno.serve({
|
||||||
|
handler: (req) => {
|
||||||
|
const { socket, response } = Deno.upgradeWebSocket(req, {
|
||||||
|
idleTimeout: 2,
|
||||||
|
});
|
||||||
|
socket.onerror = () => socket.close();
|
||||||
|
socket.onclose = () => ac.abort();
|
||||||
|
return response;
|
||||||
|
},
|
||||||
|
signal: ac.signal,
|
||||||
|
onListen: () => listeningDeferred.resolve(),
|
||||||
|
hostname: "localhost",
|
||||||
|
port: servePort,
|
||||||
|
});
|
||||||
|
|
||||||
|
await listeningDeferred.promise;
|
||||||
|
const r = await fetch("http://localhost:4545/ghost_ws_client");
|
||||||
|
assertEquals(r.status, 200);
|
||||||
|
await r.body?.cancel();
|
||||||
|
|
||||||
|
await server.finished;
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
|
@ -502,12 +502,15 @@ class WebSocket extends EventTarget {
|
||||||
clearTimeout(this[_idleTimeoutTimeout]);
|
clearTimeout(this[_idleTimeoutTimeout]);
|
||||||
this[_idleTimeoutTimeout] = setTimeout(async () => {
|
this[_idleTimeoutTimeout] = setTimeout(async () => {
|
||||||
if (this[_readyState] === OPEN) {
|
if (this[_readyState] === OPEN) {
|
||||||
await op_ws_send_ping(this[_rid]);
|
await PromisePrototypeCatch(op_ws_send_ping(this[_rid]), () => {});
|
||||||
this[_idleTimeoutTimeout] = setTimeout(async () => {
|
this[_idleTimeoutTimeout] = setTimeout(async () => {
|
||||||
if (this[_readyState] === OPEN) {
|
if (this[_readyState] === OPEN) {
|
||||||
this[_readyState] = CLOSING;
|
this[_readyState] = CLOSING;
|
||||||
const reason = "No response from ping frame.";
|
const reason = "No response from ping frame.";
|
||||||
await op_ws_close(this[_rid], 1001, reason);
|
await PromisePrototypeCatch(
|
||||||
|
op_ws_close(this[_rid], 1001, reason),
|
||||||
|
() => {},
|
||||||
|
);
|
||||||
this[_readyState] = CLOSED;
|
this[_readyState] = CLOSED;
|
||||||
|
|
||||||
const errEvent = new ErrorEvent("error", {
|
const errEvent = new ErrorEvent("error", {
|
||||||
|
|
|
@ -455,6 +455,54 @@ async fn main_server(
|
||||||
);
|
);
|
||||||
Ok(response)
|
Ok(response)
|
||||||
}
|
}
|
||||||
|
(&Method::GET, "/ghost_ws_client") => {
|
||||||
|
use tokio::io::AsyncReadExt;
|
||||||
|
|
||||||
|
let mut tcp_stream = TcpStream::connect("localhost:4248").await.unwrap();
|
||||||
|
#[cfg(unix)]
|
||||||
|
// SAFETY: set socket keep alive.
|
||||||
|
unsafe {
|
||||||
|
use std::os::fd::AsRawFd;
|
||||||
|
|
||||||
|
let fd = tcp_stream.as_raw_fd();
|
||||||
|
let mut val: libc::c_int = 1;
|
||||||
|
let r = libc::setsockopt(
|
||||||
|
fd,
|
||||||
|
libc::SOL_SOCKET,
|
||||||
|
libc::SO_KEEPALIVE,
|
||||||
|
&mut val as *mut _ as *mut libc::c_void,
|
||||||
|
std::mem::size_of_val(&val) as libc::socklen_t,
|
||||||
|
);
|
||||||
|
assert_eq!(r, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Typical websocket handshake request.
|
||||||
|
let headers = [
|
||||||
|
"GET / HTTP/1.1",
|
||||||
|
"Host: localhost",
|
||||||
|
"Upgrade: websocket",
|
||||||
|
"Connection: Upgrade",
|
||||||
|
"Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==",
|
||||||
|
"Sec-WebSocket-Version: 13",
|
||||||
|
"\r\n",
|
||||||
|
]
|
||||||
|
.join("\r\n");
|
||||||
|
tcp_stream.write_all(headers.as_bytes()).await.unwrap();
|
||||||
|
|
||||||
|
let mut buf = [0u8; 200];
|
||||||
|
let n = tcp_stream.read(&mut buf).await.unwrap();
|
||||||
|
assert!(n > 0);
|
||||||
|
|
||||||
|
// Ghost the server:
|
||||||
|
// - Close the read half of the connection.
|
||||||
|
// - forget the TcpStream.
|
||||||
|
let tcp_stream = tcp_stream.into_std().unwrap();
|
||||||
|
let _ = tcp_stream.shutdown(std::net::Shutdown::Read);
|
||||||
|
std::mem::forget(tcp_stream);
|
||||||
|
|
||||||
|
let res = Response::new(empty_body());
|
||||||
|
Ok(res)
|
||||||
|
}
|
||||||
(_, "/multipart_form_data.txt") => {
|
(_, "/multipart_form_data.txt") => {
|
||||||
let b = "Preamble\r\n\
|
let b = "Preamble\r\n\
|
||||||
--boundary\t \r\n\
|
--boundary\t \r\n\
|
||||||
|
|
Loading…
Add table
Reference in a new issue