1use std::fmt;
16use std::sync::Arc;
17
18use common_telemetry::error;
19
20use crate::access_layer::AccessLayerRef;
21use crate::cache::CacheManagerRef;
22use crate::error::Result;
23use crate::schedule::scheduler::SchedulerRef;
24use crate::sst::file::{FileMeta, delete_files};
25use crate::sst::file_ref::FileReferenceManagerRef;
26
27pub trait FilePurger: Send + Sync + fmt::Debug {
29 fn remove_file(&self, file_meta: FileMeta, is_delete: bool);
33
34 fn new_file(&self, _: &FileMeta) {
38 }
40}
41
42pub type FilePurgerRef = Arc<dyn FilePurger>;
43
44#[derive(Debug)]
46pub struct NoopFilePurger;
47
48impl FilePurger for NoopFilePurger {
49 fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) {
50 }
52}
53
54pub struct LocalFilePurger {
56 scheduler: SchedulerRef,
57 sst_layer: AccessLayerRef,
58 cache_manager: Option<CacheManagerRef>,
59}
60
61impl fmt::Debug for LocalFilePurger {
62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63 f.debug_struct("LocalFilePurger")
64 .field("sst_layer", &self.sst_layer)
65 .finish()
66 }
67}
68
69pub fn is_local_fs(sst_layer: &AccessLayerRef) -> bool {
70 sst_layer.object_store().info().scheme() == object_store::Scheme::Fs
71}
72
73pub fn create_file_purger(
83 scheduler: SchedulerRef,
84 sst_layer: AccessLayerRef,
85 cache_manager: Option<CacheManagerRef>,
86 file_ref_manager: FileReferenceManagerRef,
87) -> FilePurgerRef {
88 if is_local_fs(&sst_layer) {
89 Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
90 } else {
91 Arc::new(ObjectStoreFilePurger { file_ref_manager })
92 }
93}
94
95pub fn create_local_file_purger(
97 scheduler: SchedulerRef,
98 sst_layer: AccessLayerRef,
99 cache_manager: Option<CacheManagerRef>,
100 _file_ref_manager: FileReferenceManagerRef,
101) -> FilePurgerRef {
102 Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
103}
104
105impl LocalFilePurger {
106 pub fn new(
108 scheduler: SchedulerRef,
109 sst_layer: AccessLayerRef,
110 cache_manager: Option<CacheManagerRef>,
111 ) -> Self {
112 Self {
113 scheduler,
114 sst_layer,
115 cache_manager,
116 }
117 }
118
119 pub async fn stop_scheduler(&self) -> Result<()> {
121 self.scheduler.stop(true).await
122 }
123
124 fn delete_file(&self, file_meta: FileMeta) {
126 let sst_layer = self.sst_layer.clone();
127 let cache_manager = self.cache_manager.clone();
128 if let Err(e) = self.scheduler.schedule(Box::pin(async move {
129 if let Err(e) = delete_files(
130 file_meta.region_id,
131 &[file_meta.file_id],
132 file_meta.exists_index(),
133 &sst_layer,
134 &cache_manager,
135 )
136 .await
137 {
138 error!(e; "Failed to delete file {:?} from storage", file_meta);
139 }
140 })) {
141 error!(e; "Failed to schedule the file purge request");
142 }
143 }
144}
145
146impl FilePurger for LocalFilePurger {
147 fn remove_file(&self, file_meta: FileMeta, is_delete: bool) {
148 if is_delete {
149 self.delete_file(file_meta);
150 }
151 }
152}
153
154#[derive(Debug)]
155pub struct ObjectStoreFilePurger {
156 file_ref_manager: FileReferenceManagerRef,
157}
158
159impl FilePurger for ObjectStoreFilePurger {
160 fn remove_file(&self, file_meta: FileMeta, _is_delete: bool) {
161 self.file_ref_manager.remove_file(&file_meta);
165 }
166
167 fn new_file(&self, file_meta: &FileMeta) {
168 self.file_ref_manager.add_file(file_meta);
169 }
170}
171
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;
    use std::path::Path;

    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ObjectStore;
    use object_store::services::Fs;
    use smallvec::SmallVec;
    use store_api::region_request::PathType;
    use store_api::storage::{FileId, RegionId};

    use super::*;
    use crate::access_layer::AccessLayer;
    use crate::schedule::scheduler::{LocalScheduler, Scheduler};
    use crate::sst::file::{FileHandle, FileMeta, FileTimeRange, IndexType, RegionFileId};
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::sst::location;

    /// SST directory, relative to the object store root, used by every test access layer.
    const SST_DIR: &str = "table1";

    /// Builds an `Fs`-backed object store rooted at `root` and an [`AccessLayer`] over it that
    /// stores SSTs under [`SST_DIR`] and index scratch data under `root/index_aux`.
    ///
    /// The caller must keep the directory behind `root` alive for the duration of the test.
    async fn new_fs_access_layer(root: &Path) -> (ObjectStore, Arc<AccessLayer>) {
        let index_aux_path = root.join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let root_str = root.display().to_string();
        let builder = Fs::default().root(&root_str);
        let object_store = ObjectStore::new(builder).unwrap().finish();

        let layer = Arc::new(AccessLayer::new(
            SST_DIR,
            PathType::Bare,
            object_store.clone(),
            puffin_mgr,
            intm_mgr,
        ));
        (object_store, layer)
    }

    /// Wraps `file_meta` in a [`FileHandle`] backed by a [`LocalFilePurger`], marks it deleted,
    /// drops the handle and then stops the scheduler so that queued purge jobs have finished
    /// before the caller asserts on storage.
    async fn purge_and_wait(layer: Arc<AccessLayer>, file_meta: FileMeta) {
        let scheduler = Arc::new(LocalScheduler::new(3));
        let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));

        {
            let handle = FileHandle::new(file_meta, file_purger);
            handle.mark_deleted();
            // The handle is dropped at the end of this scope; the purge is expected to be
            // triggered by that drop (see `FileHandle`).
        }

        scheduler.stop(true).await.unwrap();
    }

    #[tokio::test]
    async fn test_file_purge() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let (object_store, layer) = new_fs_access_layer(dir.path()).await;
        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());

        let path = location::sst_file_path(SST_DIR, sst_file_id, layer.path_type());
        object_store.write(&path, vec![0; 4096]).await.unwrap();

        purge_and_wait(
            layer,
            FileMeta {
                region_id: sst_file_id.region_id(),
                file_id: sst_file_id.file_id(),
                time_range: FileTimeRange::default(),
                level: 0,
                file_size: 4096,
                available_indexes: Default::default(),
                index_file_size: 0,
                num_rows: 0,
                num_row_groups: 0,
                sequence: None,
                partition_expr: None,
            },
        )
        .await;

        assert!(!object_store.exists(&path).await.unwrap());
    }

    #[tokio::test]
    async fn test_file_purge_with_index() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let (object_store, layer) = new_fs_access_layer(dir.path()).await;
        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());

        let path = location::sst_file_path(SST_DIR, sst_file_id, layer.path_type());
        object_store.write(&path, vec![0; 4096]).await.unwrap();

        let index_path = location::index_file_path(SST_DIR, sst_file_id, layer.path_type());
        object_store
            .write(&index_path, vec![0; 4096])
            .await
            .unwrap();

        purge_and_wait(
            layer,
            FileMeta {
                region_id: sst_file_id.region_id(),
                file_id: sst_file_id.file_id(),
                time_range: FileTimeRange::default(),
                level: 0,
                file_size: 4096,
                available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
                index_file_size: 4096,
                num_rows: 1024,
                num_row_groups: 1,
                sequence: NonZeroU64::new(4096),
                partition_expr: None,
            },
        )
        .await;

        assert!(!object_store.exists(&path).await.unwrap());
        assert!(!object_store.exists(&index_path).await.unwrap());
    }
}