mito2/sst/
file_purger.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16use std::sync::Arc;
17
18use common_telemetry::{error, info};
19
20use crate::access_layer::AccessLayerRef;
21use crate::cache::file_cache::{FileType, IndexKey};
22use crate::cache::CacheManagerRef;
23use crate::schedule::scheduler::SchedulerRef;
24use crate::sst::file::FileMeta;
25
26/// Request to remove a file.
27#[derive(Debug)]
28pub struct PurgeRequest {
29    /// File meta.
30    pub file_meta: FileMeta,
31}
32
33/// A worker to delete files in background.
34pub trait FilePurger: Send + Sync + fmt::Debug {
35    /// Send a purge request to the background worker.
36    fn send_request(&self, request: PurgeRequest);
37}
38
39pub type FilePurgerRef = Arc<dyn FilePurger>;
40
41/// A no-op file purger can be used in combination with reading SST files outside of this region.
42#[derive(Debug)]
43pub struct NoopFilePurger;
44
45impl FilePurger for NoopFilePurger {
46    fn send_request(&self, _: PurgeRequest) {
47        // noop
48    }
49}
50
51/// Purger that purges file for current region.
52pub struct LocalFilePurger {
53    scheduler: SchedulerRef,
54    sst_layer: AccessLayerRef,
55    cache_manager: Option<CacheManagerRef>,
56}
57
58impl fmt::Debug for LocalFilePurger {
59    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60        f.debug_struct("LocalFilePurger")
61            .field("sst_layer", &self.sst_layer)
62            .finish()
63    }
64}
65
66impl LocalFilePurger {
67    /// Creates a new purger.
68    pub fn new(
69        scheduler: SchedulerRef,
70        sst_layer: AccessLayerRef,
71        cache_manager: Option<CacheManagerRef>,
72    ) -> Self {
73        Self {
74            scheduler,
75            sst_layer,
76            cache_manager,
77        }
78    }
79}
80
81impl FilePurger for LocalFilePurger {
82    fn send_request(&self, request: PurgeRequest) {
83        let file_meta = request.file_meta;
84        let sst_layer = self.sst_layer.clone();
85
86        // Remove meta of the file from cache.
87        if let Some(cache) = &self.cache_manager {
88            cache.remove_parquet_meta_data(file_meta.region_id, file_meta.file_id);
89        }
90
91        let cache_manager = self.cache_manager.clone();
92        if let Err(e) = self.scheduler.schedule(Box::pin(async move {
93            if let Err(e) = sst_layer.delete_sst(&file_meta).await {
94                error!(e; "Failed to delete SST file, file_id: {}, region: {}",
95                    file_meta.file_id, file_meta.region_id);
96            } else {
97                info!(
98                    "Successfully deleted SST file, file_id: {}, region: {}",
99                    file_meta.file_id, file_meta.region_id
100                );
101            }
102
103            if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache())
104            {
105                // Removes index file from the cache.
106                if file_meta.exists_index() {
107                    write_cache
108                        .remove(IndexKey::new(
109                            file_meta.region_id,
110                            file_meta.file_id,
111                            FileType::Puffin,
112                        ))
113                        .await;
114                }
115                // Remove the SST file from the cache.
116                write_cache
117                    .remove(IndexKey::new(
118                        file_meta.region_id,
119                        file_meta.file_id,
120                        FileType::Parquet,
121                    ))
122                    .await;
123            }
124
125            // Purges index content in the stager.
126            if let Err(e) = sst_layer
127                .puffin_manager_factory()
128                .purge_stager(file_meta.file_id)
129                .await
130            {
131                error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
132                    file_meta.file_id, file_meta.region_id);
133            }
134        })) {
135            error!(e; "Failed to schedule the file purge request");
136        }
137    }
138}
139
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;
    use object_store::ObjectStore;
    use smallvec::SmallVec;

    use super::*;
    use crate::access_layer::AccessLayer;
    use crate::schedule::scheduler::{LocalScheduler, Scheduler};
    use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange, IndexType};
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::sst::location;

    /// Shared body of the purge tests.
    ///
    /// Sets up a file-system object store in a temporary directory, writes a dummy SST file
    /// for `file_meta` (plus a dummy index file when `with_index_file` is `true`), marks the
    /// file as deleted through a [`FileHandle`] backed by a [`LocalFilePurger`], waits for
    /// the scheduler to stop and finally asserts that every written file is gone.
    async fn purge_and_verify(file_meta: FileMeta, with_index_file: bool) {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let dir_path = dir.path().display().to_string();
        let builder = Fs::default().root(&dir_path);
        let sst_file_id = file_meta.file_id;
        let sst_dir = "table1";

        let index_aux_path = dir.path().join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let object_store = ObjectStore::new(builder).unwrap().finish();

        let path = location::sst_file_path(sst_dir, sst_file_id);
        object_store.write(&path, vec![0; 4096]).await.unwrap();

        // The index file is only written for the test that expects it to be purged.
        let index_path = if with_index_file {
            let index_path = location::index_file_path(sst_dir, sst_file_id);
            object_store
                .write(&index_path, vec![0; 4096])
                .await
                .unwrap();
            Some(index_path)
        } else {
            None
        };

        let scheduler = Arc::new(LocalScheduler::new(3));
        let layer = Arc::new(AccessLayer::new(
            sst_dir,
            object_store.clone(),
            puffin_mgr,
            intm_mgr,
        ));

        let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));

        {
            let handle = FileHandle::new(file_meta, file_purger);
            // Mark the file as deleted and drop the handle; the files are expected to be
            // deleted afterwards.
            handle.mark_deleted();
        }

        scheduler.stop(true).await.unwrap();

        assert!(!object_store.exists(&path).await.unwrap());
        if let Some(index_path) = &index_path {
            assert!(!object_store.exists(index_path).await.unwrap());
        }
    }

    /// A deleted file without any index has its SST file removed.
    #[tokio::test]
    async fn test_file_purge() {
        let file_meta = FileMeta {
            region_id: 0.into(),
            file_id: FileId::random(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            available_indexes: Default::default(),
            index_file_size: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
        };

        purge_and_verify(file_meta, false).await;
    }

    /// A deleted file with an inverted index has both its SST file and index file removed.
    #[tokio::test]
    async fn test_file_purge_with_index() {
        let file_meta = FileMeta {
            region_id: 0.into(),
            file_id: FileId::random(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            index_file_size: 4096,
            num_rows: 1024,
            num_row_groups: 1,
            sequence: NonZeroU64::new(4096),
        };

        purge_and_verify(file_meta, true).await;
    }
}
277}