mito2/sst/
file_purger.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16use std::sync::Arc;
17
18use common_telemetry::error;
19
20use crate::access_layer::AccessLayerRef;
21use crate::cache::CacheManagerRef;
22use crate::error::Result;
23use crate::schedule::scheduler::SchedulerRef;
24use crate::sst::file::{FileMeta, delete_files};
25use crate::sst::file_ref::FileReferenceManagerRef;
26
27/// A worker to delete files in background.
28pub trait FilePurger: Send + Sync + fmt::Debug {
29    /// Send a request to remove the file.
30    /// If `is_delete` is true, the file will be deleted from the storage.
31    /// Otherwise, only the reference will be removed.
32    fn remove_file(&self, file_meta: FileMeta, is_delete: bool);
33
34    /// Notify the purger of a new file created.
35    /// This is useful for object store based storage, where we need to track the file references
36    /// The default implementation is a no-op.
37    fn new_file(&self, _: &FileMeta) {
38        // noop
39    }
40}
41
42pub type FilePurgerRef = Arc<dyn FilePurger>;
43
44/// A no-op file purger can be used in combination with reading SST files outside of this region.
45#[derive(Debug)]
46pub struct NoopFilePurger;
47
48impl FilePurger for NoopFilePurger {
49    fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) {
50        // noop
51    }
52}
53
54/// Purger that purges file for current region.
55pub struct LocalFilePurger {
56    scheduler: SchedulerRef,
57    sst_layer: AccessLayerRef,
58    cache_manager: Option<CacheManagerRef>,
59}
60
61impl fmt::Debug for LocalFilePurger {
62    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63        f.debug_struct("LocalFilePurger")
64            .field("sst_layer", &self.sst_layer)
65            .finish()
66    }
67}
68
69pub fn is_local_fs(sst_layer: &AccessLayerRef) -> bool {
70    sst_layer.object_store().info().scheme() == object_store::Scheme::Fs
71}
72
73/// Creates a file purger based on the storage type of the access layer.
74/// Should be use in combination with Gc Worker.
75///
76/// If the storage is local file system, a `LocalFilePurger` is created, which deletes
77/// the files from both the storage and the cache.
78///
79/// If the storage is an object store, an `ObjectStoreFilePurger` is created, which
80/// only manages the file references without deleting the actual files.
81///
82pub fn create_file_purger(
83    scheduler: SchedulerRef,
84    sst_layer: AccessLayerRef,
85    cache_manager: Option<CacheManagerRef>,
86    file_ref_manager: FileReferenceManagerRef,
87) -> FilePurgerRef {
88    if is_local_fs(&sst_layer) {
89        Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
90    } else {
91        Arc::new(ObjectStoreFilePurger { file_ref_manager })
92    }
93}
94
95/// Creates a local file purger that deletes files from both the storage and the cache.
96pub fn create_local_file_purger(
97    scheduler: SchedulerRef,
98    sst_layer: AccessLayerRef,
99    cache_manager: Option<CacheManagerRef>,
100    _file_ref_manager: FileReferenceManagerRef,
101) -> FilePurgerRef {
102    Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
103}
104
105impl LocalFilePurger {
106    /// Creates a new purger.
107    pub fn new(
108        scheduler: SchedulerRef,
109        sst_layer: AccessLayerRef,
110        cache_manager: Option<CacheManagerRef>,
111    ) -> Self {
112        Self {
113            scheduler,
114            sst_layer,
115            cache_manager,
116        }
117    }
118
119    /// Stop the scheduler of the file purger.
120    pub async fn stop_scheduler(&self) -> Result<()> {
121        self.scheduler.stop(true).await
122    }
123
124    /// Deletes the file(and it's index, if any) from cache and storage.
125    fn delete_file(&self, file_meta: FileMeta) {
126        let sst_layer = self.sst_layer.clone();
127        let cache_manager = self.cache_manager.clone();
128        if let Err(e) = self.scheduler.schedule(Box::pin(async move {
129            if let Err(e) = delete_files(
130                file_meta.region_id,
131                &[file_meta.file_id],
132                file_meta.exists_index(),
133                &sst_layer,
134                &cache_manager,
135            )
136            .await
137            {
138                error!(e; "Failed to delete file {:?} from storage", file_meta);
139            }
140        })) {
141            error!(e; "Failed to schedule the file purge request");
142        }
143    }
144}
145
146impl FilePurger for LocalFilePurger {
147    fn remove_file(&self, file_meta: FileMeta, is_delete: bool) {
148        if is_delete {
149            self.delete_file(file_meta);
150        }
151    }
152}
153
154#[derive(Debug)]
155pub struct ObjectStoreFilePurger {
156    file_ref_manager: FileReferenceManagerRef,
157}
158
159impl FilePurger for ObjectStoreFilePurger {
160    fn remove_file(&self, file_meta: FileMeta, _is_delete: bool) {
161        // if not on local file system, instead inform the global file purger to remove the file reference.
162        // notice that no matter whether the file is deleted or not, we need to remove the reference
163        // because the file is no longer in use nonetheless.
164        self.file_ref_manager.remove_file(&file_meta);
165    }
166
167    fn new_file(&self, file_meta: &FileMeta) {
168        self.file_ref_manager.add_file(file_meta);
169    }
170}
171
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;
    use std::path::Path;

    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ObjectStore;
    use object_store::services::Fs;
    use smallvec::SmallVec;
    use store_api::region_request::PathType;
    use store_api::storage::{FileId, RegionId};

    use super::*;
    use crate::access_layer::AccessLayer;
    use crate::schedule::scheduler::{LocalScheduler, Scheduler};
    use crate::sst::file::{FileHandle, FileMeta, FileTimeRange, IndexType, RegionFileId};
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::sst::location;

    /// Directory, relative to the object store root, that holds the test files.
    const SST_DIR: &str = "table1";

    /// Builds a file-system object store rooted at `root` and an [`AccessLayer`] over
    /// [`SST_DIR`]. Index auxiliary data is kept under `root/index_aux`.
    async fn new_fs_access_layer(root: &Path) -> (ObjectStore, Arc<AccessLayer>) {
        let root_path = root.display().to_string();
        let builder = Fs::default().root(&root_path);

        let index_aux_path = root.join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let object_store = ObjectStore::new(builder).unwrap().finish();
        let layer = Arc::new(AccessLayer::new(
            SST_DIR,
            PathType::Bare,
            object_store.clone(),
            puffin_mgr,
            intm_mgr,
        ));
        (object_store, layer)
    }

    #[tokio::test]
    async fn test_file_purge() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let (object_store, layer) = new_fs_access_layer(dir.path()).await;
        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());

        let path = location::sst_file_path(SST_DIR, sst_file_id, layer.path_type());
        object_store.write(&path, vec![0; 4096]).await.unwrap();

        let scheduler = Arc::new(LocalScheduler::new(3));
        let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));

        {
            let handle = FileHandle::new(
                FileMeta {
                    region_id: sst_file_id.region_id(),
                    file_id: sst_file_id.file_id(),
                    time_range: FileTimeRange::default(),
                    level: 0,
                    file_size: 4096,
                    available_indexes: Default::default(),
                    index_file_size: 0,
                    num_rows: 0,
                    num_row_groups: 0,
                    sequence: None,
                    partition_expr: None,
                },
                file_purger,
            );
            // Mark the file as deleted and drop the handle; we expect the file to be deleted.
            handle.mark_deleted();
        }

        // The assertion below relies on the scheduled deletion having completed here.
        scheduler.stop(true).await.unwrap();

        assert!(!object_store.exists(&path).await.unwrap());
    }

    #[tokio::test]
    async fn test_file_purge_with_index() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let (object_store, layer) = new_fs_access_layer(dir.path()).await;
        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());

        let path = location::sst_file_path(SST_DIR, sst_file_id, layer.path_type());
        object_store.write(&path, vec![0; 4096]).await.unwrap();

        let index_path = location::index_file_path(SST_DIR, sst_file_id, layer.path_type());
        object_store
            .write(&index_path, vec![0; 4096])
            .await
            .unwrap();

        let scheduler = Arc::new(LocalScheduler::new(3));
        let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));

        {
            let handle = FileHandle::new(
                FileMeta {
                    region_id: sst_file_id.region_id(),
                    file_id: sst_file_id.file_id(),
                    time_range: FileTimeRange::default(),
                    level: 0,
                    file_size: 4096,
                    available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
                    index_file_size: 4096,
                    num_rows: 1024,
                    num_row_groups: 1,
                    sequence: NonZeroU64::new(4096),
                    partition_expr: None,
                },
                file_purger,
            );
            // Mark the file as deleted and drop the handle; we expect both the SST file
            // and the index file to be deleted.
            handle.mark_deleted();
        }

        // The assertions below rely on the scheduled deletion having completed here.
        scheduler.stop(true).await.unwrap();

        assert!(!object_store.exists(&path).await.unwrap());
        assert!(!object_store.exists(&index_path).await.unwrap());
    }
}