From 0095611af98d3039e30ff44444ab83f65bcec554 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 20 Aug 2020 09:45:59 -0400 Subject: [PATCH] First pass at json ops in core (#7033) Adds Deno.core.jsonOpSync and Deno.core.jsonOpAsync --- core/core.js | 57 +++++++++++++++++++++++++++++++++ core/core_isolate.rs | 63 +++++++++++++++++++++++++++++++++++++ core/examples/http_bench.js | 13 ++++---- core/examples/http_bench.rs | 37 +++++++++++++--------- core/lib.rs | 1 + 5 files changed, 151 insertions(+), 20 deletions(-) diff --git a/core/core.js b/core/core.js index 5f9d6f9819..099472614c 100644 --- a/core/core.js +++ b/core/core.js @@ -201,7 +201,64 @@ SharedQueue Binary Layout return errorClass; } + // Returns Uint8Array + function encodeJson(args) { + const s = JSON.stringify(args); + return core.encode(s); + } + + function decodeJson(ui8) { + const s = Deno.core.decode(ui8); + return JSON.parse(s); + } + + let nextPromiseId = 1; + const promiseTable = {}; + + function jsonOpAsync(opName, args) { + setAsyncHandler(opsCache[opName], jsonOpAsyncHandler); + + args.promiseId = nextPromiseId++; + const argsBuf = encodeJson(args); + dispatch(opName, argsBuf); + let resolve, reject; + const promise = new Promise((resolve_, reject_) => { + resolve = resolve_; + reject = reject_; + }); + promise.resolve = resolve; + promise.reject = reject; + promiseTable[args.promiseId] = promise; + return promise; + } + + function jsonOpSync(opName, args) { + const argsBuf = encodeJson(args); + const res = dispatch(opName, argsBuf); + const r = decodeJson(res); + if (r["ok"]) { + return r["ok"]; + } else { + throw r["err"]; + } + } + + function jsonOpAsyncHandler(buf) { + // Json Op. + const msg = decodeJson(buf); + const { ok, err, promiseId } = msg; + const promise = promiseTable[promiseId]; + delete promiseTable[promiseId]; + if (ok) { + promise.resolve(ok); + } else { + promise.reject(err); + } + } + Object.assign(window.Deno.core, { + jsonOpAsync, + jsonOpSync, setAsyncHandler, dispatch: send, dispatchByName: dispatch, diff --git a/core/core_isolate.rs b/core/core_isolate.rs index 18539395f2..160e37f1d6 100644 --- a/core/core_isolate.rs +++ b/core/core_isolate.rs @@ -20,6 +20,8 @@ use futures::stream::FuturesUnordered; use futures::stream::StreamExt; use futures::task::AtomicWaker; use futures::Future; +use serde_json::json; +use serde_json::Value; use std::any::Any; use std::cell::RefCell; use std::collections::HashMap; @@ -429,6 +431,50 @@ impl CoreIsolate { state.op_registry.register(name, op) } + pub fn register_op_json_sync(&mut self, name: &str, op: F) -> OpId + where + F: 'static + + Fn( + &mut CoreIsolateState, + serde_json::Value, + &mut [ZeroCopyBuf], + ) -> Result, + { + let core_op = + move |state: &mut CoreIsolateState, bufs: &mut [ZeroCopyBuf]| -> Op { + let value = serde_json::from_slice(&bufs[0]).unwrap(); + let result = op(state, value, &mut bufs[1..]); + let buf = serialize_result(None, result); + Op::Sync(buf) + }; + + let state_rc = Self::state(self); + let mut state = state_rc.borrow_mut(); + state.op_registry.register(name, core_op) + } + + pub fn register_op_json_async(&mut self, name: &str, op: F) -> OpId + where + Fut: 'static + Future>, + F: 'static + + Fn(&mut CoreIsolateState, serde_json::Value, &mut [ZeroCopyBuf]) -> Fut, + { + let core_op = move |state: &mut CoreIsolateState, + bufs: &mut [ZeroCopyBuf]| + -> Op { + let value: serde_json::Value = serde_json::from_slice(&bufs[0]).unwrap(); + let promise_id = value.get("promiseId").unwrap().as_u64().unwrap(); + let fut = op(state, value, &mut bufs[1..]); + let fut2 = + fut.map(move |result| serialize_result(Some(promise_id), result)); + Op::Async(Box::pin(fut2)) + }; + + let state_rc = Self::state(self); + let mut state = state_rc.borrow_mut(); + state.op_registry.register(name, core_op) + } + /// Registers a callback on the isolate when the memory limits are approached. /// Use this to prevent V8 from crashing the process when reaching the limit. /// @@ -484,6 +530,23 @@ where callback(current_heap_limit, initial_heap_limit) } +fn serialize_result( + promise_id: Option, + result: Result, +) -> Buf { + let value = match result { + Ok(v) => json!({ "ok": v, "promiseId": promise_id }), + Err(err) => json!({ + "promiseId": promise_id , + "err": { + "message": err.to_string(), + "kind": "Other", // TODO(ry) Figure out how to propagate errors. + } + }), + }; + serde_json::to_vec(&value).unwrap().into_boxed_slice() +} + impl Future for CoreIsolate { type Output = Result<(), ErrBox>; diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js index eba9bb6774..ac97e0d88b 100644 --- a/core/examples/http_bench.js +++ b/core/examples/http_bench.js @@ -80,12 +80,14 @@ function handleAsyncMsgFromRust(buf) { /** Listens on 0.0.0.0:4500, returns rid. */ function listen() { - return sendSync(ops["listen"], -1); + const { rid } = Deno.core.jsonOpSync("listen", {}); + return rid; } /** Accepts a connection, returns rid. */ -function accept(rid) { - return sendAsync(ops["accept"], rid); +async function accept(serverRid) { + const { rid } = await Deno.core.jsonOpAsync("accept", { rid: serverRid }); + return rid; } /** @@ -124,9 +126,8 @@ let ops; async function main() { ops = Deno.core.ops(); - for (const opName in ops) { - Deno.core.setAsyncHandler(ops[opName], handleAsyncMsgFromRust); - } + Deno.core.setAsyncHandler(ops["read"], handleAsyncMsgFromRust); + Deno.core.setAsyncHandler(ops["write"], handleAsyncMsgFromRust); Deno.core.print("http_bench.js start\n"); diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index 3a11a35073..0dbb6f8e68 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -1,14 +1,17 @@ #[macro_use] extern crate log; +use deno_core::serde_json; use deno_core::CoreIsolate; use deno_core::CoreIsolateState; +use deno_core::ErrBox; use deno_core::Op; use deno_core::ResourceTable; use deno_core::Script; use deno_core::StartupData; use deno_core::ZeroCopyBuf; use futures::future::poll_fn; +use futures::future::Future; use futures::future::FutureExt; use futures::future::TryFuture; use futures::future::TryFutureExt; @@ -152,8 +155,8 @@ pub fn isolate_new() -> CoreIsolate { isolate.register_op(name, core_handler); } - register_sync_op(&mut isolate, "listen", op_listen); - register_async_op(&mut isolate, "accept", op_accept); + isolate.register_op_json_sync("listen", op_listen); + isolate.register_op_json_async("accept", op_accept); register_async_op(&mut isolate, "read", op_read); register_async_op(&mut isolate, "write", op_write); register_sync_op(&mut isolate, "close", op_close); @@ -175,34 +178,40 @@ fn op_close( } fn op_listen( - resource_table: Rc>, - _rid: u32, + state: &mut CoreIsolateState, + _args: serde_json::Value, _buf: &mut [ZeroCopyBuf], -) -> Result { +) -> Result { debug!("listen"); let addr = "127.0.0.1:4544".parse::().unwrap(); let std_listener = std::net::TcpListener::bind(&addr)?; let listener = TcpListener::from_std(std_listener)?; - let resource_table = &mut resource_table.borrow_mut(); + let resource_table = &mut state.resource_table.borrow_mut(); let rid = resource_table.add("tcpListener", Box::new(listener)); - Ok(rid) + Ok(serde_json::json!({ "rid": rid })) } fn op_accept( - resource_table: Rc>, - rid: u32, + state: &mut CoreIsolateState, + args: serde_json::Value, _buf: &mut [ZeroCopyBuf], -) -> impl TryFuture { +) -> impl Future> { + let rid = args.get("rid").unwrap().as_u64().unwrap() as u32; debug!("accept rid={}", rid); + let resource_table = state.resource_table.clone(); poll_fn(move |cx| { let resource_table = &mut resource_table.borrow_mut(); let listener = resource_table .get_mut::(rid) .ok_or_else(bad_resource)?; - listener.poll_accept(cx).map_ok(|(stream, _addr)| { - resource_table.add("tcpStream", Box::new(stream)) - }) + listener + .poll_accept(cx) + .map_err(ErrBox::from) + .map_ok(|(stream, _addr)| { + let rid = resource_table.add("tcpStream", Box::new(stream)); + serde_json::json!({ "rid": rid }) + }) }) } @@ -265,7 +274,7 @@ fn main() { .enable_all() .build() .unwrap(); - runtime.block_on(isolate).expect("unexpected isolate error"); + deno_core::js_check(runtime.block_on(isolate)); } #[test] diff --git a/core/lib.rs b/core/lib.rs index cd4a4eeee8..940e0c0269 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -53,6 +53,7 @@ pub use crate::ops::OpId; pub use crate::resources::ResourceTable; pub use crate::zero_copy_buf::BufVec; pub use crate::zero_copy_buf::ZeroCopyBuf; +pub use serde_json; pub fn v8_version() -> &'static str { v8::V8::get_version()