mito2/sst/
file_purger.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16use std::sync::Arc;
17
18use common_telemetry::error;
19
20use crate::access_layer::AccessLayerRef;
21use crate::cache::CacheManagerRef;
22use crate::error::Result;
23use crate::schedule::scheduler::SchedulerRef;
24use crate::sst::file::{FileMeta, delete_files};
25use crate::sst::file_ref::FileReferenceManagerRef;
26
27/// A worker to delete files in background.
28pub trait FilePurger: Send + Sync + fmt::Debug {
29    /// Send a request to remove the file.
30    /// If `is_delete` is true, the file will be deleted from the storage.
31    /// Otherwise, only the reference will be removed.
32    fn remove_file(&self, file_meta: FileMeta, is_delete: bool);
33
34    /// Notify the purger of a new file created.
35    /// This is useful for object store based storage, where we need to track the file references
36    /// The default implementation is a no-op.
37    fn new_file(&self, _: &FileMeta) {
38        // noop
39    }
40}
41
42pub type FilePurgerRef = Arc<dyn FilePurger>;
43
44/// A no-op file purger can be used in combination with reading SST files outside of this region.
45#[derive(Debug)]
46pub struct NoopFilePurger;
47
48impl FilePurger for NoopFilePurger {
49    fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) {
50        // noop
51    }
52}
53
54/// Purger that purges file for current region.
55pub struct LocalFilePurger {
56    scheduler: SchedulerRef,
57    sst_layer: AccessLayerRef,
58    cache_manager: Option<CacheManagerRef>,
59}
60
61impl fmt::Debug for LocalFilePurger {
62    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63        f.debug_struct("LocalFilePurger")
64            .field("sst_layer", &self.sst_layer)
65            .finish()
66    }
67}
68
69pub fn is_local_fs(sst_layer: &AccessLayerRef) -> bool {
70    sst_layer.object_store().info().scheme() == object_store::Scheme::Fs
71}
72
73/// Creates a file purger based on the storage type of the access layer.
74/// Should be use in combination with Gc Worker.
75///
76/// If the storage is local file system, a `LocalFilePurger` is created, which deletes
77/// the files from both the storage and the cache.
78///
79/// If the storage is an object store, an `ObjectStoreFilePurger` is created, which
80/// only manages the file references without deleting the actual files.
81///
82pub fn create_file_purger(
83    gc_enabled: bool,
84    scheduler: SchedulerRef,
85    sst_layer: AccessLayerRef,
86    cache_manager: Option<CacheManagerRef>,
87    file_ref_manager: FileReferenceManagerRef,
88) -> FilePurgerRef {
89    if gc_enabled && !is_local_fs(&sst_layer) {
90        Arc::new(ObjectStoreFilePurger { file_ref_manager })
91    } else {
92        Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
93    }
94}
95
96/// Creates a local file purger that deletes files from both the storage and the cache.
97pub fn create_local_file_purger(
98    scheduler: SchedulerRef,
99    sst_layer: AccessLayerRef,
100    cache_manager: Option<CacheManagerRef>,
101    _file_ref_manager: FileReferenceManagerRef,
102) -> FilePurgerRef {
103    Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
104}
105
106impl LocalFilePurger {
107    /// Creates a new purger.
108    pub fn new(
109        scheduler: SchedulerRef,
110        sst_layer: AccessLayerRef,
111        cache_manager: Option<CacheManagerRef>,
112    ) -> Self {
113        Self {
114            scheduler,
115            sst_layer,
116            cache_manager,
117        }
118    }
119
120    /// Stop the scheduler of the file purger.
121    pub async fn stop_scheduler(&self) -> Result<()> {
122        self.scheduler.stop(true).await
123    }
124
125    /// Deletes the file(and it's index, if any) from cache and storage.
126    fn delete_file(&self, file_meta: FileMeta) {
127        let sst_layer = self.sst_layer.clone();
128        let cache_manager = self.cache_manager.clone();
129        if let Err(e) = self.scheduler.schedule(Box::pin(async move {
130            if let Err(e) = delete_files(
131                file_meta.region_id,
132                &[(file_meta.file_id, file_meta.index_file_id().file_id())],
133                file_meta.exists_index(),
134                &sst_layer,
135                &cache_manager,
136            )
137            .await
138            {
139                error!(e; "Failed to delete file {:?} from storage", file_meta);
140            }
141        })) {
142            error!(e; "Failed to schedule the file purge request");
143        }
144    }
145}
146
147impl FilePurger for LocalFilePurger {
148    fn remove_file(&self, file_meta: FileMeta, is_delete: bool) {
149        if is_delete {
150            self.delete_file(file_meta);
151        }
152    }
153}
154
155#[derive(Debug)]
156pub struct ObjectStoreFilePurger {
157    file_ref_manager: FileReferenceManagerRef,
158}
159
160impl FilePurger for ObjectStoreFilePurger {
161    fn remove_file(&self, file_meta: FileMeta, _is_delete: bool) {
162        // if not on local file system, instead inform the global file purger to remove the file reference.
163        // notice that no matter whether the file is deleted or not, we need to remove the reference
164        // because the file is no longer in use nonetheless.
165        self.file_ref_manager.remove_file(&file_meta);
166        // TODO(discord9): consider impl a .tombstone file to reduce files needed to list
167    }
168
169    fn new_file(&self, file_meta: &FileMeta) {
170        self.file_ref_manager.add_file(file_meta);
171    }
172}
173
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ObjectStore;
    use object_store::services::Fs;
    use smallvec::SmallVec;
    use store_api::region_request::PathType;
    use store_api::storage::{FileId, RegionId};

    use super::*;
    use crate::access_layer::AccessLayer;
    use crate::schedule::scheduler::{LocalScheduler, Scheduler};
    use crate::sst::file::{FileHandle, FileMeta, FileTimeRange, IndexType, RegionFileId};
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::sst::location;

    /// Table directory shared by the tests below.
    const SST_DIR: &str = "table1";

    /// Builds a file system object store rooted at `root` and an access layer for
    /// [`SST_DIR`] on top of it. Index auxiliary data goes under `root/index_aux`.
    async fn new_fs_access_layer(
        root: impl AsRef<std::path::Path>,
    ) -> (ObjectStore, Arc<AccessLayer>) {
        let root = root.as_ref();
        let root_str = root.display().to_string();
        let builder = Fs::default().root(&root_str);

        let index_aux_path = root.join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let object_store = ObjectStore::new(builder).unwrap().finish();
        let layer = Arc::new(AccessLayer::new(
            SST_DIR,
            PathType::Bare,
            object_store.clone(),
            puffin_mgr,
            intm_mgr,
        ));

        (object_store, layer)
    }

    /// Wraps `meta` in a file handle backed by a `LocalFilePurger`, marks the handle as
    /// deleted, drops it, and then stops the scheduler with `stop(true)`.
    async fn purge_through_handle(layer: Arc<AccessLayer>, meta: FileMeta) {
        let scheduler = Arc::new(LocalScheduler::new(3));
        let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));

        {
            let handle = FileHandle::new(meta, file_purger);
            // mark file as deleted and drop the handle, we expect the file is deleted.
            handle.mark_deleted();
        }

        scheduler.stop(true).await.unwrap();
    }

    #[tokio::test]
    async fn test_file_purge() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
        let (object_store, layer) = new_fs_access_layer(dir.path()).await;

        let path = location::sst_file_path(SST_DIR, sst_file_id, layer.path_type());
        object_store.write(&path, vec![0; 4096]).await.unwrap();

        let meta = FileMeta {
            region_id: sst_file_id.region_id(),
            file_id: sst_file_id.file_id(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            available_indexes: Default::default(),
            index_file_size: 0,
            index_file_id: None,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
            partition_expr: None,
            num_series: 0,
        };
        purge_through_handle(layer, meta).await;

        assert!(!object_store.exists(&path).await.unwrap());
    }

    #[tokio::test]
    async fn test_file_purge_with_index() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
        let (object_store, layer) = new_fs_access_layer(dir.path()).await;

        let path = location::sst_file_path(SST_DIR, sst_file_id, layer.path_type());
        object_store.write(&path, vec![0; 4096]).await.unwrap();

        let index_path = location::index_file_path(SST_DIR, sst_file_id, layer.path_type());
        object_store
            .write(&index_path, vec![0; 4096])
            .await
            .unwrap();

        // Both the sst file and the index file are expected to be deleted.
        let meta = FileMeta {
            region_id: sst_file_id.region_id(),
            file_id: sst_file_id.file_id(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            index_file_size: 4096,
            index_file_id: None,
            num_rows: 1024,
            num_row_groups: 1,
            sequence: NonZeroU64::new(4096),
            partition_expr: None,
            num_series: 0,
        };
        purge_through_handle(layer, meta).await;

        assert!(!object_store.exists(&path).await.unwrap());
        assert!(!object_store.exists(&index_path).await.unwrap());
    }
}