1use std::fmt;
18use std::ops::Range;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21
22use bytes::Bytes;
23use common_base::readable_size::ReadableSize;
24use common_telemetry::{debug, error, info, warn};
25use futures::{AsyncWriteExt, FutureExt, TryStreamExt};
26use moka::future::Cache;
27use moka::notification::RemovalCause;
28use moka::policy::EvictionPolicy;
29use object_store::util::join_path;
30use object_store::{ErrorKind, ObjectStore, Reader};
31use parquet::file::metadata::ParquetMetaData;
32use snafu::ResultExt;
33use store_api::storage::{FileId, RegionId};
34use tokio::sync::mpsc::UnboundedReceiver;
35
36use crate::access_layer::TempFileCleaner;
37use crate::cache::{FILE_TYPE, INDEX_TYPE};
38use crate::error::{self, OpenDalSnafu, Result};
39use crate::metrics::{
40 CACHE_BYTES, CACHE_HIT, CACHE_MISS, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL,
41 WRITE_CACHE_DOWNLOAD_ELAPSED,
42};
43use crate::region::opener::RegionLoadCacheTask;
44use crate::sst::parquet::helper::fetch_byte_ranges;
45use crate::sst::parquet::metadata::MetadataLoader;
46
/// Directory inside the local store under which all cached files are kept.
const FILE_DIR: &str = "cache/object/write/";

/// Default share (in percent) of the total capacity reserved for puffin (index) files.
pub(crate) const DEFAULT_INDEX_CACHE_PERCENT: u8 = 20;

