From 08307fb84160602da12ba4d1b118c859e8a73cdb Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Mon, 21 May 2018 22:07:40 -0400 Subject: [PATCH] Add dispatch pub/sub --- Makefile | 13 +++++++- dispatch.go | 90 +++++++++++++++++++++++++++++++++++++++++++++++++++++ dispatch.ts | 61 ++++++++++++++++++++++++++++++++++++ handlers.go | 59 +++++++++++++++++++++-------------- main.go | 35 +++------------------ main.ts | 46 ++++++++++----------------- msg.proto | 5 +++ os.ts | 32 +++---------------- timers.ts | 27 ++++++++++------ 9 files changed, 244 insertions(+), 124 deletions(-) create mode 100644 dispatch.go create mode 100644 dispatch.ts diff --git a/Makefile b/Makefile index 45d402c99d..86a579d66b 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,6 @@ TS_FILES = \ tsconfig.json \ + dispatch.ts \ main.ts \ msg.pb.d.ts \ msg.pb.js \ @@ -10,7 +11,17 @@ TS_FILES = \ util.ts \ v8_source_maps.ts -deno: assets.go msg.pb.go main.go +GO_FILES = \ + assets.go \ + deno_dir.go \ + dispatch.go \ + handlers.go \ + main.go \ + main_test.go \ + msg.pb.go \ + util.go + +deno: $(GO_FILES) go build -o deno assets.go: dist/main.js diff --git a/dispatch.go b/dispatch.go new file mode 100644 index 0000000000..9ac12d25c2 --- /dev/null +++ b/dispatch.go @@ -0,0 +1,90 @@ +package main + +import ( + "github.com/golang/protobuf/proto" + "github.com/ry/v8worker2" + "sync" +) + +// There is a single global worker for this process. +// This file should be the only part of deno that directly access it, so that +// all interaction with V8 can go through a single point. +var worker *v8worker2.Worker + +var channels = make(map[string][]Subscriber) + +type Subscriber func(payload []byte) []byte + +func createWorker() { + worker = v8worker2.New(recv) +} + +func recv(buf []byte) (response []byte) { + msg := &BaseMsg{} + check(proto.Unmarshal(buf, msg)) + assert(len(msg.Payload) > 0, "BaseMsg has empty payload.") + subscribers, ok := channels[msg.Channel] + if !ok { + panic("No subscribers for channel " + msg.Channel) + } + for i := 0; i < len(subscribers); i++ { + s := subscribers[i] + r := s(msg.Payload) + if r != nil { + response = r + } + } + return response +} + +func Sub(channel string, cb Subscriber) { + subscribers, ok := channels[channel] + if !ok { + subscribers = make([]Subscriber, 0) + } + subscribers = append(subscribers, cb) + channels[channel] = subscribers +} + +func Pub(channel string, payload []byte) { + resChan <- &BaseMsg{ + Channel: channel, + Payload: payload, + } +} + +var resChan = make(chan *BaseMsg, 10) +var doneChan = make(chan bool) +var wg sync.WaitGroup + +func DispatchLoop() { + wg.Add(1) + first := true + + // In a goroutine, we wait on for all goroutines to complete (for example + // timers). We use this to signal to the main thread to exit. + go func() { + wg.Wait() + doneChan <- true + }() + + for { + select { + case msg := <-resChan: + out, err := proto.Marshal(msg) + err = worker.SendBytes(out) + check(err) + case <-doneChan: + // All goroutines have completed. Now we can exit main(). + return + } + + // We don't want to exit until we've received at least one message. + // This is so the program doesn't exit after sending the "start" + // message. + if first { + wg.Done() + } + first = false + } +} diff --git a/dispatch.ts b/dispatch.ts new file mode 100644 index 0000000000..859792b74b --- /dev/null +++ b/dispatch.ts @@ -0,0 +1,61 @@ +import { typedArrayToArrayBuffer } from "./util"; +import { _global } from "./globals"; +import { main as pb } from "./msg.pb"; + +type MessageCallback = (msg: Uint8Array) => void; + +const send = V8Worker2.send; +const channels = new Map(); + +export function sub(channel: string, cb: MessageCallback): void { + let subscribers = channels.get(channel); + if (!subscribers) { + subscribers = []; + channels.set(channel, subscribers); + } + subscribers.push(cb); +} + +export function pub(channel: string, payload: Uint8Array): null | ArrayBuffer { + const msg = pb.BaseMsg.fromObject({ channel, payload }); + const ui8 = pb.BaseMsg.encode(msg).finish(); + const ab = typedArrayToArrayBuffer(ui8); + return send(ab); +} + +// Internal version of "pub". +// TODO add internal version of "sub" +// TODO rename to pubInternal() +export function sendMsgFromObject( + channel: string, + obj: pb.IMsg +): null | pb.Msg { + const msg = pb.Msg.fromObject(obj); + const ui8 = pb.Msg.encode(msg).finish(); + const resBuf = pub(channel, ui8); + if (resBuf != null && resBuf.byteLength > 0) { + const res = pb.Msg.decode(new Uint8Array(resBuf)); + if (res != null && res.error != null && res.error.length > 0) { + throw Error(res.error); + } + return res; + } else { + return null; + } +} + +V8Worker2.recv((ab: ArrayBuffer) => { + const msg = pb.BaseMsg.decode(new Uint8Array(ab)); + const subscribers = channels.get(msg.channel); + if (subscribers == null) { + throw Error(`No subscribers for channel "${msg.channel}".`); + } + + for (const subscriber of subscribers) { + subscriber(msg.payload); + } +}); + +// Delete the V8Worker2 from the global object, so that no one else can receive +// messages. +_global["V8Worker2"] = null; diff --git a/handlers.go b/handlers.go index d653615957..237aa822e6 100644 --- a/handlers.go +++ b/handlers.go @@ -10,29 +10,38 @@ import ( const assetPrefix string = "/$asset$/" -func recv(buf []byte) []byte { - msg := &Msg{} - err := proto.Unmarshal(buf, msg) - check(err) - switch msg.Payload.(type) { - case *Msg_Exit: - payload := msg.GetExit() - os.Exit(int(payload.Code)) - case *Msg_SourceCodeFetch: - payload := msg.GetSourceCodeFetch() - return HandleSourceCodeFetch(payload.ModuleSpecifier, payload.ContainingFile) - case *Msg_SourceCodeCache: - payload := msg.GetSourceCodeCache() - return HandleSourceCodeCache(payload.Filename, payload.SourceCode, - payload.OutputCode) - case *Msg_TimerStart: - payload := msg.GetTimerStart() - return HandleTimerStart(payload.Id, payload.Interval, payload.Duration) - default: - panic("Unexpected message") - } +func InitHandlers() { + Sub("os", func(buf []byte) []byte { + msg := &Msg{} + check(proto.Unmarshal(buf, msg)) + switch msg.Payload.(type) { + case *Msg_Exit: + payload := msg.GetExit() + os.Exit(int(payload.Code)) + case *Msg_SourceCodeFetch: + payload := msg.GetSourceCodeFetch() + return HandleSourceCodeFetch(payload.ModuleSpecifier, payload.ContainingFile) + case *Msg_SourceCodeCache: + payload := msg.GetSourceCodeCache() + return HandleSourceCodeCache(payload.Filename, payload.SourceCode, + payload.OutputCode) + default: + panic("[os] Unexpected message " + string(buf)) + } + return nil + }) - return nil + Sub("timers", func(buf []byte) []byte { + msg := &Msg{} + check(proto.Unmarshal(buf, msg)) + switch msg.Payload.(type) { + case *Msg_TimerStart: + payload := msg.GetTimerStart() + return HandleTimerStart(payload.Id, payload.Interval, payload.Duration) + default: + panic("[timers] Unexpected message " + string(buf)) + } + }) } func HandleSourceCodeFetch(moduleSpecifier string, containingFile string) (out []byte) { @@ -107,13 +116,15 @@ func HandleTimerStart(id int32, interval bool, duration int32) []byte { go func() { defer wg.Done() time.Sleep(time.Duration(duration) * time.Millisecond) - resChan <- &Msg{ + payload, err := proto.Marshal(&Msg{ Payload: &Msg_TimerReady{ TimerReady: &TimerReadyMsg{ Id: id, }, }, - } + }) + check(err) + Pub("timers", payload) }() return nil } diff --git a/main.go b/main.go index 9b4b6c120d..409a7940d8 100644 --- a/main.go +++ b/main.go @@ -8,7 +8,6 @@ import ( "net/url" "os" "path" - "sync" ) var flagReload = flag.Bool("reload", false, "Reload cached remote source code.") @@ -19,9 +18,6 @@ var DenoDir string var CompileDir string var SrcDir string -var wg sync.WaitGroup -var resChan chan *Msg - func ResolveModule(moduleSpecifier string, containingFile string) ( moduleName string, filename string, err error) { moduleUrl, err := url.Parse(moduleSpecifier) @@ -58,7 +54,8 @@ func main() { args = v8worker2.SetFlags(args) createDirs() - worker := v8worker2.New(recv) + createWorker() + InitHandlers() main_js := stringAsset("main.js") check(worker.Load("/main.js", main_js)) @@ -67,9 +64,6 @@ func main() { cwd, err := os.Getwd() check(err) - resChan = make(chan *Msg) - doneChan := make(chan bool) - out, err := proto.Marshal(&Msg{ Payload: &Msg_Start{ Start: &StartMsg{ @@ -82,28 +76,7 @@ func main() { }, }) check(err) - err = worker.SendBytes(out) - if err != nil { - os.Stderr.WriteString(err.Error()) - os.Exit(1) - } + Pub("start", out) - // In a goroutine, we wait on for all goroutines to complete (for example - // timers). We use this to signal to the main thread to exit. - go func() { - wg.Wait() - doneChan <- true - }() - - for { - select { - case msg := <-resChan: - out, err := proto.Marshal(msg) - err = worker.SendBytes(out) - check(err) - case <-doneChan: - // All goroutines have completed. Now we can exit main(). - return - } - } + DispatchLoop() } diff --git a/main.ts b/main.ts index 28c1ecdff2..4767f51eca 100644 --- a/main.ts +++ b/main.ts @@ -1,47 +1,33 @@ +import * as dispatch from "./dispatch"; import { main as pb } from "./msg.pb"; -import "./util"; + import * as runtime from "./runtime"; -import * as timers from "./timers"; import * as util from "./util"; +// These have top-level functions that need to execute. +import { initTimers } from "./timers"; + // To control internal logging output // Set with the -debug command-line flag. export let debug = false; +let startCalled = false; + +dispatch.sub("start", (payload: Uint8Array) => { + if (startCalled) { + throw Error("start message received more than once!"); + } + startCalled = true; + + const msg = pb.Msg.decode(payload); + const { cwd, argv, debugFlag, mainJs, mainMap } = msg.start; -function start( - cwd: string, - argv: string[], - debugFlag: boolean, - mainJs: string, - mainMap: string -): void { debug = debugFlag; util.log("start", { cwd, argv, debugFlag }); + initTimers(); runtime.setup(mainJs, mainMap); const inputFn = argv[0]; const mod = runtime.resolveModule(inputFn, cwd + "/"); mod.compileAndRun(); -} - -V8Worker2.recv((ab: ArrayBuffer) => { - const msg = pb.Msg.decode(new Uint8Array(ab)); - switch (msg.payload) { - case "start": - start( - msg.start.cwd, - msg.start.argv, - msg.start.debugFlag, - msg.start.mainJs, - msg.start.mainMap - ); - break; - case "timerReady": - timers.timerReady(msg.timerReady.id, msg.timerReady.done); - break; - default: - console.log("Unknown message", msg); - break; - } }); diff --git a/msg.proto b/msg.proto index 821d00a20c..3d90dc3805 100644 --- a/msg.proto +++ b/msg.proto @@ -1,6 +1,11 @@ syntax = "proto3"; package main; +message BaseMsg { + string channel = 1; + bytes payload = 2; +} + message Msg { string error = 1; oneof payload { diff --git a/os.ts b/os.ts index 82baac25df..b2f9e93ba0 100644 --- a/os.ts +++ b/os.ts @@ -1,18 +1,15 @@ -import { main as pb } from "./msg.pb"; import { ModuleInfo } from "./types"; -import { typedArrayToArrayBuffer } from "./util"; +import { sendMsgFromObject } from "./dispatch"; export function exit(code = 0): void { - sendMsgFromObject({ - exit: { code } - }); + sendMsgFromObject("os", { exit: { code } }); } export function sourceCodeFetch( moduleSpecifier: string, containingFile: string ): ModuleInfo { - const res = sendMsgFromObject({ + const res = sendMsgFromObject("os", { sourceCodeFetch: { moduleSpecifier, containingFile } }); return res.sourceCodeFetchRes; @@ -23,28 +20,7 @@ export function sourceCodeCache( sourceCode: string, outputCode: string ): void { - const res = sendMsgFromObject({ + sendMsgFromObject("os", { sourceCodeCache: { filename, sourceCode, outputCode } }); - throwOnError(res); -} - -export function sendMsgFromObject(obj: pb.IMsg): null | pb.Msg { - const msg = pb.Msg.fromObject(obj); - const ui8 = pb.Msg.encode(msg).finish(); - const ab = typedArrayToArrayBuffer(ui8); - const resBuf = V8Worker2.send(ab); - if (resBuf != null && resBuf.byteLength > 0) { - const res = pb.Msg.decode(new Uint8Array(resBuf)); - throwOnError(res); - return res; - } else { - return null; - } -} - -function throwOnError(res: pb.Msg) { - if (res != null && res.error != null && res.error.length > 0) { - throw Error(res.error); - } } diff --git a/timers.ts b/timers.ts index 6603b3d161..117fde2a80 100644 --- a/timers.ts +++ b/timers.ts @@ -1,4 +1,5 @@ -import { sendMsgFromObject } from "./os"; +import { main as pb } from "./msg.pb"; +import * as dispatch from "./dispatch"; let nextTimerId = 1; @@ -14,6 +15,20 @@ interface Timer { const timers = new Map(); +export function initTimers() { + dispatch.sub("timers", onMessage); +} + +function onMessage(payload: Uint8Array) { + const msg = pb.Msg.decode(payload); + const { id, done } = msg.timerReady; + const timer = timers.get(id); + timer.cb(); + if (done) { + timers.delete(id); + } +} + export function setTimeout(cb: TimerCallback, duration: number): number { const timer = { id: nextTimerId++, @@ -22,7 +37,7 @@ export function setTimeout(cb: TimerCallback, duration: number): number { cb }; timers.set(timer.id, timer); - sendMsgFromObject({ + dispatch.sendMsgFromObject("timers", { timerStart: { id: timer.id, interval: false, @@ -31,11 +46,3 @@ export function setTimeout(cb: TimerCallback, duration: number): number { }); return timer.id; } - -export function timerReady(id: number, done: boolean): void { - const timer = timers.get(id); - timer.cb(); - if (done) { - timers.delete(id); - } -}