1use std::fmt;
18use std::ops::Range;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21
22use bytes::Bytes;
23use common_base::readable_size::ReadableSize;
24use common_telemetry::{debug, error, info, warn};
25use futures::{AsyncWriteExt, FutureExt, TryStreamExt};
26use moka::future::Cache;
27use moka::notification::RemovalCause;
28use moka::policy::EvictionPolicy;
29use object_store::util::join_path;
30use object_store::{ErrorKind, ObjectStore, Reader};
31use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
32use snafu::ResultExt;
33use store_api::storage::{FileId, RegionId};
34use tokio::sync::mpsc::{Sender, UnboundedReceiver};
35
36use crate::access_layer::TempFileCleaner;
37use crate::cache::{CachedSstMeta, FILE_TYPE, INDEX_TYPE};
38use crate::error::{self, OpenDalSnafu, Result};
39use crate::metrics::{
40 CACHE_BYTES, CACHE_HIT, CACHE_MISS, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL,
41 WRITE_CACHE_DOWNLOAD_ELAPSED,
42};
43use crate::region::opener::RegionLoadCacheTask;
44use crate::sst::parquet::helper::fetch_byte_ranges;
45use crate::sst::parquet::metadata::MetadataLoader;
46use crate::sst::parquet::reader::MetadataCacheMetrics;
47
/// Subdirectory (inside the local store) that holds the cached files.
const FILE_DIR: &str = "cache/object/write/";

/// Default percent of the total capacity dedicated to puffin (index) files.
pub(crate) const DEFAULT_INDEX_CACHE_PERCENT: u8 = 20;

/// Minimum capacity guaranteed to each of the two caches (512 MiB),
/// capped at half of the total capacity in `split_cache_capacities`.
const MIN_CACHE_CAPACITY: u64 = 512 * 1024 * 1024;

/// Bound of the channel feeding the background download worker.
const DOWNLOAD_TASK_CHANNEL_SIZE: usize = 64;
61
/// A download request queued for the background download worker.
struct DownloadTask {
    /// Cache key of the file to download.
    index_key: IndexKey,
    /// Path of the file in the remote store.
    remote_path: String,
    /// Remote store to read the file from.
    remote_store: ObjectStore,
    /// Size of the remote file in bytes (used to bound the read range).
    file_size: u64,
}
69
/// Shared state of the file cache: the local store holding the files and the
/// in-memory indexes tracking which files are cached.
#[derive(Debug)]
struct FileCacheInner {
    /// Local object store that persists the cached files.
    local_store: ObjectStore,
    /// Memory index of cached parquet (data) files.
    parquet_index: Cache<IndexKey, IndexValue>,
    /// Memory index of cached puffin (index) files.
    puffin_index: Cache<IndexKey, IndexValue>,
}
80
impl FileCacheInner {
    /// Returns the in-memory index that tracks entries of the given file type.
    fn memory_index(&self, file_type: FileType) -> &Cache<IndexKey, IndexValue> {
        match file_type {
            FileType::Parquet => &self.parquet_index,
            FileType::Puffin { .. } => &self.puffin_index,
        }
    }

    /// Returns the local path of the cached file for `key` under [FILE_DIR].
    fn cache_file_path(&self, key: IndexKey) -> String {
        cache_file_path(FILE_DIR, key)
    }

    /// Inserts `key`/`value` into the memory index and bumps the size metric.
    ///
    /// The cached file itself must already exist in the local store; this only
    /// records the entry so readers can find it. If the insertion replaces an
    /// existing entry, the eviction listener subtracts the old size again.
    async fn put(&self, key: IndexKey, value: IndexValue) {
        CACHE_BYTES
            .with_label_values(&[key.file_type.metric_label()])
            .add(value.file_size.into());
        let index = self.memory_index(key.file_type);
        index.insert(key, value).await;
        // Flush pending cache maintenance so evictions (and the local file
        // deletions they trigger) are not delayed indefinitely.
        index.run_pending_tasks().await;
    }

