From 88b5fd90880b78498d0bbbdec6342b3db5887ef1 Mon Sep 17 00:00:00 2001
From: David Sherret <dsherret@users.noreply.github.com>
Date: Wed, 8 Mar 2023 10:13:13 -0500
Subject: [PATCH] fix: attempt to only allow one deno process to update the
 node_modules folder at a time (#18058)

This is implemented in such a way that it should still allow processes
to go through when a file lock wasn't properly cleaned up and the OS
hasn't released it yet (but with a 200ms-ish delay).

Closes #18039
---
 Cargo.lock                        |   1 +
 Cargo.toml                        |   1 +
 cli/Cargo.toml                    |   1 +
 cli/npm/resolvers/local.rs        |  10 ++
 cli/util/fs.rs                    | 246 ++++++++++++++++++++++++++++++
 cli/util/progress_bar/mod.rs      |  36 ++++-
 cli/util/progress_bar/renderer.rs |  11 +-
 runtime/Cargo.toml                |   2 +-
 8 files changed, 301 insertions(+), 7 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index adf3706b70..2d16aa88ca 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -850,6 +850,7 @@ dependencies = [
  "fancy-regex",
  "flaky_test",
  "flate2",
+ "fs3",
  "fwdansi",
  "glibc_version",
  "http",
diff --git a/Cargo.toml b/Cargo.toml
index 1fbdd5c657..48fe593e7e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -88,6 +88,7 @@ data-url = "=0.2.0"
 dlopen = "0.1.8"
 encoding_rs = "=0.8.31"
 flate2 = "=1.0.24"
+fs3 = "0.5.0"
 futures = "0.3.21"
 http = "=0.2.8"
 hyper = "0.14.18"
diff --git a/cli/Cargo.toml b/cli/Cargo.toml
index 0ad4896f32..692ac52056 100644
--- a/cli/Cargo.toml
+++ b/cli/Cargo.toml
@@ -73,6 +73,7 @@ env_logger = "=0.9.0"
 eszip = "=0.37.0"
 fancy-regex = "=0.10.0"
 flate2.workspace = true
+fs3.workspace = true
 http.workspace = true
 import_map = "=0.15.0"
 indexmap.workspace = true
diff --git a/cli/npm/resolvers/local.rs b/cli/npm/resolvers/local.rs
index bf5b8529c4..52a783823f 100644
--- a/cli/npm/resolvers/local.rs
+++ b/cli/npm/resolvers/local.rs
@@ -10,6 +10,7 @@ use std::path::Path;
 use std::path::PathBuf;
 
 use crate::util::fs::symlink_dir;
+use crate::util::fs::LaxSingleProcessFsFlag;
 use async_trait::async_trait;
 use deno_ast::ModuleSpecifier;
 use deno_core::anyhow::bail;
@@ -236,6 +237,13 @@ async fn sync_resolution_with_fs(
     format!("Creating '{}'", deno_local_registry_dir.display())
   })?;
 
+  let single_process_lock = LaxSingleProcessFsFlag::lock(
+    deno_local_registry_dir.join(".deno.lock"),
+    // similar message used by cargo build
+    "waiting for file lock on node_modules directory",
+  )
+  .await;
+
   // 1. Write all the packages out the .deno directory.
   //
   // Copy (hardlink in future) <global_registry_cache>/<package_id>/ to
@@ -394,6 +402,8 @@ async fn sync_resolution_with_fs(
     }
   }
 
+  drop(single_process_lock);
+
   Ok(())
 }
 
diff --git a/cli/util/fs.rs b/cli/util/fs.rs
index 777b22c5fe..4ac57eac03 100644
--- a/cli/util/fs.rs
+++ b/cli/util/fs.rs
@@ -14,10 +14,14 @@ use std::io::ErrorKind;
 use std::io::Write;
 use std::path::Path;
 use std::path::PathBuf;
+use std::sync::Arc;
 use std::time::Duration;
 use walkdir::WalkDir;
 
 use crate::args::FilesConfig;
+use crate::util::progress_bar::ProgressBar;
+use crate::util::progress_bar::ProgressBarStyle;
+use crate::util::progress_bar::ProgressMessagePrompt;
 
 use super::path::specifier_to_file_path;
 
@@ -471,11 +475,167 @@ pub fn dir_size(path: &Path) -> std::io::Result<u64> {
   Ok(total)
 }
 
