diff --git a/cli/file_watcher.rs b/cli/file_watcher.rs index 1f0107f49f..1730e6472c 100644 --- a/cli/file_watcher.rs +++ b/cli/file_watcher.rs @@ -1,8 +1,9 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use crate::colors; +use core::task::{Context, Poll}; use deno_core::error::AnyError; -use deno_core::futures::stream::StreamExt; +use deno_core::futures::stream::{Stream, StreamExt}; use deno_core::futures::Future; use notify::event::Event as NotifyEvent; use notify::event::EventKind; @@ -13,12 +14,51 @@ use notify::RecursiveMode; use notify::Watcher; use std::path::PathBuf; use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; use tokio::select; -use tokio::sync::mpsc; +use tokio::time::{interval, Interval}; + +const DEBOUNCE_INTERVAL_MS: Duration = Duration::from_millis(200); // TODO(bartlomieju): rename type WatchFuture = Pin>>>; +struct Debounce { + interval: Interval, + event_detected: Arc, +} + +impl Debounce { + fn new() -> Self { + Self { + interval: interval(DEBOUNCE_INTERVAL_MS), + event_detected: Arc::new(AtomicBool::new(false)), + } + } +} + +impl Stream for Debounce { + type Item = (); + + /// Note that this never returns `Poll::Ready(None)`, which means that file watcher will be alive + /// until the Deno process is terminated. + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll> { + let inner = self.get_mut(); + if inner.event_detected.load(Ordering::Relaxed) { + inner.event_detected.store(false, Ordering::Relaxed); + Poll::Ready(Some(())) + } else { + let _ = inner.interval.poll_tick(cx); + Poll::Pending + } + } +} + async fn error_handler(watch_future: WatchFuture) { let result = watch_future.await; if let Err(err) = result { @@ -28,52 +68,59 @@ async fn error_handler(watch_future: WatchFuture) { } pub async fn watch_func( - watch_paths: &[PathBuf], + paths: &[PathBuf], closure: F, ) -> Result<(), AnyError> where F: Fn() -> WatchFuture, { + let mut debounce = Debounce::new(); + // This binding is required for the watcher to work properly without being dropped. + let _watcher = new_watcher(paths, &debounce)?; + loop { let func = error_handler(closure()); let mut is_file_changed = false; select! { - _ = file_watcher(watch_paths) => { - is_file_changed = true; - info!( - "{} File change detected! Restarting!", - colors::intense_blue("Watcher") - ); - }, - _ = func => { }, - }; + _ = debounce.next() => { + is_file_changed = true; + info!( + "{} File change detected! Restarting!", + colors::intense_blue("Watcher"), + ); + }, + _ = func => {}, + } if !is_file_changed { info!( "{} Process terminated! Restarting on file change...", - colors::intense_blue("Watcher") + colors::intense_blue("Watcher"), ); - file_watcher(watch_paths).await?; + debounce.next().await; info!( "{} File change detected! Restarting!", - colors::intense_blue("Watcher") + colors::intense_blue("Watcher"), ); } } } -pub async fn file_watcher(paths: &[PathBuf]) -> Result<(), AnyError> { - let (sender, mut receiver) = - mpsc::channel::>(16); - let sender = std::sync::Mutex::new(sender); +fn new_watcher( + paths: &[PathBuf], + debounce: &Debounce, +) -> Result { + let event_detected = Arc::clone(&debounce.event_detected); - let mut watcher: RecommendedWatcher = - Watcher::new_immediate(move |res: Result| { - let res2 = res.map_err(AnyError::from); - let mut sender = sender.lock().unwrap(); - // Ignore result, if send failed it means that watcher was already closed, - // but not all messages have been flushed. - let _ = sender.try_send(res2); - })?; + let mut watcher: RecommendedWatcher = Watcher::new_immediate( + move |res: Result| { + if let Ok(event) = res { + if matches!(event.kind, EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)) + { + event_detected.store(true, Ordering::Relaxed); + } + } + }, + )?; watcher.configure(Config::PreciseEvents(true)).unwrap(); @@ -81,14 +128,5 @@ pub async fn file_watcher(paths: &[PathBuf]) -> Result<(), AnyError> { watcher.watch(path, RecursiveMode::NonRecursive)?; } - while let Some(result) = receiver.next().await { - let event = result?; - match event.kind { - EventKind::Create(_) => break, - EventKind::Modify(_) => break, - EventKind::Remove(_) => break, - _ => continue, - } - } - Ok(()) + Ok(watcher) } diff --git a/cli/tests/integration_tests.rs b/cli/tests/integration_tests.rs index f7b4c345ac..0bba369fe7 100644 --- a/cli/tests/integration_tests.rs +++ b/cli/tests/integration_tests.rs @@ -1210,6 +1210,9 @@ fn run_watch() { std::fs::write(&file_to_watch, "console.log('Hello world2');") .expect("error writing file"); + // Events from the file watcher is "debounced", so we need to wait for the next execution to start + std::thread::sleep(std::time::Duration::from_secs(1)); + assert!(stderr_lines.next().unwrap().contains("Restarting")); assert!(stdout_lines.next().unwrap().contains("Hello world2")); assert!(stderr_lines.next().unwrap().contains("Process terminated"));