mito2/cache/
write_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A write-through cache for remote object stores.
16
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::{debug, info};
22use futures::AsyncWriteExt;
23use object_store::ObjectStore;
24use snafu::ResultExt;
25use store_api::storage::RegionId;
26
27use crate::access_layer::{
28    FilePathProvider, Metrics, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
29    TempFileCleaner, WriteCachePathProvider, WriteType, new_fs_cache_store,
30};
31use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
32use crate::error::{self, Result};
33use crate::metrics::{
34    UPLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_ELAPSED,
35};
36use crate::sst::file::RegionFileId;
37use crate::sst::index::IndexerBuilderImpl;
38use crate::sst::index::intermediate::IntermediateManager;
39use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};
40use crate::sst::parquet::writer::ParquetWriter;
41use crate::sst::parquet::{SstInfo, WriteOptions};
42use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
43
/// A cache for uploading files to remote object stores.
///
/// Files are first staged in a local [`FileCache`] (on local disk) and then sent to the
/// remote object store. Files downloaded from a remote store are also written into, and
/// registered with, the same local file cache.
pub struct WriteCache {
    /// Local file cache that holds staged and downloaded files.
    file_cache: FileCacheRef,
    /// Factory used to build puffin managers that write SST index files into the local cache.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager handed to index builders.
    intermediate_manager: IntermediateManager,
}

