From 9845361153f35f6a68a82eb3a13845fddbeab026 Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Sun, 14 May 2023 15:40:01 -0600 Subject: [PATCH] refactor(core): bake single-thread assumptions into spawn/spawn_blocking (#19056) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Partially supersedes #19016. This migrates `spawn` and `spawn_blocking` to `deno_core`, and removes the requirement for `spawn` tasks to be `Send` given our single-threaded executor. While we don't need to technically do anything w/`spawn_blocking`, this allows us to have a single `JoinHandle` type that works for both cases, and allows us to more easily experiment with alternative `spawn_blocking` implementations that do not require tokio (ie: rayon). Async ops (+~35%): Before: ``` time 1310 ms rate 763358 time 1267 ms rate 789265 time 1259 ms rate 794281 time 1266 ms rate 789889 ``` After: ``` time 956 ms rate 1046025 time 954 ms rate 1048218 time 924 ms rate 1082251 time 920 ms rate 1086956 ``` HTTP serve (+~4.4%): Before: ``` Running 10s test @ http://localhost:4500 2 threads and 10 connections Thread Stats Avg Stdev Max +/- Stdev Latency 68.78us 19.77us 1.43ms 86.84% Req/Sec 68.78k 5.00k 73.84k 91.58% 1381833 requests in 10.10s, 167.36MB read Requests/sec: 136823.29 Transfer/sec: 16.57MB ``` After: ``` Running 10s test @ http://localhost:4500 2 threads and 10 connections Thread Stats Avg Stdev Max +/- Stdev Latency 63.12us 17.43us 1.11ms 85.13% Req/Sec 71.82k 3.71k 77.02k 79.21% 1443195 requests in 10.10s, 174.79MB read Requests/sec: 142921.99 Transfer/sec: 17.31MB ``` Suggested-By: alice@ryhl.io Co-authored-by: Bartek IwaƄczuk --- cli/cache/cache_db.rs | 5 +- cli/cache/incremental.rs | 5 +- cli/lsp/client.rs | 7 +- cli/lsp/diagnostics.rs | 14 +- cli/lsp/language_server.rs | 3 +- cli/lsp/parent_process_checker.rs | 3 +- cli/lsp/testing/execution.rs | 14 +- cli/main.rs | 4 +- cli/npm/resolvers/common.rs | 3 +- cli/npm/resolvers/local.rs | 5 +- cli/tests/integration/cert_tests.rs | 173 +++++++++++------------ cli/tests/integration/inspector_tests.rs | 2 +- cli/tests/integration/run_tests.rs | 75 +++++----- cli/tools/bench.rs | 14 +- cli/tools/fmt.rs | 3 +- cli/tools/repl/mod.rs | 3 +- cli/tools/task.rs | 14 +- cli/tools/test.rs | 24 ++-- cli/tools/upgrade.rs | 3 +- cli/util/draw_thread.rs | 3 +- cli/util/fs.rs | 3 +- core/Cargo.toml | 2 +- core/lib.rs | 1 + core/task.rs | 131 +++++++++++++++++ core/task_queue.rs | 2 +- ext/cache/sqlite.rs | 13 +- ext/crypto/decrypt.rs | 3 +- ext/crypto/encrypt.rs | 3 +- ext/crypto/generate_key.rs | 3 +- ext/crypto/lib.rs | 3 +- ext/ffi/call.rs | 5 +- ext/fs/std_fs.rs | 43 +++--- ext/http/http_next.rs | 19 ++- ext/http/lib.rs | 8 +- ext/io/lib.rs | 5 +- ext/net/ops_tls.rs | 6 +- ext/node/ops/crypto/mod.rs | 48 +++---- ext/websocket/lib.rs | 2 +- runtime/inspector_server.rs | 5 +- runtime/tokio_util.rs | 13 +- runtime/web_worker.rs | 4 +- tools/wpt/expectation.json | 4 +- 42 files changed, 415 insertions(+), 288 deletions(-) create mode 100644 core/task.rs diff --git a/cli/cache/cache_db.rs b/cli/cache/cache_db.rs index 90840de1a0..e05ecd962b 100644 --- a/cli/cache/cache_db.rs +++ b/cli/cache/cache_db.rs @@ -3,6 +3,7 @@ use deno_core::error::AnyError; use deno_core::parking_lot::Mutex; use deno_core::parking_lot::MutexGuard; +use deno_core::task::spawn_blocking; use deno_runtime::deno_webstorage::rusqlite; use deno_runtime::deno_webstorage::rusqlite::Connection; use deno_runtime::deno_webstorage::rusqlite::OptionalExtension; @@ -95,7 +96,7 @@ impl Drop for CacheDB { // Hand off SQLite connection to another thread to do the surprisingly expensive cleanup let inner = inner.into_inner().into_inner(); if let Some(conn) = inner { - tokio::task::spawn_blocking(move || { + spawn_blocking(move || { drop(conn); log::trace!( "Cleaned up SQLite connection at {}", @@ -168,7 +169,7 @@ impl CacheDB { fn spawn_eager_init_thread(&self) { let clone = self.clone(); debug_assert!(tokio::runtime::Handle::try_current().is_ok()); - tokio::task::spawn_blocking(move || { + spawn_blocking(move || { let lock = clone.conn.lock(); clone.initialize(&lock); }); diff --git a/cli/cache/incremental.rs b/cli/cache/incremental.rs index deb30cdd18..c50b876fa9 100644 --- a/cli/cache/incremental.rs +++ b/cli/cache/incremental.rs @@ -7,9 +7,10 @@ use std::path::PathBuf; use deno_core::error::AnyError; use deno_core::parking_lot::Mutex; use deno_core::serde_json; +use deno_core::task::spawn; +use deno_core::task::JoinHandle; use deno_runtime::deno_webstorage::rusqlite::params; use serde::Serialize; -use tokio::task::JoinHandle; use super::cache_db::CacheDB; use super::cache_db::CacheDBConfiguration; @@ -93,7 +94,7 @@ impl IncrementalCacheInner { tokio::sync::mpsc::unbounded_channel::(); // sqlite isn't `Sync`, so we do all the updating on a dedicated task - let handle = tokio::task::spawn(async move { + let handle = spawn(async move { while let Some(message) = receiver.recv().await { match message { ReceiverMessage::Update(path, hash) => { diff --git a/cli/lsp/client.rs b/cli/lsp/client.rs index d24d4c2a9e..4923a4585e 100644 --- a/cli/lsp/client.rs +++ b/cli/lsp/client.rs @@ -8,6 +8,7 @@ use deno_core::anyhow::bail; use deno_core::error::AnyError; use deno_core::serde_json; use deno_core::serde_json::Value; +use deno_core::task::spawn; use tower_lsp::lsp_types as lsp; use tower_lsp::lsp_types::ConfigurationItem; @@ -56,7 +57,7 @@ impl Client { ) { // do on a task in case the caller currently is in the lsp lock let client = self.0.clone(); - tokio::task::spawn(async move { + spawn(async move { client.send_registry_state_notification(params).await; }); } @@ -64,7 +65,7 @@ impl Client { pub fn send_test_notification(&self, params: TestingNotification) { // do on a task in case the caller currently is in the lsp lock let client = self.0.clone(); - tokio::task::spawn(async move { + spawn(async move { client.send_test_notification(params).await; }); } @@ -77,7 +78,7 @@ impl Client { // do on a task in case the caller currently is in the lsp lock let client = self.0.clone(); let message = message.to_string(); - tokio::task::spawn(async move { + spawn(async move { client.show_message(message_type, message).await; }); } diff --git a/cli/lsp/diagnostics.rs b/cli/lsp/diagnostics.rs index 0f96a498bd..7b5a30a0ea 100644 --- a/cli/lsp/diagnostics.rs +++ b/cli/lsp/diagnostics.rs @@ -25,6 +25,8 @@ use deno_core::resolve_url; use deno_core::serde::Deserialize; use deno_core::serde_json; use deno_core::serde_json::json; +use deno_core::task::spawn; +use deno_core::task::JoinHandle; use deno_core::ModuleSpecifier; use deno_graph::Resolution; use deno_graph::ResolutionError; @@ -197,9 +199,9 @@ impl DiagnosticsServer { runtime.block_on(async { let mut token = CancellationToken::new(); - let mut ts_handle: Option> = None; - let mut lint_handle: Option> = None; - let mut deps_handle: Option> = None; + let mut ts_handle: Option> = None; + let mut lint_handle: Option> = None; + let mut deps_handle: Option> = None; let diagnostics_publisher = DiagnosticsPublisher::new(client.clone()); loop { @@ -213,7 +215,7 @@ impl DiagnosticsServer { diagnostics_publisher.clear().await; let previous_ts_handle = ts_handle.take(); - ts_handle = Some(tokio::spawn({ + ts_handle = Some(spawn({ let performance = performance.clone(); let diagnostics_publisher = diagnostics_publisher.clone(); let ts_server = ts_server.clone(); @@ -265,7 +267,7 @@ impl DiagnosticsServer { })); let previous_deps_handle = deps_handle.take(); - deps_handle = Some(tokio::spawn({ + deps_handle = Some(spawn({ let performance = performance.clone(); let diagnostics_publisher = diagnostics_publisher.clone(); let token = token.clone(); @@ -293,7 +295,7 @@ impl DiagnosticsServer { })); let previous_lint_handle = lint_handle.take(); - lint_handle = Some(tokio::spawn({ + lint_handle = Some(spawn({ let performance = performance.clone(); let diagnostics_publisher = diagnostics_publisher.clone(); let token = token.clone(); diff --git a/cli/lsp/language_server.rs b/cli/lsp/language_server.rs index de5cd6f09c..e76ea0040a 100644 --- a/cli/lsp/language_server.rs +++ b/cli/lsp/language_server.rs @@ -8,6 +8,7 @@ use deno_core::resolve_url; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::serde_json::Value; +use deno_core::task::spawn; use deno_core::ModuleSpecifier; use deno_runtime::deno_fs; use deno_runtime::deno_node::NodeResolver; @@ -240,7 +241,7 @@ impl LanguageServer { let cli_options = result.cli_options; let roots = result.roots; let open_docs = result.open_docs; - let handle = tokio::task::spawn_local(async move { + let handle = spawn(async move { create_graph_for_caching(cli_options, roots, open_docs).await }); if let Err(err) = handle.await.unwrap() { diff --git a/cli/lsp/parent_process_checker.rs b/cli/lsp/parent_process_checker.rs index 4cc3bcef3b..f83543c04c 100644 --- a/cli/lsp/parent_process_checker.rs +++ b/cli/lsp/parent_process_checker.rs @@ -1,5 +1,6 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +use deno_core::task::spawn; use tokio::time::sleep; use tokio::time::Duration; @@ -7,7 +8,7 @@ use tokio::time::Duration; /// provided process id. Once that process no longer exists /// it will terminate the current process. pub fn start(parent_process_id: u32) { - tokio::task::spawn(async move { + spawn(async move { loop { sleep(Duration::from_secs(30)).await; diff --git a/cli/lsp/testing/execution.rs b/cli/lsp/testing/execution.rs index 4834cd0c9c..ce8c8b5acc 100644 --- a/cli/lsp/testing/execution.rs +++ b/cli/lsp/testing/execution.rs @@ -24,9 +24,11 @@ use deno_core::futures::stream; use deno_core::futures::StreamExt; use deno_core::parking_lot::Mutex; use deno_core::parking_lot::RwLock; +use deno_core::task::spawn; +use deno_core::task::spawn_blocking; use deno_core::ModuleSpecifier; use deno_runtime::permissions::Permissions; -use deno_runtime::tokio_util::run_local; +use deno_runtime::tokio_util::create_and_run_current_thread; use indexmap::IndexMap; use std::collections::HashMap; use std::collections::HashSet; @@ -284,7 +286,7 @@ impl TestRun { }; let token = self.token.clone(); - tokio::task::spawn_blocking(move || { + spawn_blocking(move || { if fail_fast_tracker.should_stop() { return Ok(()); } @@ -292,13 +294,13 @@ impl TestRun { let file_result = if token.is_cancelled() { Ok(()) } else { - run_local(test::test_specifier( - &worker_factory, + create_and_run_current_thread(test::test_specifier( + worker_factory, permissions, specifier, sender.clone(), fail_fast_tracker, - &test::TestSpecifierOptions { + test::TestSpecifierOptions { filter, shuffle: None, trace_ops: false, @@ -331,7 +333,7 @@ impl TestRun { )); let handler = { - tokio::task::spawn(async move { + spawn(async move { let earlier = Instant::now(); let mut summary = test::TestSummary::new(); let mut used_only = false; diff --git a/cli/main.rs b/cli/main.rs index 75425cf105..023d5a2084 100644 --- a/cli/main.rs +++ b/cli/main.rs @@ -46,7 +46,7 @@ use deno_core::error::AnyError; use deno_core::error::JsError; use deno_runtime::colors; use deno_runtime::fmt_errors::format_js_error; -use deno_runtime::tokio_util::run_local; +use deno_runtime::tokio_util::create_and_run_current_thread; use factory::CliFactory; use std::env; use std::env::current_exe; @@ -294,7 +294,7 @@ pub fn main() { run_subcommand(flags).await }; - let exit_code = unwrap_or_exit(run_local(future)); + let exit_code = unwrap_or_exit(create_and_run_current_thread(future)); std::process::exit(exit_code); } diff --git a/cli/npm/resolvers/common.rs b/cli/npm/resolvers/common.rs index fc040a7ccb..c91b206cf1 100644 --- a/cli/npm/resolvers/common.rs +++ b/cli/npm/resolvers/common.rs @@ -9,6 +9,7 @@ use async_trait::async_trait; use deno_ast::ModuleSpecifier; use deno_core::error::AnyError; use deno_core::futures; +use deno_core::task::spawn; use deno_core::url::Url; use deno_npm::NpmPackageId; use deno_npm::NpmResolutionPackage; @@ -71,7 +72,7 @@ pub async fn cache_packages( assert_eq!(package.copy_index, 0); // the caller should not provide any of these let cache = cache.clone(); let registry_url = registry_url.clone(); - let handle = tokio::task::spawn(async move { + let handle = spawn(async move { cache .ensure_package(&package.pkg_id.nv, &package.dist, ®istry_url) .await diff --git a/cli/npm/resolvers/local.rs b/cli/npm/resolvers/local.rs index cd1dc36715..b2ad083576 100644 --- a/cli/npm/resolvers/local.rs +++ b/cli/npm/resolvers/local.rs @@ -18,6 +18,8 @@ use deno_ast::ModuleSpecifier; use deno_core::anyhow::bail; use deno_core::anyhow::Context; use deno_core::error::AnyError; +use deno_core::task::spawn; +use deno_core::task::JoinHandle; use deno_core::url::Url; use deno_npm::resolution::NpmResolutionSnapshot; use deno_npm::NpmPackageCacheFolderId; @@ -27,7 +29,6 @@ use deno_runtime::deno_fs; use deno_runtime::deno_node::NodePermissions; use deno_runtime::deno_node::NodeResolutionMode; use deno_runtime::deno_node::PackageJson; -use tokio::task::JoinHandle; use crate::npm::cache::mixed_case_package_name_encode; use crate::npm::cache::should_sync_download; @@ -277,7 +278,7 @@ async fn sync_resolution_with_fs( let cache = cache.clone(); let registry_url = registry_url.clone(); let package = package.clone(); - let handle = tokio::task::spawn(async move { + let handle = spawn(async move { cache .ensure_package(&package.pkg_id.nv, &package.dist, ®istry_url) .await?; diff --git a/cli/tests/integration/cert_tests.rs b/cli/tests/integration/cert_tests.rs index d3da6d75af..b04f2d35e8 100644 --- a/cli/tests/integration/cert_tests.rs +++ b/cli/tests/integration/cert_tests.rs @@ -11,7 +11,6 @@ use std::process::Command; use std::sync::Arc; use test_util as util; use test_util::TempDir; -use tokio::task::LocalSet; use util::TestContext; itest_flaky!(cafile_url_imports { @@ -219,113 +218,99 @@ fn cafile_bundle_remote_exports() { #[tokio::test] async fn listen_tls_alpn() { - // TLS streams require the presence of an ambient local task set to gracefully - // close dropped connections in the background. - LocalSet::new() - .run_until(async { - let mut child = util::deno_cmd() - .current_dir(util::testdata_path()) - .arg("run") - .arg("--unstable") - .arg("--quiet") - .arg("--allow-net") - .arg("--allow-read") - .arg("./cert/listen_tls_alpn.ts") - .arg("4504") - .stdout(std::process::Stdio::piped()) - .spawn() - .unwrap(); - let stdout = child.stdout.as_mut().unwrap(); - let mut msg = [0; 5]; - let read = stdout.read(&mut msg).unwrap(); - assert_eq!(read, 5); - assert_eq!(&msg, b"READY"); + let mut child = util::deno_cmd() + .current_dir(util::testdata_path()) + .arg("run") + .arg("--unstable") + .arg("--quiet") + .arg("--allow-net") + .arg("--allow-read") + .arg("./cert/listen_tls_alpn.ts") + .arg("4504") + .stdout(std::process::Stdio::piped()) + .spawn() + .unwrap(); + let stdout = child.stdout.as_mut().unwrap(); + let mut msg = [0; 5]; + let read = stdout.read(&mut msg).unwrap(); + assert_eq!(read, 5); + assert_eq!(&msg, b"READY"); - let mut reader = &mut BufReader::new(Cursor::new(include_bytes!( - "../testdata/tls/RootCA.crt" - ))); - let certs = rustls_pemfile::certs(&mut reader).unwrap(); - let mut root_store = rustls::RootCertStore::empty(); - root_store.add_parsable_certificates(&certs); - let mut cfg = rustls::ClientConfig::builder() - .with_safe_defaults() - .with_root_certificates(root_store) - .with_no_client_auth(); - cfg.alpn_protocols.push(b"foobar".to_vec()); - let cfg = Arc::new(cfg); + let mut reader = &mut BufReader::new(Cursor::new(include_bytes!( + "../testdata/tls/RootCA.crt" + ))); + let certs = rustls_pemfile::certs(&mut reader).unwrap(); + let mut root_store = rustls::RootCertStore::empty(); + root_store.add_parsable_certificates(&certs); + let mut cfg = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(root_store) + .with_no_client_auth(); + cfg.alpn_protocols.push(b"foobar".to_vec()); + let cfg = Arc::new(cfg); - let hostname = rustls::ServerName::try_from("localhost").unwrap(); + let hostname = rustls::ServerName::try_from("localhost").unwrap(); - let tcp_stream = tokio::net::TcpStream::connect("localhost:4504") - .await - .unwrap(); - let mut tls_stream = - TlsStream::new_client_side(tcp_stream, cfg, hostname); + let tcp_stream = tokio::net::TcpStream::connect("localhost:4504") + .await + .unwrap(); + let mut tls_stream = TlsStream::new_client_side(tcp_stream, cfg, hostname); - tls_stream.handshake().await.unwrap(); + tls_stream.handshake().await.unwrap(); - let (_, rustls_connection) = tls_stream.get_ref(); - let alpn = rustls_connection.alpn_protocol().unwrap(); - assert_eq!(alpn, b"foobar"); + let (_, rustls_connection) = tls_stream.get_ref(); + let alpn = rustls_connection.alpn_protocol().unwrap(); + assert_eq!(alpn, b"foobar"); - let status = child.wait().unwrap(); - assert!(status.success()); - }) - .await; + let status = child.wait().unwrap(); + assert!(status.success()); } #[tokio::test] async fn listen_tls_alpn_fail() { - // TLS streams require the presence of an ambient local task set to gracefully - // close dropped connections in the background. - LocalSet::new() - .run_until(async { - let mut child = util::deno_cmd() - .current_dir(util::testdata_path()) - .arg("run") - .arg("--unstable") - .arg("--quiet") - .arg("--allow-net") - .arg("--allow-read") - .arg("./cert/listen_tls_alpn_fail.ts") - .arg("4505") - .stdout(std::process::Stdio::piped()) - .spawn() - .unwrap(); - let stdout = child.stdout.as_mut().unwrap(); - let mut msg = [0; 5]; - let read = stdout.read(&mut msg).unwrap(); - assert_eq!(read, 5); - assert_eq!(&msg, b"READY"); + let mut child = util::deno_cmd() + .current_dir(util::testdata_path()) + .arg("run") + .arg("--unstable") + .arg("--quiet") + .arg("--allow-net") + .arg("--allow-read") + .arg("./cert/listen_tls_alpn_fail.ts") + .arg("4505") + .stdout(std::process::Stdio::piped()) + .spawn() + .unwrap(); + let stdout = child.stdout.as_mut().unwrap(); + let mut msg = [0; 5]; + let read = stdout.read(&mut msg).unwrap(); + assert_eq!(read, 5); + assert_eq!(&msg, b"READY"); - let mut reader = &mut BufReader::new(Cursor::new(include_bytes!( - "../testdata/tls/RootCA.crt" - ))); - let certs = rustls_pemfile::certs(&mut reader).unwrap(); - let mut root_store = rustls::RootCertStore::empty(); - root_store.add_parsable_certificates(&certs); - let mut cfg = rustls::ClientConfig::builder() - .with_safe_defaults() - .with_root_certificates(root_store) - .with_no_client_auth(); - cfg.alpn_protocols.push(b"boofar".to_vec()); - let cfg = Arc::new(cfg); + let mut reader = &mut BufReader::new(Cursor::new(include_bytes!( + "../testdata/tls/RootCA.crt" + ))); + let certs = rustls_pemfile::certs(&mut reader).unwrap(); + let mut root_store = rustls::RootCertStore::empty(); + root_store.add_parsable_certificates(&certs); + let mut cfg = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(root_store) + .with_no_client_auth(); + cfg.alpn_protocols.push(b"boofar".to_vec()); + let cfg = Arc::new(cfg); - let hostname = rustls::ServerName::try_from("localhost").unwrap(); + let hostname = rustls::ServerName::try_from("localhost").unwrap(); - let tcp_stream = tokio::net::TcpStream::connect("localhost:4505") - .await - .unwrap(); - let mut tls_stream = - TlsStream::new_client_side(tcp_stream, cfg, hostname); + let tcp_stream = tokio::net::TcpStream::connect("localhost:4505") + .await + .unwrap(); + let mut tls_stream = TlsStream::new_client_side(tcp_stream, cfg, hostname); - tls_stream.handshake().await.unwrap_err(); + tls_stream.handshake().await.unwrap_err(); - let (_, rustls_connection) = tls_stream.get_ref(); - assert!(rustls_connection.alpn_protocol().is_none()); + let (_, rustls_connection) = tls_stream.get_ref(); + assert!(rustls_connection.alpn_protocol().is_none()); - let status = child.wait().unwrap(); - assert!(status.success()); - }) - .await; + let status = child.wait().unwrap(); + assert!(status.success()); } diff --git a/cli/tests/integration/inspector_tests.rs b/cli/tests/integration/inspector_tests.rs index cf66c4adc1..8fa9ec85c0 100644 --- a/cli/tests/integration/inspector_tests.rs +++ b/cli/tests/integration/inspector_tests.rs @@ -29,7 +29,7 @@ where Fut::Output: Send + 'static, { fn execute(&self, fut: Fut) { - tokio::task::spawn(fut); + deno_core::task::spawn(fut); } } diff --git a/cli/tests/integration/run_tests.rs b/cli/tests/integration/run_tests.rs index e6ea85da45..bc717351a0 100644 --- a/cli/tests/integration/run_tests.rs +++ b/cli/tests/integration/run_tests.rs @@ -9,7 +9,6 @@ use std::process::Stdio; use std::time::Duration; use test_util as util; use test_util::TempDir; -use tokio::task::LocalSet; use trust_dns_client::serialize::txt::Lexer; use trust_dns_client::serialize::txt::Parser; use util::assert_contains; @@ -3886,50 +3885,44 @@ async fn test_resolve_dns() { #[tokio::test] async fn http2_request_url() { - // TLS streams require the presence of an ambient local task set to gracefully - // close dropped connections in the background. - LocalSet::new() - .run_until(async { - let mut child = util::deno_cmd() - .current_dir(util::testdata_path()) - .arg("run") - .arg("--unstable") - .arg("--quiet") - .arg("--allow-net") - .arg("--allow-read") - .arg("./run/http2_request_url.ts") - .arg("4506") - .stdout(std::process::Stdio::piped()) - .spawn() - .unwrap(); - let stdout = child.stdout.as_mut().unwrap(); - let mut buffer = [0; 5]; - let read = stdout.read(&mut buffer).unwrap(); - assert_eq!(read, 5); - let msg = std::str::from_utf8(&buffer).unwrap(); - assert_eq!(msg, "READY"); + let mut child = util::deno_cmd() + .current_dir(util::testdata_path()) + .arg("run") + .arg("--unstable") + .arg("--quiet") + .arg("--allow-net") + .arg("--allow-read") + .arg("./run/http2_request_url.ts") + .arg("4506") + .stdout(std::process::Stdio::piped()) + .spawn() + .unwrap(); + let stdout = child.stdout.as_mut().unwrap(); + let mut buffer = [0; 5]; + let read = stdout.read(&mut buffer).unwrap(); + assert_eq!(read, 5); + let msg = std::str::from_utf8(&buffer).unwrap(); + assert_eq!(msg, "READY"); - let cert = reqwest::Certificate::from_pem(include_bytes!( - "../testdata/tls/RootCA.crt" - )) - .unwrap(); + let cert = reqwest::Certificate::from_pem(include_bytes!( + "../testdata/tls/RootCA.crt" + )) + .unwrap(); - let client = reqwest::Client::builder() - .add_root_certificate(cert) - .http2_prior_knowledge() - .build() - .unwrap(); + let client = reqwest::Client::builder() + .add_root_certificate(cert) + .http2_prior_knowledge() + .build() + .unwrap(); - let res = client.get("http://127.0.0.1:4506").send().await.unwrap(); - assert_eq!(200, res.status()); + let res = client.get("http://127.0.0.1:4506").send().await.unwrap(); + assert_eq!(200, res.status()); - let body = res.text().await.unwrap(); - assert_eq!(body, "http://127.0.0.1:4506/"); + let body = res.text().await.unwrap(); + assert_eq!(body, "http://127.0.0.1:4506/"); - child.kill().unwrap(); - child.wait().unwrap(); - }) - .await; + child.kill().unwrap(); + child.wait().unwrap(); } #[cfg(not(windows))] @@ -4173,7 +4166,7 @@ where Fut::Output: Send + 'static, { fn execute(&self, fut: Fut) { - tokio::task::spawn(fut); + deno_core::task::spawn(fut); } } diff --git a/cli/tools/bench.rs b/cli/tools/bench.rs index 3d5f99aba4..107fd2b9b0 100644 --- a/cli/tools/bench.rs +++ b/cli/tools/bench.rs @@ -27,11 +27,13 @@ use deno_core::futures::FutureExt; use deno_core::futures::StreamExt; use deno_core::located_script_name; use deno_core::serde_v8; +use deno_core::task::spawn; +use deno_core::task::spawn_blocking; use deno_core::v8; use deno_core::ModuleSpecifier; use deno_runtime::permissions::Permissions; use deno_runtime::permissions::PermissionsContainer; -use deno_runtime::tokio_util::run_local; +use deno_runtime::tokio_util::create_and_run_current_thread; use indexmap::IndexMap; use indexmap::IndexSet; use log::Level; @@ -436,7 +438,7 @@ async fn check_specifiers( /// Run a single specifier as an executable bench module. async fn bench_specifier( - worker_factory: &CliMainWorkerFactory, + worker_factory: Arc, permissions: Permissions, specifier: ModuleSpecifier, sender: UnboundedSender, @@ -522,15 +524,15 @@ async fn bench_specifiers( let specifier = specifier; let sender = sender.clone(); let options = option_for_handles.clone(); - tokio::task::spawn_blocking(move || { + spawn_blocking(move || { let future = bench_specifier( - &worker_factory, + worker_factory, permissions, specifier, sender, options.filter, ); - run_local(future) + create_and_run_current_thread(future) }) }); @@ -539,7 +541,7 @@ async fn bench_specifiers( .collect::, tokio::task::JoinError>>>(); let handler = { - tokio::task::spawn(async move { + spawn(async move { let mut used_only = false; let mut report = BenchReport::new(); let mut reporter = diff --git a/cli/tools/fmt.rs b/cli/tools/fmt.rs index 70d2bd6395..f2fec93023 100644 --- a/cli/tools/fmt.rs +++ b/cli/tools/fmt.rs @@ -28,6 +28,7 @@ use deno_core::error::generic_error; use deno_core::error::AnyError; use deno_core::futures; use deno_core::parking_lot::Mutex; +use deno_core::task::spawn_blocking; use log::debug; use log::info; use log::warn; @@ -629,7 +630,7 @@ where let handles = file_paths.iter().map(|file_path| { let f = f.clone(); let file_path = file_path.clone(); - tokio::task::spawn_blocking(move || f(file_path)) + spawn_blocking(move || f(file_path)) }); let join_results = futures::future::join_all(handles).await; diff --git a/cli/tools/repl/mod.rs b/cli/tools/repl/mod.rs index 9f4b589196..dfd9931b8d 100644 --- a/cli/tools/repl/mod.rs +++ b/cli/tools/repl/mod.rs @@ -8,6 +8,7 @@ use crate::factory::CliFactory; use crate::file_fetcher::FileFetcher; use deno_core::error::AnyError; use deno_core::futures::StreamExt; +use deno_core::task::spawn_blocking; use deno_runtime::permissions::Permissions; use deno_runtime::permissions::PermissionsContainer; use rustyline::error::ReadlineError; @@ -32,7 +33,7 @@ async fn read_line_and_poll( editor: ReplEditor, ) -> Result { #![allow(clippy::await_holding_refcell_ref)] - let mut line_fut = tokio::task::spawn_blocking(move || editor.readline()); + let mut line_fut = spawn_blocking(move || editor.readline()); let mut poll_worker = true; let notifications_rc = repl_session.notifications.clone(); let mut notifications = notifications_rc.borrow_mut(); diff --git a/cli/tools/task.rs b/cli/tools/task.rs index bf972e2db8..37a1aa1c97 100644 --- a/cli/tools/task.rs +++ b/cli/tools/task.rs @@ -21,6 +21,7 @@ use indexmap::IndexMap; use std::collections::HashMap; use std::path::PathBuf; use std::rc::Rc; +use tokio::task::LocalSet; pub async fn execute_script( flags: Flags, @@ -59,9 +60,10 @@ pub async fn execute_script( let seq_list = deno_task_shell::parser::parse(&script) .with_context(|| format!("Error parsing script '{task_name}'."))?; let env_vars = collect_env_vars(); - let exit_code = - deno_task_shell::execute(seq_list, env_vars, &cwd, Default::default()) - .await; + let local = LocalSet::new(); + let future = + deno_task_shell::execute(seq_list, env_vars, &cwd, Default::default()); + let exit_code = local.run_until(future).await; Ok(exit_code) } else if let Some(script) = package_json_scripts.get(task_name) { let package_json_deps_provider = factory.package_json_deps_provider(); @@ -109,8 +111,10 @@ pub async fn execute_script( .with_context(|| format!("Error parsing script '{task_name}'."))?; let npx_commands = resolve_npm_commands(npm_resolver, node_resolver)?; let env_vars = collect_env_vars(); - let exit_code = - deno_task_shell::execute(seq_list, env_vars, &cwd, npx_commands).await; + let local = LocalSet::new(); + let future = + deno_task_shell::execute(seq_list, env_vars, &cwd, npx_commands); + let exit_code = local.run_until(future).await; Ok(exit_code) } else { eprintln!("Task not found: {task_name}"); diff --git a/cli/tools/test.rs b/cli/tools/test.rs index 50e220a466..f78e325394 100644 --- a/cli/tools/test.rs +++ b/cli/tools/test.rs @@ -34,6 +34,8 @@ use deno_core::futures::StreamExt; use deno_core::located_script_name; use deno_core::parking_lot::Mutex; use deno_core::serde_v8; +use deno_core::task::spawn; +use deno_core::task::spawn_blocking; use deno_core::url::Url; use deno_core::v8; use deno_core::ModuleSpecifier; @@ -42,7 +44,7 @@ use deno_runtime::deno_io::StdioPipe; use deno_runtime::fmt_errors::format_js_error; use deno_runtime::permissions::Permissions; use deno_runtime::permissions::PermissionsContainer; -use deno_runtime::tokio_util::run_local; +use deno_runtime::tokio_util::create_and_run_current_thread; use indexmap::IndexMap; use indexmap::IndexSet; use log::Level; @@ -916,12 +918,12 @@ pub fn format_test_error(js_error: &JsError) -> String { /// Test a single specifier as documentation containing test programs, an executable test module or /// both. pub async fn test_specifier( - worker_factory: &CliMainWorkerFactory, + worker_factory: Arc, permissions: Permissions, specifier: ModuleSpecifier, mut sender: TestEventSender, fail_fast_tracker: FailFastTracker, - options: &TestSpecifierOptions, + options: TestSpecifierOptions, ) -> Result<(), AnyError> { if fail_fast_tracker.should_stop() { return Ok(()); @@ -1316,7 +1318,7 @@ async fn test_specifiers( let concurrent_jobs = options.concurrent_jobs; let sender_ = sender.downgrade(); - let sigint_handler_handle = tokio::task::spawn(async move { + let sigint_handler_handle = spawn(async move { signal::ctrl_c().await.unwrap(); sender_.upgrade().map(|s| s.send(TestEvent::Sigint).ok()); }); @@ -1328,14 +1330,14 @@ async fn test_specifiers( let sender = sender.clone(); let fail_fast_tracker = FailFastTracker::new(options.fail_fast); let specifier_options = options.specifier.clone(); - tokio::task::spawn_blocking(move || { - run_local(test_specifier( - &worker_factory, + spawn_blocking(move || { + create_and_run_current_thread(test_specifier( + worker_factory, permissions, specifier, sender.clone(), fail_fast_tracker, - &specifier_options, + specifier_options, )) }) }); @@ -1350,7 +1352,7 @@ async fn test_specifiers( )); let handler = { - tokio::task::spawn(async move { + spawn(async move { let earlier = Instant::now(); let mut tests = IndexMap::new(); let mut test_steps = IndexMap::new(); @@ -1887,7 +1889,7 @@ pub async fn run_tests_with_watch( // run, a process-scoped basic exit handler is required due to a tokio // limitation where it doesn't unbind its own handler for the entire process // once a user adds one. - tokio::task::spawn(async move { + spawn(async move { loop { signal::ctrl_c().await.unwrap(); if !HAS_TEST_RUN_SIGINT_HANDLER.load(Ordering::Relaxed) { @@ -2070,7 +2072,7 @@ fn start_output_redirect_thread( sender: UnboundedSender, flush_state: Arc>>>, ) { - tokio::task::spawn_blocking(move || loop { + spawn_blocking(move || loop { let mut buffer = [0; 512]; let size = match pipe_reader.read(&mut buffer) { Ok(0) | Err(_) => break, diff --git a/cli/tools/upgrade.rs b/cli/tools/upgrade.rs index b5aefe4798..cbd924755b 100644 --- a/cli/tools/upgrade.rs +++ b/cli/tools/upgrade.rs @@ -17,6 +17,7 @@ use deno_core::anyhow::Context; use deno_core::error::AnyError; use deno_core::futures::future::BoxFuture; use deno_core::futures::FutureExt; +use deno_core::task::spawn; use deno_semver::Version; use once_cell::sync::Lazy; use std::borrow::Cow; @@ -198,7 +199,7 @@ pub fn check_for_upgrades( if update_checker.should_check_for_new_version() { let env = update_checker.env.clone(); // do this asynchronously on a separate task - tokio::spawn(async move { + spawn(async move { // Sleep for a small amount of time to not unnecessarily impact startup // time. tokio::time::sleep(UPGRADE_CHECK_FETCH_DELAY).await; diff --git a/cli/util/draw_thread.rs b/cli/util/draw_thread.rs index 028b20d00e..2fd81a78ab 100644 --- a/cli/util/draw_thread.rs +++ b/cli/util/draw_thread.rs @@ -2,6 +2,7 @@ use console_static_text::ConsoleStaticText; use deno_core::parking_lot::Mutex; +use deno_core::task::spawn_blocking; use deno_runtime::ops::tty::ConsoleSize; use once_cell::sync::Lazy; use std::sync::Arc; @@ -162,7 +163,7 @@ impl DrawThread { internal_state.has_draw_thread = true; let drawer_id = internal_state.drawer_id; - tokio::task::spawn_blocking(move || { + spawn_blocking(move || { let mut previous_size = console_size(); loop { let mut delay_ms = 120; diff --git a/cli/util/fs.rs b/cli/util/fs.rs index 94ec24fe6b..658002e3b6 100644 --- a/cli/util/fs.rs +++ b/cli/util/fs.rs @@ -3,6 +3,7 @@ use deno_core::anyhow::Context; use deno_core::error::AnyError; pub use deno_core::normalize_path; +use deno_core::task::spawn_blocking; use deno_core::ModuleSpecifier; use deno_runtime::deno_crypto::rand; use deno_runtime::deno_node::PathClean; @@ -503,7 +504,7 @@ impl LaxSingleProcessFsFlag { // This uses a blocking task because we use a single threaded // runtime and this is time sensitive so we don't want it to update // at the whims of of whatever is occurring on the runtime thread. - tokio::task::spawn_blocking({ + spawn_blocking({ let token = token.clone(); let last_updated_path = last_updated_path.clone(); move || { diff --git a/core/Cargo.toml b/core/Cargo.toml index c0854fdb66..2abf5b10e9 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -36,6 +36,7 @@ serde_json = { workspace = true, features = ["preserve_order"] } serde_v8.workspace = true smallvec.workspace = true sourcemap = "6.1" +tokio.workspace = true url.workspace = true v8.workspace = true @@ -46,4 +47,3 @@ path = "examples/http_bench_json_ops/main.rs" # These dependencies are only used for the 'http_bench_*_ops' examples. [dev-dependencies] deno_ast.workspace = true -tokio.workspace = true diff --git a/core/lib.rs b/core/lib.rs index 1b2841a661..58140bb227 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -23,6 +23,7 @@ mod resources; mod runtime; pub mod snapshot_util; mod source_map; +pub mod task; mod task_queue; // Re-exports diff --git a/core/task.rs b/core/task.rs new file mode 100644 index 0000000000..46a4c8c261 --- /dev/null +++ b/core/task.rs @@ -0,0 +1,131 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +use core::pin::Pin; +use core::task::Context; +use core::task::Poll; +use futures::Future; +use std::marker::PhantomData; +use tokio::runtime::Handle; +use tokio::runtime::RuntimeFlavor; + +/// Equivalent to [`tokio::task::JoinHandle`]. +#[repr(transparent)] +pub struct JoinHandle { + handle: tokio::task::JoinHandle>, + _r: PhantomData, +} + +impl JoinHandle { + /// Equivalent to [`tokio::task::JoinHandle::abort`]. + pub fn abort(&self) { + self.handle.abort() + } +} + +impl Future for JoinHandle { + type Output = Result; + + fn poll( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + // SAFETY: We are sure that handle is valid here + unsafe { + let me: &mut Self = Pin::into_inner_unchecked(self); + let handle = Pin::new_unchecked(&mut me.handle); + match handle.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(r)) => Poll::Ready(Ok(r.into_inner())), + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + } + } + } +} + +/// Equivalent to [`tokio::task::spawn`], but does not require the future to be [`Send`]. Must only be +/// used on a [`RuntimeFlavor::CurrentThread`] executor, though this is only checked when running with +/// debug assertions. +pub fn spawn + 'static, R: 'static>( + f: F, +) -> JoinHandle { + debug_assert!( + Handle::current().runtime_flavor() == RuntimeFlavor::CurrentThread + ); + // SAFETY: we know this is a current-thread executor + let future = unsafe { MaskFutureAsSend::new(f) }; + JoinHandle { + handle: tokio::task::spawn(future), + _r: Default::default(), + } +} + +/// Equivalent to [`tokio::task::spawn_blocking`]. Currently a thin wrapper around the tokio API, but this +/// may change in the future. +pub fn spawn_blocking< + F: (FnOnce() -> R) + Send + 'static, + R: Send + 'static, +>( + f: F, +) -> JoinHandle { + let handle = tokio::task::spawn_blocking(|| MaskResultAsSend { result: f() }); + JoinHandle { + handle, + _r: Default::default(), + } +} + +#[repr(transparent)] +#[doc(hidden)] +pub struct MaskResultAsSend { + result: R, +} + +/// SAFETY: We ensure that Send bounds are only faked when tokio is running on a current-thread executor +unsafe impl Send for MaskResultAsSend {} + +impl MaskResultAsSend { + #[inline(always)] + pub fn into_inner(self) -> R { + self.result + } +} + +pub struct MaskFutureAsSend { + future: F, +} + +impl MaskFutureAsSend { + /// Mark a non-`Send` future as `Send`. This is a trick to be able to use + /// `tokio::spawn()` (which requires `Send` futures) in a current thread + /// runtime. + /// + /// # Safety + /// + /// You must ensure that the future is actually used on the same + /// thread, ie. always use current thread runtime flavor from Tokio. + pub unsafe fn new(future: F) -> Self { + Self { future } + } +} + +// SAFETY: we are cheating here - this struct is NOT really Send, +// but we need to mark it Send so that we can use `spawn()` in Tokio. +unsafe impl Send for MaskFutureAsSend {} + +impl Future for MaskFutureAsSend { + type Output = MaskResultAsSend; + + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + // SAFETY: We are sure that future is valid here + unsafe { + let me: &mut MaskFutureAsSend = Pin::into_inner_unchecked(self); + let future = Pin::new_unchecked(&mut me.future); + match future.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(result) => Poll::Ready(MaskResultAsSend { result }), + } + } + } +} diff --git a/core/task_queue.rs b/core/task_queue.rs index 36a169650c..adb25a4f62 100644 --- a/core/task_queue.rs +++ b/core/task_queue.rs @@ -127,7 +127,7 @@ mod tests { for i in 0..100 { let data = data.clone(); tasks.push(task_queue.queue(async move { - tokio::task::spawn_blocking(move || { + crate::task::spawn_blocking(move || { let mut data = data.lock(); if *data != i { panic!("Value was not equal."); diff --git a/ext/cache/sqlite.rs b/ext/cache/sqlite.rs index 2853f793db..4eb9924c7a 100644 --- a/ext/cache/sqlite.rs +++ b/ext/cache/sqlite.rs @@ -10,6 +10,7 @@ use std::time::UNIX_EPOCH; use async_trait::async_trait; use deno_core::error::AnyError; use deno_core::parking_lot::Mutex; +use deno_core::task::spawn_blocking; use deno_core::AsyncRefCell; use deno_core::AsyncResult; use deno_core::ByteString; @@ -99,7 +100,7 @@ impl Cache for SqliteBackedCache { async fn storage_open(&self, cache_name: String) -> Result { let db = self.connection.clone(); let cache_storage_dir = self.cache_storage_dir.clone(); - tokio::task::spawn_blocking(move || { + spawn_blocking(move || { let db = db.lock(); db.execute( "INSERT OR IGNORE INTO cache_storage (cache_name) VALUES (?1)", @@ -124,7 +125,7 @@ impl Cache for SqliteBackedCache { /// Note: this doesn't check the disk, it only checks the sqlite db. async fn storage_has(&self, cache_name: String) -> Result { let db = self.connection.clone(); - tokio::task::spawn_blocking(move || { + spawn_blocking(move || { let db = db.lock(); let cache_exists = db.query_row( "SELECT count(id) FROM cache_storage WHERE cache_name = ?1", @@ -143,7 +144,7 @@ impl Cache for SqliteBackedCache { async fn storage_delete(&self, cache_name: String) -> Result { let db = self.connection.clone(); let cache_storage_dir = self.cache_storage_dir.clone(); - tokio::task::spawn_blocking(move || { + spawn_blocking(move || { let db = db.lock(); let maybe_cache_id = db .query_row( @@ -210,7 +211,7 @@ impl Cache for SqliteBackedCache { > { let db = self.connection.clone(); let cache_storage_dir = self.cache_storage_dir.clone(); - let query_result = tokio::task::spawn_blocking(move || { + let query_result = spawn_blocking(move || { let db = db.lock(); let result = db.query_row( "SELECT response_body_key, response_headers, response_status, response_status_text, request_headers @@ -269,7 +270,7 @@ impl Cache for SqliteBackedCache { request: CacheDeleteRequest, ) -> Result { let db = self.connection.clone(); - tokio::task::spawn_blocking(move || { + spawn_blocking(move || { // TODO(@satyarohith): remove the response body from disk if one exists let db = db.lock(); let rows_effected = db.execute( @@ -287,7 +288,7 @@ async fn insert_cache_asset( put: CachePutRequest, response_body_key: Option, ) -> Result, deno_core::anyhow::Error> { - tokio::task::spawn_blocking(move || { + spawn_blocking(move || { let maybe_response_body = { let db = db.lock(); db.query_row( diff --git a/ext/crypto/decrypt.rs b/ext/crypto/decrypt.rs index 6c4d5b6ba5..fc54fe8182 100644 --- a/ext/crypto/decrypt.rs +++ b/ext/crypto/decrypt.rs @@ -20,6 +20,7 @@ use deno_core::error::custom_error; use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::op; +use deno_core::task::spawn_blocking; use deno_core::ZeroCopyBuf; use rsa::pkcs1::DecodeRsaPrivateKey; use rsa::PaddingScheme; @@ -98,7 +99,7 @@ pub async fn op_crypto_decrypt( tag_length, } => decrypt_aes_gcm(key, length, tag_length, iv, additional_data, &data), }; - let buf = tokio::task::spawn_blocking(fun).await.unwrap()?; + let buf = spawn_blocking(fun).await.unwrap()?; Ok(buf.into()) } diff --git a/ext/crypto/encrypt.rs b/ext/crypto/encrypt.rs index f34e0cbc6b..2831ca0f4a 100644 --- a/ext/crypto/encrypt.rs +++ b/ext/crypto/encrypt.rs @@ -19,6 +19,7 @@ use ctr::Ctr64BE; use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::op; +use deno_core::task::spawn_blocking; use deno_core::ZeroCopyBuf; use rand::rngs::OsRng; use rsa::pkcs1::DecodeRsaPublicKey; @@ -99,7 +100,7 @@ pub async fn op_crypto_encrypt( key_length, } => encrypt_aes_ctr(key, key_length, &counter, ctr_length, &data), }; - let buf = tokio::task::spawn_blocking(fun).await.unwrap()?; + let buf = spawn_blocking(fun).await.unwrap()?; Ok(buf.into()) } diff --git a/ext/crypto/generate_key.rs b/ext/crypto/generate_key.rs index 2a9452c433..426c61376e 100644 --- a/ext/crypto/generate_key.rs +++ b/ext/crypto/generate_key.rs @@ -2,6 +2,7 @@ use deno_core::error::AnyError; use deno_core::op; +use deno_core::task::spawn_blocking; use deno_core::ZeroCopyBuf; use elliptic_curve::rand_core::OsRng; use num_traits::FromPrimitive; @@ -56,7 +57,7 @@ pub async fn op_crypto_generate_key( generate_key_hmac(hash, length) } }; - let buf = tokio::task::spawn_blocking(fun).await.unwrap()?; + let buf = spawn_blocking(fun).await.unwrap()?; Ok(buf.into()) } diff --git a/ext/crypto/lib.rs b/ext/crypto/lib.rs index f481f97f6b..05349bf680 100644 --- a/ext/crypto/lib.rs +++ b/ext/crypto/lib.rs @@ -10,6 +10,7 @@ use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::op; +use deno_core::task::spawn_blocking; use deno_core::OpState; use deno_core::ZeroCopyBuf; use serde::Deserialize; @@ -601,7 +602,7 @@ pub async fn op_crypto_subtle_digest( algorithm: CryptoHash, data: ZeroCopyBuf, ) -> Result { - let output = tokio::task::spawn_blocking(move || { + let output = spawn_blocking(move || { digest::digest(algorithm.into(), &data) .as_ref() .to_vec() diff --git a/ext/ffi/call.rs b/ext/ffi/call.rs index 98186936cf..21358d851e 100644 --- a/ext/ffi/call.rs +++ b/ext/ffi/call.rs @@ -15,6 +15,7 @@ use deno_core::op; use deno_core::serde_json::Value; use deno_core::serde_v8; use deno_core::serde_v8::ExternalPointer; +use deno_core::task::spawn_blocking; use deno_core::v8; use deno_core::OpState; use deno_core::ResourceId; @@ -298,7 +299,7 @@ where .map(|v| v8::Local::::try_from(v.v8_value).unwrap()); let out_buffer_ptr = out_buffer_as_ptr(scope, out_buffer); - let join_handle = tokio::task::spawn_blocking(move || { + let join_handle = spawn_blocking(move || { let PtrSymbol { cif, ptr } = symbol.clone(); ffi_call( call_args, @@ -345,7 +346,7 @@ pub fn op_ffi_call_nonblocking<'scope>( .map(|v| v8::Local::::try_from(v.v8_value).unwrap()); let out_buffer_ptr = out_buffer_as_ptr(scope, out_buffer); - let join_handle = tokio::task::spawn_blocking(move || { + let join_handle = spawn_blocking(move || { let Symbol { cif, ptr, diff --git a/ext/fs/std_fs.rs b/ext/fs/std_fs.rs index 6ac935bbd2..9baf74a2a4 100644 --- a/ext/fs/std_fs.rs +++ b/ext/fs/std_fs.rs @@ -9,6 +9,7 @@ use std::path::Path; use std::path::PathBuf; use std::rc::Rc; +use deno_core::task::spawn_blocking; use deno_io::fs::File; use deno_io::fs::FsResult; use deno_io::fs::FsStat; @@ -86,8 +87,7 @@ impl FileSystem for RealFs { options: OpenOptions, ) -> FsResult> { let opts = open_options(options); - let std_file = - tokio::task::spawn_blocking(move || opts.open(path)).await??; + let std_file = spawn_blocking(move || opts.open(path)).await??; Ok(Rc::new(StdFileResourceInner::file(std_file))) } @@ -105,14 +105,14 @@ impl FileSystem for RealFs { recursive: bool, mode: u32, ) -> FsResult<()> { - tokio::task::spawn_blocking(move || mkdir(&path, recursive, mode)).await? + spawn_blocking(move || mkdir(&path, recursive, mode)).await? } fn chmod_sync(&self, path: &Path, mode: u32) -> FsResult<()> { chmod(path, mode) } async fn chmod_async(&self, path: PathBuf, mode: u32) -> FsResult<()> { - tokio::task::spawn_blocking(move || chmod(&path, mode)).await? + spawn_blocking(move || chmod(&path, mode)).await? } fn chown_sync( @@ -129,53 +129,49 @@ impl FileSystem for RealFs { uid: Option, gid: Option, ) -> FsResult<()> { - tokio::task::spawn_blocking(move || chown(&path, uid, gid)).await? + spawn_blocking(move || chown(&path, uid, gid)).await? } fn remove_sync(&self, path: &Path, recursive: bool) -> FsResult<()> { remove(path, recursive) } async fn remove_async(&self, path: PathBuf, recursive: bool) -> FsResult<()> { - tokio::task::spawn_blocking(move || remove(&path, recursive)).await? + spawn_blocking(move || remove(&path, recursive)).await? } fn copy_file_sync(&self, from: &Path, to: &Path) -> FsResult<()> { copy_file(from, to) } async fn copy_file_async(&self, from: PathBuf, to: PathBuf) -> FsResult<()> { - tokio::task::spawn_blocking(move || copy_file(&from, &to)).await? + spawn_blocking(move || copy_file(&from, &to)).await? } fn stat_sync(&self, path: &Path) -> FsResult { stat(path).map(Into::into) } async fn stat_async(&self, path: PathBuf) -> FsResult { - tokio::task::spawn_blocking(move || stat(&path)) - .await? - .map(Into::into) + spawn_blocking(move || stat(&path)).await?.map(Into::into) } fn lstat_sync(&self, path: &Path) -> FsResult { lstat(path).map(Into::into) } async fn lstat_async(&self, path: PathBuf) -> FsResult { - tokio::task::spawn_blocking(move || lstat(&path)) - .await? - .map(Into::into) + spawn_blocking(move || lstat(&path)).await?.map(Into::into) } fn realpath_sync(&self, path: &Path) -> FsResult { realpath(path) } async fn realpath_async(&self, path: PathBuf) -> FsResult { - tokio::task::spawn_blocking(move || realpath(&path)).await? + spawn_blocking(move || realpath(&path)).await? } fn read_dir_sync(&self, path: &Path) -> FsResult> { read_dir(path) } async fn read_dir_async(&self, path: PathBuf) -> FsResult> { - tokio::task::spawn_blocking(move || read_dir(&path)).await? + spawn_blocking(move || read_dir(&path)).await? } fn rename_sync(&self, oldpath: &Path, newpath: &Path) -> FsResult<()> { @@ -186,7 +182,7 @@ impl FileSystem for RealFs { oldpath: PathBuf, newpath: PathBuf, ) -> FsResult<()> { - tokio::task::spawn_blocking(move || fs::rename(oldpath, newpath)) + spawn_blocking(move || fs::rename(oldpath, newpath)) .await? .map_err(Into::into) } @@ -199,7 +195,7 @@ impl FileSystem for RealFs { oldpath: PathBuf, newpath: PathBuf, ) -> FsResult<()> { - tokio::task::spawn_blocking(move || fs::hard_link(oldpath, newpath)) + spawn_blocking(move || fs::hard_link(oldpath, newpath)) .await? .map_err(Into::into) } @@ -218,15 +214,14 @@ impl FileSystem for RealFs { newpath: PathBuf, file_type: Option, ) -> FsResult<()> { - tokio::task::spawn_blocking(move || symlink(&oldpath, &newpath, file_type)) - .await? + spawn_blocking(move || symlink(&oldpath, &newpath, file_type)).await? } fn read_link_sync(&self, path: &Path) -> FsResult { fs::read_link(path).map_err(Into::into) } async fn read_link_async(&self, path: PathBuf) -> FsResult { - tokio::task::spawn_blocking(move || fs::read_link(path)) + spawn_blocking(move || fs::read_link(path)) .await? .map_err(Into::into) } @@ -235,7 +230,7 @@ impl FileSystem for RealFs { truncate(path, len) } async fn truncate_async(&self, path: PathBuf, len: u64) -> FsResult<()> { - tokio::task::spawn_blocking(move || truncate(&path, len)).await? + spawn_blocking(move || truncate(&path, len)).await? } fn utime_sync( @@ -260,7 +255,7 @@ impl FileSystem for RealFs { ) -> FsResult<()> { let atime = filetime::FileTime::from_unix_time(atime_secs, atime_nanos); let mtime = filetime::FileTime::from_unix_time(mtime_secs, mtime_nanos); - tokio::task::spawn_blocking(move || { + spawn_blocking(move || { filetime::set_file_times(path, atime, mtime).map_err(Into::into) }) .await? @@ -289,7 +284,7 @@ impl FileSystem for RealFs { options: OpenOptions, data: Vec, ) -> FsResult<()> { - tokio::task::spawn_blocking(move || { + spawn_blocking(move || { let opts = open_options(options); let mut file = opts.open(path)?; #[cfg(unix)] @@ -307,7 +302,7 @@ impl FileSystem for RealFs { fs::read(path).map_err(Into::into) } async fn read_file_async(&self, path: PathBuf) -> FsResult> { - tokio::task::spawn_blocking(move || fs::read(path)) + spawn_blocking(move || fs::read(path)) .await? .map_err(Into::into) } diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 12db29b1b7..8b2f91be06 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -17,6 +17,8 @@ use cache_control::CacheControl; use deno_core::error::AnyError; use deno_core::futures::TryFutureExt; use deno_core::op; +use deno_core::task::spawn; +use deno_core::task::JoinHandle; use deno_core::AsyncRefCell; use deno_core::AsyncResult; use deno_core::ByteString; @@ -68,9 +70,6 @@ use std::rc::Rc; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; -use tokio::task::spawn_local; -use tokio::task::JoinHandle; - type Request = hyper1::Request; type Response = hyper1::Response; @@ -262,7 +261,7 @@ pub fn op_http_upgrade_raw( let (read_rx, write_tx) = tokio::io::split(read); let (mut write_rx, mut read_tx) = tokio::io::split(write); - spawn_local(async move { + spawn(async move { let mut upgrade_stream = WebSocketUpgrade::::default(); // Stage 2: Extract the Upgraded connection @@ -285,7 +284,7 @@ pub fn op_http_upgrade_raw( // Stage 3: Pump the data let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded); - spawn_local(async move { + spawn(async move { let mut buf = [0; 1024]; loop { let read = upgraded_rx.read(&mut buf).await?; @@ -296,7 +295,7 @@ pub fn op_http_upgrade_raw( } Ok::<_, AnyError>(()) }); - spawn_local(async move { + spawn(async move { let mut buf = [0; 1024]; loop { let read = write_rx.read(&mut buf).await?; @@ -792,11 +791,10 @@ fn serve_https( cancel: Rc, tx: tokio::sync::mpsc::Sender, ) -> JoinHandle> { - // TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us let svc = service_fn(move |req: Request| { new_slab_future(req, request_info.clone(), tx.clone()) }); - spawn_local( + spawn( async { io.handshake().await?; // If the client specifically negotiates a protocol, we will use it. If not, we'll auto-detect @@ -820,11 +818,10 @@ fn serve_http( cancel: Rc, tx: tokio::sync::mpsc::Sender, ) -> JoinHandle> { - // TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us let svc = service_fn(move |req: Request| { new_slab_future(req, request_info.clone(), tx.clone()) }); - spawn_local(serve_http2_autodetect(io, svc).try_or_cancel(cancel)) + spawn(serve_http2_autodetect(io, svc).try_or_cancel(cancel)) } fn serve_http_on( @@ -916,7 +913,7 @@ where let cancel_clone = resource.cancel_handle(); let listen_properties_clone: HttpListenProperties = listen_properties.clone(); - let handle = spawn_local(async move { + let handle = spawn(async move { loop { let conn = listener .accept() diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 21d3dc6519..7a1a93f805 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -20,6 +20,7 @@ use deno_core::futures::FutureExt; use deno_core::futures::StreamExt; use deno_core::futures::TryFutureExt; use deno_core::op; +use deno_core::task::spawn; use deno_core::AsyncRefCell; use deno_core::AsyncResult; use deno_core::BufView; @@ -68,7 +69,6 @@ use std::task::Poll; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; -use tokio::task::spawn_local; use crate::network_buffered_stream::NetworkBufferedStream; use crate::reader_stream::ExternallyAbortableReaderStream; @@ -184,7 +184,7 @@ impl HttpConnResource { }; let (task_fut, closed_fut) = task_fut.remote_handle(); let closed_fut = closed_fut.shared(); - spawn_local(task_fut); + spawn(task_fut); Self { addr, @@ -1005,7 +1005,7 @@ where Fut::Output: 'static, { fn execute(&self, fut: Fut) { - spawn_local(fut); + deno_core::task::spawn(fut); } } @@ -1015,7 +1015,7 @@ where Fut::Output: 'static, { fn execute(&self, fut: Fut) { - spawn_local(fut); + deno_core::task::spawn(fut); } } diff --git a/ext/io/lib.rs b/ext/io/lib.rs index 49e4ab714f..6dec7c3a7f 100644 --- a/ext/io/lib.rs +++ b/ext/io/lib.rs @@ -2,6 +2,7 @@ use deno_core::error::AnyError; use deno_core::op; +use deno_core::task::spawn_blocking; use deno_core::AsyncMutFuture; use deno_core::AsyncRefCell; use deno_core::AsyncResult; @@ -350,7 +351,7 @@ impl StdFileResourceInner { } } }; - let (cell_value, result) = tokio::task::spawn_blocking(move || { + let (cell_value, result) = spawn_blocking(move || { let result = action(&mut cell_value); (cell_value, result) }) @@ -372,7 +373,7 @@ impl StdFileResourceInner { // we want to restrict this to one async action at a time let _permit = self.cell_async_task_queue.acquire().await; - tokio::task::spawn_blocking(action).await.unwrap() + spawn_blocking(action).await.unwrap() } } diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs index b9b37b3282..7f451d0a84 100644 --- a/ext/net/ops_tls.rs +++ b/ext/net/ops_tls.rs @@ -26,6 +26,7 @@ use deno_core::futures::task::Waker; use deno_core::op; use deno_core::parking_lot::Mutex; +use deno_core::task::spawn; use deno_core::AsyncRefCell; use deno_core::AsyncResult; use deno_core::ByteString; @@ -74,7 +75,6 @@ use tokio::io::AsyncWriteExt; use tokio::io::ReadBuf; use tokio::net::TcpListener; use tokio::net::TcpStream; -use tokio::task::spawn_local; #[derive(Copy, Clone, Debug, Eq, PartialEq)] enum Flow { @@ -224,9 +224,9 @@ impl Drop for TlsStream { let use_linger_task = inner.poll_close(&mut cx).is_pending(); if use_linger_task { - spawn_local(poll_fn(move |cx| inner.poll_close(cx))); + spawn(poll_fn(move |cx| inner.poll_close(cx))); } else if cfg!(debug_assertions) { - spawn_local(async {}); // Spawn dummy task to detect missing LocalSet. + spawn(async {}); // Spawn dummy task to detect missing runtime. } } } diff --git a/ext/node/ops/crypto/mod.rs b/ext/node/ops/crypto/mod.rs index 9e1a3da989..0f8feb2a92 100644 --- a/ext/node/ops/crypto/mod.rs +++ b/ext/node/ops/crypto/mod.rs @@ -4,6 +4,7 @@ use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::op; use deno_core::serde_v8; +use deno_core::task::spawn_blocking; use deno_core::OpState; use deno_core::ResourceId; use deno_core::StringOrBuffer; @@ -57,12 +58,7 @@ pub async fn op_node_check_prime_async( checks: usize, ) -> Result { // TODO(@littledivy): use rayon for CPU-bound tasks - Ok( - tokio::task::spawn_blocking(move || { - primes::is_probably_prime(&num, checks) - }) - .await?, - ) + Ok(spawn_blocking(move || primes::is_probably_prime(&num, checks)).await?) } #[op] @@ -74,10 +70,8 @@ pub fn op_node_check_prime_bytes_async( // TODO(@littledivy): use rayon for CPU-bound tasks Ok(async move { Ok( - tokio::task::spawn_blocking(move || { - primes::is_probably_prime(&candidate, checks) - }) - .await?, + spawn_blocking(move || primes::is_probably_prime(&candidate, checks)) + .await?, ) }) } @@ -462,7 +456,7 @@ pub async fn op_node_pbkdf2_async( digest: String, keylen: usize, ) -> Result { - tokio::task::spawn_blocking(move || { + spawn_blocking(move || { let mut derived_key = vec![0; keylen]; pbkdf2_sync(&password, &salt, iterations, &digest, &mut derived_key) .map(|_| derived_key.into()) @@ -477,7 +471,7 @@ pub fn op_node_generate_secret(buf: &mut [u8]) { #[op] pub async fn op_node_generate_secret_async(len: i32) -> ZeroCopyBuf { - tokio::task::spawn_blocking(move || { + spawn_blocking(move || { let mut buf = vec![0u8; len as usize]; rand::thread_rng().fill(&mut buf[..]); buf.into() @@ -535,7 +529,7 @@ pub async fn op_node_hkdf_async( info: ZeroCopyBuf, okm_len: usize, ) -> Result { - tokio::task::spawn_blocking(move || { + spawn_blocking(move || { let mut okm = vec![0u8; okm_len]; hkdf_sync(&hash, &ikm, &salt, &info, &mut okm)?; Ok(okm.into()) @@ -578,10 +572,7 @@ pub async fn op_node_generate_rsa_async( modulus_length: usize, public_exponent: usize, ) -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> { - tokio::task::spawn_blocking(move || { - generate_rsa(modulus_length, public_exponent) - }) - .await? + spawn_blocking(move || generate_rsa(modulus_length, public_exponent)).await? } fn dsa_generate( @@ -635,10 +626,7 @@ pub async fn op_node_dsa_generate_async( modulus_length: usize, divisor_length: usize, ) -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> { - tokio::task::spawn_blocking(move || { - dsa_generate(modulus_length, divisor_length) - }) - .await? + spawn_blocking(move || dsa_generate(modulus_length, divisor_length)).await? } fn ec_generate( @@ -677,7 +665,7 @@ pub fn op_node_ec_generate( pub async fn op_node_ec_generate_async( named_curve: String, ) -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> { - tokio::task::spawn_blocking(move || ec_generate(&named_curve)).await? + spawn_blocking(move || ec_generate(&named_curve)).await? } fn ed25519_generate() -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> { @@ -704,7 +692,7 @@ pub fn op_node_ed25519_generate() -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError #[op] pub async fn op_node_ed25519_generate_async( ) -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> { - tokio::task::spawn_blocking(ed25519_generate).await? + spawn_blocking(ed25519_generate).await? } fn x25519_generate() -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> { @@ -739,7 +727,7 @@ pub fn op_node_x25519_generate() -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> #[op] pub async fn op_node_x25519_generate_async( ) -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> { - tokio::task::spawn_blocking(x25519_generate).await? + spawn_blocking(x25519_generate).await? } fn dh_generate_group( @@ -772,7 +760,7 @@ pub fn op_node_dh_generate_group( pub async fn op_node_dh_generate_group_async( group_name: String, ) -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> { - tokio::task::spawn_blocking(move || dh_generate_group(&group_name)).await? + spawn_blocking(move || dh_generate_group(&group_name)).await? } fn dh_generate( @@ -806,10 +794,8 @@ pub async fn op_node_dh_generate_async( prime_len: usize, generator: usize, ) -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> { - tokio::task::spawn_blocking(move || { - dh_generate(prime.as_deref(), prime_len, generator) - }) - .await? + spawn_blocking(move || dh_generate(prime.as_deref(), prime_len, generator)) + .await? } #[op] @@ -885,7 +871,7 @@ pub async fn op_node_scrypt_async( parallelization: u32, maxmem: u32, ) -> Result { - tokio::task::spawn_blocking(move || { + spawn_blocking(move || { let mut output_buffer = vec![0u8; keylen as usize]; let res = scrypt( password, @@ -1081,5 +1067,5 @@ pub fn op_node_gen_prime(size: usize) -> ZeroCopyBuf { pub async fn op_node_gen_prime_async( size: usize, ) -> Result { - Ok(tokio::task::spawn_blocking(move || gen_prime(size)).await?) + Ok(spawn_blocking(move || gen_prime(size)).await?) } diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index e03a13789f..a002b774ce 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -591,6 +591,6 @@ where Fut::Output: 'static, { fn execute(&self, fut: Fut) { - tokio::task::spawn_local(fut); + deno_core::task::spawn(fut); } } diff --git a/runtime/inspector_server.rs b/runtime/inspector_server.rs index 25d0d796c1..1a67068964 100644 --- a/runtime/inspector_server.rs +++ b/runtime/inspector_server.rs @@ -15,6 +15,7 @@ use deno_core::futures::task::Poll; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::serde_json::Value; +use deno_core::task::spawn; use deno_core::InspectorMsg; use deno_core::InspectorSessionProxy; use deno_core::JsRuntime; @@ -109,7 +110,7 @@ where Fut::Output: 'static, { fn execute(&self, fut: Fut) { - tokio::task::spawn_local(fut); + deno_core::task::spawn(fut); } } @@ -160,7 +161,7 @@ fn handle_ws_request( // spawn a task that will wait for websocket connection and then pump messages between // the socket and inspector proxy - tokio::task::spawn_local(async move { + spawn(async move { let websocket = if let Ok(w) = fut.await { w } else { diff --git a/runtime/tokio_util.rs b/runtime/tokio_util.rs index a4db5e33fa..ce6ef305f0 100644 --- a/runtime/tokio_util.rs +++ b/runtime/tokio_util.rs @@ -1,5 +1,7 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +use deno_core::task::MaskFutureAsSend; + pub fn create_basic_runtime() -> tokio::runtime::Runtime { tokio::runtime::Builder::new_current_thread() .enable_io() @@ -14,11 +16,14 @@ pub fn create_basic_runtime() -> tokio::runtime::Runtime { .unwrap() } -pub fn run_local(future: F) -> R +pub fn create_and_run_current_thread(future: F) -> R where - F: std::future::Future, + F: std::future::Future + 'static, + R: Send + 'static, { let rt = create_basic_runtime(); - let local = tokio::task::LocalSet::new(); - local.block_on(&rt, future) + // SAFETY: this this is guaranteed to be running on a current-thread executor + let future = unsafe { MaskFutureAsSend::new(future) }; + let join_handle = rt.spawn(future); + rt.block_on(join_handle).unwrap().into_inner() } diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index d8c881ab7c..01262abcf2 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -3,7 +3,7 @@ use crate::colors; use crate::inspector_server::InspectorServer; use crate::ops; use crate::permissions::PermissionsContainer; -use crate::tokio_util::run_local; +use crate::tokio_util::create_and_run_current_thread; use crate::worker::FormatJsErrorFn; use crate::BootstrapOptions; use deno_broadcast_channel::InMemoryBroadcastChannel; @@ -838,5 +838,5 @@ pub fn run_web_worker( debug!("Worker thread shuts down {}", &name); result }; - run_local(fut) + create_and_run_current_thread(fut) } diff --git a/tools/wpt/expectation.json b/tools/wpt/expectation.json index 7cff6b6eb4..4fc0067e70 100644 --- a/tools/wpt/expectation.json +++ b/tools/wpt/expectation.json @@ -3729,7 +3729,9 @@ ], "request-referrer.any.html": false, "request-referrer.any.worker.html": false, - "response-null-body.any.html": true + "response-null-body.any.html": { + "ignore": true + } }, "response": { "json.any.html": true,