1use std::fmt;
16use std::sync::Arc;
17
18use common_telemetry::error;
19use store_api::region_request::PathType;
20
21use crate::access_layer::AccessLayerRef;
22use crate::cache::CacheManagerRef;
23use crate::error::Result;
24use crate::schedule::scheduler::SchedulerRef;
25use crate::sst::file::{FileMeta, delete_files, delete_index};
26use crate::sst::file_ref::FileReferenceManagerRef;
27
28pub trait FilePurger: Send + Sync + fmt::Debug {
30 fn remove_file(&self, file_meta: FileMeta, is_delete: bool, index_outdated: bool);
35
36 fn new_file(&self, _: &FileMeta) {
40 }
42}
43
44pub type FilePurgerRef = Arc<dyn FilePurger>;
45
46#[derive(Debug)]
48pub struct NoopFilePurger;
49
50impl FilePurger for NoopFilePurger {
51 fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {
52 }
54}
55
56pub struct LocalFilePurger {
58 scheduler: SchedulerRef,
59 sst_layer: AccessLayerRef,
60 cache_manager: Option<CacheManagerRef>,
61}
62
63impl fmt::Debug for LocalFilePurger {
64 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65 f.debug_struct("LocalFilePurger")
66 .field("sst_layer", &self.sst_layer)
67 .finish()
68 }
69}
70
/// Decides whether GC should be enabled (variant compiled without debug assertions).
///
/// Returns `true` only when `global_gc_enabled` is set and `object_store_scheme`
/// differs from `object_store::services::FS_SCHEME`.
///
/// The scheme is only compared, never stored, so any `&str` is accepted.
#[cfg(not(debug_assertions))]
pub fn should_enable_gc(global_gc_enabled: bool, object_store_scheme: &str) -> bool {
    global_gc_enabled && object_store_scheme != object_store::services::FS_SCHEME
}

