0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-02-01 20:25:12 -05:00

Support streaming response bodies from fetch()

Also Buffer.readFrom in fetch() to buffer response.
This commit is contained in:
Ryan Dahl 2018-10-26 12:14:06 -04:00
parent f2223fb9ad
commit b1579460ce
7 changed files with 141 additions and 78 deletions

View file

@ -13,7 +13,7 @@ export {
Writer, Writer,
Closer, Closer,
Seeker, Seeker,
ReaderCloser, ReadCloser,
WriteCloser, WriteCloser,
ReadSeeker, ReadSeeker,
WriteSeeker, WriteSeeker,

View file

@ -225,7 +225,7 @@ interface AbortSignal extends EventTarget {
): void; ): void;
} }
interface ReadableStream { export interface ReadableStream {
readonly locked: boolean; readonly locked: boolean;
cancel(): Promise<void>; cancel(): Promise<void>;
getReader(): ReadableStreamReader; getReader(): ReadableStreamReader;
@ -235,7 +235,7 @@ interface EventListenerObject {
handleEvent(evt: Event): void; handleEvent(evt: Event): void;
} }
interface ReadableStreamReader { export interface ReadableStreamReader {
cancel(): Promise<void>; cancel(): Promise<void>;
// tslint:disable-next-line:no-any // tslint:disable-next-line:no-any
read(): Promise<any>; read(): Promise<any>;
@ -270,7 +270,7 @@ export interface Blob {
slice(start?: number, end?: number, contentType?: string): Blob; slice(start?: number, end?: number, contentType?: string): Blob;
} }
interface Body { export interface Body {
/** A simple getter used to expose a `ReadableStream` of the body contents. */ /** A simple getter used to expose a `ReadableStream` of the body contents. */
readonly body: ReadableStream | null; readonly body: ReadableStream | null;
/** Stores a `Boolean` that declares whether the body has been used in a /** Stores a `Boolean` that declares whether the body has been used in a

View file

@ -1,12 +1,5 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license. // Copyright 2018 the Deno authors. All rights reserved. MIT license.
import { import { assert, log, createResolvable, notImplemented } from "./util";
assert,
log,
createResolvable,
Resolvable,
typedArrayToArrayBuffer,
notImplemented
} from "./util";
import * as flatbuffers from "./flatbuffers"; import * as flatbuffers from "./flatbuffers";
import { sendAsync } from "./dispatch"; import { sendAsync } from "./dispatch";
import * as msg from "gen/msg_generated"; import * as msg from "gen/msg_generated";
@ -14,52 +7,65 @@ import * as domTypes from "./dom_types";
import { TextDecoder } from "./text_encoding"; import { TextDecoder } from "./text_encoding";
import { DenoBlob } from "./blob"; import { DenoBlob } from "./blob";
import { Headers } from "./headers"; import { Headers } from "./headers";
import * as io from "./io";
import { read, close } from "./files";
import { Buffer } from "./buffer";
class FetchResponse implements domTypes.Response { class Body implements domTypes.Body, domTypes.ReadableStream, io.ReadCloser {
readonly url: string = ""; bodyUsed = false;
body: null; private _bodyPromise: null | Promise<ArrayBuffer> = null;
bodyUsed = false; // TODO private _data: ArrayBuffer | null = null;
statusText = "FIXME"; // TODO readonly locked: boolean = false; // TODO
readonly type = "basic"; // TODO readonly body: null | Body = this;
redirected = false; // TODO
headers: domTypes.Headers;
readonly trailer: Promise<domTypes.Headers>;
//private bodyChunks: Uint8Array[] = [];
private first = true;
private bodyData: ArrayBuffer;
private bodyWaiter: Resolvable<ArrayBuffer>;
constructor( constructor(private rid: number, readonly contentType: string) {}
readonly status: number,
readonly body_: ArrayBuffer, private async _bodyBuffer(): Promise<ArrayBuffer> {
headersList: Array<[string, string]> assert(this._bodyPromise == null);
) { const buf = new Buffer();
this.bodyWaiter = createResolvable(); try {
this.trailer = createResolvable(); const nread = await buf.readFrom(this);
this.headers = new Headers(headersList); const ui8 = buf.bytes();
this.bodyData = body_; assert(ui8.byteLength === nread);
setTimeout(() => { this._data = ui8.buffer.slice(
this.bodyWaiter.resolve(body_); ui8.byteOffset,
}, 0); ui8.byteOffset + nread
) as ArrayBuffer;
assert(this._data.byteLength === nread);
} finally {
this.close();
}
return this._data;
} }
arrayBuffer(): Promise<ArrayBuffer> { async arrayBuffer(): Promise<ArrayBuffer> {
return this.bodyWaiter; // If we've already bufferred the response, just return it.
if (this._data != null) {
return this._data;
}
// If there is no _bodyPromise yet, start it.
if (this._bodyPromise == null) {
this._bodyPromise = this._bodyBuffer();
}
return this._bodyPromise;
} }
async blob(): Promise<domTypes.Blob> { async blob(): Promise<domTypes.Blob> {
const arrayBuffer = await this.arrayBuffer(); const arrayBuffer = await this.arrayBuffer();
return new DenoBlob([arrayBuffer], { return new DenoBlob([arrayBuffer], {
type: this.headers.get("content-type") || "" type: this.contentType
}); });
} }
async formData(): Promise<domTypes.FormData> { async formData(): Promise<domTypes.FormData> {
notImplemented(); return notImplemented();
return {} as domTypes.FormData;
} }
async json(): Promise<object> { // tslint:disable-next-line:no-any
async json(): Promise<any> {
const text = await this.text(); const text = await this.text();
return JSON.parse(text); return JSON.parse(text);
} }
@ -70,6 +76,71 @@ class FetchResponse implements domTypes.Response {
return decoder.decode(ab); return decoder.decode(ab);
} }
read(p: Uint8Array): Promise<io.ReadResult> {
return read(this.rid, p);
}
close(): void {
close(this.rid);
}
async cancel(): Promise<void> {
return notImplemented();
}
getReader(): domTypes.ReadableStreamReader {
return notImplemented();
}
}
class Response implements domTypes.Response {
readonly url: string = "";
statusText = "FIXME"; // TODO
readonly type = "basic"; // TODO
redirected = false; // TODO
headers: domTypes.Headers;
readonly trailer: Promise<domTypes.Headers>;
bodyUsed = false;
readonly body: Body;
constructor(
readonly status: number,
headersList: Array<[string, string]>,
rid: number,
body_: null | Body = null
) {
this.trailer = createResolvable();
this.headers = new Headers(headersList);
const contentType = this.headers.get("content-type") || "";
if (body_ == null) {
this.body = new Body(rid, contentType);
} else {
this.body = body_;
}
}
async arrayBuffer(): Promise<ArrayBuffer> {
return this.body.arrayBuffer();
}
async blob(): Promise<domTypes.Blob> {
return this.body.blob();
}
async formData(): Promise<domTypes.FormData> {
return this.body.formData();
}
// tslint:disable-next-line:no-any
async json(): Promise<any> {
return this.body.json();
}
async text(): Promise<string> {
return this.body.text();
}
get ok(): boolean { get ok(): boolean {
return 200 <= this.status && this.status < 300; return 200 <= this.status && this.status < 300;
} }
@ -87,25 +158,7 @@ class FetchResponse implements domTypes.Response {
headersList.push(header); headersList.push(header);
} }
return new FetchResponse(this.status, this.bodyData.slice(0), headersList); return new Response(this.status, headersList, -1, this.body);
}
onHeader?: (res: FetchResponse) => void;
onError?: (error: Error) => void;
onMsg(base: msg.Base) {
/*
const error = base.error();
if (error != null) {
assert(this.onError != null);
this.onError!(new Error(error));
return;
}
*/
if (this.first) {
this.first = false;
}
} }
} }
@ -113,7 +166,7 @@ class FetchResponse implements domTypes.Response {
export async function fetch( export async function fetch(
input?: domTypes.Request | string, input?: domTypes.Request | string,
init?: domTypes.RequestInit init?: domTypes.RequestInit
): Promise<domTypes.Response> { ): Promise<Response> {
const url = input as string; const url = input as string;
log("dispatch FETCH_REQ", url); log("dispatch FETCH_REQ", url);
@ -134,9 +187,7 @@ export async function fetch(
assert(resBase.inner(inner) != null); assert(resBase.inner(inner) != null);
const status = inner.status(); const status = inner.status();
const bodyArray = inner.bodyArray(); const bodyRid = inner.bodyRid();
assert(bodyArray != null);
const body = typedArrayToArrayBuffer(bodyArray!);
const headersList: Array<[string, string]> = []; const headersList: Array<[string, string]> = [];
const len = inner.headerKeyLength(); const len = inner.headerKeyLength();
@ -146,6 +197,6 @@ export async function fetch(
headersList.push([key, value]); headersList.push([key, value]);
} }
const response = new FetchResponse(status, body, headersList); const response = new Response(status, headersList, bodyRid);
return response; return response;
} }

View file

@ -77,7 +77,7 @@ export interface Seeker {
} }
// https://golang.org/pkg/io/#ReadCloser // https://golang.org/pkg/io/#ReadCloser
export interface ReaderCloser extends Reader, Closer {} export interface ReadCloser extends Reader, Closer {}
// https://golang.org/pkg/io/#WriteCloser // https://golang.org/pkg/io/#WriteCloser
export interface WriteCloser extends Writer, Closer {} export interface WriteCloser extends Writer, Closer {}

