mirror of
https://github.com/denoland/deno.git
synced 2025-03-10 06:07:03 -04:00
perf(ext/websocket): Make send sync for non-stream websockets (#19376)
No need to go through the async machinery for `send(String | Buffer)` -- we can fire and forget, and then route any send errors into the async call we're already making (`op_ws_next_event`). Early benchmark on MacOS: Before: 155.8k msg/sec After: 166.2k msg/sec (+6.6%) Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
This commit is contained in:
parent
03ad309ccd
commit
61c65d671b
3 changed files with 93 additions and 59 deletions
|
@ -23,12 +23,10 @@ const primordials = globalThis.__bootstrap.primordials;
|
||||||
const {
|
const {
|
||||||
ArrayBufferPrototype,
|
ArrayBufferPrototype,
|
||||||
ArrayBufferIsView,
|
ArrayBufferIsView,
|
||||||
ArrayBufferPrototypeGetByteLength,
|
|
||||||
ArrayPrototypeJoin,
|
ArrayPrototypeJoin,
|
||||||
ArrayPrototypeMap,
|
ArrayPrototypeMap,
|
||||||
ArrayPrototypeSome,
|
ArrayPrototypeSome,
|
||||||
DataView,
|
DataView,
|
||||||
DataViewPrototypeGetByteLength,
|
|
||||||
ErrorPrototypeToString,
|
ErrorPrototypeToString,
|
||||||
ObjectDefineProperties,
|
ObjectDefineProperties,
|
||||||
ObjectPrototypeIsPrototypeOf,
|
ObjectPrototypeIsPrototypeOf,
|
||||||
|
@ -46,7 +44,6 @@ const {
|
||||||
PromisePrototypeCatch,
|
PromisePrototypeCatch,
|
||||||
SymbolFor,
|
SymbolFor,
|
||||||
TypedArrayPrototypeGetByteLength,
|
TypedArrayPrototypeGetByteLength,
|
||||||
TypedArrayPrototypeGetSymbolToStringTag,
|
|
||||||
} = primordials;
|
} = primordials;
|
||||||
const op_ws_check_permission_and_cancel_handle =
|
const op_ws_check_permission_and_cancel_handle =
|
||||||
core.ops.op_ws_check_permission_and_cancel_handle;
|
core.ops.op_ws_check_permission_and_cancel_handle;
|
||||||
|
@ -57,6 +54,7 @@ const {
|
||||||
op_ws_send_text,
|
op_ws_send_text,
|
||||||
op_ws_next_event,
|
op_ws_next_event,
|
||||||
op_ws_send_ping,
|
op_ws_send_ping,
|
||||||
|
op_ws_get_buffered_amount,
|
||||||
} = core.ensureFastOps();
|
} = core.ensureFastOps();
|
||||||
|
|
||||||
webidl.converters["sequence<DOMString> or DOMString"] = (
|
webidl.converters["sequence<DOMString> or DOMString"] = (
|
||||||
|
@ -111,7 +109,6 @@ const _role = Symbol("[[role]]");
|
||||||
const _extensions = Symbol("[[extensions]]");
|
const _extensions = Symbol("[[extensions]]");
|
||||||
const _protocol = Symbol("[[protocol]]");
|
const _protocol = Symbol("[[protocol]]");
|
||||||
const _binaryType = Symbol("[[binaryType]]");
|
const _binaryType = Symbol("[[binaryType]]");
|
||||||
const _bufferedAmount = Symbol("[[bufferedAmount]]");
|
|
||||||
const _eventLoop = Symbol("[[eventLoop]]");
|
const _eventLoop = Symbol("[[eventLoop]]");
|
||||||
|
|
||||||
const _server = Symbol("[[server]]");
|
const _server = Symbol("[[server]]");
|
||||||
|
@ -179,10 +176,13 @@ class WebSocket extends EventTarget {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
[_bufferedAmount] = 0;
|
|
||||||
get bufferedAmount() {
|
get bufferedAmount() {
|
||||||
webidl.assertBranded(this, WebSocketPrototype);
|
webidl.assertBranded(this, WebSocketPrototype);
|
||||||
return this[_bufferedAmount];
|
if (this[_readyState] === OPEN) {
|
||||||
|
return op_ws_get_buffered_amount(this[_rid]);
|
||||||
|
} else {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
constructor(url, protocols = []) {
|
constructor(url, protocols = []) {
|
||||||
|
@ -318,55 +318,25 @@ class WebSocket extends EventTarget {
|
||||||
throw new DOMException("readyState not OPEN", "InvalidStateError");
|
throw new DOMException("readyState not OPEN", "InvalidStateError");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @param {ArrayBufferView} view
|
|
||||||
* @param {number} byteLength
|
|
||||||
*/
|
|
||||||
const sendTypedArray = (view, byteLength) => {
|
|
||||||
this[_bufferedAmount] += byteLength;
|
|
||||||
PromisePrototypeThen(
|
|
||||||
op_ws_send_binary(
|
|
||||||
this[_rid],
|
|
||||||
view,
|
|
||||||
),
|
|
||||||
() => {
|
|
||||||
this[_bufferedAmount] -= byteLength;
|
|
||||||
},
|
|
||||||
);
|
|
||||||
};
|
|
||||||
|
|
||||||
if (ObjectPrototypeIsPrototypeOf(BlobPrototype, data)) {
|
if (ObjectPrototypeIsPrototypeOf(BlobPrototype, data)) {
|
||||||
PromisePrototypeThen(
|
PromisePrototypeThen(
|
||||||
// deno-lint-ignore prefer-primordials
|
// deno-lint-ignore prefer-primordials
|
||||||
data.slice().arrayBuffer(),
|
data.slice().arrayBuffer(),
|
||||||
(ab) =>
|
(ab) =>
|
||||||
sendTypedArray(
|
op_ws_send_binary(
|
||||||
|
this[_rid],
|
||||||
new DataView(ab),
|
new DataView(ab),
|
||||||
ArrayBufferPrototypeGetByteLength(ab),
|
|
||||||
),
|
),
|
||||||
);
|
);
|
||||||
} else if (ArrayBufferIsView(data)) {
|
} else if (ArrayBufferIsView(data)) {
|
||||||
if (TypedArrayPrototypeGetSymbolToStringTag(data) === undefined) {
|
op_ws_send_binary(this[_rid], data);
|
||||||
// DataView
|
|
||||||
sendTypedArray(data, DataViewPrototypeGetByteLength(data));
|
|
||||||
} else {
|
|
||||||
// TypedArray
|
|
||||||
sendTypedArray(data, TypedArrayPrototypeGetByteLength(data));
|
|
||||||
}
|
|
||||||
} else if (ObjectPrototypeIsPrototypeOf(ArrayBufferPrototype, data)) {
|
} else if (ObjectPrototypeIsPrototypeOf(ArrayBufferPrototype, data)) {
|
||||||
sendTypedArray(data, ArrayBufferPrototypeGetByteLength(data));
|
op_ws_send_binary(this[_rid], data);
|
||||||
} else {
|
} else {
|
||||||
const string = String(data);
|
const string = String(data);
|
||||||
const d = core.encode(string);
|
op_ws_send_text(
|
||||||
this[_bufferedAmount] += TypedArrayPrototypeGetByteLength(d);
|
this[_rid],
|
||||||
PromisePrototypeThen(
|
string,
|
||||||
op_ws_send_text(
|
|
||||||
this[_rid],
|
|
||||||
string,
|
|
||||||
),
|
|
||||||
() => {
|
|
||||||
this[_bufferedAmount] -= TypedArrayPrototypeGetByteLength(d);
|
|
||||||
},
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,8 +34,8 @@ const {
|
||||||
Uint8ArrayPrototype,
|
Uint8ArrayPrototype,
|
||||||
} = primordials;
|
} = primordials;
|
||||||
const {
|
const {
|
||||||
op_ws_send_text,
|
op_ws_send_text_async,
|
||||||
op_ws_send_binary,
|
op_ws_send_binary_async,
|
||||||
op_ws_next_event,
|
op_ws_next_event,
|
||||||
op_ws_create,
|
op_ws_create,
|
||||||
op_ws_close,
|
op_ws_close,
|
||||||
|
@ -210,11 +210,11 @@ class WebSocketStream {
|
||||||
const writable = new WritableStream({
|
const writable = new WritableStream({
|
||||||
write: async (chunk) => {
|
write: async (chunk) => {
|
||||||
if (typeof chunk === "string") {
|
if (typeof chunk === "string") {
|
||||||
await op_ws_send_text(this[_rid], chunk);
|
await op_ws_send_text_async(this[_rid], chunk);
|
||||||
} else if (
|
} else if (
|
||||||
ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, chunk)
|
ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, chunk)
|
||||||
) {
|
) {
|
||||||
await op_ws_send_binary(this[_rid], chunk);
|
await op_ws_send_binary_async(this[_rid], chunk);
|
||||||
} else {
|
} else {
|
||||||
throw new TypeError(
|
throw new TypeError(
|
||||||
"A chunk may only be either a string or an Uint8Array",
|
"A chunk may only be either a string or an Uint8Array",
|
||||||
|
|
|
@ -281,8 +281,10 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
let resource = ServerWebSocket {
|
let resource = ServerWebSocket {
|
||||||
|
buffered: Cell::new(0),
|
||||||
|
errored: Cell::new(None),
|
||||||
ws: AsyncRefCell::new(FragmentCollector::new(stream)),
|
ws: AsyncRefCell::new(FragmentCollector::new(stream)),
|
||||||
closed: Rc::new(Cell::new(false)),
|
closed: Cell::new(false),
|
||||||
tx_lock: AsyncRefCell::new(()),
|
tx_lock: AsyncRefCell::new(()),
|
||||||
};
|
};
|
||||||
let mut state = state.borrow_mut();
|
let mut state = state.borrow_mut();
|
||||||
|
@ -315,18 +317,20 @@ pub enum MessageKind {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ServerWebSocket {
|
pub struct ServerWebSocket {
|
||||||
|
buffered: Cell<usize>,
|
||||||
|
errored: Cell<Option<AnyError>>,
|
||||||
ws: AsyncRefCell<FragmentCollector<WebSocketStream>>,
|
ws: AsyncRefCell<FragmentCollector<WebSocketStream>>,
|
||||||
closed: Rc<Cell<bool>>,
|
closed: Cell<bool>,
|
||||||
tx_lock: AsyncRefCell<()>,
|
tx_lock: AsyncRefCell<()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ServerWebSocket {
|
impl ServerWebSocket {
|
||||||
#[inline]
|
#[inline]
|
||||||
pub async fn write_frame(
|
pub async fn write_frame(
|
||||||
self: Rc<Self>,
|
self: &Rc<Self>,
|
||||||
frame: Frame,
|
frame: Frame,
|
||||||
) -> Result<(), AnyError> {
|
) -> Result<(), AnyError> {
|
||||||
let _lock = RcRef::map(&self, |r| &r.tx_lock).borrow_mut().await;
|
let _lock = RcRef::map(self, |r| &r.tx_lock).borrow_mut().await;
|
||||||
// SAFETY: fastwebsockets only needs a mutable reference to the WebSocket
|
// SAFETY: fastwebsockets only needs a mutable reference to the WebSocket
|
||||||
// to populate the write buffer. We encounter an await point when writing
|
// to populate the write buffer. We encounter an await point when writing
|
||||||
// to the socket after the frame has already been written to the buffer.
|
// to the socket after the frame has already been written to the buffer.
|
||||||
|
@ -361,8 +365,10 @@ pub fn ws_create_server_stream(
|
||||||
ws.set_auto_pong(true);
|
ws.set_auto_pong(true);
|
||||||
|
|
||||||
let ws_resource = ServerWebSocket {
|
let ws_resource = ServerWebSocket {
|
||||||
|
buffered: Cell::new(0),
|
||||||
|
errored: Cell::new(None),
|
||||||
ws: AsyncRefCell::new(FragmentCollector::new(ws)),
|
ws: AsyncRefCell::new(FragmentCollector::new(ws)),
|
||||||
closed: Rc::new(Cell::new(false)),
|
closed: Cell::new(false),
|
||||||
tx_lock: AsyncRefCell::new(()),
|
tx_lock: AsyncRefCell::new(()),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -370,8 +376,48 @@ pub fn ws_create_server_stream(
|
||||||
Ok(rid)
|
Ok(rid)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op]
|
#[op(fast)]
|
||||||
pub async fn op_ws_send_binary(
|
pub fn op_ws_send_binary(
|
||||||
|
state: &mut OpState,
|
||||||
|
rid: ResourceId,
|
||||||
|
data: ZeroCopyBuf,
|
||||||
|
) {
|
||||||
|
let resource = state.resource_table.get::<ServerWebSocket>(rid).unwrap();
|
||||||
|
let data = data.to_vec();
|
||||||
|
let len = data.len();
|
||||||
|
resource.buffered.set(resource.buffered.get() + len);
|
||||||
|
deno_core::task::spawn(async move {
|
||||||
|
if let Err(err) = resource
|
||||||
|
.write_frame(Frame::new(true, OpCode::Binary, None, data))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
resource.errored.set(Some(err));
|
||||||
|
} else {
|
||||||
|
resource.buffered.set(resource.buffered.get() - len);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[op(fast)]
|
||||||
|
pub fn op_ws_send_text(state: &mut OpState, rid: ResourceId, data: String) {
|
||||||
|
let resource = state.resource_table.get::<ServerWebSocket>(rid).unwrap();
|
||||||
|
let len = data.len();
|
||||||
|
resource.buffered.set(resource.buffered.get() + len);
|
||||||
|
deno_core::task::spawn(async move {
|
||||||
|
if let Err(err) = resource
|
||||||
|
.write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes()))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
resource.errored.set(Some(err));
|
||||||
|
} else {
|
||||||
|
resource.buffered.set(resource.buffered.get() - len);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Async version of send. Does not update buffered amount as we rely on the socket itself for backpressure.
|
||||||
|
#[op(fast)]
|
||||||
|
pub async fn op_ws_send_binary_async(
|
||||||
state: Rc<RefCell<OpState>>,
|
state: Rc<RefCell<OpState>>,
|
||||||
rid: ResourceId,
|
rid: ResourceId,
|
||||||
data: ZeroCopyBuf,
|
data: ZeroCopyBuf,
|
||||||
|
@ -380,13 +426,15 @@ pub async fn op_ws_send_binary(
|
||||||
.borrow_mut()
|
.borrow_mut()
|
||||||
.resource_table
|
.resource_table
|
||||||
.get::<ServerWebSocket>(rid)?;
|
.get::<ServerWebSocket>(rid)?;
|
||||||
|
let data = data.to_vec();
|
||||||
resource
|
resource
|
||||||
.write_frame(Frame::new(true, OpCode::Binary, None, data.to_vec()))
|
.write_frame(Frame::new(true, OpCode::Binary, None, data))
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op]
|
/// Async version of send. Does not update buffered amount as we rely on the socket itself for backpressure.
|
||||||
pub async fn op_ws_send_text(
|
#[op(fast)]
|
||||||
|
pub async fn op_ws_send_text_async(
|
||||||
state: Rc<RefCell<OpState>>,
|
state: Rc<RefCell<OpState>>,
|
||||||
rid: ResourceId,
|
rid: ResourceId,
|
||||||
data: String,
|
data: String,
|
||||||
|
@ -400,6 +448,16 @@ pub async fn op_ws_send_text(
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[op(fast)]
|
||||||
|
pub fn op_ws_get_buffered_amount(state: &mut OpState, rid: ResourceId) -> u32 {
|
||||||
|
state
|
||||||
|
.resource_table
|
||||||
|
.get::<ServerWebSocket>(rid)
|
||||||
|
.unwrap()
|
||||||
|
.buffered
|
||||||
|
.get() as u32
|
||||||
|
}
|
||||||
|
|
||||||
#[op]
|
#[op]
|
||||||
pub async fn op_ws_send_pong(
|
pub async fn op_ws_send_pong(
|
||||||
state: Rc<RefCell<OpState>>,
|
state: Rc<RefCell<OpState>>,
|
||||||
|
@ -441,8 +499,7 @@ pub async fn op_ws_close(
|
||||||
.map(|reason| Frame::close(code.unwrap_or(1005), reason.as_bytes()))
|
.map(|reason| Frame::close(code.unwrap_or(1005), reason.as_bytes()))
|
||||||
.unwrap_or_else(|| Frame::close_raw(vec![]));
|
.unwrap_or_else(|| Frame::close_raw(vec![]));
|
||||||
|
|
||||||
let cell = Rc::clone(&resource.closed);
|
resource.closed.set(true);
|
||||||
cell.set(true);
|
|
||||||
resource.write_frame(frame).await?;
|
resource.write_frame(frame).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -457,6 +514,10 @@ pub async fn op_ws_next_event(
|
||||||
.resource_table
|
.resource_table
|
||||||
.get::<ServerWebSocket>(rid)?;
|
.get::<ServerWebSocket>(rid)?;
|
||||||
|
|
||||||
|
if let Some(err) = resource.errored.take() {
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
|
||||||
let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
|
let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
|
||||||
loop {
|
loop {
|
||||||
let val = match ws.read_frame().await {
|
let val = match ws.read_frame().await {
|
||||||
|
@ -519,8 +580,11 @@ deno_core::extension!(deno_websocket,
|
||||||
op_ws_next_event,
|
op_ws_next_event,
|
||||||
op_ws_send_binary,
|
op_ws_send_binary,
|
||||||
op_ws_send_text,
|
op_ws_send_text,
|
||||||
|
op_ws_send_binary_async,
|
||||||
|
op_ws_send_text_async,
|
||||||
op_ws_send_ping,
|
op_ws_send_ping,
|
||||||
op_ws_send_pong,
|
op_ws_send_pong,
|
||||||
|
op_ws_get_buffered_amount,
|
||||||
],
|
],
|
||||||
esm = [ "01_websocket.js", "02_websocketstream.js" ],
|
esm = [ "01_websocket.js", "02_websocketstream.js" ],
|
||||||
options = {
|
options = {
|
||||||
|
|
Loading…
Add table
Reference in a new issue