diff --git a/cli/tests/testdata/websocketstream_test.ts b/cli/tests/testdata/websocketstream_test.ts index db9d1a094e..71969314ec 100644 --- a/cli/tests/testdata/websocketstream_test.ts +++ b/cli/tests/testdata/websocketstream_test.ts @@ -201,3 +201,135 @@ Deno.test("forbidden headers", async () => { await ws.closed; listener.close(); }); + +Deno.test("sync close with empty stream", async () => { + const listener = Deno.listen({ port: 4512 }); + const promise = (async () => { + const conn = await listener.accept(); + const httpConn = Deno.serveHttp(conn); + const { request, respondWith } = (await httpConn.nextRequest())!; + const { response, socket } = Deno.upgradeWebSocket(request); + const p = new Promise((resolve) => { + socket.onopen = () => { + socket.send("first message"); + socket.send("second message"); + }; + socket.onclose = () => resolve(); + }); + await respondWith(response); + await p; + })(); + + const ws = new WebSocketStream("ws://localhost:4512"); + const { readable } = await ws.connection; + const reader = readable.getReader(); + const firstMessage = await reader.read(); + assertEquals(firstMessage.value, "first message"); + const secondMessage = await reader.read(); + assertEquals(secondMessage.value, "second message"); + ws.close({ code: 1000 }); + await ws.closed; + await promise; + listener.close(); +}); + +Deno.test("sync close with unread messages in stream", async () => { + const listener = Deno.listen({ port: 4512 }); + const promise = (async () => { + const conn = await listener.accept(); + const httpConn = Deno.serveHttp(conn); + const { request, respondWith } = (await httpConn.nextRequest())!; + const { response, socket } = Deno.upgradeWebSocket(request); + const p = new Promise((resolve) => { + socket.onopen = () => { + socket.send("first message"); + socket.send("second message"); + socket.send("third message"); + socket.send("fourth message"); + }; + socket.onclose = () => resolve(); + }); + await respondWith(response); + await p; + })(); + + const ws = new WebSocketStream("ws://localhost:4512"); + const { readable } = await ws.connection; + const reader = readable.getReader(); + const firstMessage = await reader.read(); + assertEquals(firstMessage.value, "first message"); + const secondMessage = await reader.read(); + assertEquals(secondMessage.value, "second message"); + ws.close({ code: 1000 }); + await ws.closed; + await promise; + listener.close(); +}); + +Deno.test("async close with empty stream", async () => { + const listener = Deno.listen({ port: 4512 }); + const promise = (async () => { + const conn = await listener.accept(); + const httpConn = Deno.serveHttp(conn); + const { request, respondWith } = (await httpConn.nextRequest())!; + const { response, socket } = Deno.upgradeWebSocket(request); + const p = new Promise((resolve) => { + socket.onopen = () => { + socket.send("first message"); + socket.send("second message"); + }; + socket.onclose = () => resolve(); + }); + await respondWith(response); + await p; + })(); + + const ws = new WebSocketStream("ws://localhost:4512"); + const { readable } = await ws.connection; + const reader = readable.getReader(); + const firstMessage = await reader.read(); + assertEquals(firstMessage.value, "first message"); + const secondMessage = await reader.read(); + assertEquals(secondMessage.value, "second message"); + setTimeout(() => { + ws.close({ code: 1000 }); + }, 0); + await ws.closed; + await promise; + listener.close(); +}); + +Deno.test("async close with unread messages in stream", async () => { + const listener = Deno.listen({ port: 4512 }); + const promise = (async () => { + const conn = await listener.accept(); + const httpConn = Deno.serveHttp(conn); + const { request, respondWith } = (await httpConn.nextRequest())!; + const { response, socket } = Deno.upgradeWebSocket(request); + const p = new Promise((resolve) => { + socket.onopen = () => { + socket.send("first message"); + socket.send("second message"); + socket.send("third message"); + socket.send("fourth message"); + }; + socket.onclose = () => resolve(); + }); + await respondWith(response); + await p; + })(); + + const ws = new WebSocketStream("ws://localhost:4512"); + const { readable } = await ws.connection; + const reader = readable.getReader(); + const firstMessage = await reader.read(); + assertEquals(firstMessage.value, "first message"); + const secondMessage = await reader.read(); + assertEquals(secondMessage.value, "second message"); + setTimeout(() => { + ws.close({ code: 1000 }); + }, 0); + await ws.closed; + await promise; + listener.close(); +}); diff --git a/ext/websocket/02_websocketstream.js b/ext/websocket/02_websocketstream.js index 5266c8dfb6..cf83fe4c7d 100644 --- a/ext/websocket/02_websocketstream.js +++ b/ext/websocket/02_websocketstream.js @@ -64,11 +64,14 @@ ], ); + const CLOSE_RESPONSE_TIMEOUT = 5000; + const _rid = Symbol("[[rid]]"); const _url = Symbol("[[url]]"); const _connection = Symbol("[[connection]]"); const _closed = Symbol("[[closed]]"); const _earlyClose = Symbol("[[earlyClose]]"); + const _closeSent = Symbol("[[closeSent]]"); class WebSocketStream { [_rid]; @@ -268,6 +271,21 @@ break; } } + + if ( + this[_closeSent].state === "fulfilled" && + this[_closed].state === "pending" + ) { + if ( + new Date().getTime() - await this[_closeSent].promise <= + CLOSE_RESPONSE_TIMEOUT + ) { + return pull(controller); + } + + this[_closed].resolve(value); + core.tryClose(this[_rid]); + } }; const readable = new ReadableStream({ start: (controller) => { @@ -286,6 +304,12 @@ // needed to ignore warnings & assertions } }); + + PromisePrototypeThen(this[_closeSent].promise, () => { + if (this[_closed].state === "pending") { + return pull(controller); + } + }); }, pull, cancel: async (reason) => { @@ -328,6 +352,7 @@ [_earlyClose] = false; [_closed] = new Deferred(); + [_closeSent] = new Deferred(); get closed() { webidl.assertBranded(this, WebSocketStreamPrototype); return this[_closed].promise; @@ -369,8 +394,13 @@ if (this[_connection].state === "pending") { this[_earlyClose] = true; } else if (this[_closed].state === "pending") { - PromisePrototypeCatch( + PromisePrototypeThen( core.opAsync("op_ws_close", this[_rid], code, closeInfo.reason), + () => { + setTimeout(() => { + this[_closeSent].resolve(new Date().getTime()); + }, 0); + }, (err) => { this[_rid] && core.tryClose(this[_rid]); this[_closed].reject(err);