0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-03-03 09:31:22 -05:00

timers: implement timers in javascript

This commit is contained in:
Bert Belder 2018-10-02 17:47:40 -07:00
parent 6b77acf39d
commit aa691ea26c
No known key found for this signature in database
GPG key ID: 7A77887B2E2ED461
7 changed files with 317 additions and 144 deletions

View file

@ -19,5 +19,4 @@ export { libdeno } from "./libdeno";
export { arch, platform } from "./platform";
export { trace } from "./trace";
export { truncateSync, truncate } from "./truncate";
export { setGlobalTimeout } from "./timers";
export const args: string[] = [];

View file

@ -9,19 +9,31 @@ import { maybePushTrace } from "./trace";
let nextCmdId = 0;
const promiseTable = new Map<number, util.Resolvable<fbs.Base>>();
let fireTimers: () => void;
export function setFireTimersCallback(fn: () => void) {
fireTimers = fn;
}
export function handleAsyncMsgFromRust(ui8: Uint8Array) {
const bb = new flatbuffers.ByteBuffer(ui8);
const base = fbs.Base.getRootAsBase(bb);
const cmdId = base.cmdId();
const promise = promiseTable.get(cmdId);
util.assert(promise != null, `Expecting promise in table. ${cmdId}`);
promiseTable.delete(cmdId);
const err = errors.maybeError(base);
if (err != null) {
promise!.reject(err);
} else {
promise!.resolve(base);
// If a the buffer is empty, recv() on the native side timed out and we
// did not receive a message.
if (ui8.length) {
const bb = new flatbuffers.ByteBuffer(ui8);
const base = fbs.Base.getRootAsBase(bb);
const cmdId = base.cmdId();
const promise = promiseTable.get(cmdId);
util.assert(promise != null, `Expecting promise in table. ${cmdId}`);
promiseTable.delete(cmdId);
const err = errors.maybeError(base);
if (err != null) {
promise!.reject(err);
} else {
promise!.resolve(base);
}
}
// Fire timers that have become runnable.
fireTimers();
}
// @internal

View file

@ -1,107 +1,225 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
import { assert } from "./util";
import * as util from "./util";
import * as fbs from "gen/msg_generated";
import { flatbuffers } from "flatbuffers";
import { sendSync, sendAsync } from "./dispatch";
import { sendSync, setFireTimersCallback } from "./dispatch";
let nextTimerId = 1;
// tslint:disable-next-line:no-any
export type TimerCallback = (...args: any[]) => void;
// Tell the dispatcher which function it should call to fire timers that are
// due. This is done using a callback because circular imports are disallowed.
setFireTimersCallback(fireTimers);
interface Timer {
id: number;
cb: TimerCallback;
interval: boolean;
// tslint:disable-next-line:no-any
args: any[];
delay: number; // milliseconds
callback: () => void;
delay: number;
due: number;
repeat: boolean;
scheduled: boolean;
}
export function setGlobalTimeout(timeout: number) {
// We'll subtract EPOCH every time we retrieve the time with Date.now(). This
// ensures that absolute time values stay below UINT32_MAX - 2, which is the
// maximum object key that EcmaScript considers "numerical". After running for
// about a month, this is no longer true, and Deno explodes.
// TODO(piscisaureus): fix that ^.
const EPOCH = Date.now();
const APOCALYPS = 2 ** 32 - 2;
let globalTimeoutDue: number | null = null;
let nextTimerId = 1;
const idMap = new Map<number, Timer>();
const dueMap: { [due: number]: Timer[] } = Object.create(null);
function getTime() {
// TODO: use a monotonic clock.
const now = Date.now() - EPOCH;
assert(now >= 0 && now < APOCALYPS);
return now;
}
function setGlobalTimeout(due: number | null, now: number) {
// Since JS and Rust don't use the same clock, pass the time to rust as a
// relative time value. On the Rust side we'll turn that into an absolute
// value again.
// Note that a negative time-out value stops the global timer.
let timeout;
if (due === null) {
timeout = -1;
} else {
timeout = due - now;
assert(timeout >= 0);
}
// Send message to the backend.
const builder = new flatbuffers.Builder();
fbs.SetTimeout.startSetTimeout(builder);
fbs.SetTimeout.addTimeout(builder, timeout);
const msg = fbs.SetTimeout.endSetTimeout(builder);
const res = sendSync(builder, fbs.Any.SetTimeout, msg);
assert(res == null);
// Remember when when the global timer will fire.
globalTimeoutDue = due;
}
function startTimer(
id: number,
cb: TimerCallback,
delay: number,
interval: boolean,
// tslint:disable-next-line:no-any
args: any[]
): void {
const timer: Timer = {
id,
interval,
delay,
args,
cb
};
util.log("timers.ts startTimer");
function schedule(timer: Timer, now: number) {
assert(!timer.scheduled);
assert(now <= timer.due);
// Find or create the list of timers that will fire at point-in-time `due`.
let list = dueMap[timer.due];
if (list === undefined) {
list = dueMap[timer.due] = [];
}
// Append the newly scheduled timer to the list and mark it as scheduled.
list.push(timer);
timer.scheduled = true;
// If the new timer is scheduled to fire before any timer that existed before,
// update the global timeout to reflect this.
if (globalTimeoutDue === null || globalTimeoutDue > timer.due) {
setGlobalTimeout(timer.due, now);
}
}
// Send TimerStart message
const builder = new flatbuffers.Builder();
fbs.TimerStart.startTimerStart(builder);
fbs.TimerStart.addId(builder, timer.id);
fbs.TimerStart.addDelay(builder, timer.delay);
const msg = fbs.TimerStart.endTimerStart(builder);
sendAsync(builder, fbs.Any.TimerStart, msg).then(
baseRes => {
assert(fbs.Any.TimerReady === baseRes!.msgType());
const msg = new fbs.TimerReady();
assert(baseRes!.msg(msg) != null);
assert(msg.id() === timer.id);
if (msg.canceled()) {
util.log("timer canceled message");
} else {
cb(...args);
if (interval) {
// TODO Faking setInterval with setTimeout.
// We need a new timer implementation, this is just a stopgap.
startTimer(id, cb, delay, true, args);
}
function unschedule(timer: Timer) {
if (!timer.scheduled) {
return;
}
// Find the list of timers that will fire at point-in-time `due`.
const list = dueMap[timer.due];
if (list.length === 1) {
// Time timer is the only one in the list. Remove the entire list.
assert(list[0] === timer);
delete dueMap[timer.due];
// If the unscheduled timer was 'next up', find when the next timer that
// still exists is due, and update the global alarm accordingly.
if (timer.due === globalTimeoutDue) {
let nextTimerDue: number | null = null;
for (const key in dueMap) {
nextTimerDue = Number(key);
break;
}
},
error => {
throw error;
setGlobalTimeout(nextTimerDue, getTime());
}
);
} else {
// Multiple timers that are due at the same point in time.
// Remove this timer from the list.
const index = list.indexOf(timer);
assert(index > 0);
list.splice(index, 1);
}
}
export function setTimeout(
cb: TimerCallback,
function fire(timer: Timer) {
// If the timer isn't found in the ID map, that means it has been cancelled
// between the timer firing and the promise callback (this function).
if (!idMap.has(timer.id)) {
return;
}
// Reschedule the timer if it is a repeating one, otherwise drop it.
if (!timer.repeat) {
// One-shot timer: remove the timer from this id-to-timer map.
idMap.delete(timer.id);
} else {
// Interval timer: compute when timer was supposed to fire next.
// However make sure to never schedule the next interval in the past.
const now = getTime();
timer.due = Math.max(now, timer.due + timer.delay);
schedule(timer, now);
}
// Call the user callback. Intermediate assignment is to avoid leaking `this`
// to it, while also keeping the stack trace neat when it shows up in there.
const callback = timer.callback;
callback();
}
function fireTimers() {
const now = getTime();
// Bail out if we're not expecting the global timer to fire (yet).
if (globalTimeoutDue === null || now < globalTimeoutDue) {
return;
}
// After firing the timers that are due now, this will hold the due time of
// the first timer that hasn't fired yet.
let nextTimerDue: number | null = null;
// Walk over the keys of the 'due' map. Since dueMap is actually a regular
// object and its keys are numerical and smaller than UINT32_MAX - 2,
// keys are iterated in ascending order.
for (const key in dueMap) {
// Convert the object key (a string) to a number.
const due = Number(key);
// Break out of the loop if the next timer isn't due to fire yet.
if (Number(due) > now) {
nextTimerDue = due;
break;
}
// Get the list of timers that have this due time, then drop it.
const list = dueMap[key];
delete dueMap[key];
// Fire all the timers in the list.
for (const timer of list) {
// With the list dropped, the timer is no longer scheduled.
timer.scheduled = false;
// Place the callback on the microtask queue.
Promise.resolve(timer).then(fire);
}
}
// Update the global alarm to go off when the first-up timer that hasn't fired
// yet is due.
setGlobalTimeout(nextTimerDue, now);
}
function setTimer<Args extends Array<unknown>>(
cb: (...args: Args) => void,
delay: number,
// tslint:disable-next-line:no-any
...args: any[]
args: Args,
repeat: boolean
): number {
const id = nextTimerId++;
startTimer(id, cb, delay, false, args);
return id;
// If any `args` were provided (which is uncommon), bind them to the callback.
const callback: () => void = args.length === 0 ? cb : cb.bind(null, ...args);
// In the browser, the delay value must be coercable to an integer between 0
// and INT32_MAX. Any other value will cause the timer to fire immediately.
// We emulate this behavior.
const now = getTime();
delay = Math.max(0, delay | 0);
// Create a new, unscheduled timer object.
const timer = {
id: nextTimerId++,
callback,
args,
delay,
due: now + delay,
repeat,
scheduled: false
};
// Register the timer's existence in the id-to-timer map.
idMap.set(timer.id, timer);
// Schedule the timer in the due table.
schedule(timer, now);
return timer.id;
}
export function setInterval(
cb: TimerCallback,
export function setTimeout<Args extends Array<unknown>>(
cb: (...args: Args) => void,
delay: number,
// tslint:disable-next-line:no-any
...args: any[]
...args: Args
): number {
const id = nextTimerId++;
startTimer(id, cb, delay, true, args);
return id;
return setTimer(cb, delay, args, false);
}
export function clearTimer(id: number) {
const builder = new flatbuffers.Builder();
fbs.TimerClear.startTimerClear(builder);
fbs.TimerClear.addId(builder, id);
const msg = fbs.TimerClear.endTimerClear(builder);
const res = sendSync(builder, fbs.Any.TimerClear, msg);
assert(res == null);
export function setInterval<Args extends Array<unknown>>(
cb: (...args: Args) => void,
delay: number,
...args: Args
): number {
return setTimer(cb, delay, args, true);
}
export function clearTimer(id: number): void {
const timer = idMap.get(id);
if (timer === undefined) {
// Timer doesn't exist any more or never existed. This is not an error.
return;
}
// Unschedule the timer if it is currently scheduled, and forget about it.
unschedule(timer);
idMap.delete(timer.id);
}

View file

@ -1,5 +1,4 @@
import { test, assertEqual } from "./test_util.ts";
import { setGlobalTimeout } from "deno";
function deferred() {
let resolve;
@ -96,7 +95,3 @@ test(async function intervalCancelInvalidSilentFail() {
// Should silently fail (no panic)
clearInterval(2147483647);
});
test(async function SetGlobalTimeoutSmoke() {
setGlobalTimeout(50);
});

View file

@ -5,6 +5,7 @@ use errors::DenoError;
use errors::DenoResult;
use fs as deno_fs;
use isolate::Buf;
use isolate::Isolate;
use isolate::IsolateState;
use isolate::Op;
use msg;
@ -47,7 +48,7 @@ fn empty_buf() -> Buf {
}
pub fn msg_from_js(
state: Arc<IsolateState>,
isolate: &mut Isolate,
control: &[u8],
data: &'static mut [u8],
) -> (bool, Box<Op>) {
@ -55,38 +56,47 @@ pub fn msg_from_js(
let is_sync = base.sync();
let msg_type = base.msg_type();
let cmd_id = base.cmd_id();
let handler: Handler = match msg_type {
msg::Any::Start => handle_start,
msg::Any::CodeFetch => handle_code_fetch,
msg::Any::CodeCache => handle_code_cache,
msg::Any::SetTimeout => handle_set_timeout,
msg::Any::Environ => handle_env,
msg::Any::FetchReq => handle_fetch_req,
msg::Any::TimerStart => handle_timer_start,
msg::Any::TimerClear => handle_timer_clear,
msg::Any::MakeTempDir => handle_make_temp_dir,
msg::Any::Mkdir => handle_mkdir,
msg::Any::Open => handle_open,
msg::Any::Read => handle_read,
msg::Any::Write => handle_write,
msg::Any::Remove => handle_remove,
msg::Any::ReadFile => handle_read_file,
msg::Any::Rename => handle_rename,
msg::Any::Readlink => handle_read_link,
msg::Any::Symlink => handle_symlink,
msg::Any::SetEnv => handle_set_env,
msg::Any::Stat => handle_stat,
msg::Any::Truncate => handle_truncate,
msg::Any::WriteFile => handle_write_file,
msg::Any::Exit => handle_exit,
msg::Any::CopyFile => handle_copy_file,
_ => panic!(format!(
"Unhandled message {}",
msg::enum_name_any(msg_type)
)),
let op: Box<Op> = if msg_type == msg::Any::SetTimeout {
// SetTimeout is an exceptional op: the global timeout field is part of the
// Isolate state (not the IsolateState state) and it must be updated on the
// main thread.
assert_eq!(is_sync, true);
handle_set_timeout(isolate, &base, data)
} else {
// Handle regular ops.
let handler: Handler = match msg_type {
msg::Any::Start => handle_start,
msg::Any::CodeFetch => handle_code_fetch,
msg::Any::CodeCache => handle_code_cache,
msg::Any::Environ => handle_env,
msg::Any::FetchReq => handle_fetch_req,
msg::Any::TimerStart => handle_timer_start,
msg::Any::TimerClear => handle_timer_clear,
msg::Any::MakeTempDir => handle_make_temp_dir,
msg::Any::Mkdir => handle_mkdir,
msg::Any::Open => handle_open,
msg::Any::Read => handle_read,
msg::Any::Write => handle_write,
msg::Any::Remove => handle_remove,
msg::Any::ReadFile => handle_read_file,
msg::Any::Rename => handle_rename,
msg::Any::Readlink => handle_read_link,
msg::Any::Symlink => handle_symlink,
msg::Any::SetEnv => handle_set_env,
msg::Any::Stat => handle_stat,
msg::Any::Truncate => handle_truncate,
msg::Any::WriteFile => handle_write_file,
msg::Any::Exit => handle_exit,
msg::Any::CopyFile => handle_copy_file,
_ => panic!(format!(
"Unhandled message {}",
msg::enum_name_any(msg_type)
)),
};
handler(isolate.state.clone(), &base, data)
};
let op: Box<Op> = handler(state.clone(), &base, data);
let boxed_op = Box::new(
op.or_else(move |err: DenoError| -> DenoResult<Buf> {
debug!("op err {}", err);
@ -274,16 +284,18 @@ fn handle_code_cache(
}
fn handle_set_timeout(
state: Arc<IsolateState>,
isolate: &mut Isolate,
base: &msg::Base,
data: &'static mut [u8],
) -> Box<Op> {
assert_eq!(data.len(), 0);
let msg = base.msg_as_set_timeout().unwrap();
let val = msg.timeout() as isize;
state
.timeout
.swap(val, std::sync::atomic::Ordering::Relaxed);
let val = msg.timeout() as i64;
isolate.timeout_due = if val >= 0 {
Some(Instant::now() + Duration::from_millis(val as u64))
} else {
None
};
ok_future(empty_buf())
}

View file

@ -16,10 +16,11 @@ use std;
use std::collections::HashMap;
use std::ffi::CStr;
use std::ffi::CString;
use std::sync::atomic::AtomicIsize;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
use tokio;
use tokio_util;
@ -37,7 +38,7 @@ pub type Op = Future<Item = Buf, Error = DenoError> + Send;
// Returns (is_sync, op)
pub type Dispatch =
fn(state: Arc<IsolateState>, buf: &[u8], data_buf: &'static mut [u8])
fn(isolate: &mut Isolate, buf: &[u8], data_buf: &'static mut [u8])
-> (bool, Box<Op>);
pub struct Isolate {
@ -45,13 +46,13 @@ pub struct Isolate {
dispatch: Dispatch,
rx: mpsc::Receiver<(i32, Buf)>,
ntasks: i32,
pub timeout_due: Option<Instant>,
pub state: Arc<IsolateState>,
}
// Isolate cannot be passed between threads but IsolateState can. So any state that
// needs to be accessed outside the main V8 thread should be inside IsolateState.
pub struct IsolateState {
pub timeout: AtomicIsize,
pub dir: deno_dir::DenoDir,
pub timers: Mutex<HashMap<u32, futures::sync::oneshot::Sender<()>>>,
pub argv: Vec<String>,
@ -88,8 +89,8 @@ impl Isolate {
dispatch,
rx,
ntasks: 0,
timeout_due: None,
state: Arc::new(IsolateState {
timeout: AtomicIsize::new(-1),
dir: deno_dir::DenoDir::new(flags.reload, None).unwrap(),
timers: Mutex::new(HashMap::new()),
argv: argv_rest,
@ -139,17 +140,54 @@ impl Isolate {
unsafe { libdeno::deno_respond(self.ptr, req_id, buf.into()) }
}
fn complete_op(&mut self, req_id: i32, buf: Buf) {
// Receiving a message on rx exactly corresponds to an async task
// completing.
self.ntasks_decrement();
// Call into JS with the buf.
self.respond(req_id, buf);
}
fn timeout(&self) {
let dummy_buf = libdeno::deno_buf {
alloc_ptr: 0 as *mut u8,
alloc_len: 0,
data_ptr: 0 as *mut u8,
data_len: 0,
};
unsafe { libdeno::deno_respond(self.ptr, -1, dummy_buf) }
}
// TODO Use Park abstraction? Note at time of writing Tokio default runtime
// does not have new_with_park().
pub fn event_loop(&mut self) {
// Main thread event loop.
while !self.is_idle() {
let (req_id, buf) = self.rx.recv().unwrap();
// Receiving a message on rx exactly corresponds to an async task
// completing.
self.ntasks_decrement();
// Call into JS with the buf.
self.respond(req_id, buf);
// Ideally, mpsc::Receiver would have a receive method that takes a optional
// timeout. But it doesn't so we need all this duplicate code.
match self.timeout_due {
Some(due) => {
// Subtracting two Instants causes a panic if the resulting duration
// would become negative. Avoid this.
let now = Instant::now();
let timeout = if due > now {
due - now
} else {
Duration::new(0, 0)
};
// TODO: use recv_deadline() instead of recv_timeout() when this
// feature becomes stable/available.
match self.rx.recv_timeout(timeout) {
Ok((req_id, buf)) => self.complete_op(req_id, buf),
Err(mpsc::RecvTimeoutError::Timeout) => self.timeout(),
Err(e) => panic!("mpsc::Receiver::recv_timeout() failed: {:?}", e),
}
}
None => match self.rx.recv() {
Ok((req_id, buf)) => self.complete_op(req_id, buf),
Err(e) => panic!("mpsc::Receiver::recv() failed: {:?}", e),
},
};
}
}
@ -164,7 +202,7 @@ impl Isolate {
}
fn is_idle(&self) -> bool {
self.ntasks == 0
self.ntasks == 0 && self.timeout_due.is_none()
}
}
@ -212,8 +250,7 @@ extern "C" fn pre_dispatch(
let isolate = Isolate::from_c(d);
let dispatch = isolate.dispatch;
let (is_sync, op) =
dispatch(isolate.state.clone(), control_slice, data_slice);
let (is_sync, op) = dispatch(isolate, control_slice, data_slice);
if is_sync {
// Execute op synchronously.
@ -258,7 +295,7 @@ mod tests {
}
fn unreachable_dispatch(
_state: Arc<IsolateState>,
_isolate: &mut Isolate,
_control: &[u8],
_data: &'static mut [u8],
) -> (bool, Box<Op>) {
@ -289,7 +326,7 @@ mod tests {
}
fn dispatch_sync(
_state: Arc<IsolateState>,
_isolate: &mut Isolate,
control: &[u8],
data: &'static mut [u8],
) -> (bool, Box<Op>) {

View file

@ -128,7 +128,7 @@ table CodeCache {
}
table SetTimeout {
timeout: int;
timeout: double;
}
table Exit {