/// Decides whether GC should be enabled (variant compiled with debug assertions).
///
/// The object store scheme is ignored here and the result is simply
/// `global_gc_enabled` — presumably so that debug/test builds can exercise GC on
/// any backend; confirm the intent with the GC owners.
///
/// The parameter keeps the same type as the other variant so both builds accept
/// the same callers.
#[cfg(debug_assertions)]
pub fn should_enable_gc(global_gc_enabled: bool, _object_store_scheme: &str) -> bool {
    global_gc_enabled
}
83
84pub fn create_file_purger(
94 gc_enabled: bool,
95 path_type: PathType,
96 scheduler: SchedulerRef,
97 sst_layer: AccessLayerRef,
98 cache_manager: Option<CacheManagerRef>,
99 file_ref_manager: FileReferenceManagerRef,
100) -> FilePurgerRef {
101 if should_enable_gc(gc_enabled, sst_layer.object_store().info().scheme())
105 && matches!(path_type, PathType::Data | PathType::Bare)
106 {
107 Arc::new(ObjectStoreFilePurger { file_ref_manager })
108 } else {
109 Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
110 }
111}
112
113pub fn create_local_file_purger(
115 scheduler: SchedulerRef,
116 sst_layer: AccessLayerRef,
117 cache_manager: Option<CacheManagerRef>,
118 _file_ref_manager: FileReferenceManagerRef,
119) -> FilePurgerRef {
120 Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
121}
122
123impl LocalFilePurger {
124 pub fn new(
126 scheduler: SchedulerRef,
127 sst_layer: AccessLayerRef,
128 cache_manager: Option<CacheManagerRef>,
129 ) -> Self {
130 Self {
131 scheduler,
132 sst_layer,
133 cache_manager,
134 }
135 }
136
137 pub async fn stop_scheduler(&self) -> Result<()> {
139 self.scheduler.stop(true).await
140 }
141
142 fn delete_file(&self, file_meta: FileMeta) {
144 let sst_layer = self.sst_layer.clone();
145 let cache_manager = self.cache_manager.clone();
146 if let Err(e) = self.scheduler.schedule(Box::pin(async move {
147 if let Err(e) = delete_files(
148 file_meta.region_id,
149 &[(file_meta.file_id, file_meta.index_id().version)],
150 file_meta.exists_index(),
151 &sst_layer,
152 &cache_manager,
153 )
154 .await
155 {
156 error!(e; "Failed to delete file {:?} from storage", file_meta);
157 }
158 })) {
159 error!(e; "Failed to schedule the file purge request");
160 }
161 }
162
163 fn delete_index(&self, file_meta: FileMeta) {
164 let sst_layer = self.sst_layer.clone();
165 let cache_manager = self.cache_manager.clone();
166 if let Err(e) = self.scheduler.schedule(Box::pin(async move {
167 let index_id = file_meta.index_id();
168 if let Err(e) = delete_index(index_id, &sst_layer, &cache_manager).await {
169 error!(e; "Failed to delete index for file {:?} from storage", file_meta);
170 }
171 })) {
172 error!(e; "Failed to schedule the index purge request");
173 }
174 }
175}
176
177impl FilePurger for LocalFilePurger {
178 fn remove_file(&self, file_meta: FileMeta, is_delete: bool, index_outdated: bool) {
179 if is_delete {
180 self.delete_file(file_meta);
181 } else if index_outdated {
182 self.delete_index(file_meta);
183 }
184 }
185}
186
187#[derive(Debug)]
188pub struct ObjectStoreFilePurger {
189 file_ref_manager: FileReferenceManagerRef,
190}
191
192impl FilePurger for ObjectStoreFilePurger {
193 fn remove_file(&self, file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {
194 self.file_ref_manager.remove_file(&file_meta);
199 }
200
201 fn new_file(&self, file_meta: &FileMeta) {
202 self.file_ref_manager.add_file(file_meta);
203 }
204}
205
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;
    use std::path::Path;

    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ObjectStore;
    use object_store::services::Fs;
    use smallvec::SmallVec;
    use store_api::region_request::PathType;
    use store_api::storage::{FileId, RegionId};

    use super::*;
    use crate::access_layer::AccessLayer;
    use crate::schedule::scheduler::{LocalScheduler, Scheduler};
    use crate::sst::file::{
        ColumnIndexMetadata, FileHandle, FileMeta, FileTimeRange, IndexType, RegionFileId,
        RegionIndexId,
    };
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::sst::location;

    /// Directory name used for SST files in these tests.
    const SST_DIR: &str = "table1";

    /// Builds an `Fs`-backed object store rooted at `root` together with a
    /// `PathType::Bare` access layer on top of it. Index auxiliary data lives
    /// under `<root>/index_aux`.
    async fn new_fs_access_layer(root: &Path) -> (ObjectStore, Arc<AccessLayer>) {
        let root_str = root.display().to_string();
        let builder = Fs::default().root(&root_str);

        let index_aux_path = root.join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let object_store = ObjectStore::new(builder).unwrap().finish();
        let layer = Arc::new(AccessLayer::new(
            SST_DIR,
            PathType::Bare,
            object_store.clone(),
            puffin_mgr,
            intm_mgr,
        ));

        (object_store, layer)
    }

    /// A file marked as deleted must be gone from storage once its handle is
    /// released and the scheduler has stopped.
    #[tokio::test]
    async fn test_file_purge() {
        common_telemetry::init_default_ut_logging();

        let temp_dir = create_temp_dir("file-purge");
        let (object_store, layer) = new_fs_access_layer(temp_dir.path()).await;

        let region_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
        let sst_path = location::sst_file_path(SST_DIR, region_file_id, layer.path_type());
        object_store.write(&sst_path, vec![0; 4096]).await.unwrap();

        let scheduler = Arc::new(LocalScheduler::new(3));
        let purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));

        let meta = FileMeta {
            region_id: region_file_id.region_id(),
            file_id: region_file_id.file_id(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            max_row_group_uncompressed_size: 4096,
            available_indexes: Default::default(),
            indexes: Default::default(),
            index_file_size: 0,
            index_version: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
            partition_expr: None,
            num_series: 0,
            ..Default::default()
        };

        // The handle goes out of scope at the end of this block; the assertion
        // below expects the purge to have been requested by then (presumably on
        // drop — confirm in `FileHandle`).
        {
            let handle = FileHandle::new(meta, purger);
            handle.mark_deleted();
        }

        scheduler.stop(true).await.unwrap();

        assert!(!object_store.exists(&sst_path).await.unwrap());
    }

    /// Same as `test_file_purge`, but the file also has an index file, which
    /// must be removed too.
    #[tokio::test]
    async fn test_file_purge_with_index() {
        common_telemetry::init_default_ut_logging();

        let temp_dir = create_temp_dir("file-purge");
        let (object_store, layer) = new_fs_access_layer(temp_dir.path()).await;

        let region_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
        let region_index_id = RegionIndexId::new(region_file_id, 0);

        let sst_path = location::sst_file_path(SST_DIR, region_file_id, layer.path_type());
        object_store.write(&sst_path, vec![0; 4096]).await.unwrap();

        let index_path = location::index_file_path(SST_DIR, region_index_id, layer.path_type());
        object_store
            .write(&index_path, vec![0; 4096])
            .await
            .unwrap();

        let scheduler = Arc::new(LocalScheduler::new(3));
        let purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));

        let meta = FileMeta {
            region_id: region_file_id.region_id(),
            file_id: region_file_id.file_id(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            max_row_group_uncompressed_size: 4096,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            indexes: vec![ColumnIndexMetadata {
                column_id: 0,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            }],
            index_file_size: 4096,
            index_version: 0,
            num_rows: 1024,
            num_row_groups: 1,
            sequence: NonZeroU64::new(4096),
            partition_expr: None,
            num_series: 0,
            ..Default::default()
        };

        // See `test_file_purge` for why the handle is scoped.
        {
            let handle = FileHandle::new(meta, purger);
            handle.mark_deleted();
        }

        scheduler.stop(true).await.unwrap();

        assert!(!object_store.exists(&sst_path).await.unwrap());
        assert!(!object_store.exists(&index_path).await.unwrap());
    }
}