1use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::{debug, info};
22use futures::AsyncWriteExt;
23use object_store::ObjectStore;
24use snafu::ResultExt;
25use store_api::storage::RegionId;
26use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
27
28use crate::access_layer::{
29 FilePathProvider, Metrics, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
30 TempFileCleaner, WriteCachePathProvider, WriteType, new_fs_cache_store,
31};
32use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
33use crate::cache::manifest_cache::ManifestCache;
34use crate::error::{self, Result};
35use crate::metrics::UPLOAD_BYTES_TOTAL;
36use crate::region::opener::RegionLoadCacheTask;
37use crate::sst::file::RegionFileId;
38use crate::sst::index::IndexerBuilderImpl;
39use crate::sst::index::intermediate::IntermediateManager;
40use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};
41use crate::sst::parquet::writer::ParquetWriter;
42use crate::sst::parquet::{SstInfo, WriteOptions};
43use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
44
/// A cache that stages SST and index files in a local object store and uploads
/// them to a remote object store.
pub struct WriteCache {
    /// Cache of local files (parquet and puffin) keyed by [`IndexKey`].
    file_cache: FileCacheRef,
    /// Factory for puffin managers whose files live in the local store.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate-file manager handed to index builders.
    intermediate_manager: IntermediateManager,
    /// Sending half of the region cache-loading channel; the receiving half is
    /// passed to `FileCache::recover` when the cache is created.
    task_sender: UnboundedSender<RegionLoadCacheTask>,
    /// Optional manifest cache; `None` when no manifest cache was configured.
    manifest_cache: Option<ManifestCache>,
}

