diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs index 13823b24f1..34bf74ab1d 100644 --- a/cli/compilers/ts.rs +++ b/cli/compilers/ts.rs @@ -272,8 +272,8 @@ impl TsCompiler { let worker = TsCompiler::setup_worker(global_state.clone()); let worker_ = worker.clone(); - worker.post_message(req_msg).unwrap(); let first_msg_fut = async move { + worker.post_message(req_msg).await.unwrap(); let result = worker.await; if let Err(err) = result { // TODO(ry) Need to forward the error instead of exiting. @@ -382,8 +382,8 @@ impl TsCompiler { .add("Compile", &module_url.to_string()); let global_state_ = global_state.clone(); - worker.post_message(req_msg).unwrap(); let first_msg_fut = async move { + worker.post_message(req_msg).await.unwrap(); let result = worker.await; if let Err(err) = result { // TODO(ry) Need to forward the error instead of exiting. diff --git a/cli/compilers/wasm.rs b/cli/compilers/wasm.rs index 882e28e43b..2e565dd93b 100644 --- a/cli/compilers/wasm.rs +++ b/cli/compilers/wasm.rs @@ -86,13 +86,14 @@ impl WasmCompiler { let worker_ = worker.clone(); let url = source_file.url.clone(); - let _res = worker.post_message( - serde_json::to_string(&base64_data) - .unwrap() - .into_boxed_str() - .into_boxed_bytes(), - ); let fut = worker + .post_message( + serde_json::to_string(&base64_data) + .unwrap() + .into_boxed_str() + .into_boxed_bytes(), + ) + .then(|_| worker) .then(move |result| { if let Err(err) = result { // TODO(ry) Need to forward the error instead of exiting. diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs index 42f93ec573..a032905459 100644 --- a/cli/ops/workers.rs +++ b/cli/ops/workers.rs @@ -7,7 +7,6 @@ use crate::deno_error::ErrorKind; use crate::ops::json_op; use crate::startup_data; use crate::state::ThreadSafeState; -use crate::tokio_util; use crate::worker::Worker; use deno::*; use futures; @@ -20,6 +19,7 @@ use std::convert::From; use std::future::Future; use std::pin::Pin; use std::sync::atomic::Ordering; +use std::sync::mpsc; use std::task::Context; use std::task::Poll; @@ -153,23 +153,31 @@ fn op_create_worker( js_check(worker.execute(&deno_main_call)); js_check(worker.execute("workerMain()")); - let exec_cb = move |worker: Worker| { - let worker_id = parent_state.add_child_worker(worker); - json!(worker_id) - }; + let worker_id = parent_state.add_child_worker(worker.clone()); + let response = json!(worker_id); // Has provided source code, execute immediately. if has_source_code { js_check(worker.execute(&source_code)); - return Ok(JsonOp::Sync(exec_cb(worker))); + return Ok(JsonOp::Sync(response)); } - let op = worker + // TODO(bartlomieju): this should spawn mod execution on separate tokio task + // and block on receving message on a channel or even use sync channel /shrug + let (sender, receiver) = mpsc::sync_channel::>(1); + let fut = worker .execute_mod_async(&module_specifier, None, false) - .and_then(move |()| futures::future::ok(exec_cb(worker))); + .then(move |result| { + sender.send(result).expect("Failed to send message"); + futures::future::ok(()) + }) + .boxed() + .compat(); + tokio::spawn(fut); - let result = tokio_util::block_on(op.boxed())?; - Ok(JsonOp::Sync(result)) + let result = receiver.recv().expect("Failed to receive message"); + result?; + Ok(JsonOp::Sync(response)) } struct GetWorkerClosedFuture { @@ -271,9 +279,10 @@ fn op_host_post_message( let mut table = state.workers.lock().unwrap(); // TODO: don't return bad resource anymore let worker = table.get_mut(&id).ok_or_else(bad_resource)?; - worker + let fut = worker .post_message(msg) - .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; + .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string())); + futures::executor::block_on(fut)?; Ok(JsonOp::Sync(json!({}))) } diff --git a/cli/worker.rs b/cli/worker.rs index d5cc801d86..1b931a85de 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -144,12 +144,17 @@ impl Worker { /// Post message to worker as a host. /// /// This method blocks current thread. - pub fn post_message(self: &Self, buf: Buf) -> Result<(), ErrBox> { + pub fn post_message( + self: &Self, + buf: Buf, + ) -> impl Future> { let channels = self.external_channels.lock().unwrap(); let mut sender = channels.sender.clone(); - futures::executor::block_on(sender.send(buf)) - .map(|_| ()) - .map_err(ErrBox::from) + async move { + let result = sender.send(buf).map_err(ErrBox::from).await; + drop(sender); + result + } } /// Get message from worker as a host. @@ -199,6 +204,7 @@ mod tests { use crate::startup_data; use crate::state::ThreadSafeState; use crate::tokio_util; + use futures::executor::block_on; use std::sync::atomic::Ordering; #[test] @@ -391,11 +397,10 @@ mod tests { let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = worker_.post_message(msg); + let r = block_on(worker_.post_message(msg)); assert!(r.is_ok()); - let maybe_msg = - futures::executor::block_on(worker_.get_message()).unwrap(); + let maybe_msg = block_on(worker_.get_message()).unwrap(); assert!(maybe_msg.is_some()); // Check if message received is [1, 2, 3] in json assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]"); @@ -404,7 +409,7 @@ mod tests { .to_string() .into_boxed_str() .into_boxed_bytes(); - let r = worker_.post_message(msg); + let r = block_on(worker_.post_message(msg)); assert!(r.is_ok()); }) } @@ -434,10 +439,10 @@ mod tests { ); let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = worker_.post_message(msg); + let r = block_on(worker_.post_message(msg)); assert!(r.is_ok()); - futures::executor::block_on(worker_future).unwrap(); + block_on(worker_future).unwrap(); }) } @@ -448,11 +453,8 @@ mod tests { let mut worker = create_test_worker(); let module_specifier = ModuleSpecifier::resolve_url_or_path("does-not-exist").unwrap(); - let result = futures::executor::block_on(worker.execute_mod_async( - &module_specifier, - None, - false, - )); + let result = + block_on(worker.execute_mod_async(&module_specifier, None, false)); assert!(result.is_err()); }) } @@ -470,11 +472,8 @@ mod tests { .to_owned(); let module_specifier = ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap(); - let result = futures::executor::block_on(worker.execute_mod_async( - &module_specifier, - None, - false, - )); + let result = + block_on(worker.execute_mod_async(&module_specifier, None, false)); assert!(result.is_ok()); }) }