1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-21 13:00:36 -05:00

fix(ext/node): worker_threads ESM handling (#22841)

Fixes #22840
Fixes #22964
This commit is contained in:
mash-graz 2024-03-20 04:42:22 +01:00 committed by GitHub
parent b9f8562754
commit 0d43a63636
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 312 additions and 130 deletions

4
Cargo.lock generated
View file

@ -5132,9 +5132,9 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
[[package]]
name = "relative-path"
version = "1.9.0"
version = "1.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c707298afce11da2efef2f600116fa93ffa7a032b5d7b628aa17711ec81383ca"
checksum = "e898588f33fdd5b9420719948f9f2a32c922a246964576f71ba7f24f80610fbc"
[[package]]
name = "reqwest"

View file

@ -329,6 +329,7 @@ deno_core::extension!(deno_node,
ops::require::op_require_package_imports_resolve<P>,
ops::require::op_require_break_on_next_statement,
ops::util::op_node_guess_handle_type,
ops::worker_threads::op_worker_threads_filename<P>,
ops::crypto::op_node_create_private_key,
ops::crypto::op_node_create_public_key,
ops::ipc::op_node_child_ipc_pipe,

View file

@ -11,4 +11,5 @@ pub mod require;
pub mod util;
pub mod v8;
pub mod winerror;
pub mod worker_threads;
pub mod zlib;

View file

@ -195,7 +195,9 @@ pub fn op_require_resolve_deno_dir(
resolver
.resolve_package_folder_from_package(
&request,
&ModuleSpecifier::from_file_path(parent_filename).unwrap(),
&ModuleSpecifier::from_file_path(&parent_filename).unwrap_or_else(|_| {
panic!("Url::from_file_path: [{:?}]", parent_filename)
}),
NodeResolutionMode::Execution,
)
.ok()

View file

@ -0,0 +1,87 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
use deno_core::error::generic_error;
use deno_core::error::AnyError;
use deno_core::op2;
use deno_core::url::Url;
use deno_core::OpState;
use deno_fs::FileSystemRc;
use std::path::Path;
use std::path::PathBuf;
use std::rc::Rc;
use crate::resolution;
use crate::NodePermissions;
use crate::NodeResolver;
use crate::NpmResolverRc;
fn ensure_read_permission<P>(
state: &mut OpState,
file_path: &Path,
) -> Result<(), AnyError>
where
P: NodePermissions + 'static,
{
let resolver = state.borrow::<NpmResolverRc>();
let permissions = state.borrow::<P>();
resolver.ensure_read_permission(permissions, file_path)
}
#[op2]
#[string]
pub fn op_worker_threads_filename<P>(
state: &mut OpState,
#[string] specifier: String,
) -> Result<String, AnyError>
where
P: NodePermissions + 'static,
{
if specifier.starts_with("data:") {
return Ok(specifier);
}
let url: Url = if specifier.starts_with("file:") {
Url::parse(&specifier)?
} else {
let path = PathBuf::from(&specifier);
if path.is_relative() && !specifier.starts_with('.') {
return Err(generic_error(
"Relative path entries must start with '.' or '..'",
));
}
ensure_read_permission::<P>(state, &path)?;
let fs = state.borrow::<FileSystemRc>();
let canonicalized_path =
deno_core::strip_unc_prefix(fs.realpath_sync(&path)?);
Url::from_file_path(canonicalized_path)
.map_err(|e| generic_error(format!("URL from Path-String: {:#?}", e)))?
};
let url_path = url
.to_file_path()
.map_err(|e| generic_error(format!("URL to Path-String: {:#?}", e)))?;
ensure_read_permission::<P>(state, &url_path)?;
let fs = state.borrow::<FileSystemRc>();
if !fs.exists_sync(&url_path) {
return Err(generic_error(format!("File not found [{:?}]", url_path)));
}
let node_resolver = state.borrow::<Rc<NodeResolver>>();
match node_resolver.url_to_node_resolution(url)? {
resolution::NodeResolution::Esm(u) => Ok(u.to_string()),
resolution::NodeResolution::CommonJs(u) => wrap_cjs(u),
_ => Err(generic_error("Neither ESM nor CJS")),
}
}
///
/// Wrap a CJS file-URL and the required setup in a stringified `data:`-URL
///
fn wrap_cjs(url: Url) -> Result<String, AnyError> {
let path = url
.to_file_path()
.map_err(|e| generic_error(format!("URL to Path: {:#?}", e)))?;
let filename = path.file_name().unwrap().to_string_lossy();
Ok(format!(
"data:text/javascript,import {{ createRequire }} from \"node:module\";\
const require = createRequire(\"{}\"); require(\"./{}\");",
url, filename,
))
}

View file

@ -9,7 +9,7 @@ import {
op_host_recv_message,
op_host_terminate_worker,
op_message_port_recv_message_sync,
op_require_read_closest_package_json,
op_worker_threads_filename,
} from "ext:core/ops";
import {
deserializeJsMessageData,
@ -24,7 +24,6 @@ import { log } from "ext:runtime/06_util.js";
import { notImplemented } from "ext:deno_node/_utils.ts";
import { EventEmitter } from "node:events";
import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js";
import { isAbsolute, resolve } from "node:path";
const { ObjectPrototypeIsPrototypeOf } = primordials;
const {
@ -32,14 +31,8 @@ const {
Symbol,
SymbolFor,
SymbolIterator,
StringPrototypeEndsWith,
StringPrototypeReplace,
StringPrototypeMatch,
StringPrototypeReplaceAll,
StringPrototypeToString,
StringPrototypeTrim,
SafeWeakMap,
SafeRegExp,
SafeMap,
TypeError,
} = primordials;
@ -66,74 +59,6 @@ export interface WorkerOptions {
name?: string;
}
const WHITESPACE_ENCODINGS: Record<string, string> = {
"\u0009": "%09",
"\u000A": "%0A",
"\u000B": "%0B",
"\u000C": "%0C",
"\u000D": "%0D",
"\u0020": "%20",
};
function encodeWhitespace(string: string): string {
return StringPrototypeReplaceAll(string, new SafeRegExp(/[\s]/g), (c) => {
return WHITESPACE_ENCODINGS[c] ?? c;
});
}
function toFileUrlPosix(path: string): URL {
if (!isAbsolute(path)) {
throw new TypeError("Must be an absolute path.");
}
const url = new URL("file:///");
url.pathname = encodeWhitespace(
StringPrototypeReplace(
StringPrototypeReplace(path, new SafeRegExp(/%/g), "%25"),
new SafeRegExp(/\\/g),
"%5C",
),
);
return url;
}
function toFileUrlWin32(path: string): URL {
if (!isAbsolute(path)) {
throw new TypeError("Must be an absolute path.");
}
const { 0: _, 1: hostname, 2: pathname } = StringPrototypeMatch(
path,
new SafeRegExp(/^(?:[/\\]{2}([^/\\]+)(?=[/\\](?:[^/\\]|$)))?(.*)/),
);
const url = new URL("file:///");
url.pathname = encodeWhitespace(
StringPrototypeReplace(pathname, new SafeRegExp(/%/g), "%25"),
);
if (hostname != null && hostname != "localhost") {
url.hostname = hostname;
if (!url.hostname) {
throw new TypeError("Invalid hostname.");
}
}
return url;
}
/**
* Converts a path string to a file URL.
*
* ```ts
* toFileUrl("/home/foo"); // new URL("file:///home/foo")
* toFileUrl("\\home\\foo"); // new URL("file:///home/foo")
* toFileUrl("C:\\Users\\foo"); // new URL("file:///C:/Users/foo")
* toFileUrl("\\\\127.0.0.1\\home\\foo"); // new URL("file://127.0.0.1/home/foo")
* ```
* @param path to convert to file URL
*/
function toFileUrl(path: string): URL {
return core.build.os == "windows"
? toFileUrlWin32(path)
: toFileUrlPosix(path);
}
const privateWorkerRef = Symbol("privateWorkerRef");
class NodeWorker extends EventEmitter {
#id = 0;
@ -162,29 +87,23 @@ class NodeWorker extends EventEmitter {
constructor(specifier: URL | string, options?: WorkerOptions) {
super();
if (options?.eval === true) {
if (
typeof specifier === "object" &&
!(specifier.protocol === "data:" || specifier.protocol === "file:")
) {
throw new TypeError(
"node:worker_threads support only 'file:' and 'data:' URLs",
);
}
if (options?.eval) {
specifier = `data:text/javascript,${specifier}`;
} else if (typeof specifier === "string") {
specifier = resolve(specifier);
let pkg;
try {
pkg = op_require_read_closest_package_json(specifier);
} catch (_) {
// empty catch block when package json might not be present
}
if (
!(StringPrototypeEndsWith(
StringPrototypeToString(specifier),
".mjs",
)) ||
(pkg && pkg.exists && pkg.typ == "module")
) {
const cwdFileUrl = toFileUrl(Deno.cwd());
specifier =
`data:text/javascript,(async function() {const { createRequire } = await import("node:module");const require = createRequire("${cwdFileUrl}");require("${specifier}");})();`;
} else {
specifier = toFileUrl(specifier as string);
}
} else if (
!(typeof specifier === "object" && specifier.protocol === "data:")
) {
// deno-lint-ignore prefer-primordials
specifier = specifier.toString();
specifier = op_worker_threads_filename(specifier);
}
// TODO(bartlomieu): this doesn't match the Node.js behavior, it should be

View file

@ -0,0 +1,23 @@
const { add } = require("./other_cjs_file.cjs");
const missing_toplevel_async = async () => {
return new Promise((resolve) => {
setTimeout(() => {
resolve;
}, 500);
});
};
async function main() {
/// async code doesn't seem to work within this CJS wrapper :(
//const p = await missing_toplevel_async();
const sum = add(2, 3);
if (sum != 5) {
throw ("Bad calculator!");
}
postMessage("hallo");
}
main();

View file

@ -1,3 +1,3 @@
import { myFunction } from "./other_file.js";
myFunction().then(() => {});
await myFunction();

View file

@ -0,0 +1,3 @@
import { myFunction } from "../other_file.js";
await myFunction();

View file

@ -0,0 +1,5 @@
module.exports = {
add: (a, b) => {
return a + b;
},
};

View file

@ -1,3 +1,8 @@
export async function myFunction() {
await new Promise((resolve) => setTimeout(resolve, 100));
await new Promise((resolve) =>
setTimeout(() => {
postMessage("hallo");
resolve;
}, 100)
);
}

View file

@ -0,0 +1,9 @@
export async function myFunction() {
await new Promise((resolve) =>
setTimeout(() => {
postMessage("hallo");
resolve;
}, 100)
);
}
await myFunction();

View file

@ -4,9 +4,10 @@ import {
assert,
assertEquals,
assertObjectMatch,
assertThrows,
fail,
} from "@std/assert/mod.ts";
import { fromFileUrl, relative } from "@std/path/mod.ts";
import { fromFileUrl, relative, sep } from "@std/path/mod.ts";
import * as workerThreads from "node:worker_threads";
import { EventEmitter, once } from "node:events";
@ -20,42 +21,42 @@ Deno.test("[node/worker_threads] MessageChannel are MessagePort are exported", (
});
Deno.test({
name: "[worker_threads] isMainThread",
name: "[node/worker_threads] isMainThread",
fn() {
assertEquals(workerThreads.isMainThread, true);
},
});
Deno.test({
name: "[worker_threads] threadId",
name: "[node/worker_threads] threadId",
fn() {
assertEquals(workerThreads.threadId, 0);
},
});
Deno.test({
name: "[worker_threads] resourceLimits",
name: "[node/worker_threads] resourceLimits",
fn() {
assertObjectMatch(workerThreads.resourceLimits, {});
},
});
Deno.test({
name: "[worker_threads] parentPort",
name: "[node/worker_threads] parentPort",
fn() {
assertEquals(workerThreads.parentPort, null);
},
});
Deno.test({
name: "[worker_threads] workerData",
name: "[node/worker_threads] workerData",
fn() {
assertEquals(workerThreads.workerData, null);
},
});
Deno.test({
name: "[worker_threads] setEnvironmentData / getEnvironmentData",
name: "[node/worker_threads] setEnvironmentData / getEnvironmentData",
fn() {
workerThreads.setEnvironmentData("test", "test");
assertEquals(workerThreads.getEnvironmentData("test"), "test");
@ -63,7 +64,7 @@ Deno.test({
});
Deno.test({
name: "[worker_threads] Worker threadId",
name: "[node/worker_threads] Worker threadId",
async fn() {
const worker = new workerThreads.Worker(
new URL("./testdata/worker_threads.mjs", import.meta.url),
@ -85,7 +86,7 @@ Deno.test({
});
Deno.test({
name: "[worker_threads] Worker basics",
name: "[node/worker_threads] Worker basics",
async fn() {
workerThreads.setEnvironmentData("test", "test");
workerThreads.setEnvironmentData(1, {
@ -118,7 +119,7 @@ Deno.test({
});
Deno.test({
name: "[worker_threads] Worker eval",
name: "[node/worker_threads] Worker eval",
async fn() {
const worker = new workerThreads.Worker(
`
@ -135,17 +136,141 @@ Deno.test({
});
Deno.test({
name: "[worker_threads] worker thread with type module",
fn() {
const worker = new workerThreads.Worker(
new URL("./testdata/worker_module/index.js", import.meta.url),
);
worker.terminate();
name: "[node/worker_threads] worker thread with type module",
async fn() {
function p() {
return new Promise<workerThreads.Worker>((resolve, reject) => {
const worker = new workerThreads.Worker(
new URL("./testdata/worker_module/index.js", import.meta.url),
);
worker.on("error", (e) => reject(e.message));
worker.on("message", () => resolve(worker));
});
}
await p();
},
});
Deno.test({
name: "[worker_threads] inheritances",
name: "[node/worker_threads] worker thread in nested module",
async fn() {
function p() {
return new Promise<workerThreads.Worker>((resolve, reject) => {
const worker = new workerThreads.Worker(
new URL("./testdata/worker_module/nested/index.js", import.meta.url),
);
worker.on("error", (e) => reject(e.message));
worker.on("message", () => resolve(worker));
});
}
await p();
},
});
Deno.test({
name: "[node/worker_threads] .cjs worker file within module",
async fn() {
function p() {
return new Promise<workerThreads.Worker>((resolve, reject) => {
const worker = new workerThreads.Worker(
new URL("./testdata/worker_module/cjs-file.cjs", import.meta.url),
);
worker.on("error", (e) => reject(e.message));
worker.on("message", () => resolve(worker));
});
}
await p();
},
});
Deno.test({
name: "[node/worker_threads] relativ path string",
async fn() {
function p() {
return new Promise<workerThreads.Worker>((resolve, reject) => {
const worker = new workerThreads.Worker(
"./tests/unit_node/testdata/worker_module/index.js",
);
worker.on("error", (e) => reject(e.message));
worker.on("message", () => resolve(worker));
});
}
await p();
},
});
Deno.test({
name: "[node/worker_threads] utf-8 path string",
async fn() {
function p() {
return new Promise<workerThreads.Worker>((resolve, reject) => {
const worker = new workerThreads.Worker(
"./tests/unit_node/testdata/worker_module/βάρβαροι.js",
);
worker.on("error", (e) => reject(e.message));
worker.on("message", () => resolve(worker));
});
}
await p();
},
});
Deno.test({
name: "[node/worker_threads] utf-8 path URL",
async fn() {
function p() {
return new Promise<workerThreads.Worker>((resolve, reject) => {
const worker = new workerThreads.Worker(
new URL(
"./testdata/worker_module/βάρβαροι.js",
import.meta.url,
),
);
worker.on("error", (e) => reject(e.message));
worker.on("message", () => resolve(worker));
});
}
await p();
},
});
Deno.test({
name: "[node/worker_threads] throws on relativ path without leading dot",
fn() {
assertThrows(
() => {
new workerThreads.Worker(
"tests/unit_node/testdata/worker_module/index.js",
);
},
);
},
});
Deno.test({
name: "[node/worker_threads] throws on unsupported URL protcol",
fn() {
assertThrows(
() => {
new workerThreads.Worker(new URL("https://example.com"));
},
);
},
});
Deno.test({
name: "[node/worker_threads] throws on non-existend file",
fn() {
assertThrows(
() => {
new workerThreads.Worker(new URL("file://very/unlikely"));
},
);
},
});
Deno.test({
name: "[node/worker_threads] inheritances",
async fn() {
const worker = new workerThreads.Worker(
`
@ -168,7 +293,7 @@ Deno.test({
});
Deno.test({
name: "[worker_threads] Worker workerData",
name: "[node/worker_threads] Worker workerData",
async fn() {
const worker = new workerThreads.Worker(
new URL("./testdata/worker_threads.mjs", import.meta.url),
@ -192,12 +317,14 @@ Deno.test({
});
Deno.test({
name: "[worker_threads] Worker with relative path",
name: "[node/worker_threads] Worker with relative path",
async fn() {
const worker = new workerThreads.Worker(relative(
Deno.cwd(),
fromFileUrl(new URL("./testdata/worker_threads.mjs", import.meta.url)),
));
const worker = new workerThreads.Worker(
`.${sep}` + relative(
Deno.cwd(),
fromFileUrl(new URL("./testdata/worker_threads.mjs", import.meta.url)),
),
);
worker.postMessage("Hello, how are you my thread?");
assertEquals((await once(worker, "message"))[0], "I'm fine!");
worker.terminate();
@ -205,7 +332,7 @@ Deno.test({
});
Deno.test({
name: "[worker_threads] unref",
name: "[node/worker_threads] unref",
async fn() {
const timeout = setTimeout(() => fail("Test timed out"), 60_000);
const child = new Deno.Command(Deno.execPath(), {
@ -220,7 +347,7 @@ Deno.test({
});
Deno.test({
name: "[worker_threads] SharedArrayBuffer",
name: "[node/worker_threads] SharedArrayBuffer",
async fn() {
const sab = new SharedArrayBuffer(Uint8Array.BYTES_PER_ELEMENT);
const uint = new Uint8Array(sab);
@ -240,7 +367,7 @@ Deno.test({
});
Deno.test({
name: "[worker_threads] Worker workerData with MessagePort",
name: "[node/worker_threads] Worker workerData with MessagePort",
async fn() {
const { port1: mainPort, port2: workerPort } = new workerThreads
.MessageChannel();
@ -256,7 +383,7 @@ Deno.test({
workerData,
} from "node:worker_threads";
parentPort.on("message", (msg) => {
console.log("message from main", msg);
/* console.log("message from main", msg); */
parentPort.postMessage("Hello from worker on parentPort!");
workerData.workerPort.postMessage("Hello from worker on workerPort!");
});