/// Shared, reference-counted handle to a [`WriteCache`].
pub type WriteCacheRef = Arc<WriteCache>;
57
58impl WriteCache {
59    /// Create the cache with a `local_store` to cache files and a
60    /// `object_store_manager` for all object stores.
61    pub async fn new(
62        local_store: ObjectStore,
63        cache_capacity: ReadableSize,
64        ttl: Option<Duration>,
65        puffin_manager_factory: PuffinManagerFactory,
66        intermediate_manager: IntermediateManager,
67    ) -> Result<Self> {
68        let file_cache = Arc::new(FileCache::new(local_store, cache_capacity, ttl));
69        file_cache.recover(false).await;
70
71        Ok(Self {
72            file_cache,
73            puffin_manager_factory,
74            intermediate_manager,
75        })
76    }
77
78    /// Creates a write cache based on local fs.
79    pub async fn new_fs(
80        cache_dir: &str,
81        cache_capacity: ReadableSize,
82        ttl: Option<Duration>,
83        puffin_manager_factory: PuffinManagerFactory,
84        intermediate_manager: IntermediateManager,
85    ) -> Result<Self> {
86        info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");
87
88        let local_store = new_fs_cache_store(cache_dir).await?;
89        Self::new(
90            local_store,
91            cache_capacity,
92            ttl,
93            puffin_manager_factory,
94            intermediate_manager,
95        )
96        .await
97    }
98
99    /// Returns the file cache of the write cache.
100    pub(crate) fn file_cache(&self) -> FileCacheRef {
101        self.file_cache.clone()
102    }
103
104    /// Build the puffin manager
105    pub(crate) fn build_puffin_manager(&self) -> SstPuffinManager {
106        let store = self.file_cache.local_store();
107        let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
108        self.puffin_manager_factory.build(store, path_provider)
109    }
110
111    /// Put encoded SST data to the cache and upload to the remote object store.
112    pub(crate) async fn put_and_upload_sst(
113        &self,
114        data: &bytes::Bytes,
115        region_id: RegionId,
116        sst_info: &SstInfo,
117        upload_request: SstUploadRequest,
118    ) -> Result<Metrics> {
119        let file_id = sst_info.file_id;
120        let mut metrics = Metrics::new(WriteType::Flush);
121
122        // Create index key for the SST file
123        let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet);
124
125        // Write to cache first
126        let cache_start = Instant::now();
127        let cache_path = self.file_cache.cache_file_path(parquet_key);
128        let store = self.file_cache.local_store();
129        let cleaner = TempFileCleaner::new(region_id, store.clone());
130        let write_res = store
131            .write(&cache_path, data.clone())
132            .await
133            .context(crate::error::OpenDalSnafu);
134        if let Err(e) = write_res {
135            cleaner.clean_by_file_id(file_id).await;
136            return Err(e);
137        }
138
139        metrics.write_batch = cache_start.elapsed();
140
141        // Upload to remote store
142        let upload_start = Instant::now();
143        let region_file_id = RegionFileId::new(region_id, file_id);
144        let remote_path = upload_request
145            .dest_path_provider
146            .build_sst_file_path(region_file_id);
147
148        if let Err(e) = self
149            .upload(parquet_key, &remote_path, &upload_request.remote_store)
150            .await
151        {
152            // Clean up cache on failure
153            self.remove(parquet_key).await;
154            return Err(e);
155        }
156
157        metrics.upload_parquet = upload_start.elapsed();
158        Ok(metrics)
159    }
160
161    /// Returns the intermediate manager of the write cache.
162    pub(crate) fn intermediate_manager(&self) -> &IntermediateManager {
163        &self.intermediate_manager
164    }
165
166    /// Writes SST to the cache and then uploads it to the remote object store.
167    pub(crate) async fn write_and_upload_sst(
168        &self,
169        write_request: SstWriteRequest,
170        upload_request: SstUploadRequest,
171        write_opts: &WriteOptions,
172        metrics: &mut Metrics,
173    ) -> Result<SstInfoArray> {
174        let region_id = write_request.metadata.region_id;
175
176        let store = self.file_cache.local_store();
177        let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
178        let indexer = IndexerBuilderImpl {
179            build_type: write_request.op_type.into(),
180            metadata: write_request.metadata.clone(),
181            row_group_size: write_opts.row_group_size,
182            puffin_manager: self
183                .puffin_manager_factory
184                .build(store.clone(), path_provider.clone()),
185            intermediate_manager: self.intermediate_manager.clone(),
186            index_options: write_request.index_options,
187            inverted_index_config: write_request.inverted_index_config,
188            fulltext_index_config: write_request.fulltext_index_config,
189            bloom_filter_index_config: write_request.bloom_filter_index_config,
190        };
191
192        let cleaner = TempFileCleaner::new(region_id, store.clone());
193        // Write to FileCache.
194        let mut writer = ParquetWriter::new_with_object_store(
195            store.clone(),
196            write_request.metadata,
197            write_request.index_config,
198            indexer,
199            path_provider.clone(),
200            metrics,
201        )
202        .await
203        .with_file_cleaner(cleaner);
204
205        let sst_info = match write_request.source {
206            either::Left(source) => {
207                writer
208                    .write_all(source, write_request.max_sequence, write_opts)
209                    .await?
210            }
211            either::Right(flat_source) => writer.write_all_flat(flat_source, write_opts).await?,
212        };
213
214        // Upload sst file to remote object store.
215        if sst_info.is_empty() {
216            return Ok(sst_info);
217        }
218
219        let mut upload_tracker = UploadTracker::new(region_id);
220        let mut err = None;
221        let remote_store = &upload_request.remote_store;
222        for sst in &sst_info {
223            let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet);
224            let parquet_path = upload_request
225                .dest_path_provider
226                .build_sst_file_path(RegionFileId::new(region_id, sst.file_id));
227            let start = Instant::now();
228            if let Err(e) = self.upload(parquet_key, &parquet_path, remote_store).await {
229                err = Some(e);
230                break;
231            }
232            metrics.upload_parquet += start.elapsed();
233            upload_tracker.push_uploaded_file(parquet_path);
234
235            if sst.index_metadata.file_size > 0 {
236                let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin);
237                let puffin_path = upload_request
238                    .dest_path_provider
239                    .build_index_file_path(RegionFileId::new(region_id, sst.file_id));
240                let start = Instant::now();
241                if let Err(e) = self.upload(puffin_key, &puffin_path, remote_store).await {
242                    err = Some(e);
243                    break;
244                }
245                metrics.upload_puffin += start.elapsed();
246                upload_tracker.push_uploaded_file(puffin_path);
247            }
248        }
249
250        if let Some(err) = err {
251            // Cleans files on failure.
252            upload_tracker
253                .clean(&sst_info, &self.file_cache, remote_store)
254                .await;
255            return Err(err);
256        }
257
258        Ok(sst_info)
259    }
260
261    /// Removes a file from the cache by `index_key`.
262    pub(crate) async fn remove(&self, index_key: IndexKey) {
263        self.file_cache.remove(index_key).await
264    }
265
266    /// Downloads a file in `remote_path` from the remote object store to the local cache
267    /// (specified by `index_key`).
268    pub(crate) async fn download(
269        &self,
270        index_key: IndexKey,
271        remote_path: &str,
272        remote_store: &ObjectStore,
273        file_size: u64,
274    ) -> Result<()> {
275        if let Err(e) = self
276            .download_without_cleaning(index_key, remote_path, remote_store, file_size)
277            .await
278        {
279            let filename = index_key.to_string();
280            TempFileCleaner::clean_atomic_dir_files(&self.file_cache.local_store(), &[&filename])
281                .await;
282
283            return Err(e);
284        }
285        Ok(())
286    }
287
288    async fn download_without_cleaning(
289        &self,
290        index_key: IndexKey,
291        remote_path: &str,
292        remote_store: &ObjectStore,
293        file_size: u64,
294    ) -> Result<()> {
295        const DOWNLOAD_READER_CONCURRENCY: usize = 8;
296        const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);
297
298        let file_type = index_key.file_type;
299        let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
300            .with_label_values(&[match file_type {
301                FileType::Parquet => "download_parquet",
302                FileType::Puffin => "download_puffin",
303            }])
304            .start_timer();
305
306        let reader = remote_store
307            .reader_with(remote_path)
308            .concurrent(DOWNLOAD_READER_CONCURRENCY)
309            .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
310            .await
311            .context(error::OpenDalSnafu)?
312            .into_futures_async_read(0..file_size)
313            .await
314            .context(error::OpenDalSnafu)?;
315
316        let cache_path = self.file_cache.cache_file_path(index_key);
317        let mut writer = self
318            .file_cache
319            .local_store()
320            .writer(&cache_path)
321            .await
322            .context(error::OpenDalSnafu)?
323            .into_futures_async_write();
324
325        let region_id = index_key.region_id;
326        let file_id = index_key.file_id;
327        let bytes_written =
328            futures::io::copy(reader, &mut writer)
329                .await
330                .context(error::DownloadSnafu {
331                    region_id,
332                    file_id,
333                    file_type,
334                })?;
335        writer.close().await.context(error::DownloadSnafu {
336            region_id,
337            file_id,
338            file_type,
339        })?;
340
341        WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);
342
343        let elapsed = timer.stop_and_record();
344        debug!(
345            "Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
346            remote_path, cache_path, bytes_written, region_id, elapsed,
347        );
348
349        let index_value = IndexValue {
350            file_size: bytes_written as _,
351        };
352        self.file_cache.put(index_key, index_value).await;
353        Ok(())
354    }
355
356    /// Uploads a Parquet file or a Puffin file to the remote object store.
357    pub(crate) async fn upload(
358        &self,
359        index_key: IndexKey,
360        upload_path: &str,
361        remote_store: &ObjectStore,
362    ) -> Result<()> {
363        let region_id = index_key.region_id;
364        let file_id = index_key.file_id;
365        let file_type = index_key.file_type;
366        let cache_path = self.file_cache.cache_file_path(index_key);
367
368        let start = Instant::now();
369        let cached_value = self
370            .file_cache
371            .local_store()
372            .stat(&cache_path)
373            .await
374            .context(error::OpenDalSnafu)?;
375        let reader = self
376            .file_cache
377            .local_store()
378            .reader(&cache_path)
379            .await
380            .context(error::OpenDalSnafu)?
381            .into_futures_async_read(0..cached_value.content_length())
382            .await
383            .context(error::OpenDalSnafu)?;
384
385        let mut writer = remote_store
386            .writer_with(upload_path)
387            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
388            .concurrent(DEFAULT_WRITE_CONCURRENCY)
389            .await
390            .context(error::OpenDalSnafu)?
391            .into_futures_async_write();
392
393        let bytes_written =
394            futures::io::copy(reader, &mut writer)
395                .await
396                .context(error::UploadSnafu {
397                    region_id,
398                    file_id,
399                    file_type,
400                })?;
401
402        // Must close to upload all data.
403        writer.close().await.context(error::UploadSnafu {
404            region_id,
405            file_id,
406            file_type,
407        })?;
408
409        UPLOAD_BYTES_TOTAL.inc_by(bytes_written);
410
411        debug!(
412            "Successfully upload file to remote, region: {}, file: {}, upload_path: {}, cost: {:?}",
413            region_id,
414            file_id,
415            upload_path,
416            start.elapsed(),
417        );
418
419        let index_value = IndexValue {
420            file_size: bytes_written as _,
421        };
422        // Register to file cache
423        self.file_cache.put(index_key, index_value).await;
424
425        Ok(())
426    }
427}
428
/// Request describing where SST files staged in the write cache should be uploaded.
pub struct SstUploadRequest {
    /// Builds the remote SST and index file paths that cached files are uploaded to.
    pub dest_path_provider: RegionFilePathFactory,
    /// Remote object store that receives the uploaded files.
    pub remote_store: ObjectStore,
}
436
/// Tracks files uploaded to a remote store so they can be cleaned up if a later upload fails.
pub(crate) struct UploadTracker {
    /// Id of the region whose files are tracked.
    region_id: RegionId,
    /// Remote paths of the files uploaded successfully so far.
    files_uploaded: Vec<String>,
}
444
445impl UploadTracker {
446    /// Creates a new instance of `UploadTracker` for a given region.
447    pub(crate) fn new(region_id: RegionId) -> Self {
448        Self {
449            region_id,
450            files_uploaded: Vec::new(),
451        }
452    }
453
454    /// Add a file path to the list of uploaded files.
455    pub(crate) fn push_uploaded_file(&mut self, path: String) {
456        self.files_uploaded.push(path);
457    }
458
459    /// Cleans uploaded files and files in the file cache at best effort.
460    pub(crate) async fn clean(
461        &self,
462        sst_info: &SstInfoArray,
463        file_cache: &FileCacheRef,
464        remote_store: &ObjectStore,
465    ) {
466        common_telemetry::info!(
467            "Start cleaning files on upload failure, region: {}, num_ssts: {}",
468            self.region_id,
469            sst_info.len()
470        );
471
472        // Cleans files in the file cache first.
473        for sst in sst_info {
474            let parquet_key = IndexKey::new(self.region_id, sst.file_id, FileType::Parquet);
475            file_cache.remove(parquet_key).await;
476
477            if sst.index_metadata.file_size > 0 {
478                let puffin_key = IndexKey::new(self.region_id, sst.file_id, FileType::Puffin);
479                file_cache.remove(puffin_key).await;
480            }
481        }
482
483        // Cleans uploaded files.
484        for file_path in &self.files_uploaded {
485            if let Err(e) = remote_store.delete(file_path).await {
486                common_telemetry::error!(e; "Failed to delete file {}", file_path);
487            }
488        }
489    }
490}
491
#[cfg(test)]
mod tests {
    // Tests for `WriteCache`. The "remote" store is a local filesystem mock from `TestEnv`.
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ATOMIC_WRITE_DIR;
    use store_api::region_request::PathType;

