mito2/sst/
file_purger.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16use std::sync::Arc;
17
18use common_telemetry::{error, info};
19
20use crate::access_layer::AccessLayerRef;
21use crate::cache::file_cache::{FileType, IndexKey};
22use crate::cache::CacheManagerRef;
23use crate::schedule::scheduler::SchedulerRef;
24use crate::sst::file::FileMeta;
25
26/// Request to remove a file.
27#[derive(Debug)]
28pub struct PurgeRequest {
29    /// File meta.
30    pub file_meta: FileMeta,
31}
32
33/// A worker to delete files in background.
34pub trait FilePurger: Send + Sync + fmt::Debug {
35    /// Send a purge request to the background worker.
36    fn send_request(&self, request: PurgeRequest);
37}
38
39pub type FilePurgerRef = Arc<dyn FilePurger>;
40
41/// Purger that purges file for current region.
42pub struct LocalFilePurger {
43    scheduler: SchedulerRef,
44    sst_layer: AccessLayerRef,
45    cache_manager: Option<CacheManagerRef>,
46}
47
48impl fmt::Debug for LocalFilePurger {
49    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50        f.debug_struct("LocalFilePurger")
51            .field("sst_layer", &self.sst_layer)
52            .finish()
53    }
54}
55
56impl LocalFilePurger {
57    /// Creates a new purger.
58    pub fn new(
59        scheduler: SchedulerRef,
60        sst_layer: AccessLayerRef,
61        cache_manager: Option<CacheManagerRef>,
62    ) -> Self {
63        Self {
64            scheduler,
65            sst_layer,
66            cache_manager,
67        }
68    }
69}
70
71impl FilePurger for LocalFilePurger {
72    fn send_request(&self, request: PurgeRequest) {
73        let file_meta = request.file_meta;
74        let sst_layer = self.sst_layer.clone();
75
76        // Remove meta of the file from cache.
77        if let Some(cache) = &self.cache_manager {
78            cache.remove_parquet_meta_data(file_meta.region_id, file_meta.file_id);
79        }
80
81        let cache_manager = self.cache_manager.clone();
82        if let Err(e) = self.scheduler.schedule(Box::pin(async move {
83            if let Err(e) = sst_layer.delete_sst(&file_meta).await {
84                error!(e; "Failed to delete SST file, file_id: {}, region: {}",
85                    file_meta.file_id, file_meta.region_id);
86            } else {
87                info!(
88                    "Successfully deleted SST file, file_id: {}, region: {}",
89                    file_meta.file_id, file_meta.region_id
90                );
91            }
92
93            if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache())
94            {
95                // Removes index file from the cache.
96                if file_meta.exists_index() {
97                    write_cache
98                        .remove(IndexKey::new(
99                            file_meta.region_id,
100                            file_meta.file_id,
101                            FileType::Puffin,
102                        ))
103                        .await;
104                }
105                // Remove the SST file from the cache.
106                write_cache
107                    .remove(IndexKey::new(
108                        file_meta.region_id,
109                        file_meta.file_id,
110                        FileType::Parquet,
111                    ))
112                    .await;
113            }
114
115            // Purges index content in the stager.
116            if let Err(e) = sst_layer
117                .puffin_manager_factory()
118                .purge_stager(file_meta.file_id)
119                .await
120            {
121                error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
122                    file_meta.file_id, file_meta.region_id);
123            }
124        })) {
125            error!(e; "Failed to schedule the file purge request");
126        }
127    }
128}
129
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;
    use std::path::Path;

    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;
    use object_store::ObjectStore;
    use smallvec::SmallVec;

    use super::*;
    use crate::access_layer::AccessLayer;
    use crate::schedule::scheduler::{LocalScheduler, Scheduler};
    use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange, IndexType};
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::sst::location;

    /// Directory, relative to the object store root, that holds the test files.
    const SST_DIR: &str = "table1";

    /// Builds the fixtures shared by the purge tests.
    ///
    /// Creates a filesystem object store rooted at `root`, puffin/intermediate managers
    /// under `root/index_aux`, and a [`LocalFilePurger`] without a cache manager that runs
    /// on a 3-worker [`LocalScheduler`]. The scheduler is returned so a test can wait for
    /// the purge task by stopping it.
    async fn new_test_purger(
        root: &Path,
    ) -> (ObjectStore, Arc<LocalScheduler>, Arc<LocalFilePurger>) {
        let index_aux_path = root.join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let root_path = root.display().to_string();
        let builder = Fs::default().root(&root_path);
        let object_store = ObjectStore::new(builder).unwrap().finish();

        let scheduler = Arc::new(LocalScheduler::new(3));
        let layer = Arc::new(AccessLayer::new(
            SST_DIR,
            object_store.clone(),
            puffin_mgr,
            intm_mgr,
        ));
        let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));

        (object_store, scheduler, file_purger)
    }

    #[tokio::test]
    async fn test_file_purge() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let (object_store, scheduler, file_purger) = new_test_purger(dir.path()).await;

        let sst_file_id = FileId::random();
        let path = location::sst_file_path(SST_DIR, sst_file_id);
        object_store.write(&path, vec![0; 4096]).await.unwrap();

        {
            let handle = FileHandle::new(
                FileMeta {
                    region_id: 0.into(),
                    file_id: sst_file_id,
                    time_range: FileTimeRange::default(),
                    level: 0,
                    file_size: 4096,
                    available_indexes: Default::default(),
                    index_file_size: 0,
                    num_rows: 0,
                    num_row_groups: 0,
                    sequence: None,
                },
                file_purger,
            );
            // mark file as deleted and drop the handle, we expect the file is deleted.
            handle.mark_deleted();
        }

        scheduler.stop(true).await.unwrap();

        assert!(!object_store.exists(&path).await.unwrap());
    }

    #[tokio::test]
    async fn test_file_purge_with_index() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let (object_store, scheduler, file_purger) = new_test_purger(dir.path()).await;

        let sst_file_id = FileId::random();
        let path = location::sst_file_path(SST_DIR, sst_file_id);
        object_store.write(&path, vec![0; 4096]).await.unwrap();

        let index_path = location::index_file_path(SST_DIR, sst_file_id);
        object_store
            .write(&index_path, vec![0; 4096])
            .await
            .unwrap();

        {
            let handle = FileHandle::new(
                FileMeta {
                    region_id: 0.into(),
                    file_id: sst_file_id,
                    time_range: FileTimeRange::default(),
                    level: 0,
                    file_size: 4096,
                    available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
                    index_file_size: 4096,
                    num_rows: 1024,
                    num_row_groups: 1,
                    sequence: NonZeroU64::new(4096),
                },
                file_purger,
            );
            // mark file as deleted and drop the handle, we expect the sst file and the index file are deleted.
            handle.mark_deleted();
        }

        scheduler.stop(true).await.unwrap();

        assert!(!object_store.exists(&path).await.unwrap());
        assert!(!object_store.exists(&index_path).await.unwrap());
    }
}