0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-02-01 20:25:12 -05:00

First pass at json ops in core (#7033)

Adds Deno.core.jsonOpSync and Deno.core.jsonOpAsync
This commit is contained in:
Ryan Dahl 2020-08-20 09:45:59 -04:00 committed by GitHub
parent be1e7ab532
commit 0095611af9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 151 additions and 20 deletions

View file

@ -201,7 +201,64 @@ SharedQueue Binary Layout
return errorClass;
}
// Returns Uint8Array
function encodeJson(args) {
const s = JSON.stringify(args);
return core.encode(s);
}
function decodeJson(ui8) {
const s = Deno.core.decode(ui8);
return JSON.parse(s);
}
let nextPromiseId = 1;
const promiseTable = {};
function jsonOpAsync(opName, args) {
setAsyncHandler(opsCache[opName], jsonOpAsyncHandler);
args.promiseId = nextPromiseId++;
const argsBuf = encodeJson(args);
dispatch(opName, argsBuf);
let resolve, reject;
const promise = new Promise((resolve_, reject_) => {
resolve = resolve_;
reject = reject_;
});
promise.resolve = resolve;
promise.reject = reject;
promiseTable[args.promiseId] = promise;
return promise;
}
function jsonOpSync(opName, args) {
const argsBuf = encodeJson(args);
const res = dispatch(opName, argsBuf);
const r = decodeJson(res);
if (r["ok"]) {
return r["ok"];
} else {
throw r["err"];
}
}
function jsonOpAsyncHandler(buf) {
// Json Op.
const msg = decodeJson(buf);
const { ok, err, promiseId } = msg;
const promise = promiseTable[promiseId];
delete promiseTable[promiseId];
if (ok) {
promise.resolve(ok);
} else {
promise.reject(err);
}
}
Object.assign(window.Deno.core, {
jsonOpAsync,
jsonOpSync,
setAsyncHandler,
dispatch: send,
dispatchByName: dispatch,

View file

@ -20,6 +20,8 @@ use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::task::AtomicWaker;
use futures::Future;
use serde_json::json;
use serde_json::Value;
use std::any::Any;
use std::cell::RefCell;
use std::collections::HashMap;
@ -429,6 +431,50 @@ impl CoreIsolate {
state.op_registry.register(name, op)
}
pub fn register_op_json_sync<F>(&mut self, name: &str, op: F) -> OpId
where
F: 'static
+ Fn(
&mut CoreIsolateState,
serde_json::Value,
&mut [ZeroCopyBuf],
) -> Result<serde_json::Value, ErrBox>,
{
let core_op =
move |state: &mut CoreIsolateState, bufs: &mut [ZeroCopyBuf]| -> Op {
let value = serde_json::from_slice(&bufs[0]).unwrap();
let result = op(state, value, &mut bufs[1..]);
let buf = serialize_result(None, result);
Op::Sync(buf)
};
let state_rc = Self::state(self);
let mut state = state_rc.borrow_mut();
state.op_registry.register(name, core_op)
}
pub fn register_op_json_async<F, Fut>(&mut self, name: &str, op: F) -> OpId
where
Fut: 'static + Future<Output = Result<serde_json::Value, ErrBox>>,
F: 'static
+ Fn(&mut CoreIsolateState, serde_json::Value, &mut [ZeroCopyBuf]) -> Fut,
{
let core_op = move |state: &mut CoreIsolateState,
bufs: &mut [ZeroCopyBuf]|
-> Op {
let value: serde_json::Value = serde_json::from_slice(&bufs[0]).unwrap();
let promise_id = value.get("promiseId").unwrap().as_u64().unwrap();
let fut = op(state, value, &mut bufs[1..]);
let fut2 =
fut.map(move |result| serialize_result(Some(promise_id), result));
Op::Async(Box::pin(fut2))
};
let state_rc = Self::state(self);
let mut state = state_rc.borrow_mut();
state.op_registry.register(name, core_op)
}
/// Registers a callback on the isolate when the memory limits are approached.
/// Use this to prevent V8 from crashing the process when reaching the limit.
///
@ -484,6 +530,23 @@ where
callback(current_heap_limit, initial_heap_limit)
}
fn serialize_result(
promise_id: Option<u64>,
result: Result<Value, ErrBox>,
) -> Buf {
let value = match result {
Ok(v) => json!({ "ok": v, "promiseId": promise_id }),
Err(err) => json!({
"promiseId": promise_id ,
"err": {
"message": err.to_string(),
"kind": "Other", // TODO(ry) Figure out how to propagate errors.
}
}),
};
serde_json::to_vec(&value).unwrap().into_boxed_slice()
}
impl Future for CoreIsolate {
type Output = Result<(), ErrBox>;

View file

@ -80,12 +80,14 @@ function handleAsyncMsgFromRust(buf) {
/** Listens on 0.0.0.0:4500, returns rid. */
function listen() {
return sendSync(ops["listen"], -1);
const { rid } = Deno.core.jsonOpSync("listen", {});
return rid;
}
/** Accepts a connection, returns rid. */
function accept(rid) {
return sendAsync(ops["accept"], rid);
async function accept(serverRid) {
const { rid } = await Deno.core.jsonOpAsync("accept", { rid: serverRid });
return rid;
}
/**
@ -124,9 +126,8 @@ let ops;
async function main() {
ops = Deno.core.ops();
for (const opName in ops) {
Deno.core.setAsyncHandler(ops[opName], handleAsyncMsgFromRust);
}
Deno.core.setAsyncHandler(ops["read"], handleAsyncMsgFromRust);
Deno.core.setAsyncHandler(ops["write"], handleAsyncMsgFromRust);
Deno.core.print("http_bench.js start\n");

View file

@ -1,14 +1,17 @@
#[macro_use]
extern crate log;
use deno_core::serde_json;
use deno_core::CoreIsolate;
use deno_core::CoreIsolateState;
use deno_core::ErrBox;
use deno_core::Op;
use deno_core::ResourceTable;
use deno_core::Script;
use deno_core::StartupData;
use deno_core::ZeroCopyBuf;
use futures::future::poll_fn;
use futures::future::Future;
use futures::future::FutureExt;
use futures::future::TryFuture;
use futures::future::TryFutureExt;
@ -152,8 +155,8 @@ pub fn isolate_new() -> CoreIsolate {
isolate.register_op(name, core_handler);
}
register_sync_op(&mut isolate, "listen", op_listen);
register_async_op(&mut isolate, "accept", op_accept);
isolate.register_op_json_sync("listen", op_listen);
isolate.register_op_json_async("accept", op_accept);
register_async_op(&mut isolate, "read", op_read);
register_async_op(&mut isolate, "write", op_write);
register_sync_op(&mut isolate, "close", op_close);
@ -175,33 +178,39 @@ fn op_close(
}
fn op_listen(
resource_table: Rc<RefCell<ResourceTable>>,
_rid: u32,
state: &mut CoreIsolateState,
_args: serde_json::Value,
_buf: &mut [ZeroCopyBuf],
) -> Result<u32, Error> {
) -> Result<serde_json::Value, ErrBox> {
debug!("listen");
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
let std_listener = std::net::TcpListener::bind(&addr)?;
let listener = TcpListener::from_std(std_listener)?;
let resource_table = &mut resource_table.borrow_mut();
let resource_table = &mut state.resource_table.borrow_mut();
let rid = resource_table.add("tcpListener", Box::new(listener));
Ok(rid)
Ok(serde_json::json!({ "rid": rid }))
}
fn op_accept(
resource_table: Rc<RefCell<ResourceTable>>,
rid: u32,
state: &mut CoreIsolateState,
args: serde_json::Value,
_buf: &mut [ZeroCopyBuf],
) -> impl TryFuture<Ok = u32, Error = Error> {
) -> impl Future<Output = Result<serde_json::Value, ErrBox>> {
let rid = args.get("rid").unwrap().as_u64().unwrap() as u32;
debug!("accept rid={}", rid);
let resource_table = state.resource_table.clone();
poll_fn(move |cx| {
let resource_table = &mut resource_table.borrow_mut();
let listener = resource_table
.get_mut::<TcpListener>(rid)
.ok_or_else(bad_resource)?;
listener.poll_accept(cx).map_ok(|(stream, _addr)| {
resource_table.add("tcpStream", Box::new(stream))
listener
.poll_accept(cx)
.map_err(ErrBox::from)
.map_ok(|(stream, _addr)| {
let rid = resource_table.add("tcpStream", Box::new(stream));
serde_json::json!({ "rid": rid })
})
})
}
@ -265,7 +274,7 @@ fn main() {
.enable_all()
.build()
.unwrap();
runtime.block_on(isolate).expect("unexpected isolate error");
deno_core::js_check(runtime.block_on(isolate));
}
#[test]

View file

@ -53,6 +53,7 @@ pub use crate::ops::OpId;
pub use crate::resources::ResourceTable;
pub use crate::zero_copy_buf::BufVec;
pub use crate::zero_copy_buf::ZeroCopyBuf;
pub use serde_json;
pub fn v8_version() -> &'static str {
v8::V8::get_version()