+struct LaxSingleProcessFsFlagInner {
+  file_path: PathBuf,
+  fs_file: std::fs::File,
+  finished_token: Arc<tokio_util::sync::CancellationToken>,
+}
+
+impl Drop for LaxSingleProcessFsFlagInner {
+  fn drop(&mut self) {
+    use fs3::FileExt;
+    // kill the poll thread
+    self.finished_token.cancel();
+    // release the file lock
+    if let Err(err) = self.fs_file.unlock() {
+      log::debug!(
+        "Failed releasing lock for {}. {:#}",
+        self.file_path.display(),
+        err
+      );
+    }
+  }
+}
+
+/// A file system based flag that will attempt to synchronize multiple
+/// processes so they go one after the other. In scenarios where
+/// synchronization cannot be achieved, it will allow the current process
+/// to proceed.
+///
+/// This should only be used in places where it's ideal for multiple
+/// processes to not update something on the file system at the same time,
+/// but it's not that big of a deal.
+pub struct LaxSingleProcessFsFlag(Option<LaxSingleProcessFsFlagInner>);
+
+impl LaxSingleProcessFsFlag {
+  pub async fn lock(file_path: PathBuf, long_wait_message: &str) -> Self {
+    log::debug!("Acquiring file lock at {}", file_path.display());
+    use fs3::FileExt;
+    let last_updated_path = file_path.with_extension("lock.poll");
+    let start_instant = std::time::Instant::now();
+    let open_result = std::fs::OpenOptions::new()
+      .read(true)
+      .write(true)
+      .create(true)
+      .open(&file_path);
+
+    match open_result {
+      Ok(fs_file) => {
+        let mut pb_update_guard = None;
+        let mut error_count = 0;
+        while error_count < 10 {
+          let lock_result = fs_file.try_lock_exclusive();
+          let poll_file_update_ms = 100;
+          match lock_result {
+            Ok(_) => {
+              log::debug!("Acquired file lock at {}", file_path.display());
+              let _ignore = std::fs::write(&last_updated_path, "");
+              let token = Arc::new(tokio_util::sync::CancellationToken::new());
+
+              // Spawn a blocking task that will continually update a file
+              // signalling the lock is alive. This is a fail safe for when
+              // a file lock is never released. For example, on some operating
+              // systems, if a process does not release the lock (say it's
+              // killed), then the OS may release it at an indeterminate time
+              //
+              // This uses a blocking task because we use a single threaded
+              // runtime and this is time sensitive so we don't want it to update
+              // at the whims of of whatever is occurring on the runtime thread.
+              tokio::task::spawn_blocking({
+                let token = token.clone();
+                let last_updated_path = last_updated_path.clone();
+                move || {
+                  let mut i = 0;
+                  while !token.is_cancelled() {
+                    i += 1;
+                    let _ignore =
+                      std::fs::write(&last_updated_path, i.to_string());
+                    std::thread::sleep(Duration::from_millis(
+                      poll_file_update_ms,
+                    ));
+                  }
+                }
+              });
+
+              return Self(Some(LaxSingleProcessFsFlagInner {
+                file_path,
+                fs_file,
+                finished_token: token,
+              }));
+            }
+            Err(_) => {
+              // show a message if it's been a while
+              if pb_update_guard.is_none()
+                && start_instant.elapsed().as_millis() > 1_000
+              {
+                let pb = ProgressBar::new(ProgressBarStyle::TextOnly);
+                let guard = pb.update_with_prompt(
+                  ProgressMessagePrompt::Blocking,
+                  long_wait_message,
+                );
+                pb_update_guard = Some((guard, pb));
+              }
+
+              // sleep for a little bit
+              tokio::time::sleep(Duration::from_millis(20)).await;
+
+              // Poll the last updated path to check if it's stopped updating,
+              // which is an indication that the file lock is claimed, but
+              // was never properly released.
+              match std::fs::metadata(&last_updated_path)
+                .and_then(|p| p.modified())
+              {
+                Ok(last_updated_time) => {
+                  let current_time = std::time::SystemTime::now();
+                  match current_time.duration_since(last_updated_time) {
+                    Ok(duration) => {
+                      if duration.as_millis()
+                        > (poll_file_update_ms * 2) as u128
+                      {
+                        // the other process hasn't updated this file in a long time
+                        // so maybe it was killed and the operating system hasn't
+                        // released the file lock yet
+                        return Self(None);
+                      } else {
+                        error_count = 0; // reset
+                      }
+                    }
+                    Err(_) => {
+                      error_count += 1;
+                    }
+                  }
+                }
+                Err(_) => {
+                  error_count += 1;
+                }
+              }
+            }
+          }
+        }
+
+        drop(pb_update_guard); // explicit for clarity
+        Self(None)
+      }
+      Err(err) => {
+        log::debug!(
+          "Failed to open file lock at {}. {:#}",
+          file_path.display(),
+          err
+        );
+        Self(None) // let the process through
+      }
+    }
+  }
+}
+
 #[cfg(test)]
 mod tests {
   use super::*;
+  use deno_core::futures;
+  use deno_core::parking_lot::Mutex;
   use pretty_assertions::assert_eq;
   use test_util::TempDir;
+  use tokio::sync::Notify;
 
   #[test]
   fn resolve_from_cwd_child() {
@@ -793,4 +953,90 @@ mod tests {
       );
     }
   }
