diff --git a/cli/fmt_errors.rs b/cli/fmt_errors.rs index 83e417fe24..9979eeb290 100644 --- a/cli/fmt_errors.rs +++ b/cli/fmt_errors.rs @@ -151,6 +151,12 @@ impl JSError { } } +impl Into for JSError { + fn into(self) -> V8Exception { + self.0 + } +} + impl DisplayFormatter for JSError { fn format_category_and_code(&self) -> String { "".to_string() diff --git a/cli/js/dispatch.ts b/cli/js/dispatch.ts index d658c5de75..7e6709dc6b 100644 --- a/cli/js/dispatch.ts +++ b/cli/js/dispatch.ts @@ -42,8 +42,11 @@ export let OP_QUERY_PERMISSION: number; export let OP_REVOKE_PERMISSION: number; export let OP_REQUEST_PERMISSION: number; export let OP_CREATE_WORKER: number; -export let OP_HOST_GET_WORKER_CLOSED: number; +export let OP_HOST_GET_WORKER_LOADED: number; export let OP_HOST_POST_MESSAGE: number; +export let OP_HOST_POLL_WORKER: number; +export let OP_HOST_CLOSE_WORKER: number; +export let OP_HOST_RESUME_WORKER: number; export let OP_HOST_GET_MESSAGE: number; export let OP_WORKER_POST_MESSAGE: number; export let OP_WORKER_GET_MESSAGE: number; diff --git a/cli/js/globals.ts b/cli/js/globals.ts index c7f3b23f2c..f090afcd4e 100644 --- a/cli/js/globals.ts +++ b/cli/js/globals.ts @@ -150,6 +150,7 @@ window.performance = new performanceUtil.Performance(); // This variable functioning correctly depends on `declareAsLet` // in //tools/ts_library_builder/main.ts window.onmessage = workers.onmessage; +window.onerror = workers.onerror; window.workerMain = workers.workerMain; window.workerClose = workers.workerClose; diff --git a/cli/js/lib.deno_runtime.d.ts b/cli/js/lib.deno_runtime.d.ts index 8740af062e..109c0367d4 100644 --- a/cli/js/lib.deno_runtime.d.ts +++ b/cli/js/lib.deno_runtime.d.ts @@ -1986,6 +1986,7 @@ declare interface Window { Response: typeof __fetch.Response; performance: __performanceUtil.Performance; onmessage: (e: { data: any }) => void; + onerror: undefined | typeof onerror; workerMain: typeof __workers.workerMain; workerClose: typeof __workers.workerClose; postMessage: typeof __workers.postMessage; @@ -2036,6 +2037,7 @@ declare const Request: __domTypes.RequestConstructor; declare const Response: typeof __fetch.Response; declare const performance: __performanceUtil.Performance; declare let onmessage: (e: { data: any }) => void; +declare let onerror: (e: Event) => void; declare const workerMain: typeof __workers.workerMain; declare const workerClose: typeof __workers.workerClose; declare const postMessage: typeof __workers.postMessage; @@ -3293,7 +3295,7 @@ declare namespace __workers { export function workerClose(): void; export function workerMain(): Promise; export interface Worker { - onerror?: () => void; + onerror?: (e: Event) => void; onmessage?: (e: { data: any }) => void; onmessageerror?: () => void; postMessage(data: any): void; @@ -3311,7 +3313,7 @@ declare namespace __workers { private readonly id; private isClosing; private readonly isClosedPromise; - onerror?: () => void; + onerror?: (e: Event) => void; onmessage?: (data: any) => void; onmessageerror?: () => void; constructor(specifier: string, options?: DenoWorkerOptions); diff --git a/cli/js/workers.ts b/cli/js/workers.ts index 27f8731006..d1d8f78e2c 100644 --- a/cli/js/workers.ts +++ b/cli/js/workers.ts @@ -2,11 +2,12 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import * as dispatch from "./dispatch.ts"; import { sendAsync, sendSync } from "./dispatch_json.ts"; -import { log } from "./util.ts"; +import { log, createResolvable, Resolvable } from "./util.ts"; import { TextDecoder, TextEncoder } from "./text_encoding.ts"; import { window } from "./window.ts"; import { blobURLMap } from "./url.ts"; import { blobBytesWeakMap } from "./blob.ts"; +import { EventTarget } from "./event_target.ts"; const encoder = new TextEncoder(); const decoder = new TextDecoder(); @@ -26,7 +27,7 @@ function createWorker( includeDenoNamespace: boolean, hasSourceCode: boolean, sourceCode: Uint8Array -): number { +): { id: number; loaded: boolean } { return sendSync(dispatch.OP_CREATE_WORKER, { specifier, includeDenoNamespace, @@ -35,8 +36,20 @@ function createWorker( }); } -async function hostGetWorkerClosed(id: number): Promise { - await sendAsync(dispatch.OP_HOST_GET_WORKER_CLOSED, { id }); +async function hostGetWorkerLoaded(id: number): Promise { + return await sendAsync(dispatch.OP_HOST_GET_WORKER_LOADED, { id }); +} + +async function hostPollWorker(id: number): Promise { + return await sendAsync(dispatch.OP_HOST_POLL_WORKER, { id }); +} + +function hostCloseWorker(id: number): void { + sendSync(dispatch.OP_HOST_CLOSE_WORKER, { id }); +} + +function hostResumeWorker(id: number): void { + sendSync(dispatch.OP_HOST_RESUME_WORKER, { id }); } function hostPostMessage(id: number, data: any): void { @@ -56,6 +69,7 @@ async function hostGetMessage(id: number): Promise { // Stuff for workers export const onmessage: (e: { data: any }) => void = (): void => {}; +export const onerror: (e: { data: any }) => void = (): void => {}; export function postMessage(data: any): void { const dataIntArray = encodeMessage(data); @@ -88,25 +102,41 @@ export async function workerMain(): Promise { break; } - if (window["onmessage"]) { - const event = { data }; - const result: void | Promise = window.onmessage(event); + let result: void | Promise; + const event = { data }; + + try { + result = window.onmessage(event); if (result && "then" in result) { await result; } - } - - if (!window["onmessage"]) { - break; + if (!window["onmessage"]) { + break; + } + } catch (e) { + if (window["onerror"]) { + const result = window.onerror( + e.message, + e.fileName, + e.lineNumber, + e.columnNumber, + e + ); + if (result === true) { + continue; + } + } + throw e; } } } export interface Worker { - onerror?: () => void; + onerror?: (e: any) => void; onmessage?: (e: { data: any }) => void; onmessageerror?: () => void; postMessage(data: any): void; + // TODO(bartlomieju): remove this closed: Promise; } @@ -122,15 +152,18 @@ export interface DenoWorkerOptions extends WorkerOptions { noDenoNamespace?: boolean; } -export class WorkerImpl implements Worker { +export class WorkerImpl extends EventTarget implements Worker { private readonly id: number; private isClosing = false; - private readonly isClosedPromise: Promise; - public onerror?: () => void; + private messageBuffer: any[] = []; + private ready = false; + private readonly isClosedPromise: Resolvable; + public onerror?: (e: any) => void; public onmessage?: (data: any) => void; public onmessageerror?: () => void; constructor(specifier: string, options?: DenoWorkerOptions) { + super(); let hasSourceCode = false; let sourceCode = new Uint8Array(); @@ -152,24 +185,87 @@ export class WorkerImpl implements Worker { sourceCode = blobBytes!; } - this.id = createWorker( + const { id, loaded } = createWorker( specifier, includeDenoNamespace, hasSourceCode, sourceCode ); - this.run(); - this.isClosedPromise = hostGetWorkerClosed(this.id); - this.isClosedPromise.then((): void => { - this.isClosing = true; - }); + this.id = id; + this.ready = loaded; + this.isClosedPromise = createResolvable(); + this.poll(); } get closed(): Promise { return this.isClosedPromise; } + private handleError(e: any): boolean { + const event = new window.Event("error", { cancelable: true }); + event.message = e.message; + event.lineNumber = e.lineNumber ? e.lineNumber + 1 : null; + event.columnNumber = e.columnNumber ? e.columnNumber + 1 : null; + event.fileName = e.fileName; + event.error = null; + + let handled = false; + if (this.onerror) { + this.onerror(event); + if (event.defaultPrevented) { + handled = true; + } + } + + return handled; + } + + async poll(): Promise { + // If worker has not been immediately executed + // then let's await it's readiness + if (!this.ready) { + const result = await hostGetWorkerLoaded(this.id); + + if (result.error) { + if (!this.handleError(result.error)) { + throw new Error(result.error.message); + } + return; + } + } + + // drain messages + for (const data of this.messageBuffer) { + hostPostMessage(this.id, data); + } + this.messageBuffer = []; + this.ready = true; + this.run(); + + while (true) { + const result = await hostPollWorker(this.id); + + if (result.error) { + if (!this.handleError(result.error)) { + throw Error(result.error.message); + } else { + hostResumeWorker(this.id); + } + } else { + this.isClosing = true; + hostCloseWorker(this.id); + this.isClosedPromise.resolve(); + break; + } + } + } + postMessage(data: any): void { + if (!this.ready) { + this.messageBuffer.push(data); + return; + } + hostPostMessage(this.id, data); } @@ -180,7 +276,6 @@ export class WorkerImpl implements Worker { log("worker got null message. quitting."); break; } - // TODO(afinch7) stop this from eating messages before onmessage has been assigned if (this.onmessage) { const event = { data }; this.onmessage(event); diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs index 6ebaa141f8..eeffb39305 100644 --- a/cli/ops/workers.rs +++ b/cli/ops/workers.rs @@ -4,12 +4,15 @@ use crate::deno_error::bad_resource; use crate::deno_error::js_check; use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; +use crate::deno_error::GetErrorKind; +use crate::fmt_errors::JSError; use crate::ops::json_op; use crate::startup_data; use crate::state::ThreadSafeState; use crate::worker::Worker; use deno_core::*; use futures; +use futures::channel::mpsc; use futures::future::FutureExt; use futures::future::TryFutureExt; use futures::sink::SinkExt; @@ -19,7 +22,6 @@ use std::convert::From; use std::future::Future; use std::pin::Pin; use std::sync::atomic::Ordering; -use std::sync::mpsc; use std::task::Context; use std::task::Poll; @@ -29,8 +31,20 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) { s.core_op(json_op(s.stateful_op(op_create_worker))), ); i.register_op( - "host_get_worker_closed", - s.core_op(json_op(s.stateful_op(op_host_get_worker_closed))), + "host_get_worker_loaded", + s.core_op(json_op(s.stateful_op(op_host_get_worker_loaded))), + ); + i.register_op( + "host_poll_worker", + s.core_op(json_op(s.stateful_op(op_host_poll_worker))), + ); + i.register_op( + "host_close_worker", + s.core_op(json_op(s.stateful_op(op_host_close_worker))), + ); + i.register_op( + "host_resume_worker", + s.core_op(json_op(s.stateful_op(op_host_resume_worker))), ); i.register_op( "host_post_message", @@ -155,37 +169,36 @@ fn op_create_worker( js_check(worker.execute("workerMain()")); let worker_id = parent_state.add_child_worker(worker.clone()); - let response = json!(worker_id); // Has provided source code, execute immediately. if has_source_code { js_check(worker.execute(&source_code)); - return Ok(JsonOp::Sync(response)); + return Ok(JsonOp::Sync(json!({"id": worker_id, "loaded": true}))); } - // TODO(bartlomieju): this should spawn mod execution on separate tokio task - // and block on receving message on a channel or even use sync channel /shrug - let (sender, receiver) = mpsc::sync_channel::>(1); + let (mut sender, receiver) = mpsc::channel::>(1); + + // TODO(bartlomieju): this future should be spawned on the separate thread, + // dedicated to that worker let fut = async move { let result = worker .execute_mod_async(&module_specifier, None, false) .await; - sender.send(result).expect("Failed to send message"); + sender.send(result).await.expect("Failed to send message"); } .boxed(); tokio::spawn(fut); - - let result = receiver.recv().expect("Failed to receive message"); - result?; - Ok(JsonOp::Sync(response)) + let mut table = state.loading_workers.lock().unwrap(); + table.insert(worker_id, receiver); + Ok(JsonOp::Sync(json!({"id": worker_id, "loaded": false}))) } -struct GetWorkerClosedFuture { +struct WorkerPollFuture { state: ThreadSafeState, rid: ResourceId, } -impl Future for GetWorkerClosedFuture { +impl Future for WorkerPollFuture { type Output = Result<(), ErrBox>; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { @@ -203,39 +216,114 @@ impl Future for GetWorkerClosedFuture { } } +fn serialize_worker_result(result: Result<(), ErrBox>) -> Value { + if let Err(error) = result { + match error.kind() { + ErrorKind::JSError => { + let error = error.downcast::().unwrap(); + let exception: V8Exception = error.into(); + json!({"error": { + "message": exception.message, + "fileName": exception.script_resource_name, + "lineNumber": exception.line_number, + "columnNumber": exception.start_column, + }}) + } + _ => json!({"error": { + "message": error.to_string(), + }}), + } + } else { + json!({"ok": true}) + } +} + #[derive(Deserialize)] -struct HostGetWorkerClosedArgs { +struct WorkerArgs { id: i32, } -/// Return when the worker closes -fn op_host_get_worker_closed( +fn op_host_get_worker_loaded( state: &ThreadSafeState, args: Value, _data: Option, ) -> Result { - let args: HostGetWorkerClosedArgs = serde_json::from_value(args)?; + let args: WorkerArgs = serde_json::from_value(args)?; + let id = args.id as u32; + let mut table = state.loading_workers.lock().unwrap(); + let mut receiver = table.remove(&id).unwrap(); + + let op = async move { + let result = receiver.next().await.unwrap(); + Ok(serialize_worker_result(result)) + }; + + Ok(JsonOp::Async(op.boxed())) +} + +fn op_host_poll_worker( + state: &ThreadSafeState, + args: Value, + _data: Option, +) -> Result { + let args: WorkerArgs = serde_json::from_value(args)?; let id = args.id as u32; let state_ = state.clone(); - let future = GetWorkerClosedFuture { + let future = WorkerPollFuture { state: state.clone(), rid: id, }; - let op = future.then(move |_result| { - let mut workers_table = state_.workers.lock().unwrap(); - let maybe_worker = workers_table.remove(&id); - if let Some(worker) = maybe_worker { - let mut channels = worker.state.worker_channels.lock().unwrap(); - channels.sender.close_channel(); - channels.receiver.close(); - }; - futures::future::ok(json!({})) - }); + let op = async move { + let result = future.await; + + if result.is_err() { + let mut workers_table = state_.workers.lock().unwrap(); + let worker = workers_table.get_mut(&id).unwrap(); + worker.clear_exception(); + } + + Ok(serialize_worker_result(result)) + }; Ok(JsonOp::Async(op.boxed())) } +fn op_host_close_worker( + state: &ThreadSafeState, + args: Value, + _data: Option, +) -> Result { + let args: WorkerArgs = serde_json::from_value(args)?; + let id = args.id as u32; + let state_ = state.clone(); + + let mut workers_table = state_.workers.lock().unwrap(); + let maybe_worker = workers_table.remove(&id); + if let Some(worker) = maybe_worker { + let mut channels = worker.state.worker_channels.lock().unwrap(); + channels.sender.close_channel(); + channels.receiver.close(); + }; + + Ok(JsonOp::Sync(json!({}))) +} + +fn op_host_resume_worker( + state: &ThreadSafeState, + args: Value, + _data: Option, +) -> Result { + let args: WorkerArgs = serde_json::from_value(args)?; + let id = args.id as u32; + let state_ = state.clone(); + + let mut workers_table = state_.workers.lock().unwrap(); + let worker = workers_table.get_mut(&id).unwrap(); + js_check(worker.execute("workerMain()")); + Ok(JsonOp::Sync(json!({}))) +} + #[derive(Deserialize)] struct HostGetMessageArgs { id: i32, diff --git a/cli/state.rs b/cli/state.rs index a4974958bc..ed7b8e438d 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -53,6 +53,7 @@ pub struct State { pub metrics: Metrics, pub global_timer: Mutex, pub workers: Mutex>, + pub loading_workers: Mutex>>>, pub next_worker_id: AtomicUsize, pub start_time: Instant, pub seeded_rng: Option>, @@ -248,6 +249,7 @@ impl ThreadSafeState { metrics: Metrics::default(), global_timer: Mutex::new(GlobalTimer::new()), workers: Mutex::new(HashMap::new()), + loading_workers: Mutex::new(HashMap::new()), next_worker_id: AtomicUsize::new(0), start_time: Instant::now(), seeded_rng, diff --git a/cli/tests/026_workers.ts b/cli/tests/026_workers.ts index f45fc4b77e..7ac1a0f326 100644 --- a/cli/tests/026_workers.ts +++ b/cli/tests/026_workers.ts @@ -11,4 +11,10 @@ jsWorker.onmessage = (e): void => { tsWorker.postMessage("Hello World"); }; +jsWorker.onerror = (e: Event): void => { + e.preventDefault(); + console.log("called onerror in script"); + jsWorker.postMessage("Hello World"); +}; + jsWorker.postMessage("Hello World"); diff --git a/cli/tests/026_workers.ts.out b/cli/tests/026_workers.ts.out index 7538cc867a..92f7550ad1 100644 --- a/cli/tests/026_workers.ts.out +++ b/cli/tests/026_workers.ts.out @@ -1,4 +1,7 @@ Hello World +called onerror in worker +called onerror in script +Hello World Received js: Hello World Hello World Received ts: Hello World diff --git a/cli/tests/subdir/test_worker.js b/cli/tests/subdir/test_worker.js index 53d38ba96c..cec5bdf9be 100644 --- a/cli/tests/subdir/test_worker.js +++ b/cli/tests/subdir/test_worker.js @@ -1,7 +1,19 @@ +let thrown = false; + onmessage = function(e) { console.log(e.data); + if (thrown === false) { + thrown = true; + throw new SyntaxError("[test error]"); + } + postMessage(e.data); workerClose(); }; + +onerror = function() { + console.log("called onerror in worker"); + return false; +}; diff --git a/cli/worker.rs b/cli/worker.rs index 2b335127f8..7faf17e601 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -12,6 +12,7 @@ use futures::future::FutureExt; use futures::future::TryFutureExt; use futures::sink::SinkExt; use futures::stream::StreamExt; +use futures::task::AtomicWaker; use std::env; use std::future::Future; use std::pin::Pin; @@ -159,6 +160,11 @@ impl Worker { channels: self.external_channels.clone(), } } + + pub fn clear_exception(&mut self) { + let mut isolate = self.isolate.try_lock().unwrap(); + isolate.clear_exception(); + } } impl Future for Worker { @@ -166,8 +172,15 @@ impl Future for Worker { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { let inner = self.get_mut(); - let mut isolate = inner.isolate.try_lock().unwrap(); - isolate.poll_unpin(cx) + let waker = AtomicWaker::new(); + waker.register(cx.waker()); + match inner.isolate.try_lock() { + Ok(mut isolate) => isolate.poll_unpin(cx), + Err(_) => { + waker.wake(); + Poll::Pending + } + } } } @@ -436,7 +449,7 @@ mod tests { let worker_ = worker.clone(); let worker_future = async move { - let result = worker.await; + let result = worker_.await; println!("workers.rs after resource close"); result.unwrap(); } @@ -446,10 +459,10 @@ mod tests { tokio::spawn(worker_future_); let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = block_on(worker_.post_message(msg)); + let r = block_on(worker.post_message(msg)); assert!(r.is_ok()); - block_on(worker_future); + block_on(worker_future) }) } diff --git a/core/isolate.rs b/core/isolate.rs index 9c13f0e4d6..5617caa860 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -368,6 +368,15 @@ impl Isolate { isolate } + pub fn clear_exception(&mut self) { + let isolate = self.v8_isolate.as_ref().unwrap(); + let mut locker = v8::Locker::new(isolate); + let mut hs = v8::HandleScope::new(&mut locker); + let scope = hs.enter(); + self.last_exception_handle.reset(scope); + self.last_exception.take(); + } + pub fn handle_exception<'a>( &mut self, scope: &mut impl v8::ToLocal<'a>,