1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-22 06:09:25 -05:00

core: make Isolate use FuturesUnordered to track ops

Additionally, instead of polling ops in a loop until none of them are
ready, the isolate will now yield to the task system after delivering
the first batch of completed ops to the javascript side.

Although this makes performance a bit worse (about 15% fewer
requests/second on the 'deno_core_http_bench' benchmark), we feel that
the advantages are worth it:

* It resolves the extremely high worst-case latency that we were seeing
  on deno_core_http_bench, in particular when using the multi-threaded
  Tokio runtime, which would sometimes exceed a full second.

* Before this patch, the implementation of Isolate::poll() had to loop
  through all sub-futures and poll each one of them, which doesn't scale
  well as the number of futures managed by the isolate goes up. This
  could lead to poor performance when e.g. a server is servicing
  thousands of connected clients.
This commit is contained in:
Bert Belder 2019-04-15 02:07:34 +02:00
parent dd595220ab
commit 7807afa972
No known key found for this signature in database
GPG key ID: 7A77887B2E2ED461

View file

@ -12,11 +12,12 @@ use crate::libdeno::Snapshot1;
use crate::libdeno::Snapshot2;
use crate::shared_queue::SharedQueue;
use crate::shared_queue::RECOMMENDED_SIZE;
use futures::Async;
use futures::stream::{FuturesUnordered, Stream};
use futures::task;
use futures::Async::*;
use futures::Future;
use futures::Poll;
use libc::c_void;
use std::collections::VecDeque;
use std::ffi::CStr;
use std::ffi::CString;
use std::ptr::null;
@ -27,27 +28,28 @@ pub type Op = dyn Future<Item = Buf, Error = ()> + Send;
struct PendingOp {
op: Box<Op>,
polled_recently: bool,
zero_copy_id: usize, // non-zero if associated zero-copy buffer.
}
struct OpResult {
buf: Buf,
zero_copy_id: usize,
}
impl Future for PendingOp {
type Item = Buf;
type Item = OpResult;
type Error = ();
fn poll(&mut self) -> Poll<Buf, ()> {
// Do not call poll on ops we've already polled this turn.
if self.polled_recently {
Ok(Async::NotReady)
} else {
self.polled_recently = true;
let op = &mut self.op;
op.poll().map_err(|()| {
// Ops should not error. If an op experiences an error it needs to
// encode that error into a buf, so it can be returned to JS.
panic!("ops should not error")
})
}
fn poll(&mut self) -> Poll<OpResult, ()> {
// Ops should not error. If an op experiences an error it needs to
// encode that error into a buf, so it can be returned to JS.
Ok(match self.op.poll().expect("ops should not error") {
NotReady => NotReady,
Ready(buf) => Ready(OpResult {
buf,
zero_copy_id: self.zero_copy_id,
}),
})
}
}
@ -91,8 +93,8 @@ pub struct Isolate<B: Dispatch> {
dispatcher: B,
needs_init: bool,
shared: SharedQueue,
pending_ops: VecDeque<PendingOp>,
polled_recently: bool,
pending_ops: FuturesUnordered<PendingOp>,
have_unpolled_ops: bool,
}
unsafe impl<B: Dispatch> Send for Isolate<B> {}
@ -142,8 +144,8 @@ impl<B: Dispatch> Isolate<B> {
dispatcher,
shared,
needs_init,
pending_ops: VecDeque::new(),
polled_recently: false,
pending_ops: FuturesUnordered::new(),
have_unpolled_ops: false,
};
// If we want to use execute this has to happen here sadly.
@ -209,12 +211,8 @@ impl<B: Dispatch> Isolate<B> {
// picked up.
let _ = isolate.respond(Some(&res_record));
} else {
isolate.pending_ops.push_back(PendingOp {
op,
polled_recently: false,
zero_copy_id,
});
isolate.polled_recently = false;
isolate.pending_ops.push(PendingOp { op, zero_copy_id });
isolate.have_unpolled_ops = true;
}
}
@ -438,58 +436,41 @@ impl<B: Dispatch> Future for Isolate<B> {
// Lock the current thread for V8.
let _locker = LockerScope::new(self.libdeno_isolate);
// Clear poll_recently state both on the Isolate itself and
// on the pending ops.
self.polled_recently = false;
for pending in self.pending_ops.iter_mut() {
pending.polled_recently = false;
}
let mut overflow_response: Option<Buf> = None;
while !self.polled_recently {
let mut completed_count = 0;
self.polled_recently = true;
assert_eq!(self.shared.size(), 0);
loop {
self.have_unpolled_ops = false;
#[allow(clippy::match_wild_err_arm)]
match self.pending_ops.poll() {
Err(_) => panic!("unexpected op error"),
Ok(Ready(None)) => break,
Ok(NotReady) => break,
Ok(Ready(Some(r))) => {
if r.zero_copy_id > 0 {
self.zero_copy_release(r.zero_copy_id);
}
let mut overflow_response: Option<Buf> = None;
for _ in 0..self.pending_ops.len() {
assert!(overflow_response.is_none());
let mut op = self.pending_ops.pop_front().unwrap();
match op.poll() {
Err(()) => panic!("unexpected error"),
Ok(Async::NotReady) => self.pending_ops.push_back(op),
Ok(Async::Ready(buf)) => {
if op.zero_copy_id > 0 {
self.zero_copy_release(op.zero_copy_id);
}
let successful_push = self.shared.push(&buf);
if !successful_push {
// If we couldn't push the response to the shared queue, because
// there wasn't enough size, we will return the buffer via the
// legacy route, using the argument of deno_respond.
overflow_response = Some(buf);
// reset `polled_recently` so pending ops can be
// done even if shared space overflows
self.polled_recently = false;
break;
}
completed_count += 1;
let successful_push = self.shared.push(&r.buf);
if !successful_push {
// If we couldn't push the response to the shared queue, because
// there wasn't enough size, we will return the buffer via the
// legacy route, using the argument of deno_respond.
overflow_response = Some(r.buf);
break;
}
}
}
}
if completed_count > 0 {
self.respond(None)?;
// The other side should have shifted off all the messages.
assert_eq!(self.shared.size(), 0);
}
if self.shared.size() > 0 {
self.respond(None)?;
// The other side should have shifted off all the messages.
assert_eq!(self.shared.size(), 0);
}
if overflow_response.is_some() {
let buf = overflow_response.take().unwrap();
self.respond(Some(&buf))?;
}
if overflow_response.is_some() {
let buf = overflow_response.take().unwrap();
self.respond(Some(&buf))?;
}
self.check_promise_errors();
@ -501,6 +482,9 @@ impl<B: Dispatch> Future for Isolate<B> {
if self.pending_ops.is_empty() {
Ok(futures::Async::Ready(()))
} else {
if self.have_unpolled_ops {
task::current().notify();
}
Ok(futures::Async::NotReady)
}
}