0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-03-03 17:34:47 -05:00

fix: [ws] sock shouldn't throw eof error when failed to read frame (#4083)

This commit is contained in:
Yusuke Sakurai 2020-02-24 01:59:36 +09:00 committed by GitHub
parent e9fff02e96
commit d9886a44d1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 136 additions and 56 deletions

View file

@ -12,7 +12,8 @@ import {
assertEquals, assertEquals,
assertNotEquals, assertNotEquals,
assertThrowsAsync, assertThrowsAsync,
AssertionError AssertionError,
assertNotEOF
} from "../testing/asserts.ts"; } from "../testing/asserts.ts";
import { import {
Response, Response,
@ -32,11 +33,6 @@ import {
import { delay, deferred } from "../util/async.ts"; import { delay, deferred } from "../util/async.ts";
import { StringReader } from "../io/readers.ts"; import { StringReader } from "../io/readers.ts";
function assertNotEOF<T extends {}>(val: T | Deno.EOF): T {
assertNotEquals(val, Deno.EOF);
return val as T;
}
interface ResponseTest { interface ResponseTest {
response: Response; response: Response;
raw: string; raw: string;

View file

@ -8,8 +8,8 @@ type Reader = Deno.Reader;
import { import {
assert, assert,
assertEquals, assertEquals,
assertNotEquals, fail,
fail assertNotEOF
} from "../testing/asserts.ts"; } from "../testing/asserts.ts";
import { import {
BufReader, BufReader,
@ -24,11 +24,6 @@ import { charCode, copyBytes, stringsReader } from "./util.ts";
const encoder = new TextEncoder(); const encoder = new TextEncoder();
function assertNotEOF<T extends {}>(val: T | Deno.EOF): T {
assertNotEquals(val, Deno.EOF);
return val as T;
}
async function readBytes(buf: BufReader): Promise<string> { async function readBytes(buf: BufReader): Promise<string> {
const b = new Uint8Array(1000); const b = new Uint8Array(1000);
let nb = 0; let nb = 0;

View file

@ -378,3 +378,8 @@ export function unimplemented(msg?: string): never {
export function unreachable(): never { export function unreachable(): never {
throw new AssertionError("unreachable"); throw new AssertionError("unreachable");
} }
export function assertNotEOF<T extends {}>(val: T | Deno.EOF): T {
assertNotEquals(val, Deno.EOF);
return val as T;
}

View file

@ -9,16 +9,11 @@ import { stringsReader } from "../io/util.ts";
import { import {
assert, assert,
assertEquals, assertEquals,
assertNotEquals, assertThrows,
assertThrows assertNotEOF
} from "../testing/asserts.ts"; } from "../testing/asserts.ts";
const { test } = Deno; const { test } = Deno;
function assertNotEOF<T extends {}>(val: T | Deno.EOF): T {
assertNotEquals(val, Deno.EOF);
return val as T;
}
function reader(s: string): TextProtoReader { function reader(s: string): TextProtoReader {
return new TextProtoReader(new BufReader(stringsReader(s))); return new TextProtoReader(new BufReader(stringsReader(s)));
} }

View file

@ -2,15 +2,15 @@
import { decode, encode } from "../strings/mod.ts"; import { decode, encode } from "../strings/mod.ts";
import { hasOwnProperty } from "../util/has_own_property.ts"; import { hasOwnProperty } from "../util/has_own_property.ts";
type Conn = Deno.Conn;
type Writer = Deno.Writer;
import { BufReader, BufWriter, UnexpectedEOFError } from "../io/bufio.ts"; import { BufReader, BufWriter, UnexpectedEOFError } from "../io/bufio.ts";
import { readLong, readShort, sliceLongToBytes } from "../io/ioutil.ts"; import { readLong, readShort, sliceLongToBytes } from "../io/ioutil.ts";
import { Sha1 } from "./sha1.ts"; import { Sha1 } from "./sha1.ts";
import { writeResponse } from "../http/server.ts"; import { writeResponse } from "../http/server.ts";
import { TextProtoReader } from "../textproto/mod.ts"; import { TextProtoReader } from "../textproto/mod.ts";
import { Deferred, deferred } from "../util/async.ts"; import { Deferred, deferred } from "../util/async.ts";
import { assertNotEOF } from "../testing/asserts.ts";
import Conn = Deno.Conn;
import Writer = Deno.Writer;
export enum OpCode { export enum OpCode {
Continue = 0x0, Continue = 0x0,
@ -24,8 +24,8 @@ export enum OpCode {
export type WebSocketEvent = export type WebSocketEvent =
| string | string
| Uint8Array | Uint8Array
| WebSocketCloseEvent | WebSocketCloseEvent // Received after closing connection finished.
| WebSocketPingEvent | WebSocketPingEvent // Received after pong frame responded.
| WebSocketPongEvent; | WebSocketPongEvent;
export interface WebSocketCloseEvent { export interface WebSocketCloseEvent {
@ -71,7 +71,11 @@ export function append(a: Uint8Array, b: Uint8Array): Uint8Array {
return output; return output;
} }
export class SocketClosedError extends Error {} export class SocketClosedError extends Error {
constructor(msg = "Socket has already been closed") {
super(msg);
}
}
export interface WebSocketFrame { export interface WebSocketFrame {
isLastFrame: boolean; isLastFrame: boolean;
@ -86,11 +90,26 @@ export interface WebSocket {
receive(): AsyncIterableIterator<WebSocketEvent>; receive(): AsyncIterableIterator<WebSocketEvent>;
/**
* @throws SocketClosedError
*/
send(data: WebSocketMessage): Promise<void>; send(data: WebSocketMessage): Promise<void>;
/**
* @param data
* @throws SocketClosedError
*/
ping(data?: WebSocketMessage): Promise<void>; ping(data?: WebSocketMessage): Promise<void>;
/** Close connection after sending close frame to peer.
* This is canonical way of disconnection but it may hang because of peer's response delay.
* @throws SocketClosedError
*/
close(code: number, reason?: string): Promise<void>; close(code: number, reason?: string): Promise<void>;
/** Close connection forcely without sending close frame to peer.
* This is basically undesirable way of disconnection. Use carefully. */
closeForce(): void;
} }
/** Unmask masked websocket payload */ /** Unmask masked websocket payload */
@ -141,10 +160,12 @@ export async function writeFrame(
await w.flush(); await w.flush();
} }
/** Read websocket frame from given BufReader */ /** Read websocket frame from given BufReader
* @throws UnexpectedEOFError When peer closed connection without close frame
* @throws Error Frame is invalid
*/
export async function readFrame(buf: BufReader): Promise<WebSocketFrame> { export async function readFrame(buf: BufReader): Promise<WebSocketFrame> {
let b = await buf.readByte(); let b = assertNotEOF(await buf.readByte());
if (b === Deno.EOF) throw new UnexpectedEOFError();
let isLastFrame = false; let isLastFrame = false;
switch (b >>> 4) { switch (b >>> 4) {
case 0b1000: case 0b1000:
@ -158,28 +179,25 @@ export async function readFrame(buf: BufReader): Promise<WebSocketFrame> {
} }
const opcode = b & 0x0f; const opcode = b & 0x0f;
// has_mask & payload // has_mask & payload
b = await buf.readByte(); b = assertNotEOF(await buf.readByte());
if (b === Deno.EOF) throw new UnexpectedEOFError();
const hasMask = b >>> 7; const hasMask = b >>> 7;
let payloadLength = b & 0b01111111; let payloadLength = b & 0b01111111;
if (payloadLength === 126) { if (payloadLength === 126) {
const l = await readShort(buf); const l = assertNotEOF(await readShort(buf));
if (l === Deno.EOF) throw new UnexpectedEOFError();
payloadLength = l; payloadLength = l;
} else if (payloadLength === 127) { } else if (payloadLength === 127) {
const l = await readLong(buf); const l = assertNotEOF(await readLong(buf));
if (l === Deno.EOF) throw new UnexpectedEOFError();
payloadLength = Number(l); payloadLength = Number(l);
} }
// mask // mask
let mask; let mask: Uint8Array | undefined;
if (hasMask) { if (hasMask) {
mask = new Uint8Array(4); mask = new Uint8Array(4);
await buf.readFull(mask); assertNotEOF(await buf.readFull(mask));
} }
// payload // payload
const payload = new Uint8Array(payloadLength); const payload = new Uint8Array(payloadLength);
await buf.readFull(payload); assertNotEOF(await buf.readFull(payload));
return { return {
isLastFrame, isLastFrame,
opcode, opcode,
@ -223,8 +241,14 @@ class WebSocketImpl implements WebSocket {
async *receive(): AsyncIterableIterator<WebSocketEvent> { async *receive(): AsyncIterableIterator<WebSocketEvent> {
let frames: WebSocketFrame[] = []; let frames: WebSocketFrame[] = [];
let payloadsLength = 0; let payloadsLength = 0;
while (true) { while (!this._isClosed) {
const frame = await readFrame(this.bufReader); let frame: WebSocketFrame;
try {
frame = await readFrame(this.bufReader);
} catch (e) {
this.ensureSocketClosed();
break;
}
unmask(frame.payload, frame.mask); unmask(frame.payload, frame.mask);
switch (frame.opcode) { switch (frame.opcode) {
case OpCode.TextFrame: case OpCode.TextFrame:
@ -276,11 +300,13 @@ class WebSocketImpl implements WebSocket {
} }
private dequeue(): void { private dequeue(): void {
const [e] = this.sendQueue; const [entry] = this.sendQueue;
if (!e) return; if (!entry) return;
writeFrame(e.frame, this.bufWriter) if (this._isClosed) return;
.then(() => e.d.resolve()) const { d, frame } = entry;
.catch(e => e.d.reject(e)) writeFrame(frame, this.bufWriter)
.then(() => d.resolve())
.catch(e => d.reject(e))
.finally(() => { .finally(() => {
this.sendQueue.shift(); this.sendQueue.shift();
this.dequeue(); this.dequeue();
@ -288,6 +314,9 @@ class WebSocketImpl implements WebSocket {
} }
private enqueue(frame: WebSocketFrame): Promise<void> { private enqueue(frame: WebSocketFrame): Promise<void> {
if (this._isClosed) {
throw new SocketClosedError();
}
const d = deferred<void>(); const d = deferred<void>();
this.sendQueue.push({ d, frame }); this.sendQueue.push({ d, frame });
if (this.sendQueue.length === 1) { if (this.sendQueue.length === 1) {
@ -297,9 +326,6 @@ class WebSocketImpl implements WebSocket {
} }
async send(data: WebSocketMessage): Promise<void> { async send(data: WebSocketMessage): Promise<void> {
if (this.isClosed) {
throw new SocketClosedError("socket has been closed");
}
const opcode = const opcode =
typeof data === "string" ? OpCode.TextFrame : OpCode.BinaryFrame; typeof data === "string" ? OpCode.TextFrame : OpCode.BinaryFrame;
const payload = typeof data === "string" ? encode(data) : data; const payload = typeof data === "string" ? encode(data) : data;
@ -354,16 +380,21 @@ class WebSocketImpl implements WebSocket {
} }
} }
closeForce(): void {
this.ensureSocketClosed();
}
private ensureSocketClosed(): void { private ensureSocketClosed(): void {
if (this.isClosed) { if (this.isClosed) return;
return;
}
try { try {
this.conn.close(); this.conn.close();
} catch (e) { } catch (e) {
console.error(e); console.error(e);
} finally { } finally {
this._isClosed = true; this._isClosed = true;
const rest = this.sendQueue;
this.sendQueue = [];
rest.forEach(e => e.d.reject(new SocketClosedError()));
} }
} }
} }

View file

@ -13,13 +13,14 @@ import {
readFrame, readFrame,
unmask, unmask,
writeFrame, writeFrame,
createWebSocket createWebSocket,
SocketClosedError
} from "./mod.ts"; } from "./mod.ts";
import { encode, decode } from "../strings/mod.ts"; import { encode, decode } from "../strings/mod.ts";
type Writer = Deno.Writer; import Writer = Deno.Writer;
type Reader = Deno.Reader; import Reader = Deno.Reader;
type Conn = Deno.Conn; import Conn = Deno.Conn;
const { Buffer } = Deno; import Buffer = Deno.Buffer;
test(async function wsReadUnmaskedTextFrame(): Promise<void> { test(async function wsReadUnmaskedTextFrame(): Promise<void> {
// unmasked single text frame with payload "Hello" // unmasked single text frame with payload "Hello"
@ -326,3 +327,60 @@ test("WebSocket.send(), WebSocket.ping() should be exclusive", async (): Promise
assertEquals(third.opcode, OpCode.BinaryFrame); assertEquals(third.opcode, OpCode.BinaryFrame);
assertEquals(bytes.equal(third.payload, new Uint8Array([3])), true); assertEquals(bytes.equal(third.payload, new Uint8Array([3])), true);
}); });
test("WebSocket should throw SocketClosedError when peer closed connection without close frame", async () => {
const buf = new Buffer();
const eofReader: Deno.Reader = {
async read(_: Uint8Array): Promise<number | Deno.EOF> {
return Deno.EOF;
}
};
const conn = dummyConn(eofReader, buf);
const sock = createWebSocket({ conn });
sock.closeForce();
await assertThrowsAsync(() => sock.send("hello"), SocketClosedError);
await assertThrowsAsync(() => sock.ping(), SocketClosedError);
await assertThrowsAsync(() => sock.close(0), SocketClosedError);
});
test("WebSocket shouldn't throw UnexpectedEOFError on recive()", async () => {
const buf = new Buffer();
const eofReader: Deno.Reader = {
async read(_: Uint8Array): Promise<number | Deno.EOF> {
return Deno.EOF;
}
};
const conn = dummyConn(eofReader, buf);
const sock = createWebSocket({ conn });
const it = sock.receive();
const { value, done } = await it.next();
assertEquals(value, undefined);
assertEquals(done, true);
});
test("WebSocket should reject sending promise when connection reset forcely", async () => {
const buf = new Buffer();
let timer: number | undefined;
const lazyWriter: Deno.Writer = {
async write(_: Uint8Array): Promise<number> {
return new Promise(resolve => {
timer = setTimeout(() => resolve(0), 1000);
});
}
};
const conn = dummyConn(buf, lazyWriter);
const sock = createWebSocket({ conn });
const onError = (e: unknown): unknown => e;
const p = Promise.all([
sock.send("hello").catch(onError),
sock.send(new Uint8Array([1, 2])).catch(onError),
sock.ping().catch(onError)
]);
sock.closeForce();
assertEquals(sock.isClosed, true);
const [a, b, c] = await p;
assert(a instanceof SocketClosedError);
assert(b instanceof SocketClosedError);
assert(c instanceof SocketClosedError);
clearTimeout(timer);
});