common_config/
file_watcher.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Common file watching utilities for configuration hot-reloading.
16//!
17//! This module provides a generic file watcher that can be used to watch
18//! files for changes and trigger callbacks when changes occur.
19//!
20//! The watcher monitors the parent directory of each file rather than the
21//! file itself. This ensures that file deletions and recreations are properly
22//! tracked, which is common with editors that use atomic saves or when
23//! configuration files are replaced.
24
25use std::collections::HashSet;
26use std::path::{Path, PathBuf};
27use std::sync::mpsc::channel;
28
29use common_telemetry::{error, info, warn};
30use notify::{EventKind, RecursiveMode, Watcher};
31use snafu::ResultExt;
32
33use crate::error::{CanonicalizePathSnafu, FileWatchSnafu, InvalidPathSnafu, Result};
34
/// Options controlling which file-system events cause the watcher callback to run.
///
/// Modify and Create events are always considered relevant; Remove events are
/// considered relevant only when [`include_remove_events`](Self::include_remove_events)
/// is set.
#[derive(Clone, Debug, Default)]
pub struct FileWatcherConfig {
    /// When `true`, Remove events also trigger the callback, in addition to
    /// Modify and Create events. Defaults to `false`.
    pub include_remove_events: bool,
}
41
42impl FileWatcherConfig {
43    pub fn new() -> Self {
44        Self::default()
45    }
46
47    pub fn with_modify_and_create(mut self) -> Self {
48        self.include_remove_events = false;
49        self
50    }
51
52    pub fn with_remove_events(mut self) -> Self {
53        self.include_remove_events = true;
54        self
55    }
56}
57
58/// A builder for creating file watchers with flexible configuration.
59///
60/// The watcher monitors the parent directory of each file to handle file
61/// deletion and recreation properly. Events are filtered to only trigger
62/// callbacks for the specific files being watched.
63pub struct FileWatcherBuilder {
64    config: FileWatcherConfig,
65    /// Canonicalized paths of files to watch.
66    file_paths: Vec<PathBuf>,
67}
68
69impl FileWatcherBuilder {
70    /// Create a new builder with default configuration.
71    pub fn new() -> Self {
72        Self {
73            config: FileWatcherConfig::default(),
74            file_paths: Vec::new(),
75        }
76    }
77
78    /// Set the watcher configuration.
79    pub fn config(mut self, config: FileWatcherConfig) -> Self {
80        self.config = config;
81        self
82    }
83
84    /// Add a file path to watch.
85    ///
86    /// Returns an error if the path is a directory.
87    /// The path is canonicalized for reliable comparison with events.
88    pub fn watch_path<P: AsRef<Path>>(mut self, path: P) -> Result<Self> {
89        let path = path.as_ref();
90        snafu::ensure!(
91            path.is_file(),
92            InvalidPathSnafu {
93                path: path.display().to_string(),
94            }
95        );
96        // Canonicalize the path for reliable comparison with event paths
97        let canonical = path.canonicalize().context(CanonicalizePathSnafu {
98            path: path.display().to_string(),
99        })?;
100        self.file_paths.push(canonical);
101        Ok(self)
102    }
103
104    /// Add multiple file paths to watch.
105    ///
106    /// Returns an error if any path is a directory.
107    pub fn watch_paths<P: AsRef<Path>, I: IntoIterator<Item = P>>(
108        mut self,
109        paths: I,
110    ) -> Result<Self> {
111        for path in paths {
112            self = self.watch_path(path)?;
113        }
114        Ok(self)
115    }
116
117    /// Build and spawn the file watcher with the given callback.
118    ///
119    /// The callback is invoked when relevant file events are detected for
120    /// the watched files. The watcher monitors the parent directories to
121    /// handle file deletion and recreation properly.
122    ///
123    /// The spawned watcher thread runs for the lifetime of the process.
124    pub fn spawn<F>(self, callback: F) -> Result<()>
125    where
126        F: Fn() + Send + 'static,
127    {
128        let (tx, rx) = channel::<notify::Result<notify::Event>>();
129        let mut watcher =
130            notify::recommended_watcher(tx).context(FileWatchSnafu { path: "<none>" })?;
131
132        // Collect unique parent directories to watch
133        let mut watched_dirs: HashSet<PathBuf> = HashSet::new();
134        for file_path in &self.file_paths {
135            if let Some(parent) = file_path.parent()
136                && watched_dirs.insert(parent.to_path_buf())
137            {
138                watcher
139                    .watch(parent, RecursiveMode::NonRecursive)
140                    .context(FileWatchSnafu {
141                        path: parent.display().to_string(),
142                    })?;
143            }
144        }
145
146        let config = self.config;
147        let watched_files: HashSet<PathBuf> = self.file_paths.iter().cloned().collect();
148
149        info!(
150            "Spawning file watcher for paths: {:?} (watching parent directories)",
151            self.file_paths
152                .iter()
153                .map(|p| p.display().to_string())
154                .collect::<Vec<_>>()
155        );
156
157        std::thread::spawn(move || {
158            // Keep watcher alive in the thread
159            let _watcher = watcher;
160
161            while let Ok(res) = rx.recv() {
162                match res {
163                    Ok(event) => {
164                        if !is_relevant_event(&event.kind, &config) {
165                            continue;
166                        }
167
168                        // Check if any of the event paths match our watched files
169                        let is_watched_file = event.paths.iter().any(|event_path| {
170                            // Try to canonicalize the event path for comparison
171                            // If the file was deleted, canonicalize will fail, so we also
172                            // compare the raw path
173                            if let Ok(canonical) = event_path.canonicalize()
174                                && watched_files.contains(&canonical)
175                            {
176                                return true;
177                            }
178                            // For deleted files, compare using the raw path
179                            watched_files.contains(event_path)
180                        });
181
182                        if !is_watched_file {
183                            continue;
184                        }
185
186                        info!(?event.kind, ?event.paths, "Detected file change");
187                        callback();
188                    }
189                    Err(err) => {
190                        warn!("File watcher error: {}", err);
191                    }
192                }
193            }
194
195            error!("File watcher channel closed unexpectedly");
196        });
197
198        Ok(())
199    }
200}
201
202impl Default for FileWatcherBuilder {
203    fn default() -> Self {
204        Self::new()
205    }
206}
207
208/// Check if an event kind is relevant based on the configuration.
209fn is_relevant_event(kind: &EventKind, config: &FileWatcherConfig) -> bool {
210    match kind {
211        EventKind::Modify(_) | EventKind::Create(_) => true,
212        EventKind::Remove(_) => config.include_remove_events,
213        _ => false,
214    }
215}
216
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{Duration, Instant};

    use common_test_util::temp_dir::create_temp_dir;

    use super::*;

    /// Upper bound on how long to wait for an expected callback. Polling stops
    /// as soon as the callback fires, so this only costs time when a test is
    /// about to fail; it is generous to tolerate slow CI machines.
    const EVENT_TIMEOUT: Duration = Duration::from_secs(5);

    /// Time given to the watcher after `spawn` before the test touches files.
    const STARTUP_DELAY: Duration = Duration::from_millis(100);

    /// Polls `counter` until it reaches at least `min` or `timeout` elapses,
    /// returning the last observed value.
    fn wait_for_count(counter: &AtomicUsize, min: usize, timeout: Duration) -> usize {
        let deadline = Instant::now() + timeout;
        loop {
            let current = counter.load(Ordering::SeqCst);
            if current >= min || Instant::now() >= deadline {
                return current;
            }
            std::thread::sleep(Duration::from_millis(10));
        }
    }

    /// Spawns a watcher on `path` using `config` and returns the counter that
    /// its callback increments.
    fn spawn_counting_watcher(path: &Path, config: FileWatcherConfig) -> Arc<AtomicUsize> {
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = counter.clone();

        FileWatcherBuilder::new()
            .watch_path(path)
            .unwrap()
            .config(config)
            .spawn(move || {
                counter_clone.fetch_add(1, Ordering::SeqCst);
            })
            .unwrap();

        // Give watcher time to start
        std::thread::sleep(STARTUP_DELAY);
        counter
    }

    #[test]
    fn test_file_watcher_detects_changes() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("test_file_watcher");
        let file_path = dir.path().join("test_file.txt");

        // Create initial file
        std::fs::write(&file_path, "initial content").unwrap();

        let counter = spawn_counting_watcher(&file_path, FileWatcherConfig::new());

        // Modify the file
        std::fs::write(&file_path, "modified content").unwrap();

        assert!(
            wait_for_count(&counter, 1, EVENT_TIMEOUT) >= 1,
            "Watcher should have detected at least one change"
        );
    }

    #[test]
    fn test_file_watcher_detects_delete_and_recreate() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("test_file_watcher_recreate");
        let file_path = dir.path().join("test_file.txt");

        // Create initial file
        std::fs::write(&file_path, "initial content").unwrap();

        let counter = spawn_counting_watcher(&file_path, FileWatcherConfig::new());

        // Delete the file
        std::fs::remove_file(&file_path).unwrap();
        std::thread::sleep(Duration::from_millis(100));

        // Snapshot the count so the assertion below requires an event that was
        // delivered after the deletion.
        let before = counter.load(Ordering::SeqCst);

        // Recreate the file - this should still be detected because we watch the directory
        std::fs::write(&file_path, "recreated content").unwrap();

        assert!(
            wait_for_count(&counter, before + 1, EVENT_TIMEOUT) > before,
            "Watcher should have detected file recreation"
        );
    }

    #[test]
    fn test_file_watcher_reports_remove_when_enabled() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("test_file_watcher_remove");
        let file_path = dir.path().join("test_file.txt");

        std::fs::write(&file_path, "initial content").unwrap();

        let counter = spawn_counting_watcher(
            &file_path,
            FileWatcherConfig::new().with_remove_events(),
        );

        std::fs::remove_file(&file_path).unwrap();

        assert!(
            wait_for_count(&counter, 1, EVENT_TIMEOUT) >= 1,
            "Watcher should have reported the removal when remove events are enabled"
        );
    }

    #[test]
    fn test_file_watcher_ignores_other_files() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("test_file_watcher_other");
        let watched_file = dir.path().join("watched.txt");
        let other_file = dir.path().join("other.txt");

        // Create both files
        std::fs::write(&watched_file, "watched content").unwrap();
        std::fs::write(&other_file, "other content").unwrap();

        let counter = spawn_counting_watcher(&watched_file, FileWatcherConfig::new());

        // Modify the other file - should NOT trigger callback
        std::fs::write(&other_file, "modified other content").unwrap();

        // A negative assertion cannot poll for completion, so wait a fixed time
        // for any (wrongly) delivered event.
        std::thread::sleep(Duration::from_millis(500));

        assert_eq!(
            counter.load(Ordering::SeqCst),
            0,
            "Watcher should not have detected changes to other files"
        );

        // Now modify the watched file - SHOULD trigger callback
        std::fs::write(&watched_file, "modified watched content").unwrap();

        assert!(
            wait_for_count(&counter, 1, EVENT_TIMEOUT) >= 1,
            "Watcher should have detected change to watched file"
        );
    }

    #[test]
    fn test_watch_path_rejects_non_files() {
        let dir = create_temp_dir("test_file_watcher_invalid");

        assert!(
            FileWatcherBuilder::new().watch_path(dir.path()).is_err(),
            "Directories should be rejected"
        );
        assert!(
            FileWatcherBuilder::new()
                .watch_path(dir.path().join("missing.txt"))
                .is_err(),
            "Missing files should be rejected"
        );
    }
}