/// Shared, reference-counted handle to a [`WriteCache`].
pub type WriteCacheRef = Arc<WriteCache>;
62
63impl WriteCache {
64 #[allow(clippy::too_many_arguments)]
67 pub async fn new(
68 local_store: ObjectStore,
69 cache_capacity: ReadableSize,
70 ttl: Option<Duration>,
71 index_cache_percent: Option<u8>,
72 enable_background_worker: bool,
73 puffin_manager_factory: PuffinManagerFactory,
74 intermediate_manager: IntermediateManager,
75 manifest_cache: Option<ManifestCache>,
76 ) -> Result<Self> {
77 let (task_sender, task_receiver) = unbounded_channel();
78
79 let file_cache = Arc::new(FileCache::new(
80 local_store,
81 cache_capacity,
82 ttl,
83 index_cache_percent,
84 enable_background_worker,
85 ));
86 file_cache.recover(false, Some(task_receiver)).await;
87
88 Ok(Self {
89 file_cache,
90 puffin_manager_factory,
91 intermediate_manager,
92 task_sender,
93 manifest_cache,
94 })
95 }
96
97 #[allow(clippy::too_many_arguments)]
99 pub async fn new_fs(
100 cache_dir: &str,
101 cache_capacity: ReadableSize,
102 ttl: Option<Duration>,
103 index_cache_percent: Option<u8>,
104 enable_background_worker: bool,
105 puffin_manager_factory: PuffinManagerFactory,
106 intermediate_manager: IntermediateManager,
107 manifest_cache_capacity: ReadableSize,
108 ) -> Result<Self> {
109 info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");
110
111 let local_store = new_fs_cache_store(cache_dir).await?;
112
113 let manifest_cache = if manifest_cache_capacity.as_bytes() > 0 {
115 Some(ManifestCache::new(local_store.clone(), manifest_cache_capacity, ttl, false).await)
116 } else {
117 None
118 };
119
120 Self::new(
121 local_store,
122 cache_capacity,
123 ttl,
124 index_cache_percent,
125 enable_background_worker,
126 puffin_manager_factory,
127 intermediate_manager,
128 manifest_cache,
129 )
130 .await
131 }
132
133 pub(crate) fn file_cache(&self) -> FileCacheRef {
135 self.file_cache.clone()
136 }
137
138 pub(crate) fn manifest_cache(&self) -> Option<ManifestCache> {
140 self.manifest_cache.clone()
141 }
142
143 pub(crate) fn build_puffin_manager(&self) -> SstPuffinManager {
145 let store = self.file_cache.local_store();
146 let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
147 self.puffin_manager_factory.build(store, path_provider)
148 }
149
150 pub(crate) async fn put_and_upload_sst(
152 &self,
153 data: &bytes::Bytes,
154 region_id: RegionId,
155 sst_info: &SstInfo,
156 upload_request: SstUploadRequest,
157 ) -> Result<Metrics> {
158 let file_id = sst_info.file_id;
159 let mut metrics = Metrics::new(WriteType::Flush);
160
161 let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet);
163
164 let cache_start = Instant::now();
166 let cache_path = self.file_cache.cache_file_path(parquet_key);
167 let store = self.file_cache.local_store();
168 let cleaner = TempFileCleaner::new(region_id, store.clone());
169 let write_res = store
170 .write(&cache_path, data.clone())
171 .await
172 .context(crate::error::OpenDalSnafu);
173 if let Err(e) = write_res {
174 cleaner.clean_by_file_id(file_id).await;
175 return Err(e);
176 }
177
178 metrics.write_batch = cache_start.elapsed();
179
180 let upload_start = Instant::now();
182 let region_file_id = RegionFileId::new(region_id, file_id);
183 let remote_path = upload_request
184 .dest_path_provider
185 .build_sst_file_path(region_file_id);
186
187 if let Err(e) = self
188 .upload(parquet_key, &remote_path, &upload_request.remote_store)
189 .await
190 {
191 self.remove(parquet_key).await;
193 return Err(e);
194 }
195
196 metrics.upload_parquet = upload_start.elapsed();
197 Ok(metrics)
198 }
199
200 pub(crate) fn intermediate_manager(&self) -> &IntermediateManager {
202 &self.intermediate_manager
203 }
204
205 pub(crate) async fn write_and_upload_sst(
207 &self,
208 write_request: SstWriteRequest,
209 upload_request: SstUploadRequest,
210 write_opts: &WriteOptions,
211 metrics: &mut Metrics,
212 ) -> Result<SstInfoArray> {
213 let region_id = write_request.metadata.region_id;
214
215 let store = self.file_cache.local_store();
216 let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
217 let indexer = IndexerBuilderImpl {
218 build_type: write_request.op_type.into(),
219 metadata: write_request.metadata.clone(),
220 row_group_size: write_opts.row_group_size,
221 puffin_manager: self
222 .puffin_manager_factory
223 .build(store.clone(), path_provider.clone()),
224 write_cache_enabled: true,
225 intermediate_manager: self.intermediate_manager.clone(),
226 index_options: write_request.index_options,
227 inverted_index_config: write_request.inverted_index_config,
228 fulltext_index_config: write_request.fulltext_index_config,
229 bloom_filter_index_config: write_request.bloom_filter_index_config,
230 #[cfg(feature = "vector_index")]
231 vector_index_config: write_request.vector_index_config,
232 };
233
234 let cleaner = TempFileCleaner::new(region_id, store.clone());
235 let mut writer = ParquetWriter::new_with_object_store(
237 store.clone(),
238 write_request.metadata,
239 write_request.index_config,
240 indexer,
241 path_provider.clone(),
242 metrics,
243 )
244 .await
245 .with_file_cleaner(cleaner);
246
247 let sst_info = match write_request.sst_write_format {
248 crate::sst::FormatType::PrimaryKey => {
249 writer
250 .write_all_flat_as_primary_key(
251 write_request.source,
252 write_request.max_sequence,
253 write_opts,
254 )
255 .await?
256 }
257 crate::sst::FormatType::Flat => {
258 writer
259 .write_all_flat(write_request.source, write_request.max_sequence, write_opts)
260 .await?
261 }
262 };
263
264 if sst_info.is_empty() {
266 return Ok(sst_info);
267 }
268
269 let mut upload_tracker = UploadTracker::new(region_id);
270 let mut err = None;
271 let remote_store = &upload_request.remote_store;
272 for sst in &sst_info {
273 let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet);
274 let parquet_path = upload_request
275 .dest_path_provider
276 .build_sst_file_path(RegionFileId::new(region_id, sst.file_id));
277 let start = Instant::now();
278 if let Err(e) = self.upload(parquet_key, &parquet_path, remote_store).await {
279 err = Some(e);
280 break;
281 }
282 metrics.upload_parquet += start.elapsed();
283 upload_tracker.push_uploaded_file(parquet_path);
284
285 if sst.index_metadata.file_size > 0 {
286 let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin(0));
287 let puffin_path = upload_request
288 .dest_path_provider
289 .build_index_file_path(RegionFileId::new(region_id, sst.file_id));
290 let start = Instant::now();
291 if let Err(e) = self.upload(puffin_key, &puffin_path, remote_store).await {
292 err = Some(e);
293 break;
294 }
295 metrics.upload_puffin += start.elapsed();
296 upload_tracker.push_uploaded_file(puffin_path);
297 }
298 }
299
300 if let Some(err) = err {
301 upload_tracker
303 .clean(&sst_info, &self.file_cache, remote_store)
304 .await;
305 return Err(err);
306 }
307
308 Ok(sst_info)
309 }
310
311 pub(crate) async fn remove(&self, index_key: IndexKey) {
313 self.file_cache.remove(index_key).await
314 }
315
316 pub(crate) async fn download(
319 &self,
320 index_key: IndexKey,
321 remote_path: &str,
322 remote_store: &ObjectStore,
323 file_size: u64,
324 ) -> Result<()> {
325 self.file_cache
326 .download(index_key, remote_path, remote_store, file_size)
327 .await
328 }
329
330 pub(crate) async fn download_if_absent(
335 &self,
336 index_key: IndexKey,
337 remote_path: &str,
338 remote_store: &ObjectStore,
339 file_size: u64,
340 ) -> Result<bool> {
341 if self.file_cache.contains_key(&index_key) {
342 debug!(
343 "Skip downloading file already in write cache, region: {}, file: {}",
344 index_key.region_id, index_key.file_id
345 );
346 return Ok(false);
347 }
348
349 self.download(index_key, remote_path, remote_store, file_size)
350 .await?;
351 Ok(true)
352 }
353
354 pub(crate) async fn upload(
356 &self,
357 index_key: IndexKey,
358 upload_path: &str,
359 remote_store: &ObjectStore,
360 ) -> Result<()> {
361 let region_id = index_key.region_id;
362 let file_id = index_key.file_id;
363 let file_type = index_key.file_type;
364 let cache_path = self.file_cache.cache_file_path(index_key);
365
366 let start = Instant::now();
367 let cached_value = self
368 .file_cache
369 .local_store()
370 .stat(&cache_path)
371 .await
372 .context(error::OpenDalSnafu)?;
373 let reader = self
374 .file_cache
375 .local_store()
376 .reader(&cache_path)
377 .await
378 .context(error::OpenDalSnafu)?
379 .into_futures_async_read(0..cached_value.content_length())
380 .await
381 .context(error::OpenDalSnafu)?;
382
383 let mut writer = remote_store
384 .writer_with(upload_path)
385 .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
386 .concurrent(DEFAULT_WRITE_CONCURRENCY)
387 .await
388 .context(error::OpenDalSnafu)?
389 .into_futures_async_write();
390
391 let bytes_written =
392 futures::io::copy(reader, &mut writer)
393 .await
394 .context(error::UploadSnafu {
395 region_id,
396 file_id,
397 file_type,
398 })?;
399
400 writer.close().await.context(error::UploadSnafu {
402 region_id,
403 file_id,
404 file_type,
405 })?;
406
407 UPLOAD_BYTES_TOTAL.inc_by(bytes_written);
408
409 debug!(
410 "Successfully upload file to remote, region: {}, file: {}, upload_path: {}, cost: {:?}",
411 region_id,
412 file_id,
413 upload_path,
414 start.elapsed(),
415 );
416
417 let index_value = IndexValue {
418 file_size: bytes_written as _,
419 };
420 self.file_cache.put(index_key, index_value).await;
422
423 Ok(())
424 }
425
426 pub(crate) fn load_region_cache(&self, task: RegionLoadCacheTask) {
430 let _ = self.task_sender.send(task);
431 }
432}
433
/// Destination of an SST upload: how to build remote paths and which remote
/// store to write to.
pub struct SstUploadRequest {
    /// Builds the remote paths for SST (parquet) and index (puffin) files.
    pub dest_path_provider: RegionFilePathFactory,
    /// Remote object store that files are uploaded to.
    pub remote_store: ObjectStore,
}
441
442pub(crate) struct UploadTracker {
444 region_id: RegionId,
446 files_uploaded: Vec<String>,
448}
449
450impl UploadTracker {
451 pub(crate) fn new(region_id: RegionId) -> Self {
453 Self {
454 region_id,
455 files_uploaded: Vec::new(),
456 }
457 }
458
459 pub(crate) fn push_uploaded_file(&mut self, path: String) {
461 self.files_uploaded.push(path);
462 }
463
464 pub(crate) async fn clean(
466 &self,
467 sst_info: &SstInfoArray,
468 file_cache: &FileCacheRef,
469 remote_store: &ObjectStore,
470 ) {
471 common_telemetry::info!(
472 "Start cleaning files on upload failure, region: {}, num_ssts: {}",
473 self.region_id,
474 sst_info.len()
475 );
476
477 for sst in sst_info {
479 let parquet_key = IndexKey::new(self.region_id, sst.file_id, FileType::Parquet);
480 file_cache.remove(parquet_key).await;
481
482 if sst.index_metadata.file_size > 0 {
483 let puffin_key = IndexKey::new(
484 self.region_id,
485 sst.file_id,
486 FileType::Puffin(sst.index_metadata.version),
487 );
488 file_cache.remove(puffin_key).await;
489 }
490 }
491
492 for file_path in &self.files_uploaded {
494 if let Err(e) = remote_store.delete(file_path).await {
495 common_telemetry::error!(e; "Failed to delete file {}", file_path);
496 }
497 }
498 }
499}
500
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ATOMIC_WRITE_DIR;
    use parquet::file::metadata::PageIndexPolicy;
    use store_api::region_request::PathType;
    use store_api::storage::FileId;

    use super::*;
    use crate::access_layer::OperationType;
    use crate::cache::file_cache::IndexValue;
    use crate::cache::test_util::{assert_parquet_metadata_equal, new_fs_store};
    use crate::cache::{CacheManager, CacheStrategy};
    use crate::error::InvalidBatchSnafu;
    use crate::read::FlatSource;
    use crate::region::options::IndexOptions;
    use crate::sst::parquet::reader::ParquetReaderBuilder;
    use crate::test_util::TestEnv;
    use crate::test_util::sst_util::{
        new_flat_source_from_record_batches, new_record_batch_by_range,
        sst_file_handle_with_file_id, sst_region_metadata,
    };

    /// Writes SSTs through the write cache, then checks two things. First, the
    /// parquet and puffin files are both present in the file cache and were
    /// uploaded with contents identical to the local copies. Second, `remove`
    /// evicts both entries.
    #[tokio::test]
    async fn test_write_and_upload_sst() {
        let mut env = TestEnv::new().await;
        // The object store manager's store plays the role of the remote store.
        let mock_store = env.init_object_store_manager();
        let path_provider = RegionFilePathFactory::new("test".to_string(), PathType::Bare);

        let local_dir = create_temp_dir("");
        let local_store = new_fs_store(local_dir.path().to_str().unwrap());

        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;

        // Build the source record batches.
        let metadata = Arc::new(sst_region_metadata());
        let region_id = metadata.region_id;
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);

        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source,
            storage: None,
            max_sequence: None,
            sst_write_format: Default::default(),
            cache_manager: Default::default(),
            index_options: IndexOptions::default(),
            index_config: Default::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };

        let upload_request = SstUploadRequest {
            dest_path_provider: path_provider.clone(),
            remote_store: mock_store.clone(),
        };

        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };

        // Write into the local cache and upload to the mock remote store.
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut sst_infos = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
            .await
            .unwrap();
        let sst_info = sst_infos.remove(0);

        let file_id = sst_info.file_id;
        let sst_upload_path =
            path_provider.build_sst_file_path(RegionFileId::new(region_id, file_id));
        let index_upload_path =
            path_provider.build_index_file_path(RegionFileId::new(region_id, file_id));

        // The parquet file must be tracked by the file cache.
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        assert!(write_cache.file_cache.contains_key(&key));

        // The remote parquet copy must match the locally cached bytes.
        let remote_data = mock_store.read(&sst_upload_path).await.unwrap();
        let cache_data = local_store
            .read(&write_cache.file_cache.cache_file_path(key))
            .await
            .unwrap();
        assert_eq!(remote_data.to_vec(), cache_data.to_vec());

        // The puffin index file must be cached as well.
        let index_key = IndexKey::new(region_id, file_id, FileType::Puffin(0));
        assert!(write_cache.file_cache.contains_key(&index_key));

        // The remote index copy must match the locally cached bytes.
        let remote_index_data = mock_store.read(&index_upload_path).await.unwrap();
        let cache_index_data = local_store
            .read(&write_cache.file_cache.cache_file_path(index_key))
            .await
            .unwrap();
        assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec());

        // Removing the keys evicts both entries from the file cache.
        let sst_index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
        write_cache.remove(sst_index_key).await;
        assert!(!write_cache.file_cache.contains_key(&sst_index_key));
        write_cache.remove(index_key).await;
        assert!(!write_cache.file_cache.contains_key(&index_key));
    }

    /// Builds a reader with a cache manager whose only configured cache is the
    /// write cache. Checks that the parquet metadata the reader loads equals the
    /// metadata returned by the writer.
    #[tokio::test]
    async fn test_read_metadata_from_write_cache() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let local_dir = create_temp_dir("");
        let local_path = local_dir.path().to_str().unwrap();
        let local_store = new_fs_store(local_path);

        // Cache manager configured with only the write cache.
        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        // Build the source record batches.
        let metadata = Arc::new(sst_region_metadata());

        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);

        // Write into the local cache and upload to the mock remote store.
        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source,
            storage: None,
            max_sequence: None,
            sst_write_format: Default::default(),
            cache_manager: cache_manager.clone(),
            index_options: IndexOptions::default(),
            index_config: Default::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };
        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
            remote_store: mock_store.clone(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut sst_infos = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
            .await
            .unwrap();
        let sst_info = sst_infos.remove(0);
        let write_parquet_metadata = sst_info.file_metadata.unwrap();

        // Open a reader on the uploaded SST with all caches enabled.
        let handle = sst_file_handle_with_file_id(sst_info.file_id, 0, 1000);
        let builder = ParquetReaderBuilder::new(
            data_home,
            PathType::Bare,
            handle.clone(),
            mock_store.clone(),
        )
        .cache(CacheStrategy::EnableAll(cache_manager.clone()))
        .page_index_policy(PageIndexPolicy::Optional);
        let reader = builder.build().await.unwrap().unwrap();
        // Wrap the writer's metadata the same way the cache does, for comparison.
        let cached_write_parquet_metadata = crate::cache::CachedSstMeta::try_new(
            "test.sst",
            Arc::unwrap_or_clone(write_parquet_metadata),
        )
        .unwrap()
        .parquet_metadata();

        // The reader's metadata must equal the writer's metadata.
        assert_parquet_metadata_equal(cached_write_parquet_metadata, reader.parquet_metadata());
    }

    /// Aborts the writer by feeding it an error from the source. Checks that
    /// `write_and_upload_sst` fails and leaves no temporary files in the
    /// write-cache directory's `ATOMIC_WRITE_DIR`.
    #[tokio::test]
    async fn test_write_cache_clean_tmp_files() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let write_cache_dir = create_temp_dir("");
        let write_cache_path = write_cache_dir.path().to_str().unwrap();
        let write_cache = env
            .create_write_cache_from_path(write_cache_path, ReadableSize::mb(10))
            .await;

        // Cache manager configured with only the write cache.
        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        // Build the source record batches.
        let metadata = Arc::new(sst_region_metadata());

        // One good batch followed by an error, which aborts the writer midway.
        let source = FlatSource::Iter(Box::new(
            [
                Ok(new_record_batch_by_range(&["a", "d"], 0, 60)),
                InvalidBatchSnafu {
                    reason: "Abort the writer",
                }
                .fail(),
            ]
            .into_iter(),
        ));

        // Attempt to write into the local cache and upload to the mock remote store.
        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source,
            storage: None,
            max_sequence: None,
            sst_write_format: Default::default(),
            cache_manager: cache_manager.clone(),
            index_options: IndexOptions::default(),
            index_config: Default::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };
        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
            remote_store: mock_store.clone(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
            .await
            .unwrap_err();
        // Scan the atomic-write directory; only subdirectories are allowed to remain.
        let atomic_write_dir = write_cache_dir.path().join(ATOMIC_WRITE_DIR);
        let mut entries = tokio::fs::read_dir(&atomic_write_dir).await.unwrap();
        let mut has_files = false;
        while let Some(entry) = entries.next_entry().await.unwrap() {
            if entry.file_type().await.unwrap().is_dir() {
                continue;
            }
            has_files = true;
            common_telemetry::warn!(
                "Found remaining temporary file in atomic dir: {}",
                entry.path().display()
            );
        }

        assert!(!has_files);
    }

    /// `download_if_absent` must return `false` when the key is already in the
    /// file cache. The remote path is never written by this test, so the
    /// successful `false` result shows no download was attempted (presumably a
    /// download attempt would fail).
    #[tokio::test]
    async fn test_download_if_absent_skips_when_cached() {
        let mut env = TestEnv::new().await;
        let remote_store = env.init_object_store_manager();

        let local_dir = create_temp_dir("");
        let local_store = new_fs_store(local_dir.path().to_str().unwrap());
        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;

        // Pre-populate the file cache index with the key.
        let region_id = RegionId::new(1024, 1);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        write_cache
            .file_cache()
            .put(key, IndexValue { file_size: 1 })
            .await;

        let downloaded = write_cache
            .download_if_absent(key, "missing/path.parquet", &remote_store, 1)
            .await
            .unwrap();

        assert!(!downloaded);
    }

    /// `download_if_absent` must download a missing file, return `true`,
    /// register the key in the file cache and store the remote bytes locally.
    #[tokio::test]
    async fn test_download_if_absent_downloads_when_missing() {
        let mut env = TestEnv::new().await;
        let remote_store = env.init_object_store_manager();

        let local_dir = create_temp_dir("");
        let local_store = new_fs_store(local_dir.path().to_str().unwrap());
        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;

        // Place a small file on the remote store only.
        let region_id = RegionId::new(1024, 2);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let remote_path = format!("download-if-absent/{file_id}.parquet");
        let remote_data = Bytes::from_static(b"download-if-absent-test");
        remote_store
            .write(&remote_path, remote_data.clone())
            .await
            .unwrap();

        let downloaded = write_cache
            .download_if_absent(key, &remote_path, &remote_store, remote_data.len() as u64)
            .await
            .unwrap();

        assert!(downloaded);
        assert!(write_cache.file_cache().contains_key(&key));

        // The cached copy must hold exactly the remote bytes.
        let cached_data = local_store
            .read(&write_cache.file_cache().cache_file_path(key))
            .await
            .unwrap();
        assert_eq!(cached_data.to_vec(), remote_data.to_vec());
    }
}