View file

@ -206,7 +206,7 @@ table FetchRes {
status: int; status: int;
header_key: [string]; header_key: [string];
header_value: [string]; header_value: [string];
body: [ubyte]; body_rid: uint32;
} }
table MakeTempDir { table MakeTempDir {

View file

@ -18,7 +18,7 @@ use futures;
use futures::future::poll_fn; use futures::future::poll_fn;
use futures::Poll; use futures::Poll;
use hyper; use hyper;
use hyper::rt::{Future, Stream}; use hyper::rt::Future;
use remove_dir_all::remove_dir_all; use remove_dir_all::remove_dir_all;
use repl; use repl;
use resources::table_entries; use resources::table_entries;
@ -417,20 +417,17 @@ fn op_fetch(
(keys, values) (keys, values)
}; };
// TODO Handle streaming body. let body = res.into_body();
res let body_resource = resources::add_hyper_body(body);
.into_body() Ok((status, headers, body_resource))
.concat2()
.map(move |body| (status, body, headers))
}); });
let future = future.map_err(|err| -> DenoError { err.into() }).and_then( let future = future.map_err(|err| -> DenoError { err.into() }).and_then(
move |(status, body, headers)| { move |(status, headers, body_resource)| {
debug!("fetch body "); debug!("fetch body ");
let builder = &mut FlatBufferBuilder::new(); let builder = &mut FlatBufferBuilder::new();
// Send the first message without a body. This is just to indicate // Send the first message without a body. This is just to indicate
// what status code. // what status code.
let body_off = builder.create_vector(body.as_ref());
let header_keys: Vec<&str> = headers.0.iter().map(|s| &**s).collect(); let header_keys: Vec<&str> = headers.0.iter().map(|s| &**s).collect();
let header_keys_off = let header_keys_off =
builder.create_vector_of_strings(header_keys.as_slice()); builder.create_vector_of_strings(header_keys.as_slice());
@ -443,7 +440,7 @@ fn op_fetch(
&msg::FetchResArgs { &msg::FetchResArgs {
id, id,
status, status,
body: Some(body_off), body_rid: body_resource.rid,
header_key: Some(header_keys_off), header_key: Some(header_keys_off),
header_value: Some(header_values_off), header_value: Some(header_values_off),
}, },

View file

@ -13,6 +13,7 @@ use eager_unix as eager;
use errors::bad_resource; use errors::bad_resource;
use errors::DenoError; use errors::DenoError;
use errors::DenoResult; use errors::DenoResult;
use http_body::HttpBody;
use repl::Repl; use repl::Repl;
use tokio_util; use tokio_util;
use tokio_write; use tokio_write;
@ -20,6 +21,7 @@ use tokio_write;
use futures; use futures;
use futures::future::{Either, FutureResult}; use futures::future::{Either, FutureResult};
use futures::Poll; use futures::Poll;
use hyper;
use std; use std;
use std::collections::HashMap; use std::collections::HashMap;
use std::io::{Error, Read, Write}; use std::io::{Error, Read, Write};
@ -59,6 +61,7 @@ enum Repr {
FsFile(tokio::fs::File), FsFile(tokio::fs::File),
TcpListener(tokio::net::TcpListener), TcpListener(tokio::net::TcpListener),
TcpStream(tokio::net::TcpStream), TcpStream(tokio::net::TcpStream),
HttpBody(HttpBody),
Repl(Repl), Repl(Repl),
} }
@ -89,6 +92,7 @@ fn inspect_repr(repr: &Repr) -> String {
Repr::FsFile(_) => "fsFile", Repr::FsFile(_) => "fsFile",
Repr::TcpListener(_) => "tcpListener", Repr::TcpListener(_) => "tcpListener",
Repr::TcpStream(_) => "tcpStream", Repr::TcpStream(_) => "tcpStream",
Repr::HttpBody(_) => "httpBody",
Repr::Repl(_) => "repl", Repr::Repl(_) => "repl",
}; };
@ -155,6 +159,7 @@ impl AsyncRead for Resource {
Repr::FsFile(ref mut f) => f.poll_read(buf), Repr::FsFile(ref mut f) => f.poll_read(buf),
Repr::Stdin(ref mut f) => f.poll_read(buf), Repr::Stdin(ref mut f) => f.poll_read(buf),
Repr::TcpStream(ref mut f) => f.poll_read(buf), Repr::TcpStream(ref mut f) => f.poll_read(buf),
Repr::HttpBody(ref mut f) => f.poll_read(buf),
_ => panic!("Cannot read"), _ => panic!("Cannot read"),
}, },
} }
@ -222,6 +227,15 @@ pub fn add_tcp_stream(stream: tokio::net::TcpStream) -> Resource {
Resource { rid } Resource { rid }
} }
pub fn add_hyper_body(body: hyper::Body) -> Resource {
let rid = new_rid();
let mut tg = RESOURCE_TABLE.lock().unwrap();
let body = HttpBody::from(body);
let r = tg.insert(rid, Repr::HttpBody(body));
assert!(r.is_none());
Resource { rid }
}
pub fn add_repl(repl: Repl) -> Resource { pub fn add_repl(repl: Repl) -> Resource {
let rid = new_rid(); let rid = new_rid();
let mut tg = RESOURCE_TABLE.lock().unwrap(); let mut tg = RESOURCE_TABLE.lock().unwrap();
@ -243,6 +257,7 @@ pub fn readline(rid: ResourceId, prompt: &str) -> DenoResult<String> {
} }
pub fn lookup(rid: ResourceId) -> Option<Resource> { pub fn lookup(rid: ResourceId) -> Option<Resource> {
debug!("resource lookup {}", rid);
let table = RESOURCE_TABLE.lock().unwrap(); let table = RESOURCE_TABLE.lock().unwrap();
table.get(&rid).map(|_| Resource { rid }) table.get(&rid).map(|_| Resource { rid })
} }