common_config/
file_watcher.rs1use std::collections::HashSet;
26use std::path::{Path, PathBuf};
27use std::sync::mpsc::channel;
28
29use common_telemetry::{error, info, warn};
30use notify::{EventKind, RecursiveMode, Watcher};
31use snafu::ResultExt;
32
33use crate::error::{CanonicalizePathSnafu, FileWatchSnafu, InvalidPathSnafu, Result};
34
/// Options that decide which filesystem event kinds a file watcher reacts to.
///
/// Modify and create events are always considered relevant; remove events are
/// only considered when `include_remove_events` is set. The `Default` value
/// leaves remove events disabled.
#[derive(Debug, Clone, Default)]
pub struct FileWatcherConfig {
    /// When `true`, remove events trigger the watcher in addition to modify
    /// and create events.
    pub include_remove_events: bool,
}

impl FileWatcherConfig {
    /// Returns the default configuration (remove events are not included).
    pub fn new() -> Self {
        Default::default()
    }

    /// Limits the watcher to modify and create events by turning remove
    /// events off.
    pub fn with_modify_and_create(self) -> Self {
        self.remove_events(false)
    }

    /// Makes the watcher react to remove events as well.
    pub fn with_remove_events(self) -> Self {
        self.remove_events(true)
    }

    /// Shared setter behind the two public toggles above.
    fn remove_events(mut self, enabled: bool) -> Self {
        self.include_remove_events = enabled;
        self
    }
}
57
58pub struct FileWatcherBuilder {
64 config: FileWatcherConfig,
65 file_paths: Vec<PathBuf>,
67}
68
69impl FileWatcherBuilder {
70 pub fn new() -> Self {
72 Self {
73 config: FileWatcherConfig::default(),
74 file_paths: Vec::new(),
75 }
76 }
77
78 pub fn config(mut self, config: FileWatcherConfig) -> Self {
80 self.config = config;
81 self
82 }
83
84 pub fn watch_path<P: AsRef<Path>>(mut self, path: P) -> Result<Self> {
89 let path = path.as_ref();
90 snafu::ensure!(
91 path.is_file(),
92 InvalidPathSnafu {
93 path: path.display().to_string(),
94 }
95 );
96 let canonical = path.canonicalize().context(CanonicalizePathSnafu {
98 path: path.display().to_string(),
99 })?;
100 self.file_paths.push(canonical);
101 Ok(self)
102 }
103
104 pub fn watch_paths<P: AsRef<Path>, I: IntoIterator<Item = P>>(
108 mut self,
109 paths: I,
110 ) -> Result<Self> {
111 for path in paths {
112 self = self.watch_path(path)?;
113 }
114 Ok(self)
115 }
116
117 pub fn spawn<F>(self, callback: F) -> Result<()>
125 where
126 F: Fn() + Send + 'static,
127 {
128 let (tx, rx) = channel::<notify::Result<notify::Event>>();
129 let mut watcher =
130 notify::recommended_watcher(tx).context(FileWatchSnafu { path: "<none>" })?;
131
132 let mut watched_dirs: HashSet<PathBuf> = HashSet::new();
134 for file_path in &self.file_paths {
135 if let Some(parent) = file_path.parent()
136 && watched_dirs.insert(parent.to_path_buf())
137 {
138 watcher
139 .watch(parent, RecursiveMode::NonRecursive)
140 .context(FileWatchSnafu {
141 path: parent.display().to_string(),
142 })?;
143 }
144 }
145
146 let config = self.config;
147 let watched_files: HashSet<PathBuf> = self.file_paths.iter().cloned().collect();
148
149 info!(
150 "Spawning file watcher for paths: {:?} (watching parent directories)",
151 self.file_paths
152 .iter()
153 .map(|p| p.display().to_string())
154 .collect::<Vec<_>>()
155 );
156
157 std::thread::spawn(move || {
158 let _watcher = watcher;
160
161 while let Ok(res) = rx.recv() {
162 match res {
163 Ok(event) => {
164 if !is_relevant_event(&event.kind, &config) {
165 continue;
166 }
167
168 let is_watched_file = event.paths.iter().any(|event_path| {
170 if let Ok(canonical) = event_path.canonicalize()
174 && watched_files.contains(&canonical)
175 {
176 return true;
177 }
178 watched_files.contains(event_path)
180 });
181
182 if !is_watched_file {
183 continue;
184 }
185
186 info!(?event.kind, ?event.paths, "Detected file change");
187 callback();
188 }
189 Err(err) => {
190 warn!("File watcher error: {}", err);
191 }
192 }
193 }
194
195 error!("File watcher channel closed unexpectedly");
196 });
197
198 Ok(())
199 }
200}
201
202impl Default for FileWatcherBuilder {
203 fn default() -> Self {
204 Self::new()
205 }
206}
207
208fn is_relevant_event(kind: &EventKind, config: &FileWatcherConfig) -> bool {
210 match kind {
211 EventKind::Modify(_) | EventKind::Create(_) => true,
212 EventKind::Remove(_) => config.include_remove_events,
213 _ => false,
214 }
215}
216
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    use common_test_util::temp_dir::create_temp_dir;

    use super::*;

    /// Short pause used after spawning the watcher and between a delete and
    /// the following re-create.
    const SHORT_PAUSE: Duration = Duration::from_millis(100);
    /// Longer pause that gives the watcher thread time to process events
    /// before the counter is inspected.
    const LONG_PAUSE: Duration = Duration::from_millis(500);

    /// Spawns a watcher on `path` with the default config and returns the
    /// counter that the watcher callback increments on every notification.
    fn spawn_counting_watcher(path: &std::path::Path) -> Arc<AtomicUsize> {
        let hits = Arc::new(AtomicUsize::new(0));
        let hits_in_callback = Arc::clone(&hits);

        FileWatcherBuilder::new()
            .watch_path(path)
            .unwrap()
            .config(FileWatcherConfig::new())
            .spawn(move || {
                hits_in_callback.fetch_add(1, Ordering::SeqCst);
            })
            .unwrap();

        hits
    }

    /// Overwriting the watched file must trigger the callback at least once.
    #[test]
    fn test_file_watcher_detects_changes() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("test_file_watcher");
        let file_path = dir.path().join("test_file.txt");
        std::fs::write(&file_path, "initial content").unwrap();

        let hits = spawn_counting_watcher(&file_path);
        std::thread::sleep(SHORT_PAUSE);

        std::fs::write(&file_path, "modified content").unwrap();
        std::thread::sleep(LONG_PAUSE);

        assert!(
            hits.load(Ordering::SeqCst) >= 1,
            "Watcher should have detected at least one change"
        );
    }

    /// Deleting the watched file and writing it again must still trigger the
    /// callback.
    #[test]
    fn test_file_watcher_detects_delete_and_recreate() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("test_file_watcher_recreate");
        let file_path = dir.path().join("test_file.txt");
        std::fs::write(&file_path, "initial content").unwrap();

        let hits = spawn_counting_watcher(&file_path);
        std::thread::sleep(SHORT_PAUSE);

        std::fs::remove_file(&file_path).unwrap();
        std::thread::sleep(SHORT_PAUSE);

        std::fs::write(&file_path, "recreated content").unwrap();
        std::thread::sleep(LONG_PAUSE);

        assert!(
            hits.load(Ordering::SeqCst) >= 1,
            "Watcher should have detected file recreation"
        );
    }

    /// Changes to a sibling file in the same directory must be ignored, while
    /// changes to the watched file are still reported.
    #[test]
    fn test_file_watcher_ignores_other_files() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("test_file_watcher_other");
        let watched_file = dir.path().join("watched.txt");
        let other_file = dir.path().join("other.txt");
        std::fs::write(&watched_file, "watched content").unwrap();
        std::fs::write(&other_file, "other content").unwrap();

        let hits = spawn_counting_watcher(&watched_file);
        std::thread::sleep(SHORT_PAUSE);

        // Touch only the unwatched sibling first.
        std::fs::write(&other_file, "modified other content").unwrap();
        std::thread::sleep(LONG_PAUSE);

        assert_eq!(
            hits.load(Ordering::SeqCst),
            0,
            "Watcher should not have detected changes to other files"
        );

        // Now touch the watched file itself.
        std::fs::write(&watched_file, "modified watched content").unwrap();
        std::thread::sleep(LONG_PAUSE);

        assert!(
            hits.load(Ordering::SeqCst) >= 1,
            "Watcher should have detected change to watched file"
        );
    }
}