0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-03-09 21:57:40 -04:00

core: poll ops round robin

Also use a VecDeque to store pending ops to avoid exponential time complexity
when removing completed ops from the list.
This commit is contained in:
Bert Belder 2019-04-08 08:20:39 +02:00
parent 2debbdacb9
commit e43da28b28
No known key found for this signature in database
GPG key ID: 7A77887B2E2ED461

View file

@ -16,6 +16,7 @@ use futures::Async;
use futures::Future; use futures::Future;
use futures::Poll; use futures::Poll;
use libc::c_void; use libc::c_void;
use std::collections::VecDeque;
use std::ffi::CStr; use std::ffi::CStr;
use std::ffi::CString; use std::ffi::CString;
use std::ptr::null; use std::ptr::null;
@ -92,7 +93,7 @@ pub struct Isolate<B: Behavior> {
behavior: B, behavior: B,
needs_init: bool, needs_init: bool,
shared: SharedQueue, shared: SharedQueue,
pending_ops: Vec<PendingOp>, pending_ops: VecDeque<PendingOp>,
polled_recently: bool, polled_recently: bool,
} }
@ -143,7 +144,7 @@ impl<B: Behavior> Isolate<B> {
behavior, behavior,
shared, shared,
needs_init, needs_init,
pending_ops: Vec::new(), pending_ops: VecDeque::new(),
polled_recently: false, polled_recently: false,
}; };
@ -210,7 +211,7 @@ impl<B: Behavior> Isolate<B> {
// picked up. // picked up.
let _ = isolate.respond(Some(&res_record)); let _ = isolate.respond(Some(&res_record));
} else { } else {
isolate.pending_ops.push(PendingOp { isolate.pending_ops.push_back(PendingOp {
op, op,
polled_recently: false, polled_recently: false,
zero_copy_id, zero_copy_id,
@ -453,20 +454,15 @@ impl<B: Behavior> Future for Isolate<B> {
let mut overflow_response: Option<Buf> = None; let mut overflow_response: Option<Buf> = None;
let mut i = 0; for _ in 0..self.pending_ops.len() {
while i < self.pending_ops.len() {
assert!(overflow_response.is_none()); assert!(overflow_response.is_none());
let pending = &mut self.pending_ops[i]; let mut op = self.pending_ops.pop_front().unwrap();
match pending.poll() { match op.poll() {
Err(()) => panic!("unexpected error"), Err(()) => panic!("unexpected error"),
Ok(Async::NotReady) => { Ok(Async::NotReady) => self.pending_ops.push_back(op),
i += 1;
}
Ok(Async::Ready(buf)) => { Ok(Async::Ready(buf)) => {
let completed = self.pending_ops.remove(i); if op.zero_copy_id > 0 {
self.zero_copy_release(op.zero_copy_id);
if completed.zero_copy_id > 0 {
self.zero_copy_release(completed.zero_copy_id);
} }
let successful_push = self.shared.push(&buf); let successful_push = self.shared.push(&buf);