1use std::fmt;
16use std::sync::Arc;
17
18use common_telemetry::{error, info};
19
20use crate::access_layer::AccessLayerRef;
21use crate::cache::file_cache::{FileType, IndexKey};
22use crate::cache::CacheManagerRef;
23use crate::error::Result;
24use crate::schedule::scheduler::SchedulerRef;
25use crate::sst::file::FileMeta;
26use crate::sst::file_ref::FileReferenceManagerRef;
27
28pub trait FilePurger: Send + Sync + fmt::Debug {
30 fn remove_file(&self, file_meta: FileMeta, is_delete: bool);
34
35 fn new_file(&self, _: &FileMeta) {
39 }
41}
42
43pub type FilePurgerRef = Arc<dyn FilePurger>;
44
45#[derive(Debug)]
47pub struct NoopFilePurger;
48
49impl FilePurger for NoopFilePurger {
50 fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) {
51 }
53}
54
55pub struct LocalFilePurger {
57 scheduler: SchedulerRef,
58 sst_layer: AccessLayerRef,
59 cache_manager: Option<CacheManagerRef>,
60}
61
62impl fmt::Debug for LocalFilePurger {
63 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64 f.debug_struct("LocalFilePurger")
65 .field("sst_layer", &self.sst_layer)
66 .finish()
67 }
68}
69
70pub fn is_local_fs(sst_layer: &AccessLayerRef) -> bool {
71 sst_layer.object_store().info().scheme() == object_store::Scheme::Fs
72}
73
74pub fn create_file_purger(
84 scheduler: SchedulerRef,
85 sst_layer: AccessLayerRef,
86 cache_manager: Option<CacheManagerRef>,
87 file_ref_manager: FileReferenceManagerRef,
88) -> FilePurgerRef {
89 if is_local_fs(&sst_layer) {
90 Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
91 } else {
92 Arc::new(ObjectStoreFilePurger { file_ref_manager })
93 }
94}
95
96pub fn create_local_file_purger(
98 scheduler: SchedulerRef,
99 sst_layer: AccessLayerRef,
100 cache_manager: Option<CacheManagerRef>,
101 _file_ref_manager: FileReferenceManagerRef,
102) -> FilePurgerRef {
103 Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
104}
105
106impl LocalFilePurger {
107 pub fn new(
109 scheduler: SchedulerRef,
110 sst_layer: AccessLayerRef,
111 cache_manager: Option<CacheManagerRef>,
112 ) -> Self {
113 Self {
114 scheduler,
115 sst_layer,
116 cache_manager,
117 }
118 }
119
120 pub async fn stop_scheduler(&self) -> Result<()> {
122 self.scheduler.stop(true).await
123 }
124
125 fn delete_file(&self, file_meta: FileMeta) {
127 let sst_layer = self.sst_layer.clone();
128
129 if let Some(cache) = &self.cache_manager {
131 cache.remove_parquet_meta_data(file_meta.file_id());
132 }
133
134 let cache_manager = self.cache_manager.clone();
135 if let Err(e) = self.scheduler.schedule(Box::pin(async move {
136 if let Err(e) = sst_layer.delete_sst(&file_meta.file_id()).await {
137 error!(e; "Failed to delete SST file, file_id: {}, region: {}",
138 file_meta.file_id, file_meta.region_id);
139 } else {
140 info!(
141 "Successfully deleted SST file, file_id: {}, region: {}",
142 file_meta.file_id, file_meta.region_id
143 );
144 }
145
146 if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache())
147 {
148 if file_meta.exists_index() {
150 write_cache
151 .remove(IndexKey::new(
152 file_meta.region_id,
153 file_meta.file_id,
154 FileType::Puffin,
155 ))
156 .await;
157 }
158 write_cache
160 .remove(IndexKey::new(
161 file_meta.region_id,
162 file_meta.file_id,
163 FileType::Parquet,
164 ))
165 .await;
166 }
167
168 if let Err(e) = sst_layer
170 .puffin_manager_factory()
171 .purge_stager(file_meta.file_id())
172 .await
173 {
174 error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
175 file_meta.file_id(), file_meta.region_id);
176 }
177 let file_id = file_meta.file_id();
178 if let Err(e) = sst_layer
179 .intermediate_manager()
180 .prune_sst_dir(&file_id.region_id(), &file_id.file_id())
181 .await
182 {
183 error!(e; "Failed to prune intermediate sst directory, region_id: {}, file_id: {}", file_id.region_id(), file_id.file_id());
184 }
185 })) {
186 error!(e; "Failed to schedule the file purge request");
187 }
188 }
189}
190
191impl FilePurger for LocalFilePurger {
192 fn remove_file(&self, file_meta: FileMeta, is_delete: bool) {
193 if is_delete {
194 self.delete_file(file_meta);
195 }
196 }
197}
198
199#[derive(Debug)]
200pub struct ObjectStoreFilePurger {
201 file_ref_manager: FileReferenceManagerRef,
202}
203
204impl FilePurger for ObjectStoreFilePurger {
205 fn remove_file(&self, file_meta: FileMeta, _is_delete: bool) {
206 self.file_ref_manager.remove_file(&file_meta);
210 }
211
212 fn new_file(&self, file_meta: &FileMeta) {
213 self.file_ref_manager.add_file(file_meta);
214 }
215}
216
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;
    use std::path::Path;

    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;
    use object_store::ObjectStore;
    use smallvec::SmallVec;
    use store_api::region_request::PathType;
    use store_api::storage::RegionId;

    use super::*;
    use crate::access_layer::AccessLayer;
    use crate::schedule::scheduler::{LocalScheduler, Scheduler};
    use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange, IndexType, RegionFileId};
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::sst::location;

    /// Table directory handed to the access layer and to the path helpers.
    const SST_DIR: &str = "table1";

    /// Builds a filesystem object store rooted at `root` together with an
    /// access layer on top of it.
    ///
    /// The puffin and intermediate managers both use `<root>/index_aux`.
    async fn new_fs_layer(root: &Path) -> (ObjectStore, Arc<AccessLayer>) {
        let root_path = root.display().to_string();
        let builder = Fs::default().root(&root_path);

        let index_aux_path = root.join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let object_store = ObjectStore::new(builder).unwrap().finish();
        let layer = Arc::new(AccessLayer::new(
            SST_DIR,
            PathType::Bare,
            object_store.clone(),
            puffin_mgr,
            intm_mgr,
        ));

        (object_store, layer)
    }

    /// Wraps `file_meta` in a handle backed by a [`LocalFilePurger`], marks the
    /// handle deleted, drops it, and then stops the scheduler so the caller
    /// can inspect the object store afterwards.
    async fn purge_and_wait(layer: Arc<AccessLayer>, file_meta: FileMeta) {
        let scheduler = Arc::new(LocalScheduler::new(3));
        let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));

        {
            let handle = FileHandle::new(file_meta, file_purger);
            handle.mark_deleted();
            // The handle goes out of scope here, before the scheduler is stopped.
        }

        scheduler.stop(true).await.unwrap();
    }

    /// Purging a file without an index removes the SST from the object store.
    #[tokio::test]
    async fn test_file_purge() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
        let (object_store, layer) = new_fs_layer(dir.path()).await;

        let path = location::sst_file_path(SST_DIR, sst_file_id, layer.path_type());
        object_store.write(&path, vec![0; 4096]).await.unwrap();

        purge_and_wait(
            layer,
            FileMeta {
                region_id: sst_file_id.region_id(),
                file_id: sst_file_id.file_id(),
                time_range: FileTimeRange::default(),
                level: 0,
                file_size: 4096,
                available_indexes: Default::default(),
                index_file_size: 0,
                num_rows: 0,
                num_row_groups: 0,
                sequence: None,
            },
        )
        .await;

        assert!(!object_store.exists(&path).await.unwrap());
    }

    /// Purging a file that has an index removes both the SST and the index file.
    #[tokio::test]
    async fn test_file_purge_with_index() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
        let (object_store, layer) = new_fs_layer(dir.path()).await;

        let path = location::sst_file_path(SST_DIR, sst_file_id, layer.path_type());
        object_store.write(&path, vec![0; 4096]).await.unwrap();

        let index_path = location::index_file_path(SST_DIR, sst_file_id, layer.path_type());
        object_store
            .write(&index_path, vec![0; 4096])
            .await
            .unwrap();

        purge_and_wait(
            layer,
            FileMeta {
                region_id: sst_file_id.region_id(),
                file_id: sst_file_id.file_id(),
                time_range: FileTimeRange::default(),
                level: 0,
                file_size: 4096,
                available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
                index_file_size: 4096,
                num_rows: 1024,
                num_row_groups: 1,
                sequence: NonZeroU64::new(4096),
            },
        )
        .await;

        assert!(!object_store.exists(&path).await.unwrap());
        assert!(!object_store.exists(&index_path).await.unwrap());
    }
}