mirror of
https://github.com/denoland/deno.git
synced 2025-03-03 17:34:47 -05:00
First pass at support for TCP servers and clients. (#884)
Adds deno.listen(), deno.dial(), deno.Listener and deno.Conn.
This commit is contained in:
parent
e5e7f0f038
commit
0422b224e8
11 changed files with 497 additions and 6 deletions
12
BUILD.gn
12
BUILD.gn
|
@ -70,6 +70,7 @@ ts_sources = [
|
||||||
"js/blob.ts",
|
"js/blob.ts",
|
||||||
"js/compiler.ts",
|
"js/compiler.ts",
|
||||||
"js/console.ts",
|
"js/console.ts",
|
||||||
|
"js/copy_file.ts",
|
||||||
"js/deno.ts",
|
"js/deno.ts",
|
||||||
"js/dispatch.ts",
|
"js/dispatch.ts",
|
||||||
"js/dom_types.ts",
|
"js/dom_types.ts",
|
||||||
|
@ -77,22 +78,23 @@ ts_sources = [
|
||||||
"js/fetch.ts",
|
"js/fetch.ts",
|
||||||
"js/fileinfo.ts",
|
"js/fileinfo.ts",
|
||||||
"js/files.ts",
|
"js/files.ts",
|
||||||
"js/io.ts",
|
|
||||||
"js/global-eval.ts",
|
"js/global-eval.ts",
|
||||||
"js/globals.ts",
|
"js/globals.ts",
|
||||||
|
"js/io.ts",
|
||||||
"js/libdeno.ts",
|
"js/libdeno.ts",
|
||||||
"js/main.ts",
|
"js/main.ts",
|
||||||
"js/mkdir.ts",
|
|
||||||
"js/make_temp_dir.ts",
|
"js/make_temp_dir.ts",
|
||||||
|
"js/mkdir.ts",
|
||||||
"js/mock_builtin.js",
|
"js/mock_builtin.js",
|
||||||
|
"js/net.ts",
|
||||||
"js/os.ts",
|
"js/os.ts",
|
||||||
"js/platform.ts",
|
"js/platform.ts",
|
||||||
"js/plugins.d.ts",
|
"js/plugins.d.ts",
|
||||||
"js/read_file.ts",
|
|
||||||
"js/read_dir.ts",
|
"js/read_dir.ts",
|
||||||
|
"js/read_file.ts",
|
||||||
|
"js/read_link.ts",
|
||||||
"js/remove.ts",
|
"js/remove.ts",
|
||||||
"js/rename.ts",
|
"js/rename.ts",
|
||||||
"js/read_link.ts",
|
|
||||||
"js/stat.ts",
|
"js/stat.ts",
|
||||||
"js/symlink.ts",
|
"js/symlink.ts",
|
||||||
"js/text_encoding.ts",
|
"js/text_encoding.ts",
|
||||||
|
@ -103,7 +105,7 @@ ts_sources = [
|
||||||
"js/util.ts",
|
"js/util.ts",
|
||||||
"js/v8_source_maps.ts",
|
"js/v8_source_maps.ts",
|
||||||
"js/write_file.ts",
|
"js/write_file.ts",
|
||||||
"js/copy_file.ts",
|
|
||||||
"js/tsconfig.declarations.json",
|
"js/tsconfig.declarations.json",
|
||||||
"tsconfig.json",
|
"tsconfig.json",
|
||||||
|
|
||||||
|
|
|
@ -21,4 +21,5 @@ export { platform } from "./platform";
|
||||||
export { trace } from "./trace";
|
export { trace } from "./trace";
|
||||||
export { truncateSync, truncate } from "./truncate";
|
export { truncateSync, truncate } from "./truncate";
|
||||||
export { FileInfo } from "./fileinfo";
|
export { FileInfo } from "./fileinfo";
|
||||||
|
export { connect, dial, listen, Listener, Conn } from "./net";
|
||||||
export const args: string[] = [];
|
export const args: string[] = [];
|
||||||
|
|
|
@ -18,7 +18,7 @@ export class File implements Reader, Writer, Closer {
|
||||||
}
|
}
|
||||||
|
|
||||||
close(): void {
|
close(): void {
|
||||||
return close(this.fd);
|
close(this.fd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
175
js/net.ts
Normal file
175
js/net.ts
Normal file
|
@ -0,0 +1,175 @@
|
||||||
|
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
|
||||||
|
|
||||||
|
import { ReadResult, Reader, Writer, Closer } from "./io";
|
||||||
|
import * as fbs from "gen/msg_generated";
|
||||||
|
import { assert, notImplemented } from "./util";
|
||||||
|
import * as dispatch from "./dispatch";
|
||||||
|
import { flatbuffers } from "flatbuffers";
|
||||||
|
import { read, write, close } from "./files";
|
||||||
|
|
||||||
|
export type Network = "tcp";
|
||||||
|
// TODO support other types:
|
||||||
|
// export type Network = "tcp" | "tcp4" | "tcp6" | "unix" | "unixpacket";
|
||||||
|
|
||||||
|
// TODO Support finding network from Addr, see https://golang.org/pkg/net/#Addr
|
||||||
|
export type Addr = string;
|
||||||
|
|
||||||
|
/** A Listener is a generic network listener for stream-oriented protocols. */
|
||||||
|
export interface Listener {
|
||||||
|
/** accept() waits for and returns the next connection to the Listener. */
|
||||||
|
accept(): Promise<Conn>;
|
||||||
|
|
||||||
|
/** Close closes the listener.
|
||||||
|
* Any pending accept promises will be rejected with errors.
|
||||||
|
*/
|
||||||
|
close(): void;
|
||||||
|
|
||||||
|
addr(): Addr;
|
||||||
|
}
|
||||||
|
|
||||||
|
class ListenerImpl implements Listener {
|
||||||
|
constructor(readonly fd: number) {}
|
||||||
|
|
||||||
|
async accept(): Promise<Conn> {
|
||||||
|
const builder = new flatbuffers.Builder();
|
||||||
|
fbs.Accept.startAccept(builder);
|
||||||
|
fbs.Accept.addRid(builder, this.fd);
|
||||||
|
const msg = fbs.Accept.endAccept(builder);
|
||||||
|
const baseRes = await dispatch.sendAsync(builder, fbs.Any.Accept, msg);
|
||||||
|
assert(baseRes != null);
|
||||||
|
assert(fbs.Any.NewConn === baseRes!.msgType());
|
||||||
|
const res = new fbs.NewConn();
|
||||||
|
assert(baseRes!.msg(res) != null);
|
||||||
|
return new ConnImpl(res.rid(), res.remoteAddr()!, res.localAddr()!);
|
||||||
|
}
|
||||||
|
|
||||||
|
close(): void {
|
||||||
|
close(this.fd);
|
||||||
|
}
|
||||||
|
|
||||||
|
addr(): Addr {
|
||||||
|
return notImplemented();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface Conn extends Reader, Writer, Closer {
|
||||||
|
localAddr: string;
|
||||||
|
remoteAddr: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
class ConnImpl implements Conn {
|
||||||
|
constructor(
|
||||||
|
readonly fd: number,
|
||||||
|
readonly remoteAddr: string,
|
||||||
|
readonly localAddr: string
|
||||||
|
) {}
|
||||||
|
|
||||||
|
write(p: ArrayBufferView): Promise<number> {
|
||||||
|
return write(this.fd, p);
|
||||||
|
}
|
||||||
|
|
||||||
|
read(p: ArrayBufferView): Promise<ReadResult> {
|
||||||
|
return read(this.fd, p);
|
||||||
|
}
|
||||||
|
|
||||||
|
close(): void {
|
||||||
|
close(this.fd);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** closeRead shuts down (shutdown(2)) the reading side of the TCP connection.
|
||||||
|
* Most callers should just use close().
|
||||||
|
*/
|
||||||
|
closeRead(): void {
|
||||||
|
// TODO(ry) Connect to AsyncWrite::shutdown in resources.rs
|
||||||
|
return notImplemented();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** closeWrite shuts down (shutdown(2)) the writing side of the TCP
|
||||||
|
* connection. Most callers should just use close().
|
||||||
|
*/
|
||||||
|
closeWrite(): void {
|
||||||
|
// TODO(ry) Connect to AsyncWrite::shutdown in resources.rs
|
||||||
|
return notImplemented();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Listen announces on the local network address.
|
||||||
|
*
|
||||||
|
* The network must be "tcp", "tcp4", "tcp6", "unix" or "unixpacket".
|
||||||
|
*
|
||||||
|
* For TCP networks, if the host in the address parameter is empty or a literal
|
||||||
|
* unspecified IP address, Listen listens on all available unicast and anycast
|
||||||
|
* IP addresses of the local system. To only use IPv4, use network "tcp4". The
|
||||||
|
* address can use a host name, but this is not recommended, because it will
|
||||||
|
* create a listener for at most one of the host's IP addresses. If the port in
|
||||||
|
* the address parameter is empty or "0", as in "127.0.0.1:" or "[::1]:0", a
|
||||||
|
* port number is automatically chosen. The Addr method of Listener can be used
|
||||||
|
* to discover the chosen port.
|
||||||
|
*
|
||||||
|
* See dial() for a description of the network and address parameters.
|
||||||
|
*/
|
||||||
|
export function listen(network: Network, address: string): Listener {
|
||||||
|
const builder = new flatbuffers.Builder();
|
||||||
|
const network_ = builder.createString(network);
|
||||||
|
const address_ = builder.createString(address);
|
||||||
|
fbs.Listen.startListen(builder);
|
||||||
|
fbs.Listen.addNetwork(builder, network_);
|
||||||
|
fbs.Listen.addAddress(builder, address_);
|
||||||
|
const msg = fbs.Listen.endListen(builder);
|
||||||
|
const baseRes = dispatch.sendSync(builder, fbs.Any.Listen, msg);
|
||||||
|
assert(baseRes != null);
|
||||||
|
assert(fbs.Any.ListenRes === baseRes!.msgType());
|
||||||
|
const res = new fbs.ListenRes();
|
||||||
|
assert(baseRes!.msg(res) != null);
|
||||||
|
return new ListenerImpl(res.rid());
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Dial connects to the address on the named network.
|
||||||
|
*
|
||||||
|
* Supported networks are only "tcp" currently.
|
||||||
|
* TODO: "tcp4" (IPv4-only), "tcp6" (IPv6-only), "udp", "udp4"
|
||||||
|
* (IPv4-only), "udp6" (IPv6-only), "ip", "ip4" (IPv4-only), "ip6" (IPv6-only),
|
||||||
|
* "unix", "unixgram" and "unixpacket".
|
||||||
|
*
|
||||||
|
* For TCP and UDP networks, the address has the form "host:port". The host must
|
||||||
|
* be a literal IP address, or a host name that can be resolved to IP addresses.
|
||||||
|
* The port must be a literal port number or a service name. If the host is a
|
||||||
|
* literal IPv6 address it must be enclosed in square brackets, as in
|
||||||
|
* "[2001:db8::1]:80" or "[fe80::1%zone]:80". The zone specifies the scope of
|
||||||
|
* the literal IPv6 address as defined in RFC 4007. The functions JoinHostPort
|
||||||
|
* and SplitHostPort manipulate a pair of host and port in this form. When using
|
||||||
|
* TCP, and the host resolves to multiple IP addresses, Dial will try each IP
|
||||||
|
* address in order until one succeeds.
|
||||||
|
*
|
||||||
|
* Examples:
|
||||||
|
*
|
||||||
|
* dial("tcp", "golang.org:http")
|
||||||
|
* dial("tcp", "192.0.2.1:http")
|
||||||
|
* dial("tcp", "198.51.100.1:80")
|
||||||
|
* dial("udp", "[2001:db8::1]:domain")
|
||||||
|
* dial("udp", "[fe80::1%lo0]:53")
|
||||||
|
* dial("tcp", ":80")
|
||||||
|
*/
|
||||||
|
export async function dial(network: Network, address: string): Promise<Conn> {
|
||||||
|
const builder = new flatbuffers.Builder();
|
||||||
|
const network_ = builder.createString(network);
|
||||||
|
const address_ = builder.createString(address);
|
||||||
|
fbs.Dial.startDial(builder);
|
||||||
|
fbs.Dial.addNetwork(builder, network_);
|
||||||
|
fbs.Dial.addAddress(builder, address_);
|
||||||
|
const msg = fbs.Dial.endDial(builder);
|
||||||
|
const baseRes = await dispatch.sendAsync(builder, fbs.Any.Dial, msg);
|
||||||
|
assert(baseRes != null);
|
||||||
|
assert(fbs.Any.NewConn === baseRes!.msgType());
|
||||||
|
const res = new fbs.NewConn();
|
||||||
|
assert(baseRes!.msg(res) != null);
|
||||||
|
return new ConnImpl(res.rid(), res.remoteAddr()!, res.localAddr()!);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unused but reserved op.
|
||||||
|
export async function connect(
|
||||||
|
network: Network,
|
||||||
|
address: string
|
||||||
|
): Promise<Conn> {
|
||||||
|
return notImplemented();
|
||||||
|
}
|
37
js/net_test.ts
Normal file
37
js/net_test.ts
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
|
||||||
|
|
||||||
|
import * as deno from "deno";
|
||||||
|
import { testPerm, assert, assertEqual } from "./test_util.ts";
|
||||||
|
|
||||||
|
testPerm({ net: true }, function netListenClose() {
|
||||||
|
const listener = deno.listen("tcp", "127.0.0.1:4500");
|
||||||
|
listener.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
testPerm({ net: true }, async function netDialListen() {
|
||||||
|
let addr = "127.0.0.1:4500";
|
||||||
|
const listener = deno.listen("tcp", addr);
|
||||||
|
listener.accept().then(async conn => {
|
||||||
|
await conn.write(new Uint8Array([1, 2, 3]));
|
||||||
|
conn.close();
|
||||||
|
});
|
||||||
|
const conn = await deno.dial("tcp", addr);
|
||||||
|
const buf = new Uint8Array(1024);
|
||||||
|
const readResult = await conn.read(buf);
|
||||||
|
assertEqual(3, readResult.nread);
|
||||||
|
assertEqual(1, buf[0]);
|
||||||
|
assertEqual(2, buf[1]);
|
||||||
|
assertEqual(3, buf[2]);
|
||||||
|
|
||||||
|
// TODO Currently ReadResult does not properly transmit EOF in the same call.
|
||||||
|
// it requires a second call to get the EOF. Either ReadResult to be an
|
||||||
|
// integer in which 0 signifies EOF or the handler should be modified so that
|
||||||
|
// EOF is properly transmitted.
|
||||||
|
assertEqual(false, readResult.eof);
|
||||||
|
|
||||||
|
const readResult2 = await conn.read(buf);
|
||||||
|
assertEqual(true, readResult2.eof);
|
||||||
|
|
||||||
|
listener.close();
|
||||||
|
conn.close();
|
||||||
|
});
|
|
@ -20,6 +20,7 @@ import "./timers_test.ts";
|
||||||
import "./symlink_test.ts";
|
import "./symlink_test.ts";
|
||||||
import "./platform_test.ts";
|
import "./platform_test.ts";
|
||||||
import "./text_encoding_test.ts";
|
import "./text_encoding_test.ts";
|
||||||
|
import "./net_test.ts";
|
||||||
import "./trace_test.ts";
|
import "./trace_test.ts";
|
||||||
import "./truncate_test.ts";
|
import "./truncate_test.ts";
|
||||||
import "./v8_source_maps_test.ts";
|
import "./v8_source_maps_test.ts";
|
||||||
|
|
150
src/handlers.rs
150
src/handlers.rs
|
@ -9,6 +9,7 @@ use isolate::Isolate;
|
||||||
use isolate::IsolateState;
|
use isolate::IsolateState;
|
||||||
use isolate::Op;
|
use isolate::Op;
|
||||||
use msg;
|
use msg;
|
||||||
|
use tokio_util;
|
||||||
|
|
||||||
use flatbuffers::FlatBufferBuilder;
|
use flatbuffers::FlatBufferBuilder;
|
||||||
use futures;
|
use futures;
|
||||||
|
@ -21,14 +22,18 @@ use remove_dir_all::remove_dir_all;
|
||||||
use resources;
|
use resources;
|
||||||
use std;
|
use std;
|
||||||
use std::fs;
|
use std::fs;
|
||||||
|
use std::net::SocketAddr;
|
||||||
#[cfg(any(unix))]
|
#[cfg(any(unix))]
|
||||||
use std::os::unix::fs::PermissionsExt;
|
use std::os::unix::fs::PermissionsExt;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
use std::str::FromStr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::UNIX_EPOCH;
|
use std::time::UNIX_EPOCH;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use tokio;
|
use tokio;
|
||||||
|
use tokio::net::TcpListener;
|
||||||
|
use tokio::net::TcpStream;
|
||||||
use tokio_io;
|
use tokio_io;
|
||||||
use tokio_threadpool;
|
use tokio_threadpool;
|
||||||
|
|
||||||
|
@ -74,6 +79,7 @@ pub fn msg_from_js(
|
||||||
msg::Any::Open => handle_open,
|
msg::Any::Open => handle_open,
|
||||||
msg::Any::Read => handle_read,
|
msg::Any::Read => handle_read,
|
||||||
msg::Any::Write => handle_write,
|
msg::Any::Write => handle_write,
|
||||||
|
msg::Any::Close => handle_close,
|
||||||
msg::Any::Remove => handle_remove,
|
msg::Any::Remove => handle_remove,
|
||||||
msg::Any::ReadFile => handle_read_file,
|
msg::Any::ReadFile => handle_read_file,
|
||||||
msg::Any::ReadDir => handle_read_dir,
|
msg::Any::ReadDir => handle_read_dir,
|
||||||
|
@ -86,6 +92,9 @@ pub fn msg_from_js(
|
||||||
msg::Any::WriteFile => handle_write_file,
|
msg::Any::WriteFile => handle_write_file,
|
||||||
msg::Any::Exit => handle_exit,
|
msg::Any::Exit => handle_exit,
|
||||||
msg::Any::CopyFile => handle_copy_file,
|
msg::Any::CopyFile => handle_copy_file,
|
||||||
|
msg::Any::Listen => handle_listen,
|
||||||
|
msg::Any::Accept => handle_accept,
|
||||||
|
msg::Any::Dial => handle_dial,
|
||||||
_ => panic!(format!(
|
_ => panic!(format!(
|
||||||
"Unhandled message {}",
|
"Unhandled message {}",
|
||||||
msg::enum_name_any(msg_type)
|
msg::enum_name_any(msg_type)
|
||||||
|
@ -581,6 +590,26 @@ fn handle_open(
|
||||||
Box::new(op)
|
Box::new(op)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn handle_close(
|
||||||
|
_state: Arc<IsolateState>,
|
||||||
|
base: &msg::Base,
|
||||||
|
data: &'static mut [u8],
|
||||||
|
) -> Box<Op> {
|
||||||
|
assert_eq!(data.len(), 0);
|
||||||
|
let msg = base.msg_as_close().unwrap();
|
||||||
|
let rid = msg.rid();
|
||||||
|
match resources::lookup(rid) {
|
||||||
|
None => odd_future(errors::new(
|
||||||
|
errors::ErrorKind::BadFileDescriptor,
|
||||||
|
String::from("Bad File Descriptor"),
|
||||||
|
)),
|
||||||
|
Some(mut resource) => {
|
||||||
|
resource.close();
|
||||||
|
ok_future(empty_buf())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn handle_read(
|
fn handle_read(
|
||||||
_state: Arc<IsolateState>,
|
_state: Arc<IsolateState>,
|
||||||
base: &msg::Base,
|
base: &msg::Base,
|
||||||
|
@ -994,3 +1023,124 @@ fn handle_truncate(
|
||||||
Ok(empty_buf())
|
Ok(empty_buf())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn handle_listen(
|
||||||
|
state: Arc<IsolateState>,
|
||||||
|
base: &msg::Base,
|
||||||
|
data: &'static mut [u8],
|
||||||
|
) -> Box<Op> {
|
||||||
|
assert_eq!(data.len(), 0);
|
||||||
|
if !state.flags.allow_net {
|
||||||
|
return odd_future(permission_denied());
|
||||||
|
}
|
||||||
|
|
||||||
|
let cmd_id = base.cmd_id();
|
||||||
|
let msg = base.msg_as_listen().unwrap();
|
||||||
|
let network = msg.network().unwrap();
|
||||||
|
assert_eq!(network, "tcp");
|
||||||
|
let address = msg.address().unwrap();
|
||||||
|
|
||||||
|
Box::new(futures::future::result((move || {
|
||||||
|
// TODO properly parse addr
|
||||||
|
let addr = SocketAddr::from_str(address).unwrap();
|
||||||
|
|
||||||
|
let listener = TcpListener::bind(&addr)?;
|
||||||
|
let resource = resources::add_tcp_listener(listener);
|
||||||
|
|
||||||
|
let builder = &mut FlatBufferBuilder::new();
|
||||||
|
let msg = msg::ListenRes::create(
|
||||||
|
builder,
|
||||||
|
&msg::ListenResArgs {
|
||||||
|
rid: resource.rid,
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
);
|
||||||
|
Ok(serialize_response(
|
||||||
|
cmd_id,
|
||||||
|
builder,
|
||||||
|
msg::BaseArgs {
|
||||||
|
msg: Some(msg.as_union_value()),
|
||||||
|
msg_type: msg::Any::ListenRes,
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
))
|
||||||
|
})()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn new_conn(cmd_id: u32, tcp_stream: TcpStream) -> OpResult {
|
||||||
|
let tcp_stream_resource = resources::add_tcp_stream(tcp_stream);
|
||||||
|
// TODO forward socket_addr to client.
|
||||||
|
|
||||||
|
let builder = &mut FlatBufferBuilder::new();
|
||||||
|
let msg = msg::NewConn::create(
|
||||||
|
builder,
|
||||||
|
&msg::NewConnArgs {
|
||||||
|
rid: tcp_stream_resource.rid,
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
);
|
||||||
|
Ok(serialize_response(
|
||||||
|
cmd_id,
|
||||||
|
builder,
|
||||||
|
msg::BaseArgs {
|
||||||
|
msg: Some(msg.as_union_value()),
|
||||||
|
msg_type: msg::Any::NewConn,
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_accept(
|
||||||
|
state: Arc<IsolateState>,
|
||||||
|
base: &msg::Base,
|
||||||
|
data: &'static mut [u8],
|
||||||
|
) -> Box<Op> {
|
||||||
|
assert_eq!(data.len(), 0);
|
||||||
|
if !state.flags.allow_net {
|
||||||
|
return odd_future(permission_denied());
|
||||||
|
}
|
||||||
|
|
||||||
|
let cmd_id = base.cmd_id();
|
||||||
|
let msg = base.msg_as_accept().unwrap();
|
||||||
|
let server_rid = msg.rid();
|
||||||
|
|
||||||
|
match resources::lookup(server_rid) {
|
||||||
|
None => odd_future(errors::new(
|
||||||
|
errors::ErrorKind::BadFileDescriptor,
|
||||||
|
String::from("Bad File Descriptor"),
|
||||||
|
)),
|
||||||
|
Some(server_resource) => {
|
||||||
|
let op = tokio_util::accept(server_resource)
|
||||||
|
.map_err(|err| DenoError::from(err))
|
||||||
|
.and_then(move |(tcp_stream, _socket_addr)| {
|
||||||
|
new_conn(cmd_id, tcp_stream)
|
||||||
|
});
|
||||||
|
Box::new(op)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_dial(
|
||||||
|
state: Arc<IsolateState>,
|
||||||
|
base: &msg::Base,
|
||||||
|
data: &'static mut [u8],
|
||||||
|
) -> Box<Op> {
|
||||||
|
assert_eq!(data.len(), 0);
|
||||||
|
if !state.flags.allow_net {
|
||||||
|
return odd_future(permission_denied());
|
||||||
|
}
|
||||||
|
|
||||||
|
let cmd_id = base.cmd_id();
|
||||||
|
let msg = base.msg_as_dial().unwrap();
|
||||||
|
let network = msg.network().unwrap();
|
||||||
|
assert_eq!(network, "tcp");
|
||||||
|
let address = msg.address().unwrap();
|
||||||
|
|
||||||
|
// TODO properly parse addr
|
||||||
|
let addr = SocketAddr::from_str(address).unwrap();
|
||||||
|
|
||||||
|
let op = TcpStream::connect(&addr)
|
||||||
|
.map_err(|err| err.into())
|
||||||
|
.and_then(move |tcp_stream| new_conn(cmd_id, tcp_stream));
|
||||||
|
Box::new(op)
|
||||||
|
}
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
|
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
|
||||||
extern crate flatbuffers;
|
extern crate flatbuffers;
|
||||||
|
#[macro_use]
|
||||||
extern crate futures;
|
extern crate futures;
|
||||||
extern crate hyper;
|
extern crate hyper;
|
||||||
extern crate libc;
|
extern crate libc;
|
||||||
|
|
30
src/msg.fbs
30
src/msg.fbs
|
@ -35,6 +35,11 @@ union Any {
|
||||||
Write,
|
Write,
|
||||||
WriteRes,
|
WriteRes,
|
||||||
Close,
|
Close,
|
||||||
|
Listen,
|
||||||
|
ListenRes,
|
||||||
|
Accept,
|
||||||
|
Dial,
|
||||||
|
NewConn,
|
||||||
}
|
}
|
||||||
|
|
||||||
enum ErrorKind: byte {
|
enum ErrorKind: byte {
|
||||||
|
@ -285,4 +290,29 @@ table Close {
|
||||||
rid: int;
|
rid: int;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
table Listen {
|
||||||
|
network: string;
|
||||||
|
address: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
table ListenRes {
|
||||||
|
rid: int;
|
||||||
|
}
|
||||||
|
|
||||||
|
table Accept {
|
||||||
|
rid: int;
|
||||||
|
}
|
||||||
|
|
||||||
|
table Dial {
|
||||||
|
network: string;
|
||||||
|
address: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Response to Accept and Dial.
|
||||||
|
table NewConn {
|
||||||
|
rid: int;
|
||||||
|
remote_addr: string;
|
||||||
|
local_addr: string;
|
||||||
|
}
|
||||||
|
|
||||||
root_type Base;
|
root_type Base;
|
||||||
|
|
|
@ -14,11 +14,13 @@ use std;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::io::Error;
|
use std::io::Error;
|
||||||
use std::io::{Read, Write};
|
use std::io::{Read, Write};
|
||||||
|
use std::net::SocketAddr;
|
||||||
use std::sync::atomic::AtomicIsize;
|
use std::sync::atomic::AtomicIsize;
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
use std::sync::Mutex;
|
use std::sync::Mutex;
|
||||||
use tokio;
|
use tokio;
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
|
use tokio::net::TcpStream;
|
||||||
|
|
||||||
pub type ResourceId = i32; // Sometimes referred to RID.
|
pub type ResourceId = i32; // Sometimes referred to RID.
|
||||||
|
|
||||||
|
@ -45,14 +47,40 @@ enum Repr {
|
||||||
Stdout(tokio::io::Stdout),
|
Stdout(tokio::io::Stdout),
|
||||||
Stderr(tokio::io::Stderr),
|
Stderr(tokio::io::Stderr),
|
||||||
FsFile(tokio::fs::File),
|
FsFile(tokio::fs::File),
|
||||||
|
TcpListener(tokio::net::TcpListener),
|
||||||
|
TcpStream(tokio::net::TcpStream),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Abstract async file interface.
|
// Abstract async file interface.
|
||||||
// Ideally in unix, if Resource represents an OS rid, it will be the same.
|
// Ideally in unix, if Resource represents an OS rid, it will be the same.
|
||||||
|
#[derive(Debug)]
|
||||||
pub struct Resource {
|
pub struct Resource {
|
||||||
pub rid: ResourceId,
|
pub rid: ResourceId,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Resource {
|
||||||
|
// TODO Should it return a Resource instead of net::TcpStream?
|
||||||
|
pub fn poll_accept(&mut self) -> Poll<(TcpStream, SocketAddr), Error> {
|
||||||
|
let mut table = RESOURCE_TABLE.lock().unwrap();
|
||||||
|
let maybe_repr = table.get_mut(&self.rid);
|
||||||
|
match maybe_repr {
|
||||||
|
None => panic!("bad rid"),
|
||||||
|
Some(repr) => match repr {
|
||||||
|
Repr::TcpListener(ref mut s) => s.poll_accept(),
|
||||||
|
_ => panic!("Cannot accept"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// close(2) is done by dropping the value. Therefore we just need to remove
|
||||||
|
// the resource from the RESOURCE_TABLE.
|
||||||
|
pub fn close(&mut self) {
|
||||||
|
let mut table = RESOURCE_TABLE.lock().unwrap();
|
||||||
|
let r = table.remove(&self.rid);
|
||||||
|
assert!(r.is_some());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Read for Resource {
|
impl Read for Resource {
|
||||||
fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
|
fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
|
||||||
unimplemented!();
|
unimplemented!();
|
||||||
|
@ -68,9 +96,11 @@ impl AsyncRead for Resource {
|
||||||
Some(repr) => match repr {
|
Some(repr) => match repr {
|
||||||
Repr::FsFile(ref mut f) => f.poll_read(buf),
|
Repr::FsFile(ref mut f) => f.poll_read(buf),
|
||||||
Repr::Stdin(ref mut f) => f.poll_read(buf),
|
Repr::Stdin(ref mut f) => f.poll_read(buf),
|
||||||
|
Repr::TcpStream(ref mut f) => f.poll_read(buf),
|
||||||
Repr::Stdout(_) | Repr::Stderr(_) => {
|
Repr::Stdout(_) | Repr::Stderr(_) => {
|
||||||
panic!("Cannot read from stdout/stderr")
|
panic!("Cannot read from stdout/stderr")
|
||||||
}
|
}
|
||||||
|
Repr::TcpListener(_) => panic!("Cannot read"),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -96,7 +126,9 @@ impl AsyncWrite for Resource {
|
||||||
Repr::FsFile(ref mut f) => f.poll_write(buf),
|
Repr::FsFile(ref mut f) => f.poll_write(buf),
|
||||||
Repr::Stdout(ref mut f) => f.poll_write(buf),
|
Repr::Stdout(ref mut f) => f.poll_write(buf),
|
||||||
Repr::Stderr(ref mut f) => f.poll_write(buf),
|
Repr::Stderr(ref mut f) => f.poll_write(buf),
|
||||||
|
Repr::TcpStream(ref mut f) => f.poll_write(buf),
|
||||||
Repr::Stdin(_) => panic!("Cannot write to stdin"),
|
Repr::Stdin(_) => panic!("Cannot write to stdin"),
|
||||||
|
Repr::TcpListener(_) => panic!("Cannot write"),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -120,6 +152,22 @@ pub fn add_fs_file(fs_file: tokio::fs::File) -> Resource {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn add_tcp_listener(listener: tokio::net::TcpListener) -> Resource {
|
||||||
|
let rid = new_rid();
|
||||||
|
let mut tg = RESOURCE_TABLE.lock().unwrap();
|
||||||
|
let r = tg.insert(rid, Repr::TcpListener(listener));
|
||||||
|
assert!(r.is_none());
|
||||||
|
Resource { rid }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_tcp_stream(stream: tokio::net::TcpStream) -> Resource {
|
||||||
|
let rid = new_rid();
|
||||||
|
let mut tg = RESOURCE_TABLE.lock().unwrap();
|
||||||
|
let r = tg.insert(rid, Repr::TcpStream(stream));
|
||||||
|
assert!(r.is_none());
|
||||||
|
Resource { rid }
|
||||||
|
}
|
||||||
|
|
||||||
pub fn lookup(rid: ResourceId) -> Option<Resource> {
|
pub fn lookup(rid: ResourceId) -> Option<Resource> {
|
||||||
let table = RESOURCE_TABLE.lock().unwrap();
|
let table = RESOURCE_TABLE.lock().unwrap();
|
||||||
table.get(&rid).map(|_| Resource { rid })
|
table.get(&rid).map(|_| Resource { rid })
|
||||||
|
|
|
@ -1,8 +1,15 @@
|
||||||
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
|
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
|
||||||
|
|
||||||
|
use resources::Resource;
|
||||||
|
|
||||||
use futures;
|
use futures;
|
||||||
use futures::Future;
|
use futures::Future;
|
||||||
|
use futures::Poll;
|
||||||
|
use std::io;
|
||||||
|
use std::mem;
|
||||||
|
use std::net::SocketAddr;
|
||||||
use tokio;
|
use tokio;
|
||||||
|
use tokio::net::TcpStream;
|
||||||
use tokio_executor;
|
use tokio_executor;
|
||||||
|
|
||||||
pub fn block_on<F, R, E>(future: F) -> Result<R, E>
|
pub fn block_on<F, R, E>(future: F) -> Result<R, E>
|
||||||
|
@ -28,3 +35,42 @@ where
|
||||||
let mut enter = tokio_executor::enter().expect("Multiple executors at once");
|
let mut enter = tokio_executor::enter().expect("Multiple executors at once");
|
||||||
tokio_executor::with_default(&mut executor, &mut enter, move |_enter| f());
|
tokio_executor::with_default(&mut executor, &mut enter, move |_enter| f());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum AcceptState {
|
||||||
|
Pending(Resource),
|
||||||
|
Empty,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Simply accepts a connection.
|
||||||
|
pub fn accept(r: Resource) -> Accept {
|
||||||
|
Accept {
|
||||||
|
state: AcceptState::Pending(r),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A future which can be used to easily read available number of bytes to fill
|
||||||
|
/// a buffer.
|
||||||
|
///
|
||||||
|
/// Created by the [`read`] function.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Accept {
|
||||||
|
state: AcceptState,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Future for Accept {
|
||||||
|
type Item = (TcpStream, SocketAddr);
|
||||||
|
type Error = io::Error;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
let (stream, addr) = match self.state {
|
||||||
|
AcceptState::Pending(ref mut r) => try_ready!(r.poll_accept()),
|
||||||
|
AcceptState::Empty => panic!("poll Accept after it's done"),
|
||||||
|
};
|
||||||
|
|
||||||
|
match mem::replace(&mut self.state, AcceptState::Empty) {
|
||||||
|
AcceptState::Pending(_) => Ok((stream, addr).into()),
|
||||||
|
AcceptState::Empty => panic!("invalid internal state"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue