mirror of
https://github.com/denoland/deno.git
synced 2025-01-21 13:00:36 -05:00
Refactor deno_core_http_bench and make it single-threaded (#3903)
This commit is contained in:
parent
25467aa7c7
commit
f650c3edb3
5 changed files with 244 additions and 316 deletions
13
Cargo.lock
generated
13
Cargo.lock
generated
|
@ -385,12 +385,12 @@ dependencies = [
|
|||
name = "deno_core"
|
||||
version = "0.32.0"
|
||||
dependencies = [
|
||||
"derive_deref 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"downcast-rs 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rusty_v8 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"serde_json 1.0.45 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
@ -406,6 +406,16 @@ dependencies = [
|
|||
"serde_json 1.0.45 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "derive_deref"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"quote 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"syn 0.15.44 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dirs"
|
||||
version = "2.0.2"
|
||||
|
@ -2346,6 +2356,7 @@ dependencies = [
|
|||
"checksum darling 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0d706e75d87e35569db781a9b5e2416cff1236a47ed380831f959382ccd5f858"
|
||||
"checksum darling_core 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f0c960ae2da4de88a91b2d920c2a7233b400bc33cb28453a2987822d8392519b"
|
||||
"checksum darling_macro 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d9b5a2f4ac4969822c62224815d069952656cadc7084fdca9751e6d959189b72"
|
||||
"checksum derive_deref 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "11554fdb0aa42363a442e0c4278f51c9621e20c1ce3bac51d79e60646f3b8b8f"
|
||||
"checksum dirs 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "13aea89a5c93364a98e9b37b2fa237effbb694d5cfe01c5b70941f7eb087d5e3"
|
||||
"checksum dirs-sys 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "afa0b23de8fd801745c471deffa6e12d248f962c9fd4b4c33787b055599bde7b"
|
||||
"checksum dlopen 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "71e80ad39f814a9abe68583cd50a2d45c8a67561c3361ab8da240587dda80937"
|
||||
|
|
|
@ -27,7 +27,7 @@ rusty_v8 = "0.2.1"
|
|||
name = "deno_core_http_bench"
|
||||
path = "examples/http_bench.rs"
|
||||
|
||||
# tokio is only used for deno_core_http_bench
|
||||
[dev_dependencies]
|
||||
# These dependendencies are only used for deno_core_http_bench.
|
||||
[dev-dependencies]
|
||||
derive_deref = "1.1.0"
|
||||
tokio = { version = "0.2", features = ["rt-core", "tcp"] }
|
||||
num_cpus = "1.11.1"
|
||||
|
|
|
@ -36,18 +36,18 @@ const scratchBytes = new Uint8Array(
|
|||
);
|
||||
assert(scratchBytes.byteLength === 3 * 4);
|
||||
|
||||
function send(promiseId, opId, arg, zeroCopy = null) {
|
||||
function send(promiseId, opId, rid, zeroCopy = null) {
|
||||
scratch32[0] = promiseId;
|
||||
scratch32[1] = arg;
|
||||
scratch32[1] = rid;
|
||||
scratch32[2] = -1;
|
||||
return Deno.core.dispatch(opId, scratchBytes, zeroCopy);
|
||||
}
|
||||
|
||||
/** Returns Promise<number> */
|
||||
function sendAsync(opId, arg, zeroCopy = null) {
|
||||
function sendAsync(opId, rid, zeroCopy = null) {
|
||||
const promiseId = nextPromiseId++;
|
||||
const p = createResolvable();
|
||||
const buf = send(promiseId, opId, arg, zeroCopy);
|
||||
const buf = send(promiseId, opId, rid, zeroCopy);
|
||||
if (buf) {
|
||||
const record = recordFromBuf(buf);
|
||||
// Sync result.
|
||||
|
@ -60,8 +60,8 @@ function sendAsync(opId, arg, zeroCopy = null) {
|
|||
}
|
||||
|
||||
/** Returns i32 number */
|
||||
function sendSync(opId, arg) {
|
||||
const buf = send(0, opId, arg);
|
||||
function sendSync(opId, rid) {
|
||||
const buf = send(0, opId, rid);
|
||||
const record = recordFromBuf(buf);
|
||||
return record[2];
|
||||
}
|
||||
|
@ -131,7 +131,7 @@ async function main() {
|
|||
Deno.core.print("http_bench.js start\n");
|
||||
|
||||
const listenerRid = listen();
|
||||
Deno.core.print(`listening http://127.0.0.1:4544/ rid = ${listenerRid}\n`);
|
||||
Deno.core.print(`listening http://127.0.0.1:4544/ rid=${listenerRid}\n`);
|
||||
while (true) {
|
||||
const rid = await accept(listenerRid);
|
||||
// Deno.core.print(`accepted ${rid}`);
|
||||
|
|
|
@ -2,32 +2,33 @@
|
|||
///
|
||||
/// > DENO_BUILD_MODE=release ./tools/build.py && \
|
||||
/// ./target/release/deno_core_http_bench --multi-thread
|
||||
extern crate deno_core;
|
||||
extern crate futures;
|
||||
extern crate libc;
|
||||
extern crate num_cpus;
|
||||
extern crate tokio;
|
||||
|
||||
#[macro_use]
|
||||
extern crate derive_deref;
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
#[macro_use]
|
||||
extern crate lazy_static;
|
||||
|
||||
use deno_core::Isolate as CoreIsolate;
|
||||
use deno_core::*;
|
||||
use futures::future::Future;
|
||||
use futures::future::FutureExt;
|
||||
use futures::task::{Context, Poll};
|
||||
use futures::future::poll_fn;
|
||||
use futures::prelude::*;
|
||||
use futures::task::Context;
|
||||
use futures::task::Poll;
|
||||
use std::cell::RefCell;
|
||||
use std::convert::TryInto;
|
||||
use std::env;
|
||||
use std::fmt::Debug;
|
||||
use std::io::Error;
|
||||
use std::io::ErrorKind;
|
||||
use std::mem::size_of;
|
||||
use std::net::SocketAddr;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::MutexGuard;
|
||||
use std::ptr;
|
||||
use std::rc::Rc;
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio::io::AsyncWrite;
|
||||
|
||||
static LOGGER: Logger = Logger;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
struct Logger;
|
||||
|
||||
|
@ -35,330 +36,245 @@ impl log::Log for Logger {
|
|||
fn enabled(&self, metadata: &log::Metadata) -> bool {
|
||||
metadata.level() <= log::max_level()
|
||||
}
|
||||
|
||||
fn log(&self, record: &log::Record) {
|
||||
if self.enabled(record.metadata()) {
|
||||
println!("{} - {}", record.level(), record.args());
|
||||
}
|
||||
}
|
||||
|
||||
fn flush(&self) {}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct Record {
|
||||
pub promise_id: i32,
|
||||
pub arg: i32,
|
||||
#[derive(Copy, Clone, Debug, PartialEq)]
|
||||
struct Record {
|
||||
pub promise_id: u32,
|
||||
pub rid: u32,
|
||||
pub result: i32,
|
||||
}
|
||||
|
||||
impl Into<Buf> for Record {
|
||||
fn into(self) -> Buf {
|
||||
let buf32 = vec![self.promise_id, self.arg, self.result].into_boxed_slice();
|
||||
let ptr = Box::into_raw(buf32) as *mut [u8; 3 * 4];
|
||||
unsafe { Box::from_raw(ptr) }
|
||||
}
|
||||
}
|
||||
type RecordBuf = [u8; size_of::<Record>()];
|
||||
|
||||
impl From<&[u8]> for Record {
|
||||
fn from(s: &[u8]) -> Record {
|
||||
#[allow(clippy::cast_ptr_alignment)]
|
||||
let ptr = s.as_ptr() as *const i32;
|
||||
let ints = unsafe { std::slice::from_raw_parts(ptr, 3) };
|
||||
Record {
|
||||
promise_id: ints[0],
|
||||
arg: ints[1],
|
||||
result: ints[2],
|
||||
fn from(buf: &[u8]) -> Self {
|
||||
assert_eq!(buf.len(), size_of::<RecordBuf>());
|
||||
unsafe { *(buf as *const _ as *const RecordBuf) }.into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RecordBuf> for Record {
|
||||
fn from(buf: RecordBuf) -> Self {
|
||||
unsafe {
|
||||
#[allow(clippy::cast_ptr_alignment)]
|
||||
ptr::read_unaligned(&buf as *const _ as *const Self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Buf> for Record {
|
||||
fn from(buf: Buf) -> Record {
|
||||
assert_eq!(buf.len(), 3 * 4);
|
||||
#[allow(clippy::cast_ptr_alignment)]
|
||||
let ptr = Box::into_raw(buf) as *mut [i32; 3];
|
||||
let ints: Box<[i32]> = unsafe { Box::from_raw(ptr) };
|
||||
assert_eq!(ints.len(), 3);
|
||||
Record {
|
||||
promise_id: ints[0],
|
||||
arg: ints[1],
|
||||
result: ints[2],
|
||||
}
|
||||
impl From<Record> for RecordBuf {
|
||||
fn from(record: Record) -> Self {
|
||||
unsafe { ptr::read(&record as *const _ as *const Self) }
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_record_from() {
|
||||
let r = Record {
|
||||
promise_id: 1,
|
||||
arg: 3,
|
||||
result: 4,
|
||||
};
|
||||
let expected = r.clone();
|
||||
let buf: Buf = r.into();
|
||||
#[cfg(target_endian = "little")]
|
||||
assert_eq!(
|
||||
buf,
|
||||
vec![1u8, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0].into_boxed_slice()
|
||||
);
|
||||
let actual = Record::from(buf);
|
||||
assert_eq!(actual, expected);
|
||||
// TODO test From<&[u8]> for Record
|
||||
struct Isolate {
|
||||
core_isolate: Box<CoreIsolate>, // Unclear why CoreIsolate::new() returns a box.
|
||||
state: State,
|
||||
}
|
||||
|
||||
pub type HttpOp = dyn Future<Output = Result<i32, std::io::Error>> + Send;
|
||||
#[derive(Clone, Default, Deref)]
|
||||
struct State(Rc<RefCell<StateInner>>);
|
||||
|
||||
pub type HttpOpHandler =
|
||||
fn(record: Record, zero_copy_buf: Option<ZeroCopyBuf>) -> Pin<Box<HttpOp>>;
|
||||
#[derive(Default)]
|
||||
struct StateInner {
|
||||
resource_table: ResourceTable,
|
||||
}
|
||||
|
||||
fn http_op(
|
||||
handler: HttpOpHandler,
|
||||
) -> impl Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp {
|
||||
move |control: &[u8], zero_copy_buf: Option<ZeroCopyBuf>| -> CoreOp {
|
||||
let record = Record::from(control);
|
||||
let is_sync = record.promise_id == 0;
|
||||
let op = handler(record.clone(), zero_copy_buf);
|
||||
let mut record_a = record;
|
||||
impl Isolate {
|
||||
pub fn new() -> Self {
|
||||
let startup_data = StartupData::Script(Script {
|
||||
source: include_str!("http_bench.js"),
|
||||
filename: "http_bench.js",
|
||||
});
|
||||
|
||||
let fut = async move {
|
||||
match op.await {
|
||||
Ok(result) => record_a.result = result,
|
||||
Err(err) => {
|
||||
eprintln!("unexpected err {}", err);
|
||||
record_a.result = -1;
|
||||
}
|
||||
};
|
||||
Ok(record_a.into())
|
||||
let mut isolate = Self {
|
||||
core_isolate: CoreIsolate::new(startup_data, false),
|
||||
state: Default::default(),
|
||||
};
|
||||
|
||||
if is_sync {
|
||||
Op::Sync(futures::executor::block_on(fut).unwrap())
|
||||
} else {
|
||||
Op::Async(fut.boxed())
|
||||
}
|
||||
isolate.register_op("listen", op_listen);
|
||||
isolate.register_op("accept", op_accept);
|
||||
isolate.register_op("read", op_read);
|
||||
isolate.register_op("write", op_write);
|
||||
isolate.register_op("close", op_close);
|
||||
|
||||
isolate
|
||||
}
|
||||
|
||||
fn register_op<F>(
|
||||
&mut self,
|
||||
name: &'static str,
|
||||
handler: impl Fn(State, u32, Option<ZeroCopyBuf>) -> F + Copy + 'static,
|
||||
) where
|
||||
F: TryFuture,
|
||||
F::Ok: TryInto<i32>,
|
||||
<F::Ok as TryInto<i32>>::Error: Debug,
|
||||
{
|
||||
let state = self.state.clone();
|
||||
let core_handler =
|
||||
move |control_buf: &[u8], zero_copy_buf: Option<ZeroCopyBuf>| -> CoreOp {
|
||||
let state = state.clone();
|
||||
let record = Record::from(control_buf);
|
||||
let is_sync = record.promise_id == 0;
|
||||
|
||||
let fut = async move {
|
||||
let op = handler(state, record.rid, zero_copy_buf);
|
||||
let result = op
|
||||
.map_ok(|r| r.try_into().expect("op result does not fit in i32"))
|
||||
.unwrap_or_else(|_| -1)
|
||||
.await;
|
||||
Ok(RecordBuf::from(Record { result, ..record })[..].into())
|
||||
};
|
||||
|
||||
if is_sync {
|
||||
Op::Sync(futures::executor::block_on(fut).unwrap())
|
||||
} else {
|
||||
Op::Async(fut.boxed_local())
|
||||
}
|
||||
};
|
||||
|
||||
self.core_isolate.register_op(name, core_handler);
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for Isolate {
|
||||
type Output = <CoreIsolate as Future>::Output;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
self.core_isolate.poll_unpin(cx)
|
||||
}
|
||||
}
|
||||
|
||||
fn op_close(
|
||||
state: State,
|
||||
rid: u32,
|
||||
_buf: Option<ZeroCopyBuf>,
|
||||
) -> impl TryFuture<Ok = u32, Error = Error> {
|
||||
debug!("close rid={}", rid);
|
||||
|
||||
async move {
|
||||
let resource_table = &mut state.borrow_mut().resource_table;
|
||||
resource_table
|
||||
.close(rid)
|
||||
.map(|_| 0)
|
||||
.ok_or_else(bad_resource)
|
||||
}
|
||||
}
|
||||
|
||||
fn op_listen(
|
||||
state: State,
|
||||
_rid: u32,
|
||||
_buf: Option<ZeroCopyBuf>,
|
||||
) -> impl TryFuture<Ok = u32, Error = Error> {
|
||||
debug!("listen");
|
||||
|
||||
async move {
|
||||
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
|
||||
let listener = tokio::net::TcpListener::bind(&addr).await?;
|
||||
let resource_table = &mut state.borrow_mut().resource_table;
|
||||
let rid = resource_table.add("tcpListener", Box::new(listener));
|
||||
Ok(rid)
|
||||
}
|
||||
}
|
||||
|
||||
fn op_accept(
|
||||
state: State,
|
||||
rid: u32,
|
||||
_buf: Option<ZeroCopyBuf>,
|
||||
) -> impl TryFuture<Ok = u32, Error = Error> {
|
||||
debug!("accept rid={}", rid);
|
||||
|
||||
poll_fn(move |cx| {
|
||||
let resource_table = &mut state.borrow_mut().resource_table;
|
||||
let listener = resource_table
|
||||
.get_mut::<TcpListener>(rid)
|
||||
.ok_or_else(bad_resource)?;
|
||||
listener.poll_accept(cx).map_ok(|(stream, _addr)| {
|
||||
resource_table.add("tcpStream", Box::new(stream))
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
fn op_read(
|
||||
state: State,
|
||||
rid: u32,
|
||||
buf: Option<ZeroCopyBuf>,
|
||||
) -> impl TryFuture<Ok = usize, Error = Error> {
|
||||
let mut buf = buf.unwrap();
|
||||
debug!("read rid={}", rid);
|
||||
|
||||
poll_fn(move |cx| {
|
||||
let resource_table = &mut state.borrow_mut().resource_table;
|
||||
let stream = resource_table
|
||||
.get_mut::<TcpStream>(rid)
|
||||
.ok_or_else(bad_resource)?;
|
||||
Pin::new(stream).poll_read(cx, &mut buf)
|
||||
})
|
||||
}
|
||||
|
||||
fn op_write(
|
||||
state: State,
|
||||
rid: u32,
|
||||
buf: Option<ZeroCopyBuf>,
|
||||
) -> impl TryFuture<Ok = usize, Error = Error> {
|
||||
let buf = buf.unwrap();
|
||||
debug!("write rid={}", rid);
|
||||
|
||||
poll_fn(move |cx| {
|
||||
let resource_table = &mut state.borrow_mut().resource_table;
|
||||
let stream = resource_table
|
||||
.get_mut::<TcpStream>(rid)
|
||||
.ok_or_else(bad_resource)?;
|
||||
Pin::new(stream).poll_write(cx, &buf)
|
||||
})
|
||||
}
|
||||
|
||||
fn bad_resource() -> Error {
|
||||
Error::new(ErrorKind::NotFound, "bad resource id")
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let args: Vec<String> = env::args().collect();
|
||||
// NOTE: `--help` arg will display V8 help and exit
|
||||
let args = deno_core::v8_set_flags(args);
|
||||
|
||||
log::set_logger(&LOGGER).unwrap();
|
||||
log::set_max_level(if args.iter().any(|a| a == "-D") {
|
||||
log::LevelFilter::Debug
|
||||
} else {
|
||||
log::LevelFilter::Warn
|
||||
});
|
||||
|
||||
let js_source = include_str!("http_bench.js");
|
||||
|
||||
let startup_data = StartupData::Script(Script {
|
||||
source: js_source,
|
||||
filename: "http_bench.js",
|
||||
});
|
||||
|
||||
let isolate = deno_core::Isolate::new(startup_data, false);
|
||||
isolate.register_op("listen", http_op(op_listen));
|
||||
isolate.register_op("accept", http_op(op_accept));
|
||||
isolate.register_op("read", http_op(op_read));
|
||||
isolate.register_op("write", http_op(op_write));
|
||||
isolate.register_op("close", http_op(op_close));
|
||||
|
||||
println!(
|
||||
"num cpus; logical: {}; physical: {}",
|
||||
num_cpus::get(),
|
||||
num_cpus::get_physical()
|
||||
log::set_logger(&Logger).unwrap();
|
||||
log::set_max_level(
|
||||
env::args()
|
||||
.find(|a| a == "-D")
|
||||
.map(|_| log::LevelFilter::Debug)
|
||||
.unwrap_or(log::LevelFilter::Warn),
|
||||
);
|
||||
|
||||
// NOTE: `--help` arg will display V8 help and exit
|
||||
deno_core::v8_set_flags(env::args().collect());
|
||||
|
||||
let isolate = Isolate::new();
|
||||
let mut runtime = tokio::runtime::Builder::new()
|
||||
.basic_scheduler()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
let result = runtime.block_on(isolate.boxed_local());
|
||||
js_check(result);
|
||||
runtime.block_on(isolate).expect("unexpected isolate error");
|
||||
}
|
||||
|
||||
pub fn bad_resource() -> Error {
|
||||
Error::new(ErrorKind::NotFound, "bad resource id")
|
||||
}
|
||||
|
||||
struct TcpListener(tokio::net::TcpListener);
|
||||
|
||||
impl Resource for TcpListener {}
|
||||
|
||||
struct TcpStream(tokio::net::TcpStream);
|
||||
|
||||
impl Resource for TcpStream {}
|
||||
|
||||
lazy_static! {
|
||||
static ref RESOURCE_TABLE: Mutex<ResourceTable> =
|
||||
Mutex::new(ResourceTable::default());
|
||||
}
|
||||
|
||||
fn lock_resource_table<'a>() -> MutexGuard<'a, ResourceTable> {
|
||||
RESOURCE_TABLE.lock().unwrap()
|
||||
}
|
||||
|
||||
struct Accept {
|
||||
rid: ResourceId,
|
||||
}
|
||||
|
||||
impl Future for Accept {
|
||||
type Output = Result<(tokio::net::TcpStream, SocketAddr), std::io::Error>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
let inner = self.get_mut();
|
||||
|
||||
let mut table = lock_resource_table();
|
||||
match table.get_mut::<TcpListener>(inner.rid) {
|
||||
None => Poll::Ready(Err(bad_resource())),
|
||||
Some(listener) => {
|
||||
let listener = &mut listener.0;
|
||||
listener.poll_accept(cx)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn op_accept(
|
||||
record: Record,
|
||||
_zero_copy_buf: Option<ZeroCopyBuf>,
|
||||
) -> Pin<Box<HttpOp>> {
|
||||
let rid = record.arg as u32;
|
||||
debug!("accept {}", rid);
|
||||
|
||||
let fut = async move {
|
||||
let (stream, addr) = Accept { rid }.await?;
|
||||
debug!("accept success {}", addr);
|
||||
let mut table = lock_resource_table();
|
||||
let rid = table.add("tcpStream", Box::new(TcpStream(stream)));
|
||||
Ok(rid as i32)
|
||||
};
|
||||
|
||||
fut.boxed()
|
||||
}
|
||||
|
||||
fn op_listen(
|
||||
_record: Record,
|
||||
_zero_copy_buf: Option<ZeroCopyBuf>,
|
||||
) -> Pin<Box<HttpOp>> {
|
||||
debug!("listen");
|
||||
let fut = async {
|
||||
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
|
||||
let listener = tokio::net::TcpListener::bind(&addr).await?;
|
||||
let mut table = lock_resource_table();
|
||||
let rid = table.add("tcpListener", Box::new(TcpListener(listener)));
|
||||
Ok(rid as i32)
|
||||
};
|
||||
|
||||
fut.boxed()
|
||||
}
|
||||
|
||||
fn op_close(
|
||||
record: Record,
|
||||
_zero_copy_buf: Option<ZeroCopyBuf>,
|
||||
) -> Pin<Box<HttpOp>> {
|
||||
debug!("close");
|
||||
let fut = async move {
|
||||
let rid = record.arg as u32;
|
||||
let mut table = lock_resource_table();
|
||||
match table.close(rid) {
|
||||
Some(_) => Ok(0),
|
||||
None => Err(bad_resource()),
|
||||
}
|
||||
};
|
||||
fut.boxed()
|
||||
}
|
||||
|
||||
struct Read {
|
||||
rid: ResourceId,
|
||||
buf: ZeroCopyBuf,
|
||||
}
|
||||
|
||||
impl Future for Read {
|
||||
type Output = Result<usize, std::io::Error>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
let inner = self.get_mut();
|
||||
let mut table = lock_resource_table();
|
||||
|
||||
match table.get_mut::<TcpStream>(inner.rid) {
|
||||
None => Poll::Ready(Err(bad_resource())),
|
||||
Some(stream) => {
|
||||
let pinned_stream = Pin::new(&mut stream.0);
|
||||
pinned_stream.poll_read(cx, &mut inner.buf)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn op_read(
|
||||
record: Record,
|
||||
zero_copy_buf: Option<ZeroCopyBuf>,
|
||||
) -> Pin<Box<HttpOp>> {
|
||||
let rid = record.arg as u32;
|
||||
debug!("read rid={}", rid);
|
||||
let zero_copy_buf = zero_copy_buf.unwrap();
|
||||
|
||||
let fut = async move {
|
||||
let nread = Read {
|
||||
rid,
|
||||
buf: zero_copy_buf,
|
||||
}
|
||||
.await?;
|
||||
debug!("read success {}", nread);
|
||||
Ok(nread as i32)
|
||||
};
|
||||
|
||||
fut.boxed()
|
||||
}
|
||||
|
||||
struct Write {
|
||||
rid: ResourceId,
|
||||
buf: ZeroCopyBuf,
|
||||
}
|
||||
|
||||
impl Future for Write {
|
||||
type Output = Result<usize, std::io::Error>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
let inner = self.get_mut();
|
||||
let mut table = lock_resource_table();
|
||||
|
||||
match table.get_mut::<TcpStream>(inner.rid) {
|
||||
None => Poll::Ready(Err(bad_resource())),
|
||||
Some(stream) => {
|
||||
let pinned_stream = Pin::new(&mut stream.0);
|
||||
pinned_stream.poll_write(cx, &inner.buf)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn op_write(
|
||||
record: Record,
|
||||
zero_copy_buf: Option<ZeroCopyBuf>,
|
||||
) -> Pin<Box<HttpOp>> {
|
||||
let rid = record.arg as u32;
|
||||
debug!("write rid={}", rid);
|
||||
let zero_copy_buf = zero_copy_buf.unwrap();
|
||||
|
||||
let fut = async move {
|
||||
let nwritten = Write {
|
||||
rid,
|
||||
buf: zero_copy_buf,
|
||||
}
|
||||
.await?;
|
||||
debug!("write success {}", nwritten);
|
||||
Ok(nwritten as i32)
|
||||
};
|
||||
|
||||
fut.boxed()
|
||||
}
|
||||
|
||||
fn js_check(r: Result<(), ErrBox>) {
|
||||
if let Err(e) = r {
|
||||
panic!(e.to_string());
|
||||
#[test]
|
||||
fn test_record_from() {
|
||||
let expected = Record {
|
||||
promise_id: 1,
|
||||
rid: 3,
|
||||
result: 4,
|
||||
};
|
||||
let buf = RecordBuf::from(expected);
|
||||
if cfg!(target_endian = "little") {
|
||||
assert_eq!(buf, [1u8, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0]);
|
||||
}
|
||||
let actual = Record::from(buf);
|
||||
assert_eq!(actual, expected);
|
||||
}
|
||||
|
|
|
@ -82,9 +82,9 @@ def deno_http_proxy(deno_exe, hyper_hello_exe):
|
|||
origin_cmd=http_proxy_origin(hyper_hello_exe, origin_port))
|
||||
|
||||
|
||||
def deno_core_single(exe):
|
||||
print "http_benchmark testing deno_core_single"
|
||||
return run([exe, "--single-thread"], 4544)
|
||||
def deno_core_http_bench(exe):
|
||||
print "http_benchmark testing deno_core_http_bench"
|
||||
return run([exe], 4544)
|
||||
|
||||
|
||||
def node_http():
|
||||
|
@ -132,8 +132,8 @@ def hyper_http(hyper_hello_exe):
|
|||
|
||||
def http_benchmark(build_dir):
|
||||
hyper_hello_exe = os.path.join(build_dir, "hyper_hello")
|
||||
core_http_bench_exe = os.path.join(build_dir,
|
||||
"examples/deno_core_http_bench")
|
||||
deno_core_http_bench_exe = os.path.join(build_dir,
|
||||
"examples/deno_core_http_bench")
|
||||
deno_exe = os.path.join(build_dir, "deno")
|
||||
return {
|
||||
# "deno_tcp" was once called "deno"
|
||||
|
@ -142,7 +142,8 @@ def http_benchmark(build_dir):
|
|||
"deno_http": deno_http(deno_exe),
|
||||
"deno_proxy": deno_http_proxy(deno_exe, hyper_hello_exe),
|
||||
"deno_proxy_tcp": deno_tcp_proxy(deno_exe, hyper_hello_exe),
|
||||
"deno_core_single": deno_core_single(core_http_bench_exe),
|
||||
# "deno_core_http_bench" was once called "deno_core_single"
|
||||
"deno_core_http_bench": deno_core_http_bench(deno_core_http_bench_exe),
|
||||
# "node_http" was once called "node"
|
||||
"node_http": node_http(),
|
||||
"node_proxy": node_http_proxy(hyper_hello_exe),
|
||||
|
|
Loading…
Add table
Reference in a new issue