0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-03-03 17:34:47 -05:00

fix(ext/websocket): fix closing of WebSocketStream with unread messages (#15632)

This commit is contained in:
Danny Povolotski 2022-08-30 04:43:17 +03:00 committed by GitHub
parent c3e48cba18
commit 5e0fa5dd88
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 163 additions and 1 deletions

View file

@ -201,3 +201,135 @@ Deno.test("forbidden headers", async () => {
await ws.closed;
listener.close();
});
Deno.test("sync close with empty stream", async () => {
const listener = Deno.listen({ port: 4512 });
const promise = (async () => {
const conn = await listener.accept();
const httpConn = Deno.serveHttp(conn);
const { request, respondWith } = (await httpConn.nextRequest())!;
const { response, socket } = Deno.upgradeWebSocket(request);
const p = new Promise<void>((resolve) => {
socket.onopen = () => {
socket.send("first message");
socket.send("second message");
};
socket.onclose = () => resolve();
});
await respondWith(response);
await p;
})();
const ws = new WebSocketStream("ws://localhost:4512");
const { readable } = await ws.connection;
const reader = readable.getReader();
const firstMessage = await reader.read();
assertEquals(firstMessage.value, "first message");
const secondMessage = await reader.read();
assertEquals(secondMessage.value, "second message");
ws.close({ code: 1000 });
await ws.closed;
await promise;
listener.close();
});
Deno.test("sync close with unread messages in stream", async () => {
const listener = Deno.listen({ port: 4512 });
const promise = (async () => {
const conn = await listener.accept();
const httpConn = Deno.serveHttp(conn);
const { request, respondWith } = (await httpConn.nextRequest())!;
const { response, socket } = Deno.upgradeWebSocket(request);
const p = new Promise<void>((resolve) => {
socket.onopen = () => {
socket.send("first message");
socket.send("second message");
socket.send("third message");
socket.send("fourth message");
};
socket.onclose = () => resolve();
});
await respondWith(response);
await p;
})();
const ws = new WebSocketStream("ws://localhost:4512");
const { readable } = await ws.connection;
const reader = readable.getReader();
const firstMessage = await reader.read();
assertEquals(firstMessage.value, "first message");
const secondMessage = await reader.read();
assertEquals(secondMessage.value, "second message");
ws.close({ code: 1000 });
await ws.closed;
await promise;
listener.close();
});
Deno.test("async close with empty stream", async () => {
const listener = Deno.listen({ port: 4512 });
const promise = (async () => {
const conn = await listener.accept();
const httpConn = Deno.serveHttp(conn);
const { request, respondWith } = (await httpConn.nextRequest())!;
const { response, socket } = Deno.upgradeWebSocket(request);
const p = new Promise<void>((resolve) => {
socket.onopen = () => {
socket.send("first message");
socket.send("second message");
};
socket.onclose = () => resolve();
});
await respondWith(response);
await p;
})();
const ws = new WebSocketStream("ws://localhost:4512");
const { readable } = await ws.connection;
const reader = readable.getReader();
const firstMessage = await reader.read();
assertEquals(firstMessage.value, "first message");
const secondMessage = await reader.read();
assertEquals(secondMessage.value, "second message");
setTimeout(() => {
ws.close({ code: 1000 });
}, 0);
await ws.closed;
await promise;
listener.close();
});
Deno.test("async close with unread messages in stream", async () => {
const listener = Deno.listen({ port: 4512 });
const promise = (async () => {
const conn = await listener.accept();
const httpConn = Deno.serveHttp(conn);
const { request, respondWith } = (await httpConn.nextRequest())!;
const { response, socket } = Deno.upgradeWebSocket(request);
const p = new Promise<void>((resolve) => {
socket.onopen = () => {
socket.send("first message");
socket.send("second message");
socket.send("third message");
socket.send("fourth message");
};
socket.onclose = () => resolve();
});
await respondWith(response);
await p;
})();
const ws = new WebSocketStream("ws://localhost:4512");
const { readable } = await ws.connection;
const reader = readable.getReader();
const firstMessage = await reader.read();
assertEquals(firstMessage.value, "first message");
const secondMessage = await reader.read();
assertEquals(secondMessage.value, "second message");
setTimeout(() => {
ws.close({ code: 1000 });
}, 0);
await ws.closed;
await promise;
listener.close();
});

View file

@ -64,11 +64,14 @@
],
);
const CLOSE_RESPONSE_TIMEOUT = 5000;
const _rid = Symbol("[[rid]]");
const _url = Symbol("[[url]]");
const _connection = Symbol("[[connection]]");
const _closed = Symbol("[[closed]]");
const _earlyClose = Symbol("[[earlyClose]]");
const _closeSent = Symbol("[[closeSent]]");
class WebSocketStream {
[_rid];
@ -268,6 +271,21 @@
break;
}
}
if (
this[_closeSent].state === "fulfilled" &&
this[_closed].state === "pending"
) {
if (
new Date().getTime() - await this[_closeSent].promise <=
CLOSE_RESPONSE_TIMEOUT
) {
return pull(controller);
}
this[_closed].resolve(value);
core.tryClose(this[_rid]);
}
};
const readable = new ReadableStream({
start: (controller) => {
@ -286,6 +304,12 @@
// needed to ignore warnings & assertions
}
});
PromisePrototypeThen(this[_closeSent].promise, () => {
if (this[_closed].state === "pending") {
return pull(controller);
}
});
},
pull,
cancel: async (reason) => {
@ -328,6 +352,7 @@
[_earlyClose] = false;
[_closed] = new Deferred();
[_closeSent] = new Deferred();
get closed() {
webidl.assertBranded(this, WebSocketStreamPrototype);
return this[_closed].promise;
@ -369,8 +394,13 @@
if (this[_connection].state === "pending") {
this[_earlyClose] = true;
} else if (this[_closed].state === "pending") {
PromisePrototypeCatch(
PromisePrototypeThen(
core.opAsync("op_ws_close", this[_rid], code, closeInfo.reason),
() => {
setTimeout(() => {
this[_closeSent].resolve(new Date().getTime());
}, 0);
},
(err) => {
this[_rid] && core.tryClose(this[_rid]);
this[_closed].reject(err);