Skip to main content

mito2/sst/
file_purger.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16use std::sync::Arc;
17
18use common_telemetry::error;
19use store_api::region_request::PathType;
20
21use crate::access_layer::AccessLayerRef;
22use crate::cache::CacheManagerRef;
23use crate::error::Result;
24use crate::schedule::scheduler::SchedulerRef;
25use crate::sst::file::{FileMeta, delete_files, delete_index};
26use crate::sst::file_ref::FileReferenceManagerRef;
27
28/// A worker to delete files in background.
29pub trait FilePurger: Send + Sync + fmt::Debug {
30    /// Send a request to remove the file.
31    /// If `is_delete` is true, the file will be deleted from the storage.
32    /// Otherwise, only the reference will be removed.
33    /// If `index_outdated` is true, the index file will be deleted regardless of `is_delete`.
34    fn remove_file(&self, file_meta: FileMeta, is_delete: bool, index_outdated: bool);
35
36    /// Notify the purger of a new file created.
37    /// This is useful for object store based storage, where we need to track the file references
38    /// The default implementation is a no-op.
39    fn new_file(&self, _: &FileMeta) {
40        // noop
41    }
42}
43
44pub type FilePurgerRef = Arc<dyn FilePurger>;
45
/// File purger that never removes anything.
///
/// Intended for use when reading SST files outside of this region.
#[derive(Debug)]
pub struct NoopFilePurger;
49
50impl FilePurger for NoopFilePurger {
51    fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {
52        // noop
53    }
54}
55
56/// Purger that purges file for current region.
57pub struct LocalFilePurger {
58    scheduler: SchedulerRef,
59    sst_layer: AccessLayerRef,
60    cache_manager: Option<CacheManagerRef>,
61}
62
63impl fmt::Debug for LocalFilePurger {
64    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65        f.debug_struct("LocalFilePurger")
66            .field("sst_layer", &self.sst_layer)
67            .finish()
68    }
69}
70
/// Decides whether GC should be enabled for the file purger.
///
/// This variant is compiled when debug assertions are off. GC is enabled only when it is
/// turned on globally and the object store scheme differs from the file system scheme.
#[cfg(not(debug_assertions))]
pub fn should_enable_gc(global_gc_enabled: bool, object_store_scheme: &'static str) -> bool {
    if !global_gc_enabled {
        return false;
    }
    object_store_scheme != object_store::services::FS_SCHEME
}
76
/// Decides whether GC should be enabled for the file purger.
///
/// This variant is compiled when debug assertions are on. Debug builds may use Fs as the
/// object store scheme, so GC must also be allowed on the local file system: the scheme
/// is ignored and only the global switch is consulted.
#[cfg(debug_assertions)]
pub fn should_enable_gc(global_gc_enabled: bool, _object_store_scheme: &'static str) -> bool {
    // The scheme argument exists only to keep the signature identical to the other variant.
    global_gc_enabled
}
83
84/// Creates a file purger based on the storage type of the access layer.
85/// Should be use in combination with Gc Worker.
86///
87/// If the storage is local file system, a `LocalFilePurger` is created, which deletes
88/// the files from both the storage and the cache.
89///
90/// If the storage is an object store, an `ObjectStoreFilePurger` is created, which
91/// only manages the file references without deleting the actual files.
92///
93pub fn create_file_purger(
94    gc_enabled: bool,
95    path_type: PathType,
96    scheduler: SchedulerRef,
97    sst_layer: AccessLayerRef,
98    cache_manager: Option<CacheManagerRef>,
99    file_ref_manager: FileReferenceManagerRef,
100) -> FilePurgerRef {
101    // Only enable GC for:
102    // - object store based storage
103    // - data or bare path type (metadata region doesn't need to be GCed)
104    if should_enable_gc(gc_enabled, sst_layer.object_store().info().scheme())
105        && matches!(path_type, PathType::Data | PathType::Bare)
106    {
107        Arc::new(ObjectStoreFilePurger { file_ref_manager })
108    } else {
109        Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
110    }
111}
112
113/// Creates a local file purger that deletes files from both the storage and the cache.
114pub fn create_local_file_purger(
115    scheduler: SchedulerRef,
116    sst_layer: AccessLayerRef,
117    cache_manager: Option<CacheManagerRef>,
118    _file_ref_manager: FileReferenceManagerRef,
119) -> FilePurgerRef {
120    Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
121}
122
123impl LocalFilePurger {
124    /// Creates a new purger.
125    pub fn new(
126        scheduler: SchedulerRef,
127        sst_layer: AccessLayerRef,
128        cache_manager: Option<CacheManagerRef>,
129    ) -> Self {
130        Self {
131            scheduler,
132            sst_layer,
133            cache_manager,
134        }
135    }
136
137    /// Stop the scheduler of the file purger.
138    pub async fn stop_scheduler(&self) -> Result<()> {
139        self.scheduler.stop(true).await
140    }
141
142    /// Deletes the file(and it's index, if any) from cache and storage.
143    fn delete_file(&self, file_meta: FileMeta) {
144        let sst_layer = self.sst_layer.clone();
145        let cache_manager = self.cache_manager.clone();
146        if let Err(e) = self.scheduler.schedule(Box::pin(async move {
147            if let Err(e) = delete_files(
148                file_meta.region_id,
149                &[(file_meta.file_id, file_meta.index_id().version)],
150                file_meta.exists_index(),
151                &sst_layer,
152                &cache_manager,
153            )
154            .await
155            {
156                error!(e; "Failed to delete file {:?} from storage", file_meta);
157            }
158        })) {
159            error!(e; "Failed to schedule the file purge request");
160        }
161    }
162
163    fn delete_index(&self, file_meta: FileMeta) {
164        let sst_layer = self.sst_layer.clone();
165        let cache_manager = self.cache_manager.clone();
166        if let Err(e) = self.scheduler.schedule(Box::pin(async move {
167            let index_id = file_meta.index_id();
168            if let Err(e) = delete_index(index_id, &sst_layer, &cache_manager).await {
169                error!(e; "Failed to delete index for file {:?} from storage", file_meta);
170            }
171        })) {
172            error!(e; "Failed to schedule the index purge request");
173        }
174    }
175}
176
177impl FilePurger for LocalFilePurger {
178    fn remove_file(&self, file_meta: FileMeta, is_delete: bool, index_outdated: bool) {
179        if is_delete {
180            self.delete_file(file_meta);
181        } else if index_outdated {
182            self.delete_index(file_meta);
183        }
184    }
185}
186
187#[derive(Debug)]
188pub struct ObjectStoreFilePurger {
189    file_ref_manager: FileReferenceManagerRef,
190}
191
192impl FilePurger for ObjectStoreFilePurger {
193    fn remove_file(&self, file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {
194        // if not on local file system, instead inform the global file purger to remove the file reference.
195        // notice that no matter whether the file is deleted or not, we need to remove the reference
196        // because the file is no longer in use nonetheless.
197        // for same reason, we don't care about index_outdated here.
198        self.file_ref_manager.remove_file(&file_meta);
199    }
200
201    fn new_file(&self, file_meta: &FileMeta) {
202        self.file_ref_manager.add_file(file_meta);
203    }
204}
205
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ObjectStore;
    use object_store::services::Fs;
    use smallvec::SmallVec;
    use store_api::region_request::PathType;
    use store_api::storage::{FileId, RegionId};

    use super::*;
    use crate::access_layer::AccessLayer;
    use crate::schedule::scheduler::{LocalScheduler, Scheduler};
    use crate::sst::file::{
        ColumnIndexMetadata, FileHandle, FileMeta, FileTimeRange, IndexType, RegionFileId,
        RegionIndexId,
    };
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::sst::location;

    /// Table directory used by every test in this module.
    const SST_DIR: &str = "table1";

    /// Builds an Fs-backed object store rooted at `root` and an access layer on top of it.
    ///
    /// The index auxiliary data lives in the `index_aux` sub-directory of `root`.
    async fn new_fs_layer(root: &std::path::Path) -> (ObjectStore, Arc<AccessLayer>) {
        let root_str = root.display().to_string();
        let builder = Fs::default().root(&root_str);

        let index_aux_path = root.join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let object_store = ObjectStore::new(builder).unwrap().finish();
        let layer = Arc::new(AccessLayer::new(
            SST_DIR,
            PathType::Bare,
            object_store.clone(),
            puffin_mgr,
            intm_mgr,
        ));

        (object_store, layer)
    }

    /// A deleted file handle must remove the SST file once it is dropped.
    #[tokio::test]
    async fn test_file_purge() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
        let (object_store, layer) = new_fs_layer(dir.path()).await;

        let path = location::sst_file_path(SST_DIR, sst_file_id, layer.path_type());
        object_store.write(&path, vec![0; 4096]).await.unwrap();

        let scheduler = Arc::new(LocalScheduler::new(3));
        let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));

        let meta = FileMeta {
            region_id: sst_file_id.region_id(),
            file_id: sst_file_id.file_id(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            max_row_group_uncompressed_size: 4096,
            available_indexes: Default::default(),
            indexes: Default::default(),
            index_file_size: 0,
            index_version: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
            partition_expr: None,
            num_series: 0,
            ..Default::default()
        };

        // Flag the file as deleted, then drop the handle: the file is expected to be purged.
        let handle = FileHandle::new(meta, file_purger);
        handle.mark_deleted();
        drop(handle);

        scheduler.stop(true).await.unwrap();

        assert!(!object_store.exists(&path).await.unwrap());
    }

    /// A deleted file handle must remove both the SST file and its index file.
    #[tokio::test]
    async fn test_file_purge_with_index() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
        let index_file_id = RegionIndexId::new(sst_file_id, 0);
        let (object_store, layer) = new_fs_layer(dir.path()).await;

        let path = location::sst_file_path(SST_DIR, sst_file_id, layer.path_type());
        object_store.write(&path, vec![0; 4096]).await.unwrap();

        let index_path = location::index_file_path(SST_DIR, index_file_id, layer.path_type());
        object_store
            .write(&index_path, vec![0; 4096])
            .await
            .unwrap();

        let scheduler = Arc::new(LocalScheduler::new(3));
        let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));

        let meta = FileMeta {
            region_id: sst_file_id.region_id(),
            file_id: sst_file_id.file_id(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            max_row_group_uncompressed_size: 4096,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            indexes: vec![ColumnIndexMetadata {
                column_id: 0,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            }],
            index_file_size: 4096,
            index_version: 0,
            num_rows: 1024,
            num_row_groups: 1,
            sequence: NonZeroU64::new(4096),
            partition_expr: None,
            num_series: 0,
            ..Default::default()
        };

        // Flag the file as deleted, then drop the handle: both the SST file and the index
        // file are expected to be purged.
        let handle = FileHandle::new(meta, file_purger);
        handle.mark_deleted();
        drop(handle);

        scheduler.stop(true).await.unwrap();

        assert!(!object_store.exists(&path).await.unwrap());
        assert!(!object_store.exists(&index_path).await.unwrap());
    }
}