From 13f7f8c415f0f3ce7cb8801beb317066ef59a0e3 Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Sun, 23 Apr 2023 09:59:46 -0600 Subject: [PATCH] fix(ext/http): ensure that multiple upgrades and multiple simultaneous requests cannot cause a panic (#18810) Fix a bug where we weren't saving `slabId` in #18619, plus add some robustness checks around multiple upgrades (w/test). --- cli/tests/unit/serve_test.ts | 86 ++++++++++++++++++++++++++++++++++++ ext/http/00_serve.js | 11 ++++- ext/http/http_next.rs | 19 ++++++-- 3 files changed, 111 insertions(+), 5 deletions(-) diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts index 8344f1be5e..9268c7aab8 100644 --- a/cli/tests/unit/serve_test.ts +++ b/cli/tests/unit/serve_test.ts @@ -758,6 +758,92 @@ Deno.test({ permissions: { net: true } }, async function httpServerWebSocket() { await server; }); +Deno.test( + { permissions: { net: true } }, + async function httpServerWebSocketUpgradeTwice() { + const ac = new AbortController(); + const listeningPromise = deferred(); + const server = Deno.serve({ + handler: async (request) => { + const { + response, + socket, + } = Deno.upgradeWebSocket(request); + assertThrows( + () => { + Deno.upgradeWebSocket(request); + }, + Deno.errors.Http, + "already upgraded", + ); + socket.onerror = (e) => { + console.error(e); + fail(); + }; + socket.onmessage = (m) => { + socket.send(m.data); + socket.close(1001); + }; + return response; + }, + port: 4501, + signal: ac.signal, + onListen: onListen(listeningPromise), + onError: createOnErrorCb(ac), + }); + + await listeningPromise; + const def = deferred(); + const ws = new WebSocket("ws://localhost:4501"); + ws.onmessage = (m) => assertEquals(m.data, "foo"); + ws.onerror = (e) => { + console.error(e); + fail(); + }; + ws.onclose = () => def.resolve(); + ws.onopen = () => ws.send("foo"); + + await def; + ac.abort(); + await server; + }, +); + +Deno.test( + { permissions: { net: true } }, + async function httpServerWebSocketCloseFast() { + const ac = new AbortController(); + const listeningPromise = deferred(); + const server = Deno.serve({ + handler: async (request) => { + const { + response, + socket, + } = Deno.upgradeWebSocket(request); + socket.onopen = () => socket.close(); + return response; + }, + port: 4501, + signal: ac.signal, + onListen: onListen(listeningPromise), + onError: createOnErrorCb(ac), + }); + + await listeningPromise; + const def = deferred(); + const ws = new WebSocket("ws://localhost:4501"); + ws.onerror = (e) => { + console.error(e); + fail(); + }; + ws.onclose = () => def.resolve(); + + await def; + ac.abort(); + await server; + }, +); + Deno.test( { permissions: { net: true } }, async function httpServerWebSocketCanAccessRequest() { diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js index 91bd360944..3022bc5fac 100644 --- a/ext/http/00_serve.js +++ b/ext/http/00_serve.js @@ -108,6 +108,13 @@ class InnerRequest { } _wantsUpgrade(upgradeType, ...originalArgs) { + if (this.#upgraded) { + throw new Deno.errors.Http("already upgraded"); + } + if (this.#slabId === undefined) { + throw new Deno.errors.Http("already closed"); + } + // upgradeHttp is async // TODO(mmastrac) if (upgradeType == "upgradeHttp") { @@ -125,6 +132,8 @@ class InnerRequest { const response = originalArgs[0]; const ws = originalArgs[1]; + const slabId = this.#slabId; + this.url(); this.headerList; this.close(); @@ -140,7 +149,7 @@ class InnerRequest { // Returns the connection and extra bytes, which we can pass directly to op_ws_server_create const upgrade = await core.opAsync2( "op_upgrade", - this.#slabId, + slabId, response.headerList, ); const wsRid = core.ops.op_ws_server_create(upgrade[0], upgrade[1]); diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 25088e1ab0..47888f0a49 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -112,7 +112,14 @@ macro_rules! with { SLAB.with(|slab| { let mut borrow = slab.borrow_mut(); #[allow(unused_mut)] // TODO(mmastrac): compiler issue? - let mut $http = borrow.get_mut(key).unwrap(); + let mut $http = match borrow.get_mut(key) { + Some(http) => http, + None => panic!( + "Attemped to access invalid request {} ({} in total available)", + key, + borrow.len() + ), + }; #[cfg(__zombie_http_tracking)] if !$http.alive { panic!("Attempted to access a dead HTTP object") @@ -199,7 +206,11 @@ pub async fn op_upgrade( // Stage 1: set the respnse to 101 Switching Protocols and send it let upgrade = with_http_mut(index, |http| { // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit - let upgrade = http.request_parts.extensions.remove::().unwrap(); + let upgrade = http + .request_parts + .extensions + .remove::() + .ok_or_else(|| AnyError::msg("upgrade unavailable"))?; let response = http.response.as_mut().unwrap(); *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS; @@ -210,8 +221,8 @@ pub async fn op_upgrade( ); } http.promise.complete(true); - upgrade - }); + Ok::<_, AnyError>(upgrade) + })?; // Stage 2: wait for the request to finish upgrading let upgraded = upgrade.await?;