diff --git a/cli/file_watcher.rs b/cli/file_watcher.rs index fd7d4e1a1c..c326eb7dd3 100644 --- a/cli/file_watcher.rs +++ b/cli/file_watcher.rs @@ -25,29 +25,39 @@ const CLEAR_SCREEN: &str = "\x1B[2J\x1B[1;1H"; const DEBOUNCE_INTERVAL: Duration = Duration::from_millis(200); struct DebouncedReceiver { + // The `recv()` call could be used in a tokio `select!` macro, + // and so we store this state on the struct to ensure we don't + // lose items if a `recv()` never completes + received_items: HashSet, receiver: mpsc::UnboundedReceiver>, } impl DebouncedReceiver { fn new_with_sender() -> (Arc>>, Self) { let (sender, receiver) = mpsc::unbounded_channel(); - (Arc::new(sender), Self { receiver }) + ( + Arc::new(sender), + Self { + receiver, + received_items: HashSet::new(), + }, + ) } async fn recv(&mut self) -> Option> { - let mut received_items = self - .receiver - .recv() - .await? - .into_iter() - .collect::>(); // prevent duplicates + if self.received_items.is_empty() { + self + .received_items + .extend(self.receiver.recv().await?.into_iter()); + } + loop { tokio::select! { items = self.receiver.recv() => { - received_items.extend(items?); + self.received_items.extend(items?); } _ = sleep(DEBOUNCE_INTERVAL) => { - return Some(received_items.into_iter().collect()); + return Some(self.received_items.drain().collect()); } } }