From 0cd0a9d5ed873aad9464143ed494facd9a0d8a34 Mon Sep 17 00:00:00 2001 From: Divy Srivastava Date: Fri, 31 Mar 2023 10:34:12 +0530 Subject: [PATCH] perf(ext/websocket): efficient event kind serialization (#18509) Use u16 to represent the kind of event (0 - 6) & event code > 6 is treated as the close code. This way we can represent all events + the close code in a single JS number. This is safe because (as per RFC 6455) close code from 0-999 are reserved & not used. | name | avg msg/sec/core | | --- | --- | | deno_main | `127820.750000` | | deno #18506 | `140079.000000` | | deno #18506 + this | `150104.250000` | --- ext/websocket/01_websocket.js | 77 ++++++++++++++++------------- ext/websocket/02_websocketstream.js | 43 ++++++++++------ ext/websocket/lib.rs | 65 ++++++++++++++---------- 3 files changed, 112 insertions(+), 73 deletions(-) diff --git a/ext/websocket/01_websocket.js b/ext/websocket/01_websocket.js index 06eb08b60d..03a6427c29 100644 --- a/ext/websocket/01_websocket.js +++ b/ext/websocket/01_websocket.js @@ -394,13 +394,14 @@ class WebSocket extends EventTarget { async [_eventLoop]() { while (this[_readyState] !== CLOSED) { - const { kind, value } = await core.opAsync( + const { 0: kind, 1: value } = await core.opAsync( "op_ws_next_event", this[_rid], ); switch (kind) { - case "string": { + case 0: { + /* string */ this[_serverHandleIdleTimeout](); const event = new MessageEvent("message", { data: value, @@ -409,14 +410,15 @@ class WebSocket extends EventTarget { this.dispatchEvent(event); break; } - case "binary": { + case 1: { + /* binary */ this[_serverHandleIdleTimeout](); let data; if (this.binaryType === "blob") { data = new Blob([value]); } else { - data = value.buffer; + data = value; } const event = new MessageEvent("message", { @@ -427,39 +429,13 @@ class WebSocket extends EventTarget { this.dispatchEvent(event); break; } - case "pong": { + case 2: { + /* pong */ this[_serverHandleIdleTimeout](); break; } - case "closed": - case "close": { - const prevState = this[_readyState]; - this[_readyState] = CLOSED; - clearTimeout(this[_idleTimeoutTimeout]); - - if (prevState === OPEN) { - try { - await core.opAsync( - "op_ws_close", - this[_rid], - value.code, - value.reason, - ); - } catch { - // ignore failures - } - } - - const event = new CloseEvent("close", { - wasClean: true, - code: value.code, - reason: value.reason, - }); - this.dispatchEvent(event); - core.tryClose(this[_rid]); - break; - } - case "error": { + case 5: { + /* error */ this[_readyState] = CLOSED; const errorEv = new ErrorEvent("error", { @@ -472,6 +448,39 @@ class WebSocket extends EventTarget { core.tryClose(this[_rid]); break; } + case 3: { + /* ping */ + break; + } + default: { + /* close */ + const code = kind; + const prevState = this[_readyState]; + this[_readyState] = CLOSED; + clearTimeout(this[_idleTimeoutTimeout]); + + if (prevState === OPEN) { + try { + await core.opAsync( + "op_ws_close", + this[_rid], + code, + value, + ); + } catch { + // ignore failures + } + } + + const event = new CloseEvent("close", { + wasClean: true, + code: code, + reason: value, + }); + this.dispatchEvent(event); + core.tryClose(this[_rid]); + break; + } } } } diff --git a/ext/websocket/02_websocketstream.js b/ext/websocket/02_websocketstream.js index 0a3aeb1926..6e487f0b7c 100644 --- a/ext/websocket/02_websocketstream.js +++ b/ext/websocket/02_websocketstream.js @@ -167,12 +167,13 @@ class WebSocketStream { PromisePrototypeThen( (async () => { while (true) { - const { kind } = await core.opAsync( + const { 0: kind } = await core.opAsync( "op_ws_next_event", create.rid, ); - if (kind === "close") { + if (kind > 6) { + /* close */ break; } } @@ -237,37 +238,51 @@ class WebSocketStream { }, }); const pull = async (controller) => { - const { kind, value } = await core.opAsync( + const { 0: kind, 1: value } = await core.opAsync( "op_ws_next_event", this[_rid], ); switch (kind) { - case "string": { + case 0: + case 1: { + /* string */ + /* binary */ controller.enqueue(value); break; } - case "binary": { - controller.enqueue(value); + case 5: { + /* error */ + const err = new Error(value); + this[_closed].reject(err); + controller.error(err); + core.tryClose(this[_rid]); break; } - case "ping": { + case 3: { + /* ping */ await core.opAsync("op_ws_send", this[_rid], { kind: "pong", }); await pull(controller); break; } - case "closed": - case "close": { - this[_closed].resolve(value); + case 2: { + /* pong */ + break; + } + case 6: { + /* closed */ + this[_closed].resolve(undefined); core.tryClose(this[_rid]); break; } - case "error": { - const err = new Error(value); - this[_closed].reject(err); - controller.error(err); + default: { + /* close */ + this[_closed].resolve({ + code: kind, + reason: value, + }); core.tryClose(this[_rid]); break; } diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index 8d3cb20d22..1c586b383f 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -8,6 +8,7 @@ use deno_core::futures::stream::SplitStream; use deno_core::futures::SinkExt; use deno_core::futures::StreamExt; use deno_core::op; +use deno_core::StringOrBuffer; use deno_core::url; use deno_core::AsyncRefCell; @@ -475,23 +476,21 @@ pub async fn op_ws_close( Ok(()) } -#[derive(Serialize)] -#[serde(tag = "kind", content = "value", rename_all = "camelCase")] -pub enum NextEventResponse { - String(String), - Binary(ZeroCopyBuf), - Close { code: u16, reason: String }, - Ping, - Pong, - Error(String), - Closed, +#[repr(u16)] +pub enum MessageKind { + Text = 0, + Binary = 1, + Pong = 2, + Ping = 3, + Error = 5, + Closed = 6, } #[op] pub async fn op_ws_next_event( state: Rc>, rid: ResourceId, -) -> Result { +) -> Result<(u16, StringOrBuffer), AnyError> { let resource = state .borrow_mut() .resource_table @@ -500,24 +499,40 @@ pub async fn op_ws_next_event( let cancel = RcRef::map(&resource, |r| &r.cancel); let val = resource.next_message(cancel).await?; let res = match val { - Some(Ok(Message::Text(text))) => NextEventResponse::String(text), - Some(Ok(Message::Binary(data))) => NextEventResponse::Binary(data.into()), - Some(Ok(Message::Close(Some(frame)))) => NextEventResponse::Close { - code: frame.code.into(), - reason: frame.reason.to_string(), - }, - Some(Ok(Message::Close(None))) => NextEventResponse::Close { - code: 1005, - reason: String::new(), - }, - Some(Ok(Message::Ping(_))) => NextEventResponse::Ping, - Some(Ok(Message::Pong(_))) => NextEventResponse::Pong, - Some(Err(e)) => NextEventResponse::Error(e.to_string()), + Some(Ok(Message::Text(text))) => { + (MessageKind::Text as u16, StringOrBuffer::String(text)) + } + Some(Ok(Message::Binary(data))) => ( + MessageKind::Binary as u16, + StringOrBuffer::Buffer(data.into()), + ), + Some(Ok(Message::Close(Some(frame)))) => ( + frame.code.into(), + StringOrBuffer::String(frame.reason.to_string()), + ), + Some(Ok(Message::Close(None))) => { + (1005, StringOrBuffer::String("".to_string())) + } + Some(Ok(Message::Ping(_))) => ( + MessageKind::Ping as u16, + StringOrBuffer::Buffer(vec![].into()), + ), + Some(Ok(Message::Pong(_))) => ( + MessageKind::Pong as u16, + StringOrBuffer::Buffer(vec![].into()), + ), + Some(Err(e)) => ( + MessageKind::Error as u16, + StringOrBuffer::String(e.to_string()), + ), None => { // No message was received, presumably the socket closed while we waited. // Try close the stream, ignoring any errors, and report closed status to JavaScript. let _ = state.borrow_mut().resource_table.close(rid); - NextEventResponse::Closed + ( + MessageKind::Closed as u16, + StringOrBuffer::Buffer(vec![].into()), + ) } }; Ok(res)