+
+  #[tokio::test]
+  async fn lax_fs_lock() {
+    let temp_dir = TempDir::new();
+    let lock_path = temp_dir.path().join("file.lock");
+    let signal1 = Arc::new(Notify::new());
+    let signal2 = Arc::new(Notify::new());
+    let signal3 = Arc::new(Notify::new());
+    let signal4 = Arc::new(Notify::new());
+    tokio::spawn({
+      let lock_path = lock_path.clone();
+      let signal1 = signal1.clone();
+      let signal2 = signal2.clone();
+      let signal3 = signal3.clone();
+      let signal4 = signal4.clone();
+      let temp_dir = temp_dir.clone();
+      async move {
+        let flag =
+          LaxSingleProcessFsFlag::lock(lock_path.clone(), "waiting").await;
+        signal1.notify_one();
+        signal2.notified().await;
+        tokio::time::sleep(Duration::from_millis(10)).await; // give the other thread time to acquire the lock
+        temp_dir.write("file.txt", "update1");
+        signal3.notify_one();
+        signal4.notified().await;
+        drop(flag);
+      }
+    });
+    let signal5 = Arc::new(Notify::new());
+    tokio::spawn({
+      let temp_dir = temp_dir.clone();
+      let signal5 = signal5.clone();
+      async move {
+        signal1.notified().await;
+        signal2.notify_one();
+        let flag = LaxSingleProcessFsFlag::lock(lock_path, "waiting").await;
+        temp_dir.write("file.txt", "update2");
+        signal5.notify_one();
+        drop(flag);
+      }
+    });
+
+    signal3.notified().await;
+    assert_eq!(temp_dir.read_to_string("file.txt"), "update1");
+    signal4.notify_one();
+    signal5.notified().await;
+    assert_eq!(temp_dir.read_to_string("file.txt"), "update2");
+  }
+
+  #[tokio::test]
+  async fn lax_fs_lock_ordered() {
+    let temp_dir = TempDir::new();
+    let lock_path = temp_dir.path().join("file.lock");
+    let output_path = temp_dir.path().join("output");
+    let expected_order = Arc::new(Mutex::new(Vec::new()));
+    let count = 10;
+    let mut tasks = Vec::with_capacity(count);
+
+    std::fs::write(&output_path, "").unwrap();
+
+    for i in 0..count {
+      let lock_path = lock_path.clone();
+      let output_path = output_path.clone();
+      let expected_order = expected_order.clone();
+      tasks.push(tokio::spawn(async move {
+        let flag =
+          LaxSingleProcessFsFlag::lock(lock_path.clone(), "waiting").await;
+        expected_order.lock().push(i.to_string());
+        // be extremely racy
+        let mut output = std::fs::read_to_string(&output_path).unwrap();
+        if !output.is_empty() {
+          output.push('\n');
+        }
+        output.push_str(&i.to_string());
+        std::fs::write(&output_path, output).unwrap();
+        drop(flag);
+      }));
+    }
+
+    futures::future::join_all(tasks).await;
+    let expected_output = expected_order.lock().join("\n");
+    assert_eq!(
+      std::fs::read_to_string(output_path).unwrap(),
+      expected_output
+    );
+  }
 }
diff --git a/cli/util/progress_bar/mod.rs b/cli/util/progress_bar/mod.rs
index 004b48b2f8..2568710795 100644
--- a/cli/util/progress_bar/mod.rs
+++ b/cli/util/progress_bar/mod.rs
@@ -23,6 +23,21 @@ mod renderer;
 // Inspired by Indicatif, but this custom implementation allows
 // for more control over what's going on under the hood.
 
