1use std::fmt;
18use std::ops::Range;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21
22use bytes::Bytes;
23use common_base::readable_size::ReadableSize;
24use common_telemetry::{error, info, warn};
25use futures::{FutureExt, TryStreamExt};
26use moka::future::Cache;
27use moka::notification::RemovalCause;
28use moka::policy::EvictionPolicy;
29use object_store::util::join_path;
30use object_store::{ErrorKind, ObjectStore, Reader};
31use parquet::file::metadata::ParquetMetaData;
32use snafu::ResultExt;
33use store_api::storage::RegionId;
34
35use crate::cache::FILE_TYPE;
36use crate::error::{OpenDalSnafu, Result};
37use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
38use crate::sst::file::FileId;
39use crate::sst::parquet::helper::fetch_byte_ranges;
40use crate::sst::parquet::metadata::MetadataLoader;
41
/// Directory inside the local store under which all cached files are kept.
const FILE_DIR: &str = "cache/object/write/";
46
/// A cache of files on local disk, tracked by an in-memory, size-weighted index.
#[derive(Debug)]
pub(crate) struct FileCache {
    /// Local store that holds the cached files (under `FILE_DIR`).
    local_store: ObjectStore,
    /// Index of cached files, keyed by [`IndexKey`] and weighted by file size. The eviction
    /// listener installed in `FileCache::new` deletes a file when its entry leaves the index.
    memory_index: Cache<IndexKey, IndexValue>,
}
58
/// Shared, reference-counted handle to a [`FileCache`].
pub(crate) type FileCacheRef = Arc<FileCache>;
60
61impl FileCache {
62 pub(crate) fn new(
64 local_store: ObjectStore,
65 capacity: ReadableSize,
66 ttl: Option<Duration>,
67 ) -> FileCache {
68 let cache_store = local_store.clone();
69 let mut builder = Cache::builder()
70 .eviction_policy(EvictionPolicy::lru())
71 .weigher(|_key, value: &IndexValue| -> u32 {
72 value.file_size
74 })
75 .max_capacity(capacity.as_bytes())
76 .async_eviction_listener(move |key, value, cause| {
77 let store = cache_store.clone();
78 let file_path = cache_file_path(FILE_DIR, *key);
80 async move {
81 if let RemovalCause::Replaced = cause {
82 CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into());
85 warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.region_id);
86 return;
87 }
88
89 match store.delete(&file_path).await {
90 Ok(()) => {
91 CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into());
92 }
93 Err(e) => {
94 warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id);
95 }
96 }
97 }
98 .boxed()
99 });
100 if let Some(ttl) = ttl {
101 builder = builder.time_to_idle(ttl);
102 }
103 let memory_index = builder.build();
104 FileCache {
105 local_store,
106 memory_index,
107 }
108 }
109
110 pub(crate) async fn put(&self, key: IndexKey, value: IndexValue) {
114 CACHE_BYTES
115 .with_label_values(&[FILE_TYPE])
116 .add(value.file_size.into());
117 self.memory_index.insert(key, value).await;
118
119 self.memory_index.run_pending_tasks().await;
121 }
122
123 pub(crate) async fn get(&self, key: IndexKey) -> Option<IndexValue> {
124 self.memory_index.get(&key).await
125 }
126
127 #[allow(unused)]
129 pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
130 if self.memory_index.get(&key).await.is_none() {
133 CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
134 return None;
135 }
136
137 let file_path = self.cache_file_path(key);
138 match self.get_reader(&file_path).await {
139 Ok(Some(reader)) => {
140 CACHE_HIT.with_label_values(&[FILE_TYPE]).inc();
141 return Some(reader);
142 }
143 Err(e) => {
144 if e.kind() != ErrorKind::NotFound {
145 warn!(e; "Failed to get file for key {:?}", key);
146 }
147 }
148 Ok(None) => {}
149 }
150
151 self.memory_index.remove(&key).await;
153 CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
154 None
155 }
156
157 pub(crate) async fn read_ranges(
159 &self,
160 key: IndexKey,
161 ranges: &[Range<u64>],
162 ) -> Option<Vec<Bytes>> {
163 if self.memory_index.get(&key).await.is_none() {
164 CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
165 return None;
166 }
167
168 let file_path = self.cache_file_path(key);
169 let bytes_result = fetch_byte_ranges(&file_path, self.local_store.clone(), ranges).await;
172 match bytes_result {
173 Ok(bytes) => {
174 CACHE_HIT.with_label_values(&[FILE_TYPE]).inc();
175 Some(bytes)
176 }
177 Err(e) => {
178 if e.kind() != ErrorKind::NotFound {
179 warn!(e; "Failed to get file for key {:?}", key);
180 }
181
182 self.memory_index.remove(&key).await;
184 CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
185 None
186 }
187 }
188 }
189
190 pub(crate) async fn remove(&self, key: IndexKey) {
194 let file_path = self.cache_file_path(key);
195 self.memory_index.remove(&key).await;
196 if let Err(e) = self.local_store.delete(&file_path).await {
198 warn!(e; "Failed to delete a cached file {}", file_path);
199 }
200 }
201
202 async fn recover_inner(&self) -> Result<()> {
203 let now = Instant::now();
204 let mut lister = self
205 .local_store
206 .lister_with(FILE_DIR)
207 .await
208 .context(OpenDalSnafu)?;
209 let (mut total_size, mut total_keys) = (0i64, 0);
212 while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
213 let meta = entry.metadata();
214 if !meta.is_file() {
215 continue;
216 }
217 let Some(key) = parse_index_key(entry.name()) else {
218 continue;
219 };
220
221 let meta = self
222 .local_store
223 .stat(entry.path())
224 .await
225 .context(OpenDalSnafu)?;
226 let file_size = meta.content_length() as u32;
227 self.memory_index
228 .insert(key, IndexValue { file_size })
229 .await;
230 total_size += i64::from(file_size);
231 total_keys += 1;
232 }
233 CACHE_BYTES.with_label_values(&[FILE_TYPE]).add(total_size);
235
236 self.memory_index.run_pending_tasks().await;
239
240 info!(
241 "Recovered file cache, num_keys: {}, num_bytes: {}, total weight: {}, cost: {:?}",
242 total_keys,
243 total_size,
244 self.memory_index.weighted_size(),
245 now.elapsed()
246 );
247 Ok(())
248 }
249
250 pub(crate) async fn recover(self: &Arc<Self>, sync: bool) {
252 let moved_self = self.clone();
253 let handle = tokio::spawn(async move {
254 if let Err(err) = moved_self.recover_inner().await {
255 error!(err; "Failed to recover file cache.")
256 }
257 });
258
259 if sync {
260 let _ = handle.await;
261 }
262 }
263
264 pub(crate) fn cache_file_path(&self, key: IndexKey) -> String {
266 cache_file_path(FILE_DIR, key)
267 }
268
269 pub(crate) fn local_store(&self) -> ObjectStore {
271 self.local_store.clone()
272 }
273
274 pub(crate) async fn get_parquet_meta_data(&self, key: IndexKey) -> Option<ParquetMetaData> {
277 if let Some(index_value) = self.memory_index.get(&key).await {
279 let local_store = self.local_store();
281 let file_path = self.cache_file_path(key);
282 let file_size = index_value.file_size as u64;
283 let metadata_loader = MetadataLoader::new(local_store, &file_path, file_size);
284
285 match metadata_loader.load().await {
286 Ok(metadata) => {
287 CACHE_HIT.with_label_values(&[FILE_TYPE]).inc();
288 Some(metadata)
289 }
290 Err(e) => {
291 if !e.is_object_not_found() {
292 warn!(
293 e; "Failed to get parquet metadata for key {:?}",
294 key
295 );
296 }
297 self.memory_index.remove(&key).await;
299 CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
300 None
301 }
302 }
303 } else {
304 CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
305 None
306 }
307 }
308
309 async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
310 if self.local_store.exists(file_path).await? {
311 Ok(Some(self.local_store.reader(file_path).await?))
312 } else {
313 Ok(None)
314 }
315 }
316
317 #[cfg(test)]
319 pub(crate) fn contains_key(&self, key: &IndexKey) -> bool {
320 self.memory_index.contains_key(key)
321 }
322}
323
/// Key of the file cache index; identifies a single cached file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct IndexKey {
    /// Region the file belongs to.
    pub region_id: RegionId,
    /// Id of the file.
    pub file_id: FileId,
    /// Kind of the file; also used as the file name extension.
    pub file_type: FileType,
}
331
332impl IndexKey {
333 pub fn new(region_id: RegionId, file_id: FileId, file_type: FileType) -> IndexKey {
335 IndexKey {
336 region_id,
337 file_id,
338 file_type,
339 }
340 }
341}
342
343impl fmt::Display for IndexKey {
344 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
345 write!(
346 f,
347 "{}.{}.{}",
348 self.region_id.as_u64(),
349 self.file_id,
350 self.file_type.as_str()
351 )
352 }
353}
354
/// Kind of a cached file; its lowercase name doubles as the file name extension.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileType {
    /// Parquet file (`.parquet`).
    Parquet,
    /// Puffin file (`.puffin`).
    Puffin,
}

