1use std::fmt;
16use std::sync::Arc;
17
18use common_telemetry::error;
19
20use crate::access_layer::AccessLayerRef;
21use crate::cache::CacheManagerRef;
22use crate::error::Result;
23use crate::schedule::scheduler::SchedulerRef;
24use crate::sst::file::{FileMeta, delete_files};
25use crate::sst::file_ref::FileReferenceManagerRef;
26
27pub trait FilePurger: Send + Sync + fmt::Debug {
29 fn remove_file(&self, file_meta: FileMeta, is_delete: bool);
33
34 fn new_file(&self, _: &FileMeta) {
38 }
40}
41
42pub type FilePurgerRef = Arc<dyn FilePurger>;
43
44#[derive(Debug)]
46pub struct NoopFilePurger;
47
48impl FilePurger for NoopFilePurger {
49 fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) {
50 }
52}
53
54pub struct LocalFilePurger {
56 scheduler: SchedulerRef,
57 sst_layer: AccessLayerRef,
58 cache_manager: Option<CacheManagerRef>,
59}
60
61impl fmt::Debug for LocalFilePurger {
62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63 f.debug_struct("LocalFilePurger")
64 .field("sst_layer", &self.sst_layer)
65 .finish()
66 }
67}
68
69pub fn is_local_fs(sst_layer: &AccessLayerRef) -> bool {
70 sst_layer.object_store().info().scheme() == object_store::Scheme::Fs
71}
72
73pub fn create_file_purger(
83 gc_enabled: bool,
84 scheduler: SchedulerRef,
85 sst_layer: AccessLayerRef,
86 cache_manager: Option<CacheManagerRef>,
87 file_ref_manager: FileReferenceManagerRef,
88) -> FilePurgerRef {
89 if gc_enabled && !is_local_fs(&sst_layer) {
90 Arc::new(ObjectStoreFilePurger { file_ref_manager })
91 } else {
92 Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
93 }
94}
95
96pub fn create_local_file_purger(
98 scheduler: SchedulerRef,
99 sst_layer: AccessLayerRef,
100 cache_manager: Option<CacheManagerRef>,
101 _file_ref_manager: FileReferenceManagerRef,
102) -> FilePurgerRef {
103 Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
104}
105
106impl LocalFilePurger {
107 pub fn new(
109 scheduler: SchedulerRef,
110 sst_layer: AccessLayerRef,
111 cache_manager: Option<CacheManagerRef>,
112 ) -> Self {
113 Self {
114 scheduler,
115 sst_layer,
116 cache_manager,
117 }
118 }
119
120 pub async fn stop_scheduler(&self) -> Result<()> {
122 self.scheduler.stop(true).await
123 }
124
125 fn delete_file(&self, file_meta: FileMeta) {
127 let sst_layer = self.sst_layer.clone();
128 let cache_manager = self.cache_manager.clone();
129 if let Err(e) = self.scheduler.schedule(Box::pin(async move {
130 if let Err(e) = delete_files(
131 file_meta.region_id,
132 &[(file_meta.file_id, file_meta.index_file_id().file_id())],
133 file_meta.exists_index(),
134 &sst_layer,
135 &cache_manager,
136 )
137 .await
138 {
139 error!(e; "Failed to delete file {:?} from storage", file_meta);
140 }
141 })) {
142 error!(e; "Failed to schedule the file purge request");
143 }
144 }
145}
146
147impl FilePurger for LocalFilePurger {
148 fn remove_file(&self, file_meta: FileMeta, is_delete: bool) {
149 if is_delete {
150 self.delete_file(file_meta);
151 }
152 }
153}
154
155#[derive(Debug)]
156pub struct ObjectStoreFilePurger {
157 file_ref_manager: FileReferenceManagerRef,
158}
159
160impl FilePurger for ObjectStoreFilePurger {
161 fn remove_file(&self, file_meta: FileMeta, _is_delete: bool) {
162 self.file_ref_manager.remove_file(&file_meta);
166 }
168
169 fn new_file(&self, file_meta: &FileMeta) {
170 self.file_ref_manager.add_file(file_meta);
171 }
172}
173
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ObjectStore;
    use object_store::services::Fs;
    use smallvec::SmallVec;
    use store_api::region_request::PathType;
    use store_api::storage::{FileId, RegionId};

    use super::*;
    use crate::access_layer::AccessLayer;
    use crate::schedule::scheduler::{LocalScheduler, Scheduler};
    use crate::sst::file::{
        ColumnIndexMetadata, FileHandle, FileMeta, FileTimeRange, IndexType, RegionFileId,
    };
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::sst::location;

    /// Shared scenario for the purge tests.
    ///
    /// Writes a 4096-byte SST file (and, when `with_index` is set, a
    /// 4096-byte index file) into a temporary file-system store, wraps the
    /// metadata produced by `build_meta` in a [`FileHandle`] backed by a
    /// [`LocalFilePurger`], marks the handle deleted and drops it, stops the
    /// scheduler, and finally asserts that the written files are gone.
    async fn purge_and_verify<F>(with_index: bool, build_meta: F)
    where
        F: FnOnce(RegionFileId) -> FileMeta,
    {
        common_telemetry::init_default_ut_logging();

        let temp_dir = create_temp_dir("file-purge");
        let root = temp_dir.path().display().to_string();
        let fs_builder = Fs::default().root(&root);
        let region_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
        let table_dir = "table1";

        let aux_dir = temp_dir.path().join("index_aux");
        let puffin_manager = PuffinManagerFactory::new(&aux_dir, 4096, None, None)
            .await
            .unwrap();
        let intermediate_manager = IntermediateManager::init_fs(aux_dir.to_str().unwrap())
            .await
            .unwrap();

        let store = ObjectStore::new(fs_builder).unwrap().finish();

        let access_layer = Arc::new(AccessLayer::new(
            table_dir,
            PathType::Bare,
            store.clone(),
            puffin_manager,
            intermediate_manager,
        ));

        let sst_path =
            location::sst_file_path(table_dir, region_file_id, access_layer.path_type());
        store.write(&sst_path, vec![0; 4096]).await.unwrap();

        // The index file is only materialized for the scenario that needs it.
        let index_path = if with_index {
            let index_path =
                location::index_file_path(table_dir, region_file_id, access_layer.path_type());
            store.write(&index_path, vec![0; 4096]).await.unwrap();
            Some(index_path)
        } else {
            None
        };

        let scheduler = Arc::new(LocalScheduler::new(3));
        let purger = Arc::new(LocalFilePurger::new(scheduler.clone(), access_layer, None));

        // Mark the file deleted and release the handle; presumably dropping
        // the last handle is what hands the file over to the purger.
        let handle = FileHandle::new(build_meta(region_file_id), purger);
        handle.mark_deleted();
        drop(handle);

        scheduler.stop(true).await.unwrap();

        assert!(!store.exists(&sst_path).await.unwrap());
        if let Some(index_path) = index_path {
            assert!(!store.exists(&index_path).await.unwrap());
        }
    }

    /// A deleted SST file without any index is removed from storage.
    #[tokio::test]
    async fn test_file_purge() {
        purge_and_verify(false, |id| FileMeta {
            region_id: id.region_id(),
            file_id: id.file_id(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            available_indexes: Default::default(),
            indexes: Default::default(),
            index_file_size: 0,
            index_file_id: None,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
            partition_expr: None,
            num_series: 0,
        })
        .await;
    }

    /// A deleted SST file that has an inverted index is removed from storage
    /// together with its index file.
    #[tokio::test]
    async fn test_file_purge_with_index() {
        purge_and_verify(true, |id| FileMeta {
            region_id: id.region_id(),
            file_id: id.file_id(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            indexes: vec![ColumnIndexMetadata {
                column_id: 0,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            }],
            index_file_size: 4096,
            index_file_id: None,
            num_rows: 1024,
            num_row_groups: 1,
            sequence: NonZeroU64::new(4096),
            partition_expr: None,
            num_series: 0,
        })
        .await;
    }
}