    use super::*;
    use crate::access_layer::OperationType;
    use crate::cache::test_util::new_fs_store;
    use crate::cache::{CacheManager, CacheStrategy};
    use crate::error::InvalidBatchSnafu;
    use crate::read::Source;
    use crate::region::options::IndexOptions;
    use crate::sst::parquet::reader::ParquetReaderBuilder;
    use crate::test_util::TestEnv;
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle_with_file_id,
        sst_region_metadata,
    };

    /// Writes an SST through the write cache and checks several properties:
    /// - the Parquet and Puffin files are registered in the file cache;
    /// - their local bytes equal the bytes in the mock remote store;
    /// - `WriteCache::remove` evicts both keys.
    #[tokio::test]
    async fn test_write_and_upload_sst() {
        // TODO(QuenKar): maybe find a way to create some object server for testing,
        // and now just use local file system to mock.
        let mut env = TestEnv::new().await;
        let mock_store = env.init_object_store_manager();
        let path_provider = RegionFilePathFactory::new("test".to_string(), PathType::Bare);

        let local_dir = create_temp_dir("");
        let local_store = new_fs_store(local_dir.path().to_str().unwrap());

        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;

        // Create Source
        let metadata = Arc::new(sst_region_metadata());
        let region_id = metadata.region_id;
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);

        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source: either::Left(source),
            storage: None,
            max_sequence: None,
            cache_manager: Default::default(),
            index_options: IndexOptions::default(),
            index_config: Default::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let upload_request = SstUploadRequest {
            dest_path_provider: path_provider.clone(),
            remote_store: mock_store.clone(),
        };

        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };

        // Write to cache and upload sst to mock remote store
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut sst_infos = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
            .await
            .unwrap();
        let sst_info = sst_infos.remove(0);

        let file_id = sst_info.file_id;
        let sst_upload_path =
            path_provider.build_sst_file_path(RegionFileId::new(region_id, file_id));
        let index_upload_path =
            path_provider.build_index_file_path(RegionFileId::new(region_id, file_id));

        // Check write cache contains the key
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        assert!(write_cache.file_cache.contains_key(&key));

        // Check file data
        let remote_data = mock_store.read(&sst_upload_path).await.unwrap();
        let cache_data = local_store
            .read(&write_cache.file_cache.cache_file_path(key))
            .await
            .unwrap();
        assert_eq!(remote_data.to_vec(), cache_data.to_vec());

        // Check write cache contains the index key
        let index_key = IndexKey::new(region_id, file_id, FileType::Puffin);
        assert!(write_cache.file_cache.contains_key(&index_key));

        let remote_index_data = mock_store.read(&index_upload_path).await.unwrap();
        let cache_index_data = local_store
            .read(&write_cache.file_cache.cache_file_path(index_key))
            .await
            .unwrap();
        assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec());

        // Removes the file from the cache.
        // `sst_index_key` is the same Parquet key as `key` above.
        let sst_index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
        write_cache.remove(sst_index_key).await;
        assert!(!write_cache.file_cache.contains_key(&sst_index_key));
        write_cache.remove(index_key).await;
        assert!(!write_cache.file_cache.contains_key(&index_key));
    }

    /// Checks the metadata read back through the write cache against the writer's metadata.
    ///
    /// The SST is read via a `ParquetReaderBuilder` whose `CacheManager` only has a write
    /// cache. Its Parquet metadata must equal what the writer returned.
    #[tokio::test]
    async fn test_read_metadata_from_write_cache() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let local_dir = create_temp_dir("");
        let local_path = local_dir.path().to_str().unwrap();
        let local_store = new_fs_store(local_path);

        // Create a cache manager using only write cache
        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        // Create source
        let metadata = Arc::new(sst_region_metadata());

        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);

        // Write to local cache and upload sst to mock remote store
        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source: either::Left(source),
            storage: None,
            max_sequence: None,
            cache_manager: cache_manager.clone(),
            index_options: IndexOptions::default(),
            index_config: Default::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };
        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
            remote_store: mock_store.clone(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut sst_infos = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
            .await
            .unwrap();
        let sst_info = sst_infos.remove(0);
        let write_parquet_metadata = sst_info.file_metadata.unwrap();

        // Read metadata from write cache
        let handle = sst_file_handle_with_file_id(sst_info.file_id, 0, 1000);
        let builder = ParquetReaderBuilder::new(
            data_home,
            PathType::Bare,
            handle.clone(),
            mock_store.clone(),
        )
        .cache(CacheStrategy::EnableAll(cache_manager.clone()));
        let reader = builder.build().await.unwrap();

        // Check parquet metadata
        assert_parquet_metadata_eq(write_parquet_metadata, reader.parquet_metadata());
    }

    /// Checks cleanup when the source errors partway through writing.
    ///
    /// `write_and_upload_sst` must return an error and leave no regular files in the local
    /// store's atomic write directory.
    #[tokio::test]
    async fn test_write_cache_clean_tmp_files() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let write_cache_dir = create_temp_dir("");
        let write_cache_path = write_cache_dir.path().to_str().unwrap();
        let write_cache = env
            .create_write_cache_from_path(write_cache_path, ReadableSize::mb(10))
            .await;

        // Create a cache manager using only write cache
        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        // Create source
        let metadata = Arc::new(sst_region_metadata());

        // Creates a source that can return an error to abort the writer.
        let source = Source::Iter(Box::new(
            [
                Ok(new_batch_by_range(&["a", "d"], 0, 60)),
                InvalidBatchSnafu {
                    reason: "Abort the writer",
                }
                .fail(),
            ]
            .into_iter(),
        ));

        // Write to local cache and upload sst to mock remote store
        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source: either::Left(source),
            storage: None,
            max_sequence: None,
            cache_manager: cache_manager.clone(),
            index_options: IndexOptions::default(),
            index_config: Default::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };
        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
            remote_store: mock_store.clone(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
            .await
            .unwrap_err();
        // Scan the atomic write dir; only regular files count as leftovers.
        let atomic_write_dir = write_cache_dir.path().join(ATOMIC_WRITE_DIR);
        let mut entries = tokio::fs::read_dir(&atomic_write_dir).await.unwrap();
        let mut has_files = false;
        while let Some(entry) = entries.next_entry().await.unwrap() {
            if entry.file_type().await.unwrap().is_dir() {
                continue;
            }
            has_files = true;
            common_telemetry::warn!(
                "Found remaining temporary file in atomic dir: {}",
                entry.path().display()
            );
        }

        assert!(!has_files);
    }
}