    /// Rebuilds the memory indexes by listing the files already present under
    /// [FILE_DIR] in the local store.
    ///
    /// Files whose names do not follow the cache naming scheme are skipped.
    async fn recover(&self) -> Result<()> {
        let now = Instant::now();
        let mut lister = self
            .local_store
            .lister_with(FILE_DIR)
            .await
            .context(OpenDalSnafu)?;
        let (mut total_size, mut total_keys) = (0i64, 0);
        let (mut parquet_size, mut puffin_size) = (0i64, 0i64);
        while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
            let meta = entry.metadata();
            if !meta.is_file() {
                continue;
            }
            // Skip files that are not named `<region>.<file_id>.<type>`.
            let Some(key) = parse_index_key(entry.name()) else {
                continue;
            };

            // Stat the entry explicitly to obtain its content length.
            let meta = self
                .local_store
                .stat(entry.path())
                .await
                .context(OpenDalSnafu)?;
            // NOTE(review): truncates for files >= 4 GiB since the index
            // stores sizes as u32 — presumably cached files stay well below
            // that; confirm.
            let file_size = meta.content_length() as u32;
            let index = self.memory_index(key.file_type);
            index.insert(key, IndexValue { file_size }).await;
            let size = i64::from(file_size);
            total_size += size;
            total_keys += 1;

            match key.file_type {
                FileType::Parquet => parquet_size += size,
                FileType::Puffin { .. } => puffin_size += size,
            }
        }
        // Account the recovered bytes once per file type instead of per entry.
        CACHE_BYTES
            .with_label_values(&[FILE_TYPE])
            .add(parquet_size);
        CACHE_BYTES
            .with_label_values(&[INDEX_TYPE])
            .add(puffin_size);

        // Apply pending insertions (and evictions, if over capacity) so the
        // weighted sizes reported below are up to date.
        self.parquet_index.run_pending_tasks().await;
        self.puffin_index.run_pending_tasks().await;

        let parquet_weight = self.parquet_index.weighted_size();
        let parquet_count = self.parquet_index.entry_count();
        let puffin_weight = self.puffin_index.weighted_size();
        let puffin_count = self.puffin_index.entry_count();
        info!(
            "Recovered file cache, num_keys: {}, num_bytes: {}, parquet(count: {}, weight: {}), puffin(count: {}, weight: {}), cost: {:?}",
            total_keys,
            total_size,
            parquet_count,
            parquet_weight,
            puffin_count,
            puffin_weight,
            now.elapsed()
        );
        Ok(())
    }

    /// Downloads `remote_path` into the local cache and registers the entry
    /// on success.
    ///
    /// Does NOT clean up a partially written local file on failure;
    /// [FileCacheInner::download] handles the cleanup.
    async fn download_without_cleaning(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
        concurrency: usize,
    ) -> Result<()> {
        // Chunk size used when streaming the remote file.
        const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);

        let file_type = index_key.file_type;
        let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
            .with_label_values(&[match file_type {
                FileType::Parquet => "download_parquet",
                FileType::Puffin { .. } => "download_puffin",
            }])
            .start_timer();

        let reader = remote_store
            .reader_with(remote_path)
            .concurrent(concurrency)
            .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_read(0..file_size)
            .await
            .context(error::OpenDalSnafu)?;

        let cache_path = self.cache_file_path(index_key);
        let mut writer = self
            .local_store
            .writer(&cache_path)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_write();

        let region_id = index_key.region_id;
        let file_id = index_key.file_id;
        let bytes_written =
            futures::io::copy(reader, &mut writer)
                .await
                .context(error::DownloadSnafu {
                    region_id,
                    file_id,
                    file_type,
                })?;
        // Close explicitly: it flushes remaining buffered bytes and surfaces
        // write errors that a plain drop would swallow.
        writer.close().await.context(error::DownloadSnafu {
            region_id,
            file_id,
            file_type,
        })?;

        WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);

        let elapsed = timer.stop_and_record();
        debug!(
            "Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
            remote_path, cache_path, bytes_written, region_id, elapsed,
        );

        let index_value = IndexValue {
            file_size: bytes_written as _,
        };
        self.put(index_key, index_value).await;
        Ok(())
    }

    /// Downloads a remote file into the cache, cleaning up any leftover
    /// temporary files on failure before propagating the error.
    async fn download(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
        concurrency: usize,
    ) -> Result<()> {
        if let Err(e) = self
            .download_without_cleaning(index_key, remote_path, remote_store, file_size, concurrency)
            .await
        {
            error!(e; "Failed to download file '{}' for region {}", remote_path, index_key.region_id);

            // Remove any partial file left behind by the failed download.
            let filename = index_key.to_string();
            TempFileCleaner::clean_atomic_dir_files(&self.local_store, &[&filename]).await;

            return Err(e);
        }

        Ok(())
    }

    /// Returns whether `key` is present in the memory index (no I/O).
    fn contains_key(&self, key: &IndexKey) -> bool {
        self.memory_index(key.file_type).contains_key(key)
    }
}
275
/// A cache that stores files downloaded from a remote object store in a
/// local store, tracked by in-memory indexes.
#[derive(Debug, Clone)]
pub(crate) struct FileCache {
    /// Shared state (local store and memory indexes).
    inner: Arc<FileCacheInner>,
    /// Capacity reserved for puffin (index) files, in bytes.
    puffin_capacity: u64,
    /// Sender feeding the background download worker; `None` when the
    /// worker is disabled.
    download_task_tx: Option<Sender<DownloadTask>>,
}

