1use std::fmt;
18use std::ops::Range;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21
22use bytes::Bytes;
23use common_base::readable_size::ReadableSize;
24use common_telemetry::{debug, error, info, warn};
25use futures::{AsyncWriteExt, FutureExt, TryStreamExt};
26use moka::future::Cache;
27use moka::notification::RemovalCause;
28use moka::policy::EvictionPolicy;
29use object_store::util::join_path;
30use object_store::{ErrorKind, ObjectStore, Reader};
31use parquet::file::metadata::ParquetMetaData;
32use snafu::ResultExt;
33use store_api::storage::{FileId, RegionId};
34use tokio::sync::mpsc::UnboundedReceiver;
35
36use crate::access_layer::TempFileCleaner;
37use crate::cache::{FILE_TYPE, INDEX_TYPE};
38use crate::error::{self, OpenDalSnafu, Result};
39use crate::metrics::{
40 CACHE_BYTES, CACHE_HIT, CACHE_MISS, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL,
41 WRITE_CACHE_DOWNLOAD_ELAPSED,
42};
43use crate::region::opener::RegionLoadCacheTask;
44use crate::sst::parquet::helper::fetch_byte_ranges;
45use crate::sst::parquet::metadata::MetadataLoader;
46
47const FILE_DIR: &str = "cache/object/write/";
51
52pub(crate) const DEFAULT_INDEX_CACHE_PERCENT: u8 = 20;
54
55const MIN_CACHE_CAPACITY: u64 = 512 * 1024 * 1024;
57
58#[derive(Debug)]
61pub(crate) struct FileCache {
62 local_store: ObjectStore,
64 parquet_index: Cache<IndexKey, IndexValue>,
66 puffin_index: Cache<IndexKey, IndexValue>,
68 puffin_capacity: u64,
70}
71
72pub(crate) type FileCacheRef = Arc<FileCache>;
73
74impl FileCache {
75 pub(crate) fn new(
77 local_store: ObjectStore,
78 capacity: ReadableSize,
79 ttl: Option<Duration>,
80 index_cache_percent: Option<u8>,
81 ) -> FileCache {
82 let index_percent = index_cache_percent
84 .filter(|&percent| percent > 0 && percent < 100)
85 .unwrap_or(DEFAULT_INDEX_CACHE_PERCENT);
86 let total_capacity = capacity.as_bytes();
87
88 let index_ratio = index_percent as f64 / 100.0;
90 let puffin_capacity = (total_capacity as f64 * index_ratio) as u64;
91 let parquet_capacity = total_capacity - puffin_capacity;
92
93 let puffin_capacity = puffin_capacity.max(MIN_CACHE_CAPACITY);
95 let parquet_capacity = parquet_capacity.max(MIN_CACHE_CAPACITY);
96
97 info!(
98 "Initializing file cache with index_percent: {}%, total_capacity: {}, parquet_capacity: {}, puffin_capacity: {}",
99 index_percent,
100 ReadableSize(total_capacity),
101 ReadableSize(parquet_capacity),
102 ReadableSize(puffin_capacity)
103 );
104
105 let parquet_index = Self::build_cache(local_store.clone(), parquet_capacity, ttl, "file");
106 let puffin_index = Self::build_cache(local_store.clone(), puffin_capacity, ttl, "index");
107
108 FileCache {
109 local_store,
110 parquet_index,
111 puffin_index,
112 puffin_capacity,
113 }
114 }
115
116 fn build_cache(
118 local_store: ObjectStore,
119 capacity: u64,
120 ttl: Option<Duration>,
121 label: &'static str,
122 ) -> Cache<IndexKey, IndexValue> {
123 let cache_store = local_store;
124 let mut builder = Cache::builder()
125 .eviction_policy(EvictionPolicy::lru())
126 .weigher(|_key, value: &IndexValue| -> u32 {
127 value.file_size
129 })
130 .max_capacity(capacity)
131 .async_eviction_listener(move |key, value, cause| {
132 let store = cache_store.clone();
133 let file_path = cache_file_path(FILE_DIR, *key);
135 async move {
136 if let RemovalCause::Replaced = cause {
137 CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
140 warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.region_id);
141 return;
142 }
143
144 match store.delete(&file_path).await {
145 Ok(()) => {
146 CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
147 }
148 Err(e) => {
149 warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id);
150 }
151 }
152 }
153 .boxed()
154 });
155 if let Some(ttl) = ttl {
156 builder = builder.time_to_idle(ttl);
157 }
158 builder.build()
159 }
160
161 fn memory_index(&self, file_type: FileType) -> &Cache<IndexKey, IndexValue> {
163 match file_type {
164 FileType::Parquet => &self.parquet_index,
165 FileType::Puffin => &self.puffin_index,
166 }
167 }
168
169 pub(crate) async fn put(&self, key: IndexKey, value: IndexValue) {
173 CACHE_BYTES
174 .with_label_values(&[key.file_type.metric_label()])
175 .add(value.file_size.into());
176 let index = self.memory_index(key.file_type);
177 index.insert(key, value).await;
178
179 index.run_pending_tasks().await;
181 }
182
183 pub(crate) async fn get(&self, key: IndexKey) -> Option<IndexValue> {
184 self.memory_index(key.file_type).get(&key).await
185 }
186
187 #[allow(unused)]
189 pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
190 let index = self.memory_index(key.file_type);
193 if index.get(&key).await.is_none() {
194 CACHE_MISS
195 .with_label_values(&[key.file_type.metric_label()])
196 .inc();
197 return None;
198 }
199
200 let file_path = self.cache_file_path(key);
201 match self.get_reader(&file_path).await {
202 Ok(Some(reader)) => {
203 CACHE_HIT
204 .with_label_values(&[key.file_type.metric_label()])
205 .inc();
206 return Some(reader);
207 }
208 Err(e) => {
209 if e.kind() != ErrorKind::NotFound {
210 warn!(e; "Failed to get file for key {:?}", key);
211 }
212 }
213 Ok(None) => {}
214 }
215
216 index.remove(&key).await;
218 CACHE_MISS
219 .with_label_values(&[key.file_type.metric_label()])
220 .inc();
221 None
222 }
223
224 pub(crate) async fn read_ranges(
226 &self,
227 key: IndexKey,
228 ranges: &[Range<u64>],
229 ) -> Option<Vec<Bytes>> {
230 let index = self.memory_index(key.file_type);
231 if index.get(&key).await.is_none() {
232 CACHE_MISS
233 .with_label_values(&[key.file_type.metric_label()])
234 .inc();
235 return None;
236 }
237
238 let file_path = self.cache_file_path(key);
239 let bytes_result = fetch_byte_ranges(&file_path, self.local_store.clone(), ranges).await;
242 match bytes_result {
243 Ok(bytes) => {
244 CACHE_HIT
245 .with_label_values(&[key.file_type.metric_label()])
246 .inc();
247 Some(bytes)
248 }
249 Err(e) => {
250 if e.kind() != ErrorKind::NotFound {
251 warn!(e; "Failed to get file for key {:?}", key);
252 }
253
254 index.remove(&key).await;
256 CACHE_MISS
257 .with_label_values(&[key.file_type.metric_label()])
258 .inc();
259 None
260 }
261 }
262 }
263
264 pub(crate) async fn remove(&self, key: IndexKey) {
268 let file_path = self.cache_file_path(key);
269 self.memory_index(key.file_type).remove(&key).await;
270 if let Err(e) = self.local_store.delete(&file_path).await {
272 warn!(e; "Failed to delete a cached file {}", file_path);
273 }
274 }
275
276 async fn recover_inner(&self) -> Result<()> {
277 let now = Instant::now();
278 let mut lister = self
279 .local_store
280 .lister_with(FILE_DIR)
281 .await
282 .context(OpenDalSnafu)?;
283 let (mut total_size, mut total_keys) = (0i64, 0);
286 let (mut parquet_size, mut puffin_size) = (0i64, 0i64);
287 while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
288 let meta = entry.metadata();
289 if !meta.is_file() {
290 continue;
291 }
292 let Some(key) = parse_index_key(entry.name()) else {
293 continue;
294 };
295
296 let meta = self
297 .local_store
298 .stat(entry.path())
299 .await
300 .context(OpenDalSnafu)?;
301 let file_size = meta.content_length() as u32;
302 let index = self.memory_index(key.file_type);
303 index.insert(key, IndexValue { file_size }).await;
304 let size = i64::from(file_size);
305 total_size += size;
306 total_keys += 1;
307
308 match key.file_type {
310 FileType::Parquet => parquet_size += size,
311 FileType::Puffin => puffin_size += size,
312 }
313 }
314 CACHE_BYTES
316 .with_label_values(&[FILE_TYPE])
317 .add(parquet_size);
318 CACHE_BYTES
319 .with_label_values(&[INDEX_TYPE])
320 .add(puffin_size);
321
322 self.parquet_index.run_pending_tasks().await;
325 self.puffin_index.run_pending_tasks().await;
326
327 let parquet_weight = self.parquet_index.weighted_size();
328 let parquet_count = self.parquet_index.entry_count();
329 let puffin_weight = self.puffin_index.weighted_size();
330 let puffin_count = self.puffin_index.entry_count();
331 info!(
332 "Recovered file cache, num_keys: {}, num_bytes: {}, parquet(count: {}, weight: {}), puffin(count: {}, weight: {}), cost: {:?}",
333 total_keys,
334 total_size,
335 parquet_count,
336 parquet_weight,
337 puffin_count,
338 puffin_weight,
339 now.elapsed()
340 );
341 Ok(())
342 }
343
344 pub(crate) async fn recover(
349 self: &Arc<Self>,
350 sync: bool,
351 task_receiver: Option<UnboundedReceiver<RegionLoadCacheTask>>,
352 ) {
353 let moved_self = self.clone();
354 let handle = tokio::spawn(async move {
355 if let Err(err) = moved_self.recover_inner().await {
356 error!(err; "Failed to recover file cache.")
357 }
358
359 if let Some(mut receiver) = task_receiver {
362 let cache_ref = moved_self.clone();
363 info!("Spawning background task for processing region load cache tasks");
364 tokio::spawn(async move {
365 while let Some(task) = receiver.recv().await {
366 let file_cache = cache_ref.clone();
367 task.fill_cache(file_cache).await;
368 }
369 info!("Background task for processing region load cache tasks stopped");
370 });
371 }
372 });
373
374 if sync {
375 let _ = handle.await;
376 }
377 }
378
379 pub(crate) fn cache_file_path(&self, key: IndexKey) -> String {
381 cache_file_path(FILE_DIR, key)
382 }
383
384 pub(crate) fn local_store(&self) -> ObjectStore {
386 self.local_store.clone()
387 }
388
389 pub(crate) async fn get_parquet_meta_data(&self, key: IndexKey) -> Option<ParquetMetaData> {
392 if let Some(index_value) = self.parquet_index.get(&key).await {
394 let local_store = self.local_store();
396 let file_path = self.cache_file_path(key);
397 let file_size = index_value.file_size as u64;
398 let metadata_loader = MetadataLoader::new(local_store, &file_path, file_size);
399
400 match metadata_loader.load().await {
401 Ok(metadata) => {
402 CACHE_HIT
403 .with_label_values(&[key.file_type.metric_label()])
404 .inc();
405 Some(metadata)
406 }
407 Err(e) => {
408 if !e.is_object_not_found() {
409 warn!(
410 e; "Failed to get parquet metadata for key {:?}",
411 key
412 );
413 }
414 self.parquet_index.remove(&key).await;
416 CACHE_MISS
417 .with_label_values(&[key.file_type.metric_label()])
418 .inc();
419 None
420 }
421 }
422 } else {
423 CACHE_MISS
424 .with_label_values(&[key.file_type.metric_label()])
425 .inc();
426 None
427 }
428 }
429
430 async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
431 if self.local_store.exists(file_path).await? {
432 Ok(Some(self.local_store.reader(file_path).await?))
433 } else {
434 Ok(None)
435 }
436 }
437
438 pub(crate) fn contains_key(&self, key: &IndexKey) -> bool {
440 self.memory_index(key.file_type).contains_key(key)
441 }
442
443 pub(crate) fn puffin_cache_capacity(&self) -> u64 {
445 self.puffin_capacity
446 }
447
448 pub(crate) fn puffin_cache_size(&self) -> u64 {
450 self.puffin_index.weighted_size()
451 }
452
453 pub(crate) async fn download(
456 &self,
457 index_key: IndexKey,
458 remote_path: &str,
459 remote_store: &ObjectStore,
460 file_size: u64,
461 ) -> Result<()> {
462 if let Err(e) = self
463 .download_without_cleaning(index_key, remote_path, remote_store, file_size)
464 .await
465 {
466 let filename = index_key.to_string();
467 TempFileCleaner::clean_atomic_dir_files(&self.local_store, &[&filename]).await;
468
469 return Err(e);
470 }
471 Ok(())
472 }
473
474 async fn download_without_cleaning(
475 &self,
476 index_key: IndexKey,
477 remote_path: &str,
478 remote_store: &ObjectStore,
479 file_size: u64,
480 ) -> Result<()> {
481 const DOWNLOAD_READER_CONCURRENCY: usize = 8;
482 const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);
483
484 let file_type = index_key.file_type;
485 let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
486 .with_label_values(&[match file_type {
487 FileType::Parquet => "download_parquet",
488 FileType::Puffin => "download_puffin",
489 }])
490 .start_timer();
491
492 let reader = remote_store
493 .reader_with(remote_path)
494 .concurrent(DOWNLOAD_READER_CONCURRENCY)
495 .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
496 .await
497 .context(error::OpenDalSnafu)?
498 .into_futures_async_read(0..file_size)
499 .await
500 .context(error::OpenDalSnafu)?;
501
502 let cache_path = self.cache_file_path(index_key);
503 let mut writer = self
504 .local_store
505 .writer(&cache_path)
506 .await
507 .context(error::OpenDalSnafu)?
508 .into_futures_async_write();
509
510 let region_id = index_key.region_id;
511 let file_id = index_key.file_id;
512 let bytes_written =
513 futures::io::copy(reader, &mut writer)
514 .await
515 .context(error::DownloadSnafu {
516 region_id,
517 file_id,
518 file_type,
519 })?;
520 writer.close().await.context(error::DownloadSnafu {
521 region_id,
522 file_id,
523 file_type,
524 })?;
525
526 WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);
527
528 let elapsed = timer.stop_and_record();
529 debug!(
530 "Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
531 remote_path, cache_path, bytes_written, region_id, elapsed,
532 );
533
534 let index_value = IndexValue {
535 file_size: bytes_written as _,
536 };
537 self.put(index_key, index_value).await;
538 Ok(())
539 }
540}
541
542#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
544pub(crate) struct IndexKey {
545 pub region_id: RegionId,
546 pub file_id: FileId,
547 pub file_type: FileType,
548}
549
550impl IndexKey {
551 pub fn new(region_id: RegionId, file_id: FileId, file_type: FileType) -> IndexKey {
553 IndexKey {
554 region_id,
555 file_id,
556 file_type,
557 }
558 }
559}
560
561impl fmt::Display for IndexKey {
562 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
563 write!(
564 f,
565 "{}.{}.{}",
566 self.region_id.as_u64(),
567 self.file_id,
568 self.file_type.as_str()
569 )
570 }
571}
572
573#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
575pub enum FileType {
576 Parquet,
578 Puffin,
580}
581
582impl FileType {
583 fn parse(s: &str) -> Option<FileType> {
585 match s {
586 "parquet" => Some(FileType::Parquet),
587 "puffin" => Some(FileType::Puffin),
588 _ => None,
589 }
590 }
591
592 fn as_str(&self) -> &'static str {
594 match self {
595 FileType::Parquet => "parquet",
596 FileType::Puffin => "puffin",
597 }
598 }
599
600 fn metric_label(&self) -> &'static str {
602 match self {
603 FileType::Parquet => FILE_TYPE,
604 FileType::Puffin => INDEX_TYPE,
605 }
606 }
607}
608
609#[derive(Debug, Clone)]
613pub(crate) struct IndexValue {
614 pub(crate) file_size: u32,
616}
617
618fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String {
622 join_path(cache_file_dir, &key.to_string())
623}
624
625fn parse_index_key(name: &str) -> Option<IndexKey> {
627 let mut split = name.splitn(3, '.');
628 let region_id = split.next().and_then(|s| {
629 let id = s.parse::<u64>().ok()?;
630 Some(RegionId::from_u64(id))
631 })?;
632 let file_id = split.next().and_then(|s| FileId::parse_str(s).ok())?;
633 let file_type = split.next().and_then(FileType::parse)?;
634
635 Some(IndexKey::new(region_id, file_id, file_type))
636}
637
638#[cfg(test)]
639mod tests {
640 use common_test_util::temp_dir::create_temp_dir;
641 use object_store::services::Fs;
642
643 use super::*;
644
645 fn new_fs_store(path: &str) -> ObjectStore {
646 let builder = Fs::default().root(path);
647 ObjectStore::new(builder).unwrap().finish()
648 }
649
650 #[tokio::test]
651 async fn test_file_cache_ttl() {
652 let dir = create_temp_dir("");
653 let local_store = new_fs_store(dir.path().to_str().unwrap());
654
655 let cache = FileCache::new(
656 local_store.clone(),
657 ReadableSize::mb(10),
658 Some(Duration::from_millis(10)),
659 None,
660 );
661 let region_id = RegionId::new(2000, 0);
662 let file_id = FileId::random();
663 let key = IndexKey::new(region_id, file_id, FileType::Parquet);
664 let file_path = cache.cache_file_path(key);
665
666 assert!(cache.reader(key).await.is_none());
668
669 local_store
671 .write(&file_path, b"hello".as_slice())
672 .await
673 .unwrap();
674
675 cache
677 .put(
678 IndexKey::new(region_id, file_id, FileType::Parquet),
679 IndexValue { file_size: 5 },
680 )
681 .await;
682
683 let exist = cache.reader(key).await;
684 assert!(exist.is_some());
685 tokio::time::sleep(Duration::from_millis(15)).await;
686 cache.parquet_index.run_pending_tasks().await;
687 let non = cache.reader(key).await;
688 assert!(non.is_none());
689 }
690
691 #[tokio::test]
692 async fn test_file_cache_basic() {
693 let dir = create_temp_dir("");
694 let local_store = new_fs_store(dir.path().to_str().unwrap());
695
696 let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
697 let region_id = RegionId::new(2000, 0);
698 let file_id = FileId::random();
699 let key = IndexKey::new(region_id, file_id, FileType::Parquet);
700 let file_path = cache.cache_file_path(key);
701
702 assert!(cache.reader(key).await.is_none());
704
705 local_store
707 .write(&file_path, b"hello".as_slice())
708 .await
709 .unwrap();
710 cache
712 .put(
713 IndexKey::new(region_id, file_id, FileType::Parquet),
714 IndexValue { file_size: 5 },
715 )
716 .await;
717
718 let reader = cache.reader(key).await.unwrap();
720 let buf = reader.read(..).await.unwrap().to_vec();
721 assert_eq!("hello", String::from_utf8(buf).unwrap());
722
723 cache.parquet_index.run_pending_tasks().await;
725 assert_eq!(5, cache.parquet_index.weighted_size());
726
727 cache.remove(key).await;
729 assert!(cache.reader(key).await.is_none());
730
731 cache.parquet_index.run_pending_tasks().await;
733
734 assert!(!local_store.exists(&file_path).await.unwrap());
736 assert_eq!(0, cache.parquet_index.weighted_size());
737 }
738
739 #[tokio::test]
740 async fn test_file_cache_file_removed() {
741 let dir = create_temp_dir("");
742 let local_store = new_fs_store(dir.path().to_str().unwrap());
743
744 let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
745 let region_id = RegionId::new(2000, 0);
746 let file_id = FileId::random();
747 let key = IndexKey::new(region_id, file_id, FileType::Parquet);
748 let file_path = cache.cache_file_path(key);
749
750 local_store
752 .write(&file_path, b"hello".as_slice())
753 .await
754 .unwrap();
755 cache
757 .put(
758 IndexKey::new(region_id, file_id, FileType::Parquet),
759 IndexValue { file_size: 5 },
760 )
761 .await;
762
763 local_store.delete(&file_path).await.unwrap();
765
766 assert!(cache.reader(key).await.is_none());
768 assert!(!cache.parquet_index.contains_key(&key));
770 }
771
772 #[tokio::test]
773 async fn test_file_cache_recover() {
774 let dir = create_temp_dir("");
775 let local_store = new_fs_store(dir.path().to_str().unwrap());
776 let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
777
778 let region_id = RegionId::new(2000, 0);
779 let file_type = FileType::Parquet;
780 let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect();
782 let mut total_size = 0;
783 for (i, file_id) in file_ids.iter().enumerate() {
784 let key = IndexKey::new(region_id, *file_id, file_type);
785 let file_path = cache.cache_file_path(key);
786 let bytes = i.to_string().into_bytes();
787 local_store.write(&file_path, bytes.clone()).await.unwrap();
788
789 cache
791 .put(
792 IndexKey::new(region_id, *file_id, file_type),
793 IndexValue {
794 file_size: bytes.len() as u32,
795 },
796 )
797 .await;
798 total_size += bytes.len();
799 }
800
801 let cache = Arc::new(FileCache::new(
803 local_store.clone(),
804 ReadableSize::mb(10),
805 None,
806 None,
807 ));
808 assert!(
810 cache
811 .reader(IndexKey::new(region_id, file_ids[0], file_type))
812 .await
813 .is_none()
814 );
815 cache.recover(true, None).await;
816
817 cache.parquet_index.run_pending_tasks().await;
819 assert_eq!(total_size, cache.parquet_index.weighted_size() as usize);
820
821 for (i, file_id) in file_ids.iter().enumerate() {
822 let key = IndexKey::new(region_id, *file_id, file_type);
823 let reader = cache.reader(key).await.unwrap();
824 let buf = reader.read(..).await.unwrap().to_vec();
825 assert_eq!(i.to_string(), String::from_utf8(buf).unwrap());
826 }
827 }
828
829 #[tokio::test]
830 async fn test_file_cache_read_ranges() {
831 let dir = create_temp_dir("");
832 let local_store = new_fs_store(dir.path().to_str().unwrap());
833 let file_cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
834 let region_id = RegionId::new(2000, 0);
835 let file_id = FileId::random();
836 let key = IndexKey::new(region_id, file_id, FileType::Parquet);
837 let file_path = file_cache.cache_file_path(key);
838 let data = b"hello greptime database";
840 local_store
841 .write(&file_path, data.as_slice())
842 .await
843 .unwrap();
844 file_cache.put(key, IndexValue { file_size: 5 }).await;
846 let ranges = vec![0..5, 6..10, 15..19, 0..data.len() as u64];
848 let bytes = file_cache.read_ranges(key, &ranges).await.unwrap();
849
850 assert_eq!(4, bytes.len());
851 assert_eq!(b"hello", bytes[0].as_ref());
852 assert_eq!(b"grep", bytes[1].as_ref());
853 assert_eq!(b"data", bytes[2].as_ref());
854 assert_eq!(data, bytes[3].as_ref());
855 }
856
857 #[test]
858 fn test_cache_file_path() {
859 let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
860 assert_eq!(
861 "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
862 cache_file_path(
863 "test_dir",
864 IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
865 )
866 );
867 assert_eq!(
868 "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
869 cache_file_path(
870 "test_dir/",
871 IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
872 )
873 );
874 }
875
876 #[test]
877 fn test_parse_file_name() {
878 let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
879 let region_id = RegionId::new(1234, 5);
880 assert_eq!(
881 IndexKey::new(region_id, file_id, FileType::Parquet),
882 parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap()
883 );
884 assert!(parse_index_key("").is_none());
885 assert!(parse_index_key(".").is_none());
886 assert!(parse_index_key("5299989643269").is_none());
887 assert!(parse_index_key("5299989643269.").is_none());
888 assert!(parse_index_key(".5299989643269").is_none());
889 assert!(parse_index_key("5299989643269.").is_none());
890 assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df").is_none());
891 assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").is_none());
892 assert!(
893 parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parque").is_none()
894 );
895 assert!(
896 parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet.puffin")
897 .is_none()
898 );
899 }
900}