From a831d1e2391e67f6d6ad3284ab9858d9b8d610c1 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Sun, 27 May 2018 03:46:18 -0400 Subject: [PATCH] Implement fetch --- Makefile | 4 +- dispatch.go | 8 +++ dispatch.ts | 14 ++++ fetch.go | 64 ++++++++++++++++++ fetch.ts | 151 ++++++++++++++++++++++++++++++++++++++++++ globals.ts | 7 ++ main.go | 2 + main.ts | 3 +- msg.proto | 20 +++++- os.go | 4 +- package.json | 2 + runtime.ts | 6 +- testdata/fetch.ts | 12 ++++ testdata/fetch.ts.out | 2 + util.go | 8 +++ util.ts | 25 +++++++ yarn.lock | 8 +++ 17 files changed, 330 insertions(+), 10 deletions(-) create mode 100644 fetch.go create mode 100644 fetch.ts create mode 100644 testdata/fetch.ts create mode 100644 testdata/fetch.ts.out diff --git a/Makefile b/Makefile index 939d3f9394..e1df60d43a 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,6 @@ TS_FILES = \ dispatch.ts \ + fetch.ts \ globals.ts \ main.ts \ msg.pb.d.ts \ @@ -18,8 +19,9 @@ GO_FILES = \ assets.go \ deno_dir.go \ deno_dir_test.go \ - echo.go \ dispatch.go \ + echo.go \ + fetch.go \ main.go \ msg.pb.go \ os.go \ diff --git a/dispatch.go b/dispatch.go index f974ea5aeb..3746ece354 100644 --- a/dispatch.go +++ b/dispatch.go @@ -66,6 +66,7 @@ func Sub(channel string, cb Subscriber) { } func Pub(channel string, payload []byte) { + wg.Add(1) resChan <- &BaseMsg{ Channel: channel, Payload: payload, @@ -79,11 +80,14 @@ func PubMsg(channel string, msg *Msg) { } func DispatchLoop() { + // runtime.LockOSThread() wg.Add(1) first := true // In a goroutine, we wait on for all goroutines to complete (for example // timers). We use this to signal to the main thread to exit. + // wg.Add(1) basically translates to uv_ref, if this was Node. + // wg.Done() basically translates to uv_unref go func() { wg.Wait() doneChan <- true @@ -92,7 +96,11 @@ func DispatchLoop() { for { select { case msg := <-resChan: + wg.Done() out, err := proto.Marshal(msg) + if err != nil { + panic(err) + } err = worker.SendBytes(out) stats.v8workerSend++ stats.v8workerBytesSent += len(out) diff --git a/dispatch.ts b/dispatch.ts index c2a7a6ef78..41f736fff3 100644 --- a/dispatch.ts +++ b/dispatch.ts @@ -3,6 +3,7 @@ import { _global } from "./globals"; import { main as pb } from "./msg.pb"; type MessageCallback = (msg: Uint8Array) => void; +//type MessageStructCallback = (msg: pb.IMsg) => void; const send = V8Worker2.send; const channels = new Map(); @@ -16,6 +17,19 @@ export function sub(channel: string, cb: MessageCallback): void { subscribers.push(cb); } +/* +export function subMsg(channel: string, cb: MessageStructCallback): void { + sub(channel, (payload: Uint8Array) => { + const msg = pb.Msg.decode(payload); + if (msg.error != null) { + f.onError(new Error(msg.error)); + } else { + cb(msg); + } + }); +} + */ + export function pub(channel: string, payload: Uint8Array): null | ArrayBuffer { const msg = pb.BaseMsg.fromObject({ channel, payload }); const ui8 = pb.BaseMsg.encode(msg).finish(); diff --git a/fetch.go b/fetch.go new file mode 100644 index 0000000000..d0d0ac1b3b --- /dev/null +++ b/fetch.go @@ -0,0 +1,64 @@ +package main + +import ( + "github.com/golang/protobuf/proto" + "io/ioutil" + "net/http" +) + +func InitFetch() { + Sub("fetch", func(buf []byte) []byte { + msg := &Msg{} + check(proto.Unmarshal(buf, msg)) + switch msg.Command { + case Msg_FETCH_REQ: + return Fetch( + msg.FetchReqId, + msg.FetchReqUrl) + default: + panic("[fetch] Unexpected message " + string(buf)) + } + }) +} + +func Fetch(id int32, targetUrl string) []byte { + logDebug("Fetch %d %s", id, targetUrl) + async(func() { + resMsg := &Msg{ + Command: Msg_FETCH_RES, + FetchResId: id, + } + + resp, err := http.Get(targetUrl) + if err != nil { + resMsg.Error = err.Error() + PubMsg("fetch", resMsg) + return + } + if resp == nil { + resMsg.Error = "resp is nil " + PubMsg("fetch", resMsg) + return + } + + resMsg.FetchResStatus = int32(resp.StatusCode) + logDebug("fetch success %d %s", resMsg.FetchResStatus, targetUrl) + PubMsg("fetch", resMsg) + + // Now we read the body and send another message0 + + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if resp == nil { + resMsg.Error = "resp is nil " + PubMsg("fetch", resMsg) + return + } + + resMsg.FetchResBody = body + PubMsg("fetch", resMsg) + + // TODO streaming. + }) + return nil +} diff --git a/fetch.ts b/fetch.ts new file mode 100644 index 0000000000..dac019048f --- /dev/null +++ b/fetch.ts @@ -0,0 +1,151 @@ +import { assert, log, createResolvable, Resolvable } from "./util"; +import * as util from "./util"; +import * as dispatch from "./dispatch"; +import { main as pb } from "./msg.pb"; +import { TextDecoder } from "text-encoding"; + +export function initFetch() { + dispatch.sub("fetch", (payload: Uint8Array) => { + const msg = pb.Msg.decode(payload); + assert(msg.command === pb.Msg.Command.FETCH_RES); + const id = msg.fetchResId; + const f = fetchRequests.get(id); + assert(f != null, `Couldn't find FetchRequest id ${id}`); + + f.onMsg(msg); + }); +} + +const fetchRequests = new Map(); + +class FetchResponse implements Response { + readonly url: string; + body: null; + bodyUsed = false; // TODO + status: number; + statusText = "FIXME"; // TODO + readonly type = "basic"; // TODO + redirected = false; // TODO + headers: null; // TODO + //private bodyChunks: Uint8Array[] = []; + private first = true; + + constructor(readonly req: FetchRequest) { + this.url = req.url; + } + + bodyWaiter: Resolvable; + arrayBuffer(): Promise { + this.bodyWaiter = createResolvable(); + return this.bodyWaiter; + } + + blob(): Promise { + throw Error("not implemented"); + } + + formData(): Promise { + throw Error("not implemented"); + } + + async json(): Promise { + const text = await this.text(); + return JSON.parse(text); + } + + async text(): Promise { + const ab = await this.arrayBuffer(); + const enc = new TextDecoder("utf-8"); + // Maybe new Uint8Array(ab) + return enc.decode(ab); + } + + get ok(): boolean { + return 200 <= this.status && this.status < 300; + } + + clone(): Response { + throw Error("not implemented"); + } + + onHeader: (res: Response) => void; + onError: (error: Error) => void; + + onMsg(msg: pb.Msg) { + if (msg.error !== null && msg.error !== "") { + //throw new Error(msg.error) + this.onError(new Error(msg.error)); + return; + } + + if (this.first) { + this.first = false; + this.status = msg.fetchResStatus; + this.onHeader(this); + } else { + // Body message. Assuming it all comes in one message now. + const ab = util.typedArrayToArrayBuffer(msg.fetchResBody); + this.bodyWaiter.resolve(ab); + } + } +} + +let nextFetchId = 0; +//TODO implements Request +class FetchRequest { + private readonly id: number; + response: FetchResponse; + constructor(readonly url: string) { + this.id = nextFetchId++; + fetchRequests.set(this.id, this); + this.response = new FetchResponse(this); + } + + onMsg(msg: pb.Msg) { + this.response.onMsg(msg); + } + + destroy() { + fetchRequests.delete(this.id); + } + + start() { + log("dispatch FETCH_REQ", this.id, this.url); + const res = dispatch.sendMsg("fetch", { + command: pb.Msg.Command.FETCH_REQ, + fetchReqId: this.id, + fetchReqUrl: this.url + }); + assert(res == null); + } +} + +export function fetch( + input?: Request | string, + init?: RequestInit +): Promise { + const fetchReq = new FetchRequest(input as string); + const response = fetchReq.response; + return new Promise((resolve, reject) => { + // tslint:disable-next-line:no-any + response.onHeader = (response: any) => { + log("onHeader"); + resolve(response); + }; + response.onError = (error: Error) => { + log("onError", error); + reject(error); + }; + fetchReq.start(); + }); +} + +/* +fetch('http://example.com/movies.json') + .then(function(response) { + return response.json(); + }) + .then(function(myJson) { + console.log(myJson); + }); + */ diff --git a/globals.ts b/globals.ts index d38fbf47d2..184f25163b 100644 --- a/globals.ts +++ b/globals.ts @@ -45,3 +45,10 @@ function stringifyArgs(args: any[]): string { } return out.join(" "); } + +import { fetch } from "./fetch"; +_global["fetch"] = fetch; + +import { TextEncoder, TextDecoder } from "text-encoding"; +_global["TextEncoder"] = TextEncoder; +_global["TextDecoder"] = TextDecoder; diff --git a/main.go b/main.go index bb2e32205d..e34137236b 100644 --- a/main.go +++ b/main.go @@ -24,6 +24,7 @@ func FlagsParse() []string { if *flagV8Options { args = append(args, "--help") } + args = append(args, "--abort-on-uncaught-exception") args = v8worker2.SetFlags(args) return args @@ -49,6 +50,7 @@ func main() { InitOS() InitEcho() InitTimers() + InitFetch() main_js := stringAsset("main.js") err := worker.Load("/main.js", main_js) diff --git a/main.ts b/main.ts index 25ade14f9b..10acb649fb 100644 --- a/main.ts +++ b/main.ts @@ -7,8 +7,8 @@ import { main as pb } from "./msg.pb"; import * as runtime from "./runtime"; import * as util from "./util"; -// These have top-level functions that need to execute. import { initTimers } from "./timers"; +import { initFetch } from "./fetch"; // To control internal logging output // Set with the -debug command-line flag. @@ -32,6 +32,7 @@ dispatch.sub("start", (payload: Uint8Array) => { util.log("start", { cwd, argv, debugFlag }); initTimers(); + initFetch(); runtime.setup(mainJs, mainMap); const inputFn = argv[0]; diff --git a/msg.proto b/msg.proto index ee9cfceeb9..1c18c8018a 100644 --- a/msg.proto +++ b/msg.proto @@ -7,8 +7,6 @@ message BaseMsg { } message Msg { - string error = 1; - enum Command { ERROR = 0; START = 1; @@ -19,8 +17,10 @@ message Msg { TIMER_START = 6; TIMER_READY = 7; TIMER_CLEAR = 8; + FETCH_REQ = 9; + FETCH_RES = 10; } - Command command = 2; + Command command = 1; // We avoid creating a message for each command (and use oneof or any types) // In order to reduce code in the size of the generated javascript @@ -28,6 +28,9 @@ message Msg { // potentially add many hundreds of commands. Therefore we just prefix command // arguments by their name. + // ERROR + string error = 2; + // START string start_cwd = 10; repeated string start_argv = 11; @@ -67,4 +70,15 @@ message Msg { // TIMER_CLEAR int32 timer_clear_id = 80; + + // FETCH_REQ + int32 fetch_req_id = 90; + string fetch_req_url = 91; + // repeated string fetch_req_header_line = 91 + + // FETCH_RES + int32 fetch_res_id = 100; + int32 fetch_res_status = 101; + repeated string fetch_res_header_line = 102; + bytes fetch_res_body = 103; } diff --git a/os.go b/os.go index d1923837e4..4f4f26d71f 100644 --- a/os.go +++ b/os.go @@ -37,7 +37,7 @@ func InitOS() { func ResolveModule(moduleSpecifier string, containingFile string) ( moduleName string, filename string, err error) { - logDebug("ResolveModule %s %s", moduleSpecifier, containingFile) + logDebug("os.go ResolveModule moduleSpecifier %s containingFile %s", moduleSpecifier, containingFile) moduleUrl, err := url.Parse(moduleSpecifier) if err != nil { @@ -76,7 +76,7 @@ func HandleCodeFetch(moduleSpecifier string, containingFile string) (out []byte) return } - logDebug("HandleCodeFetch moduleSpecifier %s containingFile %s filename %s", + logDebug("CodeFetch moduleSpecifier %s containingFile %s filename %s", moduleSpecifier, containingFile, filename) if isRemote(moduleName) { diff --git a/package.json b/package.json index 1eae8b70a6..2a6288c403 100644 --- a/package.json +++ b/package.json @@ -8,6 +8,7 @@ "devDependencies": { "@types/base64-js": "^1.2.5", "@types/source-map-support": "^0.4.0", + "@types/text-encoding": "^0.0.32", "babel-polyfill": "^6.26.0", "base64-js": "^1.3.0", "espree": "^3.5.3", @@ -16,6 +17,7 @@ "prettier": "^1.12.1", "protobufjs": "^6.8.6", "source-map": "0.6.0", + "text-encoding": "^0.6.4", "tmp": "0.0.33", "tslint": "5.10.0", "typescript": "^2.8.3", diff --git a/runtime.ts b/runtime.ts index 52277653f2..c34f7faac2 100644 --- a/runtime.ts +++ b/runtime.ts @@ -24,7 +24,7 @@ type AmdFactory = (...args: any[]) => undefined | object; type AmdDefine = (deps: string[], factory: AmdFactory) => void; // Uncaught exceptions are sent to window.onerror by v8worker2. -window.onerror = function(message, source, lineno, colno, error) { +window.onerror = (message, source, lineno, colno, error) => { console.log(error.message, error.stack); os.exit(1); }; @@ -143,11 +143,11 @@ export function resolveModule( moduleSpecifier, containingFile ); - if (sourceCode.length == 0) { + if (sourceCode.length === 0) { return null; } util.log("resolveModule sourceCode length ", sourceCode.length); - let m = FileModule.load(filename); + const m = FileModule.load(filename); if (m != null) { return m; } else { diff --git a/testdata/fetch.ts b/testdata/fetch.ts new file mode 100644 index 0000000000..24d4139039 --- /dev/null +++ b/testdata/fetch.ts @@ -0,0 +1,12 @@ + +const request = async () => { + const response = await fetch('http://localhost:4545/package.json'); + const json = await response.json(); + console.log("expect deno:", json.name); + if (json.name !== "deno") { + throw Error("bad value" + json.name); + } +} + +request(); +console.log("fetch started"); diff --git a/testdata/fetch.ts.out b/testdata/fetch.ts.out new file mode 100644 index 0000000000..b8ca9c5982 --- /dev/null +++ b/testdata/fetch.ts.out @@ -0,0 +1,2 @@ +fetch started +expect deno: deno diff --git a/util.go b/util.go index 11c4514cf9..1b87413ef7 100644 --- a/util.go +++ b/util.go @@ -49,3 +49,11 @@ func exitOnError(err error) { os.Exit(1) } } + +func async(cb func()) { + wg.Add(1) + go func() { + cb() + wg.Done() + }() +} diff --git a/util.ts b/util.ts index c4fa032558..67d3b550e0 100644 --- a/util.ts +++ b/util.ts @@ -20,3 +20,28 @@ export function typedArrayToArrayBuffer(ta: TypedArray): ArrayBuffer { const ab = ta.buffer.slice(ta.byteOffset, ta.byteOffset + ta.byteLength); return ab as ArrayBuffer; } + +// A `Resolvable` is a Promise with the `reject` and `resolve` functions +// placed as methods on the promise object itself. It allows you to do: +// +// const p = createResolvable(); +// ... +// p.resolve(42); +// +// It'd be prettier to make Resolvable a class that inherits from Promise, +// rather than an interface. This is possible in ES2016, however typescript +// produces broken code when targeting ES5 code. +// See https://github.com/Microsoft/TypeScript/issues/15202 +// At the time of writing, the github issue is closed but the problem remains. +export interface Resolvable extends Promise { + resolve: (value?: T | PromiseLike) => void; + // tslint:disable-next-line:no-any + reject: (reason?: any) => void; +} +export function createResolvable(): Resolvable { + let methods; + const promise = new Promise((resolve, reject) => { + methods = { resolve, reject }; + }); + return Object.assign(promise, methods) as Resolvable; +} diff --git a/yarn.lock b/yarn.lock index 7c430653ec..708b5dd01d 100644 --- a/yarn.lock +++ b/yarn.lock @@ -67,6 +67,10 @@ dependencies: "@types/node" "*" +"@types/text-encoding@^0.0.32": + version "0.0.32" + resolved "https://registry.yarnpkg.com/@types/text-encoding/-/text-encoding-0.0.32.tgz#52289b320a406850b14f08f48b475ca021218048" + abbrev@1: version "1.1.1" resolved "https://registry.yarnpkg.com/abbrev/-/abbrev-1.1.1.tgz#f8f2c887ad10bf67f634f005b6987fed3179aac8" @@ -3541,6 +3545,10 @@ tar@^4: safe-buffer "^5.1.2" yallist "^3.0.2" +text-encoding@^0.6.4: + version "0.6.4" + resolved "https://registry.yarnpkg.com/text-encoding/-/text-encoding-0.6.4.tgz#e399a982257a276dae428bb92845cb71bdc26d19" + through2@^2.0.0, through2@~2.0.3: version "2.0.3" resolved "https://registry.yarnpkg.com/through2/-/through2-2.0.3.tgz#0004569b37c7c74ba39c43f3ced78d1ad94140be"