From 3d1123f8b09cecfa57a93d8b8b7d19af2b45f070 Mon Sep 17 00:00:00 2001
From: David Sherret <dsherret@users.noreply.github.com>
Date: Tue, 19 Apr 2022 10:12:30 -0400
Subject: [PATCH] fix: `--watch` was losing items (#14317)

---
 cli/file_watcher.rs | 28 +++++++++++++++++++---------
 1 file changed, 19 insertions(+), 9 deletions(-)

diff --git a/cli/file_watcher.rs b/cli/file_watcher.rs
index fd7d4e1a1c..c326eb7dd3 100644
--- a/cli/file_watcher.rs
+++ b/cli/file_watcher.rs
@@ -25,29 +25,39 @@ const CLEAR_SCREEN: &str = "\x1B[2J\x1B[1;1H";
 const DEBOUNCE_INTERVAL: Duration = Duration::from_millis(200);
 
 struct DebouncedReceiver {
+  // The `recv()` call could be used in a tokio `select!` macro,
+  // and so we store this state on the struct to ensure we don't
+  // lose items if a `recv()` never completes
+  received_items: HashSet<PathBuf>,
   receiver: mpsc::UnboundedReceiver<Vec<PathBuf>>,
 }
 
 impl DebouncedReceiver {
   fn new_with_sender() -> (Arc<mpsc::UnboundedSender<Vec<PathBuf>>>, Self) {
     let (sender, receiver) = mpsc::unbounded_channel();
-    (Arc::new(sender), Self { receiver })
+    (
+      Arc::new(sender),
+      Self {
+        receiver,
+        received_items: HashSet::new(),
+      },
+    )
   }
 
   async fn recv(&mut self) -> Option<Vec<PathBuf>> {
-    let mut received_items = self
-      .receiver
-      .recv()
-      .await?
-      .into_iter()
-      .collect::<HashSet<_>>(); // prevent duplicates
+    if self.received_items.is_empty() {
+      self
+        .received_items
+        .extend(self.receiver.recv().await?.into_iter());
+    }
+
     loop {
       tokio::select! {
         items = self.receiver.recv() => {
-          received_items.extend(items?);
+          self.received_items.extend(items?);
         }
         _ = sleep(DEBOUNCE_INTERVAL) => {
-          return Some(received_items.into_iter().collect());
+          return Some(self.received_items.drain().collect());
         }
       }
     }