impl FileType {
    /// Maps an extension back to its file type; unknown (or differently cased) names yield `None`.
    fn parse(s: &str) -> Option<FileType> {
        // Look the name up via `as_str` so the two directions cannot drift apart.
        [FileType::Parquet, FileType::Puffin]
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == s)
    }

    /// Returns the extension used for this file type.
    fn as_str(&self) -> &'static str {
        match *self {
            FileType::Parquet => "parquet",
            FileType::Puffin => "puffin",
        }
    }
}
382
/// Value stored in the file cache index for each cached file.
#[derive(Debug, Clone)]
pub(crate) struct IndexValue {
    /// Size of the cached file in bytes; also the entry's weight in the index.
    pub(crate) file_size: u32,
}
391
392fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String {
396 join_path(cache_file_dir, &key.to_string())
397}
398
399fn parse_index_key(name: &str) -> Option<IndexKey> {
401 let mut split = name.splitn(3, '.');
402 let region_id = split.next().and_then(|s| {
403 let id = s.parse::<u64>().ok()?;
404 Some(RegionId::from_u64(id))
405 })?;
406 let file_id = split.next().and_then(|s| FileId::parse_str(s).ok())?;
407 let file_type = split.next().and_then(FileType::parse)?;
408
409 Some(IndexKey::new(region_id, file_id, file_type))
410}
411
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;

    use super::*;

    /// Creates a filesystem-backed object store rooted at `path`.
    fn new_fs_store(path: &str) -> ObjectStore {
        let builder = Fs::default().root(path);
        ObjectStore::new(builder).unwrap().finish()
    }

    #[tokio::test]
    async fn test_file_cache_ttl() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            Some(Duration::from_millis(10)),
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Nothing is cached yet.
        assert!(cache.reader(key).await.is_none());

        // Write the file, then register it in the cache.
        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        let exist = cache.reader(key).await;
        assert!(exist.is_some());
        // Stay idle past the TTL so the entry expires.
        tokio::time::sleep(Duration::from_millis(15)).await;
        cache.memory_index.run_pending_tasks().await;
        let non = cache.reader(key).await;
        assert!(non.is_none());
    }

    #[tokio::test]
    async fn test_file_cache_basic() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Nothing is cached yet.
        assert!(cache.reader(key).await.is_none());

        // Write the file, then register it in the cache.
        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        // Read the cached file back.
        let reader = cache.reader(key).await.unwrap();
        let buf = reader.read(..).await.unwrap().to_vec();
        assert_eq!("hello", String::from_utf8(buf).unwrap());

        // The entry weighs as much as its file size.
        cache.memory_index.run_pending_tasks().await;
        assert_eq!(5, cache.memory_index.weighted_size());

        // Remove the entry explicitly.
        cache.remove(key).await;
        assert!(cache.reader(key).await.is_none());

        // Let pending removal work (including the eviction listener) run.
        cache.memory_index.run_pending_tasks().await;

        // The file is gone and the index is empty.
        assert!(!local_store.exists(&file_path).await.unwrap());
        assert_eq!(0, cache.memory_index.weighted_size());
    }

    #[tokio::test]
    async fn test_file_cache_file_removed() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Write the file, then register it in the cache.
        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        // Delete the file behind the cache's back.
        local_store.delete(&file_path).await.unwrap();

        // The lookup misses and drops the stale entry.
        assert!(cache.reader(key).await.is_none());
        assert!(!cache.memory_index.contains_key(&key));
    }

    #[tokio::test]
    async fn test_file_cache_recover() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);

        let region_id = RegionId::new(2000, 0);
        let file_type = FileType::Parquet;
        // Write N files and register them in the first cache.
        let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect();
        let mut total_size = 0;
        for (i, file_id) in file_ids.iter().enumerate() {
            let key = IndexKey::new(region_id, *file_id, file_type);
            let file_path = cache.cache_file_path(key);
            let bytes = i.to_string().into_bytes();
            local_store.write(&file_path, bytes.clone()).await.unwrap();

            cache
                .put(
                    IndexKey::new(region_id, *file_id, file_type),
                    IndexValue {
                        file_size: bytes.len() as u32,
                    },
                )
                .await;
            total_size += bytes.len();
        }

        // A fresh cache over the same store starts with an empty index.
        let cache = Arc::new(FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
        ));
        assert!(cache
            .reader(IndexKey::new(region_id, file_ids[0], file_type))
            .await
            .is_none());
        cache.recover(true).await;

        // Recovery restores the total weight.
        cache.memory_index.run_pending_tasks().await;
        assert_eq!(total_size, cache.memory_index.weighted_size() as usize);

        // Every file is readable again.
        for (i, file_id) in file_ids.iter().enumerate() {
            let key = IndexKey::new(region_id, *file_id, file_type);
            let reader = cache.reader(key).await.unwrap();
            let buf = reader.read(..).await.unwrap().to_vec();
            assert_eq!(i.to_string(), String::from_utf8(buf).unwrap());
        }
    }

    #[tokio::test]
    async fn test_file_cache_read_ranges() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let file_cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = file_cache.cache_file_path(key);
        // Write a file.
        let data = b"hello greptime database";
        local_store
            .write(&file_path, data.as_slice())
            .await
            .unwrap();
        // Register it with its real size so the index weight matches the file.
        file_cache
            .put(
                key,
                IndexValue {
                    file_size: data.len() as u32,
                },
            )
            .await;
        // Read several ranges, including the whole file.
        let ranges = vec![0..5, 6..10, 15..19, 0..data.len() as u64];
        let bytes = file_cache.read_ranges(key, &ranges).await.unwrap();

        assert_eq!(4, bytes.len());
        assert_eq!(b"hello", bytes[0].as_ref());
        assert_eq!(b"grep", bytes[1].as_ref());
        assert_eq!(b"data", bytes[2].as_ref());
        assert_eq!(data, bytes[3].as_ref());
    }

    #[test]
    fn test_cache_file_path() {
        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
        assert_eq!(
            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
            cache_file_path(
                "test_dir",
                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
            )
        );
        assert_eq!(
            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
            cache_file_path(
                "test_dir/",
                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
            )
        );
    }

    #[test]
    fn test_parse_file_name() {
        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
        let region_id = RegionId::new(1234, 5);
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Parquet),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap()
        );
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Puffin),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.puffin").unwrap()
        );
        assert!(parse_index_key("").is_none());
        assert!(parse_index_key(".").is_none());
        assert!(parse_index_key("5299989643269").is_none());
        assert!(parse_index_key("5299989643269.").is_none());
        assert!(parse_index_key(".5299989643269").is_none());
        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df").is_none());
        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").is_none());
        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.").is_none());
        assert!(parse_index_key("abc.3368731b-a556-42b8-a5df-9c31ce155095.parquet").is_none());
        assert!(
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parque").is_none()
        );
        assert!(parse_index_key(
            "5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet.puffin"
        )
        .is_none());
    }
}
670}