mito2/cache/
write_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A write-through cache for remote object stores.
16
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::{debug, info};
22use futures::AsyncWriteExt;
23use object_store::ObjectStore;
24use snafu::ResultExt;
25use store_api::storage::RegionId;
26
27use crate::access_layer::{
28    FilePathProvider, Metrics, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
29    TempFileCleaner, WriteCachePathProvider, WriteType, new_fs_cache_store,
30};
31use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
32use crate::error::{self, Result};
33use crate::metrics::{
34    UPLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_ELAPSED,
35};
36use crate::sst::file::RegionFileId;
37use crate::sst::index::IndexerBuilderImpl;
38use crate::sst::index::intermediate::IntermediateManager;
39use crate::sst::index::puffin_manager::PuffinManagerFactory;
40use crate::sst::parquet::writer::ParquetWriter;
41use crate::sst::parquet::{SstInfo, WriteOptions};
42use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
43
/// A cache for uploading files to remote object stores.
///
/// It keeps files on local disk and then sends those files to object stores.
pub struct WriteCache {
    /// Local file cache. Files are written to its local store and registered in it
    /// under an `IndexKey`.
    file_cache: FileCacheRef,
    /// Puffin manager factory for index. Used to build the puffin manager that is
    /// handed to the index builder when an SST is written through the cache.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager for index. Cloned into the index builder for each write.
    intermediate_manager: IntermediateManager,
}
55
/// Shared, reference-counted handle to a [`WriteCache`].
pub type WriteCacheRef = Arc<WriteCache>;
57
58impl WriteCache {
59    /// Create the cache with a `local_store` to cache files and a
60    /// `object_store_manager` for all object stores.
61    pub async fn new(
62        local_store: ObjectStore,
63        cache_capacity: ReadableSize,
64        ttl: Option<Duration>,
65        puffin_manager_factory: PuffinManagerFactory,
66        intermediate_manager: IntermediateManager,
67    ) -> Result<Self> {
68        let file_cache = Arc::new(FileCache::new(local_store, cache_capacity, ttl));
69        file_cache.recover(false).await;
70
71        Ok(Self {
72            file_cache,
73            puffin_manager_factory,
74            intermediate_manager,
75        })
76    }
77
78    /// Creates a write cache based on local fs.
79    pub async fn new_fs(
80        cache_dir: &str,
81        cache_capacity: ReadableSize,
82        ttl: Option<Duration>,
83        puffin_manager_factory: PuffinManagerFactory,
84        intermediate_manager: IntermediateManager,
85    ) -> Result<Self> {
86        info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");
87
88        let local_store = new_fs_cache_store(cache_dir).await?;
89        Self::new(
90            local_store,
91            cache_capacity,
92            ttl,
93            puffin_manager_factory,
94            intermediate_manager,
95        )
96        .await
97    }
98
99    /// Returns the file cache of the write cache.
100    pub(crate) fn file_cache(&self) -> FileCacheRef {
101        self.file_cache.clone()
102    }
103
104    /// Put encoded SST data to the cache and upload to the remote object store.
105    pub(crate) async fn put_and_upload_sst(
106        &self,
107        data: &bytes::Bytes,
108        region_id: RegionId,
109        sst_info: &SstInfo,
110        upload_request: SstUploadRequest,
111    ) -> Result<Metrics> {
112        let file_id = sst_info.file_id;
113        let mut metrics = Metrics::new(WriteType::Flush);
114
115        // Create index key for the SST file
116        let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet);
117
118        // Write to cache first
119        let cache_start = Instant::now();
120        let cache_path = self.file_cache.cache_file_path(parquet_key);
121        let mut cache_writer = self
122            .file_cache
123            .local_store()
124            .writer(&cache_path)
125            .await
126            .context(crate::error::OpenDalSnafu)?;
127
128        cache_writer
129            .write(data.clone())
130            .await
131            .context(crate::error::OpenDalSnafu)?;
132        cache_writer
133            .close()
134            .await
135            .context(crate::error::OpenDalSnafu)?;
136
137        // Register in file cache
138        let index_value = IndexValue {
139            file_size: data.len() as u32,
140        };
141        self.file_cache.put(parquet_key, index_value).await;
142        metrics.write_batch = cache_start.elapsed();
143
144        // Upload to remote store
145        let upload_start = Instant::now();
146        let region_file_id = RegionFileId::new(region_id, file_id);
147        let remote_path = upload_request
148            .dest_path_provider
149            .build_sst_file_path(region_file_id);
150
151        if let Err(e) = self
152            .upload(parquet_key, &remote_path, &upload_request.remote_store)
153            .await
154        {
155            // Clean up cache on failure
156            self.remove(parquet_key).await;
157            return Err(e);
158        }
159
160        metrics.upload_parquet = upload_start.elapsed();
161        Ok(metrics)
162    }
163
164    /// Writes SST to the cache and then uploads it to the remote object store.
165    pub(crate) async fn write_and_upload_sst(
166        &self,
167        write_request: SstWriteRequest,
168        upload_request: SstUploadRequest,
169        write_opts: &WriteOptions,
170        write_type: WriteType,
171    ) -> Result<(SstInfoArray, Metrics)> {
172        let region_id = write_request.metadata.region_id;
173
174        let store = self.file_cache.local_store();
175        let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
176        let indexer = IndexerBuilderImpl {
177            op_type: write_request.op_type,
178            metadata: write_request.metadata.clone(),
179            row_group_size: write_opts.row_group_size,
180            puffin_manager: self
181                .puffin_manager_factory
182                .build(store.clone(), path_provider.clone()),
183            intermediate_manager: self.intermediate_manager.clone(),
184            index_options: write_request.index_options,
185            inverted_index_config: write_request.inverted_index_config,
186            fulltext_index_config: write_request.fulltext_index_config,
187            bloom_filter_index_config: write_request.bloom_filter_index_config,
188        };
189
190        let cleaner = TempFileCleaner::new(region_id, store.clone());
191        // Write to FileCache.
192        let mut writer = ParquetWriter::new_with_object_store(
193            store.clone(),
194            write_request.metadata,
195            indexer,
196            path_provider.clone(),
197            Metrics::new(write_type),
198        )
199        .await
200        .with_file_cleaner(cleaner);
201
202        let sst_info = match write_request.source {
203            either::Left(source) => {
204                writer
205                    .write_all(source, write_request.max_sequence, write_opts)
206                    .await?
207            }
208            either::Right(flat_source) => writer.write_all_flat(flat_source, write_opts).await?,
209        };
210        let mut metrics = writer.into_metrics();
211
212        // Upload sst file to remote object store.
213        if sst_info.is_empty() {
214            return Ok((sst_info, metrics));
215        }
216
217        let mut upload_tracker = UploadTracker::new(region_id);
218        let mut err = None;
219        let remote_store = &upload_request.remote_store;
220        for sst in &sst_info {
221            let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet);
222            let parquet_path = upload_request
223                .dest_path_provider
224                .build_sst_file_path(RegionFileId::new(region_id, sst.file_id));
225            let start = Instant::now();
226            if let Err(e) = self.upload(parquet_key, &parquet_path, remote_store).await {
227                err = Some(e);
228                break;
229            }
230            metrics.upload_parquet += start.elapsed();
231            upload_tracker.push_uploaded_file(parquet_path);
232
233            if sst.index_metadata.file_size > 0 {
234                let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin);
235                let puffin_path = upload_request
236                    .dest_path_provider
237                    .build_index_file_path(RegionFileId::new(region_id, sst.file_id));
238                let start = Instant::now();
239                if let Err(e) = self.upload(puffin_key, &puffin_path, remote_store).await {
240                    err = Some(e);
241                    break;
242                }
243                metrics.upload_puffin += start.elapsed();
244                upload_tracker.push_uploaded_file(puffin_path);
245            }
246        }
247
248        if let Some(err) = err {
249            // Cleans files on failure.
250            upload_tracker
251                .clean(&sst_info, &self.file_cache, remote_store)
252                .await;
253            return Err(err);
254        }
255
256        Ok((sst_info, metrics))
257    }
258
259    /// Removes a file from the cache by `index_key`.
260    pub(crate) async fn remove(&self, index_key: IndexKey) {
261        self.file_cache.remove(index_key).await
262    }
263
264    /// Downloads a file in `remote_path` from the remote object store to the local cache
265    /// (specified by `index_key`).
266    pub(crate) async fn download(
267        &self,
268        index_key: IndexKey,
269        remote_path: &str,
270        remote_store: &ObjectStore,
271        file_size: u64,
272    ) -> Result<()> {
273        if let Err(e) = self
274            .download_without_cleaning(index_key, remote_path, remote_store, file_size)
275            .await
276        {
277            let filename = index_key.to_string();
278            TempFileCleaner::clean_atomic_dir_files(&self.file_cache.local_store(), &[&filename])
279                .await;
280
281            return Err(e);
282        }
283        Ok(())
284    }
285
286    async fn download_without_cleaning(
287        &self,
288        index_key: IndexKey,
289        remote_path: &str,
290        remote_store: &ObjectStore,
291        file_size: u64,
292    ) -> Result<()> {
293        const DOWNLOAD_READER_CONCURRENCY: usize = 8;
294        const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);
295
296        let file_type = index_key.file_type;
297        let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
298            .with_label_values(&[match file_type {
299                FileType::Parquet => "download_parquet",
300                FileType::Puffin => "download_puffin",
301            }])
302            .start_timer();
303
304        let reader = remote_store
305            .reader_with(remote_path)
306            .concurrent(DOWNLOAD_READER_CONCURRENCY)
307            .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
308            .await
309            .context(error::OpenDalSnafu)?
310            .into_futures_async_read(0..file_size)
311            .await
312            .context(error::OpenDalSnafu)?;
313
314        let cache_path = self.file_cache.cache_file_path(index_key);
315        let mut writer = self
316            .file_cache
317            .local_store()
318            .writer(&cache_path)
319            .await
320            .context(error::OpenDalSnafu)?
321            .into_futures_async_write();
322
323        let region_id = index_key.region_id;
324        let file_id = index_key.file_id;
325        let bytes_written =
326            futures::io::copy(reader, &mut writer)
327                .await
328                .context(error::DownloadSnafu {
329                    region_id,
330                    file_id,
331                    file_type,
332                })?;
333        writer.close().await.context(error::DownloadSnafu {
334            region_id,
335            file_id,
336            file_type,
337        })?;
338
339        WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);
340
341        let elapsed = timer.stop_and_record();
342        debug!(
343            "Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
344            remote_path, cache_path, bytes_written, region_id, elapsed,
345        );
346
347        let index_value = IndexValue {
348            file_size: bytes_written as _,
349        };
350        self.file_cache.put(index_key, index_value).await;
351        Ok(())
352    }
353
354    /// Uploads a Parquet file or a Puffin file to the remote object store.
355    async fn upload(
356        &self,
357        index_key: IndexKey,
358        upload_path: &str,
359        remote_store: &ObjectStore,
360    ) -> Result<()> {
361        let region_id = index_key.region_id;
362        let file_id = index_key.file_id;
363        let file_type = index_key.file_type;
364        let cache_path = self.file_cache.cache_file_path(index_key);
365
366        let start = Instant::now();
367        let cached_value = self
368            .file_cache
369            .local_store()
370            .stat(&cache_path)
371            .await
372            .context(error::OpenDalSnafu)?;
373        let reader = self
374            .file_cache
375            .local_store()
376            .reader(&cache_path)
377            .await
378            .context(error::OpenDalSnafu)?
379            .into_futures_async_read(0..cached_value.content_length())
380            .await
381            .context(error::OpenDalSnafu)?;
382
383        let mut writer = remote_store
384            .writer_with(upload_path)
385            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
386            .concurrent(DEFAULT_WRITE_CONCURRENCY)
387            .await
388            .context(error::OpenDalSnafu)?
389            .into_futures_async_write();
390
391        let bytes_written =
392            futures::io::copy(reader, &mut writer)
393                .await
394                .context(error::UploadSnafu {
395                    region_id,
396                    file_id,
397                    file_type,
398                })?;
399
400        // Must close to upload all data.
401        writer.close().await.context(error::UploadSnafu {
402            region_id,
403            file_id,
404            file_type,
405        })?;
406
407        UPLOAD_BYTES_TOTAL.inc_by(bytes_written);
408
409        debug!(
410            "Successfully upload file to remote, region: {}, file: {}, upload_path: {}, cost: {:?}",
411            region_id,
412            file_id,
413            upload_path,
414            start.elapsed(),
415        );
416
417        let index_value = IndexValue {
418            file_size: bytes_written as _,
419        };
420        // Register to file cache
421        self.file_cache.put(index_key, index_value).await;
422
423        Ok(())
424    }
425}
426
/// Request describing where SST files in the write cache should be uploaded.
pub struct SstUploadRequest {
    /// Destination path provider used to build the remote paths of the SST and index
    /// files to upload.
    pub dest_path_provider: RegionFilePathFactory,
    /// Remote object store to upload to.
    pub remote_store: ObjectStore,
}
434
/// A struct that tracks uploaded files so they can be cleaned up if a later upload fails.
struct UploadTracker {
    /// Id of the region to track.
    region_id: RegionId,
    /// Remote paths of files uploaded successfully.
    files_uploaded: Vec<String>,
}
442
443impl UploadTracker {
444    /// Creates a new instance of `UploadTracker` for a given region.
445    fn new(region_id: RegionId) -> Self {
446        Self {
447            region_id,
448            files_uploaded: Vec::new(),
449        }
450    }
451
452    /// Add a file path to the list of uploaded files.
453    fn push_uploaded_file(&mut self, path: String) {
454        self.files_uploaded.push(path);
455    }
456
457    /// Cleans uploaded files and files in the file cache at best effort.
458    async fn clean(
459        &self,
460        sst_info: &SstInfoArray,
461        file_cache: &FileCacheRef,
462        remote_store: &ObjectStore,
463    ) {
464        common_telemetry::info!(
465            "Start cleaning files on upload failure, region: {}, num_ssts: {}",
466            self.region_id,
467            sst_info.len()
468        );
469
470        // Cleans files in the file cache first.
471        for sst in sst_info {
472            let parquet_key = IndexKey::new(self.region_id, sst.file_id, FileType::Parquet);
473            file_cache.remove(parquet_key).await;
474
475            if sst.index_metadata.file_size > 0 {
476                let puffin_key = IndexKey::new(self.region_id, sst.file_id, FileType::Puffin);
477                file_cache.remove(puffin_key).await;
478            }
479        }
480
481        // Cleans uploaded files.
482        for file_path in &self.files_uploaded {
483            if let Err(e) = remote_store.delete(file_path).await {
484                common_telemetry::error!(e; "Failed to delete file {}", file_path);
485            }
486        }
487    }
488}
489
490#[cfg(test)]
491mod tests {
492    use common_test_util::temp_dir::create_temp_dir;
493    use object_store::ATOMIC_WRITE_DIR;
494    use store_api::region_request::PathType;
495
496    use super::*;
497    use crate::access_layer::OperationType;
498    use crate::cache::test_util::new_fs_store;
499    use crate::cache::{CacheManager, CacheStrategy};
500    use crate::error::InvalidBatchSnafu;
501    use crate::read::Source;
502    use crate::region::options::IndexOptions;
503    use crate::sst::parquet::reader::ParquetReaderBuilder;
504    use crate::test_util::TestEnv;
505    use crate::test_util::sst_util::{
506        assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle_with_file_id,
507        sst_region_metadata,
508    };
509
510    #[tokio::test]
511    async fn test_write_and_upload_sst() {
512        // TODO(QuenKar): maybe find a way to create some object server for testing,
513        // and now just use local file system to mock.
514        let mut env = TestEnv::new().await;
515        let mock_store = env.init_object_store_manager();
516        let path_provider = RegionFilePathFactory::new("test".to_string(), PathType::Bare);
517
518        let local_dir = create_temp_dir("");
519        let local_store = new_fs_store(local_dir.path().to_str().unwrap());
520
521        let write_cache = env
522            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
523            .await;
524
525        // Create Source
526        let metadata = Arc::new(sst_region_metadata());
527        let region_id = metadata.region_id;
528        let source = new_source(&[
529            new_batch_by_range(&["a", "d"], 0, 60),
530            new_batch_by_range(&["b", "f"], 0, 40),
531            new_batch_by_range(&["b", "h"], 100, 200),
532        ]);
533
534        let write_request = SstWriteRequest {
535            op_type: OperationType::Flush,
536            metadata,
537            source: either::Left(source),
538            storage: None,
539            max_sequence: None,
540            cache_manager: Default::default(),
541            index_options: IndexOptions::default(),
542            inverted_index_config: Default::default(),
543            fulltext_index_config: Default::default(),
544            bloom_filter_index_config: Default::default(),
545        };
546
547        let upload_request = SstUploadRequest {
548            dest_path_provider: path_provider.clone(),
549            remote_store: mock_store.clone(),
550        };
551
552        let write_opts = WriteOptions {
553            row_group_size: 512,
554            ..Default::default()
555        };
556
557        // Write to cache and upload sst to mock remote store
558        let (mut sst_infos, _) = write_cache
559            .write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush)
560            .await
561            .unwrap();
562        let sst_info = sst_infos.remove(0);
563
564        let file_id = sst_info.file_id;
565        let sst_upload_path =
566            path_provider.build_sst_file_path(RegionFileId::new(region_id, file_id));
567        let index_upload_path =
568            path_provider.build_index_file_path(RegionFileId::new(region_id, file_id));
569
570        // Check write cache contains the key
571        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
572        assert!(write_cache.file_cache.contains_key(&key));
573
574        // Check file data
575        let remote_data = mock_store.read(&sst_upload_path).await.unwrap();
576        let cache_data = local_store
577            .read(&write_cache.file_cache.cache_file_path(key))
578            .await
579            .unwrap();
580        assert_eq!(remote_data.to_vec(), cache_data.to_vec());
581
582        // Check write cache contains the index key
583        let index_key = IndexKey::new(region_id, file_id, FileType::Puffin);
584        assert!(write_cache.file_cache.contains_key(&index_key));
585
586        let remote_index_data = mock_store.read(&index_upload_path).await.unwrap();
587        let cache_index_data = local_store
588            .read(&write_cache.file_cache.cache_file_path(index_key))
589            .await
590            .unwrap();
591        assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec());
592
593        // Removes the file from the cache.
594        let sst_index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
595        write_cache.remove(sst_index_key).await;
596        assert!(!write_cache.file_cache.contains_key(&sst_index_key));
597        write_cache.remove(index_key).await;
598        assert!(!write_cache.file_cache.contains_key(&index_key));
599    }
600
601    #[tokio::test]
602    async fn test_read_metadata_from_write_cache() {
603        common_telemetry::init_default_ut_logging();
604        let mut env = TestEnv::new().await;
605        let data_home = env.data_home().display().to_string();
606        let mock_store = env.init_object_store_manager();
607
608        let local_dir = create_temp_dir("");
609        let local_path = local_dir.path().to_str().unwrap();
610        let local_store = new_fs_store(local_path);
611
612        // Create a cache manager using only write cache
613        let write_cache = env
614            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
615            .await;
616        let cache_manager = Arc::new(
617            CacheManager::builder()
618                .write_cache(Some(write_cache.clone()))
619                .build(),
620        );
621
622        // Create source
623        let metadata = Arc::new(sst_region_metadata());
624
625        let source = new_source(&[
626            new_batch_by_range(&["a", "d"], 0, 60),
627            new_batch_by_range(&["b", "f"], 0, 40),
628            new_batch_by_range(&["b", "h"], 100, 200),
629        ]);
630
631        // Write to local cache and upload sst to mock remote store
632        let write_request = SstWriteRequest {
633            op_type: OperationType::Flush,
634            metadata,
635            source: either::Left(source),
636            storage: None,
637            max_sequence: None,
638            cache_manager: cache_manager.clone(),
639            index_options: IndexOptions::default(),
640            inverted_index_config: Default::default(),
641            fulltext_index_config: Default::default(),
642            bloom_filter_index_config: Default::default(),
643        };
644        let write_opts = WriteOptions {
645            row_group_size: 512,
646            ..Default::default()
647        };
648        let upload_request = SstUploadRequest {
649            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
650            remote_store: mock_store.clone(),
651        };
652
653        let (mut sst_infos, _) = write_cache
654            .write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush)
655            .await
656            .unwrap();
657        let sst_info = sst_infos.remove(0);
658        let write_parquet_metadata = sst_info.file_metadata.unwrap();
659
660        // Read metadata from write cache
661        let handle = sst_file_handle_with_file_id(sst_info.file_id, 0, 1000);
662        let builder = ParquetReaderBuilder::new(
663            data_home,
664            PathType::Bare,
665            handle.clone(),
666            mock_store.clone(),
667        )
668        .cache(CacheStrategy::EnableAll(cache_manager.clone()));
669        let reader = builder.build().await.unwrap();
670
671        // Check parquet metadata
672        assert_parquet_metadata_eq(write_parquet_metadata, reader.parquet_metadata());
673    }
674
675    #[tokio::test]
676    async fn test_write_cache_clean_tmp_files() {
677        common_telemetry::init_default_ut_logging();
678        let mut env = TestEnv::new().await;
679        let data_home = env.data_home().display().to_string();
680        let mock_store = env.init_object_store_manager();
681
682        let write_cache_dir = create_temp_dir("");
683        let write_cache_path = write_cache_dir.path().to_str().unwrap();
684        let write_cache = env
685            .create_write_cache_from_path(write_cache_path, ReadableSize::mb(10))
686            .await;
687
688        // Create a cache manager using only write cache
689        let cache_manager = Arc::new(
690            CacheManager::builder()
691                .write_cache(Some(write_cache.clone()))
692                .build(),
693        );
694
695        // Create source
696        let metadata = Arc::new(sst_region_metadata());
697
698        // Creates a source that can return an error to abort the writer.
699        let source = Source::Iter(Box::new(
700            [
701                Ok(new_batch_by_range(&["a", "d"], 0, 60)),
702                InvalidBatchSnafu {
703                    reason: "Abort the writer",
704                }
705                .fail(),
706            ]
707            .into_iter(),
708        ));
709
710        // Write to local cache and upload sst to mock remote store
711        let write_request = SstWriteRequest {
712            op_type: OperationType::Flush,
713            metadata,
714            source: either::Left(source),
715            storage: None,
716            max_sequence: None,
717            cache_manager: cache_manager.clone(),
718            index_options: IndexOptions::default(),
719            inverted_index_config: Default::default(),
720            fulltext_index_config: Default::default(),
721            bloom_filter_index_config: Default::default(),
722        };
723        let write_opts = WriteOptions {
724            row_group_size: 512,
725            ..Default::default()
726        };
727        let upload_request = SstUploadRequest {
728            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
729            remote_store: mock_store.clone(),
730        };
731
732        write_cache
733            .write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush)
734            .await
735            .unwrap_err();
736        let atomic_write_dir = write_cache_dir.path().join(ATOMIC_WRITE_DIR);
737        let mut entries = tokio::fs::read_dir(&atomic_write_dir).await.unwrap();
738        let mut has_files = false;
739        while let Some(entry) = entries.next_entry().await.unwrap() {
740            if entry.file_type().await.unwrap().is_dir() {
741                continue;
742            }
743            has_files = true;
744            common_telemetry::warn!(
745                "Found remaining temporary file in atomic dir: {}",
746                entry.path().display()
747            );
748        }
749
750        assert!(!has_files);
751    }
752}