0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-03-03 09:31:22 -05:00

cleanup(core): AsyncOpIterator (#11860)

This commit is contained in:
Aaron O'Mullan 2021-10-24 21:41:57 +02:00 committed by GitHub
parent 439a2914db
commit 53d38ad1e5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -139,6 +139,23 @@ pub type SharedArrayBufferStore =
pub type CompiledWasmModuleStore = CrossIsolateStore<v8::CompiledWasmModule>;
struct AsyncOpIterator<'a, 'b, 'c> {
ops: &'b mut FuturesUnordered<PendingOpFuture>,
cx: &'a mut Context<'c>,
}
impl Iterator for AsyncOpIterator<'_, '_, '_> {
type Item = (PromiseId, OpId, OpResult);
#[inline]
fn next(&mut self) -> Option<Self::Item> {
match self.ops.poll_next_unpin(self.cx) {
Poll::Ready(Some(item)) => Some(item),
_ => None,
}
}
}
/// Internal state for JsRuntime which is stored in one of v8::Isolate's
/// embedder slots.
pub(crate) struct JsRuntimeState {
@ -759,8 +776,7 @@ impl JsRuntime {
// Ops
{
let async_responses = self.poll_pending_ops(cx);
self.async_op_response(async_responses)?;
self.resolve_async_ops(cx)?;
self.drain_macrotasks()?;
self.check_promise_exceptions()?;
}
@ -1458,45 +1474,6 @@ impl JsRuntime {
Ok(root_id)
}
fn poll_pending_ops(
&mut self,
cx: &mut Context,
) -> Vec<(PromiseId, OpResult)> {
let state_rc = Self::state(self.v8_isolate());
let mut async_responses: Vec<(PromiseId, OpResult)> = Vec::new();
let mut state = state_rc.borrow_mut();
// Now handle actual ops.
state.have_unpolled_ops = false;
loop {
let pending_r = state.pending_ops.poll_next_unpin(cx);
match pending_r {
Poll::Ready(None) => break,
Poll::Pending => break,
Poll::Ready(Some((promise_id, op_id, resp))) => {
state.op_state.borrow().tracker.track_async_completed(op_id);
async_responses.push((promise_id, resp));
}
};
}
loop {
let unref_r = state.pending_unref_ops.poll_next_unpin(cx);
match unref_r {
Poll::Ready(None) => break,
Poll::Pending => break,
Poll::Ready(Some((promise_id, op_id, resp))) => {
state.op_state.borrow().tracker.track_unref_completed(op_id);
async_responses.push((promise_id, resp));
}
};
}
async_responses
}
fn check_promise_exceptions(&mut self) -> Result<(), AnyError> {
let state_rc = Self::state(self.v8_isolate());
let mut state = state_rc.borrow_mut();
@ -1522,19 +1499,10 @@ impl JsRuntime {
}
// Send finished responses to JS
fn async_op_response(
&mut self,
async_responses: Vec<(PromiseId, OpResult)>,
) -> Result<(), AnyError> {
fn resolve_async_ops(&mut self, cx: &mut Context) -> Result<(), AnyError> {
let state_rc = Self::state(self.v8_isolate());
let async_responses_size = async_responses.len();
if async_responses_size == 0 {
return Ok(());
}
let js_recv_cb_handle = state_rc.borrow().js_recv_cb.clone().unwrap();
let scope = &mut self.handle_scope();
// We return async responses to JS in unbounded batches (may change),
@ -1544,12 +1512,36 @@ impl JsRuntime {
// which contains a value OR an error, encoded as a tuple.
// This batch is received in JS via the special `arguments` variable
// and then each tuple is used to resolve or reject promises
let mut args: Vec<v8::Local<v8::Value>> =
Vec::with_capacity(2 * async_responses_size);
for overflown_response in async_responses {
let (promise_id, resp) = overflown_response;
args.push(v8::Integer::new(scope, promise_id as i32).into());
args.push(resp.to_v8(scope).unwrap());
let mut args: Vec<v8::Local<v8::Value>> = vec![];
// Now handle actual ops.
{
let mut state = state_rc.borrow_mut();
state.have_unpolled_ops = false;
let op_state = state.op_state.clone();
let ops = AsyncOpIterator {
ops: &mut state.pending_ops,
cx,
};
for (promise_id, op_id, resp) in ops {
op_state.borrow().tracker.track_async_completed(op_id);
args.push(v8::Integer::new(scope, promise_id as i32).into());
args.push(resp.to_v8(scope).unwrap());
}
let ops = AsyncOpIterator {
ops: &mut state.pending_unref_ops,
cx,
};
for (promise_id, op_id, resp) in ops {
op_state.borrow().tracker.track_unref_completed(op_id);
args.push(v8::Integer::new(scope, promise_id as i32).into());
args.push(resp.to_v8(scope).unwrap());
}
}
if args.is_empty() {
return Ok(());
}
let tc_scope = &mut v8::TryCatch::new(scope);