/// Lower bound (512 MiB) applied to both the parquet and the puffin cache capacity.
const MIN_CACHE_CAPACITY: u64 = 512 << 20;
57
/// State shared (through an `Arc`) by every clone of a file cache.
#[derive(Debug)]
struct FileCacheInner {
    /// Local object store that holds the cached files.
    local_store: ObjectStore,
    /// In-memory index of cached parquet files.
    parquet_index: Cache<IndexKey, IndexValue>,
    /// In-memory index of cached puffin (index) files.
    puffin_index: Cache<IndexKey, IndexValue>,
}
68
69impl FileCacheInner {
70 fn memory_index(&self, file_type: FileType) -> &Cache<IndexKey, IndexValue> {
72 match file_type {
73 FileType::Parquet => &self.parquet_index,
74 FileType::Puffin { .. } => &self.puffin_index,
75 }
76 }
77
78 fn cache_file_path(&self, key: IndexKey) -> String {
80 cache_file_path(FILE_DIR, key)
81 }
82
83 async fn put(&self, key: IndexKey, value: IndexValue) {
87 CACHE_BYTES
88 .with_label_values(&[key.file_type.metric_label()])
89 .add(value.file_size.into());
90 let index = self.memory_index(key.file_type);
91 index.insert(key, value).await;
92
93 index.run_pending_tasks().await;
95 }
96
97 async fn recover(&self) -> Result<()> {
99 let now = Instant::now();
100 let mut lister = self
101 .local_store
102 .lister_with(FILE_DIR)
103 .await
104 .context(OpenDalSnafu)?;
105 let (mut total_size, mut total_keys) = (0i64, 0);
108 let (mut parquet_size, mut puffin_size) = (0i64, 0i64);
109 while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
110 let meta = entry.metadata();
111 if !meta.is_file() {
112 continue;
113 }
114 let Some(key) = parse_index_key(entry.name()) else {
115 continue;
116 };
117
118 let meta = self
119 .local_store
120 .stat(entry.path())
121 .await
122 .context(OpenDalSnafu)?;
123 let file_size = meta.content_length() as u32;
124 let index = self.memory_index(key.file_type);
125 index.insert(key, IndexValue { file_size }).await;
126 let size = i64::from(file_size);
127 total_size += size;
128 total_keys += 1;
129
130 match key.file_type {
132 FileType::Parquet => parquet_size += size,
133 FileType::Puffin { .. } => puffin_size += size,
134 }
135 }
136 CACHE_BYTES
138 .with_label_values(&[FILE_TYPE])
139 .add(parquet_size);
140 CACHE_BYTES
141 .with_label_values(&[INDEX_TYPE])
142 .add(puffin_size);
143
144 self.parquet_index.run_pending_tasks().await;
147 self.puffin_index.run_pending_tasks().await;
148
149 let parquet_weight = self.parquet_index.weighted_size();
150 let parquet_count = self.parquet_index.entry_count();
151 let puffin_weight = self.puffin_index.weighted_size();
152 let puffin_count = self.puffin_index.entry_count();
153 info!(
154 "Recovered file cache, num_keys: {}, num_bytes: {}, parquet(count: {}, weight: {}), puffin(count: {}, weight: {}), cost: {:?}",
155 total_keys,
156 total_size,
157 parquet_count,
158 parquet_weight,
159 puffin_count,
160 puffin_weight,
161 now.elapsed()
162 );
163 Ok(())
164 }
165
166 async fn download_without_cleaning(
168 &self,
169 index_key: IndexKey,
170 remote_path: &str,
171 remote_store: &ObjectStore,
172 file_size: u64,
173 ) -> Result<()> {
174 const DOWNLOAD_READER_CONCURRENCY: usize = 8;
175 const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);
176
177 let file_type = index_key.file_type;
178 let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
179 .with_label_values(&[match file_type {
180 FileType::Parquet => "download_parquet",
181 FileType::Puffin { .. } => "download_puffin",
182 }])
183 .start_timer();
184
185 let reader = remote_store
186 .reader_with(remote_path)
187 .concurrent(DOWNLOAD_READER_CONCURRENCY)
188 .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
189 .await
190 .context(error::OpenDalSnafu)?
191 .into_futures_async_read(0..file_size)
192 .await
193 .context(error::OpenDalSnafu)?;
194
195 let cache_path = self.cache_file_path(index_key);
196 let mut writer = self
197 .local_store
198 .writer(&cache_path)
199 .await
200 .context(error::OpenDalSnafu)?
201 .into_futures_async_write();
202
203 let region_id = index_key.region_id;
204 let file_id = index_key.file_id;
205 let bytes_written =
206 futures::io::copy(reader, &mut writer)
207 .await
208 .context(error::DownloadSnafu {
209 region_id,
210 file_id,
211 file_type,
212 })?;
213 writer.close().await.context(error::DownloadSnafu {
214 region_id,
215 file_id,
216 file_type,
217 })?;
218
219 WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);
220
221 let elapsed = timer.stop_and_record();
222 debug!(
223 "Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
224 remote_path, cache_path, bytes_written, region_id, elapsed,
225 );
226
227 let index_value = IndexValue {
228 file_size: bytes_written as _,
229 };
230 self.put(index_key, index_value).await;
231 Ok(())
232 }
233
234 async fn download(
236 &self,
237 index_key: IndexKey,
238 remote_path: &str,
239 remote_store: &ObjectStore,
240 file_size: u64,
241 ) -> Result<()> {
242 if let Err(e) = self
243 .download_without_cleaning(index_key, remote_path, remote_store, file_size)
244 .await
245 {
246 let filename = index_key.to_string();
247 TempFileCleaner::clean_atomic_dir_files(&self.local_store, &[&filename]).await;
248
249 return Err(e);
250 }
251
252 Ok(())
253 }
254}
255
/// A size-bounded cache of files kept in a local object store.
///
/// Parquet files and puffin (index) files are tracked by two separate indexes,
/// each with its own capacity.
#[derive(Debug, Clone)]
pub(crate) struct FileCache {
    /// Shared state; clones of a `FileCache` share the same indexes and store.
    inner: Arc<FileCacheInner>,
    /// Capacity in bytes of the puffin index.
    puffin_capacity: u64,
}

