1use std::fmt;
18use std::ops::Range;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21
22use bytes::Bytes;
23use common_base::readable_size::ReadableSize;
24use common_telemetry::{debug, error, info, warn};
25use futures::{AsyncWriteExt, FutureExt, TryStreamExt};
26use moka::future::Cache;
27use moka::notification::RemovalCause;
28use moka::policy::EvictionPolicy;
29use object_store::util::join_path;
30use object_store::{ErrorKind, ObjectStore, Reader};
31use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
32use snafu::ResultExt;
33use store_api::storage::{FileId, RegionId};
34use tokio::sync::mpsc::{Sender, UnboundedReceiver};
35
36use crate::access_layer::TempFileCleaner;
37use crate::cache::{FILE_TYPE, INDEX_TYPE};
38use crate::error::{self, OpenDalSnafu, Result};
39use crate::metrics::{
40 CACHE_BYTES, CACHE_HIT, CACHE_MISS, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL,
41 WRITE_CACHE_DOWNLOAD_ELAPSED,
42};
43use crate::region::opener::RegionLoadCacheTask;
44use crate::sst::parquet::helper::fetch_byte_ranges;
45use crate::sst::parquet::metadata::MetadataLoader;
46use crate::sst::parquet::reader::MetadataCacheMetrics;
47
/// Subdirectory (relative to the local store root) that holds downloaded cache files.
const FILE_DIR: &str = "cache/object/write/";

/// Default percentage of the total capacity reserved for the puffin (index) cache.
pub(crate) const DEFAULT_INDEX_CACHE_PERCENT: u8 = 20;

/// Minimum capacity (512 MiB) enforced for each of the parquet and puffin caches.
const MIN_CACHE_CAPACITY: u64 = 512 * 1024 * 1024;

/// Capacity of the channel that queues background download tasks.
const DOWNLOAD_TASK_CHANNEL_SIZE: usize = 64;
/// A request to download one remote file into the local cache in the background.
struct DownloadTask {
    /// Cache key of the file to download.
    index_key: IndexKey,
    /// Path of the file in the remote store.
    remote_path: String,
    /// Store to read the remote file from.
    remote_store: ObjectStore,
    /// Expected size of the remote file in bytes.
    file_size: u64,
}
69
/// Shared state of the file cache: the local store plus the in-memory
/// indices that track which files are cached on disk.
#[derive(Debug)]
struct FileCacheInner {
    /// Local store that holds the cached files.
    local_store: ObjectStore,
    /// Index of cached parquet (SST) files.
    parquet_index: Cache<IndexKey, IndexValue>,
    /// Index of cached puffin (index) files.
    puffin_index: Cache<IndexKey, IndexValue>,
}
80
impl FileCacheInner {
    /// Returns the in-memory index for the given file type.
    fn memory_index(&self, file_type: FileType) -> &Cache<IndexKey, IndexValue> {
        match file_type {
            FileType::Parquet => &self.parquet_index,
            FileType::Puffin { .. } => &self.puffin_index,
        }
    }

    /// Returns the local path of the cached file for `key` under [FILE_DIR].
    fn cache_file_path(&self, key: IndexKey) -> String {
        cache_file_path(FILE_DIR, key)
    }

    /// Registers a key-value pair in the in-memory index and updates the
    /// byte metric.
    ///
    /// The caller must have already written the file to [Self::cache_file_path];
    /// this method only records the entry.
    async fn put(&self, key: IndexKey, value: IndexValue) {
        CACHE_BYTES
            .with_label_values(&[key.file_type.metric_label()])
            .add(value.file_size.into());
        let index = self.memory_index(key.file_type);
        index.insert(key, value).await;
        // Flush moka's pending tasks so evictions caused by this insert
        // (and their metric/file cleanup) are not deferred.
        index.run_pending_tasks().await;
    }

