mirror of
https://github.com/denoland/deno.git
synced 2025-01-21 21:50:00 -05:00
fix(ext/ffi): Fix UnsafeCallback ref'ing making Deno enter a live-loop (#16216)
Fixes #15136 Currently `UnsafeCallback` class' `ref()` and `unref()` methods rely on the `event_loop_middleware` implementation in core. If even a single `UnsafeCallback` is ref'ed, then the FFI event loop middleware will always return `true` to signify that there may still be more work for the event loop to do. The middleware handling in core does not wait a moment to check again, but will instead synchronously directly re-poll the event loop and middlewares for more work. This becomes a live-loop. This PR introduces a `Future` implementation for the `CallbackInfo` struct that acts as the intermediary data storage between an `UnsafeCallback` and the `libffi` C callback. Ref'ing a callback now means calling an async op that binds to the `CallbackInfo` Future and only resolves once the callback is unref'ed. The `libffi` C callback will call the waker of this Future when it fires to make sure that the main thread wakes up to receive the callback.
This commit is contained in:
parent
8283d37c51
commit
75acec0aea
4 changed files with 91 additions and 32 deletions
10
cli/dts/lib.deno.unstable.d.ts
vendored
10
cli/dts/lib.deno.unstable.d.ts
vendored
|
@ -831,20 +831,22 @@ declare namespace Deno {
|
||||||
>;
|
>;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds one to this callback's reference counting.
|
* Adds one to this callback's reference counting and returns the
|
||||||
|
* new reference count.
|
||||||
*
|
*
|
||||||
* If the callback's reference count becomes non-zero, it will keep
|
* If the callback's reference count becomes non-zero, it will keep
|
||||||
* Deno's process from exiting.
|
* Deno's process from exiting.
|
||||||
*/
|
*/
|
||||||
ref(): void;
|
ref(): number;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Removes one from this callback's reference counting.
|
* Removes one from this callback's reference counting and returns
|
||||||
|
* the new reference count.
|
||||||
*
|
*
|
||||||
* If the callback's reference counter becomes zero, it will no longer
|
* If the callback's reference counter becomes zero, it will no longer
|
||||||
* keep Deno's process from exiting.
|
* keep Deno's process from exiting.
|
||||||
*/
|
*/
|
||||||
unref(): void;
|
unref(): number;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Removes the C function pointer associated with the UnsafeCallback.
|
* Removes the C function pointer associated with the UnsafeCallback.
|
||||||
|
|
|
@ -184,6 +184,8 @@
|
||||||
|
|
||||||
class UnsafeCallback {
|
class UnsafeCallback {
|
||||||
#refcount;
|
#refcount;
|
||||||
|
// Internal promise only meant to keep Deno from exiting
|
||||||
|
#refpromise;
|
||||||
#rid;
|
#rid;
|
||||||
definition;
|
definition;
|
||||||
callback;
|
callback;
|
||||||
|
@ -208,23 +210,25 @@
|
||||||
|
|
||||||
ref() {
|
ref() {
|
||||||
if (this.#refcount++ === 0) {
|
if (this.#refcount++ === 0) {
|
||||||
ops.op_ffi_unsafe_callback_ref(true);
|
this.#refpromise = core.opAsync(
|
||||||
|
"op_ffi_unsafe_callback_ref",
|
||||||
|
this.#rid,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
return this.#refcount;
|
||||||
}
|
}
|
||||||
|
|
||||||
unref() {
|
unref() {
|
||||||
// Only decrement refcount if it is positive, and only
|
// Only decrement refcount if it is positive, and only
|
||||||
// unref the callback if refcount reaches zero.
|
// unref the callback if refcount reaches zero.
|
||||||
if (this.#refcount > 0 && --this.#refcount === 0) {
|
if (this.#refcount > 0 && --this.#refcount === 0) {
|
||||||
ops.op_ffi_unsafe_callback_ref(false);
|
ops.op_ffi_unsafe_callback_unref(this.#rid);
|
||||||
}
|
}
|
||||||
|
return this.#refcount;
|
||||||
}
|
}
|
||||||
|
|
||||||
close() {
|
close() {
|
||||||
if (this.#refcount) {
|
this.#refcount = 0;
|
||||||
this.#refcount = 0;
|
|
||||||
ops.op_ffi_unsafe_callback_ref(false);
|
|
||||||
}
|
|
||||||
core.close(this.#rid);
|
core.close(this.#rid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,6 +13,8 @@ use deno_core::op;
|
||||||
use deno_core::serde_json::Value;
|
use deno_core::serde_json::Value;
|
||||||
use deno_core::serde_v8;
|
use deno_core::serde_v8;
|
||||||
use deno_core::v8;
|
use deno_core::v8;
|
||||||
|
use deno_core::CancelFuture;
|
||||||
|
use deno_core::CancelHandle;
|
||||||
use deno_core::Extension;
|
use deno_core::Extension;
|
||||||
use deno_core::OpState;
|
use deno_core::OpState;
|
||||||
use deno_core::Resource;
|
use deno_core::Resource;
|
||||||
|
@ -26,14 +28,18 @@ use std::cell::RefCell;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::ffi::c_void;
|
use std::ffi::c_void;
|
||||||
use std::ffi::CStr;
|
use std::ffi::CStr;
|
||||||
|
use std::future::IntoFuture;
|
||||||
use std::mem::size_of;
|
use std::mem::size_of;
|
||||||
use std::os::raw::c_char;
|
use std::os::raw::c_char;
|
||||||
use std::os::raw::c_short;
|
use std::os::raw::c_short;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
use std::pin::Pin;
|
||||||
use std::ptr;
|
use std::ptr;
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
use std::sync::mpsc::sync_channel;
|
use std::sync::mpsc::sync_channel;
|
||||||
|
use std::task::Poll;
|
||||||
|
use std::task::Waker;
|
||||||
|
|
||||||
mod fast_call;
|
mod fast_call;
|
||||||
|
|
||||||
|
@ -156,7 +162,6 @@ type PendingFfiAsyncWork = Box<dyn FnOnce()>;
|
||||||
struct FfiState {
|
struct FfiState {
|
||||||
async_work_sender: mpsc::UnboundedSender<PendingFfiAsyncWork>,
|
async_work_sender: mpsc::UnboundedSender<PendingFfiAsyncWork>,
|
||||||
async_work_receiver: mpsc::UnboundedReceiver<PendingFfiAsyncWork>,
|
async_work_receiver: mpsc::UnboundedReceiver<PendingFfiAsyncWork>,
|
||||||
active_refed_functions: usize,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn init<P: FfiPermissions + 'static>(unstable: bool) -> Extension {
|
pub fn init<P: FfiPermissions + 'static>(unstable: bool) -> Extension {
|
||||||
|
@ -188,6 +193,7 @@ pub fn init<P: FfiPermissions + 'static>(unstable: bool) -> Extension {
|
||||||
op_ffi_read_f64::decl::<P>(),
|
op_ffi_read_f64::decl::<P>(),
|
||||||
op_ffi_unsafe_callback_create::decl::<P>(),
|
op_ffi_unsafe_callback_create::decl::<P>(),
|
||||||
op_ffi_unsafe_callback_ref::decl(),
|
op_ffi_unsafe_callback_ref::decl(),
|
||||||
|
op_ffi_unsafe_callback_unref::decl(),
|
||||||
])
|
])
|
||||||
.event_loop_middleware(|op_state_rc, _cx| {
|
.event_loop_middleware(|op_state_rc, _cx| {
|
||||||
// FFI callbacks coming in from other threads will call in and get queued.
|
// FFI callbacks coming in from other threads will call in and get queued.
|
||||||
|
@ -207,10 +213,6 @@ pub fn init<P: FfiPermissions + 'static>(unstable: bool) -> Extension {
|
||||||
maybe_scheduling = true;
|
maybe_scheduling = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ffi_state.active_refed_functions > 0 {
|
|
||||||
maybe_scheduling = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
drop(op_state);
|
drop(op_state);
|
||||||
}
|
}
|
||||||
while let Some(async_work_fut) = work_items.pop() {
|
while let Some(async_work_fut) = work_items.pop() {
|
||||||
|
@ -227,7 +229,6 @@ pub fn init<P: FfiPermissions + 'static>(unstable: bool) -> Extension {
|
||||||
mpsc::unbounded::<PendingFfiAsyncWork>();
|
mpsc::unbounded::<PendingFfiAsyncWork>();
|
||||||
|
|
||||||
state.put(FfiState {
|
state.put(FfiState {
|
||||||
active_refed_functions: 0,
|
|
||||||
async_work_receiver,
|
async_work_receiver,
|
||||||
async_work_sender,
|
async_work_sender,
|
||||||
});
|
});
|
||||||
|
@ -1320,11 +1321,12 @@ fn ffi_call(
|
||||||
}
|
}
|
||||||
|
|
||||||
struct UnsafeCallbackResource {
|
struct UnsafeCallbackResource {
|
||||||
|
cancel: Rc<CancelHandle>,
|
||||||
// Closure is never directly touched, but it keeps the C callback alive
|
// Closure is never directly touched, but it keeps the C callback alive
|
||||||
// until `close()` method is called.
|
// until `close()` method is called.
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
closure: libffi::middle::Closure<'static>,
|
closure: libffi::middle::Closure<'static>,
|
||||||
info: *const CallbackInfo,
|
info: *mut CallbackInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Resource for UnsafeCallbackResource {
|
impl Resource for UnsafeCallbackResource {
|
||||||
|
@ -1333,15 +1335,16 @@ impl Resource for UnsafeCallbackResource {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn close(self: Rc<Self>) {
|
fn close(self: Rc<Self>) {
|
||||||
|
self.cancel.cancel();
|
||||||
// SAFETY: This drops the closure and the callback info associated with it.
|
// SAFETY: This drops the closure and the callback info associated with it.
|
||||||
// Any retained function pointers to the closure become dangling pointers.
|
// Any retained function pointers to the closure become dangling pointers.
|
||||||
// It is up to the user to know that it is safe to call the `close()` on the
|
// It is up to the user to know that it is safe to call the `close()` on the
|
||||||
// UnsafeCallback instance.
|
// UnsafeCallback instance.
|
||||||
unsafe {
|
unsafe {
|
||||||
let info = Box::from_raw(self.info as *mut CallbackInfo);
|
let info = Box::from_raw(self.info);
|
||||||
let isolate = info.isolate.as_mut().unwrap();
|
let isolate = info.isolate.as_mut().unwrap();
|
||||||
v8::Global::from_raw(isolate, info.callback);
|
let _ = v8::Global::from_raw(isolate, info.callback);
|
||||||
v8::Global::from_raw(isolate, info.context);
|
let _ = v8::Global::from_raw(isolate, info.context);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1353,6 +1356,7 @@ struct CallbackInfo {
|
||||||
pub callback: NonNull<v8::Function>,
|
pub callback: NonNull<v8::Function>,
|
||||||
pub context: NonNull<v8::Context>,
|
pub context: NonNull<v8::Context>,
|
||||||
pub isolate: *mut v8::Isolate,
|
pub isolate: *mut v8::Isolate,
|
||||||
|
pub waker: Option<Waker>,
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe extern "C" fn deno_ffi_callback(
|
unsafe extern "C" fn deno_ffi_callback(
|
||||||
|
@ -1376,6 +1380,10 @@ unsafe extern "C" fn deno_ffi_callback(
|
||||||
response_sender.send(()).unwrap();
|
response_sender.send(()).unwrap();
|
||||||
});
|
});
|
||||||
async_work_sender.unbounded_send(fut).unwrap();
|
async_work_sender.unbounded_send(fut).unwrap();
|
||||||
|
if let Some(waker) = info.waker.as_ref() {
|
||||||
|
// Make sure event loop wakes up to receive our message before we start waiting for a response.
|
||||||
|
waker.wake_by_ref();
|
||||||
|
}
|
||||||
response_receiver.recv().unwrap();
|
response_receiver.recv().unwrap();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -1725,22 +1733,30 @@ where
|
||||||
let current_context = scope.get_current_context();
|
let current_context = scope.get_current_context();
|
||||||
let context = v8::Global::new(scope, current_context).into_raw();
|
let context = v8::Global::new(scope, current_context).into_raw();
|
||||||
|
|
||||||
let info = Box::leak(Box::new(CallbackInfo {
|
let info: *mut CallbackInfo = Box::leak(Box::new(CallbackInfo {
|
||||||
parameters: args.parameters.clone(),
|
parameters: args.parameters.clone(),
|
||||||
result: args.result,
|
result: args.result,
|
||||||
async_work_sender,
|
async_work_sender,
|
||||||
callback,
|
callback,
|
||||||
context,
|
context,
|
||||||
isolate,
|
isolate,
|
||||||
|
waker: None,
|
||||||
}));
|
}));
|
||||||
let cif = Cif::new(
|
let cif = Cif::new(
|
||||||
args.parameters.into_iter().map(libffi::middle::Type::from),
|
args.parameters.into_iter().map(libffi::middle::Type::from),
|
||||||
libffi::middle::Type::from(args.result),
|
libffi::middle::Type::from(args.result),
|
||||||
);
|
);
|
||||||
|
|
||||||
let closure = libffi::middle::Closure::new(cif, deno_ffi_callback, info);
|
// SAFETY: CallbackInfo is leaked, is not null and stays valid as long as the callback exists.
|
||||||
|
let closure = libffi::middle::Closure::new(cif, deno_ffi_callback, unsafe {
|
||||||
|
info.as_ref().unwrap()
|
||||||
|
});
|
||||||
let ptr = *closure.code_ptr() as usize;
|
let ptr = *closure.code_ptr() as usize;
|
||||||
let resource = UnsafeCallbackResource { closure, info };
|
let resource = UnsafeCallbackResource {
|
||||||
|
cancel: CancelHandle::new_rc(),
|
||||||
|
closure,
|
||||||
|
info,
|
||||||
|
};
|
||||||
let rid = state.resource_table.add(resource);
|
let rid = state.resource_table.add(resource);
|
||||||
|
|
||||||
let rid_local = v8::Integer::new_from_unsigned(scope, rid);
|
let rid_local = v8::Integer::new_from_unsigned(scope, rid);
|
||||||
|
@ -1790,17 +1806,53 @@ where
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op]
|
impl Future for CallbackInfo {
|
||||||
fn op_ffi_unsafe_callback_ref(state: &mut deno_core::OpState, inc_dec: bool) {
|
type Output = ();
|
||||||
check_unstable(state, "Deno.dlopen");
|
fn poll(
|
||||||
let ffi_state = state.borrow_mut::<FfiState>();
|
mut self: Pin<&mut Self>,
|
||||||
if inc_dec {
|
cx: &mut std::task::Context<'_>,
|
||||||
ffi_state.active_refed_functions += 1;
|
) -> std::task::Poll<Self::Output> {
|
||||||
} else {
|
// Always replace the waker to make sure it's bound to the proper Future.
|
||||||
ffi_state.active_refed_functions -= 1;
|
self.waker.replace(cx.waker().clone());
|
||||||
|
// The future for the CallbackInfo never resolves: It can only be canceled.
|
||||||
|
Poll::Pending
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[op]
|
||||||
|
fn op_ffi_unsafe_callback_ref(
|
||||||
|
state: &mut deno_core::OpState,
|
||||||
|
rid: ResourceId,
|
||||||
|
) -> Result<impl Future<Output = Result<(), AnyError>>, AnyError> {
|
||||||
|
let callback_resource =
|
||||||
|
state.resource_table.get::<UnsafeCallbackResource>(rid)?;
|
||||||
|
|
||||||
|
Ok(async move {
|
||||||
|
let info: &mut CallbackInfo =
|
||||||
|
// SAFETY: CallbackInfo pointer stays valid as long as the resource is still alive.
|
||||||
|
unsafe { callback_resource.info.as_mut().unwrap() };
|
||||||
|
// Ignore cancellation rejection
|
||||||
|
let _ = info
|
||||||
|
.into_future()
|
||||||
|
.or_cancel(callback_resource.cancel.clone())
|
||||||
|
.await;
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[op(fast)]
|
||||||
|
fn op_ffi_unsafe_callback_unref(
|
||||||
|
state: &mut deno_core::OpState,
|
||||||
|
rid: u32,
|
||||||
|
) -> Result<(), AnyError> {
|
||||||
|
state
|
||||||
|
.resource_table
|
||||||
|
.get::<UnsafeCallbackResource>(rid)?
|
||||||
|
.cancel
|
||||||
|
.cancel();
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[op(v8)]
|
#[op(v8)]
|
||||||
fn op_ffi_call_ptr_nonblocking<'scope, FP>(
|
fn op_ffi_call_ptr_nonblocking<'scope, FP>(
|
||||||
scope: &mut v8::HandleScope<'scope>,
|
scope: &mut v8::HandleScope<'scope>,
|
||||||
|
|
|
@ -515,6 +515,7 @@ logCallback.unref();
|
||||||
console.log("Thread safe call counter:", counter);
|
console.log("Thread safe call counter:", counter);
|
||||||
returnU8Callback.ref();
|
returnU8Callback.ref();
|
||||||
await dylib.symbols.call_fn_ptr_return_u8_thread_safe(returnU8Callback.pointer);
|
await dylib.symbols.call_fn_ptr_return_u8_thread_safe(returnU8Callback.pointer);
|
||||||
|
// Purposefully do not unref returnU8Callback: Instead use it to test close() unrefing.
|
||||||
|
|
||||||
// Test statics
|
// Test statics
|
||||||
console.log("Static u32:", dylib.symbols.static_u32);
|
console.log("Static u32:", dylib.symbols.static_u32);
|
||||||
|
@ -585,7 +586,7 @@ After: ${postStr}`,
|
||||||
})();
|
})();
|
||||||
|
|
||||||
function assertIsOptimized(fn) {
|
function assertIsOptimized(fn) {
|
||||||
const status = % GetOptimizationStatus(fn);
|
const status = %GetOptimizationStatus(fn);
|
||||||
assert(status & (1 << 4), `expected ${fn.name} to be optimized, but wasn't`);
|
assert(status & (1 << 4), `expected ${fn.name} to be optimized, but wasn't`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue