1use std::path::PathBuf;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use common_base::readable_size::ReadableSize;
22use common_telemetry::{error, info, warn};
23use futures::{FutureExt, TryStreamExt};
24use moka::future::Cache;
25use moka::notification::RemovalCause;
26use moka::policy::EvictionPolicy;
27use object_store::ObjectStore;
28use object_store::util::join_path;
29use snafu::ResultExt;
30
31use crate::error::{OpenDalSnafu, Result};
32use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
33
/// Directory, relative to the local store root, under which cached manifest
/// files are written. Cache keys are paths relative to this directory.
const MANIFEST_DIR: &str = "cache/object/manifest/";

/// Label value used for this cache in the `CACHE_BYTES`, `CACHE_HIT` and
/// `CACHE_MISS` metrics.
const MANIFEST_TYPE: &str = "manifest";
41
/// A size-bounded cache of manifest files kept in a local object store.
///
/// Cached files live under [`MANIFEST_DIR`] in `local_store`. `index` records
/// which keys are cached together with their sizes, and evicts entries (and
/// their files) once the configured capacity is exceeded.
#[derive(Debug, Clone)]
pub struct ManifestCache {
    /// Store holding the cached manifest files.
    local_store: ObjectStore,
    /// In-memory index from cache key (path relative to [`MANIFEST_DIR`]) to
    /// the cached entry's metadata.
    index: Cache<String, IndexValue>,
}
51
52impl ManifestCache {
53 pub async fn new(
55 local_store: ObjectStore,
56 capacity: ReadableSize,
57 ttl: Option<Duration>,
58 ) -> ManifestCache {
59 let total_capacity = capacity.as_bytes();
60
61 info!(
62 "Initializing manifest cache with capacity: {}",
63 ReadableSize(total_capacity)
64 );
65
66 let index = Self::build_cache(local_store.clone(), total_capacity, ttl);
67
68 let cache = ManifestCache { local_store, index };
69
70 cache.recover(false).await;
72
73 cache
74 }
75
76 fn build_cache(
78 local_store: ObjectStore,
79 capacity: u64,
80 ttl: Option<Duration>,
81 ) -> Cache<String, IndexValue> {
82 let cache_store = local_store;
83 let mut builder = Cache::builder()
84 .eviction_policy(EvictionPolicy::lru())
85 .weigher(|key: &String, value: &IndexValue| -> u32 {
86 key.len() as u32 + value.file_size
87 })
88 .max_capacity(capacity)
89 .async_eviction_listener(move |key: Arc<String>, value: IndexValue, cause| {
90 let store = cache_store.clone();
91 let file_path = join_path(MANIFEST_DIR, &key);
93 async move {
94 if let RemovalCause::Replaced = cause {
95 CACHE_BYTES
98 .with_label_values(&[MANIFEST_TYPE])
99 .sub(value.file_size.into());
100 return;
101 }
102
103 match store.delete(&file_path).await {
104 Ok(()) => {
105 CACHE_BYTES
106 .with_label_values(&[MANIFEST_TYPE])
107 .sub(value.file_size.into());
108 }
109 Err(e) => {
110 warn!(e; "Failed to delete cached manifest file {}", file_path);
111 }
112 }
113 }
114 .boxed()
115 });
116 if let Some(ttl) = ttl {
117 builder = builder.time_to_idle(ttl);
118 }
119 builder.build()
120 }
121
122 pub(crate) async fn put(&self, key: String, value: IndexValue) {
126 CACHE_BYTES
127 .with_label_values(&[MANIFEST_TYPE])
128 .add(value.file_size.into());
129 self.index.insert(key, value).await;
130
131 self.index.run_pending_tasks().await;
133 }
134
135 pub(crate) async fn get(&self, key: &str) -> Option<IndexValue> {
137 self.index.get(key).await
138 }
139
140 pub(crate) async fn remove(&self, key: &str) {
142 let file_path = self.cache_file_path(key);
143 self.index.remove(key).await;
144 if let Err(e) = self.local_store.delete(&file_path).await {
146 warn!(e; "Failed to delete a cached manifest file {}", file_path);
147 }
148 }
149
150 pub(crate) async fn remove_batch(&self, keys: &[String]) {
152 if keys.is_empty() {
153 return;
154 }
155
156 for key in keys {
157 self.index.remove(key).await;
158 }
159
160 let file_paths: Vec<String> = keys.iter().map(|key| self.cache_file_path(key)).collect();
161
162 if let Err(e) = self.local_store.delete_iter(file_paths).await {
163 warn!(e; "Failed to delete cached manifest files in batch");
164 }
165 }
166
167 async fn recover_inner(&self) -> Result<()> {
168 let now = Instant::now();
169 let mut lister = self
170 .local_store
171 .lister_with(MANIFEST_DIR)
172 .recursive(true)
173 .await
174 .context(OpenDalSnafu)?;
175 let (mut total_size, mut total_keys) = (0i64, 0);
176 while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
177 let meta = entry.metadata();
178 if !meta.is_file() {
179 continue;
180 }
181
182 let meta = self
183 .local_store
184 .stat(entry.path())
185 .await
186 .context(OpenDalSnafu)?;
187 let file_size = meta.content_length() as u32;
188 let key = entry.path().trim_start_matches(MANIFEST_DIR).to_string();
189 common_telemetry::info!("Manifest cache recover {}, size: {}", key, file_size);
190 self.index.insert(key, IndexValue { file_size }).await;
191 let size = i64::from(file_size);
192 total_size += size;
193 total_keys += 1;
194 }
195 CACHE_BYTES
196 .with_label_values(&[MANIFEST_TYPE])
197 .add(total_size);
198
199 self.index.run_pending_tasks().await;
202
203 let weight = self.index.weighted_size();
204 let count = self.index.entry_count();
205 info!(
206 "Recovered manifest cache, num_keys: {}, num_bytes: {}, count: {}, weight: {}, cost: {:?}",
207 total_keys,
208 total_size,
209 count,
210 weight,
211 now.elapsed()
212 );
213 Ok(())
214 }
215
216 pub(crate) async fn recover(&self, sync: bool) {
218 let moved_self = self.clone();
219 let handle = tokio::spawn(async move {
220 if let Err(err) = moved_self.recover_inner().await {
221 error!(err; "Failed to recover manifest cache.")
222 }
223
224 moved_self.clean_empty_dirs(true).await;
225 });
226
227 if sync {
228 let _ = handle.await;
229 }
230 }
231
232 pub(crate) fn cache_file_path(&self, key: &str) -> String {
234 join_path(MANIFEST_DIR, key)
235 }
236
237 pub(crate) async fn get_file(&self, key: &str) -> Option<Vec<u8>> {
240 if self.get(key).await.is_none() {
241 CACHE_MISS.with_label_values(&[MANIFEST_TYPE]).inc();
242 return None;
243 }
244
245 let cache_file_path = self.cache_file_path(key);
246 match self.local_store.read(&cache_file_path).await {
247 Ok(data) => {
248 CACHE_HIT.with_label_values(&[MANIFEST_TYPE]).inc();
249 Some(data.to_vec())
250 }
251 Err(e) => {
252 warn!(e; "Failed to read cached manifest file {}", cache_file_path);
253 CACHE_MISS.with_label_values(&[MANIFEST_TYPE]).inc();
254 None
255 }
256 }
257 }
258
259 pub(crate) async fn put_file(&self, key: String, data: Vec<u8>) {
261 let cache_file_path = self.cache_file_path(&key);
262
263 if let Err(e) = self.local_store.write(&cache_file_path, data.clone()).await {
264 warn!(e; "Failed to write manifest to cache {}", cache_file_path);
265 return;
266 }
267
268 let file_size = data.len() as u32;
269 self.put(key, IndexValue { file_size }).await;
270 }
271
272 pub(crate) async fn clean_empty_dirs(&self, check_mtime: bool) {
277 info!("Clean empty dirs start");
278
279 let root = self.local_store.info().root();
280 let manifest_dir = PathBuf::from(root).join(MANIFEST_DIR);
281 let manifest_dir_clone = manifest_dir.clone();
282
283 let result = tokio::task::spawn_blocking(move || {
284 Self::clean_empty_dirs_sync(&manifest_dir_clone, check_mtime)
285 })
286 .await;
287
288 match result {
289 Ok(Ok(())) => {
290 info!("Clean empty dirs end");
291 }
292 Ok(Err(e)) => {
293 warn!(e; "Failed to clean empty directories under {}", manifest_dir.display());
294 }
295 Err(e) => {
296 warn!(e; "Failed to spawn blocking task for cleaning empty directories");
297 }
298 }
299 }
300
301 pub(crate) async fn clean_manifests(&self, dir: &str) {
303 info!("Clean manifest cache for directory: {}", dir);
304
305 let cache_dir = join_path(MANIFEST_DIR, dir);
306 let mut lister = match self
307 .local_store
308 .lister_with(&cache_dir)
309 .recursive(true)
310 .await
311 {
312 Ok(lister) => lister,
313 Err(e) => {
314 warn!(e; "Failed to list manifest files under {}", cache_dir);
315 return;
316 }
317 };
318
319 let mut keys_to_remove = Vec::new();
320 loop {
321 match lister.try_next().await {
322 Ok(Some(entry)) => {
323 let meta = entry.metadata();
324 if meta.is_file() {
325 keys_to_remove
326 .push(entry.path().trim_start_matches(MANIFEST_DIR).to_string());
327 }
328 }
329 Ok(None) => break,
330 Err(e) => {
331 warn!(e; "Failed to read entry while listing {}", cache_dir);
332 break;
333 }
334 }
335 }
336
337 info!(
338 "Going to remove files from manifest cache, files: {:?}",
339 keys_to_remove
340 );
341
342 self.remove_batch(&keys_to_remove).await;
344
345 let root = self.local_store.info().root();
347 let dir_path = PathBuf::from(root).join(&cache_dir);
348 let dir_path_clone = dir_path.clone();
349
350 let result = tokio::task::spawn_blocking(move || {
351 Self::clean_empty_dirs_sync(&dir_path_clone, false)
352 })
353 .await;
354
355 match result {
356 Ok(Ok(())) => {
357 info!("Cleaned manifest cache for directory: {}", dir);
358 }
359 Ok(Err(e)) => {
360 warn!(e; "Failed to clean empty directories under {}", dir_path.display());
361 }
362 Err(e) => {
363 warn!(e; "Failed to spawn blocking task for cleaning empty directories");
364 }
365 }
366 }
367
368 fn clean_empty_dirs_sync(dir: &PathBuf, check_mtime: bool) -> std::io::Result<()> {
373 Self::remove_empty_dirs_recursive_sync(dir, check_mtime)?;
374 Ok(())
375 }
376
377 fn remove_empty_dirs_recursive_sync(dir: &PathBuf, check_mtime: bool) -> std::io::Result<bool> {
378 common_telemetry::debug!(
379 "Maybe remove empty dir: {:?}, check_mtime: {}",
380 dir,
381 check_mtime
382 );
383 let entries = match std::fs::read_dir(dir) {
384 Ok(entries) => entries,
385 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
386 return Ok(true);
388 }
389 Err(e) => return Err(e),
390 };
391
392 let mut is_empty = true;
393 for entry in entries {
396 let entry = entry?;
397 let path = entry.path();
398 let metadata = std::fs::metadata(&path)?;
399
400 if metadata.is_dir() {
401 if check_mtime
403 && let Ok(modified) = metadata.modified()
404 && let Ok(elapsed) = modified.elapsed()
405 && elapsed < Duration::from_secs(3600)
406 {
407 common_telemetry::debug!("Skip directory by mtime, elapsed: {:?}", elapsed);
408 is_empty = false;
410 continue;
411 }
412
413 let subdir_empty = Self::remove_empty_dirs_recursive_sync(&path, check_mtime)?;
414 if subdir_empty {
415 if let Err(e) = std::fs::remove_dir(&path)
416 && e.kind() != std::io::ErrorKind::NotFound
417 {
418 warn!(e; "Failed to remove empty directory {}", path.display());
419 is_empty = false;
420 } else {
421 info!(
422 "Removed empty directory {} from manifest cache",
423 path.display()
424 );
425 }
426 } else {
427 is_empty = false;
428 }
429 } else {
430 is_empty = false;
431 }
432 }
433
434 Ok(is_empty)
435 }
436}
437
/// Metadata stored in the index for each cached manifest file.
#[derive(Debug, Clone)]
pub(crate) struct IndexValue {
    /// Size of the cached file in bytes. It counts toward the entry's weight
    /// and the `CACHE_BYTES` gauge.
    pub(crate) file_size: u32,
}
446
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;

    use super::*;

    /// Builds a filesystem-backed object store rooted at `path`.
    fn new_fs_store(path: &str) -> ObjectStore {
        let builder = Fs::default().root(path);
        ObjectStore::new(builder).unwrap().finish()
    }

    /// Exercises put/get/remove. Checks that the index weight and the cached
    /// file are both released on removal.
    #[tokio::test]
    async fn test_manifest_cache_basic() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = ManifestCache::new(local_store.clone(), ReadableSize::mb(10), None).await;
        let key = "region_1/manifest/00000000000000000007.json";
        let file_path = cache.cache_file_path(key);

        // Nothing has been cached yet.
        assert!(cache.get(key).await.is_none());

        // Write the file first, then register it in the index.
        local_store
            .write(&file_path, b"manifest content".as_slice())
            .await
            .unwrap();
        cache
            .put(key.to_string(), IndexValue { file_size: 16 })
            .await;

        let value = cache.get(key).await.unwrap();
        assert_eq!(16, value.file_size);

        // Weight = key length (43) + file size (16).
        cache.index.run_pending_tasks().await;
        assert_eq!(59, cache.index.weighted_size());

        cache.remove(key).await;
        cache.index.run_pending_tasks().await;
        assert!(cache.get(key).await.is_none());

        cache.index.run_pending_tasks().await;

        // Removal deletes the cached file and frees its weight.
        assert!(!local_store.exists(&file_path).await.unwrap());
        assert_eq!(0, cache.index.weighted_size());
    }

    /// Populates one cache, then builds a fresh cache over the same store. Checks
    /// that recovery rebuilds the index (keys, sizes and total weight) from the
    /// files on disk.
    #[tokio::test]
    async fn test_manifest_cache_recover() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let cache = ManifestCache::new(local_store.clone(), ReadableSize::mb(10), None).await;

        // Keys spanning two regions and both `.json` and `.checkpoint` files.
        let keys = [
            "region_1/manifest/00000000000000000001.json",
            "region_1/manifest/00000000000000000002.json",
            "region_1/manifest/00000000000000000001.checkpoint",
            "region_2/manifest/00000000000000000001.json",
        ];

        // Expected index weight: sum of key lengths plus file sizes.
        let mut total_size = 0;
        for (i, key) in keys.iter().enumerate() {
            let file_path = cache.cache_file_path(key);
            let content = format!("manifest-{}", i).into_bytes();
            local_store
                .write(&file_path, content.clone())
                .await
                .unwrap();

            cache
                .put(
                    key.to_string(),
                    IndexValue {
                        file_size: content.len() as u32,
                    },
                )
                .await;
            total_size += content.len() + key.len();
        }

        // A new cache starts with an empty index over the same files.
        let cache = ManifestCache::new(local_store.clone(), ReadableSize::mb(10), None).await;

        // Run recovery synchronously so the index is populated before asserting.
        cache.recover(true).await;

        cache.index.run_pending_tasks().await;
        let total_cached = cache.index.weighted_size() as usize;
        assert_eq!(total_size, total_cached);

        for (i, key) in keys.iter().enumerate() {
            let value = cache.get(key).await.unwrap();
            assert_eq!(format!("manifest-{}", i).len() as u32, value.file_size);
        }
    }

    /// Cache file paths are the key joined under `MANIFEST_DIR`.
    #[tokio::test]
    async fn test_cache_file_path() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let cache = ManifestCache::new(local_store, ReadableSize::mb(10), None).await;

        assert_eq!(
            "cache/object/manifest/region_1/manifest/00000000000000000007.json",
            cache.cache_file_path("region_1/manifest/00000000000000000007.json")
        );
        assert_eq!(
            "cache/object/manifest/region_1/manifest/00000000000000000007.checkpoint",
            cache.cache_file_path("region_1/manifest/00000000000000000007.checkpoint")
        );
    }
}