    /// Rebuilds the in-memory indices by scanning [FILE_DIR] in the local store.
    async fn recover(&self) -> Result<()> {
        let now = Instant::now();
        let mut lister = self
            .local_store
            .lister_with(FILE_DIR)
            .await
            .context(OpenDalSnafu)?;
        // Totals across both caches, plus per-type byte counters for metrics.
        let (mut total_size, mut total_keys) = (0i64, 0);
        let (mut parquet_size, mut puffin_size) = (0i64, 0i64);
        while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
            let meta = entry.metadata();
            if !meta.is_file() {
                continue;
            }
            // Skip files whose names don't parse as cache keys.
            let Some(key) = parse_index_key(entry.name()) else {
                continue;
            };

            // Stat the entry explicitly to obtain the file size; the lister's
            // metadata may not carry a content length — TODO confirm.
            let meta = self
                .local_store
                .stat(entry.path())
                .await
                .context(OpenDalSnafu)?;
            // NOTE(review): this cast truncates files larger than 4 GiB;
            // `IndexValue` stores sizes as u32.
            let file_size = meta.content_length() as u32;
            let index = self.memory_index(key.file_type);
            index.insert(key, IndexValue { file_size }).await;
            let size = i64::from(file_size);
            total_size += size;
            total_keys += 1;

            match key.file_type {
                FileType::Parquet => parquet_size += size,
                FileType::Puffin { .. } => puffin_size += size,
            }
        }
        // Update the byte metrics once with the per-type totals.
        CACHE_BYTES
            .with_label_values(&[FILE_TYPE])
            .add(parquet_size);
        CACHE_BYTES
            .with_label_values(&[INDEX_TYPE])
            .add(puffin_size);

        // Flush pending tasks so over-capacity entries are evicted now and
        // the weighted sizes reported below are accurate.
        self.parquet_index.run_pending_tasks().await;
        self.puffin_index.run_pending_tasks().await;

        let parquet_weight = self.parquet_index.weighted_size();
        let parquet_count = self.parquet_index.entry_count();
        let puffin_weight = self.puffin_index.weighted_size();
        let puffin_count = self.puffin_index.entry_count();
        info!(
            "Recovered file cache, num_keys: {}, num_bytes: {}, parquet(count: {}, weight: {}), puffin(count: {}, weight: {}), cost: {:?}",
            total_keys,
            total_size,
            parquet_count,
            parquet_weight,
            puffin_count,
            puffin_weight,
            now.elapsed()
        );
        Ok(())
    }

    /// Downloads `remote_path` from `remote_store` into the local cache and
    /// registers it in the index.
    ///
    /// On failure this may leave a partial file behind; callers should use
    /// [Self::download], which cleans up after errors.
    async fn download_without_cleaning(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
        concurrency: usize,
    ) -> Result<()> {
        // Chunk size used when streaming the remote file.
        const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);

        let file_type = index_key.file_type;
        let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
            .with_label_values(&[match file_type {
                FileType::Parquet => "download_parquet",
                FileType::Puffin { .. } => "download_puffin",
            }])
            .start_timer();

        // Streaming reader over the whole remote file.
        let reader = remote_store
            .reader_with(remote_path)
            .concurrent(concurrency)
            .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_read(0..file_size)
            .await
            .context(error::OpenDalSnafu)?;

        let cache_path = self.cache_file_path(index_key);
        let mut writer = self
            .local_store
            .writer(&cache_path)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_write();

        let region_id = index_key.region_id;
        let file_id = index_key.file_id;
        let bytes_written =
            futures::io::copy(reader, &mut writer)
                .await
                .context(error::DownloadSnafu {
                    region_id,
                    file_id,
                    file_type,
                })?;
        // Closing flushes the writer; a failure here still means the
        // download did not complete.
        writer.close().await.context(error::DownloadSnafu {
            region_id,
            file_id,
            file_type,
        })?;

        WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);

        let elapsed = timer.stop_and_record();
        debug!(
            "Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
            remote_path, cache_path, bytes_written, region_id, elapsed,
        );

        // Register the file using the number of bytes actually written.
        let index_value = IndexValue {
            file_size: bytes_written as _,
        };
        self.put(index_key, index_value).await;
        Ok(())
    }

    /// Downloads a file into the cache, cleaning up leftover temporary files
    /// if the download fails.
    async fn download(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
        concurrency: usize,
    ) -> Result<()> {
        if let Err(e) = self
            .download_without_cleaning(index_key, remote_path, remote_store, file_size, concurrency)
            .await
        {
            error!(e; "Failed to download file '{}' for region {}", remote_path, index_key.region_id);

            // Best-effort cleanup of the atomic-write temp files left for
            // this cache file name.
            let filename = index_key.to_string();
            TempFileCleaner::clean_atomic_dir_files(&self.local_store, &[&filename]).await;

            return Err(e);
        }

        Ok(())
    }

    /// Returns true if the in-memory index contains the key.
    fn contains_key(&self, key: &IndexKey) -> bool {
        self.memory_index(key.file_type).contains_key(key)
    }
}
275
/// A file cache that manages files on the local store and evicts them based
/// on total cached size, split between parquet and puffin caches.
#[derive(Debug, Clone)]
pub(crate) struct FileCache {
    /// Shared inner state (local store and in-memory indices).
    inner: Arc<FileCacheInner>,
    /// Capacity in bytes reserved for the puffin (index) cache.
    puffin_capacity: u64,
    /// Sender for queuing background download tasks; `None` when the
    /// background worker is disabled.
    download_task_tx: Option<Sender<DownloadTask>>,
}

/// Shared pointer to [FileCache].
pub(crate) type FileCacheRef = Arc<FileCache>;
289
impl FileCache {
    /// Creates a new file cache backed by `local_store`.
    ///
    /// `capacity` is split between the parquet and puffin caches according to
    /// `index_cache_percent` (defaults to [DEFAULT_INDEX_CACHE_PERCENT];
    /// values outside `1..=99` are ignored). `ttl`, when set, expires idle
    /// entries. When `enable_background_worker` is true, a worker task is
    /// spawned to serve queued background downloads.
    pub(crate) fn new(
        local_store: ObjectStore,
        capacity: ReadableSize,
        ttl: Option<Duration>,
        index_cache_percent: Option<u8>,
        enable_background_worker: bool,
    ) -> FileCache {
        // Fall back to the default when the percent is missing or out of range.
        let index_percent = index_cache_percent
            .filter(|&percent| percent > 0 && percent < 100)
            .unwrap_or(DEFAULT_INDEX_CACHE_PERCENT);
        let total_capacity = capacity.as_bytes();

        let index_ratio = index_percent as f64 / 100.0;
        let puffin_capacity = (total_capacity as f64 * index_ratio) as u64;
        let parquet_capacity = total_capacity - puffin_capacity;

        // Enforce a floor so tiny configured capacities still leave usable
        // caches. Note the effective total may then exceed `capacity`.
        let puffin_capacity = puffin_capacity.max(MIN_CACHE_CAPACITY);
        let parquet_capacity = parquet_capacity.max(MIN_CACHE_CAPACITY);

        info!(
            "Initializing file cache with index_percent: {}%, total_capacity: {}, parquet_capacity: {}, puffin_capacity: {}",
            index_percent,
            ReadableSize(total_capacity),
            ReadableSize(parquet_capacity),
            ReadableSize(puffin_capacity)
        );

        // NOTE(review): these metric labels should match FILE_TYPE / INDEX_TYPE
        // used for CACHE_BYTES accounting elsewhere — confirm.
        let parquet_index = Self::build_cache(local_store.clone(), parquet_capacity, ttl, "file");
        let puffin_index = Self::build_cache(local_store.clone(), puffin_capacity, ttl, "index");

        let inner = Arc::new(FileCacheInner {
            local_store,
            parquet_index,
            puffin_index,
        });

        // Spawn the background download worker if enabled.
        let download_task_tx = if enable_background_worker {
            let (tx, rx) = tokio::sync::mpsc::channel(DOWNLOAD_TASK_CHANNEL_SIZE);
            Self::spawn_download_worker(inner.clone(), rx);
            Some(tx)
        } else {
            None
        };

        FileCache {
            inner,
            puffin_capacity,
            download_task_tx,
        }
    }

