mito2/sst/file_purger.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16use std::sync::Arc;
17
18use common_telemetry::error;
19
20use crate::access_layer::AccessLayerRef;
21use crate::cache::CacheManagerRef;
22use crate::error::Result;
23use crate::schedule::scheduler::SchedulerRef;
24use crate::sst::file::{FileMeta, delete_files, delete_index};
25use crate::sst::file_ref::FileReferenceManagerRef;
26
27/// A worker to delete files in background.
28pub trait FilePurger: Send + Sync + fmt::Debug {
29    /// Send a request to remove the file.
30    /// If `is_delete` is true, the file will be deleted from the storage.
31    /// Otherwise, only the reference will be removed.
32    /// If `index_outdated` is true, the index file will be deleted regardless of `is_delete`.
33    fn remove_file(&self, file_meta: FileMeta, is_delete: bool, index_outdated: bool);
34
35    /// Notify the purger of a new file created.
36    /// This is useful for object store based storage, where we need to track the file references
37    /// The default implementation is a no-op.
38    fn new_file(&self, _: &FileMeta) {
39        // noop
40    }
41}
42
43pub type FilePurgerRef = Arc<dyn FilePurger>;
44
45/// A no-op file purger can be used in combination with reading SST files outside of this region.
46#[derive(Debug)]
47pub struct NoopFilePurger;
48
49impl FilePurger for NoopFilePurger {
50    fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {
51        // noop
52    }
53}
54
55/// Purger that purges file for current region.
56pub struct LocalFilePurger {
57    scheduler: SchedulerRef,
58    sst_layer: AccessLayerRef,
59    cache_manager: Option<CacheManagerRef>,
60}
61
62impl fmt::Debug for LocalFilePurger {
63    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64        f.debug_struct("LocalFilePurger")
65            .field("sst_layer", &self.sst_layer)
66            .finish()
67    }
68}
69
70pub fn is_local_fs(sst_layer: &AccessLayerRef) -> bool {
71    sst_layer.object_store().info().scheme() == object_store::Scheme::Fs
72}
73
74/// Creates a file purger based on the storage type of the access layer.
75/// Should be use in combination with Gc Worker.
76///
77/// If the storage is local file system, a `LocalFilePurger` is created, which deletes
78/// the files from both the storage and the cache.
79///
80/// If the storage is an object store, an `ObjectStoreFilePurger` is created, which
81/// only manages the file references without deleting the actual files.
82///
83pub fn create_file_purger(
84    gc_enabled: bool,
85    scheduler: SchedulerRef,
86    sst_layer: AccessLayerRef,
87    cache_manager: Option<CacheManagerRef>,
88    file_ref_manager: FileReferenceManagerRef,
89) -> FilePurgerRef {
90    if gc_enabled && !is_local_fs(&sst_layer) {
91        Arc::new(ObjectStoreFilePurger { file_ref_manager })
92    } else {
93        Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
94    }
95}
96
97/// Creates a local file purger that deletes files from both the storage and the cache.
98pub fn create_local_file_purger(
99    scheduler: SchedulerRef,
100    sst_layer: AccessLayerRef,
101    cache_manager: Option<CacheManagerRef>,
102    _file_ref_manager: FileReferenceManagerRef,
103) -> FilePurgerRef {
104    Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
105}
106
107impl LocalFilePurger {
108    /// Creates a new purger.
109    pub fn new(
110        scheduler: SchedulerRef,
111        sst_layer: AccessLayerRef,
112        cache_manager: Option<CacheManagerRef>,
113    ) -> Self {
114        Self {
115            scheduler,
116            sst_layer,
117            cache_manager,
118        }
119    }
120
121    /// Stop the scheduler of the file purger.
122    pub async fn stop_scheduler(&self) -> Result<()> {
123        self.scheduler.stop(true).await
124    }
125
126    /// Deletes the file(and it's index, if any) from cache and storage.
127    fn delete_file(&self, file_meta: FileMeta) {
128        let sst_layer = self.sst_layer.clone();
129        let cache_manager = self.cache_manager.clone();
130        if let Err(e) = self.scheduler.schedule(Box::pin(async move {
131            if let Err(e) = delete_files(
132                file_meta.region_id,
133                &[(file_meta.file_id, file_meta.index_id().version)],
134                file_meta.exists_index(),
135                &sst_layer,
136                &cache_manager,
137            )
138            .await
139            {
140                error!(e; "Failed to delete file {:?} from storage", file_meta);
141            }
142        })) {
143            error!(e; "Failed to schedule the file purge request");
144        }
145    }
146
147    fn delete_index(&self, file_meta: FileMeta) {
148        let sst_layer = self.sst_layer.clone();
149        let cache_manager = self.cache_manager.clone();
150        if let Err(e) = self.scheduler.schedule(Box::pin(async move {
151            let index_id = file_meta.index_id();
152            if let Err(e) = delete_index(index_id, &sst_layer, &cache_manager).await {
153                error!(e; "Failed to delete index for file {:?} from storage", file_meta);
154            }
155        })) {
156            error!(e; "Failed to schedule the index purge request");
157        }
158    }
159}
160
161impl FilePurger for LocalFilePurger {
162    fn remove_file(&self, file_meta: FileMeta, is_delete: bool, index_outdated: bool) {
163        if is_delete {
164            self.delete_file(file_meta);
165        } else if index_outdated {
166            self.delete_index(file_meta);
167        }
168    }
169}
170
171#[derive(Debug)]
172pub struct ObjectStoreFilePurger {
173    file_ref_manager: FileReferenceManagerRef,
174}
175
176impl FilePurger for ObjectStoreFilePurger {
177    fn remove_file(&self, file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {
178        // if not on local file system, instead inform the global file purger to remove the file reference.
179        // notice that no matter whether the file is deleted or not, we need to remove the reference
180        // because the file is no longer in use nonetheless.
181        self.file_ref_manager.remove_file(&file_meta);
182        // TODO(discord9): consider impl a .tombstone file to reduce files needed to list
183    }
184
185    fn new_file(&self, file_meta: &FileMeta) {
186        self.file_ref_manager.add_file(file_meta);
187    }
188}
189
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ObjectStore;
    use object_store::services::Fs;
    use smallvec::SmallVec;
    use store_api::region_request::PathType;
    use store_api::storage::{FileId, RegionId};

    use super::*;
    use crate::access_layer::AccessLayer;
    use crate::schedule::scheduler::{LocalScheduler, Scheduler};
    use crate::sst::file::{
        ColumnIndexMetadata, FileHandle, FileMeta, FileTimeRange, IndexType, RegionFileId,
        RegionIndexId,
    };
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::sst::location;

    /// SST directory (relative to the object store root) used by the tests.
    const SST_DIR: &str = "table1";

    /// Builds an `Fs` object store rooted at `root` and an access layer over
    /// `SST_DIR`. Puffin and intermediate state live under `root/index_aux`.
    async fn new_fs_layer(root: &std::path::Path) -> (ObjectStore, Arc<AccessLayer>) {
        let aux_dir = root.join("index_aux");
        let puffin_factory = PuffinManagerFactory::new(&aux_dir, 4096, None, None)
            .await
            .unwrap();
        let intermediate = IntermediateManager::init_fs(aux_dir.to_str().unwrap())
            .await
            .unwrap();

        let root_str = root.display().to_string();
        let store = ObjectStore::new(Fs::default().root(&root_str))
            .unwrap()
            .finish();

        let layer = Arc::new(AccessLayer::new(
            SST_DIR,
            PathType::Bare,
            store.clone(),
            puffin_factory,
            intermediate,
        ));
        (store, layer)
    }

    #[tokio::test]
    async fn test_file_purge() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let region_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
        let (store, layer) = new_fs_layer(dir.path()).await;

        let sst_path = location::sst_file_path(SST_DIR, region_file_id, layer.path_type());
        store.write(&sst_path, vec![0; 4096]).await.unwrap();

        let scheduler = Arc::new(LocalScheduler::new(3));
        let purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));

        let meta = FileMeta {
            region_id: region_file_id.region_id(),
            file_id: region_file_id.file_id(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            max_row_group_uncompressed_size: 4096,
            available_indexes: Default::default(),
            indexes: Default::default(),
            index_file_size: 0,
            index_version: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
            partition_expr: None,
            num_series: 0,
        };
        // Mark the file deleted, then drop the handle: the SST should be purged.
        let handle = FileHandle::new(meta, purger);
        handle.mark_deleted();
        drop(handle);

        scheduler.stop(true).await.unwrap();

        assert!(!store.exists(&sst_path).await.unwrap());
    }

    #[tokio::test]
    async fn test_file_purge_with_index() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let region_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
        let region_index_id = RegionIndexId::new(region_file_id, 0);
        let (store, layer) = new_fs_layer(dir.path()).await;

        let sst_path = location::sst_file_path(SST_DIR, region_file_id, layer.path_type());
        store.write(&sst_path, vec![0; 4096]).await.unwrap();

        let index_path = location::index_file_path(SST_DIR, region_index_id, layer.path_type());
        store.write(&index_path, vec![0; 4096]).await.unwrap();

        let scheduler = Arc::new(LocalScheduler::new(3));
        let purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));

        let meta = FileMeta {
            region_id: region_file_id.region_id(),
            file_id: region_file_id.file_id(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            max_row_group_uncompressed_size: 4096,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            indexes: vec![ColumnIndexMetadata {
                column_id: 0,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            }],
            index_file_size: 4096,
            index_version: 0,
            num_rows: 1024,
            num_row_groups: 1,
            sequence: NonZeroU64::new(4096),
            partition_expr: None,
            num_series: 0,
        };
        // Mark the file deleted, then drop the handle: both the SST and its
        // index file should be purged.
        let handle = FileHandle::new(meta, purger);
        handle.mark_deleted();
        drop(handle);

        scheduler.stop(true).await.unwrap();

        assert!(!store.exists(&sst_path).await.unwrap());
        assert!(!store.exists(&index_path).await.unwrap());
    }
}
350}