mirror of
https://github.com/denoland/deno.git
synced 2025-01-21 13:00:36 -05:00
feat: MessageChannel
and MessagePort
(#11051)
This commit introduces support for MessageChannel and MessagePort. MessagePorts can be transfered across other MessagePorts.
This commit is contained in:
parent
a2f939b99c
commit
f9ff981daf
17 changed files with 720 additions and 72 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -781,6 +781,7 @@ dependencies = [
|
|||
"encoding_rs",
|
||||
"futures",
|
||||
"serde",
|
||||
"tokio",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
|
|
22
cli/dts/lib.deno.shared_globals.d.ts
vendored
22
cli/dts/lib.deno.shared_globals.d.ts
vendored
|
@ -347,24 +347,6 @@ type BufferSource = ArrayBufferView | ArrayBuffer;
|
|||
|
||||
declare var console: Console;
|
||||
|
||||
interface MessageEventInit<T = any> extends EventInit {
|
||||
data?: T;
|
||||
origin?: string;
|
||||
lastEventId?: string;
|
||||
}
|
||||
|
||||
declare class MessageEvent<T = any> extends Event {
|
||||
/**
|
||||
* Returns the data of the message.
|
||||
*/
|
||||
readonly data: T;
|
||||
/**
|
||||
* Returns the last event ID string, for server-sent events.
|
||||
*/
|
||||
readonly lastEventId: string;
|
||||
constructor(type: string, eventInitDict?: MessageEventInit);
|
||||
}
|
||||
|
||||
interface ErrorEventInit extends EventInit {
|
||||
message?: string;
|
||||
filename?: string;
|
||||
|
@ -382,10 +364,6 @@ declare class ErrorEvent extends Event {
|
|||
constructor(type: string, eventInitDict?: ErrorEventInit);
|
||||
}
|
||||
|
||||
interface PostMessageOptions {
|
||||
transfer?: any[];
|
||||
}
|
||||
|
||||
interface AbstractWorkerEventMap {
|
||||
"error": ErrorEvent;
|
||||
}
|
||||
|
|
33
cli/tests/unit/message_channel_test.ts
Normal file
33
cli/tests/unit/message_channel_test.ts
Normal file
|
@ -0,0 +1,33 @@
|
|||
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
|
||||
// NOTE: these are just sometests to test the TypeScript types. Real coverage is
|
||||
// provided by WPT.
|
||||
import {
|
||||
assert,
|
||||
assertEquals,
|
||||
} from "../../../test_util/std/testing/asserts.ts";
|
||||
import { deferred } from "../../../test_util/std/async/deferred.ts";
|
||||
|
||||
Deno.test("messagechannel", async () => {
|
||||
const mc = new MessageChannel();
|
||||
const mc2 = new MessageChannel();
|
||||
assert(mc.port1);
|
||||
assert(mc.port2);
|
||||
|
||||
const promise = deferred();
|
||||
|
||||
mc.port2.onmessage = (e) => {
|
||||
assertEquals(e.data, "hello");
|
||||
assertEquals(e.ports.length, 1);
|
||||
assert(e.ports[0] instanceof MessagePort);
|
||||
e.ports[0].close();
|
||||
promise.resolve();
|
||||
};
|
||||
|
||||
mc.port1.postMessage("hello", [mc2.port1]);
|
||||
mc.port1.close();
|
||||
|
||||
await promise;
|
||||
|
||||
mc.port2.close();
|
||||
mc2.port2.close();
|
||||
});
|
|
@ -1129,6 +1129,7 @@
|
|||
});
|
||||
|
||||
this.data = eventInitDict?.data ?? null;
|
||||
this.ports = eventInitDict?.ports ?? [];
|
||||
this.origin = eventInitDict?.origin ?? "";
|
||||
this.lastEventId = eventInitDict?.lastEventId ?? "";
|
||||
}
|
||||
|
@ -1196,6 +1197,46 @@
|
|||
}
|
||||
}
|
||||
|
||||
const _eventHandlers = Symbol("eventHandlers");
|
||||
|
||||
function makeWrappedHandler(handler) {
|
||||
function wrappedHandler(...args) {
|
||||
if (typeof wrappedHandler.handler !== "function") {
|
||||
return;
|
||||
}
|
||||
return wrappedHandler.handler.call(this, ...args);
|
||||
}
|
||||
wrappedHandler.handler = handler;
|
||||
return wrappedHandler;
|
||||
}
|
||||
|
||||
// TODO(benjamingr) reuse this here and websocket where possible
|
||||
function defineEventHandler(emitter, name, init) {
|
||||
// HTML specification section 8.1.5.1
|
||||
Object.defineProperty(emitter, `on${name}`, {
|
||||
get() {
|
||||
return this[_eventHandlers]?.get(name)?.handler;
|
||||
},
|
||||
set(value) {
|
||||
if (!this[_eventHandlers]) {
|
||||
this[_eventHandlers] = new Map();
|
||||
}
|
||||
let handlerWrapper = this[_eventHandlers]?.get(name);
|
||||
if (handlerWrapper) {
|
||||
console.log("foo");
|
||||
handlerWrapper.handler = value;
|
||||
} else {
|
||||
handlerWrapper = makeWrappedHandler(value);
|
||||
this.addEventListener(name, handlerWrapper);
|
||||
init?.(this);
|
||||
}
|
||||
this[_eventHandlers].set(name, handlerWrapper);
|
||||
},
|
||||
configurable: true,
|
||||
enumerable: true,
|
||||
});
|
||||
}
|
||||
|
||||
window.Event = Event;
|
||||
window.EventTarget = EventTarget;
|
||||
window.ErrorEvent = ErrorEvent;
|
||||
|
@ -1213,5 +1254,6 @@
|
|||
window.__bootstrap.event = {
|
||||
setIsTrusted,
|
||||
setTarget,
|
||||
defineEventHandler,
|
||||
};
|
||||
})(this);
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
((window) => {
|
||||
const webidl = window.__bootstrap.webidl;
|
||||
const { setIsTrusted } = window.__bootstrap.event;
|
||||
const { setIsTrusted, defineEventHandler } = window.__bootstrap.event;
|
||||
|
||||
const add = Symbol("add");
|
||||
const signalAbort = Symbol("signalAbort");
|
||||
|
@ -81,43 +81,6 @@
|
|||
|
||||
webidl.configurePrototype(AbortController);
|
||||
|
||||
const handlerSymbol = Symbol("eventHandlers");
|
||||
|
||||
function makeWrappedHandler(handler) {
|
||||
function wrappedHandler(...args) {
|
||||
if (typeof wrappedHandler.handler !== "function") {
|
||||
return;
|
||||
}
|
||||
return wrappedHandler.handler.call(this, ...args);
|
||||
}
|
||||
wrappedHandler.handler = handler;
|
||||
return wrappedHandler;
|
||||
}
|
||||
// TODO(benjamingr) reuse this here and websocket where possible
|
||||
function defineEventHandler(emitter, name) {
|
||||
// HTML specification section 8.1.5.1
|
||||
Object.defineProperty(emitter, `on${name}`, {
|
||||
get() {
|
||||
return this[handlerSymbol]?.get(name)?.handler;
|
||||
},
|
||||
set(value) {
|
||||
if (!this[handlerSymbol]) {
|
||||
this[handlerSymbol] = new Map();
|
||||
}
|
||||
let handlerWrapper = this[handlerSymbol]?.get(name);
|
||||
if (handlerWrapper) {
|
||||
handlerWrapper.handler = value;
|
||||
} else {
|
||||
handlerWrapper = makeWrappedHandler(value);
|
||||
this.addEventListener(name, handlerWrapper);
|
||||
}
|
||||
this[handlerSymbol].set(name, handlerWrapper);
|
||||
},
|
||||
configurable: true,
|
||||
enumerable: true,
|
||||
});
|
||||
}
|
||||
|
||||
webidl.converters["AbortSignal"] = webidl.createInterfaceConverter(
|
||||
"AbortSignal",
|
||||
AbortSignal,
|
||||
|
|
253
extensions/web/13_message_port.js
Normal file
253
extensions/web/13_message_port.js
Normal file
|
@ -0,0 +1,253 @@
|
|||
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
|
||||
|
||||
// @ts-check
|
||||
/// <reference path="../../core/lib.deno_core.d.ts" />
|
||||
/// <reference path="../webidl/internal.d.ts" />
|
||||
/// <reference path="./internal.d.ts" />
|
||||
/// <reference path="./lib.deno_web.d.ts" />
|
||||
|
||||
"use strict";
|
||||
|
||||
((window) => {
|
||||
const core = window.Deno.core;
|
||||
const webidl = window.__bootstrap.webidl;
|
||||
const { setEventTargetData } = window.__bootstrap.eventTarget;
|
||||
const { defineEventHandler } = window.__bootstrap.event;
|
||||
|
||||
class MessageChannel {
|
||||
/** @type {MessagePort} */
|
||||
#port1;
|
||||
/** @type {MessagePort} */
|
||||
#port2;
|
||||
|
||||
constructor() {
|
||||
this[webidl.brand] = webidl.brand;
|
||||
const [port1Id, port2Id] = opCreateEntangledMessagePort();
|
||||
const port1 = createMessagePort(port1Id);
|
||||
const port2 = createMessagePort(port2Id);
|
||||
this.#port1 = port1;
|
||||
this.#port2 = port2;
|
||||
}
|
||||
|
||||
get port1() {
|
||||
webidl.assertBranded(this, MessageChannel);
|
||||
return this.#port1;
|
||||
}
|
||||
|
||||
get port2() {
|
||||
webidl.assertBranded(this, MessageChannel);
|
||||
return this.#port2;
|
||||
}
|
||||
|
||||
[Symbol.for("Deno.inspect")](inspect) {
|
||||
return `MessageChannel ${
|
||||
inspect({ port1: this.port1, port2: this.port2 })
|
||||
}`;
|
||||
}
|
||||
|
||||
get [Symbol.toStringTag]() {
|
||||
return "MessageChannel";
|
||||
}
|
||||
}
|
||||
|
||||
webidl.configurePrototype(MessageChannel);
|
||||
|
||||
const _id = Symbol("id");
|
||||
const _enabled = Symbol("enabled");
|
||||
|
||||
/**
|
||||
* @param {number} id
|
||||
* @returns {MessagePort}
|
||||
*/
|
||||
function createMessagePort(id) {
|
||||
const port = webidl.createBranded(MessagePort);
|
||||
setEventTargetData(port);
|
||||
port[_id] = id;
|
||||
return port;
|
||||
}
|
||||
|
||||
class MessagePort extends EventTarget {
|
||||
/** @type {number | null} */
|
||||
[_id] = null;
|
||||
/** @type {boolean} */
|
||||
[_enabled] = false;
|
||||
|
||||
constructor() {
|
||||
super();
|
||||
webidl.illegalConstructor();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {any} message
|
||||
* @param {object[] | PostMessageOptions} transferOrOptions
|
||||
*/
|
||||
postMessage(message, transferOrOptions = {}) {
|
||||
webidl.assertBranded(this, MessagePort);
|
||||
const prefix = "Failed to execute 'postMessage' on 'MessagePort'";
|
||||
webidl.requiredArguments(arguments.length, 1, { prefix });
|
||||
message = webidl.converters.any(message);
|
||||
let options;
|
||||
if (
|
||||
webidl.type(transferOrOptions) === "Object" &&
|
||||
transferOrOptions !== undefined &&
|
||||
transferOrOptions[Symbol.iterator] !== undefined
|
||||
) {
|
||||
const transfer = webidl.converters["sequence<object>"](
|
||||
transferOrOptions,
|
||||
{ prefix, context: "Argument 2" },
|
||||
);
|
||||
options = { transfer };
|
||||
} else {
|
||||
options = webidl.converters.PostMessageOptions(transferOrOptions, {
|
||||
prefix,
|
||||
context: "Argument 2",
|
||||
});
|
||||
}
|
||||
const { transfer } = options;
|
||||
if (transfer.includes(this)) {
|
||||
throw new DOMException("Can not tranfer self", "DataCloneError");
|
||||
}
|
||||
const data = serializeJsMessageData(message, transfer);
|
||||
if (this[_id] === null) return;
|
||||
core.opSync("op_message_port_post_message", this[_id], data);
|
||||
}
|
||||
|
||||
start() {
|
||||
webidl.assertBranded(this, MessagePort);
|
||||
if (this[_enabled]) return;
|
||||
(async () => {
|
||||
this[_enabled] = true;
|
||||
while (true) {
|
||||
if (this[_id] === null) break;
|
||||
const data = await core.opAsync(
|
||||
"op_message_port_recv_message",
|
||||
this[_id],
|
||||
);
|
||||
if (data === null) break;
|
||||
let message, transfer;
|
||||
try {
|
||||
const v = deserializeJsMessageData(data);
|
||||
message = v[0];
|
||||
transfer = v[1];
|
||||
} catch (err) {
|
||||
const event = new MessageEvent("messageerror", { data: err });
|
||||
this.dispatchEvent(event);
|
||||
return;
|
||||
}
|
||||
const event = new MessageEvent("message", {
|
||||
data: message,
|
||||
ports: transfer,
|
||||
});
|
||||
this.dispatchEvent(event);
|
||||
}
|
||||
this[_enabled] = false;
|
||||
})();
|
||||
}
|
||||
|
||||
close() {
|
||||
webidl.assertBranded(this, MessagePort);
|
||||
if (this[_id] !== null) {
|
||||
core.close(this[_id]);
|
||||
this[_id] = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
defineEventHandler(MessagePort.prototype, "message", function (self) {
|
||||
self.start();
|
||||
});
|
||||
defineEventHandler(MessagePort.prototype, "messageerror");
|
||||
|
||||
webidl.configurePrototype(MessagePort);
|
||||
|
||||
/**
|
||||
* @returns {[number, number]}
|
||||
*/
|
||||
function opCreateEntangledMessagePort() {
|
||||
return core.opSync("op_message_port_create_entangled");
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {globalThis.__bootstrap.messagePort.MessageData} messageData
|
||||
* @returns {[any, object[]]}
|
||||
*/
|
||||
function deserializeJsMessageData(messageData) {
|
||||
/** @type {object[]} */
|
||||
const transferables = [];
|
||||
|
||||
for (const transferable of messageData.transferables) {
|
||||
switch (transferable.kind) {
|
||||
case "messagePort": {
|
||||
const port = createMessagePort(transferable.data);
|
||||
transferables.push(port);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
throw new TypeError("Unreachable");
|
||||
}
|
||||
}
|
||||
|
||||
const data = core.deserialize(messageData.data);
|
||||
|
||||
return [data, transferables];
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {any} data
|
||||
* @param {object[]} tranferables
|
||||
* @returns {globalThis.__bootstrap.messagePort.MessageData}
|
||||
*/
|
||||
function serializeJsMessageData(data, tranferables) {
|
||||
let serializedData;
|
||||
try {
|
||||
serializedData = core.serialize(data);
|
||||
} catch (err) {
|
||||
throw new DOMException(err.message, "DataCloneError");
|
||||
}
|
||||
|
||||
/** @type {globalThis.__bootstrap.messagePort.Transferable[]} */
|
||||
const serializedTransferables = [];
|
||||
|
||||
for (const transferable of tranferables) {
|
||||
if (transferable instanceof MessagePort) {
|
||||
webidl.assertBranded(transferable, MessagePort);
|
||||
const id = transferable[_id];
|
||||
if (id === null) {
|
||||
throw new DOMException(
|
||||
"Can not transfer disentangled message port",
|
||||
"DataCloneError",
|
||||
);
|
||||
}
|
||||
transferable[_id] = null;
|
||||
serializedTransferables.push({ kind: "messagePort", data: id });
|
||||
} else {
|
||||
throw new DOMException("Value not transferable", "DataCloneError");
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
data: serializedData,
|
||||
transferables: serializedTransferables,
|
||||
};
|
||||
}
|
||||
|
||||
webidl.converters.PostMessageOptions = webidl.createDictionaryConverter(
|
||||
"PostMessageOptions",
|
||||
[
|
||||
{
|
||||
key: "transfer",
|
||||
converter: webidl.converters["sequence<object>"],
|
||||
get defaultValue() {
|
||||
return [];
|
||||
},
|
||||
},
|
||||
],
|
||||
);
|
||||
|
||||
window.__bootstrap.messagePort = {
|
||||
MessageChannel,
|
||||
MessagePort,
|
||||
deserializeJsMessageData,
|
||||
serializeJsMessageData,
|
||||
};
|
||||
})(globalThis);
|
|
@ -18,6 +18,7 @@ base64 = "0.13.0"
|
|||
deno_core = { version = "0.91.0", path = "../../core" }
|
||||
encoding_rs = "0.8.28"
|
||||
serde = "1.0"
|
||||
tokio = "1.7"
|
||||
uuid = { version = "0.8.2", features = ["v4"] }
|
||||
|
||||
[dev-dependencies]
|
||||
|
|
14
extensions/web/internal.d.ts
vendored
14
extensions/web/internal.d.ts
vendored
|
@ -4,9 +4,6 @@
|
|||
/// <reference lib="esnext" />
|
||||
|
||||
declare namespace globalThis {
|
||||
declare var TextEncoder: typeof TextEncoder;
|
||||
declare var TextDecoder: typeof TextDecoder;
|
||||
|
||||
declare namespace __bootstrap {
|
||||
declare var infra: {
|
||||
collectSequenceOfCodepoints(
|
||||
|
@ -85,5 +82,16 @@ declare namespace globalThis {
|
|||
ReadableStream: typeof ReadableStream;
|
||||
isReadableStreamDisturbed(stream: ReadableStream): boolean;
|
||||
};
|
||||
|
||||
declare namespace messagePort {
|
||||
declare type Transferable = {
|
||||
kind: "messagePort";
|
||||
data: number;
|
||||
};
|
||||
declare interface MessageData {
|
||||
data: Uint8Array;
|
||||
transferables: Transferable[];
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
89
extensions/web/lib.deno_web.d.ts
vendored
89
extensions/web/lib.deno_web.d.ts
vendored
|
@ -648,3 +648,92 @@ interface TransformStreamDefaultControllerTransformCallback<I, O> {
|
|||
controller: TransformStreamDefaultController<O>,
|
||||
): void | PromiseLike<void>;
|
||||
}
|
||||
|
||||
interface MessageEventInit<T = any> extends EventInit {
|
||||
data?: T;
|
||||
origin?: string;
|
||||
lastEventId?: string;
|
||||
}
|
||||
|
||||
declare class MessageEvent<T = any> extends Event {
|
||||
/**
|
||||
* Returns the data of the message.
|
||||
*/
|
||||
readonly data: T;
|
||||
/**
|
||||
* Returns the last event ID string, for server-sent events.
|
||||
*/
|
||||
readonly lastEventId: string;
|
||||
/**
|
||||
* Returns transfered ports.
|
||||
*/
|
||||
readonly ports: ReadonlyArray<MessagePort>;
|
||||
constructor(type: string, eventInitDict?: MessageEventInit);
|
||||
}
|
||||
|
||||
type Transferable = ArrayBuffer | MessagePort;
|
||||
|
||||
interface PostMessageOptions {
|
||||
transfer?: Transferable[];
|
||||
}
|
||||
|
||||
/** The MessageChannel interface of the Channel Messaging API allows us to
|
||||
* create a new message channel and send data through it via its two MessagePort
|
||||
* properties. */
|
||||
declare class MessageChannel {
|
||||
constructor();
|
||||
readonly port1: MessagePort;
|
||||
readonly port2: MessagePort;
|
||||
}
|
||||
|
||||
interface MessagePortEventMap {
|
||||
"message": MessageEvent;
|
||||
"messageerror": MessageEvent;
|
||||
}
|
||||
|
||||
/** The MessagePort interface of the Channel Messaging API represents one of the
|
||||
* two ports of a MessageChannel, allowing messages to be sent from one port and
|
||||
* listening out for them arriving at the other. */
|
||||
declare class MessagePort extends EventTarget {
|
||||
onmessage: ((this: MessagePort, ev: MessageEvent) => any) | null;
|
||||
onmessageerror: ((this: MessagePort, ev: MessageEvent) => any) | null;
|
||||
/**
|
||||
* Disconnects the port, so that it is no longer active.
|
||||
*/
|
||||
close(): void;
|
||||
/**
|
||||
* Posts a message through the channel. Objects listed in transfer are
|
||||
* transferred, not just cloned, meaning that they are no longer usable on the
|
||||
* sending side.
|
||||
*
|
||||
* Throws a "DataCloneError" DOMException if transfer contains duplicate
|
||||
* objects or port, or if message could not be cloned.
|
||||
*/
|
||||
postMessage(message: any, transfer: Transferable[]): void;
|
||||
postMessage(message: any, options?: PostMessageOptions): void;
|
||||
/**
|
||||
* Begins dispatching messages received on the port. This is implictly called
|
||||
* when assiging a value to `this.onmessage`.
|
||||
*/
|
||||
start(): void;
|
||||
addEventListener<K extends keyof MessagePortEventMap>(
|
||||
type: K,
|
||||
listener: (this: MessagePort, ev: MessagePortEventMap[K]) => any,
|
||||
options?: boolean | AddEventListenerOptions,
|
||||
): void;
|
||||
addEventListener(
|
||||
type: string,
|
||||
listener: EventListenerOrEventListenerObject,
|
||||
options?: boolean | AddEventListenerOptions,
|
||||
): void;
|
||||
removeEventListener<K extends keyof MessagePortEventMap>(
|
||||
type: K,
|
||||
listener: (this: MessagePort, ev: MessagePortEventMap[K]) => any,
|
||||
options?: boolean | EventListenerOptions,
|
||||
): void;
|
||||
removeEventListener(
|
||||
type: string,
|
||||
listener: EventListenerOrEventListenerObject,
|
||||
options?: boolean | EventListenerOptions,
|
||||
): void;
|
||||
}
|
||||
|
|
|
@ -1,11 +1,16 @@
|
|||
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
|
||||
|
||||
mod message_port;
|
||||
|
||||
pub use crate::message_port::JsMessageData;
|
||||
|
||||
use deno_core::error::bad_resource_id;
|
||||
use deno_core::error::null_opbuf;
|
||||
use deno_core::error::range_error;
|
||||
use deno_core::error::type_error;
|
||||
use deno_core::error::AnyError;
|
||||
use deno_core::include_js_files;
|
||||
use deno_core::op_async;
|
||||
use deno_core::op_sync;
|
||||
use deno_core::url::Url;
|
||||
use deno_core::Extension;
|
||||
|
@ -30,6 +35,10 @@ use std::sync::Mutex;
|
|||
use std::usize;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::message_port::op_message_port_create_entangled;
|
||||
use crate::message_port::op_message_port_post_message;
|
||||
use crate::message_port::op_message_port_recv_message;
|
||||
|
||||
/// Load and execute the javascript code.
|
||||
pub fn init(
|
||||
blob_url_store: BlobUrlStore,
|
||||
|
@ -52,6 +61,7 @@ pub fn init(
|
|||
"10_filereader.js",
|
||||
"11_blob_url.js",
|
||||
"12_location.js",
|
||||
"13_message_port.js",
|
||||
))
|
||||
.ops(vec![
|
||||
("op_base64_decode", op_sync(op_base64_decode)),
|
||||
|
@ -71,6 +81,18 @@ pub fn init(
|
|||
"op_file_revoke_object_url",
|
||||
op_sync(op_file_revoke_object_url),
|
||||
),
|
||||
(
|
||||
"op_message_port_create_entangled",
|
||||
op_sync(op_message_port_create_entangled),
|
||||
),
|
||||
(
|
||||
"op_message_port_post_message",
|
||||
op_sync(op_message_port_post_message),
|
||||
),
|
||||
(
|
||||
"op_message_port_recv_message",
|
||||
op_async(op_message_port_recv_message),
|
||||
),
|
||||
])
|
||||
.state(move |state| {
|
||||
state.put(blob_url_store.clone());
|
||||
|
|
208
extensions/web/message_port.rs
Normal file
208
extensions/web/message_port.rs
Normal file
|
@ -0,0 +1,208 @@
|
|||
use std::borrow::Cow;
|
||||
use std::cell::RefCell;
|
||||
use std::rc::Rc;
|
||||
|
||||
use deno_core::error::bad_resource_id;
|
||||
use deno_core::error::type_error;
|
||||
use deno_core::error::AnyError;
|
||||
use deno_core::ZeroCopyBuf;
|
||||
use deno_core::{CancelFuture, Resource};
|
||||
use deno_core::{CancelHandle, OpState};
|
||||
use deno_core::{RcRef, ResourceId};
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use tokio::sync::mpsc::unbounded_channel;
|
||||
use tokio::sync::mpsc::UnboundedReceiver;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
|
||||
enum Transferable {
|
||||
MessagePort(MessagePort),
|
||||
}
|
||||
|
||||
type MessagePortMessage = (Vec<u8>, Vec<Transferable>);
|
||||
|
||||
pub struct MessagePort {
|
||||
rx: RefCell<UnboundedReceiver<MessagePortMessage>>,
|
||||
tx: UnboundedSender<MessagePortMessage>,
|
||||
}
|
||||
|
||||
impl MessagePort {
|
||||
pub fn send(
|
||||
&self,
|
||||
state: &mut OpState,
|
||||
data: JsMessageData,
|
||||
) -> Result<(), AnyError> {
|
||||
let transferables =
|
||||
deserialize_js_transferables(state, data.transferables)?;
|
||||
|
||||
// Swallow the failed to send error. It means the channel was disentangled,
|
||||
// but not cleaned up.
|
||||
self.tx.send((data.data.to_vec(), transferables)).ok();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn recv(
|
||||
&self,
|
||||
state: Rc<RefCell<OpState>>,
|
||||
) -> Result<Option<JsMessageData>, AnyError> {
|
||||
let mut rx = self
|
||||
.rx
|
||||
.try_borrow_mut()
|
||||
.map_err(|_| type_error("Port receiver is already borrowed"))?;
|
||||
if let Some((data, transferables)) = rx.recv().await {
|
||||
let js_transferables =
|
||||
serialize_transferables(&mut state.borrow_mut(), transferables);
|
||||
return Ok(Some(JsMessageData {
|
||||
data: ZeroCopyBuf::from(data),
|
||||
transferables: js_transferables,
|
||||
}));
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create_entangled_message_port() -> (MessagePort, MessagePort) {
|
||||
let (port1_tx, port2_rx) = unbounded_channel::<MessagePortMessage>();
|
||||
let (port2_tx, port1_rx) = unbounded_channel::<MessagePortMessage>();
|
||||
|
||||
let port1 = MessagePort {
|
||||
rx: RefCell::new(port1_rx),
|
||||
tx: port1_tx,
|
||||
};
|
||||
|
||||
let port2 = MessagePort {
|
||||
rx: RefCell::new(port2_rx),
|
||||
tx: port2_tx,
|
||||
};
|
||||
|
||||
(port1, port2)
|
||||
}
|
||||
|
||||
pub struct MessagePortResource {
|
||||
port: MessagePort,
|
||||
cancel: CancelHandle,
|
||||
}
|
||||
|
||||
impl Resource for MessagePortResource {
|
||||
fn name(&self) -> Cow<str> {
|
||||
"messagePort".into()
|
||||
}
|
||||
|
||||
fn close(self: Rc<Self>) {
|
||||
self.cancel.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn op_message_port_create_entangled(
|
||||
state: &mut OpState,
|
||||
_: (),
|
||||
_: (),
|
||||
) -> Result<(ResourceId, ResourceId), AnyError> {
|
||||
let (port1, port2) = create_entangled_message_port();
|
||||
|
||||
let port1_id = state.resource_table.add(MessagePortResource {
|
||||
port: port1,
|
||||
cancel: CancelHandle::new(),
|
||||
});
|
||||
|
||||
let port2_id = state.resource_table.add(MessagePortResource {
|
||||
port: port2,
|
||||
cancel: CancelHandle::new(),
|
||||
});
|
||||
|
||||
Ok((port1_id, port2_id))
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize)]
|
||||
#[serde(tag = "kind", content = "data", rename_all = "camelCase")]
|
||||
pub enum JsTransferable {
|
||||
#[serde(rename_all = "camelCase")]
|
||||
MessagePort(ResourceId),
|
||||
}
|
||||
|
||||
fn deserialize_js_transferables(
|
||||
state: &mut OpState,
|
||||
js_transferables: Vec<JsTransferable>,
|
||||
) -> Result<Vec<Transferable>, AnyError> {
|
||||
let mut transferables = Vec::with_capacity(js_transferables.len());
|
||||
for js_transferable in js_transferables {
|
||||
match js_transferable {
|
||||
JsTransferable::MessagePort(id) => {
|
||||
let resource = state
|
||||
.resource_table
|
||||
.take::<MessagePortResource>(id)
|
||||
.ok_or_else(|| type_error("Invalid message port transfer"))?;
|
||||
resource.cancel.cancel();
|
||||
let resource = Rc::try_unwrap(resource)
|
||||
.map_err(|_| type_error("Message port is not ready for transfer"))?;
|
||||
transferables.push(Transferable::MessagePort(resource.port));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(transferables)
|
||||
}
|
||||
|
||||
fn serialize_transferables(
|
||||
state: &mut OpState,
|
||||
transferables: Vec<Transferable>,
|
||||
) -> Vec<JsTransferable> {
|
||||
let mut js_transferables = Vec::with_capacity(transferables.len());
|
||||
for transferable in transferables {
|
||||
match transferable {
|
||||
Transferable::MessagePort(port) => {
|
||||
let rid = state.resource_table.add(MessagePortResource {
|
||||
port,
|
||||
cancel: CancelHandle::new(),
|
||||
});
|
||||
js_transferables.push(JsTransferable::MessagePort(rid));
|
||||
}
|
||||
}
|
||||
}
|
||||
js_transferables
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize)]
|
||||
pub struct JsMessageData {
|
||||
data: ZeroCopyBuf,
|
||||
transferables: Vec<JsTransferable>,
|
||||
}
|
||||
|
||||
pub fn op_message_port_post_message(
|
||||
state: &mut OpState,
|
||||
rid: ResourceId,
|
||||
data: JsMessageData,
|
||||
) -> Result<(), AnyError> {
|
||||
for js_transferable in &data.transferables {
|
||||
match js_transferable {
|
||||
JsTransferable::MessagePort(id) => {
|
||||
if *id == rid {
|
||||
return Err(type_error("Can not transfer self message port"));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let resource = state
|
||||
.resource_table
|
||||
.get::<MessagePortResource>(rid)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
|
||||
resource.port.send(state, data)
|
||||
}
|
||||
|
||||
pub async fn op_message_port_recv_message(
|
||||
state: Rc<RefCell<OpState>>,
|
||||
rid: ResourceId,
|
||||
_: (),
|
||||
) -> Result<Option<JsMessageData>, AnyError> {
|
||||
let resource = {
|
||||
let state = state.borrow();
|
||||
match state.resource_table.get::<MessagePortResource>(rid) {
|
||||
Some(resource) => resource,
|
||||
None => return Ok(None),
|
||||
}
|
||||
};
|
||||
let cancel = RcRef::map(resource.clone(), |r| &r.cancel);
|
||||
resource.port.recv(state.clone()).or_cancel(cancel).await?
|
||||
}
|
|
@ -564,7 +564,10 @@
|
|||
converters.USVString,
|
||||
);
|
||||
converters["sequence<double>"] = createSequenceConverter(
|
||||
converters["double"],
|
||||
converters.double,
|
||||
);
|
||||
converters["sequence<object>"] = createSequenceConverter(
|
||||
converters.object,
|
||||
);
|
||||
converters["Promise<undefined>"] = createPromiseConverter(() => undefined);
|
||||
|
||||
|
@ -630,6 +633,7 @@
|
|||
get() {
|
||||
return member.defaultValue;
|
||||
},
|
||||
enumerable: true,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
15
extensions/webidl/internal.d.ts
vendored
15
extensions/webidl/internal.d.ts
vendored
|
@ -321,6 +321,21 @@ declare namespace globalThis {
|
|||
* Configure prototype properties enumerability / writability / configurability.
|
||||
*/
|
||||
declare function configurePrototype(prototype: any);
|
||||
|
||||
/**
|
||||
* Get the WebIDL / ES type of a value.
|
||||
*/
|
||||
declare function type(
|
||||
v: any,
|
||||
):
|
||||
| "Null"
|
||||
| "Undefined"
|
||||
| "Boolean"
|
||||
| "Number"
|
||||
| "String"
|
||||
| "Symbol"
|
||||
| "BigInt"
|
||||
| "Object";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@ delete Object.prototype.__proto__;
|
|||
const formData = window.__bootstrap.formData;
|
||||
const fetch = window.__bootstrap.fetch;
|
||||
const prompt = window.__bootstrap.prompt;
|
||||
const messagePort = window.__bootstrap.messagePort;
|
||||
const denoNs = window.__bootstrap.denoNs;
|
||||
const denoNsUnstable = window.__bootstrap.denoNsUnstable;
|
||||
const errors = window.__bootstrap.errors.errors;
|
||||
|
@ -299,6 +300,8 @@ delete Object.prototype.__proto__;
|
|||
URLSearchParams: util.nonEnumerable(url.URLSearchParams),
|
||||
WebSocket: util.nonEnumerable(webSocket.WebSocket),
|
||||
BroadcastChannel: util.nonEnumerable(broadcastChannel.BroadcastChannel),
|
||||
MessageChannel: util.nonEnumerable(messagePort.MessageChannel),
|
||||
MessagePort: util.nonEnumerable(messagePort.MessagePort),
|
||||
Worker: util.nonEnumerable(worker.Worker),
|
||||
WritableStream: util.nonEnumerable(streams.WritableStream),
|
||||
WritableStreamDefaultWriter: util.nonEnumerable(
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit a8e5772a0f1c4d666acc5aee2423c38da7c9a71c
|
||||
Subproject commit 02ebfda73367c0419e19f83048fa5895a78cb418
|
|
@ -234,7 +234,9 @@
|
|||
},
|
||||
"hr-time": {
|
||||
"monotonic-clock.any.html": true,
|
||||
"basic.any.html": ["Performance interface extends EventTarget."],
|
||||
"basic.any.html": [
|
||||
"Performance interface extends EventTarget."
|
||||
],
|
||||
"idlharness.any.html": [
|
||||
"Performance interface: existence and properties of interface object",
|
||||
"Performance interface: existence and properties of interface prototype object",
|
||||
|
@ -587,6 +589,8 @@
|
|||
],
|
||||
"idlharness.any.html": true,
|
||||
"url-constructor.any.html": [
|
||||
"Parsing: <foo://ho|st/> against <about:blank>",
|
||||
"Parsing: <http://ho%7Cst/> against <about:blank>",
|
||||
"Parsing: <file://%43%7C> against <about:blank>",
|
||||
"Parsing: <file://%43|> against <about:blank>",
|
||||
"Parsing: <file://C%7C> against <about:blank>",
|
||||
|
@ -898,9 +902,7 @@
|
|||
"FileAPI": {
|
||||
"blob": {
|
||||
"Blob-array-buffer.any.html": true,
|
||||
"Blob-constructor.any.html": [
|
||||
"Passing a FrozenArray as the blobParts array should work (FrozenArray<MessagePort>)."
|
||||
],
|
||||
"Blob-constructor.any.html": true,
|
||||
"Blob-slice-overflow.any.html": true,
|
||||
"Blob-slice.any.html": true,
|
||||
"Blob-stream.any.html": true,
|
||||
|
@ -1010,7 +1012,33 @@
|
|||
"postMessage results in correct event"
|
||||
],
|
||||
"interface.any.html": true
|
||||
}
|
||||
},
|
||||
"message-channels": {
|
||||
"basics.any.html": true,
|
||||
"close.any.html": true,
|
||||
"dictionary-transferrable.any.html": false,
|
||||
"implied-start.any.html": true,
|
||||
"no-start.any.html": true,
|
||||
"user-activation.tentative.any.html": false,
|
||||
"worker-post-after-close.any.html": false,
|
||||
"worker.any.html": false
|
||||
},
|
||||
"Channel_postMessage_Blob.any.html": false,
|
||||
"Channel_postMessage_DataCloneErr.any.html": true,
|
||||
"Channel_postMessage_clone_port.any.html": true,
|
||||
"Channel_postMessage_clone_port_error.any.html": true,
|
||||
"Channel_postMessage_event_properties.any.html": true,
|
||||
"Channel_postMessage_ports_readonly_array.any.html": false,
|
||||
"Channel_postMessage_target_source.any.html": true,
|
||||
"Channel_postMessage_transfer_xsite_incoming_messages.window.html": false,
|
||||
"Channel_postMessage_with_transfer_entangled.any.html": true,
|
||||
"Channel_postMessage_with_transfer_incoming_messages.any.html": true,
|
||||
"Channel_postMessage_with_transfer_outgoing_messages.any.html": true,
|
||||
"MessageEvent-trusted.any.html": false,
|
||||
"MessageEvent-trusted.window.html": false,
|
||||
"MessageEvent.any.html": true,
|
||||
"MessagePort_initial_disabled.any.html": true,
|
||||
"MessagePort_onmessage_start.any.html": true
|
||||
},
|
||||
"xhr": {
|
||||
"formdata": {
|
||||
|
|
|
@ -122,7 +122,7 @@ export async function runSingleTest(
|
|||
harnessStatus = JSON.parse(line.slice(5));
|
||||
} else {
|
||||
stderr += line + "\n";
|
||||
console.error(stderr);
|
||||
console.error(line);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue