common_config/
file_watcher.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Common file watching utilities for configuration hot-reloading.
16//!
17//! This module provides a generic file watcher that can be used to watch
18//! files for changes and trigger callbacks when changes occur.
19//!
20//! The watcher monitors the parent directory of each file rather than the
21//! file itself. This ensures that file deletions and recreations are properly
22//! tracked, which is common with editors that use atomic saves or when
23//! configuration files are replaced.
24
25use std::collections::HashSet;
26use std::path::{Path, PathBuf};
27use std::sync::mpsc::channel;
28
29use common_telemetry::{error, info, warn};
30use notify::{EventKind, RecursiveMode, Watcher};
31use snafu::ResultExt;
32
33use crate::error::{FileWatchSnafu, InvalidPathSnafu, Result};
34
35/// Configuration for the file watcher behavior.
36#[derive(Debug, Clone, Default)]
37pub struct FileWatcherConfig {
38    /// Whether to include Remove events in addition to Modify and Create.
39    pub include_remove_events: bool,
40}
41
42impl FileWatcherConfig {
43    pub fn new() -> Self {
44        Default::default()
45    }
46
47    pub fn include_remove_events(mut self) -> Self {
48        self.include_remove_events = true;
49        self
50    }
51}
52
53/// A builder for creating file watchers with flexible configuration.
54///
55/// The watcher monitors the parent directory of each file to handle file
56/// deletion and recreation properly. Events are filtered to only trigger
57/// callbacks for the specific files being watched.
58pub struct FileWatcherBuilder {
59    config: FileWatcherConfig,
60    /// Canonicalized paths of files to watch.
61    file_paths: Vec<PathBuf>,
62}
63
64impl FileWatcherBuilder {
65    /// Create a new builder with default configuration.
66    pub fn new() -> Self {
67        Self {
68            config: FileWatcherConfig::default(),
69            file_paths: Vec::new(),
70        }
71    }
72
73    /// Set the watcher configuration.
74    pub fn config(mut self, config: FileWatcherConfig) -> Self {
75        self.config = config;
76        self
77    }
78
79    /// Add a file path to watch.
80    ///
81    /// Returns an error if the path is a directory.
82    /// The path is canonicalized for reliable comparison with events.
83    pub fn watch_path<P: AsRef<Path>>(mut self, path: P) -> Result<Self> {
84        let path = path.as_ref();
85        snafu::ensure!(
86            path.is_file(),
87            InvalidPathSnafu {
88                path: path.display().to_string(),
89            }
90        );
91
92        self.file_paths.push(path.to_path_buf());
93        Ok(self)
94    }
95
96    /// Add multiple file paths to watch.
97    ///
98    /// Returns an error if any path is a directory.
99    pub fn watch_paths<P: AsRef<Path>, I: IntoIterator<Item = P>>(
100        mut self,
101        paths: I,
102    ) -> Result<Self> {
103        for path in paths {
104            self = self.watch_path(path)?;
105        }
106        Ok(self)
107    }
108
109    /// Build and spawn the file watcher with the given callback.
110    ///
111    /// The callback is invoked when relevant file events are detected for
112    /// the watched files. The watcher monitors the parent directories to
113    /// handle file deletion and recreation properly.
114    ///
115    /// The spawned watcher thread runs for the lifetime of the process.
116    pub fn spawn<F>(self, callback: F) -> Result<()>
117    where
118        F: Fn() + Send + 'static,
119    {
120        let (tx, rx) = channel::<notify::Result<notify::Event>>();
121        let mut watcher =
122            notify::recommended_watcher(tx).context(FileWatchSnafu { path: "<none>" })?;
123
124        // Collect unique parent directories to watch
125        let mut watched_dirs: HashSet<PathBuf> = HashSet::new();
126        for file_path in &self.file_paths {
127            if let Some(parent) = file_path.parent()
128                && watched_dirs.insert(parent.to_path_buf())
129            {
130                watcher
131                    .watch(parent, RecursiveMode::NonRecursive)
132                    .context(FileWatchSnafu {
133                        path: parent.display().to_string(),
134                    })?;
135            }
136        }
137
138        let config = self.config;
139
140        info!(
141            "Spawning file watcher for paths: {:?} (watching parent directories)",
142            self.file_paths
143                .iter()
144                .map(|p| p.display().to_string())
145                .collect::<Vec<_>>()
146        );
147
148        std::thread::spawn(move || {
149            // Keep watcher alive in the thread
150            let _watcher = watcher;
151
152            while let Ok(res) = rx.recv() {
153                match res {
154                    Ok(event) => {
155                        if !is_relevant_event(&event.kind, &config) {
156                            continue;
157                        }
158
159                        info!(?event.kind, ?event.paths, "Detected folder change");
160                        callback();
161                    }
162                    Err(err) => {
163                        warn!("File watcher error: {}", err);
164                    }
165                }
166            }
167
168            error!("File watcher channel closed unexpectedly");
169        });
170
171        Ok(())
172    }
173}
174
175impl Default for FileWatcherBuilder {
176    fn default() -> Self {
177        Self::new()
178    }
179}
180
181/// Check if an event kind is relevant based on the configuration.
182fn is_relevant_event(kind: &EventKind, config: &FileWatcherConfig) -> bool {
183    match kind {
184        EventKind::Modify(_) | EventKind::Create(_) => true,
185        EventKind::Remove(_) => config.include_remove_events,
186        _ => false,
187    }
188}
189
190#[cfg(test)]
191mod tests {
192    use std::sync::Arc;
193    use std::sync::atomic::{AtomicUsize, Ordering};
194    use std::time::Duration;
195
196    use common_test_util::temp_dir::create_temp_dir;
197
198    use super::*;
199
200    #[test]
201    fn test_file_watcher_detects_changes() {
202        common_telemetry::init_default_ut_logging();
203
204        let dir = create_temp_dir("test_file_watcher");
205        let file_path = dir.path().join("test_file.txt");
206
207        // Create initial file
208        std::fs::write(&file_path, "initial content").unwrap();
209
210        let counter = Arc::new(AtomicUsize::new(0));
211        let counter_clone = counter.clone();
212
213        FileWatcherBuilder::new()
214            .watch_path(&file_path)
215            .unwrap()
216            .config(FileWatcherConfig::new())
217            .spawn(move || {
218                counter_clone.fetch_add(1, Ordering::SeqCst);
219            })
220            .unwrap();
221
222        // Give watcher time to start
223        std::thread::sleep(Duration::from_millis(100));
224
225        // Modify the file
226        std::fs::write(&file_path, "modified content").unwrap();
227
228        // Wait for the event to be processed
229        std::thread::sleep(Duration::from_millis(500));
230
231        assert!(
232            counter.load(Ordering::SeqCst) >= 1,
233            "Watcher should have detected at least one change"
234        );
235    }
236
237    #[test]
238    fn test_file_watcher_detects_delete_and_recreate() {
239        common_telemetry::init_default_ut_logging();
240
241        let dir = create_temp_dir("test_file_watcher_recreate");
242        let file_path = dir.path().join("test_file.txt");
243
244        // Create initial file
245        std::fs::write(&file_path, "initial content").unwrap();
246
247        let counter = Arc::new(AtomicUsize::new(0));
248        let counter_clone = counter.clone();
249
250        FileWatcherBuilder::new()
251            .watch_path(&file_path)
252            .unwrap()
253            .config(FileWatcherConfig::new())
254            .spawn(move || {
255                counter_clone.fetch_add(1, Ordering::SeqCst);
256            })
257            .unwrap();
258
259        // Give watcher time to start
260        std::thread::sleep(Duration::from_millis(100));
261
262        // Delete the file
263        std::fs::remove_file(&file_path).unwrap();
264        std::thread::sleep(Duration::from_millis(100));
265
266        // Recreate the file - this should still be detected because we watch the directory
267        std::fs::write(&file_path, "recreated content").unwrap();
268
269        // Wait for the event to be processed
270        std::thread::sleep(Duration::from_millis(500));
271
272        assert!(
273            counter.load(Ordering::SeqCst) >= 1,
274            "Watcher should have detected file recreation"
275        );
276    }
277}