From 0b8d7d1d4b068df46a14895b2f55c00781bd1eef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 16 Apr 2024 00:06:39 +0100 Subject: [PATCH] fix(ext/node): panic on 'worker_threads.receiveMessageOnPort' (#23386) Closes https://github.com/denoland/deno/issues/23362 Previously we were panicking if there was a pending read on a port and `receiveMessageOnPort` was called. This is now fixed by cancelling the pending read, trying to read a message and resuming reading in a loop. --- ext/node/polyfills/worker_threads.ts | 2 ++ ext/web/13_message_port.js | 18 +++++++++++++++++- ext/web/message_port.rs | 1 + tests/unit_node/worker_threads_test.ts | 22 ++++++++++++++++++++++ 4 files changed, 42 insertions(+), 1 deletion(-) diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index 323095206a..f61e7e3e33 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -17,6 +17,7 @@ import { MessagePort, MessagePortIdSymbol, MessagePortPrototype, + MessagePortReceiveMessageOnPortSymbol, nodeWorkerThreadCloseCb, refMessagePort, serializeJsMessageData, @@ -441,6 +442,7 @@ export function receiveMessageOnPort(port: MessagePort): object | undefined { err["code"] = "ERR_INVALID_ARG_TYPE"; throw err; } + port[MessagePortReceiveMessageOnPortSymbol] = true; const data = op_message_port_recv_message_sync(port[MessagePortIdSymbol]); if (data === null) return undefined; return { message: deserializeJsMessageData(data)[0] }; diff --git a/ext/web/13_message_port.js b/ext/web/13_message_port.js index 62c0328c31..4e4184f2a3 100644 --- a/ext/web/13_message_port.js +++ b/ext/web/13_message_port.js @@ -88,6 +88,9 @@ const MessageChannelPrototype = MessageChannel.prototype; const _id = Symbol("id"); const MessagePortIdSymbol = _id; +const MessagePortReceiveMessageOnPortSymbol = Symbol( + "MessagePortReceiveMessageOnPort", +); const _enabled = Symbol("enabled"); const _refed = Symbol("refed"); const nodeWorkerThreadCloseCb = Symbol("nodeWorkerThreadCloseCb"); @@ -128,6 +131,10 @@ class MessagePort extends EventTarget { constructor() { super(); + ObjectDefineProperty(this, MessagePortReceiveMessageOnPortSymbol, { + value: false, + enumerable: false, + }); ObjectDefineProperty(this, nodeWorkerThreadCloseCb, { value: null, enumerable: false, @@ -189,7 +196,15 @@ class MessagePort extends EventTarget { this[_id], ); } catch (err) { - if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, err)) break; + if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, err)) { + // If we were interrupted, check if the interruption is coming + // from `receiveMessageOnPort` API from Node compat, if so, continue. + if (this[MessagePortReceiveMessageOnPortSymbol]) { + this[MessagePortReceiveMessageOnPortSymbol] = false; + continue; + } + break; + } nodeWorkerThreadMaybeInvokeCloseCb(this); throw err; } @@ -444,6 +459,7 @@ export { MessagePort, MessagePortIdSymbol, MessagePortPrototype, + MessagePortReceiveMessageOnPortSymbol, nodeWorkerThreadCloseCb, serializeJsMessageData, structuredClone, diff --git a/ext/web/message_port.rs b/ext/web/message_port.rs index 1cd29c64d8..ac33145b17 100644 --- a/ext/web/message_port.rs +++ b/ext/web/message_port.rs @@ -235,6 +235,7 @@ pub fn op_message_port_recv_message_sync( #[smi] rid: ResourceId, ) -> Result, AnyError> { let resource = state.resource_table.get::(rid)?; + resource.cancel.cancel(); let mut rx = resource.port.rx.borrow_mut(); match rx.try_recv() { diff --git a/tests/unit_node/worker_threads_test.ts b/tests/unit_node/worker_threads_test.ts index 2351e10529..bd600469bf 100644 --- a/tests/unit_node/worker_threads_test.ts +++ b/tests/unit_node/worker_threads_test.ts @@ -414,3 +414,25 @@ Deno.test({ mainPort.close(); }, }); + +// Regression test for https://github.com/denoland/deno/issues/23362 +Deno.test("[node/worker_threads] receiveMessageOnPort works if there's pending read", function () { + const { port1, port2 } = new workerThreads.MessageChannel(); + + const message1 = { hello: "world" }; + const message2 = { foo: "bar" }; + + assertEquals(workerThreads.receiveMessageOnPort(port2), undefined); + port2.start(); + + port1.postMessage(message1); + port1.postMessage(message2); + assertEquals(workerThreads.receiveMessageOnPort(port2), { + message: message1, + }); + assertEquals(workerThreads.receiveMessageOnPort(port2), { + message: message2, + }); + port1.close(); + port2.close(); +});