mirror of
https://github.com/denoland/deno.git
synced 2025-03-03 17:34:47 -05:00
fix(workers): importScripts
concurrently and use a new reqwest::Client
per importScripts (#23699)
1. We were polling each future in sequence, so this meant it was fetching scripts in sequence. 2. It's not safe to share `reqwest::Client` across tokio runtimes (https://github.com/seanmonstar/reqwest/issues/1148#issuecomment-910868788)
This commit is contained in:
parent
4b0f22eee7
commit
d527b63575
2 changed files with 118 additions and 107 deletions
|
@ -187,27 +187,33 @@ pub fn get_or_create_client_from_state(
|
||||||
Ok(client.clone())
|
Ok(client.clone())
|
||||||
} else {
|
} else {
|
||||||
let options = state.borrow::<Options>();
|
let options = state.borrow::<Options>();
|
||||||
let client = create_http_client(
|
let client = create_client_from_options(options)?;
|
||||||
&options.user_agent,
|
|
||||||
CreateHttpClientOptions {
|
|
||||||
root_cert_store: options.root_cert_store()?,
|
|
||||||
ca_certs: vec![],
|
|
||||||
proxy: options.proxy.clone(),
|
|
||||||
unsafely_ignore_certificate_errors: options
|
|
||||||
.unsafely_ignore_certificate_errors
|
|
||||||
.clone(),
|
|
||||||
client_cert_chain_and_key: options.client_cert_chain_and_key.clone(),
|
|
||||||
pool_max_idle_per_host: None,
|
|
||||||
pool_idle_timeout: None,
|
|
||||||
http1: true,
|
|
||||||
http2: true,
|
|
||||||
},
|
|
||||||
)?;
|
|
||||||
state.put::<reqwest::Client>(client.clone());
|
state.put::<reqwest::Client>(client.clone());
|
||||||
Ok(client)
|
Ok(client)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn create_client_from_options(
|
||||||
|
options: &Options,
|
||||||
|
) -> Result<reqwest::Client, AnyError> {
|
||||||
|
create_http_client(
|
||||||
|
&options.user_agent,
|
||||||
|
CreateHttpClientOptions {
|
||||||
|
root_cert_store: options.root_cert_store()?,
|
||||||
|
ca_certs: vec![],
|
||||||
|
proxy: options.proxy.clone(),
|
||||||
|
unsafely_ignore_certificate_errors: options
|
||||||
|
.unsafely_ignore_certificate_errors
|
||||||
|
.clone(),
|
||||||
|
client_cert_chain_and_key: options.client_cert_chain_and_key.clone(),
|
||||||
|
pool_max_idle_per_host: None,
|
||||||
|
pool_idle_timeout: None,
|
||||||
|
http1: true,
|
||||||
|
http2: true,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
#[allow(clippy::type_complexity)]
|
#[allow(clippy::type_complexity)]
|
||||||
pub struct ResourceToBodyAdapter(
|
pub struct ResourceToBodyAdapter(
|
||||||
Rc<dyn Resource>,
|
Rc<dyn Resource>,
|
||||||
|
|
|
@ -6,6 +6,7 @@ use crate::web_worker::WebWorkerInternalHandle;
|
||||||
use crate::web_worker::WebWorkerType;
|
use crate::web_worker::WebWorkerType;
|
||||||
use deno_core::error::type_error;
|
use deno_core::error::type_error;
|
||||||
use deno_core::error::AnyError;
|
use deno_core::error::AnyError;
|
||||||
|
use deno_core::futures::StreamExt;
|
||||||
use deno_core::op2;
|
use deno_core::op2;
|
||||||
use deno_core::url::Url;
|
use deno_core::url::Url;
|
||||||
use deno_core::OpState;
|
use deno_core::OpState;
|
||||||
|
@ -15,7 +16,6 @@ use deno_websocket::DomExceptionNetworkError;
|
||||||
use hyper::body::Bytes;
|
use hyper::body::Bytes;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use tokio::task::JoinHandle;
|
|
||||||
|
|
||||||
// TODO(andreubotella) Properly parse the MIME type
|
// TODO(andreubotella) Properly parse the MIME type
|
||||||
fn mime_type_essence(mime_type: &str) -> String {
|
fn mime_type_essence(mime_type: &str) -> String {
|
||||||
|
@ -38,12 +38,15 @@ pub struct SyncFetchScript {
|
||||||
pub fn op_worker_sync_fetch(
|
pub fn op_worker_sync_fetch(
|
||||||
state: &mut OpState,
|
state: &mut OpState,
|
||||||
#[serde] scripts: Vec<String>,
|
#[serde] scripts: Vec<String>,
|
||||||
mut loose_mime_checks: bool,
|
loose_mime_checks: bool,
|
||||||
) -> Result<Vec<SyncFetchScript>, AnyError> {
|
) -> Result<Vec<SyncFetchScript>, AnyError> {
|
||||||
let handle = state.borrow::<WebWorkerInternalHandle>().clone();
|
let handle = state.borrow::<WebWorkerInternalHandle>().clone();
|
||||||
assert_eq!(handle.worker_type, WebWorkerType::Classic);
|
assert_eq!(handle.worker_type, WebWorkerType::Classic);
|
||||||
|
|
||||||
let client = deno_fetch::get_or_create_client_from_state(state)?;
|
// it's not safe to share a client across tokio runtimes, so create a fresh one
|
||||||
|
// https://github.com/seanmonstar/reqwest/issues/1148#issuecomment-910868788
|
||||||
|
let options = state.borrow::<deno_fetch::Options>().clone();
|
||||||
|
let client = deno_fetch::create_client_from_options(&options)?;
|
||||||
|
|
||||||
// TODO(andreubotella) It's not good to throw an exception related to blob
|
// TODO(andreubotella) It's not good to throw an exception related to blob
|
||||||
// URLs when none of the script URLs use the blob scheme.
|
// URLs when none of the script URLs use the blob scheme.
|
||||||
|
@ -62,107 +65,109 @@ pub fn op_worker_sync_fetch(
|
||||||
.enable_time()
|
.enable_time()
|
||||||
.build()?;
|
.build()?;
|
||||||
|
|
||||||
let handles: Vec<_> = scripts
|
runtime.block_on(async move {
|
||||||
.into_iter()
|
let mut futures = scripts
|
||||||
.map(|script| -> JoinHandle<Result<SyncFetchScript, AnyError>> {
|
.into_iter()
|
||||||
let client = client.clone();
|
.map(|script| {
|
||||||
let blob_store = blob_store.clone();
|
let client = client.clone();
|
||||||
runtime.spawn(async move {
|
let blob_store = blob_store.clone();
|
||||||
let script_url = Url::parse(&script)
|
deno_core::unsync::spawn(async move {
|
||||||
.map_err(|_| type_error("Invalid script URL"))?;
|
let script_url = Url::parse(&script)
|
||||||
|
.map_err(|_| type_error("Invalid script URL"))?;
|
||||||
|
let mut loose_mime_checks = loose_mime_checks;
|
||||||
|
|
||||||
let (body, mime_type, res_url) = match script_url.scheme() {
|
let (body, mime_type, res_url) = match script_url.scheme() {
|
||||||
"http" | "https" => {
|
"http" | "https" => {
|
||||||
let resp =
|
let resp =
|
||||||
client.get(script_url).send().await?.error_for_status()?;
|
client.get(script_url).send().await?.error_for_status()?;
|
||||||
|
|
||||||
let res_url = resp.url().to_string();
|
let res_url = resp.url().to_string();
|
||||||
|
|
||||||
// TODO(andreubotella) Properly run fetch's "extract a MIME type".
|
// TODO(andreubotella) Properly run fetch's "extract a MIME type".
|
||||||
let mime_type = resp
|
let mime_type = resp
|
||||||
.headers()
|
.headers()
|
||||||
.get("Content-Type")
|
.get("Content-Type")
|
||||||
.and_then(|v| v.to_str().ok())
|
.and_then(|v| v.to_str().ok())
|
||||||
.map(mime_type_essence);
|
.map(mime_type_essence);
|
||||||
|
|
||||||
// Always check the MIME type with HTTP(S).
|
// Always check the MIME type with HTTP(S).
|
||||||
loose_mime_checks = false;
|
loose_mime_checks = false;
|
||||||
|
|
||||||
let body = resp.bytes().await?;
|
let body = resp.bytes().await?;
|
||||||
|
|
||||||
(body, mime_type, res_url)
|
(body, mime_type, res_url)
|
||||||
}
|
|
||||||
"data" => {
|
|
||||||
let data_url = DataUrl::process(&script)
|
|
||||||
.map_err(|e| type_error(format!("{e:?}")))?;
|
|
||||||
|
|
||||||
let mime_type = {
|
|
||||||
let mime = data_url.mime_type();
|
|
||||||
format!("{}/{}", mime.type_, mime.subtype)
|
|
||||||
};
|
|
||||||
|
|
||||||
let (body, _) = data_url
|
|
||||||
.decode_to_vec()
|
|
||||||
.map_err(|e| type_error(format!("{e:?}")))?;
|
|
||||||
|
|
||||||
(Bytes::from(body), Some(mime_type), script)
|
|
||||||
}
|
|
||||||
"blob" => {
|
|
||||||
let blob =
|
|
||||||
blob_store.get_object_url(script_url).ok_or_else(|| {
|
|
||||||
type_error("Blob for the given URL not found.")
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let mime_type = mime_type_essence(&blob.media_type);
|
|
||||||
|
|
||||||
let body = blob.read_all().await?;
|
|
||||||
|
|
||||||
(Bytes::from(body), Some(mime_type), script)
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
return Err(type_error(format!(
|
|
||||||
"Classic scripts with scheme {}: are not supported in workers.",
|
|
||||||
script_url.scheme()
|
|
||||||
)))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if !loose_mime_checks {
|
|
||||||
// TODO(andreubotella) Check properly for a Javascript MIME type.
|
|
||||||
match mime_type.as_deref() {
|
|
||||||
Some("application/javascript" | "text/javascript") => {}
|
|
||||||
Some(mime_type) => {
|
|
||||||
return Err(
|
|
||||||
DomExceptionNetworkError {
|
|
||||||
msg: format!("Invalid MIME type {mime_type:?}."),
|
|
||||||
}
|
|
||||||
.into(),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
None => {
|
"data" => {
|
||||||
return Err(
|
let data_url = DataUrl::process(&script)
|
||||||
DomExceptionNetworkError::new("Missing MIME type.").into(),
|
.map_err(|e| type_error(format!("{e:?}")))?;
|
||||||
)
|
|
||||||
|
let mime_type = {
|
||||||
|
let mime = data_url.mime_type();
|
||||||
|
format!("{}/{}", mime.type_, mime.subtype)
|
||||||
|
};
|
||||||
|
|
||||||
|
let (body, _) = data_url
|
||||||
|
.decode_to_vec()
|
||||||
|
.map_err(|e| type_error(format!("{e:?}")))?;
|
||||||
|
|
||||||
|
(Bytes::from(body), Some(mime_type), script)
|
||||||
|
}
|
||||||
|
"blob" => {
|
||||||
|
let blob =
|
||||||
|
blob_store.get_object_url(script_url).ok_or_else(|| {
|
||||||
|
type_error("Blob for the given URL not found.")
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let mime_type = mime_type_essence(&blob.media_type);
|
||||||
|
|
||||||
|
let body = blob.read_all().await?;
|
||||||
|
|
||||||
|
(Bytes::from(body), Some(mime_type), script)
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
return Err(type_error(format!(
|
||||||
|
"Classic scripts with scheme {}: are not supported in workers.",
|
||||||
|
script_url.scheme()
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if !loose_mime_checks {
|
||||||
|
// TODO(andreubotella) Check properly for a Javascript MIME type.
|
||||||
|
match mime_type.as_deref() {
|
||||||
|
Some("application/javascript" | "text/javascript") => {}
|
||||||
|
Some(mime_type) => {
|
||||||
|
return Err(
|
||||||
|
DomExceptionNetworkError {
|
||||||
|
msg: format!("Invalid MIME type {mime_type:?}."),
|
||||||
|
}
|
||||||
|
.into(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
return Err(
|
||||||
|
DomExceptionNetworkError::new("Missing MIME type.").into(),
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
let (text, _) = encoding_rs::UTF_8.decode_with_bom_removal(&body);
|
let (text, _) = encoding_rs::UTF_8.decode_with_bom_removal(&body);
|
||||||
|
|
||||||
Ok(SyncFetchScript {
|
Ok(SyncFetchScript {
|
||||||
url: res_url,
|
url: res_url,
|
||||||
script: text.into_owned(),
|
script: text.into_owned(),
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
})
|
.collect::<deno_core::futures::stream::FuturesUnordered<_>>();
|
||||||
.collect();
|
let mut ret = Vec::with_capacity(futures.len());
|
||||||
|
while let Some(result) = futures.next().await {
|
||||||
let mut ret = Vec::with_capacity(handles.len());
|
let script = result??;
|
||||||
for handle in handles {
|
ret.push(script);
|
||||||
let script = runtime.block_on(handle)??;
|
}
|
||||||
ret.push(script);
|
Ok(ret)
|
||||||
}
|
})
|
||||||
Ok(ret)
|
|
||||||
});
|
});
|
||||||
|
|
||||||
thread.join().unwrap()
|
thread.join().unwrap()
|
||||||
|
|
Loading…
Add table
Reference in a new issue