mirror of
https://github.com/denoland/deno.git
synced 2025-03-03 17:34:47 -05:00
refactor(ext/websocket): use fastwebsockets client (#18725)
This commit is contained in:
parent
f520284081
commit
1976504c63
7 changed files with 183 additions and 451 deletions
9
Cargo.lock
generated
9
Cargo.lock
generated
|
@ -1819,11 +1819,16 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "fastwebsockets"
|
name = "fastwebsockets"
|
||||||
version = "0.1.3"
|
version = "0.2.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "d57e99c3fa6d0e1c6aeb84f4c904b26425128215fd318a251d8e785e373d43b6"
|
checksum = "99a248d92ac4e9048a30d147d7897eaaadd0a5230f11982ab7d6935d7d268902"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"base64 0.21.0",
|
||||||
"cc",
|
"cc",
|
||||||
|
"hyper",
|
||||||
|
"pin-project",
|
||||||
|
"rand",
|
||||||
|
"sha1",
|
||||||
"simdutf8",
|
"simdutf8",
|
||||||
"tokio",
|
"tokio",
|
||||||
"utf-8",
|
"utf-8",
|
||||||
|
|
|
@ -1129,41 +1129,6 @@ async fn op_http_upgrade_early(
|
||||||
Ok(rid)
|
Ok(rid)
|
||||||
}
|
}
|
||||||
|
|
||||||
struct UpgradedStream(hyper::upgrade::Upgraded);
|
|
||||||
impl tokio::io::AsyncRead for UpgradedStream {
|
|
||||||
fn poll_read(
|
|
||||||
self: Pin<&mut Self>,
|
|
||||||
cx: &mut Context,
|
|
||||||
buf: &mut tokio::io::ReadBuf,
|
|
||||||
) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
|
|
||||||
Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl tokio::io::AsyncWrite for UpgradedStream {
|
|
||||||
fn poll_write(
|
|
||||||
self: Pin<&mut Self>,
|
|
||||||
cx: &mut Context,
|
|
||||||
buf: &[u8],
|
|
||||||
) -> std::task::Poll<Result<usize, std::io::Error>> {
|
|
||||||
Pin::new(&mut self.get_mut().0).poll_write(cx, buf)
|
|
||||||
}
|
|
||||||
fn poll_flush(
|
|
||||||
self: Pin<&mut Self>,
|
|
||||||
cx: &mut Context,
|
|
||||||
) -> std::task::Poll<Result<(), std::io::Error>> {
|
|
||||||
Pin::new(&mut self.get_mut().0).poll_flush(cx)
|
|
||||||
}
|
|
||||||
fn poll_shutdown(
|
|
||||||
self: Pin<&mut Self>,
|
|
||||||
cx: &mut Context,
|
|
||||||
) -> std::task::Poll<Result<(), std::io::Error>> {
|
|
||||||
Pin::new(&mut self.get_mut().0).poll_shutdown(cx)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl deno_websocket::Upgraded for UpgradedStream {}
|
|
||||||
|
|
||||||
#[op]
|
#[op]
|
||||||
async fn op_http_upgrade_websocket(
|
async fn op_http_upgrade_websocket(
|
||||||
state: Rc<RefCell<OpState>>,
|
state: Rc<RefCell<OpState>>,
|
||||||
|
@ -1183,9 +1148,7 @@ async fn op_http_upgrade_websocket(
|
||||||
};
|
};
|
||||||
|
|
||||||
let transport = hyper::upgrade::on(request).await?;
|
let transport = hyper::upgrade::on(request).await?;
|
||||||
let ws_rid =
|
let ws_rid = ws_create_server_stream(&state, transport).await?;
|
||||||
ws_create_server_stream(&state, Box::pin(UpgradedStream(transport)))
|
|
||||||
.await?;
|
|
||||||
Ok(ws_rid)
|
Ok(ws_rid)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -317,9 +317,7 @@ class WebSocket extends EventTarget {
|
||||||
this[_bufferedAmount] += byteLength;
|
this[_bufferedAmount] += byteLength;
|
||||||
PromisePrototypeThen(
|
PromisePrototypeThen(
|
||||||
core.opAsync2(
|
core.opAsync2(
|
||||||
this[_role] === SERVER
|
"op_ws_send_binary",
|
||||||
? "op_server_ws_send_binary"
|
|
||||||
: "op_ws_send_binary",
|
|
||||||
this[_rid],
|
this[_rid],
|
||||||
view,
|
view,
|
||||||
),
|
),
|
||||||
|
@ -357,7 +355,7 @@ class WebSocket extends EventTarget {
|
||||||
this[_bufferedAmount] += TypedArrayPrototypeGetByteLength(d);
|
this[_bufferedAmount] += TypedArrayPrototypeGetByteLength(d);
|
||||||
PromisePrototypeThen(
|
PromisePrototypeThen(
|
||||||
core.opAsync2(
|
core.opAsync2(
|
||||||
this[_role] === SERVER ? "op_server_ws_send_text" : "op_ws_send_text",
|
"op_ws_send_text",
|
||||||
this[_rid],
|
this[_rid],
|
||||||
string,
|
string,
|
||||||
),
|
),
|
||||||
|
@ -416,7 +414,7 @@ class WebSocket extends EventTarget {
|
||||||
|
|
||||||
PromisePrototypeCatch(
|
PromisePrototypeCatch(
|
||||||
core.opAsync(
|
core.opAsync(
|
||||||
this[_role] === SERVER ? "op_server_ws_close" : "op_ws_close",
|
"op_ws_close",
|
||||||
this[_rid],
|
this[_rid],
|
||||||
code,
|
code,
|
||||||
reason,
|
reason,
|
||||||
|
@ -441,7 +439,7 @@ class WebSocket extends EventTarget {
|
||||||
async [_eventLoop]() {
|
async [_eventLoop]() {
|
||||||
while (this[_readyState] !== CLOSED) {
|
while (this[_readyState] !== CLOSED) {
|
||||||
const { 0: kind, 1: value } = await core.opAsync2(
|
const { 0: kind, 1: value } = await core.opAsync2(
|
||||||
this[_role] === SERVER ? "op_server_ws_next_event" : "op_ws_next_event",
|
"op_ws_next_event",
|
||||||
this[_rid],
|
this[_rid],
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -508,7 +506,7 @@ class WebSocket extends EventTarget {
|
||||||
if (prevState === OPEN) {
|
if (prevState === OPEN) {
|
||||||
try {
|
try {
|
||||||
await core.opAsync(
|
await core.opAsync(
|
||||||
this[_role] === SERVER ? "op_server_ws_close" : "op_ws_close",
|
"op_ws_close",
|
||||||
this[_rid],
|
this[_rid],
|
||||||
code,
|
code,
|
||||||
value,
|
value,
|
||||||
|
@ -537,7 +535,7 @@ class WebSocket extends EventTarget {
|
||||||
this[_idleTimeoutTimeout] = setTimeout(async () => {
|
this[_idleTimeoutTimeout] = setTimeout(async () => {
|
||||||
if (this[_readyState] === OPEN) {
|
if (this[_readyState] === OPEN) {
|
||||||
await core.opAsync(
|
await core.opAsync(
|
||||||
this[_role] === SERVER ? "op_server_ws_send" : "op_ws_send",
|
"op_ws_send",
|
||||||
this[_rid],
|
this[_rid],
|
||||||
{
|
{
|
||||||
kind: "ping",
|
kind: "ping",
|
||||||
|
@ -548,7 +546,7 @@ class WebSocket extends EventTarget {
|
||||||
this[_readyState] = CLOSING;
|
this[_readyState] = CLOSING;
|
||||||
const reason = "No response from ping frame.";
|
const reason = "No response from ping frame.";
|
||||||
await core.opAsync(
|
await core.opAsync(
|
||||||
this[_role] === SERVER ? "op_server_ws_close" : "op_ws_close",
|
"op_ws_close",
|
||||||
this[_rid],
|
this[_rid],
|
||||||
1001,
|
1001,
|
||||||
reason,
|
reason,
|
||||||
|
|
|
@ -176,7 +176,7 @@ class WebSocketStream {
|
||||||
create.rid,
|
create.rid,
|
||||||
);
|
);
|
||||||
|
|
||||||
if (kind > 6) {
|
if (kind > 5) {
|
||||||
/* close */
|
/* close */
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,7 @@ path = "lib.rs"
|
||||||
[dependencies]
|
[dependencies]
|
||||||
deno_core.workspace = true
|
deno_core.workspace = true
|
||||||
deno_tls.workspace = true
|
deno_tls.workspace = true
|
||||||
fastwebsockets = "0.1.3"
|
fastwebsockets = { version = "0.2.1", features = ["upgrade"] }
|
||||||
http.workspace = true
|
http.workspace = true
|
||||||
hyper.workspace = true
|
hyper.workspace = true
|
||||||
serde.workspace = true
|
serde.workspace = true
|
||||||
|
|
|
@ -3,10 +3,6 @@
|
||||||
use deno_core::error::invalid_hostname;
|
use deno_core::error::invalid_hostname;
|
||||||
use deno_core::error::type_error;
|
use deno_core::error::type_error;
|
||||||
use deno_core::error::AnyError;
|
use deno_core::error::AnyError;
|
||||||
use deno_core::futures::stream::SplitSink;
|
|
||||||
use deno_core::futures::stream::SplitStream;
|
|
||||||
use deno_core::futures::SinkExt;
|
|
||||||
use deno_core::futures::StreamExt;
|
|
||||||
use deno_core::op;
|
use deno_core::op;
|
||||||
use deno_core::StringOrBuffer;
|
use deno_core::StringOrBuffer;
|
||||||
|
|
||||||
|
@ -21,42 +17,41 @@ use deno_core::Resource;
|
||||||
use deno_core::ResourceId;
|
use deno_core::ResourceId;
|
||||||
use deno_core::ZeroCopyBuf;
|
use deno_core::ZeroCopyBuf;
|
||||||
use deno_tls::create_client_config;
|
use deno_tls::create_client_config;
|
||||||
|
use http::header::CONNECTION;
|
||||||
|
use http::header::UPGRADE;
|
||||||
use http::HeaderName;
|
use http::HeaderName;
|
||||||
use http::HeaderValue;
|
use http::HeaderValue;
|
||||||
use http::Method;
|
use http::Method;
|
||||||
use http::Request;
|
use http::Request;
|
||||||
use http::Uri;
|
use http::Uri;
|
||||||
|
use hyper::upgrade::Upgraded;
|
||||||
|
use hyper::Body;
|
||||||
|
use hyper::Response;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use std::borrow::Cow;
|
use std::borrow::Cow;
|
||||||
|
use std::cell::Cell;
|
||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::pin::Pin;
|
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::io::AsyncRead;
|
|
||||||
use tokio::io::AsyncWrite;
|
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
use tokio_rustls::rustls::RootCertStore;
|
use tokio_rustls::rustls::RootCertStore;
|
||||||
use tokio_rustls::rustls::ServerName;
|
use tokio_rustls::rustls::ServerName;
|
||||||
use tokio_rustls::TlsConnector;
|
use tokio_rustls::TlsConnector;
|
||||||
use tokio_tungstenite::client_async_with_config;
|
|
||||||
use tokio_tungstenite::tungstenite::handshake::client::Response;
|
|
||||||
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
|
|
||||||
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
|
|
||||||
use tokio_tungstenite::tungstenite::protocol::Message;
|
|
||||||
use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
|
|
||||||
use tokio_tungstenite::MaybeTlsStream;
|
use tokio_tungstenite::MaybeTlsStream;
|
||||||
use tokio_tungstenite::WebSocketStream;
|
|
||||||
|
use fastwebsockets::CloseCode;
|
||||||
|
use fastwebsockets::FragmentCollector;
|
||||||
|
use fastwebsockets::Frame;
|
||||||
|
use fastwebsockets::OpCode;
|
||||||
|
use fastwebsockets::Role;
|
||||||
|
use fastwebsockets::WebSocket;
|
||||||
|
|
||||||
pub use tokio_tungstenite; // Re-export tokio_tungstenite
|
pub use tokio_tungstenite; // Re-export tokio_tungstenite
|
||||||
|
|
||||||
mod server;
|
|
||||||
|
|
||||||
pub use server::ws_create_server_stream;
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct WsRootStore(pub Option<RootCertStore>);
|
pub struct WsRootStore(pub Option<RootCertStore>);
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
@ -76,100 +71,6 @@ pub trait WebSocketPermissions {
|
||||||
/// would override previously used alias.
|
/// would override previously used alias.
|
||||||
pub struct UnsafelyIgnoreCertificateErrors(Option<Vec<String>>);
|
pub struct UnsafelyIgnoreCertificateErrors(Option<Vec<String>>);
|
||||||
|
|
||||||
type ClientWsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
|
|
||||||
type ServerWsStream = WebSocketStream<Pin<Box<dyn Upgraded>>>;
|
|
||||||
|
|
||||||
pub enum WebSocketStreamType {
|
|
||||||
Client {
|
|
||||||
tx: AsyncRefCell<SplitSink<ClientWsStream, Message>>,
|
|
||||||
rx: AsyncRefCell<SplitStream<ClientWsStream>>,
|
|
||||||
},
|
|
||||||
Server {
|
|
||||||
tx: AsyncRefCell<SplitSink<ServerWsStream, Message>>,
|
|
||||||
rx: AsyncRefCell<SplitStream<ServerWsStream>>,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
pub trait Upgraded: AsyncRead + AsyncWrite + Unpin {}
|
|
||||||
|
|
||||||
pub struct WsStreamResource {
|
|
||||||
pub stream: WebSocketStreamType,
|
|
||||||
// When a `WsStreamResource` resource is closed, all pending 'read' ops are
|
|
||||||
// canceled, while 'write' ops are allowed to complete. Therefore only
|
|
||||||
// 'read' futures are attached to this cancel handle.
|
|
||||||
pub cancel: CancelHandle,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl WsStreamResource {
|
|
||||||
async fn send(self: &Rc<Self>, message: Message) -> Result<(), AnyError> {
|
|
||||||
use tokio_tungstenite::tungstenite::Error;
|
|
||||||
let res = match self.stream {
|
|
||||||
WebSocketStreamType::Client { .. } => {
|
|
||||||
let mut tx = RcRef::map(self, |r| match &r.stream {
|
|
||||||
WebSocketStreamType::Client { tx, .. } => tx,
|
|
||||||
WebSocketStreamType::Server { .. } => unreachable!(),
|
|
||||||
})
|
|
||||||
.borrow_mut()
|
|
||||||
.await;
|
|
||||||
tx.send(message).await
|
|
||||||
}
|
|
||||||
WebSocketStreamType::Server { .. } => {
|
|
||||||
let mut tx = RcRef::map(self, |r| match &r.stream {
|
|
||||||
WebSocketStreamType::Client { .. } => unreachable!(),
|
|
||||||
WebSocketStreamType::Server { tx, .. } => tx,
|
|
||||||
})
|
|
||||||
.borrow_mut()
|
|
||||||
.await;
|
|
||||||
tx.send(message).await
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
match res {
|
|
||||||
Ok(()) => Ok(()),
|
|
||||||
Err(Error::ConnectionClosed) => Ok(()),
|
|
||||||
Err(tokio_tungstenite::tungstenite::Error::Protocol(
|
|
||||||
tokio_tungstenite::tungstenite::error::ProtocolError::SendAfterClosing,
|
|
||||||
)) => Ok(()),
|
|
||||||
Err(err) => Err(err.into()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn next_message(
|
|
||||||
self: &Rc<Self>,
|
|
||||||
cancel: RcRef<CancelHandle>,
|
|
||||||
) -> Result<
|
|
||||||
Option<Result<Message, tokio_tungstenite::tungstenite::Error>>,
|
|
||||||
AnyError,
|
|
||||||
> {
|
|
||||||
match &self.stream {
|
|
||||||
WebSocketStreamType::Client { .. } => {
|
|
||||||
let mut rx = RcRef::map(self, |r| match &r.stream {
|
|
||||||
WebSocketStreamType::Client { rx, .. } => rx,
|
|
||||||
WebSocketStreamType::Server { .. } => unreachable!(),
|
|
||||||
})
|
|
||||||
.borrow_mut()
|
|
||||||
.await;
|
|
||||||
rx.next().or_cancel(cancel).await.map_err(AnyError::from)
|
|
||||||
}
|
|
||||||
WebSocketStreamType::Server { .. } => {
|
|
||||||
let mut rx = RcRef::map(self, |r| match &r.stream {
|
|
||||||
WebSocketStreamType::Client { .. } => unreachable!(),
|
|
||||||
WebSocketStreamType::Server { rx, .. } => rx,
|
|
||||||
})
|
|
||||||
.borrow_mut()
|
|
||||||
.await;
|
|
||||||
rx.next().or_cancel(cancel).await.map_err(AnyError::from)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Resource for WsStreamResource {
|
|
||||||
fn name(&self) -> Cow<str> {
|
|
||||||
"webSocketStream".into()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct WsCancelResource(Rc<CancelHandle>);
|
pub struct WsCancelResource(Rc<CancelHandle>);
|
||||||
|
|
||||||
impl Resource for WsCancelResource {
|
impl Resource for WsCancelResource {
|
||||||
|
@ -182,6 +83,15 @@ impl Resource for WsCancelResource {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
#[serde(tag = "kind", content = "value", rename_all = "camelCase")]
|
||||||
|
pub enum SendValue {
|
||||||
|
Text(String),
|
||||||
|
Binary(ZeroCopyBuf),
|
||||||
|
Pong,
|
||||||
|
Ping,
|
||||||
|
}
|
||||||
|
|
||||||
// This op is needed because creating a WS instance in JavaScript is a sync
|
// This op is needed because creating a WS instance in JavaScript is a sync
|
||||||
// operation and should throw error when permissions are not fulfilled,
|
// operation and should throw error when permissions are not fulfilled,
|
||||||
// but actual op that connects WS is async.
|
// but actual op that connects WS is async.
|
||||||
|
@ -257,7 +167,21 @@ where
|
||||||
let uri: Uri = url.parse()?;
|
let uri: Uri = url.parse()?;
|
||||||
let mut request = Request::builder().method(Method::GET).uri(&uri);
|
let mut request = Request::builder().method(Method::GET).uri(&uri);
|
||||||
|
|
||||||
request = request.header("User-Agent", user_agent);
|
let authority = uri.authority().unwrap().as_str();
|
||||||
|
let host = authority
|
||||||
|
.find('@')
|
||||||
|
.map(|idx| authority.split_at(idx + 1).1)
|
||||||
|
.unwrap_or_else(|| authority);
|
||||||
|
request = request
|
||||||
|
.header("User-Agent", user_agent)
|
||||||
|
.header("Host", host)
|
||||||
|
.header(UPGRADE, "websocket")
|
||||||
|
.header(CONNECTION, "upgrade")
|
||||||
|
.header(
|
||||||
|
"Sec-WebSocket-Key",
|
||||||
|
fastwebsockets::handshake::generate_key(),
|
||||||
|
)
|
||||||
|
.header("Sec-WebSocket-Version", "13");
|
||||||
|
|
||||||
if !protocols.is_empty() {
|
if !protocols.is_empty() {
|
||||||
request = request.header("Sec-WebSocket-Protocol", protocols);
|
request = request.header("Sec-WebSocket-Protocol", protocols);
|
||||||
|
@ -287,7 +211,7 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let request = request.body(())?;
|
let request = request.body(Body::empty())?;
|
||||||
let domain = &uri.host().unwrap().to_string();
|
let domain = &uri.host().unwrap().to_string();
|
||||||
let port = &uri.port_u16().unwrap_or(match uri.scheme_str() {
|
let port = &uri.port_u16().unwrap_or(match uri.scheme_str() {
|
||||||
Some("wss") => 443,
|
Some("wss") => 443,
|
||||||
|
@ -315,16 +239,9 @@ where
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let client = client_async_with_config(
|
let client = fastwebsockets::handshake::client(request, socket);
|
||||||
request,
|
|
||||||
socket,
|
let (stream, response): (WebSocket<Upgraded>, Response<Body>) =
|
||||||
Some(WebSocketConfig {
|
|
||||||
max_message_size: Some(128 << 20),
|
|
||||||
max_frame_size: Some(32 << 20),
|
|
||||||
..Default::default()
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
let (stream, response): (ClientWsStream, Response) =
|
|
||||||
if let Some(cancel_resource) = cancel_resource {
|
if let Some(cancel_resource) = cancel_resource {
|
||||||
client.or_cancel(cancel_resource.0.to_owned()).await?
|
client.or_cancel(cancel_resource.0.to_owned()).await?
|
||||||
} else {
|
} else {
|
||||||
|
@ -340,13 +257,9 @@ where
|
||||||
state.borrow_mut().resource_table.close(cancel_rid).ok();
|
state.borrow_mut().resource_table.close(cancel_rid).ok();
|
||||||
}
|
}
|
||||||
|
|
||||||
let (ws_tx, ws_rx) = stream.split();
|
let resource = ServerWebSocket {
|
||||||
let resource = WsStreamResource {
|
ws: AsyncRefCell::new(FragmentCollector::new(stream)),
|
||||||
stream: WebSocketStreamType::Client {
|
closed: Rc::new(Cell::new(false)),
|
||||||
rx: AsyncRefCell::new(ws_rx),
|
|
||||||
tx: AsyncRefCell::new(ws_tx),
|
|
||||||
},
|
|
||||||
cancel: Default::default(),
|
|
||||||
};
|
};
|
||||||
let mut state = state.borrow_mut();
|
let mut state = state.borrow_mut();
|
||||||
let rid = state.resource_table.add(resource);
|
let rid = state.resource_table.add(resource);
|
||||||
|
@ -368,13 +281,60 @@ where
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[repr(u16)]
|
||||||
#[serde(tag = "kind", content = "value", rename_all = "camelCase")]
|
pub enum MessageKind {
|
||||||
pub enum SendValue {
|
Text = 0,
|
||||||
Text(String),
|
Binary = 1,
|
||||||
Binary(ZeroCopyBuf),
|
Pong = 2,
|
||||||
Pong,
|
Ping = 3,
|
||||||
Ping,
|
Error = 5,
|
||||||
|
Closed = 6,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ServerWebSocket {
|
||||||
|
ws: AsyncRefCell<FragmentCollector<Upgraded>>,
|
||||||
|
closed: Rc<Cell<bool>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ServerWebSocket {
|
||||||
|
#[inline]
|
||||||
|
pub async fn write_frame(
|
||||||
|
self: Rc<Self>,
|
||||||
|
frame: Frame,
|
||||||
|
) -> Result<(), AnyError> {
|
||||||
|
// SAFETY: fastwebsockets only needs a mutable reference to the WebSocket
|
||||||
|
// to populate the write buffer. We encounter an await point when writing
|
||||||
|
// to the socket after the frame has already been written to the buffer.
|
||||||
|
let ws = unsafe { &mut *self.ws.as_ptr() };
|
||||||
|
ws.write_frame(frame)
|
||||||
|
.await
|
||||||
|
.map_err(|err| type_error(err.to_string()))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Resource for ServerWebSocket {
|
||||||
|
fn name(&self) -> Cow<str> {
|
||||||
|
"serverWebSocket".into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub async fn ws_create_server_stream(
|
||||||
|
state: &Rc<RefCell<OpState>>,
|
||||||
|
transport: Upgraded,
|
||||||
|
) -> Result<ResourceId, AnyError> {
|
||||||
|
let mut ws = WebSocket::after_handshake(transport, Role::Server);
|
||||||
|
ws.set_writev(true);
|
||||||
|
ws.set_auto_close(true);
|
||||||
|
ws.set_auto_pong(true);
|
||||||
|
|
||||||
|
let ws_resource = ServerWebSocket {
|
||||||
|
ws: AsyncRefCell::new(FragmentCollector::new(ws)),
|
||||||
|
closed: Rc::new(Cell::new(false)),
|
||||||
|
};
|
||||||
|
|
||||||
|
let resource_table = &mut state.borrow_mut().resource_table;
|
||||||
|
let rid = resource_table.add(ws_resource);
|
||||||
|
Ok(rid)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op]
|
#[op]
|
||||||
|
@ -386,9 +346,10 @@ pub async fn op_ws_send_binary(
|
||||||
let resource = state
|
let resource = state
|
||||||
.borrow_mut()
|
.borrow_mut()
|
||||||
.resource_table
|
.resource_table
|
||||||
.get::<WsStreamResource>(rid)?;
|
.get::<ServerWebSocket>(rid)?;
|
||||||
resource.send(Message::Binary(data.to_vec())).await?;
|
resource
|
||||||
Ok(())
|
.write_frame(Frame::new(true, OpCode::Binary, None, data.to_vec()))
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op]
|
#[op]
|
||||||
|
@ -400,9 +361,10 @@ pub async fn op_ws_send_text(
|
||||||
let resource = state
|
let resource = state
|
||||||
.borrow_mut()
|
.borrow_mut()
|
||||||
.resource_table
|
.resource_table
|
||||||
.get::<WsStreamResource>(rid)?;
|
.get::<ServerWebSocket>(rid)?;
|
||||||
resource.send(Message::Text(data)).await?;
|
resource
|
||||||
Ok(())
|
.write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes()))
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op]
|
#[op]
|
||||||
|
@ -412,18 +374,21 @@ pub async fn op_ws_send(
|
||||||
value: SendValue,
|
value: SendValue,
|
||||||
) -> Result<(), AnyError> {
|
) -> Result<(), AnyError> {
|
||||||
let msg = match value {
|
let msg = match value {
|
||||||
SendValue::Text(text) => Message::Text(text),
|
SendValue::Text(text) => {
|
||||||
SendValue::Binary(buf) => Message::Binary(buf.to_vec()),
|
Frame::new(true, OpCode::Text, None, text.into_bytes())
|
||||||
SendValue::Pong => Message::Pong(vec![]),
|
}
|
||||||
SendValue::Ping => Message::Ping(vec![]),
|
SendValue::Binary(buf) => {
|
||||||
|
Frame::new(true, OpCode::Binary, None, buf.to_vec())
|
||||||
|
}
|
||||||
|
SendValue::Pong => Frame::new(true, OpCode::Pong, None, vec![]),
|
||||||
|
SendValue::Ping => Frame::new(true, OpCode::Ping, None, vec![]),
|
||||||
};
|
};
|
||||||
|
|
||||||
let resource = state
|
let resource = state
|
||||||
.borrow_mut()
|
.borrow_mut()
|
||||||
.resource_table
|
.resource_table
|
||||||
.get::<WsStreamResource>(rid)?;
|
.get::<ServerWebSocket>(rid)?;
|
||||||
resource.send(msg).await?;
|
resource.write_frame(msg).await
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op(deferred)]
|
#[op(deferred)]
|
||||||
|
@ -433,34 +398,21 @@ pub async fn op_ws_close(
|
||||||
code: Option<u16>,
|
code: Option<u16>,
|
||||||
reason: Option<String>,
|
reason: Option<String>,
|
||||||
) -> Result<(), AnyError> {
|
) -> Result<(), AnyError> {
|
||||||
let rid = rid;
|
|
||||||
let msg = Message::Close(code.map(|c| CloseFrame {
|
|
||||||
code: CloseCode::from(c),
|
|
||||||
reason: match reason {
|
|
||||||
Some(reason) => Cow::from(reason),
|
|
||||||
None => Default::default(),
|
|
||||||
},
|
|
||||||
}));
|
|
||||||
|
|
||||||
let resource = state
|
let resource = state
|
||||||
.borrow_mut()
|
.borrow_mut()
|
||||||
.resource_table
|
.resource_table
|
||||||
.get::<WsStreamResource>(rid)?;
|
.get::<ServerWebSocket>(rid)?;
|
||||||
resource.send(msg).await?;
|
let frame = reason
|
||||||
|
.map(|reason| Frame::close(code.unwrap_or(1005), reason.as_bytes()))
|
||||||
|
.unwrap_or_else(|| Frame::close_raw(vec![]));
|
||||||
|
|
||||||
|
let cell = Rc::clone(&resource.closed);
|
||||||
|
cell.set(true);
|
||||||
|
resource.write_frame(frame).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[repr(u16)]
|
#[op(deferred)]
|
||||||
pub enum MessageKind {
|
|
||||||
Text = 0,
|
|
||||||
Binary = 1,
|
|
||||||
Pong = 2,
|
|
||||||
Ping = 3,
|
|
||||||
Error = 5,
|
|
||||||
Closed = 6,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[op]
|
|
||||||
pub async fn op_ws_next_event(
|
pub async fn op_ws_next_event(
|
||||||
state: Rc<RefCell<OpState>>,
|
state: Rc<RefCell<OpState>>,
|
||||||
rid: ResourceId,
|
rid: ResourceId,
|
||||||
|
@ -468,45 +420,58 @@ pub async fn op_ws_next_event(
|
||||||
let resource = state
|
let resource = state
|
||||||
.borrow_mut()
|
.borrow_mut()
|
||||||
.resource_table
|
.resource_table
|
||||||
.get::<WsStreamResource>(rid)?;
|
.get::<ServerWebSocket>(rid)?;
|
||||||
|
|
||||||
let cancel = RcRef::map(&resource, |r| &r.cancel);
|
let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
|
||||||
let val = resource.next_message(cancel).await?;
|
let val = match ws.read_frame().await {
|
||||||
let res = match val {
|
Ok(val) => val,
|
||||||
Some(Ok(Message::Text(text))) => {
|
Err(err) => {
|
||||||
(MessageKind::Text as u16, StringOrBuffer::String(text))
|
// No message was received, socket closed while we waited.
|
||||||
|
// Try close the stream, ignoring any errors, and report closed status to JavaScript.
|
||||||
|
if resource.closed.get() {
|
||||||
|
let _ = state.borrow_mut().resource_table.close(rid);
|
||||||
|
return Ok((
|
||||||
|
MessageKind::Closed as u16,
|
||||||
|
StringOrBuffer::Buffer(vec![].into()),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
return Ok((
|
||||||
|
MessageKind::Error as u16,
|
||||||
|
StringOrBuffer::String(err.to_string()),
|
||||||
|
));
|
||||||
}
|
}
|
||||||
Some(Ok(Message::Binary(data))) => (
|
};
|
||||||
|
|
||||||
|
let res = match val.opcode {
|
||||||
|
OpCode::Text => (
|
||||||
|
MessageKind::Text as u16,
|
||||||
|
StringOrBuffer::String(String::from_utf8(val.payload).unwrap()),
|
||||||
|
),
|
||||||
|
OpCode::Binary => (
|
||||||
MessageKind::Binary as u16,
|
MessageKind::Binary as u16,
|
||||||
StringOrBuffer::Buffer(data.into()),
|
StringOrBuffer::Buffer(val.payload.into()),
|
||||||
),
|
),
|
||||||
Some(Ok(Message::Close(Some(frame)))) => (
|
OpCode::Close => {
|
||||||
frame.code.into(),
|
if val.payload.len() < 2 {
|
||||||
StringOrBuffer::String(frame.reason.to_string()),
|
return Ok((1005, StringOrBuffer::String("".to_string())));
|
||||||
),
|
}
|
||||||
Some(Ok(Message::Close(None))) => {
|
|
||||||
(1005, StringOrBuffer::String("".to_string()))
|
let close_code =
|
||||||
|
CloseCode::from(u16::from_be_bytes([val.payload[0], val.payload[1]]));
|
||||||
|
let reason = String::from_utf8(val.payload[2..].to_vec()).unwrap();
|
||||||
|
(close_code.into(), StringOrBuffer::String(reason))
|
||||||
}
|
}
|
||||||
Some(Ok(Message::Ping(_))) => (
|
OpCode::Ping => (
|
||||||
MessageKind::Ping as u16,
|
MessageKind::Ping as u16,
|
||||||
StringOrBuffer::Buffer(vec![].into()),
|
StringOrBuffer::Buffer(vec![].into()),
|
||||||
),
|
),
|
||||||
Some(Ok(Message::Pong(_))) => (
|
OpCode::Pong => (
|
||||||
MessageKind::Pong as u16,
|
MessageKind::Pong as u16,
|
||||||
StringOrBuffer::Buffer(vec![].into()),
|
StringOrBuffer::Buffer(vec![].into()),
|
||||||
),
|
),
|
||||||
Some(Err(e)) => (
|
OpCode::Continuation => {
|
||||||
MessageKind::Error as u16,
|
return Err(type_error("Unexpected continuation frame"))
|
||||||
StringOrBuffer::String(e.to_string()),
|
|
||||||
),
|
|
||||||
None => {
|
|
||||||
// No message was received, presumably the socket closed while we waited.
|
|
||||||
// Try close the stream, ignoring any errors, and report closed status to JavaScript.
|
|
||||||
let _ = state.borrow_mut().resource_table.close(rid);
|
|
||||||
(
|
|
||||||
MessageKind::Closed as u16,
|
|
||||||
StringOrBuffer::Buffer(vec![].into()),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
Ok(res)
|
Ok(res)
|
||||||
|
@ -523,11 +488,6 @@ deno_core::extension!(deno_websocket,
|
||||||
op_ws_next_event,
|
op_ws_next_event,
|
||||||
op_ws_send_binary,
|
op_ws_send_binary,
|
||||||
op_ws_send_text,
|
op_ws_send_text,
|
||||||
server::op_server_ws_send,
|
|
||||||
server::op_server_ws_close,
|
|
||||||
server::op_server_ws_next_event,
|
|
||||||
server::op_server_ws_send_binary,
|
|
||||||
server::op_server_ws_send_text,
|
|
||||||
],
|
],
|
||||||
esm = [ "01_websocket.js", "02_websocketstream.js" ],
|
esm = [ "01_websocket.js", "02_websocketstream.js" ],
|
||||||
options = {
|
options = {
|
||||||
|
|
|
@ -1,194 +0,0 @@
|
||||||
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
|
|
||||||
|
|
||||||
use crate::MessageKind;
|
|
||||||
use crate::SendValue;
|
|
||||||
use crate::Upgraded;
|
|
||||||
use deno_core::error::type_error;
|
|
||||||
use deno_core::error::AnyError;
|
|
||||||
use deno_core::op;
|
|
||||||
use deno_core::AsyncRefCell;
|
|
||||||
use deno_core::OpState;
|
|
||||||
use deno_core::RcRef;
|
|
||||||
use deno_core::Resource;
|
|
||||||
use deno_core::ResourceId;
|
|
||||||
use deno_core::StringOrBuffer;
|
|
||||||
use deno_core::ZeroCopyBuf;
|
|
||||||
use std::borrow::Cow;
|
|
||||||
use std::cell::RefCell;
|
|
||||||
use std::pin::Pin;
|
|
||||||
use std::rc::Rc;
|
|
||||||
|
|
||||||
use fastwebsockets::CloseCode;
|
|
||||||
use fastwebsockets::FragmentCollector;
|
|
||||||
use fastwebsockets::Frame;
|
|
||||||
use fastwebsockets::OpCode;
|
|
||||||
use fastwebsockets::WebSocket;
|
|
||||||
|
|
||||||
pub struct ServerWebSocket {
|
|
||||||
ws: AsyncRefCell<FragmentCollector<Pin<Box<dyn Upgraded>>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ServerWebSocket {
|
|
||||||
#[inline]
|
|
||||||
pub async fn write_frame(
|
|
||||||
self: Rc<Self>,
|
|
||||||
frame: Frame,
|
|
||||||
) -> Result<(), AnyError> {
|
|
||||||
// SAFETY: fastwebsockets only needs a mutable reference to the WebSocket
|
|
||||||
// to populate the write buffer. We encounter an await point when writing
|
|
||||||
// to the socket after the frame has already been written to the buffer.
|
|
||||||
let ws = unsafe { &mut *self.ws.as_ptr() };
|
|
||||||
ws.write_frame(frame)
|
|
||||||
.await
|
|
||||||
.map_err(|err| type_error(err.to_string()))?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Resource for ServerWebSocket {
|
|
||||||
fn name(&self) -> Cow<str> {
|
|
||||||
"serverWebSocket".into()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pub async fn ws_create_server_stream(
|
|
||||||
state: &Rc<RefCell<OpState>>,
|
|
||||||
transport: Pin<Box<dyn Upgraded>>,
|
|
||||||
) -> Result<ResourceId, AnyError> {
|
|
||||||
let mut ws = WebSocket::after_handshake(transport);
|
|
||||||
ws.set_writev(false);
|
|
||||||
ws.set_auto_close(true);
|
|
||||||
ws.set_auto_pong(true);
|
|
||||||
|
|
||||||
let ws_resource = ServerWebSocket {
|
|
||||||
ws: AsyncRefCell::new(FragmentCollector::new(ws)),
|
|
||||||
};
|
|
||||||
|
|
||||||
let resource_table = &mut state.borrow_mut().resource_table;
|
|
||||||
let rid = resource_table.add(ws_resource);
|
|
||||||
Ok(rid)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[op]
|
|
||||||
pub async fn op_server_ws_send_binary(
|
|
||||||
state: Rc<RefCell<OpState>>,
|
|
||||||
rid: ResourceId,
|
|
||||||
data: ZeroCopyBuf,
|
|
||||||
) -> Result<(), AnyError> {
|
|
||||||
let resource = state
|
|
||||||
.borrow_mut()
|
|
||||||
.resource_table
|
|
||||||
.get::<ServerWebSocket>(rid)?;
|
|
||||||
resource
|
|
||||||
.write_frame(Frame::new(true, OpCode::Binary, None, data.to_vec()))
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
|
|
||||||
#[op]
|
|
||||||
pub async fn op_server_ws_send_text(
|
|
||||||
state: Rc<RefCell<OpState>>,
|
|
||||||
rid: ResourceId,
|
|
||||||
data: String,
|
|
||||||
) -> Result<(), AnyError> {
|
|
||||||
let resource = state
|
|
||||||
.borrow_mut()
|
|
||||||
.resource_table
|
|
||||||
.get::<ServerWebSocket>(rid)?;
|
|
||||||
resource
|
|
||||||
.write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes()))
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
|
|
||||||
#[op]
|
|
||||||
pub async fn op_server_ws_send(
|
|
||||||
state: Rc<RefCell<OpState>>,
|
|
||||||
rid: ResourceId,
|
|
||||||
value: SendValue,
|
|
||||||
) -> Result<(), AnyError> {
|
|
||||||
let msg = match value {
|
|
||||||
SendValue::Text(text) => {
|
|
||||||
Frame::new(true, OpCode::Text, None, text.into_bytes())
|
|
||||||
}
|
|
||||||
SendValue::Binary(buf) => {
|
|
||||||
Frame::new(true, OpCode::Binary, None, buf.to_vec())
|
|
||||||
}
|
|
||||||
SendValue::Pong => Frame::new(true, OpCode::Pong, None, vec![]),
|
|
||||||
SendValue::Ping => Frame::new(true, OpCode::Ping, None, vec![]),
|
|
||||||
};
|
|
||||||
|
|
||||||
let resource = state
|
|
||||||
.borrow_mut()
|
|
||||||
.resource_table
|
|
||||||
.get::<ServerWebSocket>(rid)?;
|
|
||||||
resource.write_frame(msg).await
|
|
||||||
}
|
|
||||||
|
|
||||||
#[op(deferred)]
|
|
||||||
pub async fn op_server_ws_close(
|
|
||||||
state: Rc<RefCell<OpState>>,
|
|
||||||
rid: ResourceId,
|
|
||||||
code: Option<u16>,
|
|
||||||
reason: Option<String>,
|
|
||||||
) -> Result<(), AnyError> {
|
|
||||||
let resource = state
|
|
||||||
.borrow_mut()
|
|
||||||
.resource_table
|
|
||||||
.get::<ServerWebSocket>(rid)?;
|
|
||||||
let frame = reason
|
|
||||||
.map(|reason| Frame::close(code.unwrap_or(1005), reason.as_bytes()))
|
|
||||||
.unwrap_or_else(|| Frame::close_raw(vec![]));
|
|
||||||
resource.write_frame(frame).await
|
|
||||||
}
|
|
||||||
|
|
||||||
#[op(deferred)]
|
|
||||||
pub async fn op_server_ws_next_event(
|
|
||||||
state: Rc<RefCell<OpState>>,
|
|
||||||
rid: ResourceId,
|
|
||||||
) -> Result<(u16, StringOrBuffer), AnyError> {
|
|
||||||
let resource = state
|
|
||||||
.borrow_mut()
|
|
||||||
.resource_table
|
|
||||||
.get::<ServerWebSocket>(rid)?;
|
|
||||||
let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
|
|
||||||
let val = match ws.read_frame().await {
|
|
||||||
Ok(val) => val,
|
|
||||||
Err(err) => {
|
|
||||||
return Ok((
|
|
||||||
MessageKind::Error as u16,
|
|
||||||
StringOrBuffer::String(err.to_string()),
|
|
||||||
))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let res = match val.opcode {
|
|
||||||
OpCode::Text => (
|
|
||||||
MessageKind::Text as u16,
|
|
||||||
StringOrBuffer::String(String::from_utf8(val.payload).unwrap()),
|
|
||||||
),
|
|
||||||
OpCode::Binary => (
|
|
||||||
MessageKind::Binary as u16,
|
|
||||||
StringOrBuffer::Buffer(val.payload.into()),
|
|
||||||
),
|
|
||||||
OpCode::Close => {
|
|
||||||
if val.payload.len() < 2 {
|
|
||||||
return Ok((1005, StringOrBuffer::String("".to_string())));
|
|
||||||
}
|
|
||||||
|
|
||||||
let close_code =
|
|
||||||
CloseCode::from(u16::from_be_bytes([val.payload[0], val.payload[1]]));
|
|
||||||
let reason = String::from_utf8(val.payload[2..].to_vec()).unwrap();
|
|
||||||
(close_code.into(), StringOrBuffer::String(reason))
|
|
||||||
}
|
|
||||||
OpCode::Ping => (
|
|
||||||
MessageKind::Ping as u16,
|
|
||||||
StringOrBuffer::Buffer(vec![].into()),
|
|
||||||
),
|
|
||||||
OpCode::Pong => (
|
|
||||||
MessageKind::Pong as u16,
|
|
||||||
StringOrBuffer::Buffer(vec![].into()),
|
|
||||||
),
|
|
||||||
OpCode::Continuation => {
|
|
||||||
return Err(type_error("Unexpected continuation frame"))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
Ok(res)
|
|
||||||
}
|
|
Loading…
Add table
Reference in a new issue