/// Shared reference to a [FileCache].
pub(crate) type FileCacheRef = Arc<FileCache>;
289
impl FileCache {
    /// Splits the total capacity into `(parquet_capacity, puffin_capacity)`.
    ///
    /// `index_percent` of the budget goes to puffin (index) files, but each
    /// side is guaranteed at least `min(MIN_CACHE_CAPACITY, total / 2)` bytes,
    /// and the two parts always sum to `total_capacity`.
    fn split_cache_capacities(total_capacity: u64, index_percent: u8) -> (u64, u64) {
        let desired_puffin_capacity = total_capacity * u64::from(index_percent) / 100;
        let min_cache_capacity = MIN_CACHE_CAPACITY.min(total_capacity / 2);
        let puffin_capacity =
            desired_puffin_capacity.clamp(min_cache_capacity, total_capacity - min_cache_capacity);
        let parquet_capacity = total_capacity - puffin_capacity;
        (parquet_capacity, puffin_capacity)
    }

    /// Creates a new file cache backed by `local_store`.
    ///
    /// - `capacity`: total budget shared by the parquet and puffin caches.
    /// - `ttl`: optional time-to-idle applied to both caches.
    /// - `index_cache_percent`: percent of capacity for puffin files; values
    ///   outside `(0, 100)` fall back to [DEFAULT_INDEX_CACHE_PERCENT].
    /// - `enable_background_worker`: whether to spawn the background
    ///   download worker.
    pub(crate) fn new(
        local_store: ObjectStore,
        capacity: ReadableSize,
        ttl: Option<Duration>,
        index_cache_percent: Option<u8>,
        enable_background_worker: bool,
    ) -> FileCache {
        let index_percent = index_cache_percent
            .filter(|&percent| percent > 0 && percent < 100)
            .unwrap_or(DEFAULT_INDEX_CACHE_PERCENT);
        let total_capacity = capacity.as_bytes();

        let (parquet_capacity, puffin_capacity) =
            Self::split_cache_capacities(total_capacity, index_percent);

        info!(
            "Initializing file cache with index_percent: {}%, total_capacity: {}, parquet_capacity: {}, puffin_capacity: {}",
            index_percent,
            ReadableSize(total_capacity),
            ReadableSize(parquet_capacity),
            ReadableSize(puffin_capacity)
        );

        let parquet_index = Self::build_cache(local_store.clone(), parquet_capacity, ttl, "file");
        let puffin_index = Self::build_cache(local_store.clone(), puffin_capacity, ttl, "index");

        let inner = Arc::new(FileCacheInner {
            local_store,
            parquet_index,
            puffin_index,
        });

        let download_task_tx = if enable_background_worker {
            let (tx, rx) = tokio::sync::mpsc::channel(DOWNLOAD_TASK_CHANNEL_SIZE);
            Self::spawn_download_worker(inner.clone(), rx);
            Some(tx)
        } else {
            None
        };

        FileCache {
            inner,
            puffin_capacity,
            download_task_tx,
        }
    }

