From dccf5e0c5c7f04409809104dd23472bcc058e170 Mon Sep 17 00:00:00 2001 From: Inteon <42113979+inteon@users.noreply.github.com> Date: Tue, 23 Feb 2021 13:08:50 +0100 Subject: [PATCH] refactor(core): Allow multiple overflown responses in single poll (#9433) This commit rewrites "JsRuntime::poll" function to fix a corner case that might caused "overflown_response" to be overwritten by other overflown response. The logic has been changed to allow returning multiple overflown response alongside responses from shared queue. --- cli/bench/main.rs | 9 ++ cli/tests/workers/large_message_worker.js | 14 +++ cli/tests/workers_large_message_bench.ts | 35 ++++++ core/bindings.rs | 6 +- core/core.js | 11 +- core/runtime.rs | 129 +++++++++++++++------- 6 files changed, 156 insertions(+), 48 deletions(-) create mode 100644 cli/tests/workers/large_message_worker.js create mode 100644 cli/tests/workers_large_message_bench.ts diff --git a/cli/bench/main.rs b/cli/bench/main.rs index b69bbc7ac6..c6f786e318 100644 --- a/cli/bench/main.rs +++ b/cli/bench/main.rs @@ -67,6 +67,15 @@ const EXEC_TIME_BENCHMARKS: &[(&str, &[&str], Option)] = &[ ], None, ), + ( + "workers_large_message", + &[ + "run", + "--allow-read", + "cli/tests/workers_large_message_bench.ts", + ], + None, + ), ( "text_decoder", &["run", "cli/tests/text_decoder_perf.js"], diff --git a/cli/tests/workers/large_message_worker.js b/cli/tests/workers/large_message_worker.js new file mode 100644 index 0000000000..f7b7da8a02 --- /dev/null +++ b/cli/tests/workers/large_message_worker.js @@ -0,0 +1,14 @@ +// Copyright 2020 the Deno authors. All rights reserved. MIT license. + +const dataSmall = ""; +const dataLarge = "x".repeat(10 * 1024); + +onmessage = function (e) { + for (let i = 0; i <= 10; i++) { + if (i % 2 == 0) { + postMessage(dataLarge); + } else { + postMessage(dataSmall); + } + } +}; diff --git a/cli/tests/workers_large_message_bench.ts b/cli/tests/workers_large_message_bench.ts new file mode 100644 index 0000000000..9cda5a40d6 --- /dev/null +++ b/cli/tests/workers_large_message_bench.ts @@ -0,0 +1,35 @@ +// Copyright 2020 the Deno authors. All rights reserved. MIT license. + +// deno-lint-ignore-file + +import { deferred } from "../../test_util/std/async/deferred.ts"; + +function oneWorker(i: any): Promise { + return new Promise((resolve) => { + let countDown = 10; + const worker = new Worker( + new URL("workers/large_message_worker.js", import.meta.url).href, + { type: "module" }, + ); + worker.onmessage = (e): void => { + if (countDown > 0) { + countDown--; + return; + } + worker.terminate(); + resolve(); + }; + worker.postMessage("hi " + i); + }); +} + +function bench(): Promise { + let promises = []; + for (let i = 0; i < 50; i++) { + promises.push(oneWorker(i)); + } + + return Promise.all(promises); +} + +bench(); diff --git a/core/bindings.rs b/core/bindings.rs index bb5589080b..157b58a9d1 100644 --- a/core/bindings.rs +++ b/core/bindings.rs @@ -368,7 +368,7 @@ fn send<'s>( mut rv: v8::ReturnValue, ) { let state_rc = JsRuntime::state(scope); - let state = state_rc.borrow_mut(); + let mut state = state_rc.borrow_mut(); let op_id = match v8::Local::::try_from(args.get(0)) .map_err(AnyError::from) @@ -412,12 +412,12 @@ fn send<'s>( Op::Async(fut) => { let fut2 = fut.map(move |buf| (op_id, buf)); state.pending_ops.push(fut2.boxed_local()); - state.have_unpolled_ops.set(true); + state.have_unpolled_ops = true; } Op::AsyncUnref(fut) => { let fut2 = fut.map(move |buf| (op_id, buf)); state.pending_unref_ops.push(fut2.boxed_local()); - state.have_unpolled_ops.set(true); + state.have_unpolled_ops = true; } Op::NotFound => { let msg = format!("Unknown op id: {}", op_id); diff --git a/core/core.js b/core/core.js index a96ce81d74..bda6739a2a 100644 --- a/core/core.js +++ b/core/core.js @@ -155,12 +155,7 @@ SharedQueue Binary Layout asyncHandlers[opId] = cb; } - function handleAsyncMsgFromRust(opId, buf) { - if (buf) { - // This is the overflow_response case of deno::JsRuntime::poll(). - asyncHandlers[opId](buf); - return; - } + function handleAsyncMsgFromRust() { while (true) { const opIdBuf = shift(); if (opIdBuf == null) { @@ -169,6 +164,10 @@ SharedQueue Binary Layout assert(asyncHandlers[opIdBuf[0]] != null); asyncHandlers[opIdBuf[0]](opIdBuf[1]); } + + for (let i = 0; i < arguments.length; i += 2) { + asyncHandlers[arguments[i]](arguments[i + 1]); + } } function dispatch(opName, control, ...zeroCopy) { diff --git a/core/runtime.rs b/core/runtime.rs index f7cc0fa997..67161d5e7b 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -32,7 +32,6 @@ use futures::stream::StreamFuture; use futures::task::AtomicWaker; use futures::Future; use std::any::Any; -use std::cell::Cell; use std::cell::RefCell; use std::collections::HashMap; use std::convert::TryFrom; @@ -110,8 +109,7 @@ pub(crate) struct JsRuntimeState { pub(crate) shared: SharedQueue, pub(crate) pending_ops: FuturesUnordered, pub(crate) pending_unref_ops: FuturesUnordered, - pub(crate) have_unpolled_ops: Cell, - //pub(crate) op_table: OpTable, + pub(crate) have_unpolled_ops: bool, pub(crate) op_state: Rc>, pub loader: Rc, pub modules: Modules, @@ -287,7 +285,7 @@ impl JsRuntime { pending_ops: FuturesUnordered::new(), pending_unref_ops: FuturesUnordered::new(), op_state: Rc::new(RefCell::new(op_state)), - have_unpolled_ops: Cell::new(false), + have_unpolled_ops: false, modules: Modules::new(), loader, dyn_import_map: HashMap::new(), @@ -562,7 +560,7 @@ impl JsRuntime { // Check if more async ops have been dispatched // during this turn of event loop. - if state.have_unpolled_ops.get() { + if state.have_unpolled_ops { state.waker.wake(); } @@ -1346,18 +1344,16 @@ impl JsRuntime { self.mod_instantiate(root_id, None).map(|_| root_id) } - fn poll_pending_ops( - &mut self, - cx: &mut Context, - ) -> Option<(OpId, Box<[u8]>)> { + fn poll_pending_ops(&mut self, cx: &mut Context) -> Vec<(OpId, Box<[u8]>)> { let state_rc = Self::state(self.v8_isolate()); - let mut overflow_response: Option<(OpId, Box<[u8]>)> = None; + let mut overflow_response: Vec<(OpId, Box<[u8]>)> = Vec::new(); + + let mut state = state_rc.borrow_mut(); + + // Now handle actual ops. + state.have_unpolled_ops = false; loop { - let mut state = state_rc.borrow_mut(); - // Now handle actual ops. - state.have_unpolled_ops.set(false); - let pending_r = state.pending_ops.poll_next_unpin(cx); match pending_r { Poll::Ready(None) => break, @@ -1365,31 +1361,21 @@ impl JsRuntime { Poll::Ready(Some((op_id, buf))) => { let successful_push = state.shared.push(op_id, &buf); if !successful_push { - // If we couldn't push the response to the shared queue, because - // there wasn't enough size, we will return the buffer via the - // legacy route, using the argument of deno_respond. - overflow_response = Some((op_id, buf)); - break; + overflow_response.push((op_id, buf)); } } }; } loop { - let mut state = state_rc.borrow_mut(); let unref_r = state.pending_unref_ops.poll_next_unpin(cx); - #[allow(clippy::match_wild_err_arm)] match unref_r { Poll::Ready(None) => break, Poll::Pending => break, Poll::Ready(Some((op_id, buf))) => { let successful_push = state.shared.push(op_id, &buf); if !successful_push { - // If we couldn't push the response to the shared queue, because - // there wasn't enough size, we will return the buffer via the - // legacy route, using the argument of deno_respond. - overflow_response = Some((op_id, buf)); - break; + overflow_response.push((op_id, buf)); } } }; @@ -1427,13 +1413,14 @@ impl JsRuntime { // Respond using shared queue and optionally overflown response fn async_op_response( &mut self, - maybe_overflown_response: Option<(OpId, Box<[u8]>)>, + overflown_responses: Vec<(OpId, Box<[u8]>)>, ) -> Result<(), AnyError> { let state_rc = Self::state(self.v8_isolate()); let shared_queue_size = state_rc.borrow().shared.size(); + let overflown_responses_size = overflown_responses.len(); - if shared_queue_size == 0 && maybe_overflown_response.is_none() { + if shared_queue_size == 0 && overflown_responses_size == 0 { return Ok(()); } @@ -1454,22 +1441,21 @@ impl JsRuntime { let tc_scope = &mut v8::TryCatch::new(scope); - if shared_queue_size > 0 { - js_recv_cb.call(tc_scope, global, &[]); + let mut args: Vec> = + Vec::with_capacity(2 * overflown_responses_size); + for overflown_response in overflown_responses { + let (op_id, buf) = overflown_response; + args.push(v8::Integer::new(tc_scope, op_id as i32).into()); + args.push(bindings::boxed_slice_to_uint8array(tc_scope, buf).into()); + } + + if shared_queue_size > 0 || overflown_responses_size > 0 { + js_recv_cb.call(tc_scope, global, args.as_slice()); // The other side should have shifted off all the messages. let shared_queue_size = state_rc.borrow().shared.size(); assert_eq!(shared_queue_size, 0); } - if let Some(overflown_response) = maybe_overflown_response { - let (op_id, buf) = overflown_response; - let op_id: v8::Local = - v8::Integer::new(tc_scope, op_id as i32).into(); - let ui8: v8::Local = - bindings::boxed_slice_to_uint8array(tc_scope, buf).into(); - js_recv_cb.call(tc_scope, global, &[op_id, ui8]); - } - match tc_scope.exception() { None => Ok(()), Some(exception) => exception_to_err_result(tc_scope, exception, false), @@ -1924,6 +1910,71 @@ pub mod tests { }); } + #[test] + fn overflow_res_async_combined_with_unref() { + run_in_task(|cx| { + let mut runtime = JsRuntime::new(Default::default()); + + runtime.register_op( + "test1", + |_op_state: Rc>, _bufs: BufVec| -> Op { + let mut vec = vec![0u8; 100 * 1024 * 1024]; + vec[0] = 4; + let buf = vec.into_boxed_slice(); + Op::Async(futures::future::ready(buf).boxed()) + }, + ); + + runtime.register_op( + "test2", + |_op_state: Rc>, _bufs: BufVec| -> Op { + let mut vec = vec![0u8; 100 * 1024 * 1024]; + vec[0] = 4; + let buf = vec.into_boxed_slice(); + Op::AsyncUnref(futures::future::ready(buf).boxed()) + }, + ); + + runtime + .execute( + "overflow_res_async_combined_with_unref.js", + r#" + function assert(cond) { + if (!cond) { + throw Error("assert"); + } + } + + let asyncRecv = 0; + Deno.core.setAsyncHandler(1, (buf) => { + assert(buf.byteLength === 100 * 1024 * 1024); + assert(buf[0] === 4); + asyncRecv++; + }); + Deno.core.setAsyncHandler(2, (buf) => { + assert(buf.byteLength === 100 * 1024 * 1024); + assert(buf[0] === 4); + asyncRecv++; + }); + let control = new Uint8Array(1); + let response1 = Deno.core.dispatch(1, control); + // Async messages always have null response. + assert(response1 == null); + assert(asyncRecv == 0); + let response2 = Deno.core.dispatch(2, control); + // Async messages always have null response. + assert(response2 == null); + assert(asyncRecv == 0); + "#, + ) + .unwrap(); + assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_)))); + runtime + .execute("check.js", "assert(asyncRecv == 2);") + .unwrap(); + }); + } + #[test] fn overflow_res_async() { run_in_task(|_cx| {