    /// Spawns the worker loop that serves queued [DownloadTask]s sequentially.
    fn spawn_download_worker(
        inner: Arc<FileCacheInner>,
        mut download_task_rx: tokio::sync::mpsc::Receiver<DownloadTask>,
    ) {
        tokio::spawn(async move {
            info!("Background download worker started");
            while let Some(task) = download_task_rx.recv().await {
                // Another path may have cached the file while the task was queued.
                if inner.contains_key(&task.index_key) {
                    debug!(
                        "Skipping background download for region {}, file {} - already in cache",
                        task.index_key.region_id, task.index_key.file_id
                    );
                    continue;
                }

                // Errors are logged inside `download`; keep the worker alive.
                let _ = inner
                    .download(
                        task.index_key,
                        &task.remote_path,
                        &task.remote_store,
                        task.file_size,
                        // Single-stream read — presumably to keep background
                        // downloads low impact; TODO confirm.
                        1,
                    )
                    .await;
            }
            // The channel closed (all senders dropped); exit the worker.
            info!("Background download worker stopped");
        });
    }

    /// Builds a moka cache with LRU eviction that weighs entries by file size
    /// and deletes the underlying local file when an entry is evicted.
    fn build_cache(
        local_store: ObjectStore,
        capacity: u64,
        ttl: Option<Duration>,
        label: &'static str,
    ) -> Cache<IndexKey, IndexValue> {
        let cache_store = local_store;
        let mut builder = Cache::builder()
            .eviction_policy(EvictionPolicy::lru())
            // Weight entries by file size so `capacity` acts as a byte budget.
            .weigher(|_key, value: &IndexValue| -> u32 {
                value.file_size
            })
            .max_capacity(capacity)
            .async_eviction_listener(move |key, value, cause| {
                let store = cache_store.clone();
                let file_path = cache_file_path(FILE_DIR, *key);
                async move {
                    if let RemovalCause::Replaced = cause {
                        // The file on disk is still referenced by the new
                        // entry for the same key; only fix the byte metric.
                        CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
                        return;
                    }

                    match store.delete(&file_path).await {
                        Ok(()) => {
                            CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
                        }
                        Err(e) => {
                            // Deletion failed; leave the metric unchanged since
                            // the file may still exist on disk.
                            warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id);
                        }
                    }
                }
                .boxed()
            });
        // Use time-to-idle so only entries that stay unused expire.
        if let Some(ttl) = ttl {
            builder = builder.time_to_idle(ttl);
        }
        builder.build()
    }

    /// Puts a key-value pair into the cache index.
    ///
    /// The caller must ensure the file is already stored at
    /// [Self::cache_file_path] for the key.
    pub(crate) async fn put(&self, key: IndexKey, value: IndexValue) {
        self.inner.put(key, value).await
    }

    /// Returns the index value for `key`, updating its recency.
    pub(crate) async fn get(&self, key: IndexKey) -> Option<IndexValue> {
        self.inner.memory_index(key.file_type).get(&key).await
    }

    /// Returns a reader over the cached file, or `None` on a cache miss.
    ///
    /// If the index has the key but the file is missing from the local store,
    /// the stale index entry is removed and this counts as a miss.
    #[allow(unused)]
    pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
        // Use `get()` (not `contains_key`) so the key's recency is updated.
        let index = self.inner.memory_index(key.file_type);
        if index.get(&key).await.is_none() {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            return None;
        }

        let file_path = self.inner.cache_file_path(key);
        match self.get_reader(&file_path).await {
            Ok(Some(reader)) => {
                CACHE_HIT
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                return Some(reader);
            }
            Err(e) => {
                // NotFound is expected when the file was evicted underneath us.
                if e.kind() != ErrorKind::NotFound {
                    warn!(e; "Failed to get file for key {:?}", key);
                }
            }
            Ok(None) => {}
        }

        // The file is gone or unreadable; drop the stale index entry.
        index.remove(&key).await;
        CACHE_MISS
            .with_label_values(&[key.file_type.metric_label()])
            .inc();
        None
    }

    /// Reads the given byte ranges from the cached file, or `None` on a miss.
    pub(crate) async fn read_ranges(
        &self,
        key: IndexKey,
        ranges: &[Range<u64>],
    ) -> Option<Vec<Bytes>> {
        let index = self.inner.memory_index(key.file_type);
        if index.get(&key).await.is_none() {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            return None;
        }

        let file_path = self.inner.cache_file_path(key);
        // Ranges are read from the file directly; sizes in the index are not
        // consulted here.
        let bytes_result =
            fetch_byte_ranges(&file_path, self.inner.local_store.clone(), ranges).await;
        match bytes_result {
            Ok(bytes) => {
                CACHE_HIT
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                Some(bytes)
            }
            Err(e) => {
                if e.kind() != ErrorKind::NotFound {
                    warn!(e; "Failed to get file for key {:?}", key);
                }

                // Drop the stale index entry so future lookups miss quickly.
                index.remove(&key).await;
                CACHE_MISS
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                None
            }
        }
    }

    /// Removes a file from the cache explicitly: drops the index entry and
    /// deletes the local file.
    pub(crate) async fn remove(&self, key: IndexKey) {
        let file_path = self.inner.cache_file_path(key);
        self.inner.memory_index(key.file_type).remove(&key).await;
        if let Err(e) = self.inner.local_store.delete(&file_path).await {
            warn!(e; "Failed to delete a cached file {}", file_path);
        }
    }

    /// Recovers the index from the local store in a background task.
    ///
    /// When `sync` is true, waits for recovery to finish. If `task_receiver`
    /// is provided, a long-lived task is spawned after recovery to serve
    /// region load-cache requests.
    pub(crate) async fn recover(
        &self,
        sync: bool,
        task_receiver: Option<UnboundedReceiver<RegionLoadCacheTask>>,
    ) {
        let moved_self = self.clone();
        let handle = tokio::spawn(async move {
            if let Err(err) = moved_self.inner.recover().await {
                error!(err; "Failed to recover file cache.")
            }

            // Start serving load-cache tasks only after recovery completed.
            if let Some(mut receiver) = task_receiver {
                info!("Spawning background task for processing region load cache tasks");
                tokio::spawn(async move {
                    while let Some(task) = receiver.recv().await {
                        task.fill_cache(&moved_self).await;
                    }
                    info!("Background task for processing region load cache tasks stopped");
                });
            }
        });

        if sync {
            let _ = handle.await;
        }
    }

    /// Returns the local path of the cached file for `key`.
    pub(crate) fn cache_file_path(&self, key: IndexKey) -> String {
        self.inner.cache_file_path(key)
    }

    /// Returns the local store that holds the cached files.
    pub(crate) fn local_store(&self) -> ObjectStore {
        self.inner.local_store.clone()
    }

    /// Loads the parquet metadata of a cached SST file.
    ///
    /// Returns `None` (a miss) when the key is absent or the cached file can
    /// no longer be read; a stale index entry is dropped in the latter case.
    pub(crate) async fn get_parquet_meta_data(
        &self,
        key: IndexKey,
        cache_metrics: &mut MetadataCacheMetrics,
        page_index_policy: PageIndexPolicy,
    ) -> Option<ParquetMetaData> {
        // Query the index first to update recency and obtain the file size.
        if let Some(index_value) = self.inner.parquet_index.get(&key).await {
            let local_store = self.local_store();
            let file_path = self.inner.cache_file_path(key);
            let file_size = index_value.file_size as u64;
            let mut metadata_loader = MetadataLoader::new(local_store, &file_path, file_size);
            metadata_loader.with_page_index_policy(page_index_policy);

            match metadata_loader.load(cache_metrics).await {
                Ok(metadata) => {
                    CACHE_HIT
                        .with_label_values(&[key.file_type.metric_label()])
                        .inc();
                    Some(metadata)
                }
                Err(e) => {
                    if !e.is_object_not_found() {
                        warn!(
                            e; "Failed to get parquet metadata for key {:?}",
                            key
                        );
                    }
                    // The file is gone or corrupt; drop the stale entry.
                    self.inner.parquet_index.remove(&key).await;
                    CACHE_MISS
                        .with_label_values(&[key.file_type.metric_label()])
                        .inc();
                    None
                }
            }
        } else {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            None
        }
    }

    /// Opens a reader for `file_path` on the local store, or `Ok(None)` when
    /// the file does not exist.
    async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
        if self.inner.local_store.exists(file_path).await? {
            Ok(Some(self.inner.local_store.reader(file_path).await?))
        } else {
            Ok(None)
        }
    }

    /// Checks whether a key exists in the cache index.
    pub(crate) fn contains_key(&self, key: &IndexKey) -> bool {
        self.inner.contains_key(key)
    }

    /// Returns the configured capacity of the puffin cache in bytes.
    pub(crate) fn puffin_cache_capacity(&self) -> u64 {
        self.puffin_capacity
    }

    /// Returns the current weighted size of the puffin cache in bytes.
    pub(crate) fn puffin_cache_size(&self) -> u64 {
        self.inner.puffin_index.weighted_size()
    }

    /// Downloads a file in `remote_path` from `remote_store` into the cache,
    /// waiting for the download to finish.
    pub(crate) async fn download(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
    ) -> Result<()> {
        // Foreground downloads use higher read concurrency than the
        // background worker.
        self.inner
            .download(index_key, remote_path, remote_store, file_size, 8)
            .await
    }

    /// Queues a background download of the file if the worker is enabled.
    ///
    /// Best-effort: when the worker is disabled or its queue is full the
    /// request is dropped (logged at debug level only).
    pub(crate) fn maybe_download_background(
        &self,
        index_key: IndexKey,
        remote_path: String,
        remote_store: ObjectStore,
        file_size: u64,
    ) {
        // No background worker configured; nothing to do.
        let Some(tx) = &self.download_task_tx else {
            return;
        };

        let task = DownloadTask {
            index_key,
            remote_path,
            remote_store,
            file_size,
        };

        // `try_send` never blocks; a full queue simply drops the request.
        if let Err(e) = tx.try_send(task) {
            debug!(
                "Failed to queue background download task for region {}, file {}: {:?}",
                index_key.region_id, index_key.file_id, e
            );
        }
    }
}
685
/// Key of the file cache index; also determines the cached file's name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct IndexKey {
    /// Region that owns the file.
    pub region_id: RegionId,
    /// Id of the file.
    pub file_id: FileId,
    /// Type of the file (parquet SST or puffin index).
    pub file_type: FileType,
}
693
694impl IndexKey {
695 pub fn new(region_id: RegionId, file_id: FileId, file_type: FileType) -> IndexKey {
697 IndexKey {
698 region_id,
699 file_id,
700 file_type,
701 }
702 }
703}
704
705impl fmt::Display for IndexKey {
706 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
707 write!(
708 f,
709 "{}.{}.{}",
710 self.region_id.as_u64(),
711 self.file_id,
712 self.file_type
713 )
714 }
715}
716
/// Type of a cached file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileType {
    /// Parquet SST file.
    Parquet,
    /// Puffin index file, tagged with an index version number.
    /// Version `0` corresponds to the legacy `.puffin` name without a version.
    Puffin(u64),
}
725
726impl fmt::Display for FileType {
727 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
728 match self {
729 FileType::Parquet => write!(f, "parquet"),
730 FileType::Puffin(version) => write!(f, "{}.puffin", version),
731 }
732 }
733}
734
735impl FileType {
736 pub(crate) fn parse(s: &str) -> Option<FileType> {
738 match s {
739 "parquet" => Some(FileType::Parquet),
740 "puffin" => Some(FileType::Puffin(0)),
741 _ => {
742 if let Some(version_str) = s.strip_suffix(".puffin") {
744 let version = version_str.parse::<u64>().ok()?;
745 Some(FileType::Puffin(version))
746 } else {
747 None
748 }
749 }
750 }
751 }
752
753 fn metric_label(&self) -> &'static str {
755 match self {
756 FileType::Parquet => FILE_TYPE,
757 FileType::Puffin(_) => INDEX_TYPE,
758 }
759 }
760}
761
/// An entry of the file cache index.
#[derive(Debug, Clone)]
pub(crate) struct IndexValue {
    /// Size of the cached file in bytes.
    ///
    /// `u32` limits a single cached file to 4 GiB; the value also serves as
    /// the entry weight in the moka caches.
    pub(crate) file_size: u32,
}
770
771fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String {
775 join_path(cache_file_dir, &key.to_string())
776}
777
778fn parse_index_key(name: &str) -> Option<IndexKey> {
780 let mut split = name.splitn(3, '.');
781 let region_id = split.next().and_then(|s| {
782 let id = s.parse::<u64>().ok()?;
783 Some(RegionId::from_u64(id))
784 })?;
785 let file_id = split.next().and_then(|s| FileId::parse_str(s).ok())?;
786 let file_type = split.next().and_then(FileType::parse)?;
787
788 Some(IndexKey::new(region_id, file_id, file_type))
789}
790
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;

    use super::*;

    /// Builds a filesystem-backed object store rooted at `path`.
    fn new_fs_store(path: &str) -> ObjectStore {
        let builder = Fs::default().root(path);
        ObjectStore::new(builder).unwrap().finish()
    }

    /// Entries with a TTL should expire after the idle period passes.
    #[tokio::test]
    async fn test_file_cache_ttl() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            Some(Duration::from_millis(10)),
            None,
            true, // enable_background_worker
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Not cached yet.
        assert!(cache.reader(key).await.is_none());

        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();

        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        let exist = cache.reader(key).await;
        assert!(exist.is_some());
        // Wait for the TTL to elapse, then force moka to process expirations.
        tokio::time::sleep(Duration::from_millis(15)).await;
        cache.inner.parquet_index.run_pending_tasks().await;
        let non = cache.reader(key).await;
        assert!(non.is_none());
    }

    /// Basic put/read/remove round trip with weight accounting.
    #[tokio::test]
    async fn test_file_cache_basic() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true, // enable_background_worker
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Not cached before put.
        assert!(cache.reader(key).await.is_none());

        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        // The reader returns the cached file content.
        let reader = cache.reader(key).await.unwrap();
        let buf = reader.read(..).await.unwrap().to_vec();
        assert_eq!("hello", String::from_utf8(buf).unwrap());

        // Weighted size reflects the registered file size.
        cache.inner.parquet_index.run_pending_tasks().await;
        assert_eq!(5, cache.inner.parquet_index.weighted_size());

        cache.remove(key).await;
        assert!(cache.reader(key).await.is_none());

        cache.inner.parquet_index.run_pending_tasks().await;

        // Removal deletes the local file and resets the weight.
        assert!(!local_store.exists(&file_path).await.unwrap());
        assert_eq!(0, cache.inner.parquet_index.weighted_size());
    }

    /// A stale index entry (file deleted underneath) is dropped on access.
    #[tokio::test]
    async fn test_file_cache_file_removed() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true, // enable_background_worker
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        // Remove the underlying file while the index entry remains.
        local_store.delete(&file_path).await.unwrap();

        // The reader misses and the stale entry is evicted from the index.
        assert!(cache.reader(key).await.is_none());
        assert!(!cache.inner.parquet_index.contains_key(&key));
    }

    /// A fresh cache recovers its index by scanning files on disk.
    #[tokio::test]
    async fn test_file_cache_recover() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true, // enable_background_worker
        );

        let region_id = RegionId::new(2000, 0);
        let file_type = FileType::Parquet;
        // Write several files of varying sizes and register them.
        let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect();
        let mut total_size = 0;
        for (i, file_id) in file_ids.iter().enumerate() {
            let key = IndexKey::new(region_id, *file_id, file_type);
            let file_path = cache.cache_file_path(key);
            let bytes = i.to_string().into_bytes();
            local_store.write(&file_path, bytes.clone()).await.unwrap();

            cache
                .put(
                    IndexKey::new(region_id, *file_id, file_type),
                    IndexValue {
                        file_size: bytes.len() as u32,
                    },
                )
                .await;
            total_size += bytes.len();
        }

        // A new cache over the same store starts empty...
        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true, // enable_background_worker
        );
        assert!(
            cache
                .reader(IndexKey::new(region_id, file_ids[0], file_type))
                .await
                .is_none()
        );
        // ...until synchronous recovery rebuilds the index from disk.
        cache.recover(true, None).await;

        cache.inner.parquet_index.run_pending_tasks().await;
        assert_eq!(
            total_size,
            cache.inner.parquet_index.weighted_size() as usize
        );

        for (i, file_id) in file_ids.iter().enumerate() {
            let key = IndexKey::new(region_id, *file_id, file_type);
            let reader = cache.reader(key).await.unwrap();
            let buf = reader.read(..).await.unwrap().to_vec();
            assert_eq!(i.to_string(), String::from_utf8(buf).unwrap());
        }
    }

    /// `read_ranges` returns the requested byte slices of the cached file.
    #[tokio::test]
    async fn test_file_cache_read_ranges() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let file_cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true, // enable_background_worker
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = file_cache.cache_file_path(key);
        // Write a file and register the key (ranges are served from the file
        // itself, not from the recorded size).
        let data = b"hello greptime database";
        local_store
            .write(&file_path, data.as_slice())
            .await
            .unwrap();
        file_cache.put(key, IndexValue { file_size: 5 }).await;
        // Read multiple ranges, including the whole file.
        let ranges = vec![0..5, 6..10, 15..19, 0..data.len() as u64];
        let bytes = file_cache.read_ranges(key, &ranges).await.unwrap();

        assert_eq!(4, bytes.len());
        assert_eq!(b"hello", bytes[0].as_ref());
        assert_eq!(b"grep", bytes[1].as_ref());
        assert_eq!(b"data", bytes[2].as_ref());
        assert_eq!(data, bytes[3].as_ref());
    }

    /// Cache file paths join the directory with the key's display form.
    #[test]
    fn test_cache_file_path() {
        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
        assert_eq!(
            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
            cache_file_path(
                "test_dir",
                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
            )
        );
        // A trailing slash in the directory is handled by join_path.
        assert_eq!(
            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
            cache_file_path(
                "test_dir/",
                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
            )
        );
    }

    /// `parse_index_key` accepts valid names (including versioned puffin)
    /// and rejects malformed ones.
    #[test]
    fn test_parse_file_name() {
        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
        let region_id = RegionId::new(1234, 5);
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Parquet),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap()
        );
        // Legacy puffin names map to version 0.
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Puffin(0)),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.puffin").unwrap()
        );
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Puffin(42)),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.42.puffin")
                .unwrap()
        );
        assert!(parse_index_key("").is_none());
        assert!(parse_index_key(".").is_none());
        assert!(parse_index_key("5299989643269").is_none());
        assert!(parse_index_key("5299989643269.").is_none());
        assert!(parse_index_key(".5299989643269").is_none());
        assert!(parse_index_key("5299989643269.").is_none());
        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df").is_none());
        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").is_none());
        assert!(
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parque").is_none()
        );
        assert!(
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet.puffin")
                .is_none()
        );
    }
}