diff --git a/js/errors.ts b/js/errors.ts index 11d4cd509a..2d0572e6da 100644 --- a/js/errors.ts +++ b/js/errors.ts @@ -10,8 +10,17 @@ export class DenoError extends Error { // @internal export function maybeThrowError(base: fbs.Base): void { - const kind = base.errorKind(); - if (kind !== fbs.ErrorKind.NoError) { - throw new DenoError(kind, base.error()!); + const err = maybeError(base); + if (err != null) { + throw err; + } +} + +export function maybeError(base: fbs.Base): null | DenoError { + const kind = base.errorKind(); + if (kind === fbs.ErrorKind.NoError) { + return null; + } else { + return new DenoError(kind, base.error()!); } } diff --git a/js/fbs_util.ts b/js/fbs_util.ts index bb623d54d1..16f3b6ca20 100644 --- a/js/fbs_util.ts +++ b/js/fbs_util.ts @@ -1,27 +1,77 @@ // Copyright 2018 the Deno authors. All rights reserved. MIT license. +// TODO Rename this file to //js/dispatch.ts import { libdeno } from "./libdeno"; import { flatbuffers } from "flatbuffers"; -import { maybeThrowError } from "./errors"; import { deno as fbs } from "gen/msg_generated"; +import * as errors from "./errors"; +import * as util from "./util"; +let nextCmdId = 0; +const promiseTable = new Map>(); + +export function handleAsyncMsgFromRust(ui8: Uint8Array) { + const bb = new flatbuffers.ByteBuffer(ui8); + const base = fbs.Base.getRootAsBase(bb); + + const cmdId = base.cmdId(); + const promise = promiseTable.get(cmdId); + util.assert(promise != null, `Expecting promise in table. ${cmdId}`); + promiseTable.delete(cmdId); + const err = errors.maybeError(base); + if (err != null) { + promise!.reject(err); + } else { + promise!.resolve(base); + } +} + +// @internal +export function sendAsync( + builder: flatbuffers.Builder, + msgType: fbs.Any, + msg: flatbuffers.Offset +): Promise { + const [cmdId, resBuf] = sendInternal(builder, msgType, msg, false); + util.assert(resBuf == null); + const promise = util.createResolvable(); + promiseTable.set(cmdId, promise); + return promise; +} + +// TODO Rename to sendSync // @internal export function send( builder: flatbuffers.Builder, msgType: fbs.Any, msg: flatbuffers.Offset ): null | fbs.Base { - fbs.Base.startBase(builder); - fbs.Base.addMsg(builder, msg); - fbs.Base.addMsgType(builder, msgType); - builder.finish(fbs.Base.endBase(builder)); - - const resBuf = libdeno.send(builder.asUint8Array()); + const [cmdId, resBuf] = sendInternal(builder, msgType, msg, true); + util.assert(cmdId >= 0); if (resBuf == null) { return null; } else { - const bb = new flatbuffers.ByteBuffer(new Uint8Array(resBuf!)); + const u8 = new Uint8Array(resBuf!); + // console.log("recv sync message", util.hexdump(u8)); + const bb = new flatbuffers.ByteBuffer(u8); const baseRes = fbs.Base.getRootAsBase(bb); - maybeThrowError(baseRes); + errors.maybeThrowError(baseRes); return baseRes; } } + +function sendInternal( + builder: flatbuffers.Builder, + msgType: fbs.Any, + msg: flatbuffers.Offset, + sync = true +): [number, null | Uint8Array] { + const cmdId = nextCmdId++; + fbs.Base.startBase(builder); + fbs.Base.addMsg(builder, msg); + fbs.Base.addMsgType(builder, msgType); + fbs.Base.addSync(builder, sync); + fbs.Base.addCmdId(builder, cmdId); + builder.finish(fbs.Base.endBase(builder)); + + return [cmdId, libdeno.send(builder.asUint8Array())]; +} diff --git a/js/fetch.ts b/js/fetch.ts index 2d57fa65c2..20af1d03d6 100644 --- a/js/fetch.ts +++ b/js/fetch.ts @@ -8,7 +8,7 @@ import { notImplemented } from "./util"; import { flatbuffers } from "flatbuffers"; -import { send } from "./fbs_util"; +import { sendAsync } from "./fbs_util"; import { deno as fbs } from "gen/msg_generated"; import { Headers, @@ -20,16 +20,6 @@ import { } from "./fetch_types"; import { TextDecoder } from "./text_encoding"; -/** @internal */ -export function onFetchRes(base: fbs.Base, msg: fbs.FetchRes) { - const id = msg.id(); - const req = fetchRequests.get(id); - assert(req != null, `Couldn't find FetchRequest id ${id}`); - req!.onMsg(base, msg); -} - -const fetchRequests = new Map(); - class DenoHeaders implements Headers { append(name: string, value: string): void { assert(false, "Implement me"); @@ -58,10 +48,9 @@ class DenoHeaders implements Headers { } class FetchResponse implements Response { - readonly url: string; + readonly url: string = ""; body: null; bodyUsed = false; // TODO - status = 0; statusText = "FIXME"; // TODO readonly type = "basic"; // TODO redirected = false; // TODO @@ -71,10 +60,12 @@ class FetchResponse implements Response { private first = true; private bodyWaiter: Resolvable; - constructor(readonly req: FetchRequest) { - this.url = req.url; + constructor(readonly status: number, readonly body_: ArrayBuffer) { this.bodyWaiter = createResolvable(); this.trailer = createResolvable(); + setTimeout(() => { + this.bodyWaiter.resolve(body_); + }, 0); } arrayBuffer(): Promise { @@ -114,78 +105,50 @@ class FetchResponse implements Response { onHeader?: (res: FetchResponse) => void; onError?: (error: Error) => void; - onMsg(base: fbs.Base, msg: fbs.FetchRes) { + onMsg(base: fbs.Base) { + /* const error = base.error(); if (error != null) { assert(this.onError != null); this.onError!(new Error(error)); return; } + */ if (this.first) { this.first = false; - this.status = msg.status(); - assert(this.onHeader != null); - this.onHeader!(this); - } else { - // Body message. Assuming it all comes in one message now. - const bodyArray = msg.bodyArray(); - assert(bodyArray != null); - const ab = typedArrayToArrayBuffer(bodyArray!); - this.bodyWaiter.resolve(ab); } } } -let nextFetchId = 0; -//TODO implements Request -class FetchRequest { - private readonly id: number; - response: FetchResponse; - constructor(readonly url: string) { - this.id = nextFetchId++; - fetchRequests.set(this.id, this); - this.response = new FetchResponse(this); - } - - onMsg(base: fbs.Base, msg: fbs.FetchRes) { - this.response.onMsg(base, msg); - } - - destroy() { - fetchRequests.delete(this.id); - } - - start() { - log("dispatch FETCH_REQ", this.id, this.url); - - // Send FetchReq message - const builder = new flatbuffers.Builder(); - const url = builder.createString(this.url); - fbs.FetchReq.startFetchReq(builder); - fbs.FetchReq.addId(builder, this.id); - fbs.FetchReq.addUrl(builder, url); - const msg = fbs.FetchReq.endFetchReq(builder); - send(builder, fbs.Any.FetchReq, msg); - } -} - -export function fetch( +export async function fetch( input?: Request | string, init?: RequestInit ): Promise { - const fetchReq = new FetchRequest(input as string); - const response = fetchReq.response; - const promise = new Promise((resolve, reject) => { - response.onHeader = (response: FetchResponse) => { - log("onHeader"); - resolve(response); - }; - response.onError = (error: Error) => { - log("onError", error); - reject(error); - }; - }); - fetchReq.start(); - return promise; + const url = input as string; + log("dispatch FETCH_REQ", url); + + // Send FetchReq message + const builder = new flatbuffers.Builder(); + const url_ = builder.createString(url); + fbs.FetchReq.startFetchReq(builder); + fbs.FetchReq.addUrl(builder, url_); + const resBase = await sendAsync( + builder, + fbs.Any.FetchReq, + fbs.FetchReq.endFetchReq(builder) + ); + + // Decode FetchRes + assert(fbs.Any.FetchRes === resBase.msgType()); + const msg = new fbs.FetchRes(); + assert(resBase.msg(msg) != null); + + const status = msg.status(); + const bodyArray = msg.bodyArray(); + assert(bodyArray != null); + const body = typedArrayToArrayBuffer(bodyArray!); + + const response = new FetchResponse(status, body); + return response; } diff --git a/js/main.ts b/js/main.ts index 538d0cc27a..e45562d99b 100644 --- a/js/main.ts +++ b/js/main.ts @@ -5,10 +5,8 @@ import { assert, log, setLogDebug } from "./util"; import * as os from "./os"; import { DenoCompiler } from "./compiler"; import { libdeno } from "./libdeno"; -import * as timers from "./timers"; -import { onFetchRes } from "./fetch"; import { argv } from "./deno"; -import { send } from "./fbs_util"; +import { send, handleAsyncMsgFromRust } from "./fbs_util"; function sendStart(): fbs.StartRes { const builder = new flatbuffers.Builder(); @@ -22,29 +20,6 @@ function sendStart(): fbs.StartRes { return startRes; } -function onMessage(ui8: Uint8Array) { - const bb = new flatbuffers.ByteBuffer(ui8); - const base = fbs.Base.getRootAsBase(bb); - switch (base.msgType()) { - case fbs.Any.FetchRes: { - const msg = new fbs.FetchRes(); - assert(base.msg(msg) != null); - onFetchRes(base, msg); - break; - } - case fbs.Any.TimerReady: { - const msg = new fbs.TimerReady(); - assert(base.msg(msg) != null); - timers.onMessage(msg); - break; - } - default: { - assert(false, "Unhandled message type"); - break; - } - } -} - function onGlobalError( message: string, source: string, @@ -58,7 +33,7 @@ function onGlobalError( /* tslint:disable-next-line:no-default-export */ export default function denoMain() { - libdeno.recv(onMessage); + libdeno.recv(handleAsyncMsgFromRust); libdeno.setGlobalErrorHandler(onGlobalError); const compiler = DenoCompiler.instance(); diff --git a/js/timers.ts b/js/timers.ts index d09af02959..6b23c64f1a 100644 --- a/js/timers.ts +++ b/js/timers.ts @@ -3,7 +3,7 @@ import { assert } from "./util"; import * as util from "./util"; import { deno as fbs } from "gen/msg_generated"; import { flatbuffers } from "flatbuffers"; -import { send } from "./fbs_util"; +import { send, sendAsync } from "./fbs_util"; let nextTimerId = 1; @@ -19,50 +19,51 @@ interface Timer { delay: number; // milliseconds } -const timers = new Map(); - -/** @internal */ -export function onMessage(msg: fbs.TimerReady) { - const timerReadyId = msg.id(); - const timerReadyDone = msg.done(); - const timer = timers.get(timerReadyId); - if (!timer) { - return; - } - timer.cb(...timer.args); - if (timerReadyDone) { - timers.delete(timerReadyId); - } -} - function startTimer( + id: number, cb: TimerCallback, delay: number, interval: boolean, // tslint:disable-next-line:no-any args: any[] -): number { - const timer = { - id: nextTimerId++, +): void { + const timer: Timer = { + id, interval, delay, args, cb }; - timers.set(timer.id, timer); - util.log("timers.ts startTimer"); // Send TimerStart message const builder = new flatbuffers.Builder(); fbs.TimerStart.startTimerStart(builder); fbs.TimerStart.addId(builder, timer.id); - fbs.TimerStart.addInterval(builder, timer.interval); fbs.TimerStart.addDelay(builder, timer.delay); const msg = fbs.TimerStart.endTimerStart(builder); - const baseRes = send(builder, fbs.Any.TimerStart, msg); - assert(baseRes == null); - return timer.id; + + sendAsync(builder, fbs.Any.TimerStart, msg).then( + baseRes => { + assert(fbs.Any.TimerReady === baseRes!.msgType()); + const msg = new fbs.TimerReady(); + assert(baseRes!.msg(msg) != null); + assert(msg.id() === timer.id); + if (msg.canceled()) { + util.log("timer canceled message"); + } else { + cb(...args); + if (interval) { + // TODO Faking setInterval with setTimeout. + // We need a new timer implementation, this is just a stopgap. + startTimer(id, cb, delay, true, args); + } + } + }, + error => { + throw error; + } + ); } export function setTimeout( @@ -71,7 +72,9 @@ export function setTimeout( // tslint:disable-next-line:no-any ...args: any[] ): number { - return startTimer(cb, delay, false, args); + const id = nextTimerId++; + startTimer(id, cb, delay, false, args); + return id; } export function setInterval( @@ -80,12 +83,12 @@ export function setInterval( // tslint:disable-next-line:no-any ...args: any[] ): number { - return startTimer(cb, delay, true, args); + const id = nextTimerId++; + startTimer(id, cb, delay, true, args); + return id; } export function clearTimer(id: number) { - timers.delete(id); - const builder = new flatbuffers.Builder(); fbs.TimerClear.startTimerClear(builder); fbs.TimerClear.addId(builder, id); diff --git a/js/util.ts b/js/util.ts index 6971ca1b90..efe0dcf84c 100644 --- a/js/util.ts +++ b/js/util.ts @@ -83,3 +83,11 @@ export function notImplemented(): never { export function unreachable(): never { throw new Error("Code not reachable"); } + +export function hexdump(u8: Uint8Array): string { + return Array.prototype.map + .call(u8, (x: number) => { + return ("00" + x.toString(16)).slice(-2); + }) + .join(" "); +} diff --git a/libdeno/binding.cc b/libdeno/binding.cc index 7a79bf1998..8ca34684b0 100644 --- a/libdeno/binding.cc +++ b/libdeno/binding.cc @@ -11,6 +11,14 @@ #include "deno.h" #include "internal.h" +void hexdump(const uint8_t* buf, size_t len) { + for (size_t i = 0; i < len; ++i) { + char ch = buf[i]; + printf("%02x ", ch & 0xff); + } + printf("\n"); +} + namespace deno { Deno* FromIsolate(v8::Isolate* isolate) { @@ -429,6 +437,8 @@ int deno_send(Deno* d, deno_buf buf) { } void deno_set_response(Deno* d, deno_buf buf) { + // printf("deno_set_response: "); + // hexdump(buf.data_ptr, buf.data_len); auto ab = deno::ImportBuf(d->isolate, buf); d->currentArgs->GetReturnValue().Set(ab); } diff --git a/src/handlers.rs b/src/handlers.rs index f0a15186d9..477d347747 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -1,4 +1,5 @@ // Copyright 2018 the Deno authors. All rights reserved. MIT license. +use errors::DenoError; use errors::DenoResult; use flatbuffers::FlatBufferBuilder; use from_c; @@ -16,19 +17,29 @@ use std::fs; use std::path::Path; use std::time::UNIX_EPOCH; use std::time::{Duration, Instant}; -use tokio::prelude::future; -use tokio::prelude::*; -use tokio::timer::{Delay, Interval}; +use tokio::timer::Delay; -type HandlerResult = DenoResult; -type Handler = - fn(d: *const DenoC, base: msg::Base, builder: &mut FlatBufferBuilder) - -> HandlerResult; +// Buf represents a byte array returned from a "Op". +// The message might be empty (which will be translated into a null object on +// the javascript side) or it is a heap allocated opaque sequence of bytes. +// Usually a flatbuffer message. +type Buf = Option>; + +// JS promises in Deno map onto a specific Future +// which yields either a DenoError or a byte array. +type Op = Future; + +type OpResult = DenoResult; + +// TODO Ideally we wouldn't have to box the Op being returned. +// The box is just to make it easier to get a prototype refactor working. +type Handler = fn(d: *const DenoC, base: &msg::Base) -> Box; pub extern "C" fn msg_from_js(d: *const DenoC, buf: deno_buf) { let bytes = unsafe { std::slice::from_raw_parts(buf.data_ptr, buf.data_len) }; let base = msg::get_root_as_base(bytes); let msg_type = base.msg_type(); + let cmd_id = base.cmd_id(); let handler: Handler = match msg_type { msg::Any::Start => handle_start, msg::Any::CodeFetch => handle_code_fetch, @@ -51,30 +62,59 @@ pub extern "C" fn msg_from_js(d: *const DenoC, buf: deno_buf) { )), }; - let builder = &mut FlatBufferBuilder::new(); - let result = handler(d, base, builder); + let future = handler(d, &base); + let future = future.or_else(move |err| { + // No matter whether we got an Err or Ok, we want a serialized message to + // send back. So transform the DenoError into a deno_buf. + let builder = &mut FlatBufferBuilder::new(); + let errmsg_offset = builder.create_string(&format!("{}", err)); + Ok(create_msg( + cmd_id, + builder, + msg::BaseArgs { + error: Some(errmsg_offset), + error_kind: err.kind(), + ..Default::default() + }, + )) + }); - // No matter whether we got an Err or Ok, we want a serialized message to - // send back. So transform the DenoError into a deno_buf. - let buf = match result { - Err(ref err) => { - let errmsg_offset = builder.create_string(&format!("{}", err)); - create_msg( - builder, - &msg::BaseArgs { - error: Some(errmsg_offset), - error_kind: err.kind(), - ..Default::default() - }, - ) + let deno = from_c(d); + if base.sync() { + // Execute future synchronously. + // println!("sync handler {}", msg::enum_name_any(msg_type)); + let maybe_box_u8 = future.wait().unwrap(); + match maybe_box_u8 { + None => {} + Some(box_u8) => { + let buf = deno_buf_from(box_u8); + // Set the synchronous response, the value returned from deno.send(). + unsafe { libdeno::deno_set_response(d, buf) } + } } - Ok(buf) => buf, - }; + } else { + // Execute future asynchornously. + let future = future.and_then(move |maybe_box_u8| { + let buf = match maybe_box_u8 { + Some(box_u8) => deno_buf_from(box_u8), + None => null_buf(), + }; + // TODO(ry) make this thread safe. + unsafe { libdeno::deno_send(d, buf) }; + Ok(()) + }); + deno.rt.spawn(future); + } +} - // Set the synchronous response, the value returned from deno.send(). - // null_buf is a special case that indicates success. - if buf != null_buf() { - unsafe { libdeno::deno_set_response(d, buf) } +fn deno_buf_from(x: Box<[u8]>) -> deno_buf { + let len = x.len(); + let ptr = Box::into_raw(x); + deno_buf { + alloc_ptr: 0 as *mut u8, + alloc_len: 0, + data_ptr: ptr as *mut u8, + data_len: len, } } @@ -87,21 +127,21 @@ fn null_buf() -> deno_buf { } } -fn handle_exit( - _d: *const DenoC, - base: msg::Base, - _builder: &mut FlatBufferBuilder, -) -> HandlerResult { +fn permission_denied() -> DenoError { + DenoError::from(std::io::Error::new( + std::io::ErrorKind::PermissionDenied, + "permission denied", + )) +} + +fn handle_exit(_d: *const DenoC, base: &msg::Base) -> Box { let msg = base.msg_as_exit().unwrap(); std::process::exit(msg.code()) } -fn handle_start( - d: *const DenoC, - _base: msg::Base, - builder: &mut FlatBufferBuilder, -) -> HandlerResult { +fn handle_start(d: *const DenoC, base: &msg::Base) -> Box { let deno = from_c(d); + let mut builder = FlatBufferBuilder::new(); let argv = deno.argv.iter().map(|s| s.as_str()).collect::>(); let argv_off = builder.create_vector_of_strings(argv.as_slice()); @@ -111,19 +151,19 @@ fn handle_start( builder.create_string(deno_fs::normalize_path(cwd_path.as_ref()).as_ref()); let msg = msg::StartRes::create( - builder, + &mut builder, &msg::StartResArgs { cwd: Some(cwd_off), argv: Some(argv_off), debug_flag: deno.flags.log_debug, - deps_flag: deno.flags.deps_flag, ..Default::default() }, ); - Ok(create_msg( - builder, - &msg::BaseArgs { + ok_future(create_msg( + base.cmd_id(), + &mut builder, + msg::BaseArgs { msg_type: msg::Any::StartRes, msg: Some(msg.as_union_value()), ..Default::default() @@ -132,122 +172,101 @@ fn handle_start( } fn create_msg( + cmd_id: u32, builder: &mut FlatBufferBuilder, - args: &msg::BaseArgs, -) -> deno_buf { + mut args: msg::BaseArgs, +) -> Buf { + args.cmd_id = cmd_id; let base = msg::Base::create(builder, &args); msg::finish_base_buffer(builder, base); let data = builder.finished_data(); - deno_buf { - // TODO(ry) - // The deno_buf / ImportBuf / ExportBuf semantics should be such that we do not need to yield - // ownership. Temporarally there is a hack in ImportBuf that when alloc_ptr is null, it will - // memcpy the deno_buf into V8 instead of doing zero copy. - alloc_ptr: 0 as *mut u8, - alloc_len: 0, - data_ptr: data.as_ptr() as *mut u8, - data_len: data.len(), - } + // println!("create_msg {:x?}", data); + let vec = data.to_vec(); + Some(vec.into_boxed_slice()) } -// TODO(ry) Use Deno instead of DenoC as first arg. -fn send_base( - d: *const DenoC, - builder: &mut FlatBufferBuilder, - args: &msg::BaseArgs, -) { - let buf = create_msg(builder, args); - unsafe { libdeno::deno_send(d, buf) } +fn ok_future(buf: Buf) -> Box { + Box::new(futures::future::ok(buf)) +} + +// Shout out to Earl Sweatshirt. +fn odd_future(err: DenoError) -> Box { + Box::new(futures::future::err(err)) } // https://github.com/denoland/deno/blob/golang/os.go#L100-L154 -fn handle_code_fetch( - d: *const DenoC, - base: msg::Base, - builder: &mut FlatBufferBuilder, -) -> HandlerResult { +fn handle_code_fetch(d: *const DenoC, base: &msg::Base) -> Box { let msg = base.msg_as_code_fetch().unwrap(); + let cmd_id = base.cmd_id(); let module_specifier = msg.module_specifier().unwrap(); let containing_file = msg.containing_file().unwrap(); let deno = from_c(d); - assert!(deno.dir.root.join("gen") == deno.dir.gen, "Sanity check"); + assert_eq!(deno.dir.root.join("gen"), deno.dir.gen, "Sanity check"); - let out = deno.dir.code_fetch(module_specifier, containing_file)?; - let mut msg_args = msg::CodeFetchResArgs { - module_name: Some(builder.create_string(&out.module_name)), - filename: Some(builder.create_string(&out.filename)), - source_code: Some(builder.create_string(&out.source_code)), - ..Default::default() - }; - match out.maybe_output_code { - Some(ref output_code) => { - msg_args.output_code = Some(builder.create_string(output_code)); - } - _ => (), - }; - let msg = msg::CodeFetchRes::create(builder, &msg_args); - Ok(create_msg( - builder, - &msg::BaseArgs { - msg: Some(msg.as_union_value()), - msg_type: msg::Any::CodeFetchRes, + Box::new(futures::future::result(|| -> OpResult { + let builder = &mut FlatBufferBuilder::new(); + let out = deno.dir.code_fetch(module_specifier, containing_file)?; + let mut msg_args = msg::CodeFetchResArgs { + module_name: Some(builder.create_string(&out.module_name)), + filename: Some(builder.create_string(&out.filename)), + source_code: Some(builder.create_string(&out.source_code)), ..Default::default() - }, - )) + }; + match out.maybe_output_code { + Some(ref output_code) => { + msg_args.output_code = Some(builder.create_string(output_code)); + } + _ => (), + }; + let msg = msg::CodeFetchRes::create(builder, &msg_args); + Ok(create_msg( + cmd_id, + builder, + msg::BaseArgs { + msg: Some(msg.as_union_value()), + msg_type: msg::Any::CodeFetchRes, + ..Default::default() + }, + )) + }())) } // https://github.com/denoland/deno/blob/golang/os.go#L156-L169 -fn handle_code_cache( - d: *const DenoC, - base: msg::Base, - _builder: &mut FlatBufferBuilder, -) -> HandlerResult { +fn handle_code_cache(d: *const DenoC, base: &msg::Base) -> Box { let msg = base.msg_as_code_cache().unwrap(); let filename = msg.filename().unwrap(); let source_code = msg.source_code().unwrap(); let output_code = msg.output_code().unwrap(); - let deno = from_c(d); - deno.dir.code_cache(filename, source_code, output_code)?; - Ok(null_buf()) // null response indicates success. + Box::new(futures::future::result(|| -> OpResult { + let deno = from_c(d); + deno.dir.code_cache(filename, source_code, output_code)?; + Ok(None) + }())) } -fn handle_set_env( - d: *const DenoC, - base: msg::Base, - _builder: &mut FlatBufferBuilder, -) -> HandlerResult { +fn handle_set_env(d: *const DenoC, base: &msg::Base) -> Box { let msg = base.msg_as_set_env().unwrap(); let key = msg.key().unwrap(); let value = msg.value().unwrap(); let deno = from_c(d); if !deno.flags.allow_env { - let err = std::io::Error::new( - std::io::ErrorKind::PermissionDenied, - "allow_env is off.", - ); - return Err(err.into()); + return odd_future(permission_denied()); } std::env::set_var(key, value); - Ok(null_buf()) + ok_future(None) } -fn handle_env( - d: *const DenoC, - _base: msg::Base, - builder: &mut FlatBufferBuilder, -) -> HandlerResult { +fn handle_env(d: *const DenoC, base: &msg::Base) -> Box { let deno = from_c(d); + let cmd_id = base.cmd_id(); if !deno.flags.allow_env { - let err = std::io::Error::new( - std::io::ErrorKind::PermissionDenied, - "allow_env is off.", - ); - return Err(err.into()); + return odd_future(permission_denied()); } + let builder = &mut FlatBufferBuilder::new(); let vars: Vec<_> = std::env::vars() .map(|(key, value)| { let key = builder.create_string(&key); @@ -263,9 +282,7 @@ fn handle_env( ) }) .collect(); - let tables = builder.create_vector(&vars); - let msg = msg::EnvironRes::create( builder, &msg::EnvironResArgs { @@ -273,10 +290,10 @@ fn handle_env( ..Default::default() }, ); - - Ok(create_msg( + ok_future(create_msg( + cmd_id, builder, - &msg::BaseArgs { + msg::BaseArgs { msg: Some(msg.as_union_value()), msg_type: msg::Any::EnvironRes, ..Default::default() @@ -284,105 +301,53 @@ fn handle_env( )) } -fn handle_fetch_req( - d: *const DenoC, - base: msg::Base, - _builder: &mut FlatBufferBuilder, -) -> HandlerResult { - let deno = from_c(d); - if !deno.flags.allow_net { - let err = std::io::Error::new( - std::io::ErrorKind::PermissionDenied, - "allow_net is off.", - ); - return Err(err.into()); - } +fn handle_fetch_req(d: *const DenoC, base: &msg::Base) -> Box { let msg = base.msg_as_fetch_req().unwrap(); + let cmd_id = base.cmd_id(); let id = msg.id(); let url = msg.url().unwrap(); + let deno = from_c(d); + + if !deno.flags.allow_net { + return odd_future(permission_denied()); + } + let url = url.parse::().unwrap(); let client = Client::new(); - deno.rt.spawn( - client - .get(url) - .map(move |res| { - let status = res.status().as_u16() as i32; + let future = client.get(url).and_then(|res| { + let status = res.status().as_u16() as i32; + // TODO Handle streaming body. + res.into_body().concat2().map(move |body| (status, body)) + }); - let mut builder = FlatBufferBuilder::new(); - // Send the first message without a body. This is just to indicate - // what status code. - let msg = msg::FetchRes::create( - &mut builder, - &msg::FetchResArgs { - id, - status, - ..Default::default() - }, - ); - send_base( - d, - &mut builder, - &msg::BaseArgs { - msg: Some(msg.as_union_value()), - msg_type: msg::Any::FetchRes, - ..Default::default() - }, - ); - res - }) - .and_then(move |res| { - // Send the body as a FetchRes message. - res.into_body().concat2().map(move |body_buffer| { - let mut builder = FlatBufferBuilder::new(); - let data_off = builder.create_vector(body_buffer.as_ref()); - let msg = msg::FetchRes::create( - &mut builder, - &msg::FetchResArgs { - id, - body: Some(data_off), - ..Default::default() - }, - ); - send_base( - d, - &mut builder, - &msg::BaseArgs { - msg: Some(msg.as_union_value()), - msg_type: msg::Any::FetchRes, - ..Default::default() - }, - ); - }) - }) - .map_err(move |err| { - let errmsg = format!("{}", err); - - // TODO This is obviously a lot of duplicated code from the success case. - // Leaving it here now jsut to get a first pass implementation, but this - // needs to be cleaned up. - let mut builder = FlatBufferBuilder::new(); - let err_off = builder.create_string(errmsg.as_str()); - let msg = msg::FetchRes::create( - &mut builder, - &msg::FetchResArgs { - id, - ..Default::default() - }, - ); - send_base( - d, - &mut builder, - &msg::BaseArgs { - msg: Some(msg.as_union_value()), - msg_type: msg::Any::FetchRes, - error: Some(err_off), - ..Default::default() - }, - ); - }), + let future = future.map_err(|err| -> DenoError { err.into() }).and_then( + move |(status, body)| { + let builder = &mut FlatBufferBuilder::new(); + // Send the first message without a body. This is just to indicate + // what status code. + let body_off = builder.create_vector(body.as_ref()); + let msg = msg::FetchRes::create( + builder, + &msg::FetchResArgs { + id, + status, + body: Some(body_off), + ..Default::default() + }, + ); + Ok(create_msg( + cmd_id, + builder, + msg::BaseArgs { + msg: Some(msg.as_union_value()), + msg_type: msg::Any::FetchRes, + ..Default::default() + }, + )) + }, ); - Ok(null_buf()) // null response indicates success. + Box::new(future) } fn set_timeout( @@ -410,146 +375,92 @@ where (delay_task, cancel_tx) } -fn set_interval( - cb: F, - delay: u32, -) -> ( - impl Future, - futures::sync::oneshot::Sender<()>, -) -where - F: Fn() -> (), -{ - let (cancel_tx, cancel_rx) = oneshot::channel::<()>(); - let delay = Duration::from_millis(delay.into()); - let interval_task = future::lazy(move || { - Interval::new(Instant::now() + delay, delay) - .for_each(move |_| { - cb(); - future::ok(()) - }) - .into_future() - .map_err(|_| panic!()) - }).select(cancel_rx) - .map(|_| ()) - .map_err(|_| ()); - - (interval_task, cancel_tx) -} - -// TODO(ry) Use Deno instead of DenoC as first arg. -fn send_timer_ready(d: *const DenoC, timer_id: u32, done: bool) { - let mut builder = FlatBufferBuilder::new(); - let msg = msg::TimerReady::create( - &mut builder, - &msg::TimerReadyArgs { - id: timer_id, - done, - ..Default::default() - }, - ); - send_base( - d, - &mut builder, - &msg::BaseArgs { - msg: Some(msg.as_union_value()), - msg_type: msg::Any::TimerReady, - ..Default::default() - }, - ); -} - -fn handle_make_temp_dir( - d: *const DenoC, - base: msg::Base, - builder: &mut FlatBufferBuilder, -) -> HandlerResult { +fn handle_make_temp_dir(d: *const DenoC, base: &msg::Base) -> Box { + let base = Box::new(*base); let msg = base.msg_as_make_temp_dir().unwrap(); + let cmd_id = base.cmd_id(); let dir = msg.dir(); let prefix = msg.prefix(); let suffix = msg.suffix(); + let deno = from_c(d); if !deno.flags.allow_write { - let err = std::io::Error::new( - std::io::ErrorKind::PermissionDenied, - "allow_write is off.", - ); - return Err(err.into()); + return Box::new(futures::future::err(permission_denied())); } - // TODO(piscisaureus): use byte vector for paths, not a string. - // See https://github.com/denoland/deno/issues/627. - // We can't assume that paths are always valid utf8 strings. - let path = deno_fs::make_temp_dir(dir.map(Path::new), prefix, suffix)?; - let path_off = builder.create_string(path.to_str().unwrap()); - let msg = msg::MakeTempDirRes::create( - builder, - &msg::MakeTempDirResArgs { - path: Some(path_off), - ..Default::default() - }, - ); - Ok(create_msg( - builder, - &msg::BaseArgs { - msg: Some(msg.as_union_value()), - msg_type: msg::Any::MakeTempDirRes, - ..Default::default() - }, - )) + // TODO Use blocking() here. + Box::new(futures::future::result(|| -> OpResult { + // TODO(piscisaureus): use byte vector for paths, not a string. + // See https://github.com/denoland/deno/issues/627. + // We can't assume that paths are always valid utf8 strings. + let path = deno_fs::make_temp_dir(dir.map(Path::new), prefix, suffix)?; + let builder = &mut FlatBufferBuilder::new(); + let path_off = builder.create_string(path.to_str().unwrap()); + let msg = msg::MakeTempDirRes::create( + builder, + &msg::MakeTempDirResArgs { + path: Some(path_off), + ..Default::default() + }, + ); + Ok(create_msg( + cmd_id, + builder, + msg::BaseArgs { + msg: Some(msg.as_union_value()), + msg_type: msg::Any::MakeTempDirRes, + ..Default::default() + }, + )) + }())) } -fn handle_mkdir_sync( - d: *const DenoC, - base: msg::Base, - _builder: &mut FlatBufferBuilder, -) -> HandlerResult { +fn handle_mkdir_sync(d: *const DenoC, base: &msg::Base) -> Box { let msg = base.msg_as_mkdir_sync().unwrap(); let path = msg.path().unwrap(); // TODO let mode = msg.mode(); let deno = from_c(d); - debug!("handle_mkdir_sync {}", path); - if deno.flags.allow_write { + if !deno.flags.allow_write { + return odd_future(permission_denied()); + } + + // TODO(ry) use blocking + Box::new(futures::future::result(|| -> OpResult { // TODO(ry) Use mode. deno_fs::mkdir(Path::new(path))?; - Ok(null_buf()) - } else { - let err = std::io::Error::new( - std::io::ErrorKind::PermissionDenied, - "allow_write is off.", - ); - Err(err.into()) - } + Ok(None) + }())) } // Prototype https://github.com/denoland/deno/blob/golang/os.go#L171-L184 -fn handle_read_file_sync( - _d: *const DenoC, - base: msg::Base, - builder: &mut FlatBufferBuilder, -) -> HandlerResult { +fn handle_read_file_sync(_d: *const DenoC, base: &msg::Base) -> Box { let msg = base.msg_as_read_file_sync().unwrap(); - let filename = msg.filename().unwrap(); - debug!("handle_read_file_sync {}", filename); - let vec = fs::read(Path::new(filename))?; - // Build the response message. memcpy data into msg. - // TODO(ry) zero-copy. - let data_off = builder.create_vector(vec.as_slice()); - let msg = msg::ReadFileSyncRes::create( - builder, - &msg::ReadFileSyncResArgs { - data: Some(data_off), - ..Default::default() - }, - ); - Ok(create_msg( - builder, - &msg::BaseArgs { - msg: Some(msg.as_union_value()), - msg_type: msg::Any::ReadFileSyncRes, - ..Default::default() - }, - )) + let cmd_id = base.cmd_id(); + let filename = String::from(msg.filename().unwrap()); + Box::new(futures::future::result(|| -> OpResult { + debug!("handle_read_file_sync {}", filename); + let vec = fs::read(Path::new(&filename))?; + // Build the response message. memcpy data into msg. + // TODO(ry) zero-copy. + let builder = &mut FlatBufferBuilder::new(); + let data_off = builder.create_vector(vec.as_slice()); + let msg = msg::ReadFileSyncRes::create( + builder, + &msg::ReadFileSyncResArgs { + data: Some(data_off), + ..Default::default() + }, + ); + Ok(create_msg( + cmd_id, + builder, + msg::BaseArgs { + msg: Some(msg.as_union_value()), + msg_type: msg::Any::ReadFileSyncRes, + ..Default::default() + }, + )) + }())) } macro_rules! to_seconds { @@ -562,145 +473,135 @@ macro_rules! to_seconds { }}; } -fn handle_stat_sync( - _d: *const DenoC, - base: msg::Base, - builder: &mut FlatBufferBuilder, -) -> HandlerResult { +fn handle_stat_sync(_d: *const DenoC, base: &msg::Base) -> Box { let msg = base.msg_as_stat_sync().unwrap(); - let filename = msg.filename().unwrap(); + let cmd_id = base.cmd_id(); + let filename = String::from(msg.filename().unwrap()); let lstat = msg.lstat(); - debug!("handle_stat_sync {} {}", filename, lstat); - let path = Path::new(filename); - let metadata = if lstat { - fs::symlink_metadata(path)? - } else { - fs::metadata(path)? - }; + Box::new(futures::future::result(|| -> OpResult { + let builder = &mut FlatBufferBuilder::new(); + debug!("handle_stat_sync {} {}", filename, lstat); + let path = Path::new(&filename); + let metadata = if lstat { + fs::symlink_metadata(path)? + } else { + fs::metadata(path)? + }; - let msg = msg::StatSyncRes::create( - builder, - &msg::StatSyncResArgs { - is_file: metadata.is_file(), - is_symlink: metadata.file_type().is_symlink(), - len: metadata.len(), - modified: to_seconds!(metadata.modified()), - accessed: to_seconds!(metadata.accessed()), - created: to_seconds!(metadata.created()), - ..Default::default() - }, - ); + let msg = msg::StatSyncRes::create( + builder, + &msg::StatSyncResArgs { + is_file: metadata.is_file(), + is_symlink: metadata.file_type().is_symlink(), + len: metadata.len(), + modified: to_seconds!(metadata.modified()), + accessed: to_seconds!(metadata.accessed()), + created: to_seconds!(metadata.created()), + ..Default::default() + }, + ); - Ok(create_msg( - builder, - &msg::BaseArgs { - msg: Some(msg.as_union_value()), - msg_type: msg::Any::StatSyncRes, - ..Default::default() - }, - )) + Ok(create_msg( + cmd_id, + builder, + msg::BaseArgs { + msg: Some(msg.as_union_value()), + msg_type: msg::Any::StatSyncRes, + ..Default::default() + }, + )) + }())) } -fn handle_write_file_sync( - d: *const DenoC, - base: msg::Base, - _builder: &mut FlatBufferBuilder, -) -> HandlerResult { +fn handle_write_file_sync(d: *const DenoC, base: &msg::Base) -> Box { let msg = base.msg_as_write_file_sync().unwrap(); - let filename = msg.filename().unwrap(); + let filename = String::from(msg.filename().unwrap()); let data = msg.data().unwrap(); // TODO let perm = msg.perm(); let deno = from_c(d); debug!("handle_write_file_sync {}", filename); - if deno.flags.allow_write { - // TODO(ry) Use perm. - deno_fs::write_file_sync(Path::new(filename), data)?; - Ok(null_buf()) - } else { - let err = std::io::Error::new( - std::io::ErrorKind::PermissionDenied, - "allow_write is off.", - ); - Err(err.into()) - } + Box::new(futures::future::result(|| -> OpResult { + if !deno.flags.allow_write { + Err(permission_denied()) + } else { + // TODO(ry) Use perm. + deno_fs::write_file_sync(Path::new(&filename), data)?; + Ok(None) + } + }())) } // TODO(ry) Use Deno instead of DenoC as first arg. fn remove_timer(d: *const DenoC, timer_id: u32) { let deno = from_c(d); + assert!(deno.timers.contains_key(&timer_id)); deno.timers.remove(&timer_id); + assert!(!deno.timers.contains_key(&timer_id)); } // Prototype: https://github.com/ry/deno/blob/golang/timers.go#L25-L39 -fn handle_timer_start( - d: *const DenoC, - base: msg::Base, - _builder: &mut FlatBufferBuilder, -) -> HandlerResult { +fn handle_timer_start(d: *const DenoC, base: &msg::Base) -> Box { debug!("handle_timer_start"); let msg = base.msg_as_timer_start().unwrap(); + let cmd_id = base.cmd_id(); let timer_id = msg.id(); - let interval = msg.interval(); let delay = msg.delay(); let deno = from_c(d); - if interval { - let (interval_task, cancel_interval) = set_interval( - move || { - send_timer_ready(d, timer_id, false); - }, - delay, - ); - - deno.timers.insert(timer_id, cancel_interval); - deno.rt.spawn(interval_task); - } else { + let future = { let (delay_task, cancel_delay) = set_timeout( move || { remove_timer(d, timer_id); - send_timer_ready(d, timer_id, true); }, delay, ); - deno.timers.insert(timer_id, cancel_delay); - deno.rt.spawn(delay_task); - } - Ok(null_buf()) + delay_task + }; + Box::new(future.then(move |result| { + let builder = &mut FlatBufferBuilder::new(); + let msg = msg::TimerReady::create( + builder, + &msg::TimerReadyArgs { + id: timer_id, + canceled: result.is_err(), + ..Default::default() + }, + ); + Ok(create_msg( + cmd_id, + builder, + msg::BaseArgs { + msg: Some(msg.as_union_value()), + msg_type: msg::Any::TimerReady, + ..Default::default() + }, + )) + })) } // Prototype: https://github.com/ry/deno/blob/golang/timers.go#L40-L43 -fn handle_timer_clear( - d: *const DenoC, - base: msg::Base, - _builder: &mut FlatBufferBuilder, -) -> HandlerResult { +fn handle_timer_clear(d: *const DenoC, base: &msg::Base) -> Box { let msg = base.msg_as_timer_clear().unwrap(); debug!("handle_timer_clear"); remove_timer(d, msg.id()); - Ok(null_buf()) + ok_future(None) } -fn handle_rename_sync( - d: *const DenoC, - base: msg::Base, - _builder: &mut FlatBufferBuilder, -) -> HandlerResult { - let msg = base.msg_as_rename_sync().unwrap(); - let oldpath = msg.oldpath().unwrap(); - let newpath = msg.newpath().unwrap(); +fn handle_rename_sync(d: *const DenoC, base: &msg::Base) -> Box { let deno = from_c(d); - - debug!("handle_rename_sync {} {}", oldpath, newpath); if !deno.flags.allow_write { - let err = std::io::Error::new( - std::io::ErrorKind::PermissionDenied, - "allow_write is off.", - ); - return Err(err.into()); - } - fs::rename(Path::new(oldpath), Path::new(newpath))?; - Ok(null_buf()) + return Box::new(futures::future::err(permission_denied())); + }; + let msg = base.msg_as_rename_sync().unwrap(); + let oldpath = String::from(msg.oldpath().unwrap()); + let newpath = String::from(msg.newpath().unwrap()); + // TODO use blocking() + Box::new(futures::future::result(|| -> OpResult { + debug!("handle_rename {} {}", oldpath, newpath); + fs::rename(Path::new(&oldpath), Path::new(&newpath))?; + Ok(None) + }())) } diff --git a/src/msg.fbs b/src/msg.fbs index 6807a44f85..4afe947d56 100644 --- a/src/msg.fbs +++ b/src/msg.fbs @@ -74,6 +74,7 @@ enum ErrorKind: byte { table Base { cmd_id: uint32; + sync: bool = true; // TODO(ry) Change default to false. error_kind: ErrorKind = NoError; error: string; msg: Any; @@ -117,13 +118,12 @@ table Exit { table TimerStart { id: uint; - interval: bool; delay: uint; } table TimerReady { id: uint; - done: bool; + canceled: bool; } table TimerClear {