0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-03-03 09:31:22 -05:00

feat(ext/websocket): server automatically handle ping/pong for incoming WebSocket (#13172)

This commit is contained in:
Leo Kettmeir 2022-01-06 17:41:16 +01:00 committed by GitHub
parent 46f2ff1205
commit 2067820714
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 137 additions and 2 deletions

View file

@ -2745,6 +2745,14 @@ declare namespace Deno {
export interface UpgradeWebSocketOptions {
protocol?: string;
/**
* If the client does not respond to this frame with a
* `pong` within the timeout specified, the connection is deemed
* unhealthy and is closed. The `close` and `error` event will be emitted.
*
* The default is 120 seconds. Set to 0 to disable timeouts.
*/
idleTimeout?: number;
}
/**

View file

@ -735,6 +735,39 @@ fn websocket_server_multi_field_connection_header() {
assert!(child.wait().unwrap().success());
}
#[test]
fn websocket_server_idletimeout() {
let script = util::testdata_path().join("websocket_server_idletimeout.ts");
let root_ca = util::testdata_path().join("tls/RootCA.pem");
let mut child = util::deno_cmd()
.arg("test")
.arg("--unstable")
.arg("--allow-net")
.arg("--cert")
.arg(root_ca)
.arg(script)
.stdout(std::process::Stdio::piped())
.spawn()
.unwrap();
let stdout = child.stdout.as_mut().unwrap();
let mut buffer = [0; 5];
let read = stdout.read(&mut buffer).unwrap();
assert_eq!(read, 5);
let msg = std::str::from_utf8(&buffer).unwrap();
assert_eq!(msg, "READY");
let req = http::request::Builder::new()
.uri("ws://localhost:4502")
.body(())
.unwrap();
let (_ws, _request) =
deno_runtime::deno_websocket::tokio_tungstenite::tungstenite::connect(req)
.unwrap();
assert!(child.wait().unwrap().success());
}
#[cfg(not(windows))]
#[test]
fn set_raw_should_not_panic_on_no_tty() {

View file

@ -0,0 +1,26 @@
import { assertEquals } from "../../../test_util/std/testing/asserts.ts";
import { deferred } from "../../../test_util/std/async/deferred.ts";
const errorDeferred = deferred();
const closeDeferred = deferred();
const listener = Deno.listen({ port: 4502 });
console.log("READY");
const httpConn = Deno.serveHttp(await listener.accept());
const { request, respondWith } = (await httpConn.nextRequest())!;
const { response, socket } = Deno.upgradeWebSocket(request, {
idleTimeout: 1,
});
socket.onerror = (e) => {
assertEquals((e as ErrorEvent).message, "No response from ping frame.");
errorDeferred.resolve();
};
socket.onclose = (e) => {
assertEquals(e.reason, "No response from ping frame.");
closeDeferred.resolve();
};
await respondWith(response);
await errorDeferred;
await closeDeferred;
listener.close();

View file

@ -17,8 +17,17 @@
const { BadResource, Interrupted } = core;
const { ReadableStream } = window.__bootstrap.streams;
const abortSignal = window.__bootstrap.abortSignal;
const { WebSocket, _rid, _readyState, _eventLoop, _protocol, _server } =
window.__bootstrap.webSocket;
const {
WebSocket,
_rid,
_readyState,
_eventLoop,
_protocol,
_server,
_idleTimeoutDuration,
_idleTimeoutTimeout,
_serverHandleIdleTimeout,
} = window.__bootstrap.webSocket;
const {
ArrayPrototypeIncludes,
ArrayPrototypePush,
@ -277,6 +286,13 @@
ws.dispatchEvent(event);
ws[_eventLoop]();
if (ws[_idleTimeoutDuration]) {
ws.addEventListener(
"close",
() => clearTimeout(ws[_idleTimeoutTimeout]),
);
}
ws[_serverHandleIdleTimeout]();
}
}
} finally {
@ -378,6 +394,8 @@
setEventTargetData(socket);
socket[_server] = true;
response[_ws] = socket;
socket[_idleTimeoutDuration] = options.idleTimeout ?? 120;
socket[_idleTimeoutTimeout] = null;
return { response, socket };
}

View file

@ -69,7 +69,11 @@
const _binaryType = Symbol("[[binaryType]]");
const _bufferedAmount = Symbol("[[bufferedAmount]]");
const _eventLoop = Symbol("[[eventLoop]]");
const _server = Symbol("[[server]]");
const _idleTimeoutDuration = Symbol("[[idleTimeout]]");
const _idleTimeoutTimeout = Symbol("[[idleTimeoutTimeout]]");
const _serverHandleIdleTimeout = Symbol("[[serverHandleIdleTimeout]]");
class WebSocket extends EventTarget {
[_rid];
@ -388,6 +392,7 @@
switch (kind) {
case "string": {
this[_serverHandleIdleTimeout]();
const event = new MessageEvent("message", {
data: value,
origin: this[_url],
@ -396,6 +401,7 @@
break;
}
case "binary": {
this[_serverHandleIdleTimeout]();
let data;
if (this.binaryType === "blob") {
@ -417,9 +423,14 @@
});
break;
}
case "pong": {
this[_serverHandleIdleTimeout]();
break;
}
case "closed":
case "close": {
this[_readyState] = CLOSED;
clearTimeout(this[_idleTimeoutTimeout]);
const event = new CloseEvent("close", {
wasClean: true,
@ -446,6 +457,40 @@
}
}
}
[_serverHandleIdleTimeout]() {
if (this[_idleTimeoutDuration]) {
clearTimeout(this[_idleTimeoutTimeout]);
this[_idleTimeoutTimeout] = setTimeout(async () => {
await core.opAsync("op_ws_send", this[_rid], {
kind: "ping",
});
this[_idleTimeoutTimeout] = setTimeout(async () => {
this[_readyState] = CLOSING;
const reason = "No response from ping frame.";
await core.opAsync("op_ws_close", {
rid: this[_rid],
code: 1001,
reason,
});
this[_readyState] = CLOSED;
const errEvent = new ErrorEvent("error", {
message: reason,
});
this.dispatchEvent(errEvent);
const event = new CloseEvent("close", {
wasClean: false,
code: 1001,
reason,
});
this.dispatchEvent(event);
core.tryClose(this[_rid]);
}, (this[_idleTimeoutDuration] / 2) * 1000);
}, (this[_idleTimeoutDuration] / 2) * 1000);
}
}
}
ObjectDefineProperties(WebSocket, {
@ -477,5 +522,8 @@
_eventLoop,
_protocol,
_server,
_idleTimeoutDuration,
_idleTimeoutTimeout,
_serverHandleIdleTimeout,
};
})(this);

View file

@ -376,6 +376,7 @@ pub enum SendValue {
Text(String),
Binary(ZeroCopyBuf),
Pong,
Ping,
}
pub async fn op_ws_send(
@ -387,6 +388,7 @@ pub async fn op_ws_send(
SendValue::Text(text) => Message::Text(text),
SendValue::Binary(buf) => Message::Binary(buf.to_vec()),
SendValue::Pong => Message::Pong(vec![]),
SendValue::Ping => Message::Ping(vec![]),
};
let resource = state