1use std::fmt;
16use std::sync::Arc;
17
18use common_telemetry::{error, info};
19
20use crate::access_layer::AccessLayerRef;
21use crate::cache::file_cache::{FileType, IndexKey};
22use crate::cache::CacheManagerRef;
23use crate::schedule::scheduler::SchedulerRef;
24use crate::sst::file::FileMeta;
25
26#[derive(Debug)]
28pub struct PurgeRequest {
29 pub file_meta: FileMeta,
31}
32
33pub trait FilePurger: Send + Sync + fmt::Debug {
35 fn send_request(&self, request: PurgeRequest);
37}
38
39pub type FilePurgerRef = Arc<dyn FilePurger>;
40
41pub struct LocalFilePurger {
43 scheduler: SchedulerRef,
44 sst_layer: AccessLayerRef,
45 cache_manager: Option<CacheManagerRef>,
46}
47
48impl fmt::Debug for LocalFilePurger {
49 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50 f.debug_struct("LocalFilePurger")
51 .field("sst_layer", &self.sst_layer)
52 .finish()
53 }
54}
55
56impl LocalFilePurger {
57 pub fn new(
59 scheduler: SchedulerRef,
60 sst_layer: AccessLayerRef,
61 cache_manager: Option<CacheManagerRef>,
62 ) -> Self {
63 Self {
64 scheduler,
65 sst_layer,
66 cache_manager,
67 }
68 }
69}
70
71impl FilePurger for LocalFilePurger {
72 fn send_request(&self, request: PurgeRequest) {
73 let file_meta = request.file_meta;
74 let sst_layer = self.sst_layer.clone();
75
76 if let Some(cache) = &self.cache_manager {
78 cache.remove_parquet_meta_data(file_meta.region_id, file_meta.file_id);
79 }
80
81 let cache_manager = self.cache_manager.clone();
82 if let Err(e) = self.scheduler.schedule(Box::pin(async move {
83 if let Err(e) = sst_layer.delete_sst(&file_meta).await {
84 error!(e; "Failed to delete SST file, file_id: {}, region: {}",
85 file_meta.file_id, file_meta.region_id);
86 } else {
87 info!(
88 "Successfully deleted SST file, file_id: {}, region: {}",
89 file_meta.file_id, file_meta.region_id
90 );
91 }
92
93 if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache())
94 {
95 if file_meta.exists_index() {
97 write_cache
98 .remove(IndexKey::new(
99 file_meta.region_id,
100 file_meta.file_id,
101 FileType::Puffin,
102 ))
103 .await;
104 }
105 write_cache
107 .remove(IndexKey::new(
108 file_meta.region_id,
109 file_meta.file_id,
110 FileType::Parquet,
111 ))
112 .await;
113 }
114
115 if let Err(e) = sst_layer
117 .puffin_manager_factory()
118 .purge_stager(file_meta.file_id)
119 .await
120 {
121 error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
122 file_meta.file_id, file_meta.region_id);
123 }
124 })) {
125 error!(e; "Failed to schedule the file purge request");
126 }
127 }
128}
129
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;
    use std::path::Path;

    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;
    use object_store::ObjectStore;
    use smallvec::SmallVec;

    use super::*;
    use crate::access_layer::AccessLayer;
    use crate::schedule::scheduler::{LocalScheduler, Scheduler};
    use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange, IndexType};
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::sst::location;

    /// SST directory, relative to the object store root, used by every test.
    const SST_DIR: &str = "table1";

    /// Sets up the fixture shared by the purge tests.
    ///
    /// Returns an `Fs` object store rooted at `root`, a `LocalScheduler` with 3
    /// workers, and a `LocalFilePurger` without a cache manager. The purger's
    /// access layer targets [`SST_DIR`] and keeps puffin/intermediate data under
    /// `root/index_aux`. The caller must keep the directory behind `root` alive
    /// for the duration of the test.
    async fn new_purger_env(
        root: &Path,
    ) -> (ObjectStore, Arc<LocalScheduler>, Arc<LocalFilePurger>) {
        let builder = Fs::default().root(&root.display().to_string());
        let object_store = ObjectStore::new(builder).unwrap().finish();

        let index_aux_path = root.join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let scheduler = Arc::new(LocalScheduler::new(3));
        let layer = Arc::new(AccessLayer::new(
            SST_DIR,
            object_store.clone(),
            puffin_mgr,
            intm_mgr,
        ));
        let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));

        (object_store, scheduler, file_purger)
    }

    #[tokio::test]
    async fn test_file_purge() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let (object_store, scheduler, file_purger) = new_purger_env(dir.path()).await;

        let sst_file_id = FileId::random();
        let path = location::sst_file_path(SST_DIR, sst_file_id);
        object_store.write(&path, vec![0; 4096]).await.unwrap();

        {
            let handle = FileHandle::new(
                FileMeta {
                    region_id: 0.into(),
                    file_id: sst_file_id,
                    time_range: FileTimeRange::default(),
                    level: 0,
                    file_size: 4096,
                    available_indexes: Default::default(),
                    index_file_size: 0,
                    num_rows: 0,
                    num_row_groups: 0,
                    sequence: None,
                },
                file_purger,
            );
            // Presumably dropping a handle marked as deleted (at the end of this
            // scope) is what sends the purge request; the assertion below relies on it.
            handle.mark_deleted();
        }

        // Stop the scheduler (with `true`, presumably waiting for queued tasks) so
        // the purge task has run before checking the object store.
        scheduler.stop(true).await.unwrap();

        assert!(!object_store.exists(&path).await.unwrap());
    }

    #[tokio::test]
    async fn test_file_purge_with_index() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let (object_store, scheduler, file_purger) = new_purger_env(dir.path()).await;

        let sst_file_id = FileId::random();
        let path = location::sst_file_path(SST_DIR, sst_file_id);
        object_store.write(&path, vec![0; 4096]).await.unwrap();

        let index_path = location::index_file_path(SST_DIR, sst_file_id);
        object_store
            .write(&index_path, vec![0; 4096])
            .await
            .unwrap();

        {
            let handle = FileHandle::new(
                FileMeta {
                    region_id: 0.into(),
                    file_id: sst_file_id,
                    time_range: FileTimeRange::default(),
                    level: 0,
                    file_size: 4096,
                    available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
                    index_file_size: 4096,
                    num_rows: 1024,
                    num_row_groups: 1,
                    sequence: NonZeroU64::new(4096),
                },
                file_purger,
            );
            // Presumably dropping a handle marked as deleted (at the end of this
            // scope) is what sends the purge request; the assertions below rely on it.
            handle.mark_deleted();
        }

        // Stop the scheduler (with `true`, presumably waiting for queued tasks) so
        // the purge task has run before checking the object store.
        scheduler.stop(true).await.unwrap();

        // Both the SST and its index file must be gone.
        assert!(!object_store.exists(&path).await.unwrap());
        assert!(!object_store.exists(&index_path).await.unwrap());
    }
}