From 1976504c632c78aaadbf24dc94e8ce5626bce9f1 Mon Sep 17 00:00:00 2001 From: Divy Srivastava Date: Thu, 20 Apr 2023 21:54:22 +0530 Subject: [PATCH] refactor(ext/websocket): use fastwebsockets client (#18725) --- Cargo.lock | 9 +- ext/http/lib.rs | 39 +-- ext/websocket/01_websocket.js | 16 +- ext/websocket/02_websocketstream.js | 2 +- ext/websocket/Cargo.toml | 2 +- ext/websocket/lib.rs | 372 +++++++++++++--------------- ext/websocket/server.rs | 194 --------------- 7 files changed, 183 insertions(+), 451 deletions(-) delete mode 100644 ext/websocket/server.rs diff --git a/Cargo.lock b/Cargo.lock index f301625015..29f02d5c42 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1819,11 +1819,16 @@ dependencies = [ [[package]] name = "fastwebsockets" -version = "0.1.3" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d57e99c3fa6d0e1c6aeb84f4c904b26425128215fd318a251d8e785e373d43b6" +checksum = "99a248d92ac4e9048a30d147d7897eaaadd0a5230f11982ab7d6935d7d268902" dependencies = [ + "base64 0.21.0", "cc", + "hyper", + "pin-project", + "rand", + "sha1", "simdutf8", "tokio", "utf-8", diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 289e7bf0f9..43e3c130aa 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -1129,41 +1129,6 @@ async fn op_http_upgrade_early( Ok(rid) } -struct UpgradedStream(hyper::upgrade::Upgraded); -impl tokio::io::AsyncRead for UpgradedStream { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context, - buf: &mut tokio::io::ReadBuf, - ) -> std::task::Poll> { - Pin::new(&mut self.get_mut().0).poll_read(cx, buf) - } -} - -impl tokio::io::AsyncWrite for UpgradedStream { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context, - buf: &[u8], - ) -> std::task::Poll> { - Pin::new(&mut self.get_mut().0).poll_write(cx, buf) - } - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut Context, - ) -> std::task::Poll> { - Pin::new(&mut self.get_mut().0).poll_flush(cx) - } - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context, - ) -> std::task::Poll> { - Pin::new(&mut self.get_mut().0).poll_shutdown(cx) - } -} - -impl deno_websocket::Upgraded for UpgradedStream {} - #[op] async fn op_http_upgrade_websocket( state: Rc>, @@ -1183,9 +1148,7 @@ async fn op_http_upgrade_websocket( }; let transport = hyper::upgrade::on(request).await?; - let ws_rid = - ws_create_server_stream(&state, Box::pin(UpgradedStream(transport))) - .await?; + let ws_rid = ws_create_server_stream(&state, transport).await?; Ok(ws_rid) } diff --git a/ext/websocket/01_websocket.js b/ext/websocket/01_websocket.js index 2c6bf46b27..60378b6758 100644 --- a/ext/websocket/01_websocket.js +++ b/ext/websocket/01_websocket.js @@ -317,9 +317,7 @@ class WebSocket extends EventTarget { this[_bufferedAmount] += byteLength; PromisePrototypeThen( core.opAsync2( - this[_role] === SERVER - ? "op_server_ws_send_binary" - : "op_ws_send_binary", + "op_ws_send_binary", this[_rid], view, ), @@ -357,7 +355,7 @@ class WebSocket extends EventTarget { this[_bufferedAmount] += TypedArrayPrototypeGetByteLength(d); PromisePrototypeThen( core.opAsync2( - this[_role] === SERVER ? "op_server_ws_send_text" : "op_ws_send_text", + "op_ws_send_text", this[_rid], string, ), @@ -416,7 +414,7 @@ class WebSocket extends EventTarget { PromisePrototypeCatch( core.opAsync( - this[_role] === SERVER ? "op_server_ws_close" : "op_ws_close", + "op_ws_close", this[_rid], code, reason, @@ -441,7 +439,7 @@ class WebSocket extends EventTarget { async [_eventLoop]() { while (this[_readyState] !== CLOSED) { const { 0: kind, 1: value } = await core.opAsync2( - this[_role] === SERVER ? "op_server_ws_next_event" : "op_ws_next_event", + "op_ws_next_event", this[_rid], ); @@ -508,7 +506,7 @@ class WebSocket extends EventTarget { if (prevState === OPEN) { try { await core.opAsync( - this[_role] === SERVER ? "op_server_ws_close" : "op_ws_close", + "op_ws_close", this[_rid], code, value, @@ -537,7 +535,7 @@ class WebSocket extends EventTarget { this[_idleTimeoutTimeout] = setTimeout(async () => { if (this[_readyState] === OPEN) { await core.opAsync( - this[_role] === SERVER ? "op_server_ws_send" : "op_ws_send", + "op_ws_send", this[_rid], { kind: "ping", @@ -548,7 +546,7 @@ class WebSocket extends EventTarget { this[_readyState] = CLOSING; const reason = "No response from ping frame."; await core.opAsync( - this[_role] === SERVER ? "op_server_ws_close" : "op_ws_close", + "op_ws_close", this[_rid], 1001, reason, diff --git a/ext/websocket/02_websocketstream.js b/ext/websocket/02_websocketstream.js index 0ee7a70aa0..0d01e62eea 100644 --- a/ext/websocket/02_websocketstream.js +++ b/ext/websocket/02_websocketstream.js @@ -176,7 +176,7 @@ class WebSocketStream { create.rid, ); - if (kind > 6) { + if (kind > 5) { /* close */ break; } diff --git a/ext/websocket/Cargo.toml b/ext/websocket/Cargo.toml index 2f5ed95b30..03cb3076a6 100644 --- a/ext/websocket/Cargo.toml +++ b/ext/websocket/Cargo.toml @@ -16,7 +16,7 @@ path = "lib.rs" [dependencies] deno_core.workspace = true deno_tls.workspace = true -fastwebsockets = "0.1.3" +fastwebsockets = { version = "0.2.1", features = ["upgrade"] } http.workspace = true hyper.workspace = true serde.workspace = true diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index 71f176070a..f63191a8ef 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -3,10 +3,6 @@ use deno_core::error::invalid_hostname; use deno_core::error::type_error; use deno_core::error::AnyError; -use deno_core::futures::stream::SplitSink; -use deno_core::futures::stream::SplitStream; -use deno_core::futures::SinkExt; -use deno_core::futures::StreamExt; use deno_core::op; use deno_core::StringOrBuffer; @@ -21,42 +17,41 @@ use deno_core::Resource; use deno_core::ResourceId; use deno_core::ZeroCopyBuf; use deno_tls::create_client_config; +use http::header::CONNECTION; +use http::header::UPGRADE; use http::HeaderName; use http::HeaderValue; use http::Method; use http::Request; use http::Uri; +use hyper::upgrade::Upgraded; +use hyper::Body; +use hyper::Response; use serde::Deserialize; use serde::Serialize; use std::borrow::Cow; +use std::cell::Cell; use std::cell::RefCell; use std::convert::TryFrom; use std::fmt; use std::path::PathBuf; -use std::pin::Pin; use std::rc::Rc; use std::sync::Arc; -use tokio::io::AsyncRead; -use tokio::io::AsyncWrite; use tokio::net::TcpStream; use tokio_rustls::rustls::RootCertStore; use tokio_rustls::rustls::ServerName; use tokio_rustls::TlsConnector; -use tokio_tungstenite::client_async_with_config; -use tokio_tungstenite::tungstenite::handshake::client::Response; -use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode; -use tokio_tungstenite::tungstenite::protocol::CloseFrame; -use tokio_tungstenite::tungstenite::protocol::Message; -use tokio_tungstenite::tungstenite::protocol::WebSocketConfig; use tokio_tungstenite::MaybeTlsStream; -use tokio_tungstenite::WebSocketStream; + +use fastwebsockets::CloseCode; +use fastwebsockets::FragmentCollector; +use fastwebsockets::Frame; +use fastwebsockets::OpCode; +use fastwebsockets::Role; +use fastwebsockets::WebSocket; pub use tokio_tungstenite; // Re-export tokio_tungstenite -mod server; - -pub use server::ws_create_server_stream; - #[derive(Clone)] pub struct WsRootStore(pub Option); #[derive(Clone)] @@ -76,100 +71,6 @@ pub trait WebSocketPermissions { /// would override previously used alias. pub struct UnsafelyIgnoreCertificateErrors(Option>); -type ClientWsStream = WebSocketStream>; -type ServerWsStream = WebSocketStream>>; - -pub enum WebSocketStreamType { - Client { - tx: AsyncRefCell>, - rx: AsyncRefCell>, - }, - Server { - tx: AsyncRefCell>, - rx: AsyncRefCell>, - }, -} - -pub trait Upgraded: AsyncRead + AsyncWrite + Unpin {} - -pub struct WsStreamResource { - pub stream: WebSocketStreamType, - // When a `WsStreamResource` resource is closed, all pending 'read' ops are - // canceled, while 'write' ops are allowed to complete. Therefore only - // 'read' futures are attached to this cancel handle. - pub cancel: CancelHandle, -} - -impl WsStreamResource { - async fn send(self: &Rc, message: Message) -> Result<(), AnyError> { - use tokio_tungstenite::tungstenite::Error; - let res = match self.stream { - WebSocketStreamType::Client { .. } => { - let mut tx = RcRef::map(self, |r| match &r.stream { - WebSocketStreamType::Client { tx, .. } => tx, - WebSocketStreamType::Server { .. } => unreachable!(), - }) - .borrow_mut() - .await; - tx.send(message).await - } - WebSocketStreamType::Server { .. } => { - let mut tx = RcRef::map(self, |r| match &r.stream { - WebSocketStreamType::Client { .. } => unreachable!(), - WebSocketStreamType::Server { tx, .. } => tx, - }) - .borrow_mut() - .await; - tx.send(message).await - } - }; - - match res { - Ok(()) => Ok(()), - Err(Error::ConnectionClosed) => Ok(()), - Err(tokio_tungstenite::tungstenite::Error::Protocol( - tokio_tungstenite::tungstenite::error::ProtocolError::SendAfterClosing, - )) => Ok(()), - Err(err) => Err(err.into()), - } - } - - async fn next_message( - self: &Rc, - cancel: RcRef, - ) -> Result< - Option>, - AnyError, - > { - match &self.stream { - WebSocketStreamType::Client { .. } => { - let mut rx = RcRef::map(self, |r| match &r.stream { - WebSocketStreamType::Client { rx, .. } => rx, - WebSocketStreamType::Server { .. } => unreachable!(), - }) - .borrow_mut() - .await; - rx.next().or_cancel(cancel).await.map_err(AnyError::from) - } - WebSocketStreamType::Server { .. } => { - let mut rx = RcRef::map(self, |r| match &r.stream { - WebSocketStreamType::Client { .. } => unreachable!(), - WebSocketStreamType::Server { rx, .. } => rx, - }) - .borrow_mut() - .await; - rx.next().or_cancel(cancel).await.map_err(AnyError::from) - } - } - } -} - -impl Resource for WsStreamResource { - fn name(&self) -> Cow { - "webSocketStream".into() - } -} - pub struct WsCancelResource(Rc); impl Resource for WsCancelResource { @@ -182,6 +83,15 @@ impl Resource for WsCancelResource { } } +#[derive(Deserialize)] +#[serde(tag = "kind", content = "value", rename_all = "camelCase")] +pub enum SendValue { + Text(String), + Binary(ZeroCopyBuf), + Pong, + Ping, +} + // This op is needed because creating a WS instance in JavaScript is a sync // operation and should throw error when permissions are not fulfilled, // but actual op that connects WS is async. @@ -257,7 +167,21 @@ where let uri: Uri = url.parse()?; let mut request = Request::builder().method(Method::GET).uri(&uri); - request = request.header("User-Agent", user_agent); + let authority = uri.authority().unwrap().as_str(); + let host = authority + .find('@') + .map(|idx| authority.split_at(idx + 1).1) + .unwrap_or_else(|| authority); + request = request + .header("User-Agent", user_agent) + .header("Host", host) + .header(UPGRADE, "websocket") + .header(CONNECTION, "upgrade") + .header( + "Sec-WebSocket-Key", + fastwebsockets::handshake::generate_key(), + ) + .header("Sec-WebSocket-Version", "13"); if !protocols.is_empty() { request = request.header("Sec-WebSocket-Protocol", protocols); @@ -287,7 +211,7 @@ where } } - let request = request.body(())?; + let request = request.body(Body::empty())?; let domain = &uri.host().unwrap().to_string(); let port = &uri.port_u16().unwrap_or(match uri.scheme_str() { Some("wss") => 443, @@ -315,16 +239,9 @@ where _ => unreachable!(), }; - let client = client_async_with_config( - request, - socket, - Some(WebSocketConfig { - max_message_size: Some(128 << 20), - max_frame_size: Some(32 << 20), - ..Default::default() - }), - ); - let (stream, response): (ClientWsStream, Response) = + let client = fastwebsockets::handshake::client(request, socket); + + let (stream, response): (WebSocket, Response) = if let Some(cancel_resource) = cancel_resource { client.or_cancel(cancel_resource.0.to_owned()).await? } else { @@ -340,13 +257,9 @@ where state.borrow_mut().resource_table.close(cancel_rid).ok(); } - let (ws_tx, ws_rx) = stream.split(); - let resource = WsStreamResource { - stream: WebSocketStreamType::Client { - rx: AsyncRefCell::new(ws_rx), - tx: AsyncRefCell::new(ws_tx), - }, - cancel: Default::default(), + let resource = ServerWebSocket { + ws: AsyncRefCell::new(FragmentCollector::new(stream)), + closed: Rc::new(Cell::new(false)), }; let mut state = state.borrow_mut(); let rid = state.resource_table.add(resource); @@ -368,13 +281,60 @@ where }) } -#[derive(Deserialize)] -#[serde(tag = "kind", content = "value", rename_all = "camelCase")] -pub enum SendValue { - Text(String), - Binary(ZeroCopyBuf), - Pong, - Ping, +#[repr(u16)] +pub enum MessageKind { + Text = 0, + Binary = 1, + Pong = 2, + Ping = 3, + Error = 5, + Closed = 6, +} + +pub struct ServerWebSocket { + ws: AsyncRefCell>, + closed: Rc>, +} + +impl ServerWebSocket { + #[inline] + pub async fn write_frame( + self: Rc, + frame: Frame, + ) -> Result<(), AnyError> { + // SAFETY: fastwebsockets only needs a mutable reference to the WebSocket + // to populate the write buffer. We encounter an await point when writing + // to the socket after the frame has already been written to the buffer. + let ws = unsafe { &mut *self.ws.as_ptr() }; + ws.write_frame(frame) + .await + .map_err(|err| type_error(err.to_string()))?; + Ok(()) + } +} + +impl Resource for ServerWebSocket { + fn name(&self) -> Cow { + "serverWebSocket".into() + } +} +pub async fn ws_create_server_stream( + state: &Rc>, + transport: Upgraded, +) -> Result { + let mut ws = WebSocket::after_handshake(transport, Role::Server); + ws.set_writev(true); + ws.set_auto_close(true); + ws.set_auto_pong(true); + + let ws_resource = ServerWebSocket { + ws: AsyncRefCell::new(FragmentCollector::new(ws)), + closed: Rc::new(Cell::new(false)), + }; + + let resource_table = &mut state.borrow_mut().resource_table; + let rid = resource_table.add(ws_resource); + Ok(rid) } #[op] @@ -386,9 +346,10 @@ pub async fn op_ws_send_binary( let resource = state .borrow_mut() .resource_table - .get::(rid)?; - resource.send(Message::Binary(data.to_vec())).await?; - Ok(()) + .get::(rid)?; + resource + .write_frame(Frame::new(true, OpCode::Binary, None, data.to_vec())) + .await } #[op] @@ -400,9 +361,10 @@ pub async fn op_ws_send_text( let resource = state .borrow_mut() .resource_table - .get::(rid)?; - resource.send(Message::Text(data)).await?; - Ok(()) + .get::(rid)?; + resource + .write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes())) + .await } #[op] @@ -412,18 +374,21 @@ pub async fn op_ws_send( value: SendValue, ) -> Result<(), AnyError> { let msg = match value { - SendValue::Text(text) => Message::Text(text), - SendValue::Binary(buf) => Message::Binary(buf.to_vec()), - SendValue::Pong => Message::Pong(vec![]), - SendValue::Ping => Message::Ping(vec![]), + SendValue::Text(text) => { + Frame::new(true, OpCode::Text, None, text.into_bytes()) + } + SendValue::Binary(buf) => { + Frame::new(true, OpCode::Binary, None, buf.to_vec()) + } + SendValue::Pong => Frame::new(true, OpCode::Pong, None, vec![]), + SendValue::Ping => Frame::new(true, OpCode::Ping, None, vec![]), }; let resource = state .borrow_mut() .resource_table - .get::(rid)?; - resource.send(msg).await?; - Ok(()) + .get::(rid)?; + resource.write_frame(msg).await } #[op(deferred)] @@ -433,34 +398,21 @@ pub async fn op_ws_close( code: Option, reason: Option, ) -> Result<(), AnyError> { - let rid = rid; - let msg = Message::Close(code.map(|c| CloseFrame { - code: CloseCode::from(c), - reason: match reason { - Some(reason) => Cow::from(reason), - None => Default::default(), - }, - })); - let resource = state .borrow_mut() .resource_table - .get::(rid)?; - resource.send(msg).await?; + .get::(rid)?; + let frame = reason + .map(|reason| Frame::close(code.unwrap_or(1005), reason.as_bytes())) + .unwrap_or_else(|| Frame::close_raw(vec![])); + + let cell = Rc::clone(&resource.closed); + cell.set(true); + resource.write_frame(frame).await?; Ok(()) } -#[repr(u16)] -pub enum MessageKind { - Text = 0, - Binary = 1, - Pong = 2, - Ping = 3, - Error = 5, - Closed = 6, -} - -#[op] +#[op(deferred)] pub async fn op_ws_next_event( state: Rc>, rid: ResourceId, @@ -468,45 +420,58 @@ pub async fn op_ws_next_event( let resource = state .borrow_mut() .resource_table - .get::(rid)?; + .get::(rid)?; - let cancel = RcRef::map(&resource, |r| &r.cancel); - let val = resource.next_message(cancel).await?; - let res = match val { - Some(Ok(Message::Text(text))) => { - (MessageKind::Text as u16, StringOrBuffer::String(text)) + let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await; + let val = match ws.read_frame().await { + Ok(val) => val, + Err(err) => { + // No message was received, socket closed while we waited. + // Try close the stream, ignoring any errors, and report closed status to JavaScript. + if resource.closed.get() { + let _ = state.borrow_mut().resource_table.close(rid); + return Ok(( + MessageKind::Closed as u16, + StringOrBuffer::Buffer(vec![].into()), + )); + } + + return Ok(( + MessageKind::Error as u16, + StringOrBuffer::String(err.to_string()), + )); } - Some(Ok(Message::Binary(data))) => ( + }; + + let res = match val.opcode { + OpCode::Text => ( + MessageKind::Text as u16, + StringOrBuffer::String(String::from_utf8(val.payload).unwrap()), + ), + OpCode::Binary => ( MessageKind::Binary as u16, - StringOrBuffer::Buffer(data.into()), + StringOrBuffer::Buffer(val.payload.into()), ), - Some(Ok(Message::Close(Some(frame)))) => ( - frame.code.into(), - StringOrBuffer::String(frame.reason.to_string()), - ), - Some(Ok(Message::Close(None))) => { - (1005, StringOrBuffer::String("".to_string())) + OpCode::Close => { + if val.payload.len() < 2 { + return Ok((1005, StringOrBuffer::String("".to_string()))); + } + + let close_code = + CloseCode::from(u16::from_be_bytes([val.payload[0], val.payload[1]])); + let reason = String::from_utf8(val.payload[2..].to_vec()).unwrap(); + (close_code.into(), StringOrBuffer::String(reason)) } - Some(Ok(Message::Ping(_))) => ( + OpCode::Ping => ( MessageKind::Ping as u16, StringOrBuffer::Buffer(vec![].into()), ), - Some(Ok(Message::Pong(_))) => ( + OpCode::Pong => ( MessageKind::Pong as u16, StringOrBuffer::Buffer(vec![].into()), ), - Some(Err(e)) => ( - MessageKind::Error as u16, - StringOrBuffer::String(e.to_string()), - ), - None => { - // No message was received, presumably the socket closed while we waited. - // Try close the stream, ignoring any errors, and report closed status to JavaScript. - let _ = state.borrow_mut().resource_table.close(rid); - ( - MessageKind::Closed as u16, - StringOrBuffer::Buffer(vec![].into()), - ) + OpCode::Continuation => { + return Err(type_error("Unexpected continuation frame")) } }; Ok(res) @@ -523,11 +488,6 @@ deno_core::extension!(deno_websocket, op_ws_next_event, op_ws_send_binary, op_ws_send_text, - server::op_server_ws_send, - server::op_server_ws_close, - server::op_server_ws_next_event, - server::op_server_ws_send_binary, - server::op_server_ws_send_text, ], esm = [ "01_websocket.js", "02_websocketstream.js" ], options = { diff --git a/ext/websocket/server.rs b/ext/websocket/server.rs deleted file mode 100644 index 44bc07e59b..0000000000 --- a/ext/websocket/server.rs +++ /dev/null @@ -1,194 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. - -use crate::MessageKind; -use crate::SendValue; -use crate::Upgraded; -use deno_core::error::type_error; -use deno_core::error::AnyError; -use deno_core::op; -use deno_core::AsyncRefCell; -use deno_core::OpState; -use deno_core::RcRef; -use deno_core::Resource; -use deno_core::ResourceId; -use deno_core::StringOrBuffer; -use deno_core::ZeroCopyBuf; -use std::borrow::Cow; -use std::cell::RefCell; -use std::pin::Pin; -use std::rc::Rc; - -use fastwebsockets::CloseCode; -use fastwebsockets::FragmentCollector; -use fastwebsockets::Frame; -use fastwebsockets::OpCode; -use fastwebsockets::WebSocket; - -pub struct ServerWebSocket { - ws: AsyncRefCell>>>, -} - -impl ServerWebSocket { - #[inline] - pub async fn write_frame( - self: Rc, - frame: Frame, - ) -> Result<(), AnyError> { - // SAFETY: fastwebsockets only needs a mutable reference to the WebSocket - // to populate the write buffer. We encounter an await point when writing - // to the socket after the frame has already been written to the buffer. - let ws = unsafe { &mut *self.ws.as_ptr() }; - ws.write_frame(frame) - .await - .map_err(|err| type_error(err.to_string()))?; - Ok(()) - } -} - -impl Resource for ServerWebSocket { - fn name(&self) -> Cow { - "serverWebSocket".into() - } -} -pub async fn ws_create_server_stream( - state: &Rc>, - transport: Pin>, -) -> Result { - let mut ws = WebSocket::after_handshake(transport); - ws.set_writev(false); - ws.set_auto_close(true); - ws.set_auto_pong(true); - - let ws_resource = ServerWebSocket { - ws: AsyncRefCell::new(FragmentCollector::new(ws)), - }; - - let resource_table = &mut state.borrow_mut().resource_table; - let rid = resource_table.add(ws_resource); - Ok(rid) -} - -#[op] -pub async fn op_server_ws_send_binary( - state: Rc>, - rid: ResourceId, - data: ZeroCopyBuf, -) -> Result<(), AnyError> { - let resource = state - .borrow_mut() - .resource_table - .get::(rid)?; - resource - .write_frame(Frame::new(true, OpCode::Binary, None, data.to_vec())) - .await -} - -#[op] -pub async fn op_server_ws_send_text( - state: Rc>, - rid: ResourceId, - data: String, -) -> Result<(), AnyError> { - let resource = state - .borrow_mut() - .resource_table - .get::(rid)?; - resource - .write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes())) - .await -} - -#[op] -pub async fn op_server_ws_send( - state: Rc>, - rid: ResourceId, - value: SendValue, -) -> Result<(), AnyError> { - let msg = match value { - SendValue::Text(text) => { - Frame::new(true, OpCode::Text, None, text.into_bytes()) - } - SendValue::Binary(buf) => { - Frame::new(true, OpCode::Binary, None, buf.to_vec()) - } - SendValue::Pong => Frame::new(true, OpCode::Pong, None, vec![]), - SendValue::Ping => Frame::new(true, OpCode::Ping, None, vec![]), - }; - - let resource = state - .borrow_mut() - .resource_table - .get::(rid)?; - resource.write_frame(msg).await -} - -#[op(deferred)] -pub async fn op_server_ws_close( - state: Rc>, - rid: ResourceId, - code: Option, - reason: Option, -) -> Result<(), AnyError> { - let resource = state - .borrow_mut() - .resource_table - .get::(rid)?; - let frame = reason - .map(|reason| Frame::close(code.unwrap_or(1005), reason.as_bytes())) - .unwrap_or_else(|| Frame::close_raw(vec![])); - resource.write_frame(frame).await -} - -#[op(deferred)] -pub async fn op_server_ws_next_event( - state: Rc>, - rid: ResourceId, -) -> Result<(u16, StringOrBuffer), AnyError> { - let resource = state - .borrow_mut() - .resource_table - .get::(rid)?; - let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await; - let val = match ws.read_frame().await { - Ok(val) => val, - Err(err) => { - return Ok(( - MessageKind::Error as u16, - StringOrBuffer::String(err.to_string()), - )) - } - }; - - let res = match val.opcode { - OpCode::Text => ( - MessageKind::Text as u16, - StringOrBuffer::String(String::from_utf8(val.payload).unwrap()), - ), - OpCode::Binary => ( - MessageKind::Binary as u16, - StringOrBuffer::Buffer(val.payload.into()), - ), - OpCode::Close => { - if val.payload.len() < 2 { - return Ok((1005, StringOrBuffer::String("".to_string()))); - } - - let close_code = - CloseCode::from(u16::from_be_bytes([val.payload[0], val.payload[1]])); - let reason = String::from_utf8(val.payload[2..].to_vec()).unwrap(); - (close_code.into(), StringOrBuffer::String(reason)) - } - OpCode::Ping => ( - MessageKind::Ping as u16, - StringOrBuffer::Buffer(vec![].into()), - ), - OpCode::Pong => ( - MessageKind::Pong as u16, - StringOrBuffer::Buffer(vec![].into()), - ), - OpCode::Continuation => { - return Err(type_error("Unexpected continuation frame")) - } - }; - Ok(res) -}