common_config/
file_watcher.rs1use std::collections::HashSet;
26use std::path::{Path, PathBuf};
27use std::sync::mpsc::channel;
28
29use common_telemetry::{error, info, warn};
30use notify::{EventKind, RecursiveMode, Watcher};
31use snafu::ResultExt;
32
33use crate::error::{FileWatchSnafu, InvalidPathSnafu, Result};
34
/// Behavioural switches for the file watcher.
#[derive(Debug, Clone, Default)]
pub struct FileWatcherConfig {
    /// When `true`, `Remove` events are reported in addition to `Modify` and
    /// `Create` events; when `false` they are ignored.
    pub include_remove_events: bool,
}

impl FileWatcherConfig {
    /// Builds a configuration with every option at its default value
    /// (remove events are not reported).
    pub fn new() -> Self {
        Self {
            include_remove_events: false,
        }
    }

    /// Consumes the configuration and returns one that also reports remove events.
    pub fn include_remove_events(self) -> Self {
        Self {
            include_remove_events: true,
        }
    }
}
52
53pub struct FileWatcherBuilder {
59 config: FileWatcherConfig,
60 file_paths: Vec<PathBuf>,
62}
63
64impl FileWatcherBuilder {
65 pub fn new() -> Self {
67 Self {
68 config: FileWatcherConfig::default(),
69 file_paths: Vec::new(),
70 }
71 }
72
73 pub fn config(mut self, config: FileWatcherConfig) -> Self {
75 self.config = config;
76 self
77 }
78
79 pub fn watch_path<P: AsRef<Path>>(mut self, path: P) -> Result<Self> {
84 let path = path.as_ref();
85 snafu::ensure!(
86 path.is_file(),
87 InvalidPathSnafu {
88 path: path.display().to_string(),
89 }
90 );
91
92 self.file_paths.push(path.to_path_buf());
93 Ok(self)
94 }
95
96 pub fn watch_paths<P: AsRef<Path>, I: IntoIterator<Item = P>>(
100 mut self,
101 paths: I,
102 ) -> Result<Self> {
103 for path in paths {
104 self = self.watch_path(path)?;
105 }
106 Ok(self)
107 }
108
109 pub fn spawn<F>(self, callback: F) -> Result<()>
117 where
118 F: Fn() + Send + 'static,
119 {
120 let (tx, rx) = channel::<notify::Result<notify::Event>>();
121 let mut watcher =
122 notify::recommended_watcher(tx).context(FileWatchSnafu { path: "<none>" })?;
123
124 let mut watched_dirs: HashSet<PathBuf> = HashSet::new();
126 for file_path in &self.file_paths {
127 if let Some(parent) = file_path.parent()
128 && watched_dirs.insert(parent.to_path_buf())
129 {
130 watcher
131 .watch(parent, RecursiveMode::NonRecursive)
132 .context(FileWatchSnafu {
133 path: parent.display().to_string(),
134 })?;
135 }
136 }
137
138 let config = self.config;
139
140 info!(
141 "Spawning file watcher for paths: {:?} (watching parent directories)",
142 self.file_paths
143 .iter()
144 .map(|p| p.display().to_string())
145 .collect::<Vec<_>>()
146 );
147
148 std::thread::spawn(move || {
149 let _watcher = watcher;
151
152 while let Ok(res) = rx.recv() {
153 match res {
154 Ok(event) => {
155 if !is_relevant_event(&event.kind, &config) {
156 continue;
157 }
158
159 info!(?event.kind, ?event.paths, "Detected folder change");
160 callback();
161 }
162 Err(err) => {
163 warn!("File watcher error: {}", err);
164 }
165 }
166 }
167
168 error!("File watcher channel closed unexpectedly");
169 });
170
171 Ok(())
172 }
173}
174
175impl Default for FileWatcherBuilder {
176 fn default() -> Self {
177 Self::new()
178 }
179}
180
181fn is_relevant_event(kind: &EventKind, config: &FileWatcherConfig) -> bool {
183 match kind {
184 EventKind::Modify(_) | EventKind::Create(_) => true,
185 EventKind::Remove(_) => config.include_remove_events,
186 _ => false,
187 }
188}
189
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    use common_test_util::temp_dir::create_temp_dir;

    use super::*;

    /// Short pause used after spawning the watcher and between file operations.
    const SHORT_PAUSE: Duration = Duration::from_millis(100);
    /// Longer pause used after the final file operation, before asserting.
    const LONG_PAUSE: Duration = Duration::from_millis(500);

    /// Spawns a watcher with the default configuration on `file_path` and
    /// returns the counter that the callback increments on every invocation.
    fn spawn_counting_watcher(file_path: &std::path::Path) -> Arc<AtomicUsize> {
        let hits = Arc::new(AtomicUsize::new(0));
        let hits_in_callback = Arc::clone(&hits);

        FileWatcherBuilder::new()
            .watch_path(file_path)
            .unwrap()
            .config(FileWatcherConfig::new())
            .spawn(move || {
                hits_in_callback.fetch_add(1, Ordering::SeqCst);
            })
            .unwrap();

        hits
    }

    /// Overwriting a watched file must invoke the callback at least once.
    #[test]
    fn test_file_watcher_detects_changes() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("test_file_watcher");
        let file_path = dir.path().join("test_file.txt");
        std::fs::write(&file_path, "initial content").unwrap();

        let hits = spawn_counting_watcher(&file_path);
        std::thread::sleep(SHORT_PAUSE);

        std::fs::write(&file_path, "modified content").unwrap();
        std::thread::sleep(LONG_PAUSE);

        let observed = hits.load(Ordering::SeqCst);
        assert!(
            observed >= 1,
            "Watcher should have detected at least one change"
        );
    }

    /// Deleting a watched file and writing it again must invoke the callback
    /// at least once, even though remove events are not enabled.
    #[test]
    fn test_file_watcher_detects_delete_and_recreate() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("test_file_watcher_recreate");
        let file_path = dir.path().join("test_file.txt");
        std::fs::write(&file_path, "initial content").unwrap();

        let hits = spawn_counting_watcher(&file_path);
        std::thread::sleep(SHORT_PAUSE);

        std::fs::remove_file(&file_path).unwrap();
        std::thread::sleep(SHORT_PAUSE);

        std::fs::write(&file_path, "recreated content").unwrap();
        std::thread::sleep(LONG_PAUSE);

        let observed = hits.load(Ordering::SeqCst);
        assert!(
            observed >= 1,
            "Watcher should have detected file recreation"
        );
    }
}