From dba1c589338ad48d13bbe5254e102a8104afcbfe Mon Sep 17 00:00:00 2001 From: Satya Rohith Date: Wed, 3 Apr 2024 16:42:16 +0530 Subject: [PATCH] fix(ext/node): patch MessagePort if provided as workerData (#23198) MessagePort if directly assigned to workerData property instead of embedding it in an object then it is not patched to a NodeMessagePort. This commit fixes the bug. --- ext/node/polyfills/worker_threads.ts | 37 +++++++++---------- .../workers/node_worker_message_port.mjs | 2 +- .../workers/node_worker_message_port_1.cjs | 2 +- 3 files changed, 19 insertions(+), 22 deletions(-) diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index 49562d8921..c34f1fe23d 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -343,14 +343,7 @@ internals.__initWorkerThreads = ( defaultExport.parentPort = parentPort; defaultExport.threadId = threadId; - for (const obj in workerData as Record) { - if (ObjectPrototypeIsPrototypeOf(MessagePortPrototype, workerData[obj])) { - workerData[obj] = webMessagePortToNodeMessagePort( - workerData[obj] as MessagePort, - ); - break; - } - } + workerData = patchMessagePortIfFound(workerData); parentPort.off = parentPort.removeListener = function ( this: ParentPort, @@ -369,18 +362,7 @@ internals.__initWorkerThreads = ( // deno-lint-ignore no-explicit-any const _listener = (ev: any) => { let message = ev.data; - if (ObjectPrototypeIsPrototypeOf(MessagePortPrototype, message)) { - message = webMessagePortToNodeMessagePort(message); - } else { - for (const obj in message) { - if ( - ObjectPrototypeIsPrototypeOf(MessagePortPrototype, message[obj]) - ) { - message[obj] = webMessagePortToNodeMessagePort(message[obj]); - break; - } - } - } + message = patchMessagePortIfFound(message); return listener(message); }; listeners.set(listener, _listener); @@ -481,6 +463,21 @@ function webMessagePortToNodeMessagePort(port: MessagePort) { return port; } +// deno-lint-ignore no-explicit-any +function patchMessagePortIfFound(data: any) { + if (ObjectPrototypeIsPrototypeOf(MessagePortPrototype, data)) { + data = webMessagePortToNodeMessagePort(data); + } else { + for (const obj in data as Record) { + if (ObjectPrototypeIsPrototypeOf(MessagePortPrototype, data[obj])) { + data[obj] = webMessagePortToNodeMessagePort(data[obj] as MessagePort); + break; + } + } + } + return data; +} + export { BroadcastChannel, MessagePort, diff --git a/tests/testdata/workers/node_worker_message_port.mjs b/tests/testdata/workers/node_worker_message_port.mjs index 71640fb400..91c7e062d1 100644 --- a/tests/testdata/workers/node_worker_message_port.mjs +++ b/tests/testdata/workers/node_worker_message_port.mjs @@ -9,7 +9,7 @@ const deferred = createDeferred(); const worker = new workerThreads.Worker( import.meta.resolve("./node_worker_message_port_1.cjs"), { - workerData: { workerPort }, + workerData: workerPort, transferList: [workerPort], }, ); diff --git a/tests/testdata/workers/node_worker_message_port_1.cjs b/tests/testdata/workers/node_worker_message_port_1.cjs index 01739c51ef..3f78cd539d 100644 --- a/tests/testdata/workers/node_worker_message_port_1.cjs +++ b/tests/testdata/workers/node_worker_message_port_1.cjs @@ -1,7 +1,7 @@ const { parentPort, workerData } = require("worker_threads"); parentPort.on("message", (msg) => { - const workerPort = workerData.workerPort; + const workerPort = workerData; parentPort.postMessage("Hello from worker on parentPort!"); workerPort.postMessage("Hello from worker on workerPort!"); workerPort.on("close", () => console.log("worker port closed"));