1use std::fmt;
18use std::ops::Range;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21
22use bytes::Bytes;
23use common_base::readable_size::ReadableSize;
24use common_telemetry::{debug, error, info, warn};
25use futures::{AsyncWriteExt, FutureExt, TryStreamExt};
26use moka::future::Cache;
27use moka::notification::RemovalCause;
28use moka::policy::EvictionPolicy;
29use object_store::util::join_path;
30use object_store::{ErrorKind, ObjectStore, Reader};
31use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
32use snafu::ResultExt;
33use store_api::storage::{FileId, RegionId};
34use tokio::sync::mpsc::{Sender, UnboundedReceiver};
35
36use crate::access_layer::TempFileCleaner;
37use crate::cache::{CachedSstMeta, FILE_TYPE, INDEX_TYPE};
38use crate::error::{self, OpenDalSnafu, Result};
39use crate::metrics::{
40 CACHE_BYTES, CACHE_HIT, CACHE_MISS, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL,
41 WRITE_CACHE_DOWNLOAD_ELAPSED,
42};
43use crate::region::opener::RegionLoadCacheTask;
44use crate::sst::parquet::helper::fetch_byte_ranges;
45use crate::sst::parquet::metadata::MetadataLoader;
46use crate::sst::parquet::reader::MetadataCacheMetrics;
47
/// Subdirectory (relative to the local store root) where cached object files live.
const FILE_DIR: &str = "cache/object/write/";

/// Default percentage of the total capacity reserved for the puffin (index) cache.
pub(crate) const DEFAULT_INDEX_CACHE_PERCENT: u8 = 20;

/// Floor (512 MiB) that each sub-cache capacity is clamped up to.
const MIN_CACHE_CAPACITY: u64 = 512 * 1024 * 1024;

/// Bound of the channel feeding the background download worker.
const DOWNLOAD_TASK_CHANNEL_SIZE: usize = 64;
61
/// A request for the background worker to download one remote file into the cache.
struct DownloadTask {
    // Cache key the downloaded file will be registered under.
    index_key: IndexKey,
    // Path of the file in the remote object store.
    remote_path: String,
    // Remote store to read the file from.
    remote_store: ObjectStore,
    // Size of the remote file in bytes (used as the read range end).
    file_size: u64,
}
69
/// Shared internals of [FileCache]; cloned handles all point at the same state.
#[derive(Debug)]
struct FileCacheInner {
    // Local store that holds the cached files on disk.
    local_store: ObjectStore,
    // In-memory index of cached parquet (data) files.
    parquet_index: Cache<IndexKey, IndexValue>,
    // In-memory index of cached puffin (index) files.
    puffin_index: Cache<IndexKey, IndexValue>,
}
80
impl FileCacheInner {
    /// Returns the in-memory index tracking entries of the given `file_type`.
    ///
    /// Parquet (data) and puffin (index) files are accounted in separate caches
    /// so each side has its own capacity budget.
    fn memory_index(&self, file_type: FileType) -> &Cache<IndexKey, IndexValue> {
        match file_type {
            FileType::Parquet => &self.parquet_index,
            FileType::Puffin { .. } => &self.puffin_index,
        }
    }

    /// Returns the local on-disk path of the cached file for `key` under [FILE_DIR].
    fn cache_file_path(&self, key: IndexKey) -> String {
        cache_file_path(FILE_DIR, key)
    }

    /// Registers `key` in the in-memory index and bumps the cache-size metric.
    ///
    /// The caller must have already written the corresponding file to the local
    /// store; this only updates bookkeeping.
    async fn put(&self, key: IndexKey, value: IndexValue) {
        // Account the bytes up front; evictions (including `Replaced`) subtract
        // them again in the eviction listener installed by `FileCache::build_cache`.
        CACHE_BYTES
            .with_label_values(&[key.file_type.metric_label()])
            .add(value.file_size.into());
        let index = self.memory_index(key.file_type);
        index.insert(key, value).await;

        // Flush pending moka work so evictions triggered by this insert (and the
        // file deletions they perform) happen promptly instead of lazily.
        index.run_pending_tasks().await;
    }

