From 7d7ebef4e2bd352a65f1d03541770cf1737dfbbd Mon Sep 17 00:00:00 2001 From: Satya Rohith Date: Mon, 11 Mar 2024 23:39:28 +0530 Subject: [PATCH] populate data on bootstrap --- cli/worker.rs | 2 +- ext/node/polyfills/02_init.js | 4 +- ext/node/polyfills/worker_threads.ts | 68 +++++++++------------------- runtime/js/99_main.js | 11 ++--- runtime/ops/worker_host.rs | 9 ++-- runtime/web_worker.rs | 19 +++----- 6 files changed, 39 insertions(+), 74 deletions(-) diff --git a/cli/worker.rs b/cli/worker.rs index c0126f648f..6975144778 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -841,7 +841,7 @@ fn create_web_worker_callback( stdio: stdio.clone(), cache_storage_dir, feature_checker, - maybe_worker_data: args.maybe_worker_data, + maybe_worker_metadata: args.maybe_worker_metadata, }; WebWorker::bootstrap_from_options( diff --git a/ext/node/polyfills/02_init.js b/ext/node/polyfills/02_init.js index 9148d9d603..04820b837f 100644 --- a/ext/node/polyfills/02_init.js +++ b/ext/node/polyfills/02_init.js @@ -14,7 +14,7 @@ function initialize( usesLocalNodeModulesDir, argv0, runningOnMainThread, - maybeWorkerData, + maybeWorkerMetadata, ) { if (initialized) { throw Error("Node runtime already initialized"); @@ -39,7 +39,7 @@ function initialize( // FIXME(bartlomieju): not nice to depend on `Deno` namespace here // but it's the only way to get `args` and `version` and this point. internals.__bootstrapNodeProcess(argv0, Deno.args, Deno.version); - internals.__initWorkerThreads(runningOnMainThread, maybeWorkerData); + internals.__initWorkerThreads(runningOnMainThread, maybeWorkerMetadata); internals.__setupChildProcessIpcChannel(); // `Deno[Deno.internal].requireImpl` will be unreachable after this line. delete internals.requireImpl; diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index fafe4fbeee..d436ea4263 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -22,7 +22,7 @@ import { import * as webidl from "ext:deno_webidl/00_webidl.js"; import { log } from "ext:runtime/06_util.js"; import { notImplemented } from "ext:deno_node/_utils.ts"; -import { EventEmitter, once } from "node:events"; +import { EventEmitter } from "node:events"; import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js"; import { isAbsolute, resolve } from "node:path"; @@ -42,7 +42,6 @@ const { SafeRegExp, SafeMap, TypeError, - PromisePrototypeThen, } = primordials; export interface WorkerOptions { @@ -196,12 +195,13 @@ class NodeWorker extends EventEmitter { name = "[worker eval]"; } this.#name = name; + this.threadId = ++threads; - const maybeWorkerData = options?.workerData; - const serializedWorkerData = maybeWorkerData - ? core.serialize(maybeWorkerData) - : undefined; - + const serializedWorkerMetadata = serializeJsMessageData({ + workerData: options?.workerData, + environmentData: environmentData, + threadId: this.threadId, + }, options?.transferList ?? []); const id = op_create_worker( { // deno-lint-ignore prefer-primordials @@ -212,15 +212,11 @@ class NodeWorker extends EventEmitter { name: this.#name, workerType: "module", }, + serializedWorkerMetadata, ); this.#id = id; this.#pollControl(); this.#pollMessages(); - - this.postMessage({ - environmentData, - threadId: (this.threadId = ++threads), - }, options?.transferList || []); // https://nodejs.org/api/worker_threads.html#event-online this.emit("online"); } @@ -394,7 +390,7 @@ let parentPort: ParentPort = null as any; internals.__initWorkerThreads = ( runningOnMainThread: boolean, - maybeWorkerData, + maybeWorkerMetadata, ) => { isMainThread = runningOnMainThread; @@ -417,29 +413,13 @@ internals.__initWorkerThreads = ( >(); parentPort = self as ParentPort; - workerData = maybeWorkerData; + const { 0: metadata, 1: _ } = maybeWorkerMetadata; + workerData = metadata.workerData; + environmentData = metadata.environmentData; + threadId = metadata.threadId; defaultExport.workerData = workerData; defaultExport.parentPort = parentPort; - - const initPromise = PromisePrototypeThen( - once( - parentPort, - "message", - ), - (result) => { - // TODO(bartlomieju): just so we don't error out here. It's still racy, - // but should be addressed by https://github.com/denoland/deno/issues/22783 - // shortly. - const data = result[0].data ?? {}; - // TODO(kt3k): The below values are set asynchronously - // using the first message from the parent. - // This should be done synchronously. - threadId = data.threadId; - environmentData = data.environmentData; - - defaultExport.threadId = threadId; - }, - ); + defaultExport.threadId = threadId; parentPort.off = parentPort.removeListener = function ( this: ParentPort, @@ -455,22 +435,18 @@ internals.__initWorkerThreads = ( name, listener, ) { - PromisePrototypeThen(initPromise, () => { - // deno-lint-ignore no-explicit-any - const _listener = (ev: any) => listener(ev.data); - listeners.set(listener, _listener); - this.addEventListener(name, _listener); - }); + // deno-lint-ignore no-explicit-any + const _listener = (ev: any) => listener(ev.data); + listeners.set(listener, _listener); + this.addEventListener(name, _listener); return this; }; parentPort.once = function (this: ParentPort, name, listener) { - PromisePrototypeThen(initPromise, () => { - // deno-lint-ignore no-explicit-any - const _listener = (ev: any) => listener(ev.data); - listeners.set(listener, _listener); - this.addEventListener(name, _listener); - }); + // deno-lint-ignore no-explicit-any + const _listener = (ev: any) => listener(ev.data); + listeners.set(listener, _listener); + this.addEventListener(name, _listener); return this; }; diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js index c2aa45f2e0..27ba488e73 100644 --- a/runtime/js/99_main.js +++ b/runtime/js/99_main.js @@ -786,7 +786,7 @@ function bootstrapWorkerRuntime( runtimeOptions, name, internalName, - maybeWorkerData, + maybeWorkerMetadata, ) { if (hasBootstrapped) { throw new Error("Worker runtime already bootstrapped"); @@ -909,17 +909,16 @@ function bootstrapWorkerRuntime( // existing global `Deno` with `Deno` namespace from "./deno.ts". ObjectDefineProperty(globalThis, "Deno", core.propReadOnly(finalDenoNs)); - let workerData = undefined; - if (maybeWorkerData) { - workerData = core.deserialize(maybeWorkerData); - } + const workerMetadata = maybeWorkerMetadata + ? messagePort.deserializeJsMessageData(maybeWorkerMetadata) + : undefined; if (nodeBootstrap) { nodeBootstrap( hasNodeModulesDir, argv0, /* runningOnMainThread */ false, - workerData, + workerMetadata, ); } } diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs index 30754aab1a..d1b318f0f2 100644 --- a/runtime/ops/worker_host.rs +++ b/runtime/ops/worker_host.rs @@ -15,10 +15,8 @@ use crate::worker::FormatJsErrorFn; use deno_core::error::AnyError; use deno_core::op2; use deno_core::serde::Deserialize; -use deno_core::v8; use deno_core::CancelFuture; use deno_core::CancelHandle; -use deno_core::JsBuffer; use deno_core::ModuleSpecifier; use deno_core::OpState; use deno_web::JsMessageData; @@ -37,7 +35,7 @@ pub struct CreateWebWorkerArgs { pub permissions: PermissionsContainer, pub main_module: ModuleSpecifier, pub worker_type: WebWorkerType, - pub maybe_worker_data: Option>, + pub maybe_worker_metadata: Option, } pub type CreateWebWorkerCb = dyn Fn(CreateWebWorkerArgs) -> (WebWorker, SendableWebWorkerHandle) @@ -124,7 +122,7 @@ pub struct CreateWorkerArgs { fn op_create_worker( state: &mut OpState, #[serde] args: CreateWorkerArgs, - #[buffer] maybe_worker_data: Option, + #[serde] maybe_worker_metadata: Option, ) -> Result { let specifier = args.specifier.clone(); let maybe_source_code = if args.has_source_code { @@ -178,7 +176,6 @@ fn op_create_worker( // Setup new thread let thread_builder = std::thread::Builder::new().name(format!("{worker_id}")); - let maybe_worker_data = maybe_worker_data.map(|buf| buf.to_vec()); // Spawn it thread_builder.spawn(move || { // Any error inside this block is terminal: @@ -194,7 +191,7 @@ fn op_create_worker( permissions: worker_permissions, main_module: module_specifier.clone(), worker_type, - maybe_worker_data, + maybe_worker_metadata, }); // Send thread safe handle from newly created worker to host thread diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index 4a0989c0ad..f35d389210 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -24,7 +24,6 @@ use deno_core::serde::Deserialize; use deno_core::serde::Serialize; use deno_core::serde_json::json; use deno_core::v8; -use deno_core::v8::ValueDeserializerHelper; use deno_core::CancelHandle; use deno_core::CompiledWasmModuleStore; use deno_core::Extension; @@ -49,6 +48,7 @@ use deno_terminal::colors; use deno_tls::RootCertStoreProvider; use deno_web::create_entangled_message_port; use deno_web::BlobStore; +use deno_web::JsMessageData; use deno_web::MessagePort; use log::debug; use std::cell::RefCell; @@ -333,7 +333,7 @@ pub struct WebWorker { poll_for_messages_fn: Option>, bootstrap_fn_global: Option>, // Consumed when `bootstrap_fn` is called - maybe_worker_data: Option>, + maybe_worker_metadata: Option, } pub struct WebWorkerOptions { @@ -359,7 +359,7 @@ pub struct WebWorkerOptions { pub cache_storage_dir: Option, pub stdio: Stdio, pub feature_checker: Arc, - pub maybe_worker_data: Option>, + pub maybe_worker_metadata: Option, } impl WebWorker { @@ -605,7 +605,7 @@ impl WebWorker { main_module, poll_for_messages_fn: None, bootstrap_fn_global: Some(bootstrap_fn_global), - maybe_worker_data: options.maybe_worker_data, + maybe_worker_metadata: options.maybe_worker_metadata, }, external_handle, ) @@ -622,15 +622,8 @@ impl WebWorker { let bootstrap_fn = v8::Local::new(scope, bootstrap_fn); let undefined = v8::undefined(scope); let mut worker_data: v8::Local = v8::undefined(scope).into(); - if let Some(buf) = self.maybe_worker_data.take() { - let len = buf.len(); - let store = v8::ArrayBuffer::new_backing_store_from_boxed_slice( - buf.into_boxed_slice(), - ); - let ab = - v8::ArrayBuffer::with_backing_store(scope, &store.make_shared()); - let v8_buf = v8::Uint8Array::new(scope, ab, 0, len).unwrap(); - worker_data = v8_buf.into(); + if let Some(data) = self.maybe_worker_metadata.take() { + worker_data = deno_core::serde_v8::to_v8(scope, data).unwrap(); } let name_str: v8::Local = v8::String::new(scope, &self.name).unwrap().into();