/// Shared reference to a [`FileCache`].
pub(crate) type FileCacheRef = Arc<FileCache>;
267
268impl FileCache {
269 pub(crate) fn new(
271 local_store: ObjectStore,
272 capacity: ReadableSize,
273 ttl: Option<Duration>,
274 index_cache_percent: Option<u8>,
275 ) -> FileCache {
276 let index_percent = index_cache_percent
278 .filter(|&percent| percent > 0 && percent < 100)
279 .unwrap_or(DEFAULT_INDEX_CACHE_PERCENT);
280 let total_capacity = capacity.as_bytes();
281
282 let index_ratio = index_percent as f64 / 100.0;
284 let puffin_capacity = (total_capacity as f64 * index_ratio) as u64;
285 let parquet_capacity = total_capacity - puffin_capacity;
286
287 let puffin_capacity = puffin_capacity.max(MIN_CACHE_CAPACITY);
289 let parquet_capacity = parquet_capacity.max(MIN_CACHE_CAPACITY);
290
291 info!(
292 "Initializing file cache with index_percent: {}%, total_capacity: {}, parquet_capacity: {}, puffin_capacity: {}",
293 index_percent,
294 ReadableSize(total_capacity),
295 ReadableSize(parquet_capacity),
296 ReadableSize(puffin_capacity)
297 );
298
299 let parquet_index = Self::build_cache(local_store.clone(), parquet_capacity, ttl, "file");
300 let puffin_index = Self::build_cache(local_store.clone(), puffin_capacity, ttl, "index");
301
302 let inner = Arc::new(FileCacheInner {
304 local_store,
305 parquet_index,
306 puffin_index,
307 });
308
309 FileCache {
310 inner,
311 puffin_capacity,
312 }
313 }
314
315 fn build_cache(
317 local_store: ObjectStore,
318 capacity: u64,
319 ttl: Option<Duration>,
320 label: &'static str,
321 ) -> Cache<IndexKey, IndexValue> {
322 let cache_store = local_store;
323 let mut builder = Cache::builder()
324 .eviction_policy(EvictionPolicy::lru())
325 .weigher(|_key, value: &IndexValue| -> u32 {
326 value.file_size
328 })
329 .max_capacity(capacity)
330 .async_eviction_listener(move |key, value, cause| {
331 let store = cache_store.clone();
332 let file_path = cache_file_path(FILE_DIR, *key);
334 async move {
335 if let RemovalCause::Replaced = cause {
336 CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
339 warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.region_id);
341 return;
342 }
343
344 match store.delete(&file_path).await {
345 Ok(()) => {
346 CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
347 }
348 Err(e) => {
349 warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id);
350 }
351 }
352 }
353 .boxed()
354 });
355 if let Some(ttl) = ttl {
356 builder = builder.time_to_idle(ttl);
357 }
358 builder.build()
359 }
360
361 pub(crate) async fn put(&self, key: IndexKey, value: IndexValue) {
365 self.inner.put(key, value).await
366 }
367
368 pub(crate) async fn get(&self, key: IndexKey) -> Option<IndexValue> {
369 self.inner.memory_index(key.file_type).get(&key).await
370 }
371
372 #[allow(unused)]
374 pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
375 let index = self.inner.memory_index(key.file_type);
378 if index.get(&key).await.is_none() {
379 CACHE_MISS
380 .with_label_values(&[key.file_type.metric_label()])
381 .inc();
382 return None;
383 }
384
385 let file_path = self.inner.cache_file_path(key);
386 match self.get_reader(&file_path).await {
387 Ok(Some(reader)) => {
388 CACHE_HIT
389 .with_label_values(&[key.file_type.metric_label()])
390 .inc();
391 return Some(reader);
392 }
393 Err(e) => {
394 if e.kind() != ErrorKind::NotFound {
395 warn!(e; "Failed to get file for key {:?}", key);
396 }
397 }
398 Ok(None) => {}
399 }
400
401 index.remove(&key).await;
403 CACHE_MISS
404 .with_label_values(&[key.file_type.metric_label()])
405 .inc();
406 None
407 }
408
409 pub(crate) async fn read_ranges(
411 &self,
412 key: IndexKey,
413 ranges: &[Range<u64>],
414 ) -> Option<Vec<Bytes>> {
415 let index = self.inner.memory_index(key.file_type);
416 if index.get(&key).await.is_none() {
417 CACHE_MISS
418 .with_label_values(&[key.file_type.metric_label()])
419 .inc();
420 return None;
421 }
422
423 let file_path = self.inner.cache_file_path(key);
424 let bytes_result =
427 fetch_byte_ranges(&file_path, self.inner.local_store.clone(), ranges).await;
428 match bytes_result {
429 Ok(bytes) => {
430 CACHE_HIT
431 .with_label_values(&[key.file_type.metric_label()])
432 .inc();
433 Some(bytes)
434 }
435 Err(e) => {
436 if e.kind() != ErrorKind::NotFound {
437 warn!(e; "Failed to get file for key {:?}", key);
438 }
439
440 index.remove(&key).await;
442 CACHE_MISS
443 .with_label_values(&[key.file_type.metric_label()])
444 .inc();
445 None
446 }
447 }
448 }
449
450 pub(crate) async fn remove(&self, key: IndexKey) {
454 let file_path = self.inner.cache_file_path(key);
455 self.inner.memory_index(key.file_type).remove(&key).await;
456 if let Err(e) = self.inner.local_store.delete(&file_path).await {
458 warn!(e; "Failed to delete a cached file {}", file_path);
459 }
460 }
461
462 pub(crate) async fn recover(
467 &self,
468 sync: bool,
469 task_receiver: Option<UnboundedReceiver<RegionLoadCacheTask>>,
470 ) {
471 let moved_self = self.clone();
472 let handle = tokio::spawn(async move {
473 if let Err(err) = moved_self.inner.recover().await {
474 error!(err; "Failed to recover file cache.")
475 }
476
477 if let Some(mut receiver) = task_receiver {
480 info!("Spawning background task for processing region load cache tasks");
481 tokio::spawn(async move {
482 while let Some(task) = receiver.recv().await {
483 task.fill_cache(&moved_self).await;
484 }
485 info!("Background task for processing region load cache tasks stopped");
486 });
487 }
488 });
489
490 if sync {
491 let _ = handle.await;
492 }
493 }
494
495 pub(crate) fn cache_file_path(&self, key: IndexKey) -> String {
497 self.inner.cache_file_path(key)
498 }
499
500 pub(crate) fn local_store(&self) -> ObjectStore {
502 self.inner.local_store.clone()
503 }
504
505 pub(crate) async fn get_parquet_meta_data(&self, key: IndexKey) -> Option<ParquetMetaData> {
508 if let Some(index_value) = self.inner.parquet_index.get(&key).await {
510 let local_store = self.local_store();
512 let file_path = self.inner.cache_file_path(key);
513 let file_size = index_value.file_size as u64;
514 let metadata_loader = MetadataLoader::new(local_store, &file_path, file_size);
515
516 match metadata_loader.load().await {
517 Ok(metadata) => {
518 CACHE_HIT
519 .with_label_values(&[key.file_type.metric_label()])
520 .inc();
521 Some(metadata)
522 }
523 Err(e) => {
524 if !e.is_object_not_found() {
525 warn!(
526 e; "Failed to get parquet metadata for key {:?}",
527 key
528 );
529 }
530 self.inner.parquet_index.remove(&key).await;
532 CACHE_MISS
533 .with_label_values(&[key.file_type.metric_label()])
534 .inc();
535 None
536 }
537 }
538 } else {
539 CACHE_MISS
540 .with_label_values(&[key.file_type.metric_label()])
541 .inc();
542 None
543 }
544 }
545
546 async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
547 if self.inner.local_store.exists(file_path).await? {
548 Ok(Some(self.inner.local_store.reader(file_path).await?))
549 } else {
550 Ok(None)
551 }
552 }
553
554 pub(crate) fn contains_key(&self, key: &IndexKey) -> bool {
556 self.inner.memory_index(key.file_type).contains_key(key)
557 }
558
559 pub(crate) fn puffin_cache_capacity(&self) -> u64 {
561 self.puffin_capacity
562 }
563
564 pub(crate) fn puffin_cache_size(&self) -> u64 {
566 self.inner.puffin_index.weighted_size()
567 }
568
569 pub(crate) async fn download(
572 &self,
573 index_key: IndexKey,
574 remote_path: &str,
575 remote_store: &ObjectStore,
576 file_size: u64,
577 ) -> Result<()> {
578 self.inner
579 .download(index_key, remote_path, remote_store, file_size)
580 .await
581 }
582}
583
/// Key identifying one cached file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct IndexKey {
    /// Region the file belongs to.
    pub region_id: RegionId,
    /// Id of the file.
    pub file_id: FileId,
    /// Kind of the cached file; also selects which index tracks it.
    pub file_type: FileType,
}
591
592impl IndexKey {
593 pub fn new(region_id: RegionId, file_id: FileId, file_type: FileType) -> IndexKey {
595 IndexKey {
596 region_id,
597 file_id,
598 file_type,
599 }
600 }
601}
602
603impl fmt::Display for IndexKey {
604 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
605 write!(
606 f,
607 "{}.{}.{}",
608 self.region_id.as_u64(),
609 self.file_id,
610 self.file_type
611 )
612 }
613}
614
/// Kind of a cached file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileType {
    /// A parquet data file.
    Parquet,
    /// A puffin (index) file, tagged with a version number.
    Puffin(u64),
}

