mito2/cache/
write_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A write-through cache for remote object stores.
16
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::{debug, info};
22use futures::AsyncWriteExt;
23use object_store::ObjectStore;
24use snafu::ResultExt;
25use store_api::storage::RegionId;
26use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
27
28use crate::access_layer::{
29    FilePathProvider, Metrics, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
30    TempFileCleaner, WriteCachePathProvider, WriteType, new_fs_cache_store,
31};
32use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
33use crate::cache::manifest_cache::ManifestCache;
34use crate::error::{self, Result};
35use crate::metrics::UPLOAD_BYTES_TOTAL;
36use crate::region::opener::RegionLoadCacheTask;
37use crate::sst::file::RegionFileId;
38use crate::sst::index::IndexerBuilderImpl;
39use crate::sst::index::intermediate::IntermediateManager;
40use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};
41use crate::sst::parquet::writer::ParquetWriter;
42use crate::sst::parquet::{SstInfo, WriteOptions};
43use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
44
/// A cache for uploading files to remote object stores.
///
/// It keeps files in local disk and then sends files to object stores: SSTs and index files
/// are first written to the local `file_cache`, then uploaded to the remote store.
pub struct WriteCache {
    /// Local file cache that holds files before (and after) they are uploaded.
    file_cache: FileCacheRef,
    /// Puffin manager factory for index.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager for index.
    intermediate_manager: IntermediateManager,
    /// Sender for region load cache tasks. The matching receiver is handed to the file cache
    /// when the write cache is constructed.
    task_sender: UnboundedSender<RegionLoadCacheTask>,
    /// Optional cache for manifest files.
    manifest_cache: Option<ManifestCache>,
}