    /// Rebuilds the in-memory indexes by listing the files under [FILE_DIR] in
    /// the local store, so cache files from a previous run are reused.
    async fn recover(&self) -> Result<()> {
        let now = Instant::now();
        let mut lister = self
            .local_store
            .lister_with(FILE_DIR)
            .await
            .context(OpenDalSnafu)?;
        let (mut total_size, mut total_keys) = (0i64, 0);
        let (mut parquet_size, mut puffin_size) = (0i64, 0i64);
        while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
            let meta = entry.metadata();
            if !meta.is_file() {
                continue;
            }
            // Ignore files whose name is not `<region>.<file_id>.<type>`.
            let Some(key) = parse_index_key(entry.name()) else {
                continue;
            };

            // NOTE(review): each entry is re-stat'ed even though the lister
            // returned metadata above; presumably the listing metadata may lack
            // `content_length` on some backends — confirm before removing.
            let meta = self
                .local_store
                .stat(entry.path())
                .await
                .context(OpenDalSnafu)?;
            // `file_size` is stored as u32; a file >= 4 GiB would wrap here —
            // TODO confirm cached files are always smaller than that.
            let file_size = meta.content_length() as u32;
            let index = self.memory_index(key.file_type);
            index.insert(key, IndexValue { file_size }).await;
            let size = i64::from(file_size);
            total_size += size;
            total_keys += 1;

            match key.file_type {
                FileType::Parquet => parquet_size += size,
                FileType::Puffin { .. } => puffin_size += size,
            }
        }
        CACHE_BYTES
            .with_label_values(&[FILE_TYPE])
            .add(parquet_size);
        CACHE_BYTES
            .with_label_values(&[INDEX_TYPE])
            .add(puffin_size);

        // Flush pending moka work so the statistics logged below are accurate.
        self.parquet_index.run_pending_tasks().await;
        self.puffin_index.run_pending_tasks().await;

