1use std::fmt;
16use std::sync::Arc;
17
18use common_telemetry::error;
19use store_api::region_request::PathType;
20
21use crate::access_layer::AccessLayerRef;
22use crate::cache::CacheManagerRef;
23use crate::error::Result;
24use crate::schedule::scheduler::SchedulerRef;
25use crate::sst::file::{FileMeta, delete_files, delete_index};
26use crate::sst::file_ref::FileReferenceManagerRef;
27
/// Abstraction over how files that are no longer needed get cleaned up.
///
/// Implementors must be `Send + Sync + Debug` so they can be shared as a trait object
/// behind an `Arc`.
pub trait FilePurger: Send + Sync + fmt::Debug {
    /// Requests removal of the file described by `file_meta`.
    ///
    /// How the two flags are interpreted is up to the implementation. In this file,
    /// `LocalFilePurger` deletes the whole file when `is_delete` is true and otherwise
    /// deletes only the index when `index_outdated` is true, whereas
    /// `ObjectStoreFilePurger` ignores both flags.
    fn remove_file(&self, file_meta: FileMeta, is_delete: bool, index_outdated: bool);

    /// Notifies the purger about a new file.
    ///
    /// The default implementation does nothing; implementors override it when they
    /// need to track files.
    fn new_file(&self, _: &FileMeta) {
        // Intentionally empty: by default there is nothing to track.
    }
}
43
/// Shared, dynamically dispatched handle to a [`FilePurger`].
pub type FilePurgerRef = Arc<dyn FilePurger>;
45
/// A [`FilePurger`] that carries no state and never deletes anything.
#[derive(Debug)]
pub struct NoopFilePurger;
49
impl FilePurger for NoopFilePurger {
    /// Ignores the request: all arguments are discarded and nothing is removed.
    fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {
        // Intentionally empty.
    }
}
55
/// A [`FilePurger`] that deletes files and indexes by scheduling background jobs.
pub struct LocalFilePurger {
    /// Scheduler the deletion jobs are submitted to.
    scheduler: SchedulerRef,
    /// Access layer handed to `delete_files` / `delete_index` when a job runs.
    sst_layer: AccessLayerRef,
    /// Optional cache manager, forwarded to the same deletion helpers.
    cache_manager: Option<CacheManagerRef>,
}
62
63impl fmt::Debug for LocalFilePurger {
64 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65 f.debug_struct("LocalFilePurger")
66 .field("sst_layer", &self.sst_layer)
67 .finish()
68 }
69}
70
/// Decides whether garbage collection is enabled (builds without debug assertions).
///
/// Returns `true` only when `global_gc_enabled` is set and `object_store_scheme` is
/// not [`object_store::Scheme::Fs`].
#[cfg(not(debug_assertions))]
pub fn should_enable_gc(
    global_gc_enabled: bool,
    object_store_scheme: object_store::Scheme,
) -> bool {
    // With the global switch off the scheme is irrelevant.
    if !global_gc_enabled {
        return false;
    }
    object_store_scheme != object_store::Scheme::Fs
}
79
/// Decides whether garbage collection is enabled (builds with debug assertions).
///
/// Has the same signature as the `cfg(not(debug_assertions))` variant, but the object
/// store scheme is ignored here: the result is simply `global_gc_enabled`.
#[cfg(debug_assertions)]
pub fn should_enable_gc(
    global_gc_enabled: bool,
    _object_store_scheme: object_store::Scheme,
) -> bool {
    global_gc_enabled
}
89
90pub fn create_file_purger(
100 gc_enabled: bool,
101 path_type: PathType,
102 scheduler: SchedulerRef,
103 sst_layer: AccessLayerRef,
104 cache_manager: Option<CacheManagerRef>,
105 file_ref_manager: FileReferenceManagerRef,
106) -> FilePurgerRef {
107 if should_enable_gc(gc_enabled, sst_layer.object_store().info().scheme())
111 && matches!(path_type, PathType::Data | PathType::Bare)
112 {
113 Arc::new(ObjectStoreFilePurger { file_ref_manager })
114 } else {
115 Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
116 }
117}
118
119pub fn create_local_file_purger(
121 scheduler: SchedulerRef,
122 sst_layer: AccessLayerRef,
123 cache_manager: Option<CacheManagerRef>,
124 _file_ref_manager: FileReferenceManagerRef,
125) -> FilePurgerRef {
126 Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
127}
128
129impl LocalFilePurger {
130 pub fn new(
132 scheduler: SchedulerRef,
133 sst_layer: AccessLayerRef,
134 cache_manager: Option<CacheManagerRef>,
135 ) -> Self {
136 Self {
137 scheduler,
138 sst_layer,
139 cache_manager,
140 }
141 }
142
143 pub async fn stop_scheduler(&self) -> Result<()> {
145 self.scheduler.stop(true).await
146 }
147
148 fn delete_file(&self, file_meta: FileMeta) {
150 let sst_layer = self.sst_layer.clone();
151 let cache_manager = self.cache_manager.clone();
152 if let Err(e) = self.scheduler.schedule(Box::pin(async move {
153 if let Err(e) = delete_files(
154 file_meta.region_id,
155 &[(file_meta.file_id, file_meta.index_id().version)],
156 file_meta.exists_index(),
157 &sst_layer,
158 &cache_manager,
159 )
160 .await
161 {
162 error!(e; "Failed to delete file {:?} from storage", file_meta);
163 }
164 })) {
165 error!(e; "Failed to schedule the file purge request");
166 }
167 }
168
169 fn delete_index(&self, file_meta: FileMeta) {
170 let sst_layer = self.sst_layer.clone();
171 let cache_manager = self.cache_manager.clone();
172 if let Err(e) = self.scheduler.schedule(Box::pin(async move {
173 let index_id = file_meta.index_id();
174 if let Err(e) = delete_index(index_id, &sst_layer, &cache_manager).await {
175 error!(e; "Failed to delete index for file {:?} from storage", file_meta);
176 }
177 })) {
178 error!(e; "Failed to schedule the index purge request");
179 }
180 }
181}
182
183impl FilePurger for LocalFilePurger {
184 fn remove_file(&self, file_meta: FileMeta, is_delete: bool, index_outdated: bool) {
185 if is_delete {
186 self.delete_file(file_meta);
187 } else if index_outdated {
188 self.delete_index(file_meta);
189 }
190 }
191}
192
/// A [`FilePurger`] that only forwards file events to a file reference manager.
#[derive(Debug)]
pub struct ObjectStoreFilePurger {
    /// Receives `add_file` / `remove_file` calls for every file event.
    file_ref_manager: FileReferenceManagerRef,
}
197
198impl FilePurger for ObjectStoreFilePurger {
199 fn remove_file(&self, file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {
200 self.file_ref_manager.remove_file(&file_meta);
205 }
206
207 fn new_file(&self, file_meta: &FileMeta) {
208 self.file_ref_manager.add_file(file_meta);
209 }
210}
211
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;
    use std::path::Path;

    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ObjectStore;
    use object_store::services::Fs;
    use smallvec::SmallVec;
    use store_api::region_request::PathType;
    use store_api::storage::{FileId, RegionId};

    use super::*;
    use crate::access_layer::AccessLayer;
    use crate::schedule::scheduler::{LocalScheduler, Scheduler};
    use crate::sst::file::{
        ColumnIndexMetadata, FileHandle, FileMeta, FileTimeRange, IndexType, RegionFileId,
        RegionIndexId,
    };
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::sst::location;

    /// Table directory used by every test in this module.
    const SST_DIR: &str = "table1";

    /// Builds the storage fixture shared by the tests.
    ///
    /// Creates an `Fs`-backed object store rooted at `root` and an [`AccessLayer`]
    /// (table dir [`SST_DIR`], `PathType::Bare`) on top of it. Both index managers are
    /// pointed at `root/index_aux`. Returns the store, so tests can write and probe
    /// files directly, together with the layer.
    async fn new_fs_access_layer(root: &Path) -> (ObjectStore, Arc<AccessLayer>) {
        let root_str = root.display().to_string();
        let builder = Fs::default().root(&root_str);

        let index_aux_path = root.join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let object_store = ObjectStore::new(builder).unwrap().finish();
        let layer = Arc::new(AccessLayer::new(
            SST_DIR,
            PathType::Bare,
            object_store.clone(),
            puffin_mgr,
            intm_mgr,
        ));

        (object_store, layer)
    }

    /// A file marked deleted is removed from the store once its handle is gone.
    #[tokio::test]
    async fn test_file_purge() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
        let (object_store, layer) = new_fs_access_layer(dir.path()).await;

        let path = location::sst_file_path(SST_DIR, sst_file_id, layer.path_type());
        object_store.write(&path, vec![0; 4096]).await.unwrap();
        // Precondition: the file must exist, otherwise the final assertion is vacuous.
        assert!(object_store.exists(&path).await.unwrap());

        let scheduler = Arc::new(LocalScheduler::new(3));

        let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));

        {
            let handle = FileHandle::new(
                FileMeta {
                    region_id: sst_file_id.region_id(),
                    file_id: sst_file_id.file_id(),
                    time_range: FileTimeRange::default(),
                    level: 0,
                    file_size: 4096,
                    max_row_group_uncompressed_size: 4096,
                    available_indexes: Default::default(),
                    indexes: Default::default(),
                    index_file_size: 0,
                    index_version: 0,
                    num_rows: 0,
                    num_row_groups: 0,
                    sequence: None,
                    partition_expr: None,
                    num_series: 0,
                },
                file_purger,
            );
            handle.mark_deleted();
            // The handle goes out of scope here; the test relies on that to trigger
            // the purge.
        }

        // Stop the scheduler before probing the store so the scheduled purge job has
        // had its chance to run.
        scheduler.stop(true).await.unwrap();

        assert!(!object_store.exists(&path).await.unwrap());
    }

    /// A file marked deleted takes its index file with it.
    #[tokio::test]
    async fn test_file_purge_with_index() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
        let index_file_id = RegionIndexId::new(sst_file_id, 0);
        let (object_store, layer) = new_fs_access_layer(dir.path()).await;

        let path = location::sst_file_path(SST_DIR, sst_file_id, layer.path_type());
        object_store.write(&path, vec![0; 4096]).await.unwrap();

        let index_path = location::index_file_path(SST_DIR, index_file_id, layer.path_type());
        object_store
            .write(&index_path, vec![0; 4096])
            .await
            .unwrap();

        // Precondition: both files must exist, otherwise the final assertions are vacuous.
        assert!(object_store.exists(&path).await.unwrap());
        assert!(object_store.exists(&index_path).await.unwrap());

        let scheduler = Arc::new(LocalScheduler::new(3));

        let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));

        {
            let handle = FileHandle::new(
                FileMeta {
                    region_id: sst_file_id.region_id(),
                    file_id: sst_file_id.file_id(),
                    time_range: FileTimeRange::default(),
                    level: 0,
                    file_size: 4096,
                    max_row_group_uncompressed_size: 4096,
                    available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
                    indexes: vec![ColumnIndexMetadata {
                        column_id: 0,
                        created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
                    }],
                    index_file_size: 4096,
                    index_version: 0,
                    num_rows: 1024,
                    num_row_groups: 1,
                    sequence: NonZeroU64::new(4096),
                    partition_expr: None,
                    num_series: 0,
                },
                file_purger,
            );
            handle.mark_deleted();
            // The handle goes out of scope here; the test relies on that to trigger
            // the purge.
        }

        // Stop the scheduler before probing the store so the scheduled purge job has
        // had its chance to run.
        scheduler.stop(true).await.unwrap();

        assert!(!object_store.exists(&path).await.unwrap());
        assert!(!object_store.exists(&index_path).await.unwrap());
    }
}