    /// Spawns the background worker that serves queued [DownloadTask]s.
    ///
    /// The worker runs until all senders are dropped; download errors are
    /// logged inside `download` and otherwise ignored here.
    fn spawn_download_worker(
        inner: Arc<FileCacheInner>,
        mut download_task_rx: tokio::sync::mpsc::Receiver<DownloadTask>,
    ) {
        tokio::spawn(async move {
            info!("Background download worker started");
            while let Some(task) = download_task_rx.recv().await {
                // Skip files that were cached while the task sat in the queue.
                if inner.contains_key(&task.index_key) {
                    debug!(
                        "Skipping background download for region {}, file {} - already in cache",
                        task.index_key.region_id, task.index_key.file_id
                    );
                    continue;
                }

                // Use concurrency 1 to keep background downloads low-impact.
                let _ = inner
                    .download(
                        task.index_key,
                        &task.remote_path,
                        &task.remote_store,
                        task.file_size,
                        1,
                    )
                    .await;
            }
            info!("Background download worker stopped");
        });
    }

    /// Builds a moka cache with LRU eviction that deletes the backing local
    /// file when an entry is evicted (unless the entry was merely replaced).
    fn build_cache(
        local_store: ObjectStore,
        capacity: u64,
        ttl: Option<Duration>,
        label: &'static str,
    ) -> Cache<IndexKey, IndexValue> {
        let cache_store = local_store;
        let mut builder = Cache::builder()
            .eviction_policy(EvictionPolicy::lru())
            // Weigh entries by file size so `max_capacity` bounds total bytes.
            .weigher(|_key, value: &IndexValue| -> u32 {
                value.file_size
            })
            .max_capacity(capacity)
            .async_eviction_listener(move |key, value, cause| {
                let store = cache_store.clone();
                let file_path = cache_file_path(FILE_DIR, *key);
                async move {
                    if let RemovalCause::Replaced = cause {
                        // The file is still cached under the same path; only
                        // adjust the size metric for the replaced value.
                        CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
                        return;
                    }

                    match store.delete(&file_path).await {
                        Ok(()) => {
                            CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
                        }
                        Err(e) => {
                            warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id);
                        }
                    }
                }
                .boxed()
            });
        if let Some(ttl) = ttl {
            // Entries idle for longer than `ttl` become eligible for removal.
            builder = builder.time_to_idle(ttl);
        }
        builder.build()
    }

    /// Registers a cached file in the index (see [FileCacheInner::put]).
    pub(crate) async fn put(&self, key: IndexKey, value: IndexValue) {
        self.inner.put(key, value).await
    }

    /// Returns the index entry for `key`, if cached.
    pub(crate) async fn get(&self, key: IndexKey) -> Option<IndexValue> {
        self.inner.memory_index(key.file_type).get(&key).await
    }

    /// Returns a reader over the cached file for `key`, if present.
    ///
    /// If the index has the key but the local file is gone, the stale index
    /// entry is removed and `None` is returned. Updates hit/miss metrics.
    #[allow(unused)]
    pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
        let index = self.inner.memory_index(key.file_type);
        if index.get(&key).await.is_none() {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            return None;
        }

        let file_path = self.inner.cache_file_path(key);
        match self.get_reader(&file_path).await {
            Ok(Some(reader)) => {
                CACHE_HIT
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                return Some(reader);
            }
            Err(e) => {
                if e.kind() != ErrorKind::NotFound {
                    warn!(e; "Failed to get file for key {:?}", key);
                }
            }
            Ok(None) => {}
        }

        // The file is gone or unreadable: drop the stale index entry.
        index.remove(&key).await;
        CACHE_MISS
            .with_label_values(&[key.file_type.metric_label()])
            .inc();
        None
    }

    /// Reads the given byte ranges from the cached file for `key`.
    ///
    /// Removes the stale index entry when the local read fails. Updates
    /// hit/miss metrics.
    pub(crate) async fn read_ranges(
        &self,
        key: IndexKey,
        ranges: &[Range<u64>],
    ) -> Option<Vec<Bytes>> {
        let index = self.inner.memory_index(key.file_type);
        if index.get(&key).await.is_none() {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            return None;
        }

        let file_path = self.inner.cache_file_path(key);
        let bytes_result =
            fetch_byte_ranges(&file_path, self.inner.local_store.clone(), ranges).await;
        match bytes_result {
            Ok(bytes) => {
                CACHE_HIT
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                Some(bytes)
            }
            Err(e) => {
                if e.kind() != ErrorKind::NotFound {
                    warn!(e; "Failed to get file for key {:?}", key);
                }

                // The file is gone or unreadable: drop the stale index entry.
                index.remove(&key).await;
                CACHE_MISS
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                None
            }
        }
    }

    /// Removes `key` from the index and deletes its local file.
    pub(crate) async fn remove(&self, key: IndexKey) {
        let file_path = self.inner.cache_file_path(key);
        self.inner.memory_index(key.file_type).remove(&key).await;
        if let Err(e) = self.inner.local_store.delete(&file_path).await {
            warn!(e; "Failed to delete a cached file {}", file_path);
        }
    }

    /// Recovers the index from the local store in a spawned task.
    ///
    /// If `sync` is true, waits until the recovery finishes. When
    /// `task_receiver` is present, a second task is spawned — only after the
    /// recovery attempt — to process region load-cache tasks.
    pub(crate) async fn recover(
        &self,
        sync: bool,
        task_receiver: Option<UnboundedReceiver<RegionLoadCacheTask>>,
    ) {
        let moved_self = self.clone();
        let handle = tokio::spawn(async move {
            if let Err(err) = moved_self.inner.recover().await {
                error!(err; "Failed to recover file cache.")
            }

            if let Some(mut receiver) = task_receiver {
                info!("Spawning background task for processing region load cache tasks");
                tokio::spawn(async move {
                    while let Some(task) = receiver.recv().await {
                        task.fill_cache(&moved_self).await;
                    }
                    info!("Background task for processing region load cache tasks stopped");
                });
            }
        });

        if sync {
            let _ = handle.await;
        }
    }

    /// Returns the local path of the cached file for `key`.
    pub(crate) fn cache_file_path(&self, key: IndexKey) -> String {
        self.inner.cache_file_path(key)
    }

    /// Returns the local store backing this cache.
    pub(crate) fn local_store(&self) -> ObjectStore {
        self.inner.local_store.clone()
    }

    /// Loads the parquet metadata of the cached file for `key`.
    ///
    /// Removes the stale index entry when the file is missing or unreadable.
    /// Updates hit/miss metrics.
    pub(crate) async fn get_parquet_meta_data(
        &self,
        key: IndexKey,
        cache_metrics: &mut MetadataCacheMetrics,
        page_index_policy: PageIndexPolicy,
    ) -> Option<ParquetMetaData> {
        if let Some(index_value) = self.inner.parquet_index.get(&key).await {
            let local_store = self.local_store();
            let file_path = self.inner.cache_file_path(key);
            let file_size = index_value.file_size as u64;
            let mut metadata_loader = MetadataLoader::new(local_store, &file_path, file_size);
            metadata_loader.with_page_index_policy(page_index_policy);

            match metadata_loader.load(cache_metrics).await {
                Ok(metadata) => {
                    CACHE_HIT
                        .with_label_values(&[key.file_type.metric_label()])
                        .inc();
                    Some(metadata)
                }
                Err(e) => {
                    if !e.is_object_not_found() {
                        warn!(
                            e; "Failed to get parquet metadata for key {:?}",
                            key
                        );
                    }
                    // The cached file is unreadable: drop the stale entry.
                    self.inner.parquet_index.remove(&key).await;
                    CACHE_MISS
                        .with_label_values(&[key.file_type.metric_label()])
                        .inc();
                    None
                }
            }
        } else {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            None
        }
    }

    /// Loads the parquet metadata for `key` and wraps it into a
    /// [CachedSstMeta]; counts a miss when decoding fails.
    pub(crate) async fn get_sst_meta_data(
        &self,
        key: IndexKey,
        cache_metrics: &mut MetadataCacheMetrics,
        page_index_policy: PageIndexPolicy,
    ) -> Option<Arc<CachedSstMeta>> {
        let file_path = self.inner.cache_file_path(key);
        self.get_parquet_meta_data(key, cache_metrics, page_index_policy)
            .await
            .and_then(
                |metadata| match CachedSstMeta::try_new(&file_path, metadata) {
                    Ok(metadata) => Some(Arc::new(metadata)),
                    Err(err) => {
                        CACHE_MISS
                            .with_label_values(&[key.file_type.metric_label()])
                            .inc();
                        warn!(
                            err; "Failed to decode cached parquet metadata for key {:?}",
                            key
                        );
                        None
                    }
                },
            )
    }

    /// Opens a reader on `file_path` in the local store, or `Ok(None)` if the
    /// file does not exist.
    async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
        if self.inner.local_store.exists(file_path).await? {
            Ok(Some(self.inner.local_store.reader(file_path).await?))
        } else {
            Ok(None)
        }
    }

    /// Returns whether `key` is present in the memory index (no I/O).
    pub(crate) fn contains_key(&self, key: &IndexKey) -> bool {
        self.inner.contains_key(key)
    }

    /// Returns the capacity reserved for puffin (index) files, in bytes.
    pub(crate) fn puffin_cache_capacity(&self) -> u64 {
        self.puffin_capacity
    }

    /// Returns the current weighted size of the puffin cache, in bytes.
    pub(crate) fn puffin_cache_size(&self) -> u64 {
        self.inner.puffin_index.weighted_size()
    }

    /// Downloads a remote file into the cache synchronously (concurrency 8).
    pub(crate) async fn download(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
    ) -> Result<()> {
        self.inner
            .download(index_key, remote_path, remote_store, file_size, 8)
            .await
    }

    /// Queues a background download for the file, if the worker is enabled.
    ///
    /// Best effort: when the channel is full or closed, the task is dropped
    /// and only a debug message is logged.
    pub(crate) fn maybe_download_background(
        &self,
        index_key: IndexKey,
        remote_path: String,
        remote_store: ObjectStore,
        file_size: u64,
    ) {
        let Some(tx) = &self.download_task_tx else {
            return;
        };

        let task = DownloadTask {
            index_key,
            remote_path,
            remote_store,
            file_size,
        };

        if let Err(e) = tx.try_send(task) {
            debug!(
                "Failed to queue background download task for region {}, file {}: {:?}",
                index_key.region_id, index_key.file_id, e
            );
        }
    }
}
718
/// Key identifying one cached file; its [Display] form is also the file name
/// in the cache directory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct IndexKey {
    /// Region that owns the file.
    pub region_id: RegionId,
    /// Id of the file.
    pub file_id: FileId,
    /// Type of the file (parquet or puffin).
    pub file_type: FileType,
}
726
727impl IndexKey {
728 pub fn new(region_id: RegionId, file_id: FileId, file_type: FileType) -> IndexKey {
730 IndexKey {
731 region_id,
732 file_id,
733 file_type,
734 }
735 }
736}
737
738impl fmt::Display for IndexKey {
739 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
740 write!(
741 f,
742 "{}.{}.{}",
743 self.region_id.as_u64(),
744 self.file_id,
745 self.file_type
746 )
747 }
748}
749
/// Type of a cached file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileType {
    /// Parquet (data) file.
    Parquet,
    /// Puffin (index) file, carrying its version number.
    Puffin(u64),
}
758
759impl fmt::Display for FileType {
760 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
761 match self {
762 FileType::Parquet => write!(f, "parquet"),
763 FileType::Puffin(version) => write!(f, "{}.puffin", version),
764 }
765 }
766}
767
768impl FileType {
769 pub(crate) fn parse(s: &str) -> Option<FileType> {
771 match s {
772 "parquet" => Some(FileType::Parquet),
773 "puffin" => Some(FileType::Puffin(0)),
774 _ => {
775 if let Some(version_str) = s.strip_suffix(".puffin") {
777 let version = version_str.parse::<u64>().ok()?;
778 Some(FileType::Puffin(version))
779 } else {
780 None
781 }
782 }
783 }
784 }
785
786 fn metric_label(&self) -> &'static str {
788 match self {
789 FileType::Parquet => FILE_TYPE,
790 FileType::Puffin(_) => INDEX_TYPE,
791 }
792 }
793}
794
/// An entry of the file cache index.
#[derive(Debug, Clone)]
pub(crate) struct IndexValue {
    /// Size of the cached file in bytes.
    pub(crate) file_size: u32,
}
803
804fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String {
808 join_path(cache_file_dir, &key.to_string())
809}
810
811fn parse_index_key(name: &str) -> Option<IndexKey> {
813 let mut split = name.splitn(3, '.');
814 let region_id = split.next().and_then(|s| {
815 let id = s.parse::<u64>().ok()?;
816 Some(RegionId::from_u64(id))
817 })?;
818 let file_id = split.next().and_then(|s| FileId::parse_str(s).ok())?;
819 let file_type = split.next().and_then(FileType::parse)?;
820
821 Some(IndexKey::new(region_id, file_id, file_type))
822}
823
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;

    use super::*;

    /// Builds a filesystem-backed object store rooted at `path`.
    fn new_fs_store(path: &str) -> ObjectStore {
        let builder = Fs::default().root(path);
        ObjectStore::new(builder).unwrap().finish()
    }

    #[tokio::test]
    async fn test_file_cache_ttl() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        // Cache with a very short time-to-idle so entries expire quickly.
        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            Some(Duration::from_millis(10)),
            None,
            true,
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Not cached yet.
        assert!(cache.reader(key).await.is_none());

        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();

        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        let exist = cache.reader(key).await;
        assert!(exist.is_some());
        // Wait past the TTL and force pending maintenance so the entry expires.
        tokio::time::sleep(Duration::from_millis(15)).await;
        cache.inner.parquet_index.run_pending_tasks().await;
        let non = cache.reader(key).await;
        assert!(non.is_none());
    }

    #[tokio::test]
    async fn test_file_cache_basic() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true,
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Not cached yet.
        assert!(cache.reader(key).await.is_none());

        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        // The cached content is readable through the cache.
        let reader = cache.reader(key).await.unwrap();
        let buf = reader.read(..).await.unwrap().to_vec();
        assert_eq!("hello", String::from_utf8(buf).unwrap());

        // The weighted size reflects the file size.
        cache.inner.parquet_index.run_pending_tasks().await;
        assert_eq!(5, cache.inner.parquet_index.weighted_size());

        cache.remove(key).await;
        assert!(cache.reader(key).await.is_none());

        cache.inner.parquet_index.run_pending_tasks().await;

        // Removal also deletes the local file and resets the weighted size.
        assert!(!local_store.exists(&file_path).await.unwrap());
        assert_eq!(0, cache.inner.parquet_index.weighted_size());
    }

    #[tokio::test]
    async fn test_file_cache_file_removed() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true,
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        // Delete the backing file out from under the cache.
        local_store.delete(&file_path).await.unwrap();

        // The reader misses and the stale index entry is dropped.
        assert!(cache.reader(key).await.is_none());
        assert!(!cache.inner.parquet_index.contains_key(&key));
    }

    #[tokio::test]
    async fn test_file_cache_recover() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true,
        );

        // Write several files through the first cache instance.
        let region_id = RegionId::new(2000, 0);
        let file_type = FileType::Parquet;
        let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect();
        let mut total_size = 0;
        for (i, file_id) in file_ids.iter().enumerate() {
            let key = IndexKey::new(region_id, *file_id, file_type);
            let file_path = cache.cache_file_path(key);
            let bytes = i.to_string().into_bytes();
            local_store.write(&file_path, bytes.clone()).await.unwrap();

            cache
                .put(
                    IndexKey::new(region_id, *file_id, file_type),
                    IndexValue {
                        file_size: bytes.len() as u32,
                    },
                )
                .await;
            total_size += bytes.len();
        }

        // A fresh cache instance starts empty until it recovers.
        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true,
        );
        assert!(
            cache
                .reader(IndexKey::new(region_id, file_ids[0], file_type))
                .await
                .is_none()
        );
        cache.recover(true, None).await;

        // After recovery the index covers all files on disk.
        cache.inner.parquet_index.run_pending_tasks().await;
        assert_eq!(
            total_size,
            cache.inner.parquet_index.weighted_size() as usize
        );

        for (i, file_id) in file_ids.iter().enumerate() {
            let key = IndexKey::new(region_id, *file_id, file_type);
            let reader = cache.reader(key).await.unwrap();
            let buf = reader.read(..).await.unwrap().to_vec();
            assert_eq!(i.to_string(), String::from_utf8(buf).unwrap());
        }
    }

    #[tokio::test]
    async fn test_file_cache_read_ranges() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let file_cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true,
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = file_cache.cache_file_path(key);
        let data = b"hello greptime database";
        local_store
            .write(&file_path, data.as_slice())
            .await
            .unwrap();
        file_cache.put(key, IndexValue { file_size: 5 }).await;
        // Multiple ranges, including the full file.
        let ranges = vec![0..5, 6..10, 15..19, 0..data.len() as u64];
        let bytes = file_cache.read_ranges(key, &ranges).await.unwrap();

        assert_eq!(4, bytes.len());
        assert_eq!(b"hello", bytes[0].as_ref());
        assert_eq!(b"grep", bytes[1].as_ref());
        assert_eq!(b"data", bytes[2].as_ref());
        assert_eq!(data, bytes[3].as_ref());
    }

    #[test]
    fn test_file_cache_capacity_respects_total_budget() {
        // With a small total, both sides are clamped to total / 2.
        let total_capacity = ReadableSize::mb(256).as_bytes();
        let (parquet_capacity, puffin_capacity) =
            FileCache::split_cache_capacities(total_capacity, 20);

        assert_eq!(total_capacity, parquet_capacity + puffin_capacity);
        assert_eq!(ReadableSize::mb(128).as_bytes(), parquet_capacity);
        assert_eq!(ReadableSize::mb(128).as_bytes(), puffin_capacity);
    }

    #[test]
    fn test_file_cache_capacity_keeps_split_when_total_allows_it() {
        // With a large total, the requested 20% split is kept as-is.
        let total_capacity = ReadableSize::gb(5).as_bytes();
        let (parquet_capacity, puffin_capacity) =
            FileCache::split_cache_capacities(total_capacity, 20);

        assert_eq!(total_capacity, parquet_capacity + puffin_capacity);
        assert_eq!(ReadableSize::gb(4).as_bytes(), parquet_capacity);
        assert_eq!(ReadableSize::gb(1).as_bytes(), puffin_capacity);
    }

    #[test]
    fn test_cache_file_path() {
        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
        assert_eq!(
            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
            cache_file_path(
                "test_dir",
                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
            )
        );
        // A trailing slash on the directory is handled by join_path.
        assert_eq!(
            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
            cache_file_path(
                "test_dir/",
                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
            )
        );
    }

    #[test]
    fn test_parse_file_name() {
        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
        let region_id = RegionId::new(1234, 5);
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Parquet),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap()
        );
        // Legacy puffin name (no version) parses as version 0.
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Puffin(0)),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.puffin").unwrap()
        );
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Puffin(42)),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.42.puffin")
                .unwrap()
        );
        // Malformed names are rejected.
        assert!(parse_index_key("").is_none());
        assert!(parse_index_key(".").is_none());
        assert!(parse_index_key("5299989643269").is_none());
        assert!(parse_index_key("5299989643269.").is_none());
        assert!(parse_index_key(".5299989643269").is_none());
        assert!(parse_index_key("5299989643269.").is_none());
        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df").is_none());
        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").is_none());
        assert!(
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parque").is_none()
        );
        assert!(
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet.puffin")
                .is_none()
        );
    }
}