mito2/sst/
file_purger.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::fmt;
16use std::sync::Arc;
17
18use common_telemetry::error;
19use store_api::region_request::PathType;
20
21use crate::access_layer::AccessLayerRef;
22use crate::cache::CacheManagerRef;
23use crate::error::Result;
24use crate::schedule::scheduler::SchedulerRef;
25use crate::sst::file::{FileMeta, delete_files, delete_index};
26use crate::sst::file_ref::FileReferenceManagerRef;
27
28/// A worker to delete files in background.
29pub trait FilePurger: Send + Sync + fmt::Debug {
30    /// Send a request to remove the file.
31    /// If `is_delete` is true, the file will be deleted from the storage.
32    /// Otherwise, only the reference will be removed.
33    /// If `index_outdated` is true, the index file will be deleted regardless of `is_delete`.
34    fn remove_file(&self, file_meta: FileMeta, is_delete: bool, index_outdated: bool);
35
36    /// Notify the purger of a new file created.
37    /// This is useful for object store based storage, where we need to track the file references
38    /// The default implementation is a no-op.
39    fn new_file(&self, _: &FileMeta) {
40        // noop
41    }
42}
43
44pub type FilePurgerRef = Arc<dyn FilePurger>;
45
46/// A no-op file purger can be used in combination with reading SST files outside of this region.
47#[derive(Debug)]
48pub struct NoopFilePurger;
49
50impl FilePurger for NoopFilePurger {
51    fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {
52        // noop
53    }
54}
55
56/// Purger that purges file for current region.
57pub struct LocalFilePurger {
58    scheduler: SchedulerRef,
59    sst_layer: AccessLayerRef,
60    cache_manager: Option<CacheManagerRef>,
61}
62
63impl fmt::Debug for LocalFilePurger {
64    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65        f.debug_struct("LocalFilePurger")
66            .field("sst_layer", &self.sst_layer)
67            .finish()
68    }
69}
70
71#[cfg(not(debug_assertions))]
72/// Whether to enable GC for the file purger.
73pub fn should_enable_gc(
74    global_gc_enabled: bool,
75    object_store_scheme: object_store::Scheme,
76) -> bool {
77    global_gc_enabled && object_store_scheme != object_store::Scheme::Fs
78}
79
80#[cfg(debug_assertions)]
81/// For debug build, we may use Fs as the object store scheme,
82/// so we need to enable GC for local file system.
83pub fn should_enable_gc(
84    global_gc_enabled: bool,
85    _object_store_scheme: object_store::Scheme,
86) -> bool {
87    global_gc_enabled
88}
89
90/// Creates a file purger based on the storage type of the access layer.
91/// Should be use in combination with Gc Worker.
92///
93/// If the storage is local file system, a `LocalFilePurger` is created, which deletes
94/// the files from both the storage and the cache.
95///
96/// If the storage is an object store, an `ObjectStoreFilePurger` is created, which
97/// only manages the file references without deleting the actual files.
98///
99pub fn create_file_purger(
100    gc_enabled: bool,
101    path_type: PathType,
102    scheduler: SchedulerRef,
103    sst_layer: AccessLayerRef,
104    cache_manager: Option<CacheManagerRef>,
105    file_ref_manager: FileReferenceManagerRef,
106) -> FilePurgerRef {
107    // Only enable GC for:
108    // - object store based storage
109    // - data or bare path type (metadata region doesn't need to be GCed)
110    if should_enable_gc(gc_enabled, sst_layer.object_store().info().scheme())
111        && matches!(path_type, PathType::Data | PathType::Bare)
112    {
113        Arc::new(ObjectStoreFilePurger { file_ref_manager })
114    } else {
115        Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
116    }
117}
118
119/// Creates a local file purger that deletes files from both the storage and the cache.
120pub fn create_local_file_purger(
121    scheduler: SchedulerRef,
122    sst_layer: AccessLayerRef,
123    cache_manager: Option<CacheManagerRef>,
124    _file_ref_manager: FileReferenceManagerRef,
125) -> FilePurgerRef {
126    Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
127}
128
129impl LocalFilePurger {
130    /// Creates a new purger.
131    pub fn new(
132        scheduler: SchedulerRef,
133        sst_layer: AccessLayerRef,
134        cache_manager: Option<CacheManagerRef>,
135    ) -> Self {
136        Self {
137            scheduler,
138            sst_layer,
139            cache_manager,
140        }
141    }
142
143    /// Stop the scheduler of the file purger.
144    pub async fn stop_scheduler(&self) -> Result<()> {
145        self.scheduler.stop(true).await
146    }
147
148    /// Deletes the file(and it's index, if any) from cache and storage.
149    fn delete_file(&self, file_meta: FileMeta) {
150        let sst_layer = self.sst_layer.clone();
151        let cache_manager = self.cache_manager.clone();
152        if let Err(e) = self.scheduler.schedule(Box::pin(async move {
153            if let Err(e) = delete_files(
154                file_meta.region_id,
155                &[(file_meta.file_id, file_meta.index_id().version)],
156                file_meta.exists_index(),
157                &sst_layer,
158                &cache_manager,
159            )
160            .await
161            {
162                error!(e; "Failed to delete file {:?} from storage", file_meta);
163            }
164        })) {
165            error!(e; "Failed to schedule the file purge request");
166        }
167    }
168
169    fn delete_index(&self, file_meta: FileMeta) {
170        let sst_layer = self.sst_layer.clone();
171        let cache_manager = self.cache_manager.clone();
172        if let Err(e) = self.scheduler.schedule(Box::pin(async move {
173            let index_id = file_meta.index_id();
174            if let Err(e) = delete_index(index_id, &sst_layer, &cache_manager).await {
175                error!(e; "Failed to delete index for file {:?} from storage", file_meta);
176            }
177        })) {
178            error!(e; "Failed to schedule the index purge request");
179        }
180    }
181}
182
183impl FilePurger for LocalFilePurger {
184    fn remove_file(&self, file_meta: FileMeta, is_delete: bool, index_outdated: bool) {
185        if is_delete {
186            self.delete_file(file_meta);
187        } else if index_outdated {
188            self.delete_index(file_meta);
189        }
190    }
191}
192
193#[derive(Debug)]
194pub struct ObjectStoreFilePurger {
195    file_ref_manager: FileReferenceManagerRef,
196}
197
198impl FilePurger for ObjectStoreFilePurger {
199    fn remove_file(&self, file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {
200        // if not on local file system, instead inform the global file purger to remove the file reference.
201        // notice that no matter whether the file is deleted or not, we need to remove the reference
202        // because the file is no longer in use nonetheless.
203        // for same reason, we don't care about index_outdated here.
204        self.file_ref_manager.remove_file(&file_meta);
205    }
206
207    fn new_file(&self, file_meta: &FileMeta) {
208        self.file_ref_manager.add_file(file_meta);
209    }
210}
211
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;
    use std::path::Path;

    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ObjectStore;
    use object_store::services::Fs;
    use smallvec::SmallVec;
    use store_api::region_request::PathType;
    use store_api::storage::{FileId, RegionId};

    use super::*;
    use crate::access_layer::AccessLayer;
    use crate::schedule::scheduler::{LocalScheduler, Scheduler};
    use crate::sst::file::{
        ColumnIndexMetadata, FileHandle, FileMeta, FileTimeRange, IndexType, RegionFileId,
        RegionIndexId,
    };
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::sst::location;

    /// Directory (relative to the object store root) holding the test SST files.
    const SST_DIR: &str = "table1";

    /// Builds an `Fs`-backed object store rooted at `root` and a bare-path access layer over
    /// [`SST_DIR`]. The layer uses `root/index_aux` as index scratch space.
    async fn new_fs_layer(root: &Path) -> (ObjectStore, Arc<AccessLayer>) {
        let index_aux_path = root.join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let root_str = root.display().to_string();
        let object_store = ObjectStore::new(Fs::default().root(&root_str))
            .unwrap()
            .finish();

        let layer = Arc::new(AccessLayer::new(
            SST_DIR,
            PathType::Bare,
            object_store.clone(),
            puffin_mgr,
            intm_mgr,
        ));
        (object_store, layer)
    }

    /// Metadata of a 4096-byte SST without any index. Test cases override fields from here.
    fn sst_meta(file_id: RegionFileId) -> FileMeta {
        FileMeta {
            region_id: file_id.region_id(),
            file_id: file_id.file_id(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            max_row_group_uncompressed_size: 4096,
            available_indexes: Default::default(),
            indexes: Default::default(),
            index_file_size: 0,
            index_version: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
            partition_expr: None,
            num_series: 0,
        }
    }

    /// Wraps `meta` in a handle owned by a [`LocalFilePurger`], marks it deleted, drops the
    /// handle and then waits for the scheduler to stop.
    async fn delete_and_wait(layer: Arc<AccessLayer>, meta: FileMeta) {
        let scheduler = Arc::new(LocalScheduler::new(3));
        let purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));

        {
            let handle = FileHandle::new(meta, purger);
            // Mark the file as deleted; leaving this scope drops the handle, after which the
            // file is expected to be purged.
            handle.mark_deleted();
        }

        scheduler.stop(true).await.unwrap();
    }

    #[tokio::test]
    async fn test_file_purge() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let (object_store, layer) = new_fs_layer(dir.path()).await;

        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
        let sst_path = location::sst_file_path(SST_DIR, sst_file_id, layer.path_type());
        object_store.write(&sst_path, vec![0; 4096]).await.unwrap();

        // Dropping the deleted handle must remove the SST file from storage.
        delete_and_wait(layer, sst_meta(sst_file_id)).await;

        assert!(!object_store.exists(&sst_path).await.unwrap());
    }

    #[tokio::test]
    async fn test_file_purge_with_index() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let (object_store, layer) = new_fs_layer(dir.path()).await;

        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
        let index_file_id = RegionIndexId::new(sst_file_id, 0);

        let sst_path = location::sst_file_path(SST_DIR, sst_file_id, layer.path_type());
        object_store.write(&sst_path, vec![0; 4096]).await.unwrap();
        let index_path = location::index_file_path(SST_DIR, index_file_id, layer.path_type());
        object_store
            .write(&index_path, vec![0; 4096])
            .await
            .unwrap();

        let meta = FileMeta {
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            indexes: vec![ColumnIndexMetadata {
                column_id: 0,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            }],
            index_file_size: 4096,
            num_rows: 1024,
            num_row_groups: 1,
            sequence: NonZeroU64::new(4096),
            ..sst_meta(sst_file_id)
        };

        // Both the SST file and its index file are expected to be deleted.
        delete_and_wait(layer, meta).await;

        assert!(!object_store.exists(&sst_path).await.unwrap());
        assert!(!object_store.exists(&index_path).await.unwrap());
    }
}