0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-02-01 20:25:12 -05:00

refactor(watch): create single watcher for whole process (#8083)

This commit rewrites file watcher used with --watch flag.

Instead of creating new watcher after each restart, only a single
watcher is created for whole process. Additionally debouncing
mechanism has been added to prevent infinite restart loops 
if multiple files were changed in quick succession.

Co-authored-by: bartossh <lenart.consulting@gmail.com>
This commit is contained in:
Yusuke Tanaka 2020-10-28 20:41:18 +09:00 committed by GitHub
parent 07359b7957
commit bfce376c68
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 78 additions and 37 deletions

View file

@ -1,8 +1,9 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use crate::colors; use crate::colors;
use core::task::{Context, Poll};
use deno_core::error::AnyError; use deno_core::error::AnyError;
use deno_core::futures::stream::StreamExt; use deno_core::futures::stream::{Stream, StreamExt};
use deno_core::futures::Future; use deno_core::futures::Future;
use notify::event::Event as NotifyEvent; use notify::event::Event as NotifyEvent;
use notify::event::EventKind; use notify::event::EventKind;
@ -13,12 +14,51 @@ use notify::RecursiveMode;
use notify::Watcher; use notify::Watcher;
use std::path::PathBuf; use std::path::PathBuf;
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::select; use tokio::select;
use tokio::sync::mpsc; use tokio::time::{interval, Interval};
const DEBOUNCE_INTERVAL_MS: Duration = Duration::from_millis(200);
// TODO(bartlomieju): rename // TODO(bartlomieju): rename
type WatchFuture = Pin<Box<dyn Future<Output = Result<(), AnyError>>>>; type WatchFuture = Pin<Box<dyn Future<Output = Result<(), AnyError>>>>;
struct Debounce {
interval: Interval,
event_detected: Arc<AtomicBool>,
}
impl Debounce {
fn new() -> Self {
Self {
interval: interval(DEBOUNCE_INTERVAL_MS),
event_detected: Arc::new(AtomicBool::new(false)),
}
}
}
impl Stream for Debounce {
type Item = ();
/// Note that this never returns `Poll::Ready(None)`, which means that file watcher will be alive
/// until the Deno process is terminated.
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Self::Item>> {
let inner = self.get_mut();
if inner.event_detected.load(Ordering::Relaxed) {
inner.event_detected.store(false, Ordering::Relaxed);
Poll::Ready(Some(()))
} else {
let _ = inner.interval.poll_tick(cx);
Poll::Pending
}
}
}
async fn error_handler(watch_future: WatchFuture) { async fn error_handler(watch_future: WatchFuture) {
let result = watch_future.await; let result = watch_future.await;
if let Err(err) = result { if let Err(err) = result {
@ -28,52 +68,59 @@ async fn error_handler(watch_future: WatchFuture) {
} }
pub async fn watch_func<F>( pub async fn watch_func<F>(
watch_paths: &[PathBuf], paths: &[PathBuf],
closure: F, closure: F,
) -> Result<(), AnyError> ) -> Result<(), AnyError>
where where
F: Fn() -> WatchFuture, F: Fn() -> WatchFuture,
{ {
let mut debounce = Debounce::new();
// This binding is required for the watcher to work properly without being dropped.
let _watcher = new_watcher(paths, &debounce)?;
loop { loop {
let func = error_handler(closure()); let func = error_handler(closure());
let mut is_file_changed = false; let mut is_file_changed = false;
select! { select! {
_ = file_watcher(watch_paths) => { _ = debounce.next() => {
is_file_changed = true; is_file_changed = true;
info!( info!(
"{} File change detected! Restarting!", "{} File change detected! Restarting!",
colors::intense_blue("Watcher") colors::intense_blue("Watcher"),
); );
}, },
_ = func => { }, _ = func => {},
}; }
if !is_file_changed { if !is_file_changed {
info!( info!(
"{} Process terminated! Restarting on file change...", "{} Process terminated! Restarting on file change...",
colors::intense_blue("Watcher") colors::intense_blue("Watcher"),
); );
file_watcher(watch_paths).await?; debounce.next().await;
info!( info!(
"{} File change detected! Restarting!", "{} File change detected! Restarting!",
colors::intense_blue("Watcher") colors::intense_blue("Watcher"),
); );
} }
} }
} }
pub async fn file_watcher(paths: &[PathBuf]) -> Result<(), AnyError> { fn new_watcher(
let (sender, mut receiver) = paths: &[PathBuf],
mpsc::channel::<Result<NotifyEvent, AnyError>>(16); debounce: &Debounce,
let sender = std::sync::Mutex::new(sender); ) -> Result<RecommendedWatcher, AnyError> {
let event_detected = Arc::clone(&debounce.event_detected);
let mut watcher: RecommendedWatcher = let mut watcher: RecommendedWatcher = Watcher::new_immediate(
Watcher::new_immediate(move |res: Result<NotifyEvent, NotifyError>| { move |res: Result<NotifyEvent, NotifyError>| {
let res2 = res.map_err(AnyError::from); if let Ok(event) = res {
let mut sender = sender.lock().unwrap(); if matches!(event.kind, EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_))
// Ignore result, if send failed it means that watcher was already closed, {
// but not all messages have been flushed. event_detected.store(true, Ordering::Relaxed);
let _ = sender.try_send(res2); }
})?; }
},
)?;
watcher.configure(Config::PreciseEvents(true)).unwrap(); watcher.configure(Config::PreciseEvents(true)).unwrap();
@ -81,14 +128,5 @@ pub async fn file_watcher(paths: &[PathBuf]) -> Result<(), AnyError> {
watcher.watch(path, RecursiveMode::NonRecursive)?; watcher.watch(path, RecursiveMode::NonRecursive)?;
} }
while let Some(result) = receiver.next().await { Ok(watcher)
let event = result?;
match event.kind {
EventKind::Create(_) => break,
EventKind::Modify(_) => break,
EventKind::Remove(_) => break,
_ => continue,
}
}
Ok(())
} }

View file

@ -1210,6 +1210,9 @@ fn run_watch() {
std::fs::write(&file_to_watch, "console.log('Hello world2');") std::fs::write(&file_to_watch, "console.log('Hello world2');")
.expect("error writing file"); .expect("error writing file");
// Events from the file watcher is "debounced", so we need to wait for the next execution to start
std::thread::sleep(std::time::Duration::from_secs(1));
assert!(stderr_lines.next().unwrap().contains("Restarting")); assert!(stderr_lines.next().unwrap().contains("Restarting"));
assert!(stdout_lines.next().unwrap().contains("Hello world2")); assert!(stdout_lines.next().unwrap().contains("Hello world2"));
assert!(stderr_lines.next().unwrap().contains("Process terminated")); assert!(stderr_lines.next().unwrap().contains("Process terminated"));