        let parquet_weight = self.parquet_index.weighted_size();
        let parquet_count = self.parquet_index.entry_count();
        let puffin_weight = self.puffin_index.weighted_size();
        let puffin_count = self.puffin_index.entry_count();
        info!(
            "Recovered file cache, num_keys: {}, num_bytes: {}, parquet(count: {}, weight: {}), puffin(count: {}, weight: {}), cost: {:?}",
            total_keys,
            total_size,
            parquet_count,
            parquet_weight,
            puffin_count,
            puffin_weight,
            now.elapsed()
        );
        Ok(())
    }

    /// Downloads `remote_path` from `remote_store` into the local cache file for
    /// `index_key`, then registers the entry via [Self::put].
    ///
    /// On failure any partially written local file is NOT cleaned up here;
    /// [Self::download] wraps this function and performs the cleanup.
    async fn download_without_cleaning(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
        concurrency: usize,
    ) -> Result<()> {
        // Chunk size of each concurrent remote read.
        const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);

        let file_type = index_key.file_type;
        let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
            .with_label_values(&[match file_type {
                FileType::Parquet => "download_parquet",
                FileType::Puffin { .. } => "download_puffin",
            }])
            .start_timer();

        let reader = remote_store
            .reader_with(remote_path)
            .concurrent(concurrency)
            .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_read(0..file_size)
            .await
            .context(error::OpenDalSnafu)?;

        let cache_path = self.cache_file_path(index_key);
        let mut writer = self
            .local_store
            .writer(&cache_path)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_write();

        let region_id = index_key.region_id;
        let file_id = index_key.file_id;
        let bytes_written =
            futures::io::copy(reader, &mut writer)
                .await
                .context(error::DownloadSnafu {
                    region_id,
                    file_id,
                    file_type,
                })?;
        // Closing flushes buffered bytes; a failed close is a failed download.
        writer.close().await.context(error::DownloadSnafu {
            region_id,
            file_id,
            file_type,
        })?;

        WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);

        let elapsed = timer.stop_and_record();
        debug!(
            "Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
            remote_path, cache_path, bytes_written, region_id, elapsed,
        );

        // Register the entry using the byte count actually written.
        let index_value = IndexValue {
            file_size: bytes_written as _,
        };
        self.put(index_key, index_value).await;
        Ok(())
    }

    /// Downloads a file into the cache. On failure, logs the error and removes
    /// any partially written temporary file before propagating the error.
    async fn download(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
        concurrency: usize,
    ) -> Result<()> {
        if let Err(e) = self
            .download_without_cleaning(index_key, remote_path, remote_store, file_size, concurrency)
            .await
        {
            error!(e; "Failed to download file '{}' for region {}", remote_path, index_key.region_id);

            // The cache file name is the display form of the key.
            let filename = index_key.to_string();
            TempFileCleaner::clean_atomic_dir_files(&self.local_store, &[&filename]).await;

            return Err(e);
        }

        Ok(())
    }

    /// Returns true if `key` is present in the in-memory index (no disk I/O).
    fn contains_key(&self, key: &IndexKey) -> bool {
        self.memory_index(key.file_type).contains_key(key)
    }
}
275
/// A cache that stores SST and index files on the local disk and tracks them
/// with in-memory LRU indexes. Cloning is cheap and shares the same state.
#[derive(Debug, Clone)]
pub(crate) struct FileCache {
    // Shared state behind all clones of this handle.
    inner: Arc<FileCacheInner>,
    // Capacity (bytes) assigned to the puffin (index) sub-cache after clamping.
    puffin_capacity: u64,
    // Sender feeding the background download worker; `None` when disabled.
    download_task_tx: Option<Sender<DownloadTask>>,
}
287
/// Shared reference to a [FileCache].
pub(crate) type FileCacheRef = Arc<FileCache>;
289
impl FileCache {
    /// Creates a new file cache backed by `local_store`.
    ///
    /// `capacity` is split between the parquet (data) cache and the puffin
    /// (index) cache according to `index_cache_percent`; values outside 1..=99
    /// fall back to [DEFAULT_INDEX_CACHE_PERCENT]. When
    /// `enable_background_worker` is true, a worker task is spawned to serve
    /// [FileCache::maybe_download_background] requests.
    pub(crate) fn new(
        local_store: ObjectStore,
        capacity: ReadableSize,
        ttl: Option<Duration>,
        index_cache_percent: Option<u8>,
        enable_background_worker: bool,
    ) -> FileCache {
        let index_percent = index_cache_percent
            .filter(|&percent| percent > 0 && percent < 100)
            .unwrap_or(DEFAULT_INDEX_CACHE_PERCENT);
        let total_capacity = capacity.as_bytes();

        let index_ratio = index_percent as f64 / 100.0;
        let puffin_capacity = (total_capacity as f64 * index_ratio) as u64;
        let parquet_capacity = total_capacity - puffin_capacity;

        // Clamp both sides to a floor so a tiny configured capacity still yields
        // a usable cache. NOTE(review): after clamping, the two sub-caches may
        // together exceed `total_capacity` — confirm this is intended.
        let puffin_capacity = puffin_capacity.max(MIN_CACHE_CAPACITY);
        let parquet_capacity = parquet_capacity.max(MIN_CACHE_CAPACITY);

        info!(
            "Initializing file cache with index_percent: {}%, total_capacity: {}, parquet_capacity: {}, puffin_capacity: {}",
            index_percent,
            ReadableSize(total_capacity),
            ReadableSize(parquet_capacity),
            ReadableSize(puffin_capacity)
        );

        let parquet_index = Self::build_cache(local_store.clone(), parquet_capacity, ttl, "file");
        let puffin_index = Self::build_cache(local_store.clone(), puffin_capacity, ttl, "index");

        let inner = Arc::new(FileCacheInner {
            local_store,
            parquet_index,
            puffin_index,
        });

        let download_task_tx = if enable_background_worker {
            let (tx, rx) = tokio::sync::mpsc::channel(DOWNLOAD_TASK_CHANNEL_SIZE);
            Self::spawn_download_worker(inner.clone(), rx);
            Some(tx)
        } else {
            None
        };

        FileCache {
            inner,
            puffin_capacity,
            download_task_tx,
        }
    }

