1use std::fmt;
18use std::ops::Range;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21
22use bytes::Bytes;
23use common_base::readable_size::ReadableSize;
24use common_telemetry::{error, info, warn};
25use futures::{FutureExt, TryStreamExt};
26use moka::future::Cache;
27use moka::notification::RemovalCause;
28use moka::policy::EvictionPolicy;
29use object_store::util::join_path;
30use object_store::{ErrorKind, ObjectStore, Reader};
31use parquet::file::metadata::ParquetMetaData;
32use snafu::ResultExt;
33use store_api::storage::{FileId, RegionId};
34
35use crate::cache::FILE_TYPE;
36use crate::error::{OpenDalSnafu, Result};
37use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
38use crate::sst::parquet::helper::fetch_byte_ranges;
39use crate::sst::parquet::metadata::MetadataLoader;
40
/// Directory, relative to the root of the local store, under which cached
/// files are written and from which they are listed during recovery.
const FILE_DIR: &str = "cache/object/write/";
45
/// A cache of files kept in a local object store and tracked by an in-memory
/// index.
///
/// The files live in `local_store` under [`FILE_DIR`]; `memory_index` records
/// which keys are cached and the size of each file.
#[derive(Debug)]
pub(crate) struct FileCache {
    /// Local store that holds the cached files.
    local_store: ObjectStore,
    /// Index from [`IndexKey`] to [`IndexValue`]. The eviction listener
    /// installed by [`FileCache::new`] deletes the backing file when an entry
    /// leaves the index for any reason other than being replaced.
    memory_index: Cache<IndexKey, IndexValue>,
}
57
/// Shared, reference-counted handle to a [`FileCache`].
pub(crate) type FileCacheRef = Arc<FileCache>;
59
60impl FileCache {
61 pub(crate) fn new(
63 local_store: ObjectStore,
64 capacity: ReadableSize,
65 ttl: Option<Duration>,
66 ) -> FileCache {
67 let cache_store = local_store.clone();
68 let mut builder = Cache::builder()
69 .eviction_policy(EvictionPolicy::lru())
70 .weigher(|_key, value: &IndexValue| -> u32 {
71 value.file_size
73 })
74 .max_capacity(capacity.as_bytes())
75 .async_eviction_listener(move |key, value, cause| {
76 let store = cache_store.clone();
77 let file_path = cache_file_path(FILE_DIR, *key);
79 async move {
80 if let RemovalCause::Replaced = cause {
81 CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into());
84 warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.region_id);
85 return;
86 }
87
88 match store.delete(&file_path).await {
89 Ok(()) => {
90 CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into());
91 }
92 Err(e) => {
93 warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id);
94 }
95 }
96 }
97 .boxed()
98 });
99 if let Some(ttl) = ttl {
100 builder = builder.time_to_idle(ttl);
101 }
102 let memory_index = builder.build();
103 FileCache {
104 local_store,
105 memory_index,
106 }
107 }
108
109 pub(crate) async fn put(&self, key: IndexKey, value: IndexValue) {
113 CACHE_BYTES
114 .with_label_values(&[FILE_TYPE])
115 .add(value.file_size.into());
116 self.memory_index.insert(key, value).await;
117
118 self.memory_index.run_pending_tasks().await;
120 }
121
/// Looks up `key` in the in-memory index and returns its [`IndexValue`] if
/// present.
///
/// Only the index is consulted; the cached file itself is not accessed.
pub(crate) async fn get(&self, key: IndexKey) -> Option<IndexValue> {
    self.memory_index.get(&key).await
}
125
126 #[allow(unused)]
128 pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
129 if self.memory_index.get(&key).await.is_none() {
132 CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
133 return None;
134 }
135
136 let file_path = self.cache_file_path(key);
137 match self.get_reader(&file_path).await {
138 Ok(Some(reader)) => {
139 CACHE_HIT.with_label_values(&[FILE_TYPE]).inc();
140 return Some(reader);
141 }
142 Err(e) => {
143 if e.kind() != ErrorKind::NotFound {
144 warn!(e; "Failed to get file for key {:?}", key);
145 }
146 }
147 Ok(None) => {}
148 }
149
150 self.memory_index.remove(&key).await;
152 CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
153 None
154 }
155
156 pub(crate) async fn read_ranges(
158 &self,
159 key: IndexKey,
160 ranges: &[Range<u64>],
161 ) -> Option<Vec<Bytes>> {
162 if self.memory_index.get(&key).await.is_none() {
163 CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
164 return None;
165 }
166
167 let file_path = self.cache_file_path(key);
168 let bytes_result = fetch_byte_ranges(&file_path, self.local_store.clone(), ranges).await;
171 match bytes_result {
172 Ok(bytes) => {
173 CACHE_HIT.with_label_values(&[FILE_TYPE]).inc();
174 Some(bytes)
175 }
176 Err(e) => {
177 if e.kind() != ErrorKind::NotFound {
178 warn!(e; "Failed to get file for key {:?}", key);
179 }
180
181 self.memory_index.remove(&key).await;
183 CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
184 None
185 }
186 }
187 }
188
189 pub(crate) async fn remove(&self, key: IndexKey) {
193 let file_path = self.cache_file_path(key);
194 self.memory_index.remove(&key).await;
195 if let Err(e) = self.local_store.delete(&file_path).await {
197 warn!(e; "Failed to delete a cached file {}", file_path);
198 }
199 }
200
201 async fn recover_inner(&self) -> Result<()> {
202 let now = Instant::now();
203 let mut lister = self
204 .local_store
205 .lister_with(FILE_DIR)
206 .await
207 .context(OpenDalSnafu)?;
208 let (mut total_size, mut total_keys) = (0i64, 0);
211 while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
212 let meta = entry.metadata();
213 if !meta.is_file() {
214 continue;
215 }
216 let Some(key) = parse_index_key(entry.name()) else {
217 continue;
218 };
219
220 let meta = self
221 .local_store
222 .stat(entry.path())
223 .await
224 .context(OpenDalSnafu)?;
225 let file_size = meta.content_length() as u32;
226 self.memory_index
227 .insert(key, IndexValue { file_size })
228 .await;
229 total_size += i64::from(file_size);
230 total_keys += 1;
231 }
232 CACHE_BYTES.with_label_values(&[FILE_TYPE]).add(total_size);
234
235 self.memory_index.run_pending_tasks().await;
238
239 info!(
240 "Recovered file cache, num_keys: {}, num_bytes: {}, total weight: {}, cost: {:?}",
241 total_keys,
242 total_size,
243 self.memory_index.weighted_size(),
244 now.elapsed()
245 );
246 Ok(())
247 }
248
249 pub(crate) async fn recover(self: &Arc<Self>, sync: bool) {
251 let moved_self = self.clone();
252 let handle = tokio::spawn(async move {
253 if let Err(err) = moved_self.recover_inner().await {
254 error!(err; "Failed to recover file cache.")
255 }
256 });
257
258 if sync {
259 let _ = handle.await;
260 }
261 }
262
/// Returns the path of the cached file for `key` under [`FILE_DIR`].
pub(crate) fn cache_file_path(&self, key: IndexKey) -> String {
    cache_file_path(FILE_DIR, key)
}
267
/// Returns a clone of the local store that holds the cached files.
pub(crate) fn local_store(&self) -> ObjectStore {
    self.local_store.clone()
}
272
273 pub(crate) async fn get_parquet_meta_data(&self, key: IndexKey) -> Option<ParquetMetaData> {
276 if let Some(index_value) = self.memory_index.get(&key).await {
278 let local_store = self.local_store();
280 let file_path = self.cache_file_path(key);
281 let file_size = index_value.file_size as u64;
282 let metadata_loader = MetadataLoader::new(local_store, &file_path, file_size);
283
284 match metadata_loader.load().await {
285 Ok(metadata) => {
286 CACHE_HIT.with_label_values(&[FILE_TYPE]).inc();
287 Some(metadata)
288 }
289 Err(e) => {
290 if !e.is_object_not_found() {
291 warn!(
292 e; "Failed to get parquet metadata for key {:?}",
293 key
294 );
295 }
296 self.memory_index.remove(&key).await;
298 CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
299 None
300 }
301 }
302 } else {
303 CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
304 None
305 }
306 }
307
308 async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
309 if self.local_store.exists(file_path).await? {
310 Ok(Some(self.local_store.reader(file_path).await?))
311 } else {
312 Ok(None)
313 }
314 }
315
/// Returns whether `key` is present in the in-memory index (test-only helper).
#[cfg(test)]
pub(crate) fn contains_key(&self, key: &IndexKey) -> bool {
    self.memory_index.contains_key(key)
}
321}
322
/// Key of a cached file: the region it belongs to, its file id and its type.
///
/// The `Display` form of a key is used as the file name inside the cache
/// directory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct IndexKey {
    /// Region the file belongs to.
    pub region_id: RegionId,
    /// Id of the file.
    pub file_id: FileId,
    /// Type of the file; determines the file name extension.
    pub file_type: FileType,
}
330
331impl IndexKey {
332 pub fn new(region_id: RegionId, file_id: FileId, file_type: FileType) -> IndexKey {
334 IndexKey {
335 region_id,
336 file_id,
337 file_type,
338 }
339 }
340}
341
342impl fmt::Display for IndexKey {
343 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
344 write!(
345 f,
346 "{}.{}.{}",
347 self.region_id.as_u64(),
348 self.file_id,
349 self.file_type.as_str()
350 )
351 }
352}
353
/// Type of a cached file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileType {
    /// A file named with the `parquet` extension.
    Parquet,
    /// A file named with the `puffin` extension.
    Puffin,
}

