From a4f45f709278208cb61501df2792412f11aed3c4 Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Mon, 11 Dec 2023 20:10:33 -0700 Subject: [PATCH] perf(ext/ffi): switch from middleware to tasks (#21239) Deno-side changes for https://github.com/denoland/deno_core/pull/350 --------- Co-authored-by: Aapo Alasuutari --- cli/tests/unit/ffi_test.ts | 39 ++++++- ext/ffi/call.rs | 2 +- ext/ffi/callback.rs | 119 ++++++++++++---------- ext/ffi/lib.rs | 41 -------- test_ffi/src/lib.rs | 14 +++ test_ffi/tests/ffi_callback_errors.ts | 141 ++++++++++++++++++++++++++ test_ffi/tests/integration_tests.rs | 41 ++++++++ 7 files changed, 300 insertions(+), 97 deletions(-) create mode 100644 test_ffi/tests/ffi_callback_errors.ts diff --git a/cli/tests/unit/ffi_test.ts b/cli/tests/unit/ffi_test.ts index 018cec6746..89133b9b25 100644 --- a/cli/tests/unit/ffi_test.ts +++ b/cli/tests/unit/ffi_test.ts @@ -1,6 +1,6 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -import { assertEquals, assertThrows } from "./test_util.ts"; +import { assertEquals, assertRejects, assertThrows } from "./test_util.ts"; Deno.test({ permissions: { ffi: true } }, function dlopenInvalidArguments() { const filename = "/usr/lib/libc.so.6"; @@ -98,3 +98,40 @@ Deno.test({ permissions: { ffi: true } }, function pointerOf() { ); assertEquals(Number(baseAddress) + 80, float64AddressOffset); }); + +Deno.test({ permissions: { ffi: true } }, function callWithError() { + const throwCb = () => { + throw new Error("Error"); + }; + const cb = new Deno.UnsafeCallback({ + parameters: [], + result: "void", + }, throwCb); + const fnPointer = new Deno.UnsafeFnPointer(cb.pointer, { + parameters: [], + result: "void", + }); + assertThrows(() => fnPointer.call()); + cb.close(); +}); + +Deno.test( + { permissions: { ffi: true }, ignore: true }, + async function callNonBlockingWithError() { + const throwCb = () => { + throw new Error("Error"); + }; + const cb = new Deno.UnsafeCallback({ + parameters: [], + result: "void", + }, throwCb); + const fnPointer = new Deno.UnsafeFnPointer(cb.pointer, { + parameters: [], + result: "void", + nonblocking: true, + }); + // TODO(mmastrac): currently ignored as we do not thread callback exceptions through nonblocking pointers + await assertRejects(async () => await fnPointer.call()); + cb.close(); + }, +); diff --git a/ext/ffi/call.rs b/ext/ffi/call.rs index ea78751083..d0162d7d55 100644 --- a/ext/ffi/call.rs +++ b/ext/ffi/call.rs @@ -368,7 +368,7 @@ pub fn op_ffi_call_nonblocking( }) } -#[op2] +#[op2(reentrant)] #[serde] pub fn op_ffi_call_ptr( scope: &mut v8::HandleScope, diff --git a/ext/ffi/callback.rs b/ext/ffi/callback.rs index 4974c7f9fb..ea18189d85 100644 --- a/ext/ffi/callback.rs +++ b/ext/ffi/callback.rs @@ -3,21 +3,19 @@ use crate::check_unstable; use crate::symbol::NativeType; use crate::FfiPermissions; -use crate::FfiState; use crate::ForeignFunction; -use crate::PendingFfiAsyncWork; use crate::MAX_SAFE_INTEGER; use crate::MIN_SAFE_INTEGER; use deno_core::error::AnyError; -use deno_core::futures::channel::mpsc; -use deno_core::futures::task::AtomicWaker; use deno_core::op2; use deno_core::v8; +use deno_core::v8::TryCatch; use deno_core::CancelFuture; use deno_core::CancelHandle; use deno_core::OpState; use deno_core::Resource; use deno_core::ResourceId; +use deno_core::V8CrossThreadTaskSpawner; use libffi::middle::Cif; use serde::Deserialize; use std::borrow::Cow; @@ -31,8 +29,6 @@ use std::ptr::NonNull; use std::rc::Rc; use std::sync::atomic; use std::sync::atomic::AtomicU32; -use std::sync::mpsc::sync_channel; -use std::sync::Arc; use std::task::Poll; static THREAD_ID_COUNTER: AtomicU32 = AtomicU32::new(1); @@ -93,13 +89,12 @@ impl Resource for UnsafeCallbackResource { } struct CallbackInfo { - pub async_work_sender: mpsc::UnboundedSender, + pub async_work_sender: V8CrossThreadTaskSpawner, pub callback: NonNull, pub context: NonNull, pub parameters: Box<[NativeType]>, pub result: NativeType, pub thread_id: u32, - pub waker: Arc, } impl Future for CallbackInfo { @@ -113,6 +108,32 @@ impl Future for CallbackInfo { } } +struct TaskArgs { + cif: NonNull, + result: NonNull, + args: *const *const c_void, + info: NonNull, +} + +// SAFETY: we know these are valid Send-safe pointers as they are for FFI +unsafe impl Send for TaskArgs {} + +impl TaskArgs { + fn run(&mut self, scope: &mut v8::HandleScope) { + // SAFETY: making a call using Send-safe pointers turned back into references. We know the + // lifetime of these will last because we block on the result of the spawn call. + unsafe { + do_ffi_callback( + scope, + self.cif.as_ref(), + self.info.as_ref(), + self.result.as_mut(), + self.args, + ) + } + } +} + unsafe extern "C" fn deno_ffi_callback( cif: &libffi::low::ffi_cif, result: &mut c_void, @@ -121,51 +142,55 @@ unsafe extern "C" fn deno_ffi_callback( ) { LOCAL_THREAD_ID.with(|s| { if *s.borrow() == info.thread_id { - // Own isolate thread, okay to call directly - do_ffi_callback(cif, info, result, args); + // Call from main thread. If this callback is being triggered due to a + // function call coming from Deno itself, then this callback will build + // ontop of that stack. + // If this callback is being triggered outside of Deno (for example from a + // signal handler) then this will either create an empty new stack if + // Deno currently has nothing running and is waiting for promises to resolve, + // or will (very incorrectly) build ontop of whatever stack exists. + // The callback will even be called through from a `while (true)` liveloop, but + // it somehow cannot change the values that the loop sees, even if they both + // refer the same `let bool_value`. + let context: NonNull = info.context; + let context = std::mem::transmute::< + NonNull, + v8::Local, + >(context); + let mut cb_scope = v8::CallbackScope::new(context); + let scope = &mut v8::HandleScope::new(&mut cb_scope); + + do_ffi_callback(scope, cif, info, result, args); } else { let async_work_sender = &info.async_work_sender; - // SAFETY: Safe as this function blocks until `do_ffi_callback` completes and a response message is received. - let cif: &'static libffi::low::ffi_cif = std::mem::transmute(cif); - let result: &'static mut c_void = std::mem::transmute(result); - let info: &'static CallbackInfo = std::mem::transmute(info); - let (response_sender, response_receiver) = sync_channel::<()>(0); - let fut = Box::new(move || { - do_ffi_callback(cif, info, result, args); - response_sender.send(()).unwrap(); + + let mut args = TaskArgs { + cif: NonNull::from(cif), + result: NonNull::from(result), + args, + info: NonNull::from(info), + }; + + async_work_sender.spawn_blocking(move |scope| { + // We don't have a lot of choice here, so just print an unhandled exception message + let tc_scope = &mut TryCatch::new(scope); + args.run(tc_scope); + if tc_scope.exception().is_some() { + eprintln!("Illegal unhandled exception in nonblocking callback."); + } }); - async_work_sender.unbounded_send(fut).unwrap(); - // Make sure event loop wakes up to receive our message before we start waiting for a response. - info.waker.wake(); - response_receiver.recv().unwrap(); } }); } unsafe fn do_ffi_callback( + scope: &mut v8::HandleScope, cif: &libffi::low::ffi_cif, info: &CallbackInfo, result: &mut c_void, args: *const *const c_void, ) { let callback: NonNull = info.callback; - let context: NonNull = info.context; - let context = std::mem::transmute::< - NonNull, - v8::Local, - >(context); - // Call from main thread. If this callback is being triggered due to a - // function call coming from Deno itself, then this callback will build - // ontop of that stack. - // If this callback is being triggered outside of Deno (for example from a - // signal handler) then this will either create an empty new stack if - // Deno currently has nothing running and is waiting for promises to resolve, - // or will (very incorrectly) build ontop of whatever stack exists. - // The callback will even be called through from a `while (true)` liveloop, but - // it somehow cannot change the values that the loop sees, even if they both - // refer the same `let bool_value`. - let mut cb_scope = v8::CallbackScope::new(context); - let scope = &mut v8::HandleScope::new(&mut cb_scope); let func = std::mem::transmute::< NonNull, v8::Local, @@ -562,25 +587,12 @@ where panic!("Isolate ID counter overflowed u32"); } - let async_work_sender = - if let Some(ffi_state) = state.try_borrow_mut::() { - ffi_state.async_work_sender.clone() - } else { - let (async_work_sender, async_work_receiver) = - mpsc::unbounded::(); + let async_work_sender = state.borrow::().clone(); - state.put(FfiState { - async_work_receiver, - async_work_sender: async_work_sender.clone(), - }); - - async_work_sender - }; let callback = v8::Global::new(scope, cb).into_raw(); let current_context = scope.get_current_context(); let context = v8::Global::new(scope, current_context).into_raw(); - let waker = state.waker.clone(); let info: *mut CallbackInfo = Box::leak(Box::new(CallbackInfo { async_work_sender, callback, @@ -588,7 +600,6 @@ where parameters: args.parameters.clone().into(), result: args.result.clone(), thread_id, - waker, })); let cif = Cif::new( args diff --git a/ext/ffi/lib.rs b/ext/ffi/lib.rs index 0541e1eeb0..bb403d6c07 100644 --- a/ext/ffi/lib.rs +++ b/ext/ffi/lib.rs @@ -1,15 +1,12 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. use deno_core::error::AnyError; -use deno_core::futures::channel::mpsc; use deno_core::OpState; -use std::cell::RefCell; use std::mem::size_of; use std::os::raw::c_char; use std::os::raw::c_short; use std::path::Path; -use std::rc::Rc; mod call; mod callback; @@ -59,13 +56,6 @@ pub trait FfiPermissions { fn check_partial(&mut self, path: Option<&Path>) -> Result<(), AnyError>; } -pub(crate) type PendingFfiAsyncWork = Box; - -pub(crate) struct FfiState { - pub(crate) async_work_sender: mpsc::UnboundedSender, - pub(crate) async_work_receiver: mpsc::UnboundedReceiver, -} - deno_core::extension!(deno_ffi, deps = [ deno_web ], parameters = [P: FfiPermissions], @@ -101,35 +91,4 @@ deno_core::extension!(deno_ffi, op_ffi_unsafe_callback_ref, ], esm = [ "00_ffi.js" ], - event_loop_middleware = event_loop_middleware, ); - -fn event_loop_middleware( - op_state_rc: Rc>, - _cx: &mut std::task::Context, -) -> bool { - // FFI callbacks coming in from other threads will call in and get queued. - let mut maybe_scheduling = false; - - let mut op_state = op_state_rc.borrow_mut(); - if let Some(ffi_state) = op_state.try_borrow_mut::() { - // TODO(mmastrac): This should be a SmallVec to avoid allocations in most cases - let mut work_items = Vec::with_capacity(1); - - while let Ok(Some(async_work_fut)) = - ffi_state.async_work_receiver.try_next() - { - // Move received items to a temporary vector so that we can drop the `op_state` borrow before we do the work. - work_items.push(async_work_fut); - maybe_scheduling = true; - } - - // Drop the op_state and ffi_state borrows - drop(op_state); - for async_work_fut in work_items.into_iter() { - async_work_fut(); - } - } - - maybe_scheduling -} diff --git a/test_ffi/src/lib.rs b/test_ffi/src/lib.rs index b1a2104174..c5c2c2d7a6 100644 --- a/test_ffi/src/lib.rs +++ b/test_ffi/src/lib.rs @@ -258,6 +258,20 @@ pub extern "C" fn call_stored_function_thread_safe_and_log() { }); } +#[no_mangle] +pub extern "C" fn call_stored_function_2_thread_safe(arg: u8) { + std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(1500)); + unsafe { + if STORED_FUNCTION_2.is_none() { + return; + } + println!("Calling"); + STORED_FUNCTION_2.unwrap()(arg); + } + }); +} + #[no_mangle] pub extern "C" fn log_many_parameters( a: u8, diff --git a/test_ffi/tests/ffi_callback_errors.ts b/test_ffi/tests/ffi_callback_errors.ts new file mode 100644 index 0000000000..dda4de5fbd --- /dev/null +++ b/test_ffi/tests/ffi_callback_errors.ts @@ -0,0 +1,141 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +const targetDir = Deno.execPath().replace(/[^\/\\]+$/, ""); +const [libPrefix, libSuffix] = { + darwin: ["lib", "dylib"], + linux: ["lib", "so"], + windows: ["", "dll"], +}[Deno.build.os]; +const libPath = `${targetDir}/${libPrefix}test_ffi.${libSuffix}`; + +const dylib = Deno.dlopen( + libPath, + { + store_function_2: { + parameters: ["function"], + result: "void", + }, + call_stored_function_2: { + parameters: ["u8"], + result: "void", + }, + call_stored_function_2_from_other_thread: { + name: "call_stored_function_2", + parameters: ["u8"], + result: "void", + nonblocking: true, + }, + call_stored_function_2_thread_safe: { + parameters: ["u8"], + result: "void", + }, + } as const, +); + +globalThis.addEventListener("error", (data) => { + console.log("Unhandled error"); + data.preventDefault(); +}); +globalThis.onerror = (data) => { + console.log("Unhandled error"); + if (typeof data !== "string") { + data.preventDefault(); + } +}; + +globalThis.addEventListener("unhandledrejection", (data) => { + console.log("Unhandled rejection"); + data.preventDefault(); +}); + +const timer = setTimeout(() => { + console.error( + "Test failed, final callback did not get picked up by Deno event loop", + ); + Deno.exit(-1); +}, 5_000); + +Deno.unrefTimer(timer); + +enum CallCase { + SyncSelf, + SyncFfi, + AsyncSelf, + AsyncSyncFfi, + AsyncFfi, +} +type U8CallCase = Deno.NativeU8Enum; + +const throwCb = (c: CallCase): number => { + console.log("CallCase:", CallCase[c]); + if (c === CallCase.AsyncFfi) { + cb.unref(); + } + throw new Error("Error"); +}; + +const THROW_CB_DEFINITION = { + parameters: ["u8" as U8CallCase], + result: "u8", +} as const; + +const cb = new Deno.UnsafeCallback(THROW_CB_DEFINITION, throwCb); + +try { + const fnPointer = new Deno.UnsafeFnPointer(cb.pointer, THROW_CB_DEFINITION); + + fnPointer.call(CallCase.SyncSelf); +} catch (_err) { + console.log( + "Throwing errors from an UnsafeCallback called from a synchronous UnsafeFnPointer works. Terribly excellent.", + ); +} + +dylib.symbols.store_function_2(cb.pointer); +try { + dylib.symbols.call_stored_function_2(CallCase.SyncFfi); +} catch (_err) { + console.log( + "Throwing errors from an UnsafeCallback called from a synchronous FFI symbol works. Terribly excellent.", + ); +} + +try { + const fnPointer = new Deno.UnsafeFnPointer(cb.pointer, { + ...THROW_CB_DEFINITION, + nonblocking: true, + }); + await fnPointer.call(CallCase.AsyncSelf); +} catch (err) { + throw new Error( + "Nonblocking UnsafeFnPointer should not be threading through a JS error thrown on the other side of the call", + { + cause: err, + }, + ); +} + +try { + await dylib.symbols.call_stored_function_2_from_other_thread( + CallCase.AsyncSyncFfi, + ); +} catch (err) { + throw new Error( + "Nonblocking symbol call should not be threading through a JS error thrown on the other side of the call", + { + cause: err, + }, + ); +} +try { + // Ref the callback to make sure we do not exit before the call is done. + cb.ref(); + dylib.symbols.call_stored_function_2_thread_safe(CallCase.AsyncFfi); +} catch (err) { + throw new Error( + "Blocking symbol call should not be travelling 1.5 seconds forward in time to figure out that it call will trigger a JS error to be thrown", + { + cause: err, + }, + ); +} diff --git a/test_ffi/tests/integration_tests.rs b/test_ffi/tests/integration_tests.rs index 99707438ec..446f6774dc 100644 --- a/test_ffi/tests/integration_tests.rs +++ b/test_ffi/tests/integration_tests.rs @@ -237,3 +237,44 @@ fn event_loop_integration() { assert_eq!(stdout, expected); assert_eq!(stderr, ""); } + +#[test] +fn ffi_callback_errors_test() { + build(); + + let output = deno_cmd() + .arg("run") + .arg("--allow-ffi") + .arg("--allow-read") + .arg("--unstable") + .arg("--quiet") + .arg("tests/ffi_callback_errors.ts") + .env("NO_COLOR", "1") + .output() + .unwrap(); + let stdout = std::str::from_utf8(&output.stdout).unwrap(); + let stderr = std::str::from_utf8(&output.stderr).unwrap(); + if !output.status.success() { + println!("stdout {stdout}"); + println!("stderr {stderr}"); + } + println!("{:?}", output.status); + assert!(output.status.success()); + + let expected = "\ + CallCase: SyncSelf\n\ + Throwing errors from an UnsafeCallback called from a synchronous UnsafeFnPointer works. Terribly excellent.\n\ + CallCase: SyncFfi\n\ + 0\n\ + Throwing errors from an UnsafeCallback called from a synchronous FFI symbol works. Terribly excellent.\n\ + CallCase: AsyncSelf\n\ + CallCase: AsyncSyncFfi\n\ + 0\n\ + Calling\n\ + CallCase: AsyncFfi\n"; + assert_eq!(stdout, expected); + assert_eq!( + stderr, + "Illegal unhandled exception in nonblocking callback.\n".repeat(3) + ); +}