    /// Spawns the background worker that serves queued [DownloadTask]s one at a
    /// time. The worker stops when all senders are dropped.
    fn spawn_download_worker(
        inner: Arc<FileCacheInner>,
        mut download_task_rx: tokio::sync::mpsc::Receiver<DownloadTask>,
    ) {
        tokio::spawn(async move {
            info!("Background download worker started");
            while let Some(task) = download_task_rx.recv().await {
                // Skip work that another path already completed.
                if inner.contains_key(&task.index_key) {
                    debug!(
                        "Skipping background download for region {}, file {} - already in cache",
                        task.index_key.region_id, task.index_key.file_id
                    );
                    continue;
                }

                // Errors are already logged inside `download`; best-effort here.
                let _ = inner
                    .download(
                        task.index_key,
                        &task.remote_path,
                        &task.remote_store,
                        task.file_size,
                        // Background downloads use a single read stream to limit
                        // their impact on foreground traffic.
                        1,
                    )
                    .await;
            }
            info!("Background download worker stopped");
        });
    }

    /// Builds one moka cache with an LRU policy, `file_size`-weighted entries,
    /// and an eviction listener that deletes the backing file and keeps the
    /// `CACHE_BYTES` metric (labelled by `label`) in sync.
    fn build_cache(
        local_store: ObjectStore,
        capacity: u64,
        ttl: Option<Duration>,
        label: &'static str,
    ) -> Cache<IndexKey, IndexValue> {
        let cache_store = local_store;
        let mut builder = Cache::builder()
            .eviction_policy(EvictionPolicy::lru())
            .weigher(|_key, value: &IndexValue| -> u32 {
                // The entry weight is the cached file size in bytes.
                value.file_size
            })
            .max_capacity(capacity)
            .async_eviction_listener(move |key, value, cause| {
                let store = cache_store.clone();
                let file_path = cache_file_path(FILE_DIR, *key);
                async move {
                    // A replaced entry reuses the same path, so only fix up the
                    // metric — do not delete the (new) file.
                    if let RemovalCause::Replaced = cause {
                        CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
                        return;
                    }

                    match store.delete(&file_path).await {
                        Ok(()) => {
                            CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
                        }
                        Err(e) => {
                            // NOTE(review): on delete failure the metric is not
                            // decremented, so it can drift upward — confirm
                            // acceptable.
                            warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id);
                        }
                    }
                }
                .boxed()
            });
        if let Some(ttl) = ttl {
            // `ttl` is applied as time-to-idle: entries expire after inactivity.
            builder = builder.time_to_idle(ttl);
        }
        builder.build()
    }

    /// Registers `key` in the cache; the file must already be on local disk.
    pub(crate) async fn put(&self, key: IndexKey, value: IndexValue) {
        self.inner.put(key, value).await
    }

    /// Looks up the index entry for `key` (touches LRU recency; no disk I/O).
    pub(crate) async fn get(&self, key: IndexKey) -> Option<IndexValue> {
        self.inner.memory_index(key.file_type).get(&key).await
    }

    /// Returns a reader over the cached file, or `None` on a miss.
    ///
    /// If the index has the key but the file is gone from disk, the stale index
    /// entry is removed and a miss is recorded.
    #[allow(unused)]
    pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
        let index = self.inner.memory_index(key.file_type);
        if index.get(&key).await.is_none() {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            return None;
        }

        let file_path = self.inner.cache_file_path(key);
        match self.get_reader(&file_path).await {
            Ok(Some(reader)) => {
                CACHE_HIT
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                return Some(reader);
            }
            Err(e) => {
                // NotFound is expected when the file was evicted/removed
                // externally; anything else is worth a warning.
                if e.kind() != ErrorKind::NotFound {
                    warn!(e; "Failed to get file for key {:?}", key);
                }
            }
            Ok(None) => {}
        }

        // Index said hit but the file is unusable: drop the stale entry.
        index.remove(&key).await;
        CACHE_MISS
            .with_label_values(&[key.file_type.metric_label()])
            .inc();
        None
    }

    /// Reads the given byte `ranges` from the cached file, or `None` on a miss.
    ///
    /// Like [Self::reader], a stale index entry (file missing on disk) is
    /// evicted and counted as a miss.
    pub(crate) async fn read_ranges(
        &self,
        key: IndexKey,
        ranges: &[Range<u64>],
    ) -> Option<Vec<Bytes>> {
        let index = self.inner.memory_index(key.file_type);
        if index.get(&key).await.is_none() {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            return None;
        }

        let file_path = self.inner.cache_file_path(key);
        let bytes_result =
            fetch_byte_ranges(&file_path, self.inner.local_store.clone(), ranges).await;
        match bytes_result {
            Ok(bytes) => {
                CACHE_HIT
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                Some(bytes)
            }
            Err(e) => {
                if e.kind() != ErrorKind::NotFound {
                    warn!(e; "Failed to get file for key {:?}", key);
                }

                // Drop the stale index entry so later lookups miss fast.
                index.remove(&key).await;
                CACHE_MISS
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                None
            }
        }
    }

    /// Removes `key` from the index and best-effort deletes its local file.
    pub(crate) async fn remove(&self, key: IndexKey) {
        let file_path = self.inner.cache_file_path(key);
        self.inner.memory_index(key.file_type).remove(&key).await;
        // The eviction listener is not called for explicit `remove`; delete the
        // file here. Failure is non-fatal: the file may already be gone.
        if let Err(e) = self.inner.local_store.delete(&file_path).await {
            warn!(e; "Failed to delete a cached file {}", file_path);
        }
    }

    /// Recovers the cache index from disk, optionally waiting for completion
    /// (`sync`) and optionally spawning a task to process region load-cache
    /// requests from `task_receiver`.
    pub(crate) async fn recover(
        &self,
        sync: bool,
        task_receiver: Option<UnboundedReceiver<RegionLoadCacheTask>>,
    ) {
        let moved_self = self.clone();
        let handle = tokio::spawn(async move {
            if let Err(err) = moved_self.inner.recover().await {
                error!(err; "Failed to recover file cache.")
            }

            // Start serving load-cache tasks only after recovery finished so
            // recovered entries are visible to the tasks.
            if let Some(mut receiver) = task_receiver {
                info!("Spawning background task for processing region load cache tasks");
                tokio::spawn(async move {
                    while let Some(task) = receiver.recv().await {
                        task.fill_cache(&moved_self).await;
                    }
                    info!("Background task for processing region load cache tasks stopped");
                });
            }
        });

        if sync {
            // Join errors (panics in the recovery task) are intentionally ignored.
            let _ = handle.await;
        }
    }

    /// Returns the local on-disk path of the cache file for `key`.
    pub(crate) fn cache_file_path(&self, key: IndexKey) -> String {
        self.inner.cache_file_path(key)
    }

    /// Returns a handle to the local store backing this cache.
    pub(crate) fn local_store(&self) -> ObjectStore {
        self.inner.local_store.clone()
    }

    /// Loads the parquet metadata of the cached file for `key`, or `None` on a
    /// miss. A stale index entry (file unreadable) is evicted on the way out.
    pub(crate) async fn get_parquet_meta_data(
        &self,
        key: IndexKey,
        cache_metrics: &mut MetadataCacheMetrics,
        page_index_policy: PageIndexPolicy,
    ) -> Option<ParquetMetaData> {
        if let Some(index_value) = self.inner.parquet_index.get(&key).await {
            let local_store = self.local_store();
            let file_path = self.inner.cache_file_path(key);
            let file_size = index_value.file_size as u64;
            let mut metadata_loader = MetadataLoader::new(local_store, &file_path, file_size);
            metadata_loader.with_page_index_policy(page_index_policy);

            match metadata_loader.load(cache_metrics).await {
                Ok(metadata) => {
                    CACHE_HIT
                        .with_label_values(&[key.file_type.metric_label()])
                        .inc();
                    Some(metadata)
                }
                Err(e) => {
                    // Missing file is the common stale-entry case; don't warn.
                    if !e.is_object_not_found() {
                        warn!(
                            e; "Failed to get parquet metadata for key {:?}",
                            key
                        );
                    }
                    self.inner.parquet_index.remove(&key).await;
                    CACHE_MISS
                        .with_label_values(&[key.file_type.metric_label()])
                        .inc();
                    None
                }
            }
        } else {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            None
        }
    }

    /// Like [Self::get_parquet_meta_data], but wraps the result in a decoded
    /// [CachedSstMeta]; decoding failure is counted as a miss.
    pub(crate) async fn get_sst_meta_data(
        &self,
        key: IndexKey,
        cache_metrics: &mut MetadataCacheMetrics,
        page_index_policy: PageIndexPolicy,
    ) -> Option<Arc<CachedSstMeta>> {
        let file_path = self.inner.cache_file_path(key);
        self.get_parquet_meta_data(key, cache_metrics, page_index_policy)
            .await
            .and_then(
                |metadata| match CachedSstMeta::try_new(&file_path, metadata) {
                    Ok(metadata) => Some(Arc::new(metadata)),
                    Err(err) => {
                        CACHE_MISS
                            .with_label_values(&[key.file_type.metric_label()])
                            .inc();
                        warn!(
                            err; "Failed to decode cached parquet metadata for key {:?}",
                            key
                        );
                        None
                    }
                },
            )
    }

    /// Opens a reader for `file_path` in the local store, or `Ok(None)` when
    /// the file does not exist.
    ///
    /// NOTE(review): the `exists` check races with a concurrent delete (and
    /// costs an extra stat); callers already tolerate a NotFound error from the
    /// returned reader, so collapsing this into a single `reader` call may be
    /// possible — verify against the opendal version in use.
    async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
        if self.inner.local_store.exists(file_path).await? {
            Ok(Some(self.inner.local_store.reader(file_path).await?))
        } else {
            Ok(None)
        }
    }

    /// Returns true if `key` is in the in-memory index (no disk I/O).
    pub(crate) fn contains_key(&self, key: &IndexKey) -> bool {
        self.inner.contains_key(key)
    }

    /// Capacity (bytes) assigned to the puffin (index) sub-cache.
    pub(crate) fn puffin_cache_capacity(&self) -> u64 {
        self.puffin_capacity
    }

    /// Current weighted size (bytes) of the puffin (index) sub-cache.
    pub(crate) fn puffin_cache_size(&self) -> u64 {
        self.inner.puffin_index.weighted_size()
    }

    /// Downloads a remote file into the cache synchronously, using 8 concurrent
    /// read streams.
    pub(crate) async fn download(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
    ) -> Result<()> {
        self.inner
            .download(index_key, remote_path, remote_store, file_size, 8)
            .await
    }

    /// Queues a best-effort background download of a remote file.
    ///
    /// Does nothing when the background worker is disabled; if the queue is
    /// full (or the worker stopped) the task is dropped with a debug log.
    pub(crate) fn maybe_download_background(
        &self,
        index_key: IndexKey,
        remote_path: String,
        remote_store: ObjectStore,
        file_size: u64,
    ) {
        let Some(tx) = &self.download_task_tx else {
            return;
        };

        let task = DownloadTask {
            index_key,
            remote_path,
            remote_store,
            file_size,
        };

        // Non-blocking send: dropping the task is acceptable here because the
        // download is opportunistic.
        if let Err(e) = tx.try_send(task) {
            debug!(
                "Failed to queue background download task for region {}, file {}: {:?}",
                index_key.region_id, index_key.file_id, e
            );
        }
    }
}
713
/// Key identifying one cached file: which region, which file, and which kind.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct IndexKey {
    // Region the file belongs to.
    pub region_id: RegionId,
    // Identifier of the SST/index file.
    pub file_id: FileId,
    // Kind of the cached file (data or index).
    pub file_type: FileType,
}
721
722impl IndexKey {
723 pub fn new(region_id: RegionId, file_id: FileId, file_type: FileType) -> IndexKey {
725 IndexKey {
726 region_id,
727 file_id,
728 file_type,
729 }
730 }
731}
732
/// Formats the key as `<region_u64>.<file_id>.<file_type>`.
///
/// This string is also the cache file name on disk, so it must stay parseable
/// by [parse_index_key].
impl fmt::Display for IndexKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{}.{}.{}",
            self.region_id.as_u64(),
            self.file_id,
            self.file_type
        )
    }
}
744
/// Kind of a cached file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileType {
    /// An SST data file in parquet format.
    Parquet,
    /// A puffin index file; the payload is a version number used in the file
    /// name (see the `Display` impl and [FileType::parse]).
    Puffin(u64),
}
753
/// File-name suffix of each file type: `parquet` or `<version>.puffin`.
impl fmt::Display for FileType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            FileType::Parquet => write!(f, "parquet"),
            FileType::Puffin(version) => write!(f, "{}.puffin", version),
        }
    }
}
762
763impl FileType {
764 pub(crate) fn parse(s: &str) -> Option<FileType> {
766 match s {
767 "parquet" => Some(FileType::Parquet),
768 "puffin" => Some(FileType::Puffin(0)),
769 _ => {
770 if let Some(version_str) = s.strip_suffix(".puffin") {
772 let version = version_str.parse::<u64>().ok()?;
773 Some(FileType::Puffin(version))
774 } else {
775 None
776 }
777 }
778 }
779 }
780
781 fn metric_label(&self) -> &'static str {
783 match self {
784 FileType::Parquet => FILE_TYPE,
785 FileType::Puffin(_) => INDEX_TYPE,
786 }
787 }
788}
789
/// In-memory metadata tracked for one cached file.
#[derive(Debug, Clone)]
pub(crate) struct IndexValue {
    // Size of the cached file in bytes; also used as the moka entry weight.
    pub(crate) file_size: u32,
}
798
799fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String {
803 join_path(cache_file_dir, &key.to_string())
804}
805
806fn parse_index_key(name: &str) -> Option<IndexKey> {
808 let mut split = name.splitn(3, '.');
809 let region_id = split.next().and_then(|s| {
810 let id = s.parse::<u64>().ok()?;
811 Some(RegionId::from_u64(id))
812 })?;
813 let file_id = split.next().and_then(|s| FileId::parse_str(s).ok())?;
814 let file_type = split.next().and_then(FileType::parse)?;
815
816 Some(IndexKey::new(region_id, file_id, file_type))
817}
818
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;

    use super::*;

    /// Builds a filesystem-backed object store rooted at `path`.
    fn new_fs_store(path: &str) -> ObjectStore {
        let builder = Fs::default().root(path);
        ObjectStore::new(builder).unwrap().finish()
    }

    #[tokio::test]
    async fn test_file_cache_ttl() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            Some(Duration::from_millis(10)),
            None,
            true,
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Not cached yet.
        assert!(cache.reader(key).await.is_none());

        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();

        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        let exist = cache.reader(key).await;
        assert!(exist.is_some());
        // Let the time-to-idle expire, then flush pending moka work.
        tokio::time::sleep(Duration::from_millis(15)).await;
        cache.inner.parquet_index.run_pending_tasks().await;
        let non = cache.reader(key).await;
        assert!(non.is_none());
    }

    #[tokio::test]
    async fn test_file_cache_basic() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true,
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        assert!(cache.reader(key).await.is_none());

        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        let reader = cache.reader(key).await.unwrap();
        let buf = reader.read(..).await.unwrap().to_vec();
        assert_eq!("hello", String::from_utf8(buf).unwrap());

        cache.inner.parquet_index.run_pending_tasks().await;
        assert_eq!(5, cache.inner.parquet_index.weighted_size());

        // Removing the key should also delete the file on disk.
        cache.remove(key).await;
        assert!(cache.reader(key).await.is_none());

        cache.inner.parquet_index.run_pending_tasks().await;

        assert!(!local_store.exists(&file_path).await.unwrap());
        assert_eq!(0, cache.inner.parquet_index.weighted_size());
    }

    #[tokio::test]
    async fn test_file_cache_file_removed() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true,
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        // Delete the backing file behind the cache's back.
        local_store.delete(&file_path).await.unwrap();

        // The miss should also evict the stale index entry.
        assert!(cache.reader(key).await.is_none());
        assert!(!cache.inner.parquet_index.contains_key(&key));
    }

    #[tokio::test]
    async fn test_file_cache_recover() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true,
        );

        let region_id = RegionId::new(2000, 0);
        let file_type = FileType::Parquet;
        let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect();
        let mut total_size = 0;
        for (i, file_id) in file_ids.iter().enumerate() {
            let key = IndexKey::new(region_id, *file_id, file_type);
            let file_path = cache.cache_file_path(key);
            let bytes = i.to_string().into_bytes();
            local_store.write(&file_path, bytes.clone()).await.unwrap();

            cache
                .put(
                    IndexKey::new(region_id, *file_id, file_type),
                    IndexValue {
                        file_size: bytes.len() as u32,
                    },
                )
                .await;
            total_size += bytes.len();
        }

        // A fresh cache over the same directory starts empty until `recover`.
        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true,
        );
        assert!(
            cache
                .reader(IndexKey::new(region_id, file_ids[0], file_type))
                .await
                .is_none()
        );
        cache.recover(true, None).await;

        cache.inner.parquet_index.run_pending_tasks().await;
        assert_eq!(
            total_size,
            cache.inner.parquet_index.weighted_size() as usize
        );

        for (i, file_id) in file_ids.iter().enumerate() {
            let key = IndexKey::new(region_id, *file_id, file_type);
            let reader = cache.reader(key).await.unwrap();
            let buf = reader.read(..).await.unwrap().to_vec();
            assert_eq!(i.to_string(), String::from_utf8(buf).unwrap());
        }
    }

    #[tokio::test]
    async fn test_file_cache_read_ranges() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let file_cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true,
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = file_cache.cache_file_path(key);
        let data = b"hello greptime database";
        local_store
            .write(&file_path, data.as_slice())
            .await
            .unwrap();
        file_cache.put(key, IndexValue { file_size: 5 }).await;
        // Ranges are served from the file itself, not limited by `file_size`.
        let ranges = vec![0..5, 6..10, 15..19, 0..data.len() as u64];
        let bytes = file_cache.read_ranges(key, &ranges).await.unwrap();

        assert_eq!(4, bytes.len());
        assert_eq!(b"hello", bytes[0].as_ref());
        assert_eq!(b"grep", bytes[1].as_ref());
        assert_eq!(b"data", bytes[2].as_ref());
        assert_eq!(data, bytes[3].as_ref());
    }

    #[test]
    fn test_cache_file_path() {
        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
        assert_eq!(
            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
            cache_file_path(
                "test_dir",
                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
            )
        );
        // A trailing slash on the directory is normalized by `join_path`.
        assert_eq!(
            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
            cache_file_path(
                "test_dir/",
                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
            )
        );
    }

    #[test]
    fn test_parse_file_name() {
        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
        let region_id = RegionId::new(1234, 5);
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Parquet),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap()
        );
        // Legacy bare `puffin` suffix maps to version 0.
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Puffin(0)),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.puffin").unwrap()
        );
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Puffin(42)),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.42.puffin")
                .unwrap()
        );
        assert!(parse_index_key("").is_none());
        assert!(parse_index_key(".").is_none());
        assert!(parse_index_key("5299989643269").is_none());
        assert!(parse_index_key("5299989643269.").is_none());
        assert!(parse_index_key(".5299989643269").is_none());
        assert!(parse_index_key("5299989643269.").is_none());
        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df").is_none());
        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").is_none());
        assert!(
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parque").is_none()
        );
        assert!(
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet.puffin")
                .is_none()
        );
    }
}
1119}