From 2067820714fea49be1692fa678754488ace8228b Mon Sep 17 00:00:00 2001 From: Leo Kettmeir Date: Thu, 6 Jan 2022 17:41:16 +0100 Subject: [PATCH] feat(ext/websocket): server automatically handle ping/pong for incoming WebSocket (#13172) --- cli/dts/lib.deno.ns.d.ts | 8 ++++ cli/tests/integration/mod.rs | 33 +++++++++++++ .../testdata/websocket_server_idletimeout.ts | 26 ++++++++++ ext/http/01_http.js | 22 ++++++++- ext/websocket/01_websocket.js | 48 +++++++++++++++++++ ext/websocket/lib.rs | 2 + 6 files changed, 137 insertions(+), 2 deletions(-) create mode 100644 cli/tests/testdata/websocket_server_idletimeout.ts diff --git a/cli/dts/lib.deno.ns.d.ts b/cli/dts/lib.deno.ns.d.ts index 914c63b246..03ba3d8982 100644 --- a/cli/dts/lib.deno.ns.d.ts +++ b/cli/dts/lib.deno.ns.d.ts @@ -2745,6 +2745,14 @@ declare namespace Deno { export interface UpgradeWebSocketOptions { protocol?: string; + /** + * If the client does not respond to this frame with a + * `pong` within the timeout specified, the connection is deemed + * unhealthy and is closed. The `close` and `error` event will be emitted. + * + * The default is 120 seconds. Set to 0 to disable timeouts. + */ + idleTimeout?: number; } /** diff --git a/cli/tests/integration/mod.rs b/cli/tests/integration/mod.rs index 150683749a..3c067c17f1 100644 --- a/cli/tests/integration/mod.rs +++ b/cli/tests/integration/mod.rs @@ -735,6 +735,39 @@ fn websocket_server_multi_field_connection_header() { assert!(child.wait().unwrap().success()); } +#[test] +fn websocket_server_idletimeout() { + let script = util::testdata_path().join("websocket_server_idletimeout.ts"); + let root_ca = util::testdata_path().join("tls/RootCA.pem"); + let mut child = util::deno_cmd() + .arg("test") + .arg("--unstable") + .arg("--allow-net") + .arg("--cert") + .arg(root_ca) + .arg(script) + .stdout(std::process::Stdio::piped()) + .spawn() + .unwrap(); + + let stdout = child.stdout.as_mut().unwrap(); + let mut buffer = [0; 5]; + let read = stdout.read(&mut buffer).unwrap(); + assert_eq!(read, 5); + let msg = std::str::from_utf8(&buffer).unwrap(); + assert_eq!(msg, "READY"); + + let req = http::request::Builder::new() + .uri("ws://localhost:4502") + .body(()) + .unwrap(); + let (_ws, _request) = + deno_runtime::deno_websocket::tokio_tungstenite::tungstenite::connect(req) + .unwrap(); + + assert!(child.wait().unwrap().success()); +} + #[cfg(not(windows))] #[test] fn set_raw_should_not_panic_on_no_tty() { diff --git a/cli/tests/testdata/websocket_server_idletimeout.ts b/cli/tests/testdata/websocket_server_idletimeout.ts new file mode 100644 index 0000000000..ffd88c3d37 --- /dev/null +++ b/cli/tests/testdata/websocket_server_idletimeout.ts @@ -0,0 +1,26 @@ +import { assertEquals } from "../../../test_util/std/testing/asserts.ts"; +import { deferred } from "../../../test_util/std/async/deferred.ts"; + +const errorDeferred = deferred(); +const closeDeferred = deferred(); + +const listener = Deno.listen({ port: 4502 }); +console.log("READY"); +const httpConn = Deno.serveHttp(await listener.accept()); +const { request, respondWith } = (await httpConn.nextRequest())!; +const { response, socket } = Deno.upgradeWebSocket(request, { + idleTimeout: 1, +}); +socket.onerror = (e) => { + assertEquals((e as ErrorEvent).message, "No response from ping frame."); + errorDeferred.resolve(); +}; +socket.onclose = (e) => { + assertEquals(e.reason, "No response from ping frame."); + closeDeferred.resolve(); +}; +await respondWith(response); + +await errorDeferred; +await closeDeferred; +listener.close(); diff --git a/ext/http/01_http.js b/ext/http/01_http.js index 7620d154a1..4922ed07ab 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -17,8 +17,17 @@ const { BadResource, Interrupted } = core; const { ReadableStream } = window.__bootstrap.streams; const abortSignal = window.__bootstrap.abortSignal; - const { WebSocket, _rid, _readyState, _eventLoop, _protocol, _server } = - window.__bootstrap.webSocket; + const { + WebSocket, + _rid, + _readyState, + _eventLoop, + _protocol, + _server, + _idleTimeoutDuration, + _idleTimeoutTimeout, + _serverHandleIdleTimeout, + } = window.__bootstrap.webSocket; const { ArrayPrototypeIncludes, ArrayPrototypePush, @@ -277,6 +286,13 @@ ws.dispatchEvent(event); ws[_eventLoop](); + if (ws[_idleTimeoutDuration]) { + ws.addEventListener( + "close", + () => clearTimeout(ws[_idleTimeoutTimeout]), + ); + } + ws[_serverHandleIdleTimeout](); } } } finally { @@ -378,6 +394,8 @@ setEventTargetData(socket); socket[_server] = true; response[_ws] = socket; + socket[_idleTimeoutDuration] = options.idleTimeout ?? 120; + socket[_idleTimeoutTimeout] = null; return { response, socket }; } diff --git a/ext/websocket/01_websocket.js b/ext/websocket/01_websocket.js index e11a2c55f3..46cb584415 100644 --- a/ext/websocket/01_websocket.js +++ b/ext/websocket/01_websocket.js @@ -69,7 +69,11 @@ const _binaryType = Symbol("[[binaryType]]"); const _bufferedAmount = Symbol("[[bufferedAmount]]"); const _eventLoop = Symbol("[[eventLoop]]"); + const _server = Symbol("[[server]]"); + const _idleTimeoutDuration = Symbol("[[idleTimeout]]"); + const _idleTimeoutTimeout = Symbol("[[idleTimeoutTimeout]]"); + const _serverHandleIdleTimeout = Symbol("[[serverHandleIdleTimeout]]"); class WebSocket extends EventTarget { [_rid]; @@ -388,6 +392,7 @@ switch (kind) { case "string": { + this[_serverHandleIdleTimeout](); const event = new MessageEvent("message", { data: value, origin: this[_url], @@ -396,6 +401,7 @@ break; } case "binary": { + this[_serverHandleIdleTimeout](); let data; if (this.binaryType === "blob") { @@ -417,9 +423,14 @@ }); break; } + case "pong": { + this[_serverHandleIdleTimeout](); + break; + } case "closed": case "close": { this[_readyState] = CLOSED; + clearTimeout(this[_idleTimeoutTimeout]); const event = new CloseEvent("close", { wasClean: true, @@ -446,6 +457,40 @@ } } } + + [_serverHandleIdleTimeout]() { + if (this[_idleTimeoutDuration]) { + clearTimeout(this[_idleTimeoutTimeout]); + this[_idleTimeoutTimeout] = setTimeout(async () => { + await core.opAsync("op_ws_send", this[_rid], { + kind: "ping", + }); + this[_idleTimeoutTimeout] = setTimeout(async () => { + this[_readyState] = CLOSING; + const reason = "No response from ping frame."; + await core.opAsync("op_ws_close", { + rid: this[_rid], + code: 1001, + reason, + }); + this[_readyState] = CLOSED; + + const errEvent = new ErrorEvent("error", { + message: reason, + }); + this.dispatchEvent(errEvent); + + const event = new CloseEvent("close", { + wasClean: false, + code: 1001, + reason, + }); + this.dispatchEvent(event); + core.tryClose(this[_rid]); + }, (this[_idleTimeoutDuration] / 2) * 1000); + }, (this[_idleTimeoutDuration] / 2) * 1000); + } + } } ObjectDefineProperties(WebSocket, { @@ -477,5 +522,8 @@ _eventLoop, _protocol, _server, + _idleTimeoutDuration, + _idleTimeoutTimeout, + _serverHandleIdleTimeout, }; })(this); diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index 544423066f..3e245afbd3 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -376,6 +376,7 @@ pub enum SendValue { Text(String), Binary(ZeroCopyBuf), Pong, + Ping, } pub async fn op_ws_send( @@ -387,6 +388,7 @@ pub async fn op_ws_send( SendValue::Text(text) => Message::Text(text), SendValue::Binary(buf) => Message::Binary(buf.to_vec()), SendValue::Pong => Message::Pong(vec![]), + SendValue::Ping => Message::Ping(vec![]), }; let resource = state