+#[derive(Debug, Clone, Copy)]
+pub enum ProgressMessagePrompt {
+  Download,
+  Blocking,
+}
+
+impl ProgressMessagePrompt {
+  pub fn as_text(&self) -> String {
+    match self {
+      ProgressMessagePrompt::Download => colors::green("Download").to_string(),
+      ProgressMessagePrompt::Blocking => colors::cyan("Blocking").to_string(),
+    }
+  }
+}
+
 #[derive(Debug)]
 pub struct UpdateGuard {
   maybe_entry: Option<ProgressBarEntry>,
@@ -59,6 +74,7 @@ pub enum ProgressBarStyle {
 #[derive(Clone, Debug)]
 struct ProgressBarEntry {
   id: usize,
+  prompt: ProgressMessagePrompt,
   pub message: String,
   pos: Arc<AtomicU64>,
   total_size: Arc<AtomicU64>,
@@ -128,11 +144,16 @@ impl ProgressBarInner {
     }
   }
 
-  pub fn add_entry(&self, message: String) -> ProgressBarEntry {
+  pub fn add_entry(
+    &self,
+    kind: ProgressMessagePrompt,
+    message: String,
+  ) -> ProgressBarEntry {
     let mut internal_state = self.state.lock();
     let id = internal_state.total_entries;
     let entry = ProgressBarEntry {
       id,
+      prompt: kind,
       message,
       pos: Default::default(),
       total_size: Default::default(),
@@ -208,6 +229,7 @@ impl DrawThreadRenderer for ProgressBarInner {
         pending_entries: state.entries.len(),
         total_entries: state.total_entries,
         display_entry: ProgressDataDisplayEntry {
+          prompt: preferred_entry.prompt,
           message: preferred_entry.message.clone(),
           position: preferred_entry.position(),
           total_size: preferred_entry.total_size(),
@@ -255,9 +277,17 @@ impl ProgressBar {
   }
 
   pub fn update(&self, msg: &str) -> UpdateGuard {
+    self.update_with_prompt(ProgressMessagePrompt::Download, msg)
+  }
+
+  pub fn update_with_prompt(
+    &self,
+    kind: ProgressMessagePrompt,
+    msg: &str,
+  ) -> UpdateGuard {
     match &self.inner {
       Some(inner) => {
-        let entry = inner.add_entry(msg.to_string());
+        let entry = inner.add_entry(kind, msg.to_string());
         UpdateGuard {
           maybe_entry: Some(entry),
         }
@@ -265,7 +295,7 @@ impl ProgressBar {
       None => {
         // if we're not running in TTY, fallback to using logger crate
         if !msg.is_empty() {
-          log::log!(log::Level::Info, "{} {}", colors::green("Download"), msg);
+          log::log!(log::Level::Info, "{} {}", kind.as_text(), msg);
         }
         UpdateGuard { maybe_entry: None }
       }
diff --git a/cli/util/progress_bar/renderer.rs b/cli/util/progress_bar/renderer.rs
index 0ea275e773..5635ad3165 100644
--- a/cli/util/progress_bar/renderer.rs
+++ b/cli/util/progress_bar/renderer.rs
@@ -6,8 +6,11 @@ use deno_runtime::colors;
 
 use crate::util::display::human_download_size;
 
+use super::ProgressMessagePrompt;
+
 #[derive(Clone)]
 pub struct ProgressDataDisplayEntry {
+  pub prompt: ProgressMessagePrompt,
   pub message: String,
   pub position: u64,
   pub total_size: u64,
@@ -142,7 +145,7 @@ impl ProgressBarRenderer for TextOnlyProgressBarRenderer {
 
     format!(
       "{} {}{}{}",
-      colors::green("Download"),
+      data.display_entry.prompt.as_text(),
       data.display_entry.message,
       colors::gray(bytes_text),
       colors::gray(total_text),
@@ -195,6 +198,7 @@ mod test {
     let renderer = BarProgressBarRenderer;
     let mut data = ProgressData {
       display_entry: ProgressDataDisplayEntry {
+        prompt: ProgressMessagePrompt::Download,
         message: "data".to_string(),
         position: 0,
         total_size: 10 * BYTES_TO_KIB,
@@ -251,6 +255,7 @@ mod test {
     let renderer = TextOnlyProgressBarRenderer;
     let mut data = ProgressData {
       display_entry: ProgressDataDisplayEntry {
+        prompt: ProgressMessagePrompt::Blocking,
         message: "data".to_string(),
         position: 0,
         total_size: 10 * BYTES_TO_KIB,
@@ -263,7 +268,7 @@ mod test {
     };
     let text = renderer.render(data.clone());
     let text = test_util::strip_ansi_codes(&text);
-    assert_eq!(text, "Download data 0.00KiB/10.00KiB (2/3)");
+    assert_eq!(text, "Blocking data 0.00KiB/10.00KiB (2/3)");
 
     data.pending_entries = 0;
     data.total_entries = 1;
@@ -271,6 +276,6 @@ mod test {
     data.display_entry.total_size = 0;
     let text = renderer.render(data);
     let text = test_util::strip_ansi_codes(&text);
-    assert_eq!(text, "Download data");
+    assert_eq!(text, "Blocking data");
   }
 }
diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml
index 1121562030..0ba8f8d3ad 100644
--- a/runtime/Cargo.toml
+++ b/runtime/Cargo.toml
@@ -89,7 +89,7 @@ atty.workspace = true
 dlopen.workspace = true
 encoding_rs.workspace = true
 filetime = "0.2.16"
-fs3 = "0.5.0"
+fs3.workspace = true
 http.workspace = true
 hyper = { workspace = true, features = ["server", "stream", "http1", "http2", "runtime"] }
 libc.workspace = true