mirror of
https://github.com/denoland/deno.git
synced 2025-02-08 07:16:56 -05:00
fix(node): patch MessagePort in worker_thread message (#23871)
Our `MessagePort` to Node's `MessagePort` conversion logic was missing the case where a `MessagePort` is sent _inside_ the message. This broke `tinypool` which is used by `vitest` as it relies on some node specific methods on `MessagePort`. Fixes https://github.com/denoland/deno/issues/23854 , Fixes https://github.com/denoland/deno/pull/23871
This commit is contained in:
parent
ac418ba229
commit
8d4eb2144f
2 changed files with 50 additions and 10 deletions
|
@ -32,7 +32,9 @@ import process from "node:process";
|
||||||
const { JSONParse, JSONStringify, ObjectPrototypeIsPrototypeOf } = primordials;
|
const { JSONParse, JSONStringify, ObjectPrototypeIsPrototypeOf } = primordials;
|
||||||
const {
|
const {
|
||||||
Error,
|
Error,
|
||||||
|
ObjectHasOwn,
|
||||||
PromiseResolve,
|
PromiseResolve,
|
||||||
|
SafeSet,
|
||||||
Symbol,
|
Symbol,
|
||||||
SymbolFor,
|
SymbolFor,
|
||||||
SymbolIterator,
|
SymbolIterator,
|
||||||
|
@ -369,7 +371,7 @@ internals.__initWorkerThreads = (
|
||||||
defaultExport.parentPort = parentPort;
|
defaultExport.parentPort = parentPort;
|
||||||
defaultExport.threadId = threadId;
|
defaultExport.threadId = threadId;
|
||||||
|
|
||||||
workerData = patchMessagePortIfFound(workerData);
|
patchMessagePortIfFound(workerData);
|
||||||
|
|
||||||
parentPort.off = parentPort.removeListener = function (
|
parentPort.off = parentPort.removeListener = function (
|
||||||
this: ParentPort,
|
this: ParentPort,
|
||||||
|
@ -387,8 +389,8 @@ internals.__initWorkerThreads = (
|
||||||
) {
|
) {
|
||||||
// deno-lint-ignore no-explicit-any
|
// deno-lint-ignore no-explicit-any
|
||||||
const _listener = (ev: any) => {
|
const _listener = (ev: any) => {
|
||||||
let message = ev.data;
|
const message = ev.data;
|
||||||
message = patchMessagePortIfFound(message);
|
patchMessagePortIfFound(message);
|
||||||
return listener(message);
|
return listener(message);
|
||||||
};
|
};
|
||||||
listeners.set(listener, _listener);
|
listeners.set(listener, _listener);
|
||||||
|
@ -484,7 +486,10 @@ const listeners = new SafeWeakMap<
|
||||||
function webMessagePortToNodeMessagePort(port: MessagePort) {
|
function webMessagePortToNodeMessagePort(port: MessagePort) {
|
||||||
port.on = port.addListener = function (this: MessagePort, name, listener) {
|
port.on = port.addListener = function (this: MessagePort, name, listener) {
|
||||||
// deno-lint-ignore no-explicit-any
|
// deno-lint-ignore no-explicit-any
|
||||||
const _listener = (ev: any) => listener(ev.data);
|
const _listener = (ev: any) => {
|
||||||
|
patchMessagePortIfFound(ev.data);
|
||||||
|
listener(ev.data);
|
||||||
|
};
|
||||||
if (name == "message") {
|
if (name == "message") {
|
||||||
if (port.onmessage === null) {
|
if (port.onmessage === null) {
|
||||||
port.onmessage = _listener;
|
port.onmessage = _listener;
|
||||||
|
@ -534,19 +539,26 @@ function webMessagePortToNodeMessagePort(port: MessagePort) {
|
||||||
return port;
|
return port;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(@marvinhagemeister): Recursively iterating over all message
|
||||||
|
// properties seems slow.
|
||||||
|
// Maybe there is a way we can patch the prototype of MessagePort _only_
|
||||||
|
// inside worker_threads? For now correctness is more important than perf.
|
||||||
// deno-lint-ignore no-explicit-any
|
// deno-lint-ignore no-explicit-any
|
||||||
function patchMessagePortIfFound(data: any) {
|
function patchMessagePortIfFound(data: any, seen = new SafeSet<any>()) {
|
||||||
|
if (data === null || typeof data !== "object" || seen.has(data)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
seen.add(data);
|
||||||
|
|
||||||
if (ObjectPrototypeIsPrototypeOf(MessagePortPrototype, data)) {
|
if (ObjectPrototypeIsPrototypeOf(MessagePortPrototype, data)) {
|
||||||
data = webMessagePortToNodeMessagePort(data);
|
webMessagePortToNodeMessagePort(data);
|
||||||
} else {
|
} else {
|
||||||
for (const obj in data as Record<string, unknown>) {
|
for (const obj in data as Record<string, unknown>) {
|
||||||
if (ObjectPrototypeIsPrototypeOf(MessagePortPrototype, data[obj])) {
|
if (ObjectHasOwn(data, obj)) {
|
||||||
data[obj] = webMessagePortToNodeMessagePort(data[obj] as MessagePort);
|
patchMessagePortIfFound(data[obj], seen);
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return data;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export {
|
export {
|
||||||
|
|
|
@ -562,3 +562,31 @@ Deno.test({
|
||||||
port1.close();
|
port1.close();
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Test for https://github.com/denoland/deno/issues/23854
|
||||||
|
Deno.test({
|
||||||
|
name: "[node/worker_threads] MessagePort.addListener is present",
|
||||||
|
async fn() {
|
||||||
|
const channel = new workerThreads.MessageChannel();
|
||||||
|
const worker = new workerThreads.Worker(
|
||||||
|
`
|
||||||
|
import { parentPort } from "node:worker_threads";
|
||||||
|
parentPort.addListener("message", message => {
|
||||||
|
if (message.foo) {
|
||||||
|
const success = typeof message.foo.bar.addListener === "function";
|
||||||
|
parentPort.postMessage(success ? "it works" : "it doesn't work")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
`,
|
||||||
|
{
|
||||||
|
eval: true,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
worker.postMessage({ foo: { bar: channel.port1 } }, [channel.port1]);
|
||||||
|
|
||||||
|
assertEquals((await once(worker, "message"))[0], "it works");
|
||||||
|
worker.terminate();
|
||||||
|
channel.port1.close();
|
||||||
|
channel.port2.close();
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
Loading…
Add table
Reference in a new issue