0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-02-01 20:25:12 -05:00

remove tokio_util::block_on from ops/workers.rs (#3381)

This commit is contained in:
Bartek Iwańczuk 2019-11-20 01:17:05 +01:00 committed by Ry Dahl
parent 6708fcc386
commit 1912ed6740
4 changed files with 49 additions and 40 deletions

View file

@ -272,8 +272,8 @@ impl TsCompiler {
let worker = TsCompiler::setup_worker(global_state.clone()); let worker = TsCompiler::setup_worker(global_state.clone());
let worker_ = worker.clone(); let worker_ = worker.clone();
worker.post_message(req_msg).unwrap();
let first_msg_fut = async move { let first_msg_fut = async move {
worker.post_message(req_msg).await.unwrap();
let result = worker.await; let result = worker.await;
if let Err(err) = result { if let Err(err) = result {
// TODO(ry) Need to forward the error instead of exiting. // TODO(ry) Need to forward the error instead of exiting.
@ -382,8 +382,8 @@ impl TsCompiler {
.add("Compile", &module_url.to_string()); .add("Compile", &module_url.to_string());
let global_state_ = global_state.clone(); let global_state_ = global_state.clone();
worker.post_message(req_msg).unwrap();
let first_msg_fut = async move { let first_msg_fut = async move {
worker.post_message(req_msg).await.unwrap();
let result = worker.await; let result = worker.await;
if let Err(err) = result { if let Err(err) = result {
// TODO(ry) Need to forward the error instead of exiting. // TODO(ry) Need to forward the error instead of exiting.

View file

@ -86,13 +86,14 @@ impl WasmCompiler {
let worker_ = worker.clone(); let worker_ = worker.clone();
let url = source_file.url.clone(); let url = source_file.url.clone();
let _res = worker.post_message( let fut = worker
.post_message(
serde_json::to_string(&base64_data) serde_json::to_string(&base64_data)
.unwrap() .unwrap()
.into_boxed_str() .into_boxed_str()
.into_boxed_bytes(), .into_boxed_bytes(),
); )
let fut = worker .then(|_| worker)
.then(move |result| { .then(move |result| {
if let Err(err) = result { if let Err(err) = result {
// TODO(ry) Need to forward the error instead of exiting. // TODO(ry) Need to forward the error instead of exiting.

View file

@ -7,7 +7,6 @@ use crate::deno_error::ErrorKind;
use crate::ops::json_op; use crate::ops::json_op;
use crate::startup_data; use crate::startup_data;
use crate::state::ThreadSafeState; use crate::state::ThreadSafeState;
use crate::tokio_util;
use crate::worker::Worker; use crate::worker::Worker;
use deno::*; use deno::*;
use futures; use futures;
@ -20,6 +19,7 @@ use std::convert::From;
use std::future::Future; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::mpsc;
use std::task::Context; use std::task::Context;
use std::task::Poll; use std::task::Poll;
@ -153,23 +153,31 @@ fn op_create_worker(
js_check(worker.execute(&deno_main_call)); js_check(worker.execute(&deno_main_call));
js_check(worker.execute("workerMain()")); js_check(worker.execute("workerMain()"));
let exec_cb = move |worker: Worker| { let worker_id = parent_state.add_child_worker(worker.clone());
let worker_id = parent_state.add_child_worker(worker); let response = json!(worker_id);
json!(worker_id)
};
// Has provided source code, execute immediately. // Has provided source code, execute immediately.
if has_source_code { if has_source_code {
js_check(worker.execute(&source_code)); js_check(worker.execute(&source_code));
return Ok(JsonOp::Sync(exec_cb(worker))); return Ok(JsonOp::Sync(response));
} }
let op = worker // TODO(bartlomieju): this should spawn mod execution on separate tokio task
// and block on receving message on a channel or even use sync channel /shrug
let (sender, receiver) = mpsc::sync_channel::<Result<(), ErrBox>>(1);
let fut = worker
.execute_mod_async(&module_specifier, None, false) .execute_mod_async(&module_specifier, None, false)
.and_then(move |()| futures::future::ok(exec_cb(worker))); .then(move |result| {
sender.send(result).expect("Failed to send message");
futures::future::ok(())
})
.boxed()
.compat();
tokio::spawn(fut);
let result = tokio_util::block_on(op.boxed())?; let result = receiver.recv().expect("Failed to receive message");
Ok(JsonOp::Sync(result)) result?;
Ok(JsonOp::Sync(response))
} }
struct GetWorkerClosedFuture { struct GetWorkerClosedFuture {
@ -271,9 +279,10 @@ fn op_host_post_message(
let mut table = state.workers.lock().unwrap(); let mut table = state.workers.lock().unwrap();
// TODO: don't return bad resource anymore // TODO: don't return bad resource anymore
let worker = table.get_mut(&id).ok_or_else(bad_resource)?; let worker = table.get_mut(&id).ok_or_else(bad_resource)?;
worker let fut = worker
.post_message(msg) .post_message(msg)
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()));
futures::executor::block_on(fut)?;
Ok(JsonOp::Sync(json!({}))) Ok(JsonOp::Sync(json!({})))
} }

View file

@ -144,12 +144,17 @@ impl Worker {
/// Post message to worker as a host. /// Post message to worker as a host.
/// ///
/// This method blocks current thread. /// This method blocks current thread.
pub fn post_message(self: &Self, buf: Buf) -> Result<(), ErrBox> { pub fn post_message(
self: &Self,
buf: Buf,
) -> impl Future<Output = Result<(), ErrBox>> {
let channels = self.external_channels.lock().unwrap(); let channels = self.external_channels.lock().unwrap();
let mut sender = channels.sender.clone(); let mut sender = channels.sender.clone();
futures::executor::block_on(sender.send(buf)) async move {
.map(|_| ()) let result = sender.send(buf).map_err(ErrBox::from).await;
.map_err(ErrBox::from) drop(sender);
result
}
} }
/// Get message from worker as a host. /// Get message from worker as a host.
@ -199,6 +204,7 @@ mod tests {
use crate::startup_data; use crate::startup_data;
use crate::state::ThreadSafeState; use crate::state::ThreadSafeState;
use crate::tokio_util; use crate::tokio_util;
use futures::executor::block_on;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
#[test] #[test]
@ -391,11 +397,10 @@ mod tests {
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
let r = worker_.post_message(msg); let r = block_on(worker_.post_message(msg));
assert!(r.is_ok()); assert!(r.is_ok());
let maybe_msg = let maybe_msg = block_on(worker_.get_message()).unwrap();
futures::executor::block_on(worker_.get_message()).unwrap();
assert!(maybe_msg.is_some()); assert!(maybe_msg.is_some());
// Check if message received is [1, 2, 3] in json // Check if message received is [1, 2, 3] in json
assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]"); assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]");
@ -404,7 +409,7 @@ mod tests {
.to_string() .to_string()
.into_boxed_str() .into_boxed_str()
.into_boxed_bytes(); .into_boxed_bytes();
let r = worker_.post_message(msg); let r = block_on(worker_.post_message(msg));
assert!(r.is_ok()); assert!(r.is_ok());
}) })
} }
@ -434,10 +439,10 @@ mod tests {
); );
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
let r = worker_.post_message(msg); let r = block_on(worker_.post_message(msg));
assert!(r.is_ok()); assert!(r.is_ok());
futures::executor::block_on(worker_future).unwrap(); block_on(worker_future).unwrap();
}) })
} }
@ -448,11 +453,8 @@ mod tests {
let mut worker = create_test_worker(); let mut worker = create_test_worker();
let module_specifier = let module_specifier =
ModuleSpecifier::resolve_url_or_path("does-not-exist").unwrap(); ModuleSpecifier::resolve_url_or_path("does-not-exist").unwrap();
let result = futures::executor::block_on(worker.execute_mod_async( let result =
&module_specifier, block_on(worker.execute_mod_async(&module_specifier, None, false));
None,
false,
));
assert!(result.is_err()); assert!(result.is_err());
}) })
} }
@ -470,11 +472,8 @@ mod tests {
.to_owned(); .to_owned();
let module_specifier = let module_specifier =
ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap(); ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap();
let result = futures::executor::block_on(worker.execute_mod_async( let result =
&module_specifier, block_on(worker.execute_mod_async(&module_specifier, None, false));
None,
false,
));
assert!(result.is_ok()); assert!(result.is_ok());
}) })
} }