/// Shared, reference-counted handle to a `WriteCache`.
pub type WriteCacheRef = Arc<WriteCache>;
62
63impl WriteCache {
64    /// Create the cache with a `local_store` to cache files and a
65    /// `object_store_manager` for all object stores.
66    #[allow(clippy::too_many_arguments)]
67    pub async fn new(
68        local_store: ObjectStore,
69        cache_capacity: ReadableSize,
70        ttl: Option<Duration>,
71        index_cache_percent: Option<u8>,
72        enable_background_worker: bool,
73        puffin_manager_factory: PuffinManagerFactory,
74        intermediate_manager: IntermediateManager,
75        manifest_cache: Option<ManifestCache>,
76    ) -> Result<Self> {
77        let (task_sender, task_receiver) = unbounded_channel();
78
79        let file_cache = Arc::new(FileCache::new(
80            local_store,
81            cache_capacity,
82            ttl,
83            index_cache_percent,
84            enable_background_worker,
85        ));
86        file_cache.recover(false, Some(task_receiver)).await;
87
88        Ok(Self {
89            file_cache,
90            puffin_manager_factory,
91            intermediate_manager,
92            task_sender,
93            manifest_cache,
94        })
95    }
96
97    /// Creates a write cache based on local fs.
98    #[allow(clippy::too_many_arguments)]
99    pub async fn new_fs(
100        cache_dir: &str,
101        cache_capacity: ReadableSize,
102        ttl: Option<Duration>,
103        index_cache_percent: Option<u8>,
104        enable_background_worker: bool,
105        puffin_manager_factory: PuffinManagerFactory,
106        intermediate_manager: IntermediateManager,
107        manifest_cache_capacity: ReadableSize,
108    ) -> Result<Self> {
109        info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");
110
111        let local_store = new_fs_cache_store(cache_dir).await?;
112
113        // Create manifest cache if capacity is non-zero
114        let manifest_cache = if manifest_cache_capacity.as_bytes() > 0 {
115            Some(ManifestCache::new(local_store.clone(), manifest_cache_capacity, ttl, false).await)
116        } else {
117            None
118        };
119
120        Self::new(
121            local_store,
122            cache_capacity,
123            ttl,
124            index_cache_percent,
125            enable_background_worker,
126            puffin_manager_factory,
127            intermediate_manager,
128            manifest_cache,
129        )
130        .await
131    }
132
133    /// Returns the file cache of the write cache.
134    pub(crate) fn file_cache(&self) -> FileCacheRef {
135        self.file_cache.clone()
136    }
137
138    /// Returns the manifest cache if available.
139    pub(crate) fn manifest_cache(&self) -> Option<ManifestCache> {
140        self.manifest_cache.clone()
141    }
142
143    /// Build the puffin manager
144    pub(crate) fn build_puffin_manager(&self) -> SstPuffinManager {
145        let store = self.file_cache.local_store();
146        let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
147        self.puffin_manager_factory.build(store, path_provider)
148    }
149
150    /// Put encoded SST data to the cache and upload to the remote object store.
151    pub(crate) async fn put_and_upload_sst(
152        &self,
153        data: &bytes::Bytes,
154        region_id: RegionId,
155        sst_info: &SstInfo,
156        upload_request: SstUploadRequest,
157    ) -> Result<Metrics> {
158        let file_id = sst_info.file_id;
159        let mut metrics = Metrics::new(WriteType::Flush);
160
161        // Create index key for the SST file
162        let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet);
163
164        // Write to cache first
165        let cache_start = Instant::now();
166        let cache_path = self.file_cache.cache_file_path(parquet_key);
167        let store = self.file_cache.local_store();
168        let cleaner = TempFileCleaner::new(region_id, store.clone());
169        let write_res = store
170            .write(&cache_path, data.clone())
171            .await
172            .context(crate::error::OpenDalSnafu);
173        if let Err(e) = write_res {
174            cleaner.clean_by_file_id(file_id).await;
175            return Err(e);
176        }
177
178        metrics.write_batch = cache_start.elapsed();
179
180        // Upload to remote store
181        let upload_start = Instant::now();
182        let region_file_id = RegionFileId::new(region_id, file_id);
183        let remote_path = upload_request
184            .dest_path_provider
185            .build_sst_file_path(region_file_id);
186
187        if let Err(e) = self
188            .upload(parquet_key, &remote_path, &upload_request.remote_store)
189            .await
190        {
191            // Clean up cache on failure
192            self.remove(parquet_key).await;
193            return Err(e);
194        }
195
196        metrics.upload_parquet = upload_start.elapsed();
197        Ok(metrics)
198    }
199
200    /// Returns the intermediate manager of the write cache.
201    pub(crate) fn intermediate_manager(&self) -> &IntermediateManager {
202        &self.intermediate_manager
203    }
204
205    /// Writes SST to the cache and then uploads it to the remote object store.
206    pub(crate) async fn write_and_upload_sst(
207        &self,
208        write_request: SstWriteRequest,
209        upload_request: SstUploadRequest,
210        write_opts: &WriteOptions,
211        metrics: &mut Metrics,
212    ) -> Result<SstInfoArray> {
213        let region_id = write_request.metadata.region_id;
214
215        let store = self.file_cache.local_store();
216        let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
217        let indexer = IndexerBuilderImpl {
218            build_type: write_request.op_type.into(),
219            metadata: write_request.metadata.clone(),
220            row_group_size: write_opts.row_group_size,
221            puffin_manager: self
222                .puffin_manager_factory
223                .build(store.clone(), path_provider.clone()),
224            write_cache_enabled: true,
225            intermediate_manager: self.intermediate_manager.clone(),
226            index_options: write_request.index_options,
227            inverted_index_config: write_request.inverted_index_config,
228            fulltext_index_config: write_request.fulltext_index_config,
229            bloom_filter_index_config: write_request.bloom_filter_index_config,
230            #[cfg(feature = "vector_index")]
231            vector_index_config: write_request.vector_index_config,
232        };
233
234        let cleaner = TempFileCleaner::new(region_id, store.clone());
235        // Write to FileCache.
236        let mut writer = ParquetWriter::new_with_object_store(
237            store.clone(),
238            write_request.metadata,
239            write_request.index_config,
240            indexer,
241            path_provider.clone(),
242            metrics,
243        )
244        .await
245        .with_file_cleaner(cleaner);
246
247        let sst_info = match write_request.sst_write_format {
248            crate::sst::FormatType::PrimaryKey => {
249                writer
250                    .write_all_flat_as_primary_key(
251                        write_request.source,
252                        write_request.max_sequence,
253                        write_opts,
254                    )
255                    .await?
256            }
257            crate::sst::FormatType::Flat => {
258                writer
259                    .write_all_flat(write_request.source, write_request.max_sequence, write_opts)
260                    .await?
261            }
262        };
263
264        // Upload sst file to remote object store.
265        if sst_info.is_empty() {
266            return Ok(sst_info);
267        }
268
269        let mut upload_tracker = UploadTracker::new(region_id);
270        let mut err = None;
271        let remote_store = &upload_request.remote_store;
272        for sst in &sst_info {
273            let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet);
274            let parquet_path = upload_request
275                .dest_path_provider
276                .build_sst_file_path(RegionFileId::new(region_id, sst.file_id));
277            let start = Instant::now();
278            if let Err(e) = self.upload(parquet_key, &parquet_path, remote_store).await {
279                err = Some(e);
280                break;
281            }
282            metrics.upload_parquet += start.elapsed();
283            upload_tracker.push_uploaded_file(parquet_path);
284
285            if sst.index_metadata.file_size > 0 {
286                let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin(0));
287                let puffin_path = upload_request
288                    .dest_path_provider
289                    .build_index_file_path(RegionFileId::new(region_id, sst.file_id));
290                let start = Instant::now();
291                if let Err(e) = self.upload(puffin_key, &puffin_path, remote_store).await {
292                    err = Some(e);
293                    break;
294                }
295                metrics.upload_puffin += start.elapsed();
296                upload_tracker.push_uploaded_file(puffin_path);
297            }
298        }
299
300        if let Some(err) = err {
301            // Cleans files on failure.
302            upload_tracker
303                .clean(&sst_info, &self.file_cache, remote_store)
304                .await;
305            return Err(err);
306        }
307
308        Ok(sst_info)
309    }
310
311    /// Removes a file from the cache by `index_key`.
312    pub(crate) async fn remove(&self, index_key: IndexKey) {
313        self.file_cache.remove(index_key).await
314    }
315
316    /// Downloads a file in `remote_path` from the remote object store to the local cache
317    /// (specified by `index_key`).
318    pub(crate) async fn download(
319        &self,
320        index_key: IndexKey,
321        remote_path: &str,
322        remote_store: &ObjectStore,
323        file_size: u64,
324    ) -> Result<()> {
325        self.file_cache
326            .download(index_key, remote_path, remote_store, file_size)
327            .await
328    }
329
330    /// Downloads the target file into write cache only when it is not cached.
331    ///
332    /// Returns `Ok(true)` if this call performs a download, or `Ok(false)` if the
333    /// file is already present in write cache and download is skipped.
334    pub(crate) async fn download_if_absent(
335        &self,
336        index_key: IndexKey,
337        remote_path: &str,
338        remote_store: &ObjectStore,
339        file_size: u64,
340    ) -> Result<bool> {
341        if self.file_cache.contains_key(&index_key) {
342            debug!(
343                "Skip downloading file already in write cache, region: {}, file: {}",
344                index_key.region_id, index_key.file_id
345            );
346            return Ok(false);
347        }
348
349        self.download(index_key, remote_path, remote_store, file_size)
350            .await?;
351        Ok(true)
352    }
353
354    /// Uploads a Parquet file or a Puffin file to the remote object store.
355    pub(crate) async fn upload(
356        &self,
357        index_key: IndexKey,
358        upload_path: &str,
359        remote_store: &ObjectStore,
360    ) -> Result<()> {
361        let region_id = index_key.region_id;
362        let file_id = index_key.file_id;
363        let file_type = index_key.file_type;
364        let cache_path = self.file_cache.cache_file_path(index_key);
365
366        let start = Instant::now();
367        let cached_value = self
368            .file_cache
369            .local_store()
370            .stat(&cache_path)
371            .await
372            .context(error::OpenDalSnafu)?;
373        let reader = self
374            .file_cache
375            .local_store()
376            .reader(&cache_path)
377            .await
378            .context(error::OpenDalSnafu)?
379            .into_futures_async_read(0..cached_value.content_length())
380            .await
381            .context(error::OpenDalSnafu)?;
382
383        let mut writer = remote_store
384            .writer_with(upload_path)
385            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
386            .concurrent(DEFAULT_WRITE_CONCURRENCY)
387            .await
388            .context(error::OpenDalSnafu)?
389            .into_futures_async_write();
390
391        let bytes_written =
392            futures::io::copy(reader, &mut writer)
393                .await
394                .context(error::UploadSnafu {
395                    region_id,
396                    file_id,
397                    file_type,
398                })?;
399
400        // Must close to upload all data.
401        writer.close().await.context(error::UploadSnafu {
402            region_id,
403            file_id,
404            file_type,
405        })?;
406
407        UPLOAD_BYTES_TOTAL.inc_by(bytes_written);
408
409        debug!(
410            "Successfully upload file to remote, region: {}, file: {}, upload_path: {}, cost: {:?}",
411            region_id,
412            file_id,
413            upload_path,
414            start.elapsed(),
415        );
416
417        let index_value = IndexValue {
418            file_size: bytes_written as _,
419        };
420        // Register to file cache
421        self.file_cache.put(index_key, index_value).await;
422
423        Ok(())
424    }
425
426    /// Sends a region load cache task to the background processing queue.
427    ///
428    /// If the receiver has been dropped, the error is ignored.
429    pub(crate) fn load_region_cache(&self, task: RegionLoadCacheTask) {
430        let _ = self.task_sender.send(task);
431    }
432}
433
/// Destination of an SST upload: where files in the write cache should be uploaded to.
pub struct SstUploadRequest {
    /// Builds the remote paths that SST and index files in the write cache are uploaded to.
    pub dest_path_provider: RegionFilePathFactory,
    /// Remote object store to upload to.
    pub remote_store: ObjectStore,
}
441
442/// A structs to track files to upload and clean them if upload failed.
443pub(crate) struct UploadTracker {
444    /// Id of the region to track.
445    region_id: RegionId,
446    /// Paths of files uploaded successfully.
447    files_uploaded: Vec<String>,
448}
449
450impl UploadTracker {
451    /// Creates a new instance of `UploadTracker` for a given region.
452    pub(crate) fn new(region_id: RegionId) -> Self {
453        Self {
454            region_id,
455            files_uploaded: Vec::new(),
456        }
457    }
458
459    /// Add a file path to the list of uploaded files.
460    pub(crate) fn push_uploaded_file(&mut self, path: String) {
461        self.files_uploaded.push(path);
462    }
463
464    /// Cleans uploaded files and files in the file cache at best effort.
465    pub(crate) async fn clean(
466        &self,
467        sst_info: &SstInfoArray,
468        file_cache: &FileCacheRef,
469        remote_store: &ObjectStore,
470    ) {
471        common_telemetry::info!(
472            "Start cleaning files on upload failure, region: {}, num_ssts: {}",
473            self.region_id,
474            sst_info.len()
475        );
476
477        // Cleans files in the file cache first.
478        for sst in sst_info {
479            let parquet_key = IndexKey::new(self.region_id, sst.file_id, FileType::Parquet);
480            file_cache.remove(parquet_key).await;
481
482            if sst.index_metadata.file_size > 0 {
483                let puffin_key = IndexKey::new(
484                    self.region_id,
485                    sst.file_id,
486                    FileType::Puffin(sst.index_metadata.version),
487                );
488                file_cache.remove(puffin_key).await;
489            }
490        }
491
492        // Cleans uploaded files.
493        for file_path in &self.files_uploaded {
494            if let Err(e) = remote_store.delete(file_path).await {
495                common_telemetry::error!(e; "Failed to delete file {}", file_path);
496            }
497        }
498    }
499}
500
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ATOMIC_WRITE_DIR;
    use parquet::file::metadata::PageIndexPolicy;
    use store_api::region_request::PathType;
    use store_api::storage::FileId;

    use super::*;
    use crate::access_layer::OperationType;
    use crate::cache::file_cache::IndexValue;
    use crate::cache::test_util::{assert_parquet_metadata_equal, new_fs_store};
    use crate::cache::{CacheManager, CacheStrategy};
    use crate::error::InvalidBatchSnafu;
    use crate::read::FlatSource;
    use crate::region::options::IndexOptions;
    use crate::sst::parquet::reader::ParquetReaderBuilder;
    use crate::test_util::TestEnv;
    use crate::test_util::sst_util::{
        new_flat_source_from_record_batches, new_record_batch_by_range,
        sst_file_handle_with_file_id, sst_region_metadata,
    };

    /// Writes an SST through the write cache and checks the result.
    ///
    /// Both the Parquet file and the Puffin index file must be present in the local cache and
    /// in the mock remote store, with identical contents. Both keys are then removed from the
    /// cache.
    #[tokio::test]
    async fn test_write_and_upload_sst() {
        // TODO(QuenKar): maybe find a way to create some object server for testing,
        // and now just use local file system to mock.
        let mut env = TestEnv::new().await;
        let mock_store = env.init_object_store_manager();
        let path_provider = RegionFilePathFactory::new("test".to_string(), PathType::Bare);

        let local_dir = create_temp_dir("");
        let local_store = new_fs_store(local_dir.path().to_str().unwrap());

        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;

        // Create source.
        let metadata = Arc::new(sst_region_metadata());
        let region_id = metadata.region_id;
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);

        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source,
            storage: None,
            max_sequence: None,
            sst_write_format: Default::default(),
            cache_manager: Default::default(),
            index_options: IndexOptions::default(),
            index_config: Default::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };

        let upload_request = SstUploadRequest {
            dest_path_provider: path_provider.clone(),
            remote_store: mock_store.clone(),
        };

        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };

        // Write to cache and upload sst to mock remote store
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut sst_infos = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
            .await
            .unwrap();
        let sst_info = sst_infos.remove(0);

        let file_id = sst_info.file_id;
        let sst_upload_path =
            path_provider.build_sst_file_path(RegionFileId::new(region_id, file_id));
        let index_upload_path =
            path_provider.build_index_file_path(RegionFileId::new(region_id, file_id));

        // Check write cache contains the key
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        assert!(write_cache.file_cache.contains_key(&key));

        // Check file data
        let remote_data = mock_store.read(&sst_upload_path).await.unwrap();
        let cache_data = local_store
            .read(&write_cache.file_cache.cache_file_path(key))
            .await
            .unwrap();
        assert_eq!(remote_data.to_vec(), cache_data.to_vec());

        // Check write cache contains the index key
        let index_key = IndexKey::new(region_id, file_id, FileType::Puffin(0));
        assert!(write_cache.file_cache.contains_key(&index_key));

        let remote_index_data = mock_store.read(&index_upload_path).await.unwrap();
        let cache_index_data = local_store
            .read(&write_cache.file_cache.cache_file_path(index_key))
            .await
            .unwrap();
        assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec());

        // Removes the file from the cache.
        let sst_index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
        write_cache.remove(sst_index_key).await;
        assert!(!write_cache.file_cache.contains_key(&sst_index_key));
        write_cache.remove(index_key).await;
        assert!(!write_cache.file_cache.contains_key(&index_key));
    }

    /// Checks that Parquet metadata read back matches the metadata produced by the writer.
    ///
    /// The metadata is read through a `ParquetReaderBuilder` whose cache manager holds only
    /// the write cache.
    #[tokio::test]
    async fn test_read_metadata_from_write_cache() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let local_dir = create_temp_dir("");
        let local_path = local_dir.path().to_str().unwrap();
        let local_store = new_fs_store(local_path);

        // Create a cache manager using only write cache
        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        // Create source
        let metadata = Arc::new(sst_region_metadata());

        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);

        // Write to local cache and upload sst to mock remote store
        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source,
            storage: None,
            max_sequence: None,
            sst_write_format: Default::default(),
            cache_manager: cache_manager.clone(),
            index_options: IndexOptions::default(),
            index_config: Default::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };
        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
            remote_store: mock_store.clone(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut sst_infos = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
            .await
            .unwrap();
        let sst_info = sst_infos.remove(0);
        let write_parquet_metadata = sst_info.file_metadata.unwrap();

        // Read metadata from write cache
        let handle = sst_file_handle_with_file_id(sst_info.file_id, 0, 1000);
        let builder = ParquetReaderBuilder::new(
            data_home,
            PathType::Bare,
            handle.clone(),
            mock_store.clone(),
        )
        .cache(CacheStrategy::EnableAll(cache_manager.clone()))
        .page_index_policy(PageIndexPolicy::Optional);
        let reader = builder.build().await.unwrap().unwrap();
        let cached_write_parquet_metadata = crate::cache::CachedSstMeta::try_new(
            "test.sst",
            Arc::unwrap_or_clone(write_parquet_metadata),
        )
        .unwrap()
        .parquet_metadata();

        // Check parquet metadata
        assert_parquet_metadata_equal(cached_write_parquet_metadata, reader.parquet_metadata());
    }

    /// Checks that an aborted write leaves no temporary files behind.
    ///
    /// The SST writer is aborted with a source that yields an error after the first batch.
    /// No regular files may remain in the write cache's atomic write directory afterwards.
    #[tokio::test]
    async fn test_write_cache_clean_tmp_files() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let write_cache_dir = create_temp_dir("");
        let write_cache_path = write_cache_dir.path().to_str().unwrap();
        let write_cache = env
            .create_write_cache_from_path(write_cache_path, ReadableSize::mb(10))
            .await;

        // Create a cache manager using only write cache
        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        // Create source
        let metadata = Arc::new(sst_region_metadata());

        // Creates a source that can return an error to abort the writer.
        let source = FlatSource::Iter(Box::new(
            [
                Ok(new_record_batch_by_range(&["a", "d"], 0, 60)),
                InvalidBatchSnafu {
                    reason: "Abort the writer",
                }
                .fail(),
            ]
            .into_iter(),
        ));

        // Write to local cache and upload sst to mock remote store
        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source,
            storage: None,
            max_sequence: None,
            sst_write_format: Default::default(),
            cache_manager: cache_manager.clone(),
            index_options: IndexOptions::default(),
            index_config: Default::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };
        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
            remote_store: mock_store.clone(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
            .await
            .unwrap_err();
        // Only regular files count as leftovers; sub-directories are skipped.
        let atomic_write_dir = write_cache_dir.path().join(ATOMIC_WRITE_DIR);
        let mut entries = tokio::fs::read_dir(&atomic_write_dir).await.unwrap();
        let mut has_files = false;
        while let Some(entry) = entries.next_entry().await.unwrap() {
            if entry.file_type().await.unwrap().is_dir() {
                continue;
            }
            has_files = true;
            common_telemetry::warn!(
                "Found remaining temporary file in atomic dir: {}",
                entry.path().display()
            );
        }

        assert!(!has_files);
    }

    /// `download_if_absent` must return `false` for a key already registered in the file cache.
    ///
    /// The remote path does not exist, so an attempted download would presumably make the
    /// `unwrap` fail.
    #[tokio::test]
    async fn test_download_if_absent_skips_when_cached() {
        let mut env = TestEnv::new().await;
        let remote_store = env.init_object_store_manager();

        let local_dir = create_temp_dir("");
        let local_store = new_fs_store(local_dir.path().to_str().unwrap());
        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;

        let region_id = RegionId::new(1024, 1);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        write_cache
            .file_cache()
            .put(key, IndexValue { file_size: 1 })
            .await;

        let downloaded = write_cache
            .download_if_absent(key, "missing/path.parquet", &remote_store, 1)
            .await
            .unwrap();

        assert!(!downloaded);
    }

    /// Checks `download_if_absent` for a key that is not in the file cache.
    ///
    /// The call must download the remote object and return `true`. Afterwards the key is
    /// cached and the cached bytes match the remote ones.
    #[tokio::test]
    async fn test_download_if_absent_downloads_when_missing() {
        let mut env = TestEnv::new().await;
        let remote_store = env.init_object_store_manager();

        let local_dir = create_temp_dir("");
        let local_store = new_fs_store(local_dir.path().to_str().unwrap());
        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;

        let region_id = RegionId::new(1024, 2);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let remote_path = format!("download-if-absent/{file_id}.parquet");
        let remote_data = Bytes::from_static(b"download-if-absent-test");
        remote_store
            .write(&remote_path, remote_data.clone())
            .await
            .unwrap();

        let downloaded = write_cache
            .download_if_absent(key, &remote_path, &remote_store, remote_data.len() as u64)
            .await
            .unwrap();

        assert!(downloaded);
        assert!(write_cache.file_cache().contains_key(&key));

        let cached_data = local_store
            .read(&write_cache.file_cache().cache_file_path(key))
            .await
            .unwrap();
        assert_eq!(cached_data.to_vec(), remote_data.to_vec());
    }
}