1use std::fmt;
16use std::sync::Arc;
17
18use common_telemetry::error;
19
20use crate::access_layer::AccessLayerRef;
21use crate::cache::CacheManagerRef;
22use crate::error::Result;
23use crate::schedule::scheduler::SchedulerRef;
24use crate::sst::file::{FileMeta, delete_files, delete_index};
25use crate::sst::file_ref::FileReferenceManagerRef;
26
27pub trait FilePurger: Send + Sync + fmt::Debug {
29 fn remove_file(&self, file_meta: FileMeta, is_delete: bool, index_outdated: bool);
34
35 fn new_file(&self, _: &FileMeta) {
39 }
41}
42
43pub type FilePurgerRef = Arc<dyn FilePurger>;
44
45#[derive(Debug)]
47pub struct NoopFilePurger;
48
49impl FilePurger for NoopFilePurger {
50 fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {
51 }
53}
54
55pub struct LocalFilePurger {
57 scheduler: SchedulerRef,
58 sst_layer: AccessLayerRef,
59 cache_manager: Option<CacheManagerRef>,
60}
61
62impl fmt::Debug for LocalFilePurger {
63 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64 f.debug_struct("LocalFilePurger")
65 .field("sst_layer", &self.sst_layer)
66 .finish()
67 }
68}
69
70pub fn is_local_fs(sst_layer: &AccessLayerRef) -> bool {
71 sst_layer.object_store().info().scheme() == object_store::Scheme::Fs
72}
73
74pub fn create_file_purger(
84 gc_enabled: bool,
85 scheduler: SchedulerRef,
86 sst_layer: AccessLayerRef,
87 cache_manager: Option<CacheManagerRef>,
88 file_ref_manager: FileReferenceManagerRef,
89) -> FilePurgerRef {
90 if gc_enabled && !is_local_fs(&sst_layer) {
91 Arc::new(ObjectStoreFilePurger { file_ref_manager })
92 } else {
93 Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
94 }
95}
96
97pub fn create_local_file_purger(
99 scheduler: SchedulerRef,
100 sst_layer: AccessLayerRef,
101 cache_manager: Option<CacheManagerRef>,
102 _file_ref_manager: FileReferenceManagerRef,
103) -> FilePurgerRef {
104 Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
105}
106
107impl LocalFilePurger {
108 pub fn new(
110 scheduler: SchedulerRef,
111 sst_layer: AccessLayerRef,
112 cache_manager: Option<CacheManagerRef>,
113 ) -> Self {
114 Self {
115 scheduler,
116 sst_layer,
117 cache_manager,
118 }
119 }
120
121 pub async fn stop_scheduler(&self) -> Result<()> {
123 self.scheduler.stop(true).await
124 }
125
126 fn delete_file(&self, file_meta: FileMeta) {
128 let sst_layer = self.sst_layer.clone();
129 let cache_manager = self.cache_manager.clone();
130 if let Err(e) = self.scheduler.schedule(Box::pin(async move {
131 if let Err(e) = delete_files(
132 file_meta.region_id,
133 &[(file_meta.file_id, file_meta.index_id().version)],
134 file_meta.exists_index(),
135 &sst_layer,
136 &cache_manager,
137 )
138 .await
139 {
140 error!(e; "Failed to delete file {:?} from storage", file_meta);
141 }
142 })) {
143 error!(e; "Failed to schedule the file purge request");
144 }
145 }
146
147 fn delete_index(&self, file_meta: FileMeta) {
148 let sst_layer = self.sst_layer.clone();
149 let cache_manager = self.cache_manager.clone();
150 if let Err(e) = self.scheduler.schedule(Box::pin(async move {
151 let index_id = file_meta.index_id();
152 if let Err(e) = delete_index(index_id, &sst_layer, &cache_manager).await {
153 error!(e; "Failed to delete index for file {:?} from storage", file_meta);
154 }
155 })) {
156 error!(e; "Failed to schedule the index purge request");
157 }
158 }
159}
160
161impl FilePurger for LocalFilePurger {
162 fn remove_file(&self, file_meta: FileMeta, is_delete: bool, index_outdated: bool) {
163 if is_delete {
164 self.delete_file(file_meta);
165 } else if index_outdated {
166 self.delete_index(file_meta);
167 }
168 }
169}
170
171#[derive(Debug)]
172pub struct ObjectStoreFilePurger {
173 file_ref_manager: FileReferenceManagerRef,
174}
175
176impl FilePurger for ObjectStoreFilePurger {
177 fn remove_file(&self, file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {
178 self.file_ref_manager.remove_file(&file_meta);
182 }
184
185 fn new_file(&self, file_meta: &FileMeta) {
186 self.file_ref_manager.add_file(file_meta);
187 }
188}
189
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ObjectStore;
    use object_store::services::Fs;
    use smallvec::SmallVec;
    use store_api::region_request::PathType;
    use store_api::storage::{FileId, RegionId};

    use super::*;
    use crate::access_layer::AccessLayer;
    use crate::schedule::scheduler::{LocalScheduler, Scheduler};
    use crate::sst::file::{
        ColumnIndexMetadata, FileHandle, FileMeta, FileTimeRange, IndexType, RegionFileId,
        RegionIndexId,
    };
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::sst::location;

    /// Builds an `Fs` object store rooted at `root` and an access layer for
    /// `sst_dir` on top of it. Index auxiliary data lives under
    /// `<root>/index_aux`.
    async fn open_fs_layer(
        root: &std::path::Path,
        sst_dir: &'static str,
    ) -> (ObjectStore, Arc<AccessLayer>) {
        let index_aux_path = root.join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let root_path = root.display().to_string();
        let builder = Fs::default().root(&root_path);
        let object_store = ObjectStore::new(builder).unwrap().finish();

        let layer = Arc::new(AccessLayer::new(
            sst_dir,
            PathType::Bare,
            object_store.clone(),
            puffin_mgr,
            intm_mgr,
        ));

        (object_store, layer)
    }

    /// A file handle that is marked deleted and then dropped must have its SST
    /// file gone from the store once the scheduler has stopped.
    #[tokio::test]
    async fn test_file_purge() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let sst_dir = "table1";
        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
        let (object_store, layer) = open_fs_layer(dir.path(), sst_dir).await;

        // Put a dummy SST file in place so there is something to purge.
        let path = location::sst_file_path(sst_dir, sst_file_id, layer.path_type());
        object_store.write(&path, vec![0; 4096]).await.unwrap();

        let scheduler = Arc::new(LocalScheduler::new(3));
        let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));

        let meta = FileMeta {
            region_id: sst_file_id.region_id(),
            file_id: sst_file_id.file_id(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            max_row_group_uncompressed_size: 4096,
            available_indexes: Default::default(),
            indexes: Default::default(),
            index_file_size: 0,
            index_version: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
            partition_expr: None,
            num_series: 0,
        };
        let handle = FileHandle::new(meta, file_purger);
        handle.mark_deleted();
        // Release the handle before stopping the scheduler; the test expects the
        // deletion to have been requested by the time the handle is gone.
        drop(handle);

        scheduler.stop(true).await.unwrap();

        assert!(!object_store.exists(&path).await.unwrap());
    }

    /// Same as `test_file_purge`, but the file also has an index file, which
    /// must be gone as well.
    #[tokio::test]
    async fn test_file_purge_with_index() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let sst_dir = "table1";
        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
        let index_file_id = RegionIndexId::new(sst_file_id, 0);
        let (object_store, layer) = open_fs_layer(dir.path(), sst_dir).await;

        // Put a dummy SST file and a dummy index file in place.
        let path = location::sst_file_path(sst_dir, sst_file_id, layer.path_type());
        object_store.write(&path, vec![0; 4096]).await.unwrap();

        let index_path = location::index_file_path(sst_dir, index_file_id, layer.path_type());
        object_store
            .write(&index_path, vec![0; 4096])
            .await
            .unwrap();

        let scheduler = Arc::new(LocalScheduler::new(3));
        let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));

        let meta = FileMeta {
            region_id: sst_file_id.region_id(),
            file_id: sst_file_id.file_id(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            max_row_group_uncompressed_size: 4096,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            indexes: vec![ColumnIndexMetadata {
                column_id: 0,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            }],
            index_file_size: 4096,
            index_version: 0,
            num_rows: 1024,
            num_row_groups: 1,
            sequence: NonZeroU64::new(4096),
            partition_expr: None,
            num_series: 0,
        };
        let handle = FileHandle::new(meta, file_purger);
        handle.mark_deleted();
        // Release the handle before stopping the scheduler; the test expects the
        // deletion to have been requested by the time the handle is gone.
        drop(handle);

        scheduler.stop(true).await.unwrap();

        assert!(!object_store.exists(&path).await.unwrap());
        assert!(!object_store.exists(&index_path).await.unwrap());
    }
}