mito2/sst/
file_purger.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16use std::sync::Arc;
17
18use common_telemetry::{error, info};
19
20use crate::access_layer::AccessLayerRef;
21use crate::cache::file_cache::{FileType, IndexKey};
22use crate::cache::CacheManagerRef;
23use crate::error::Result;
24use crate::schedule::scheduler::SchedulerRef;
25use crate::sst::file::FileMeta;
26use crate::sst::file_ref::FileReferenceManagerRef;
27
28/// A worker to delete files in background.
29pub trait FilePurger: Send + Sync + fmt::Debug {
30    /// Send a request to remove the file.
31    /// If `is_delete` is true, the file will be deleted from the storage.
32    /// Otherwise, only the reference will be removed.
33    fn remove_file(&self, file_meta: FileMeta, is_delete: bool);
34
35    /// Notify the purger of a new file created.
36    /// This is useful for object store based storage, where we need to track the file references
37    /// The default implementation is a no-op.
38    fn new_file(&self, _: &FileMeta) {
39        // noop
40    }
41}
42
43pub type FilePurgerRef = Arc<dyn FilePurger>;
44
/// A [`FilePurger`] that ignores every request.
///
/// It can be used when reading SST files that live outside of this region.
#[derive(Debug)]
pub struct NoopFilePurger;
48
49impl FilePurger for NoopFilePurger {
50    fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) {
51        // noop
52    }
53}
54
55/// Purger that purges file for current region.
56pub struct LocalFilePurger {
57    scheduler: SchedulerRef,
58    sst_layer: AccessLayerRef,
59    cache_manager: Option<CacheManagerRef>,
60}
61
62impl fmt::Debug for LocalFilePurger {
63    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64        f.debug_struct("LocalFilePurger")
65            .field("sst_layer", &self.sst_layer)
66            .finish()
67    }
68}
69
70pub fn is_local_fs(sst_layer: &AccessLayerRef) -> bool {
71    sst_layer.object_store().info().scheme() == object_store::Scheme::Fs
72}
73
74/// Creates a file purger based on the storage type of the access layer.
75/// Should be use in combination with Gc Worker.
76///
77/// If the storage is local file system, a `LocalFilePurger` is created, which deletes
78/// the files from both the storage and the cache.
79///
80/// If the storage is an object store, an `ObjectStoreFilePurger` is created, which
81/// only manages the file references without deleting the actual files.
82///
83pub fn create_file_purger(
84    scheduler: SchedulerRef,
85    sst_layer: AccessLayerRef,
86    cache_manager: Option<CacheManagerRef>,
87    file_ref_manager: FileReferenceManagerRef,
88) -> FilePurgerRef {
89    if is_local_fs(&sst_layer) {
90        Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
91    } else {
92        Arc::new(ObjectStoreFilePurger { file_ref_manager })
93    }
94}
95
96/// Creates a local file purger that deletes files from both the storage and the cache.
97pub fn create_local_file_purger(
98    scheduler: SchedulerRef,
99    sst_layer: AccessLayerRef,
100    cache_manager: Option<CacheManagerRef>,
101    _file_ref_manager: FileReferenceManagerRef,
102) -> FilePurgerRef {
103    Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
104}
105
106impl LocalFilePurger {
107    /// Creates a new purger.
108    pub fn new(
109        scheduler: SchedulerRef,
110        sst_layer: AccessLayerRef,
111        cache_manager: Option<CacheManagerRef>,
112    ) -> Self {
113        Self {
114            scheduler,
115            sst_layer,
116            cache_manager,
117        }
118    }
119
120    /// Stop the scheduler of the file purger.
121    pub async fn stop_scheduler(&self) -> Result<()> {
122        self.scheduler.stop(true).await
123    }
124
125    /// Deletes the file(and it's index, if any) from cache and storage.
126    fn delete_file(&self, file_meta: FileMeta) {
127        let sst_layer = self.sst_layer.clone();
128
129        // Remove meta of the file from cache.
130        if let Some(cache) = &self.cache_manager {
131            cache.remove_parquet_meta_data(file_meta.file_id());
132        }
133
134        let cache_manager = self.cache_manager.clone();
135        if let Err(e) = self.scheduler.schedule(Box::pin(async move {
136            if let Err(e) = sst_layer.delete_sst(&file_meta.file_id()).await {
137                error!(e; "Failed to delete SST file, file_id: {}, region: {}",
138                    file_meta.file_id, file_meta.region_id);
139            } else {
140                info!(
141                    "Successfully deleted SST file, file_id: {}, region: {}",
142                    file_meta.file_id, file_meta.region_id
143                );
144            }
145
146            if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache())
147            {
148                // Removes index file from the cache.
149                if file_meta.exists_index() {
150                    write_cache
151                        .remove(IndexKey::new(
152                            file_meta.region_id,
153                            file_meta.file_id,
154                            FileType::Puffin,
155                        ))
156                        .await;
157                }
158                // Remove the SST file from the cache.
159                write_cache
160                    .remove(IndexKey::new(
161                        file_meta.region_id,
162                        file_meta.file_id,
163                        FileType::Parquet,
164                    ))
165                    .await;
166            }
167
168            // Purges index content in the stager.
169            if let Err(e) = sst_layer
170                .puffin_manager_factory()
171                .purge_stager(file_meta.file_id())
172                .await
173            {
174                error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
175                    file_meta.file_id(), file_meta.region_id);
176            }
177            let file_id = file_meta.file_id();
178            if let Err(e) = sst_layer
179                .intermediate_manager()
180                .prune_sst_dir(&file_id.region_id(), &file_id.file_id())
181                .await
182            {
183                error!(e; "Failed to prune intermediate sst directory, region_id: {}, file_id: {}", file_id.region_id(), file_id.file_id());
184            }
185        })) {
186            error!(e; "Failed to schedule the file purge request");
187        }
188    }
189}
190
191impl FilePurger for LocalFilePurger {
192    fn remove_file(&self, file_meta: FileMeta, is_delete: bool) {
193        if is_delete {
194            self.delete_file(file_meta);
195        }
196    }
197}
198
199#[derive(Debug)]
200pub struct ObjectStoreFilePurger {
201    file_ref_manager: FileReferenceManagerRef,
202}
203
204impl FilePurger for ObjectStoreFilePurger {
205    fn remove_file(&self, file_meta: FileMeta, _is_delete: bool) {
206        // if not on local file system, instead inform the global file purger to remove the file reference.
207        // notice that no matter whether the file is deleted or not, we need to remove the reference
208        // because the file is no longer in use nonetheless.
209        self.file_ref_manager.remove_file(&file_meta);
210    }
211
212    fn new_file(&self, file_meta: &FileMeta) {
213        self.file_ref_manager.add_file(file_meta);
214    }
215}
216
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;
    use std::path::Path;

    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;
    use object_store::ObjectStore;
    use smallvec::SmallVec;
    use store_api::region_request::PathType;
    use store_api::storage::RegionId;

    use super::*;
    use crate::access_layer::AccessLayer;
    use crate::schedule::scheduler::{LocalScheduler, Scheduler};
    use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange, IndexType, RegionFileId};
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::sst::location;

    /// Builds a file system object store rooted at `root` together with an
    /// access layer for `sst_dir` whose index managers live under `root/index_aux`.
    async fn new_fs_layer(root: &Path, sst_dir: &str) -> (ObjectStore, AccessLayerRef) {
        let root_str = root.display().to_string();
        let builder = Fs::default().root(&root_str);

        let index_aux_path = root.join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let object_store = ObjectStore::new(builder).unwrap().finish();
        let layer = Arc::new(AccessLayer::new(
            sst_dir,
            PathType::Bare,
            object_store.clone(),
            puffin_mgr,
            intm_mgr,
        ));

        (object_store, layer)
    }

    /// Wraps `meta` in a file handle backed by a `LocalFilePurger`, marks the
    /// file as deleted, drops the handle and then stops the scheduler.
    async fn purge_and_wait(layer: AccessLayerRef, meta: FileMeta) {
        let scheduler = Arc::new(LocalScheduler::new(3));
        let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));

        let handle = FileHandle::new(meta, file_purger);
        // Mark the file as deleted and drop the handle; the purge is expected to follow.
        handle.mark_deleted();
        drop(handle);

        scheduler.stop(true).await.unwrap();
    }

    #[tokio::test]
    async fn test_file_purge() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let sst_dir = "table1";
        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
        let (object_store, layer) = new_fs_layer(dir.path(), sst_dir).await;

        let path = location::sst_file_path(sst_dir, sst_file_id, layer.path_type());
        object_store.write(&path, vec![0; 4096]).await.unwrap();

        let meta = FileMeta {
            region_id: sst_file_id.region_id(),
            file_id: sst_file_id.file_id(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            available_indexes: Default::default(),
            index_file_size: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
        };
        purge_and_wait(layer, meta).await;

        // The SST file must be gone once the purge job has run.
        assert!(!object_store.exists(&path).await.unwrap());
    }

    #[tokio::test]
    async fn test_file_purge_with_index() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let sst_dir = "table1";
        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
        let (object_store, layer) = new_fs_layer(dir.path(), sst_dir).await;

        let path = location::sst_file_path(sst_dir, sst_file_id, layer.path_type());
        object_store.write(&path, vec![0; 4096]).await.unwrap();

        let index_path = location::index_file_path(sst_dir, sst_file_id, layer.path_type());
        object_store
            .write(&index_path, vec![0; 4096])
            .await
            .unwrap();

        let meta = FileMeta {
            region_id: sst_file_id.region_id(),
            file_id: sst_file_id.file_id(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            index_file_size: 4096,
            num_rows: 1024,
            num_row_groups: 1,
            sequence: NonZeroU64::new(4096),
        };
        purge_and_wait(layer, meta).await;

        // Both the SST file and its index file must be gone.
        assert!(!object_store.exists(&path).await.unwrap());
        assert!(!object_store.exists(&index_path).await.unwrap());
    }
}