0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-02-20 20:33:42 -05:00

fix(lint): out of order diagnostics for plugins (#28029)

This commit fixes racy condition in lint plugins
that could have caused diagnostics for another
file to be printed for completely unrelated file.

With this change, a oneshot channel is used
to receive diagnostics for a file, which ensures
that the caller will receive diagnostics for itself.
This commit is contained in:
Bartek Iwańczuk 2025-02-10 14:22:57 +01:00 committed by GitHub
parent 7d139ddd60
commit 94a28f783d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -25,9 +25,8 @@ use deno_runtime::deno_permissions::PermissionsContainer;
use deno_runtime::tokio_util; use deno_runtime::tokio_util;
use deno_runtime::worker::MainWorker; use deno_runtime::worker::MainWorker;
use deno_runtime::WorkerExecutionMode; use deno_runtime::WorkerExecutionMode;
use tokio::sync::mpsc::channel; use tokio::sync::mpsc;
use tokio::sync::mpsc::Receiver; use tokio::sync::oneshot;
use tokio::sync::mpsc::Sender;
use crate::args::DenoSubcommand; use crate::args::DenoSubcommand;
use crate::args::Flags; use crate::args::Flags;
@ -42,6 +41,7 @@ pub enum PluginHostRequest {
LoadPlugins { LoadPlugins {
specifiers: Vec<ModuleSpecifier>, specifiers: Vec<ModuleSpecifier>,
exclude_rules: Option<Vec<String>>, exclude_rules: Option<Vec<String>>,
tx: oneshot::Sender<PluginHostResponse>,
}, },
Run { Run {
serialized_ast: Vec<u8>, serialized_ast: Vec<u8>,
@ -49,6 +49,7 @@ pub enum PluginHostRequest {
source_text_info: SourceTextInfo, source_text_info: SourceTextInfo,
utf16_map: Utf16Map, utf16_map: Utf16Map,
maybe_token: Option<CancellationToken>, maybe_token: Option<CancellationToken>,
tx: oneshot::Sender<PluginHostResponse>,
}, },
} }
@ -102,8 +103,7 @@ v8_static_strings! {
#[derive(Debug)] #[derive(Debug)]
pub struct PluginHostProxy { pub struct PluginHostProxy {
tx: Sender<PluginHostRequest>, tx: mpsc::Sender<PluginHostRequest>,
rx: Arc<tokio::sync::Mutex<Receiver<PluginHostResponse>>>,
pub(crate) plugin_info: Arc<Mutex<Vec<PluginInfo>>>, pub(crate) plugin_info: Arc<Mutex<Vec<PluginInfo>>>,
#[allow(unused)] #[allow(unused)]
join_handle: std::thread::JoinHandle<Result<(), AnyError>>, join_handle: std::thread::JoinHandle<Result<(), AnyError>>,
@ -127,14 +127,12 @@ pub struct PluginHost {
worker: MainWorker, worker: MainWorker,
install_plugins_fn: Rc<v8::Global<v8::Function>>, install_plugins_fn: Rc<v8::Global<v8::Function>>,
run_plugins_for_file_fn: Rc<v8::Global<v8::Function>>, run_plugins_for_file_fn: Rc<v8::Global<v8::Function>>,
tx: Sender<PluginHostResponse>, rx: mpsc::Receiver<PluginHostRequest>,
rx: Receiver<PluginHostRequest>,
} }
async fn create_plugin_runner_inner( async fn create_plugin_runner_inner(
logger: PluginLogger, logger: PluginLogger,
rx_req: Receiver<PluginHostRequest>, rx_req: mpsc::Receiver<PluginHostRequest>,
tx_res: Sender<PluginHostResponse>,
) -> Result<PluginHost, AnyError> { ) -> Result<PluginHost, AnyError> {
let flags = Flags { let flags = Flags {
subcommand: DenoSubcommand::Lint(LintFlags::default()), subcommand: DenoSubcommand::Lint(LintFlags::default()),
@ -202,7 +200,6 @@ async fn create_plugin_runner_inner(
worker, worker,
install_plugins_fn, install_plugins_fn,
run_plugins_for_file_fn, run_plugins_for_file_fn,
tx: tx_res,
rx: rx_req, rx: rx_req,
}) })
} }
@ -228,8 +225,7 @@ impl PluginInfo {
impl PluginHost { impl PluginHost {
fn create(logger: PluginLogger) -> Result<PluginHostProxy, AnyError> { fn create(logger: PluginLogger) -> Result<PluginHostProxy, AnyError> {
let (tx_req, rx_req) = channel(10); let (tx_req, rx_req) = mpsc::channel(10);
let (tx_res, rx_res) = channel(10);
let logger_ = logger.clone(); let logger_ = logger.clone();
let join_handle = std::thread::spawn(move || { let join_handle = std::thread::spawn(move || {
@ -237,8 +233,7 @@ impl PluginHost {
log::debug!("Lint PluginHost thread spawned"); log::debug!("Lint PluginHost thread spawned");
let start = std::time::Instant::now(); let start = std::time::Instant::now();
let fut = async move { let fut = async move {
let runner = let runner = create_plugin_runner_inner(logger.clone(), rx_req).await?;
create_plugin_runner_inner(logger.clone(), rx_req, tx_res).await?;
log::debug!("Lint PlugibnHost running loop"); log::debug!("Lint PlugibnHost running loop");
runner.run_loop().await?; runner.run_loop().await?;
log::debug!( log::debug!(
@ -253,7 +248,6 @@ impl PluginHost {
let proxy = PluginHostProxy { let proxy = PluginHostProxy {
tx: tx_req, tx: tx_req,
rx: Arc::new(tokio::sync::Mutex::new(rx_res)),
plugin_info: Arc::new(Mutex::new(vec![])), plugin_info: Arc::new(Mutex::new(vec![])),
join_handle, join_handle,
}; };
@ -269,9 +263,10 @@ impl PluginHost {
PluginHostRequest::LoadPlugins { PluginHostRequest::LoadPlugins {
specifiers, specifiers,
exclude_rules, exclude_rules,
tx,
} => { } => {
let r = self.load_plugins(specifiers, exclude_rules).await; let r = self.load_plugins(specifiers, exclude_rules).await;
let _ = self.tx.send(PluginHostResponse::LoadPlugin(r)).await; let _ = tx.send(PluginHostResponse::LoadPlugin(r));
} }
PluginHostRequest::Run { PluginHostRequest::Run {
serialized_ast, serialized_ast,
@ -279,6 +274,7 @@ impl PluginHost {
source_text_info, source_text_info,
utf16_map, utf16_map,
maybe_token, maybe_token,
tx,
} => { } => {
let start = std::time::Instant::now(); let start = std::time::Instant::now();
let r = match self.run_plugins( let r = match self.run_plugins(
@ -295,7 +291,7 @@ impl PluginHost {
"Running plugins lint rules took {:?}", "Running plugins lint rules took {:?}",
std::time::Instant::now() - start std::time::Instant::now() - start
); );
let _ = self.tx.send(PluginHostResponse::Run(r)).await; let _ = tx.send(PluginHostResponse::Run(r));
} }
} }
} }
@ -452,16 +448,17 @@ impl PluginHostProxy {
specifiers: Vec<ModuleSpecifier>, specifiers: Vec<ModuleSpecifier>,
exclude_rules: Option<Vec<String>>, exclude_rules: Option<Vec<String>>,
) -> Result<(), AnyError> { ) -> Result<(), AnyError> {
let (tx, rx) = oneshot::channel();
self self
.tx .tx
.send(PluginHostRequest::LoadPlugins { .send(PluginHostRequest::LoadPlugins {
specifiers, specifiers,
exclude_rules, exclude_rules,
tx,
}) })
.await?; .await?;
let mut rx = self.rx.lock().await;
if let Some(val) = rx.recv().await { if let Ok(val) = rx.await {
let PluginHostResponse::LoadPlugin(result) = val else { let PluginHostResponse::LoadPlugin(result) = val else {
unreachable!() unreachable!()
}; };
@ -480,6 +477,7 @@ impl PluginHostProxy {
utf16_map: Utf16Map, utf16_map: Utf16Map,
maybe_token: Option<CancellationToken>, maybe_token: Option<CancellationToken>,
) -> Result<Vec<LintDiagnostic>, AnyError> { ) -> Result<Vec<LintDiagnostic>, AnyError> {
let (tx, rx) = oneshot::channel();
self self
.tx .tx
.send(PluginHostRequest::Run { .send(PluginHostRequest::Run {
@ -488,11 +486,11 @@ impl PluginHostProxy {
source_text_info, source_text_info,
utf16_map, utf16_map,
maybe_token, maybe_token,
tx,
}) })
.await?; .await?;
let mut rx = self.rx.lock().await;
if let Some(PluginHostResponse::Run(diagnostics_result)) = rx.recv().await { if let Ok(PluginHostResponse::Run(diagnostics_result)) = rx.await {
return diagnostics_result; return diagnostics_result;
} }
bail!("Plugin host has closed") bail!("Plugin host has closed")