diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index 36314675ac..8bbd0e9296 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -32,7 +32,9 @@ import process from "node:process"; const { JSONParse, JSONStringify, ObjectPrototypeIsPrototypeOf } = primordials; const { Error, + ObjectHasOwn, PromiseResolve, + SafeSet, Symbol, SymbolFor, SymbolIterator, @@ -369,7 +371,7 @@ internals.__initWorkerThreads = ( defaultExport.parentPort = parentPort; defaultExport.threadId = threadId; - workerData = patchMessagePortIfFound(workerData); + patchMessagePortIfFound(workerData); parentPort.off = parentPort.removeListener = function ( this: ParentPort, @@ -387,8 +389,8 @@ internals.__initWorkerThreads = ( ) { // deno-lint-ignore no-explicit-any const _listener = (ev: any) => { - let message = ev.data; - message = patchMessagePortIfFound(message); + const message = ev.data; + patchMessagePortIfFound(message); return listener(message); }; listeners.set(listener, _listener); @@ -484,7 +486,10 @@ const listeners = new SafeWeakMap< function webMessagePortToNodeMessagePort(port: MessagePort) { port.on = port.addListener = function (this: MessagePort, name, listener) { // deno-lint-ignore no-explicit-any - const _listener = (ev: any) => listener(ev.data); + const _listener = (ev: any) => { + patchMessagePortIfFound(ev.data); + listener(ev.data); + }; if (name == "message") { if (port.onmessage === null) { port.onmessage = _listener; @@ -534,19 +539,26 @@ function webMessagePortToNodeMessagePort(port: MessagePort) { return port; } +// TODO(@marvinhagemeister): Recursively iterating over all message +// properties seems slow. +// Maybe there is a way we can patch the prototype of MessagePort _only_ +// inside worker_threads? For now correctness is more important than perf. // deno-lint-ignore no-explicit-any -function patchMessagePortIfFound(data: any) { +function patchMessagePortIfFound(data: any, seen = new SafeSet()) { + if (data === null || typeof data !== "object" || seen.has(data)) { + return; + } + seen.add(data); + if (ObjectPrototypeIsPrototypeOf(MessagePortPrototype, data)) { - data = webMessagePortToNodeMessagePort(data); + webMessagePortToNodeMessagePort(data); } else { for (const obj in data as Record) { - if (ObjectPrototypeIsPrototypeOf(MessagePortPrototype, data[obj])) { - data[obj] = webMessagePortToNodeMessagePort(data[obj] as MessagePort); - break; + if (ObjectHasOwn(data, obj)) { + patchMessagePortIfFound(data[obj], seen); } } } - return data; } export { diff --git a/tests/unit_node/worker_threads_test.ts b/tests/unit_node/worker_threads_test.ts index e16bc89666..9991e5787e 100644 --- a/tests/unit_node/worker_threads_test.ts +++ b/tests/unit_node/worker_threads_test.ts @@ -562,3 +562,31 @@ Deno.test({ port1.close(); }, }); + +// Test for https://github.com/denoland/deno/issues/23854 +Deno.test({ + name: "[node/worker_threads] MessagePort.addListener is present", + async fn() { + const channel = new workerThreads.MessageChannel(); + const worker = new workerThreads.Worker( + ` + import { parentPort } from "node:worker_threads"; + parentPort.addListener("message", message => { + if (message.foo) { + const success = typeof message.foo.bar.addListener === "function"; + parentPort.postMessage(success ? "it works" : "it doesn't work") + } + }) + `, + { + eval: true, + }, + ); + worker.postMessage({ foo: { bar: channel.port1 } }, [channel.port1]); + + assertEquals((await once(worker, "message"))[0], "it works"); + worker.terminate(); + channel.port1.close(); + channel.port2.close(); + }, +});