mito2/sst/file_purger.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16use std::sync::Arc;
17
18use common_telemetry::{error, info};
19
20use crate::access_layer::AccessLayerRef;
21use crate::cache::file_cache::{FileType, IndexKey};
22use crate::cache::CacheManagerRef;
23use crate::schedule::scheduler::SchedulerRef;
24use crate::sst::file::FileMeta;
25
26/// Request to remove a file.
27#[derive(Debug)]
28pub struct PurgeRequest {
29    /// File meta.
30    pub file_meta: FileMeta,
31}
32
33/// A worker to delete files in background.
34pub trait FilePurger: Send + Sync + fmt::Debug {
35    /// Send a purge request to the background worker.
36    fn send_request(&self, request: PurgeRequest);
37}
38
39pub type FilePurgerRef = Arc<dyn FilePurger>;
40
41/// A no-op file purger can be used in combination with reading SST files outside of this region.
42#[derive(Debug)]
43pub struct NoopFilePurger;
44
45impl FilePurger for NoopFilePurger {
46    fn send_request(&self, _: PurgeRequest) {
47        // noop
48    }
49}
50
51/// Purger that purges file for current region.
52pub struct LocalFilePurger {
53    scheduler: SchedulerRef,
54    sst_layer: AccessLayerRef,
55    cache_manager: Option<CacheManagerRef>,
56}
57
58impl fmt::Debug for LocalFilePurger {
59    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60        f.debug_struct("LocalFilePurger")
61            .field("sst_layer", &self.sst_layer)
62            .finish()
63    }
64}
65
66impl LocalFilePurger {
67    /// Creates a new purger.
68    pub fn new(
69        scheduler: SchedulerRef,
70        sst_layer: AccessLayerRef,
71        cache_manager: Option<CacheManagerRef>,
72    ) -> Self {
73        Self {
74            scheduler,
75            sst_layer,
76            cache_manager,
77        }
78    }
79}
80
81impl FilePurger for LocalFilePurger {
82    fn send_request(&self, request: PurgeRequest) {
83        let file_meta = request.file_meta;
84        let sst_layer = self.sst_layer.clone();
85
86        // Remove meta of the file from cache.
87        if let Some(cache) = &self.cache_manager {
88            cache.remove_parquet_meta_data(file_meta.file_id());
89        }
90
91        let cache_manager = self.cache_manager.clone();
92        if let Err(e) = self.scheduler.schedule(Box::pin(async move {
93            if let Err(e) = sst_layer.delete_sst(&file_meta).await {
94                error!(e; "Failed to delete SST file, file_id: {}, region: {}",
95                    file_meta.file_id, file_meta.region_id);
96            } else {
97                info!(
98                    "Successfully deleted SST file, file_id: {}, region: {}",
99                    file_meta.file_id, file_meta.region_id
100                );
101            }
102
103            if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache())
104            {
105                // Removes index file from the cache.
106                if file_meta.exists_index() {
107                    write_cache
108                        .remove(IndexKey::new(
109                            file_meta.region_id,
110                            file_meta.file_id,
111                            FileType::Puffin,
112                        ))
113                        .await;
114                }
115                // Remove the SST file from the cache.
116                write_cache
117                    .remove(IndexKey::new(
118                        file_meta.region_id,
119                        file_meta.file_id,
120                        FileType::Parquet,
121                    ))
122                    .await;
123            }
124
125            // Purges index content in the stager.
126            if let Err(e) = sst_layer
127                .puffin_manager_factory()
128                .purge_stager(file_meta.file_id())
129                .await
130            {
131                error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
132                    file_meta.file_id(), file_meta.region_id);
133            }
134        })) {
135            error!(e; "Failed to schedule the file purge request");
136        }
137    }
138}
139
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;
    use std::path::Path;

    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;
    use object_store::ObjectStore;
    use smallvec::SmallVec;
    use store_api::region_request::PathType;
    use store_api::storage::RegionId;

    use super::*;
    use crate::access_layer::AccessLayer;
    use crate::schedule::scheduler::{LocalScheduler, Scheduler};
    use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange, IndexType, RegionFileId};
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::sst::location;

    /// Directory, relative to the object store root, that holds the test files.
    const SST_DIR: &str = "table1";

    /// Creates a filesystem object store rooted at `root` and an access layer over
    /// [`SST_DIR`], with index auxiliary data kept under `root/index_aux`.
    async fn new_test_layer(root: &Path) -> (ObjectStore, Arc<AccessLayer>) {
        let index_aux_path = root.join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let object_store = ObjectStore::new(Fs::default().root(&root.display().to_string()))
            .unwrap()
            .finish();
        let layer = Arc::new(AccessLayer::new(
            SST_DIR,
            PathType::Bare,
            object_store.clone(),
            puffin_mgr,
            intm_mgr,
        ));
        (object_store, layer)
    }

    /// Wraps `meta` in a handle owned by `purger`, marks it deleted and drops the handle.
    /// The purger is then expected to delete the file.
    fn mark_deleted_and_drop(meta: FileMeta, purger: FilePurgerRef) {
        let handle = FileHandle::new(meta, purger);
        handle.mark_deleted();
    }

    #[tokio::test]
    async fn test_file_purge() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
        let (object_store, layer) = new_test_layer(dir.path()).await;

        let path = location::sst_file_path(SST_DIR, sst_file_id, layer.path_type());
        object_store.write(&path, vec![0; 4096]).await.unwrap();

        let scheduler = Arc::new(LocalScheduler::new(3));
        let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));

        let meta = FileMeta {
            region_id: sst_file_id.region_id(),
            file_id: sst_file_id.file_id(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            available_indexes: Default::default(),
            index_file_size: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
        };
        mark_deleted_and_drop(meta, file_purger);

        scheduler.stop(true).await.unwrap();

        assert!(!object_store.exists(&path).await.unwrap());
    }

    #[tokio::test]
    async fn test_file_purge_with_index() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
        let (object_store, layer) = new_test_layer(dir.path()).await;

        let path = location::sst_file_path(SST_DIR, sst_file_id, layer.path_type());
        object_store.write(&path, vec![0; 4096]).await.unwrap();

        let index_path = location::index_file_path(SST_DIR, sst_file_id, layer.path_type());
        object_store
            .write(&index_path, vec![0; 4096])
            .await
            .unwrap();

        let scheduler = Arc::new(LocalScheduler::new(3));
        let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));

        let meta = FileMeta {
            region_id: sst_file_id.region_id(),
            file_id: sst_file_id.file_id(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            index_file_size: 4096,
            num_rows: 1024,
            num_row_groups: 1,
            sequence: NonZeroU64::new(4096),
        };
        // Both the SST file and its index file are expected to be removed.
        mark_deleted_and_drop(meta, file_purger);

        scheduler.stop(true).await.unwrap();

        assert!(!object_store.exists(&path).await.unwrap());
        assert!(!object_store.exists(&index_path).await.unwrap());
    }
}