mirror of
https://github.com/denoland/deno.git
synced 2025-03-03 17:34:47 -05:00
This reverts commit 36307c45
This commit is contained in:
parent
da23f7f876
commit
638b6ef554
2 changed files with 29 additions and 150 deletions
|
@ -26,6 +26,7 @@
|
||||||
ArrayPrototypeJoin,
|
ArrayPrototypeJoin,
|
||||||
ArrayPrototypeMap,
|
ArrayPrototypeMap,
|
||||||
ArrayPrototypeSome,
|
ArrayPrototypeSome,
|
||||||
|
DataView,
|
||||||
ErrorPrototypeToString,
|
ErrorPrototypeToString,
|
||||||
ObjectDefineProperties,
|
ObjectDefineProperties,
|
||||||
ObjectPrototypeIsPrototypeOf,
|
ObjectPrototypeIsPrototypeOf,
|
||||||
|
@ -34,14 +35,13 @@
|
||||||
Set,
|
Set,
|
||||||
// TODO(lucacasonato): add SharedArrayBuffer to primordials
|
// TODO(lucacasonato): add SharedArrayBuffer to primordials
|
||||||
// SharedArrayBufferPrototype
|
// SharedArrayBufferPrototype
|
||||||
|
String,
|
||||||
StringPrototypeEndsWith,
|
StringPrototypeEndsWith,
|
||||||
StringPrototypeToLowerCase,
|
StringPrototypeToLowerCase,
|
||||||
Symbol,
|
Symbol,
|
||||||
SymbolIterator,
|
SymbolIterator,
|
||||||
PromisePrototypeCatch,
|
PromisePrototypeCatch,
|
||||||
queueMicrotask,
|
|
||||||
SymbolFor,
|
SymbolFor,
|
||||||
Uint8Array,
|
|
||||||
} = window.__bootstrap.primordials;
|
} = window.__bootstrap.primordials;
|
||||||
|
|
||||||
webidl.converters["sequence<DOMString> or DOMString"] = (V, opts) => {
|
webidl.converters["sequence<DOMString> or DOMString"] = (V, opts) => {
|
||||||
|
@ -300,58 +300,40 @@
|
||||||
throw new DOMException("readyState not OPEN", "InvalidStateError");
|
throw new DOMException("readyState not OPEN", "InvalidStateError");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (typeof data === "string") {
|
|
||||||
// try to send in one go!
|
|
||||||
const d = core.byteLength(data);
|
|
||||||
const sent = ops.op_ws_try_send_string(this[_rid], data);
|
|
||||||
this[_bufferedAmount] += d;
|
|
||||||
if (!sent) {
|
|
||||||
PromisePrototypeThen(
|
|
||||||
core.opAsync("op_ws_send_string", this[_rid], data),
|
|
||||||
() => {
|
|
||||||
this[_bufferedAmount] -= d;
|
|
||||||
},
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
// Spec expects data to be start flushing on next tick but oh well...
|
|
||||||
// we already sent it so we can just decrement the bufferedAmount
|
|
||||||
// on the next tick.
|
|
||||||
queueMicrotask(() => {
|
|
||||||
this[_bufferedAmount] -= d;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const sendTypedArray = (ta) => {
|
const sendTypedArray = (ta) => {
|
||||||
// try to send in one go!
|
|
||||||
const sent = ops.op_ws_try_send_binary(this[_rid], ta);
|
|
||||||
this[_bufferedAmount] += ta.byteLength;
|
this[_bufferedAmount] += ta.byteLength;
|
||||||
if (!sent) {
|
|
||||||
PromisePrototypeThen(
|
PromisePrototypeThen(
|
||||||
core.opAsync("op_ws_send_binary", this[_rid], ta),
|
core.opAsync("op_ws_send", this[_rid], {
|
||||||
|
kind: "binary",
|
||||||
|
value: ta,
|
||||||
|
}),
|
||||||
() => {
|
() => {
|
||||||
this[_bufferedAmount] -= ta.byteLength;
|
this[_bufferedAmount] -= ta.byteLength;
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
} else {
|
|
||||||
// Spec expects data to be start flushing on next tick but oh well...
|
|
||||||
// we already sent it so we can just decrement the bufferedAmount
|
|
||||||
// on the next tick.
|
|
||||||
queueMicrotask(() => {
|
|
||||||
this[_bufferedAmount] -= ta.byteLength;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
if (ObjectPrototypeIsPrototypeOf(ArrayBufferPrototype, data)) {
|
if (ObjectPrototypeIsPrototypeOf(BlobPrototype, data)) {
|
||||||
sendTypedArray(new Uint8Array(data));
|
|
||||||
} else if (ArrayBufferIsView(data)) {
|
|
||||||
sendTypedArray(data);
|
|
||||||
} else if (ObjectPrototypeIsPrototypeOf(BlobPrototype, data)) {
|
|
||||||
PromisePrototypeThen(
|
PromisePrototypeThen(
|
||||||
data.slice().arrayBuffer(),
|
data.slice().arrayBuffer(),
|
||||||
(ab) => sendTypedArray(new Uint8Array(ab)),
|
(ab) => sendTypedArray(new DataView(ab)),
|
||||||
|
);
|
||||||
|
} else if (ArrayBufferIsView(data)) {
|
||||||
|
sendTypedArray(data);
|
||||||
|
} else if (ObjectPrototypeIsPrototypeOf(ArrayBufferPrototype, data)) {
|
||||||
|
sendTypedArray(new DataView(data));
|
||||||
|
} else {
|
||||||
|
const string = String(data);
|
||||||
|
const d = core.encode(string);
|
||||||
|
this[_bufferedAmount] += d.byteLength;
|
||||||
|
PromisePrototypeThen(
|
||||||
|
core.opAsync("op_ws_send", this[_rid], {
|
||||||
|
kind: "text",
|
||||||
|
value: string,
|
||||||
|
}),
|
||||||
|
() => {
|
||||||
|
this[_bufferedAmount] -= d.byteLength;
|
||||||
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -161,51 +161,6 @@ impl WsStreamResource {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn try_send(self: &Rc<Self>, message: Message) -> Result<bool, AnyError> {
|
|
||||||
let waker = deno_core::futures::task::noop_waker();
|
|
||||||
let mut cx = std::task::Context::from_waker(&waker);
|
|
||||||
|
|
||||||
let res = match self.stream {
|
|
||||||
WebSocketStreamType::Client { .. } => {
|
|
||||||
match RcRef::map(self, |r| match &r.stream {
|
|
||||||
WebSocketStreamType::Client { tx, .. } => tx,
|
|
||||||
WebSocketStreamType::Server { .. } => unreachable!(),
|
|
||||||
})
|
|
||||||
.try_borrow_mut()
|
|
||||||
{
|
|
||||||
Some(mut tx) => {
|
|
||||||
if tx.poll_ready_unpin(&mut cx).is_ready() {
|
|
||||||
tx.start_send_unpin(message)?;
|
|
||||||
tx.poll_flush_unpin(&mut cx).is_ready()
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None => false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
WebSocketStreamType::Server { .. } => {
|
|
||||||
match RcRef::map(self, |r| match &r.stream {
|
|
||||||
WebSocketStreamType::Client { .. } => unreachable!(),
|
|
||||||
WebSocketStreamType::Server { tx, .. } => tx,
|
|
||||||
})
|
|
||||||
.try_borrow_mut()
|
|
||||||
{
|
|
||||||
Some(mut tx) => {
|
|
||||||
if tx.poll_ready_unpin(&mut cx).is_ready() {
|
|
||||||
tx.start_send_unpin(message)?;
|
|
||||||
tx.poll_flush_unpin(&mut cx).is_ready()
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None => false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
Ok(res)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn next_message(
|
async fn next_message(
|
||||||
self: &Rc<Self>,
|
self: &Rc<Self>,
|
||||||
cancel: RcRef<CancelHandle>,
|
cancel: RcRef<CancelHandle>,
|
||||||
|
@ -471,60 +426,6 @@ pub async fn op_ws_send(
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op]
|
|
||||||
pub async fn op_ws_send_string(
|
|
||||||
state: Rc<RefCell<OpState>>,
|
|
||||||
rid: ResourceId,
|
|
||||||
text: String,
|
|
||||||
) -> Result<(), AnyError> {
|
|
||||||
let resource = state
|
|
||||||
.borrow_mut()
|
|
||||||
.resource_table
|
|
||||||
.get::<WsStreamResource>(rid)?;
|
|
||||||
resource.send(Message::Text(text)).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[op]
|
|
||||||
pub async fn op_ws_send_binary(
|
|
||||||
state: Rc<RefCell<OpState>>,
|
|
||||||
rid: ResourceId,
|
|
||||||
data: ZeroCopyBuf,
|
|
||||||
) -> Result<(), AnyError> {
|
|
||||||
let resource = state
|
|
||||||
.borrow_mut()
|
|
||||||
.resource_table
|
|
||||||
.get::<WsStreamResource>(rid)?;
|
|
||||||
resource.send(Message::Binary(data.to_vec())).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[op]
|
|
||||||
pub fn op_ws_try_send_string(
|
|
||||||
state: &mut OpState,
|
|
||||||
rid: ResourceId,
|
|
||||||
text: String,
|
|
||||||
) -> bool {
|
|
||||||
let resource = match state.resource_table.get::<WsStreamResource>(rid) {
|
|
||||||
Ok(resource) => resource,
|
|
||||||
Err(_) => return false,
|
|
||||||
};
|
|
||||||
resource.try_send(Message::Text(text)).is_ok()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[op(fast)]
|
|
||||||
pub fn op_ws_try_send_binary(
|
|
||||||
state: &mut OpState,
|
|
||||||
rid: u32,
|
|
||||||
value: &[u8],
|
|
||||||
) -> bool {
|
|
||||||
let resource = match state.resource_table.get::<WsStreamResource>(rid) {
|
|
||||||
Ok(resource) => resource,
|
|
||||||
Err(_) => return false,
|
|
||||||
};
|
|
||||||
resource.try_send(Message::Binary(value.to_vec())).is_ok()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[op(deferred)]
|
#[op(deferred)]
|
||||||
pub async fn op_ws_close(
|
pub async fn op_ws_close(
|
||||||
state: Rc<RefCell<OpState>>,
|
state: Rc<RefCell<OpState>>,
|
||||||
|
@ -615,10 +516,6 @@ pub fn init<P: WebSocketPermissions + 'static>(
|
||||||
op_ws_send::decl(),
|
op_ws_send::decl(),
|
||||||
op_ws_close::decl(),
|
op_ws_close::decl(),
|
||||||
op_ws_next_event::decl(),
|
op_ws_next_event::decl(),
|
||||||
op_ws_send_string::decl(),
|
|
||||||
op_ws_send_binary::decl(),
|
|
||||||
op_ws_try_send_string::decl(),
|
|
||||||
op_ws_try_send_binary::decl(),
|
|
||||||
])
|
])
|
||||||
.state(move |state| {
|
.state(move |state| {
|
||||||
state.put::<WsUserAgent>(WsUserAgent(user_agent.clone()));
|
state.put::<WsUserAgent>(WsUserAgent(user_agent.clone()));
|
||||||
|
|
Loading…
Add table
Reference in a new issue