1use std::fmt;
16use std::sync::Arc;
17
18use common_telemetry::{error, info};
19
20use crate::access_layer::AccessLayerRef;
21use crate::cache::file_cache::{FileType, IndexKey};
22use crate::cache::CacheManagerRef;
23use crate::schedule::scheduler::SchedulerRef;
24use crate::sst::file::FileMeta;
25
/// Request asking a [`FilePurger`] to purge a single file.
#[derive(Debug)]
pub struct PurgeRequest {
    /// Metadata of the file to purge.
    pub file_meta: FileMeta,
}
32
/// Interface of a component that accepts [`PurgeRequest`]s.
///
/// Implementations must be `Send + Sync` and implement [`fmt::Debug`].
pub trait FilePurger: Send + Sync + fmt::Debug {
    /// Hands a purge request over to the purger.
    ///
    /// The method returns nothing, so implementations have no way to report a
    /// failure to the caller through this call.
    fn send_request(&self, request: PurgeRequest);
}
38
/// Reference-counted pointer to a [`FilePurger`] trait object.
pub type FilePurgerRef = Arc<dyn FilePurger>;
40
/// A [`FilePurger`] that ignores every request it receives.
#[derive(Debug)]
pub struct NoopFilePurger;
44
45impl FilePurger for NoopFilePurger {
46 fn send_request(&self, _: PurgeRequest) {
47 }
49}
50
/// A [`FilePurger`] that deletes files through an access layer, submitting the
/// deletion work to a scheduler.
pub struct LocalFilePurger {
    /// Scheduler that purge jobs are submitted to.
    scheduler: SchedulerRef,
    /// Access layer used to delete SST files and to reach the puffin manager factory.
    sst_layer: AccessLayerRef,
    /// Optional cache manager; when present, cache entries of a purged file are removed.
    cache_manager: Option<CacheManagerRef>,
}
57
58impl fmt::Debug for LocalFilePurger {
59 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60 f.debug_struct("LocalFilePurger")
61 .field("sst_layer", &self.sst_layer)
62 .finish()
63 }
64}
65
66impl LocalFilePurger {
67 pub fn new(
69 scheduler: SchedulerRef,
70 sst_layer: AccessLayerRef,
71 cache_manager: Option<CacheManagerRef>,
72 ) -> Self {
73 Self {
74 scheduler,
75 sst_layer,
76 cache_manager,
77 }
78 }
79}
80
81impl FilePurger for LocalFilePurger {
82 fn send_request(&self, request: PurgeRequest) {
83 let file_meta = request.file_meta;
84 let sst_layer = self.sst_layer.clone();
85
86 if let Some(cache) = &self.cache_manager {
88 cache.remove_parquet_meta_data(file_meta.region_id, file_meta.file_id);
89 }
90
91 let cache_manager = self.cache_manager.clone();
92 if let Err(e) = self.scheduler.schedule(Box::pin(async move {
93 if let Err(e) = sst_layer.delete_sst(&file_meta).await {
94 error!(e; "Failed to delete SST file, file_id: {}, region: {}",
95 file_meta.file_id, file_meta.region_id);
96 } else {
97 info!(
98 "Successfully deleted SST file, file_id: {}, region: {}",
99 file_meta.file_id, file_meta.region_id
100 );
101 }
102
103 if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache())
104 {
105 if file_meta.exists_index() {
107 write_cache
108 .remove(IndexKey::new(
109 file_meta.region_id,
110 file_meta.file_id,
111 FileType::Puffin,
112 ))
113 .await;
114 }
115 write_cache
117 .remove(IndexKey::new(
118 file_meta.region_id,
119 file_meta.file_id,
120 FileType::Parquet,
121 ))
122 .await;
123 }
124
125 if let Err(e) = sst_layer
127 .puffin_manager_factory()
128 .purge_stager(file_meta.file_id)
129 .await
130 {
131 error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
132 file_meta.file_id, file_meta.region_id);
133 }
134 })) {
135 error!(e; "Failed to schedule the file purge request");
136 }
137 }
138}
139
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;
    use object_store::ObjectStore;
    use smallvec::SmallVec;

    use super::*;
    use crate::access_layer::AccessLayer;
    use crate::schedule::scheduler::{LocalScheduler, Scheduler};
    use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange, IndexType};
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::sst::location;

    /// Checks that the SST file of a handle marked as deleted no longer exists
    /// in the object store once the handle is dropped and the scheduler stopped.
    #[tokio::test]
    async fn test_file_purge() {
        common_telemetry::init_default_ut_logging();

        let temp_dir = create_temp_dir("file-purge");
        let root = temp_dir.path().display().to_string();
        let fs_builder = Fs::default().root(&root);
        let file_id = FileId::random();
        let region_dir = "table1";
        let sst_path = location::sst_file_path(region_dir, file_id);

        let aux_dir = temp_dir.path().join("index_aux");
        let puffin_factory = PuffinManagerFactory::new(&aux_dir, 4096, None, None)
            .await
            .unwrap();
        let intermediate_manager = IntermediateManager::init_fs(aux_dir.to_str().unwrap())
            .await
            .unwrap();

        // Create the SST file that is expected to be purged.
        let store = ObjectStore::new(fs_builder).unwrap().finish();
        store.write(&sst_path, vec![0; 4096]).await.unwrap();

        let scheduler = Arc::new(LocalScheduler::new(3));
        let access_layer = Arc::new(AccessLayer::new(
            region_dir,
            store.clone(),
            puffin_factory,
            intermediate_manager,
        ));

        let purger = Arc::new(LocalFilePurger::new(scheduler.clone(), access_layer, None));

        let meta = FileMeta {
            region_id: 0.into(),
            file_id,
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            available_indexes: Default::default(),
            index_file_size: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
        };
        let handle = FileHandle::new(meta, purger);
        // Mark the file as deleted, then drop the handle; the test expects this
        // to result in the file being purged.
        handle.mark_deleted();
        drop(handle);

        // Stop the scheduler before checking the store (presumably this waits
        // for already scheduled jobs to finish).
        scheduler.stop(true).await.unwrap();

        assert!(!store.exists(&sst_path).await.unwrap());
    }

    /// Same as `test_file_purge`, but for a file that also has an index file:
    /// both the SST file and the index file must be gone afterwards.
    #[tokio::test]
    async fn test_file_purge_with_index() {
        common_telemetry::init_default_ut_logging();

        let temp_dir = create_temp_dir("file-purge");
        let root = temp_dir.path().display().to_string();
        let fs_builder = Fs::default().root(&root);
        let file_id = FileId::random();
        let region_dir = "table1";

        let aux_dir = temp_dir.path().join("index_aux");
        let puffin_factory = PuffinManagerFactory::new(&aux_dir, 4096, None, None)
            .await
            .unwrap();
        let intermediate_manager = IntermediateManager::init_fs(aux_dir.to_str().unwrap())
            .await
            .unwrap();

        // Create the SST file.
        let sst_path = location::sst_file_path(region_dir, file_id);
        let store = ObjectStore::new(fs_builder).unwrap().finish();
        store.write(&sst_path, vec![0; 4096]).await.unwrap();

        // Create the index file that belongs to the SST file.
        let index_path = location::index_file_path(region_dir, file_id);
        store
            .write(&index_path, vec![0; 4096])
            .await
            .unwrap();

        let scheduler = Arc::new(LocalScheduler::new(3));
        let access_layer = Arc::new(AccessLayer::new(
            region_dir,
            store.clone(),
            puffin_factory,
            intermediate_manager,
        ));

        let purger = Arc::new(LocalFilePurger::new(scheduler.clone(), access_layer, None));

        let meta = FileMeta {
            region_id: 0.into(),
            file_id,
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            index_file_size: 4096,
            num_rows: 1024,
            num_row_groups: 1,
            sequence: NonZeroU64::new(4096),
        };
        let handle = FileHandle::new(meta, purger);
        // Mark the file as deleted, then drop the handle; the test expects this
        // to result in both files being purged.
        handle.mark_deleted();
        drop(handle);

        // Stop the scheduler before checking the store (presumably this waits
        // for already scheduled jobs to finish).
        scheduler.stop(true).await.unwrap();

        assert!(!store.exists(&sst_path).await.unwrap());
        assert!(!store.exists(&index_path).await.unwrap());
    }
}