diff --git a/ext/websocket/01_websocket.js b/ext/websocket/01_websocket.js index 01dd265792..a38af036aa 100644 --- a/ext/websocket/01_websocket.js +++ b/ext/websocket/01_websocket.js @@ -23,12 +23,10 @@ const primordials = globalThis.__bootstrap.primordials; const { ArrayBufferPrototype, ArrayBufferIsView, - ArrayBufferPrototypeGetByteLength, ArrayPrototypeJoin, ArrayPrototypeMap, ArrayPrototypeSome, DataView, - DataViewPrototypeGetByteLength, ErrorPrototypeToString, ObjectDefineProperties, ObjectPrototypeIsPrototypeOf, @@ -46,7 +44,6 @@ const { PromisePrototypeCatch, SymbolFor, TypedArrayPrototypeGetByteLength, - TypedArrayPrototypeGetSymbolToStringTag, } = primordials; const op_ws_check_permission_and_cancel_handle = core.ops.op_ws_check_permission_and_cancel_handle; @@ -57,6 +54,7 @@ const { op_ws_send_text, op_ws_next_event, op_ws_send_ping, + op_ws_get_buffered_amount, } = core.ensureFastOps(); webidl.converters["sequence or DOMString"] = ( @@ -111,7 +109,6 @@ const _role = Symbol("[[role]]"); const _extensions = Symbol("[[extensions]]"); const _protocol = Symbol("[[protocol]]"); const _binaryType = Symbol("[[binaryType]]"); -const _bufferedAmount = Symbol("[[bufferedAmount]]"); const _eventLoop = Symbol("[[eventLoop]]"); const _server = Symbol("[[server]]"); @@ -179,10 +176,13 @@ class WebSocket extends EventTarget { } } - [_bufferedAmount] = 0; get bufferedAmount() { webidl.assertBranded(this, WebSocketPrototype); - return this[_bufferedAmount]; + if (this[_readyState] === OPEN) { + return op_ws_get_buffered_amount(this[_rid]); + } else { + return 0; + } } constructor(url, protocols = []) { @@ -318,55 +318,25 @@ class WebSocket extends EventTarget { throw new DOMException("readyState not OPEN", "InvalidStateError"); } - /** - * @param {ArrayBufferView} view - * @param {number} byteLength - */ - const sendTypedArray = (view, byteLength) => { - this[_bufferedAmount] += byteLength; - PromisePrototypeThen( - op_ws_send_binary( - this[_rid], - view, - ), - () => { - this[_bufferedAmount] -= byteLength; - }, - ); - }; - if (ObjectPrototypeIsPrototypeOf(BlobPrototype, data)) { PromisePrototypeThen( // deno-lint-ignore prefer-primordials data.slice().arrayBuffer(), (ab) => - sendTypedArray( + op_ws_send_binary( + this[_rid], new DataView(ab), - ArrayBufferPrototypeGetByteLength(ab), ), ); } else if (ArrayBufferIsView(data)) { - if (TypedArrayPrototypeGetSymbolToStringTag(data) === undefined) { - // DataView - sendTypedArray(data, DataViewPrototypeGetByteLength(data)); - } else { - // TypedArray - sendTypedArray(data, TypedArrayPrototypeGetByteLength(data)); - } + op_ws_send_binary(this[_rid], data); } else if (ObjectPrototypeIsPrototypeOf(ArrayBufferPrototype, data)) { - sendTypedArray(data, ArrayBufferPrototypeGetByteLength(data)); + op_ws_send_binary(this[_rid], data); } else { const string = String(data); - const d = core.encode(string); - this[_bufferedAmount] += TypedArrayPrototypeGetByteLength(d); - PromisePrototypeThen( - op_ws_send_text( - this[_rid], - string, - ), - () => { - this[_bufferedAmount] -= TypedArrayPrototypeGetByteLength(d); - }, + op_ws_send_text( + this[_rid], + string, ); } } diff --git a/ext/websocket/02_websocketstream.js b/ext/websocket/02_websocketstream.js index 00d5bdaecf..be1001eb60 100644 --- a/ext/websocket/02_websocketstream.js +++ b/ext/websocket/02_websocketstream.js @@ -34,8 +34,8 @@ const { Uint8ArrayPrototype, } = primordials; const { - op_ws_send_text, - op_ws_send_binary, + op_ws_send_text_async, + op_ws_send_binary_async, op_ws_next_event, op_ws_create, op_ws_close, @@ -210,11 +210,11 @@ class WebSocketStream { const writable = new WritableStream({ write: async (chunk) => { if (typeof chunk === "string") { - await op_ws_send_text(this[_rid], chunk); + await op_ws_send_text_async(this[_rid], chunk); } else if ( ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, chunk) ) { - await op_ws_send_binary(this[_rid], chunk); + await op_ws_send_binary_async(this[_rid], chunk); } else { throw new TypeError( "A chunk may only be either a string or an Uint8Array", diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index f2101b413f..af987c1e4b 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -281,8 +281,10 @@ where } let resource = ServerWebSocket { + buffered: Cell::new(0), + errored: Cell::new(None), ws: AsyncRefCell::new(FragmentCollector::new(stream)), - closed: Rc::new(Cell::new(false)), + closed: Cell::new(false), tx_lock: AsyncRefCell::new(()), }; let mut state = state.borrow_mut(); @@ -315,18 +317,20 @@ pub enum MessageKind { } pub struct ServerWebSocket { + buffered: Cell, + errored: Cell>, ws: AsyncRefCell>, - closed: Rc>, + closed: Cell, tx_lock: AsyncRefCell<()>, } impl ServerWebSocket { #[inline] pub async fn write_frame( - self: Rc, + self: &Rc, frame: Frame, ) -> Result<(), AnyError> { - let _lock = RcRef::map(&self, |r| &r.tx_lock).borrow_mut().await; + let _lock = RcRef::map(self, |r| &r.tx_lock).borrow_mut().await; // SAFETY: fastwebsockets only needs a mutable reference to the WebSocket // to populate the write buffer. We encounter an await point when writing // to the socket after the frame has already been written to the buffer. @@ -361,8 +365,10 @@ pub fn ws_create_server_stream( ws.set_auto_pong(true); let ws_resource = ServerWebSocket { + buffered: Cell::new(0), + errored: Cell::new(None), ws: AsyncRefCell::new(FragmentCollector::new(ws)), - closed: Rc::new(Cell::new(false)), + closed: Cell::new(false), tx_lock: AsyncRefCell::new(()), }; @@ -370,8 +376,48 @@ pub fn ws_create_server_stream( Ok(rid) } -#[op] -pub async fn op_ws_send_binary( +#[op(fast)] +pub fn op_ws_send_binary( + state: &mut OpState, + rid: ResourceId, + data: ZeroCopyBuf, +) { + let resource = state.resource_table.get::(rid).unwrap(); + let data = data.to_vec(); + let len = data.len(); + resource.buffered.set(resource.buffered.get() + len); + deno_core::task::spawn(async move { + if let Err(err) = resource + .write_frame(Frame::new(true, OpCode::Binary, None, data)) + .await + { + resource.errored.set(Some(err)); + } else { + resource.buffered.set(resource.buffered.get() - len); + } + }); +} + +#[op(fast)] +pub fn op_ws_send_text(state: &mut OpState, rid: ResourceId, data: String) { + let resource = state.resource_table.get::(rid).unwrap(); + let len = data.len(); + resource.buffered.set(resource.buffered.get() + len); + deno_core::task::spawn(async move { + if let Err(err) = resource + .write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes())) + .await + { + resource.errored.set(Some(err)); + } else { + resource.buffered.set(resource.buffered.get() - len); + } + }); +} + +/// Async version of send. Does not update buffered amount as we rely on the socket itself for backpressure. +#[op(fast)] +pub async fn op_ws_send_binary_async( state: Rc>, rid: ResourceId, data: ZeroCopyBuf, @@ -380,13 +426,15 @@ pub async fn op_ws_send_binary( .borrow_mut() .resource_table .get::(rid)?; + let data = data.to_vec(); resource - .write_frame(Frame::new(true, OpCode::Binary, None, data.to_vec())) + .write_frame(Frame::new(true, OpCode::Binary, None, data)) .await } -#[op] -pub async fn op_ws_send_text( +/// Async version of send. Does not update buffered amount as we rely on the socket itself for backpressure. +#[op(fast)] +pub async fn op_ws_send_text_async( state: Rc>, rid: ResourceId, data: String, @@ -400,6 +448,16 @@ pub async fn op_ws_send_text( .await } +#[op(fast)] +pub fn op_ws_get_buffered_amount(state: &mut OpState, rid: ResourceId) -> u32 { + state + .resource_table + .get::(rid) + .unwrap() + .buffered + .get() as u32 +} + #[op] pub async fn op_ws_send_pong( state: Rc>, @@ -441,8 +499,7 @@ pub async fn op_ws_close( .map(|reason| Frame::close(code.unwrap_or(1005), reason.as_bytes())) .unwrap_or_else(|| Frame::close_raw(vec![])); - let cell = Rc::clone(&resource.closed); - cell.set(true); + resource.closed.set(true); resource.write_frame(frame).await?; Ok(()) } @@ -457,6 +514,10 @@ pub async fn op_ws_next_event( .resource_table .get::(rid)?; + if let Some(err) = resource.errored.take() { + return Err(err); + } + let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await; loop { let val = match ws.read_frame().await { @@ -519,8 +580,11 @@ deno_core::extension!(deno_websocket, op_ws_next_event, op_ws_send_binary, op_ws_send_text, + op_ws_send_binary_async, + op_ws_send_text_async, op_ws_send_ping, op_ws_send_pong, + op_ws_get_buffered_amount, ], esm = [ "01_websocket.js", "02_websocketstream.js" ], options = {