impl fmt::Display for FileType {
    /// Renders the file-name suffix: `parquet` or `<version>.puffin`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            FileType::Parquet => f.write_str("parquet"),
            FileType::Puffin(version) => write!(f, "{version}.puffin"),
        }
    }
}
632
633impl FileType {
634 fn parse(s: &str) -> Option<FileType> {
636 match s {
637 "parquet" => Some(FileType::Parquet),
638 "puffin" => Some(FileType::Puffin(0)),
639 _ => {
640 if let Some(version_str) = s.strip_suffix(".puffin") {
642 let version = version_str.parse::<u64>().ok()?;
643 Some(FileType::Puffin(version))
644 } else {
645 None
646 }
647 }
648 }
649 }
650
651 fn metric_label(&self) -> &'static str {
653 match self {
654 FileType::Parquet => FILE_TYPE,
655 FileType::Puffin(_) => INDEX_TYPE,
656 }
657 }
658}
659
/// Value stored in an index for each cached file.
#[derive(Debug, Clone)]
pub(crate) struct IndexValue {
    /// Size of the cached file in bytes; also the entry's weight in the index.
    pub(crate) file_size: u32,
}
668
669fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String {
673 join_path(cache_file_dir, &key.to_string())
674}
675
676fn parse_index_key(name: &str) -> Option<IndexKey> {
678 let mut split = name.splitn(3, '.');
679 let region_id = split.next().and_then(|s| {
680 let id = s.parse::<u64>().ok()?;
681 Some(RegionId::from_u64(id))
682 })?;
683 let file_id = split.next().and_then(|s| FileId::parse_str(s).ok())?;
684 let file_type = split.next().and_then(FileType::parse)?;
685
686 Some(IndexKey::new(region_id, file_id, file_type))
687}
688
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;

    use super::*;

    /// Creates an object store backed by the local filesystem rooted at `path`.
    fn new_fs_store(path: &str) -> ObjectStore {
        let builder = Fs::default().root(path);
        ObjectStore::new(builder).unwrap().finish()
    }

    #[tokio::test]
    async fn test_file_cache_ttl() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            Some(Duration::from_millis(10)),
            None,
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Nothing is cached yet.
        assert!(cache.reader(key).await.is_none());

        // Write the file, then register it.
        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();

        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        let exist = cache.reader(key).await;
        assert!(exist.is_some());
        // Wait past the TTL and flush housekeeping so the entry expires.
        tokio::time::sleep(Duration::from_millis(15)).await;
        cache.inner.parquet_index.run_pending_tasks().await;
        let non = cache.reader(key).await;
        assert!(non.is_none());
    }

    #[tokio::test]
    async fn test_file_cache_basic() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Nothing is cached yet.
        assert!(cache.reader(key).await.is_none());

        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        // The cached content is readable.
        let reader = cache.reader(key).await.unwrap();
        let buf = reader.read(..).await.unwrap().to_vec();
        assert_eq!("hello", String::from_utf8(buf).unwrap());

        // The entry is weighted by its size.
        cache.inner.parquet_index.run_pending_tasks().await;
        assert_eq!(5, cache.inner.parquet_index.weighted_size());

        // Removing drops both the entry and the file.
        cache.remove(key).await;
        assert!(cache.reader(key).await.is_none());

        cache.inner.parquet_index.run_pending_tasks().await;

        assert!(!local_store.exists(&file_path).await.unwrap());
        assert_eq!(0, cache.inner.parquet_index.weighted_size());
    }

    #[tokio::test]
    async fn test_file_cache_file_removed() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        // Delete the file behind the cache's back.
        local_store.delete(&file_path).await.unwrap();

        // The miss also evicts the stale key.
        assert!(cache.reader(key).await.is_none());
        assert!(!cache.inner.parquet_index.contains_key(&key));
    }

    #[tokio::test]
    async fn test_file_cache_recover() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);

        let region_id = RegionId::new(2000, 0);
        let file_type = FileType::Parquet;
        let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect();
        let mut total_size = 0;
        for (i, file_id) in file_ids.iter().enumerate() {
            let key = IndexKey::new(region_id, *file_id, file_type);
            let file_path = cache.cache_file_path(key);
            let bytes = i.to_string().into_bytes();
            local_store.write(&file_path, bytes.clone()).await.unwrap();

            cache
                .put(
                    IndexKey::new(region_id, *file_id, file_type),
                    IndexValue {
                        file_size: bytes.len() as u32,
                    },
                )
                .await;
            total_size += bytes.len();
        }

        // A fresh cache over the same store starts empty...
        let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
        assert!(
            cache
                .reader(IndexKey::new(region_id, file_ids[0], file_type))
                .await
                .is_none()
        );
        // ...until it recovers from disk.
        cache.recover(true, None).await;

        cache.inner.parquet_index.run_pending_tasks().await;
        assert_eq!(
            total_size,
            cache.inner.parquet_index.weighted_size() as usize
        );

        for (i, file_id) in file_ids.iter().enumerate() {
            let key = IndexKey::new(region_id, *file_id, file_type);
            let reader = cache.reader(key).await.unwrap();
            let buf = reader.read(..).await.unwrap().to_vec();
            assert_eq!(i.to_string(), String::from_utf8(buf).unwrap());
        }
    }

    #[tokio::test]
    async fn test_file_cache_read_ranges() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let file_cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = file_cache.cache_file_path(key);
        let data = b"hello greptime database";
        local_store
            .write(&file_path, data.as_slice())
            .await
            .unwrap();
        // Register the file with its real size.
        file_cache
            .put(
                key,
                IndexValue {
                    file_size: data.len() as u32,
                },
            )
            .await;
        let ranges = vec![0..5, 6..10, 15..19, 0..data.len() as u64];
        let bytes = file_cache.read_ranges(key, &ranges).await.unwrap();

        assert_eq!(4, bytes.len());
        assert_eq!(b"hello", bytes[0].as_ref());
        assert_eq!(b"grep", bytes[1].as_ref());
        assert_eq!(b"data", bytes[2].as_ref());
        assert_eq!(data, bytes[3].as_ref());
    }

    #[tokio::test]
    async fn test_file_cache_capacity_split() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        // A small capacity is raised to the minimum.
        let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
        assert_eq!(MIN_CACHE_CAPACITY, cache.puffin_cache_capacity());

        let total_bytes = ReadableSize::mb(10 * 1024).as_bytes();
        // The default percent (20%) applies when none or an invalid one is given.
        for percent in [None, Some(0), Some(100)] {
            let cache = FileCache::new(local_store.clone(), ReadableSize(total_bytes), None, percent);
            assert_eq!(total_bytes / 5, cache.puffin_cache_capacity());
        }
        // A valid percent is honored.
        let cache = FileCache::new(local_store, ReadableSize(total_bytes), None, Some(50));
        assert_eq!(total_bytes / 2, cache.puffin_cache_capacity());
    }

    #[test]
    fn test_cache_file_path() {
        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
        assert_eq!(
            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
            cache_file_path(
                "test_dir",
                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
            )
        );
        assert_eq!(
            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
            cache_file_path(
                "test_dir/",
                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
            )
        );
    }

    #[test]
    fn test_parse_file_name() {
        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
        let region_id = RegionId::new(1234, 5);
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Parquet),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap()
        );
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Puffin(0)),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.puffin").unwrap()
        );
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Puffin(42)),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.42.puffin")
                .unwrap()
        );
        assert!(parse_index_key("").is_none());
        assert!(parse_index_key(".").is_none());
        assert!(parse_index_key("5299989643269").is_none());
        assert!(parse_index_key("5299989643269.").is_none());
        assert!(parse_index_key(".5299989643269").is_none());
        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df").is_none());
        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").is_none());
        assert!(
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parque").is_none()
        );
        assert!(
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet.puffin")
                .is_none()
        );
        // A non-numeric puffin version is rejected.
        assert!(
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.x.puffin")
                .is_none()
        );
    }

    #[test]
    fn test_index_key_round_trip() {
        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
        let region_id = RegionId::new(1234, 5);
        for file_type in [FileType::Parquet, FileType::Puffin(0), FileType::Puffin(42)] {
            let key = IndexKey::new(region_id, file_id, file_type);
            assert_eq!(Some(key), parse_index_key(&key.to_string()));
        }
    }
}