1use std::fmt;
16use std::sync::Arc;
17
18use common_telemetry::{error, info};
19
20use crate::access_layer::AccessLayerRef;
21use crate::cache::file_cache::{FileType, IndexKey};
22use crate::cache::CacheManagerRef;
23use crate::schedule::scheduler::SchedulerRef;
24use crate::sst::file::FileMeta;
25
/// Request to remove a single SST file.
#[derive(Debug)]
pub struct PurgeRequest {
    /// Metadata of the file to remove.
    pub file_meta: FileMeta,
}
32
/// Receives requests to remove SST files.
///
/// Implementors must be `Send + Sync` so a purger can be shared across threads
/// (see [`FilePurgerRef`]).
pub trait FilePurger: Send + Sync + fmt::Debug {
    /// Submits `request` for the file described by its [`FileMeta`].
    ///
    /// The method returns nothing, so an implementation cannot report a failure
    /// back to the caller.
    fn send_request(&self, request: PurgeRequest);
}
38
/// Shared, reference-counted handle to any [`FilePurger`] implementation.
pub type FilePurgerRef = Arc<dyn FilePurger>;
40
41#[derive(Debug)]
43pub struct NoopFilePurger;
44
45impl FilePurger for NoopFilePurger {
46 fn send_request(&self, _: PurgeRequest) {
47 }
49}
50
/// [`FilePurger`] that removes files through jobs submitted to a scheduler.
pub struct LocalFilePurger {
    /// Scheduler that runs the background purge jobs.
    scheduler: SchedulerRef,
    /// Access layer used to delete SST files and to reach the puffin manager factory.
    sst_layer: AccessLayerRef,
    /// Optional cache manager; when present, purged files are also evicted from
    /// its parquet metadata cache and its write cache.
    cache_manager: Option<CacheManagerRef>,
}
57
58impl fmt::Debug for LocalFilePurger {
59 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60 f.debug_struct("LocalFilePurger")
61 .field("sst_layer", &self.sst_layer)
62 .finish()
63 }
64}
65
66impl LocalFilePurger {
67 pub fn new(
69 scheduler: SchedulerRef,
70 sst_layer: AccessLayerRef,
71 cache_manager: Option<CacheManagerRef>,
72 ) -> Self {
73 Self {
74 scheduler,
75 sst_layer,
76 cache_manager,
77 }
78 }
79}
80
81impl FilePurger for LocalFilePurger {
82 fn send_request(&self, request: PurgeRequest) {
83 let file_meta = request.file_meta;
84 let sst_layer = self.sst_layer.clone();
85
86 if let Some(cache) = &self.cache_manager {
88 cache.remove_parquet_meta_data(file_meta.file_id());
89 }
90
91 let cache_manager = self.cache_manager.clone();
92 if let Err(e) = self.scheduler.schedule(Box::pin(async move {
93 if let Err(e) = sst_layer.delete_sst(&file_meta).await {
94 error!(e; "Failed to delete SST file, file_id: {}, region: {}",
95 file_meta.file_id, file_meta.region_id);
96 } else {
97 info!(
98 "Successfully deleted SST file, file_id: {}, region: {}",
99 file_meta.file_id, file_meta.region_id
100 );
101 }
102
103 if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache())
104 {
105 if file_meta.exists_index() {
107 write_cache
108 .remove(IndexKey::new(
109 file_meta.region_id,
110 file_meta.file_id,
111 FileType::Puffin,
112 ))
113 .await;
114 }
115 write_cache
117 .remove(IndexKey::new(
118 file_meta.region_id,
119 file_meta.file_id,
120 FileType::Parquet,
121 ))
122 .await;
123 }
124
125 if let Err(e) = sst_layer
127 .puffin_manager_factory()
128 .purge_stager(file_meta.file_id())
129 .await
130 {
131 error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
132 file_meta.file_id(), file_meta.region_id);
133 }
134 })) {
135 error!(e; "Failed to schedule the file purge request");
136 }
137 }
138}
139
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;
    use std::path::Path;

    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;
    use object_store::ObjectStore;
    use smallvec::SmallVec;
    use store_api::region_request::PathType;
    use store_api::storage::RegionId;

    use super::*;
    use crate::access_layer::AccessLayer;
    use crate::schedule::scheduler::{LocalScheduler, Scheduler};
    use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange, IndexType, RegionFileId};
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::sst::location;

    /// Table directory, relative to the object store root, that holds the test files.
    const SST_DIR: &str = "table1";

    /// Builds a filesystem-backed object store rooted at `root` and an access layer
    /// over it. Index auxiliary data is placed under `root/index_aux`.
    async fn new_access_layer(root: &Path) -> (ObjectStore, Arc<AccessLayer>) {
        let root_path = root.display().to_string();
        let builder = Fs::default().root(&root_path);

        let index_aux_path = root.join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let object_store = ObjectStore::new(builder).unwrap().finish();
        let layer = Arc::new(AccessLayer::new(
            SST_DIR,
            PathType::Bare,
            object_store.clone(),
            puffin_mgr,
            intm_mgr,
        ));
        (object_store, layer)
    }

    /// Marks the file described by `file_meta` as deleted and drops its handle.
    /// Then stops the scheduler gracefully so pending purge jobs can finish before
    /// the caller asserts on storage.
    async fn purge_and_wait(layer: Arc<AccessLayer>, file_meta: FileMeta) {
        let scheduler = Arc::new(LocalScheduler::new(3));
        let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));

        let handle = FileHandle::new(file_meta, file_purger);
        handle.mark_deleted();
        // The purge request is expected to be issued once the deleted handle is dropped.
        drop(handle);

        scheduler.stop(true).await.unwrap();
    }

    #[tokio::test]
    async fn test_file_purge() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
        let (object_store, layer) = new_access_layer(dir.path()).await;

        let path = location::sst_file_path(SST_DIR, sst_file_id, layer.path_type());
        object_store.write(&path, vec![0; 4096]).await.unwrap();

        let file_meta = FileMeta {
            region_id: sst_file_id.region_id(),
            file_id: sst_file_id.file_id(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            available_indexes: Default::default(),
            index_file_size: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
        };
        purge_and_wait(layer, file_meta).await;

        assert!(!object_store.exists(&path).await.unwrap());
    }

    #[tokio::test]
    async fn test_file_purge_with_index() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("file-purge");
        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
        let (object_store, layer) = new_access_layer(dir.path()).await;

        let path = location::sst_file_path(SST_DIR, sst_file_id, layer.path_type());
        object_store.write(&path, vec![0; 4096]).await.unwrap();

        let index_path = location::index_file_path(SST_DIR, sst_file_id, layer.path_type());
        object_store
            .write(&index_path, vec![0; 4096])
            .await
            .unwrap();

        let file_meta = FileMeta {
            region_id: sst_file_id.region_id(),
            file_id: sst_file_id.file_id(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            index_file_size: 4096,
            num_rows: 1024,
            num_row_groups: 1,
            sequence: NonZeroU64::new(4096),
        };
        purge_and_wait(layer, file_meta).await;

        assert!(!object_store.exists(&path).await.unwrap());
        assert!(!object_store.exists(&index_path).await.unwrap());
    }
}