impl FileType {
    /// Parses the textual form produced by [`FileType::as_str`].
    ///
    /// Returns `None` for anything that is not exactly one of the known names.
    fn parse(s: &str) -> Option<FileType> {
        [FileType::Parquet, FileType::Puffin]
            .iter()
            .copied()
            .find(|file_type| file_type.as_str() == s)
    }

    /// Returns the name used for this type in cache file names.
    fn as_str(&self) -> &'static str {
        match *self {
            FileType::Parquet => "parquet",
            FileType::Puffin => "puffin",
        }
    }
}
381
/// Value stored in the in-memory index for a cached file.
#[derive(Debug, Clone)]
pub(crate) struct IndexValue {
    /// Size of the cached file in bytes; also used as the entry's weight in
    /// the index.
    pub(crate) file_size: u32,
}
390
391fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String {
395 join_path(cache_file_dir, &key.to_string())
396}
397
398fn parse_index_key(name: &str) -> Option<IndexKey> {
400 let mut split = name.splitn(3, '.');
401 let region_id = split.next().and_then(|s| {
402 let id = s.parse::<u64>().ok()?;
403 Some(RegionId::from_u64(id))
404 })?;
405 let file_id = split.next().and_then(|s| FileId::parse_str(s).ok())?;
406 let file_type = split.next().and_then(FileType::parse)?;
407
408 Some(IndexKey::new(region_id, file_id, file_type))
409}
410
411#[cfg(test)]
412mod tests {
413 use common_test_util::temp_dir::create_temp_dir;
414 use object_store::services::Fs;
415
416 use super::*;
417
418 fn new_fs_store(path: &str) -> ObjectStore {
419 let builder = Fs::default().root(path);
420 ObjectStore::new(builder).unwrap().finish()
421 }
422
423 #[tokio::test]
424 async fn test_file_cache_ttl() {
425 let dir = create_temp_dir("");
426 let local_store = new_fs_store(dir.path().to_str().unwrap());
427
428 let cache = FileCache::new(
429 local_store.clone(),
430 ReadableSize::mb(10),
431 Some(Duration::from_millis(10)),
432 );
433 let region_id = RegionId::new(2000, 0);
434 let file_id = FileId::random();
435 let key = IndexKey::new(region_id, file_id, FileType::Parquet);
436 let file_path = cache.cache_file_path(key);
437
438 assert!(cache.reader(key).await.is_none());
440
441 local_store
443 .write(&file_path, b"hello".as_slice())
444 .await
445 .unwrap();
446
447 cache
449 .put(
450 IndexKey::new(region_id, file_id, FileType::Parquet),
451 IndexValue { file_size: 5 },
452 )
453 .await;
454
455 let exist = cache.reader(key).await;
456 assert!(exist.is_some());
457 tokio::time::sleep(Duration::from_millis(15)).await;
458 cache.memory_index.run_pending_tasks().await;
459 let non = cache.reader(key).await;
460 assert!(non.is_none());
461 }
462
463 #[tokio::test]
464 async fn test_file_cache_basic() {
465 let dir = create_temp_dir("");
466 let local_store = new_fs_store(dir.path().to_str().unwrap());
467
468 let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
469 let region_id = RegionId::new(2000, 0);
470 let file_id = FileId::random();
471 let key = IndexKey::new(region_id, file_id, FileType::Parquet);
472 let file_path = cache.cache_file_path(key);
473
474 assert!(cache.reader(key).await.is_none());
476
477 local_store
479 .write(&file_path, b"hello".as_slice())
480 .await
481 .unwrap();
482 cache
484 .put(
485 IndexKey::new(region_id, file_id, FileType::Parquet),
486 IndexValue { file_size: 5 },
487 )
488 .await;
489
490 let reader = cache.reader(key).await.unwrap();
492 let buf = reader.read(..).await.unwrap().to_vec();
493 assert_eq!("hello", String::from_utf8(buf).unwrap());
494
495 cache.memory_index.run_pending_tasks().await;
497 assert_eq!(5, cache.memory_index.weighted_size());
498
499 cache.remove(key).await;
501 assert!(cache.reader(key).await.is_none());
502
503 cache.memory_index.run_pending_tasks().await;
505
506 assert!(!local_store.exists(&file_path).await.unwrap());
508 assert_eq!(0, cache.memory_index.weighted_size());
509 }
510
511 #[tokio::test]
512 async fn test_file_cache_file_removed() {
513 let dir = create_temp_dir("");
514 let local_store = new_fs_store(dir.path().to_str().unwrap());
515
516 let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
517 let region_id = RegionId::new(2000, 0);
518 let file_id = FileId::random();
519 let key = IndexKey::new(region_id, file_id, FileType::Parquet);
520 let file_path = cache.cache_file_path(key);
521
522 local_store
524 .write(&file_path, b"hello".as_slice())
525 .await
526 .unwrap();
527 cache
529 .put(
530 IndexKey::new(region_id, file_id, FileType::Parquet),
531 IndexValue { file_size: 5 },
532 )
533 .await;
534
535 local_store.delete(&file_path).await.unwrap();
537
538 assert!(cache.reader(key).await.is_none());
540 assert!(!cache.memory_index.contains_key(&key));
542 }
543
544 #[tokio::test]
545 async fn test_file_cache_recover() {
546 let dir = create_temp_dir("");
547 let local_store = new_fs_store(dir.path().to_str().unwrap());
548 let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
549
550 let region_id = RegionId::new(2000, 0);
551 let file_type = FileType::Parquet;
552 let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect();
554 let mut total_size = 0;
555 for (i, file_id) in file_ids.iter().enumerate() {
556 let key = IndexKey::new(region_id, *file_id, file_type);
557 let file_path = cache.cache_file_path(key);
558 let bytes = i.to_string().into_bytes();
559 local_store.write(&file_path, bytes.clone()).await.unwrap();
560
561 cache
563 .put(
564 IndexKey::new(region_id, *file_id, file_type),
565 IndexValue {
566 file_size: bytes.len() as u32,
567 },
568 )
569 .await;
570 total_size += bytes.len();
571 }
572
573 let cache = Arc::new(FileCache::new(
575 local_store.clone(),
576 ReadableSize::mb(10),
577 None,
578 ));
579 assert!(
581 cache
582 .reader(IndexKey::new(region_id, file_ids[0], file_type))
583 .await
584 .is_none()
585 );
586 cache.recover(true).await;
587
588 cache.memory_index.run_pending_tasks().await;
590 assert_eq!(total_size, cache.memory_index.weighted_size() as usize);
591
592 for (i, file_id) in file_ids.iter().enumerate() {
593 let key = IndexKey::new(region_id, *file_id, file_type);
594 let reader = cache.reader(key).await.unwrap();
595 let buf = reader.read(..).await.unwrap().to_vec();
596 assert_eq!(i.to_string(), String::from_utf8(buf).unwrap());
597 }
598 }
599
600 #[tokio::test]
601 async fn test_file_cache_read_ranges() {
602 let dir = create_temp_dir("");
603 let local_store = new_fs_store(dir.path().to_str().unwrap());
604 let file_cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
605 let region_id = RegionId::new(2000, 0);
606 let file_id = FileId::random();
607 let key = IndexKey::new(region_id, file_id, FileType::Parquet);
608 let file_path = file_cache.cache_file_path(key);
609 let data = b"hello greptime database";
611 local_store
612 .write(&file_path, data.as_slice())
613 .await
614 .unwrap();
615 file_cache.put(key, IndexValue { file_size: 5 }).await;
617 let ranges = vec![0..5, 6..10, 15..19, 0..data.len() as u64];
619 let bytes = file_cache.read_ranges(key, &ranges).await.unwrap();
620
621 assert_eq!(4, bytes.len());
622 assert_eq!(b"hello", bytes[0].as_ref());
623 assert_eq!(b"grep", bytes[1].as_ref());
624 assert_eq!(b"data", bytes[2].as_ref());
625 assert_eq!(data, bytes[3].as_ref());
626 }
627
/// The cache path is the same whether or not the directory ends with a slash.
#[test]
fn test_cache_file_path() {
    let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
    let key = IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet);
    let expected = "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet";

    for dir in ["test_dir", "test_dir/"] {
        assert_eq!(expected, cache_file_path(dir, key));
    }
}
646
/// A well-formed file name parses to its key; malformed names are rejected.
#[test]
fn test_parse_file_name() {
    let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
    let region_id = RegionId::new(1234, 5);
    assert_eq!(
        IndexKey::new(region_id, file_id, FileType::Parquet),
        parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap()
    );

    // Each name is listed once. The non-numeric region id case takes the
    // place of an assertion that used to be repeated verbatim.
    let invalid_names = [
        "",
        ".",
        "5299989643269",
        "5299989643269.",
        ".5299989643269",
        "not_a_number.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
        "5299989643269.3368731b-a556-42b8-a5df",
        "5299989643269.3368731b-a556-42b8-a5df-9c31ce155095",
        "5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parque",
        "5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet.puffin",
    ];
    for name in invalid_names {
        assert!(
            parse_index_key(name).is_none(),
            "expected {name:?} to be rejected"
        );
    }
}
671}