mito2/cache/
write_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A write-through cache for remote object stores.
16
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::{debug, info};
22use futures::AsyncWriteExt;
23use object_store::ObjectStore;
24use snafu::ResultExt;
25use store_api::storage::RegionId;
26use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
27
28use crate::access_layer::{
29    FilePathProvider, Metrics, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
30    TempFileCleaner, WriteCachePathProvider, WriteType, new_fs_cache_store,
31};
32use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
33use crate::cache::manifest_cache::ManifestCache;
34use crate::error::{self, Result};
35use crate::metrics::UPLOAD_BYTES_TOTAL;
36use crate::region::opener::RegionLoadCacheTask;
37use crate::sst::file::RegionFileId;
38use crate::sst::index::IndexerBuilderImpl;
39use crate::sst::index::intermediate::IntermediateManager;
40use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};
41use crate::sst::parquet::writer::ParquetWriter;
42use crate::sst::parquet::{SstInfo, WriteOptions};
43use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
44
45/// A cache for uploading files to remote object stores.
46///
47/// It keeps files in local disk and then sends files to object stores.
48pub struct WriteCache {
49    /// Local file cache.
50    file_cache: FileCacheRef,
51    /// Puffin manager factory for index.
52    puffin_manager_factory: PuffinManagerFactory,
53    /// Intermediate manager for index.
54    intermediate_manager: IntermediateManager,
55    /// Sender for region load cache tasks.
56    task_sender: UnboundedSender<RegionLoadCacheTask>,
57    /// Optional cache for manifest files.
58    manifest_cache: Option<ManifestCache>,
59}
60
61pub type WriteCacheRef = Arc<WriteCache>;
62
63impl WriteCache {
64    /// Create the cache with a `local_store` to cache files and a
65    /// `object_store_manager` for all object stores.
66    pub async fn new(
67        local_store: ObjectStore,
68        cache_capacity: ReadableSize,
69        ttl: Option<Duration>,
70        index_cache_percent: Option<u8>,
71        puffin_manager_factory: PuffinManagerFactory,
72        intermediate_manager: IntermediateManager,
73        manifest_cache: Option<ManifestCache>,
74    ) -> Result<Self> {
75        let (task_sender, task_receiver) = unbounded_channel();
76
77        let file_cache = Arc::new(FileCache::new(
78            local_store,
79            cache_capacity,
80            ttl,
81            index_cache_percent,
82        ));
83        file_cache.recover(false, Some(task_receiver)).await;
84
85        Ok(Self {
86            file_cache,
87            puffin_manager_factory,
88            intermediate_manager,
89            task_sender,
90            manifest_cache,
91        })
92    }
93
94    /// Creates a write cache based on local fs.
95    pub async fn new_fs(
96        cache_dir: &str,
97        cache_capacity: ReadableSize,
98        ttl: Option<Duration>,
99        index_cache_percent: Option<u8>,
100        puffin_manager_factory: PuffinManagerFactory,
101        intermediate_manager: IntermediateManager,
102        manifest_cache_capacity: ReadableSize,
103    ) -> Result<Self> {
104        info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");
105
106        let local_store = new_fs_cache_store(cache_dir).await?;
107
108        // Create manifest cache if capacity is non-zero
109        let manifest_cache = if manifest_cache_capacity.as_bytes() > 0 {
110            Some(ManifestCache::new(local_store.clone(), manifest_cache_capacity, ttl).await)
111        } else {
112            None
113        };
114
115        Self::new(
116            local_store,
117            cache_capacity,
118            ttl,
119            index_cache_percent,
120            puffin_manager_factory,
121            intermediate_manager,
122            manifest_cache,
123        )
124        .await
125    }
126
127    /// Returns the file cache of the write cache.
128    pub(crate) fn file_cache(&self) -> FileCacheRef {
129        self.file_cache.clone()
130    }
131
132    /// Returns the manifest cache if available.
133    pub(crate) fn manifest_cache(&self) -> Option<ManifestCache> {
134        self.manifest_cache.clone()
135    }
136
137    /// Build the puffin manager
138    pub(crate) fn build_puffin_manager(&self) -> SstPuffinManager {
139        let store = self.file_cache.local_store();
140        let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
141        self.puffin_manager_factory.build(store, path_provider)
142    }
143
144    /// Put encoded SST data to the cache and upload to the remote object store.
145    pub(crate) async fn put_and_upload_sst(
146        &self,
147        data: &bytes::Bytes,
148        region_id: RegionId,
149        sst_info: &SstInfo,
150        upload_request: SstUploadRequest,
151    ) -> Result<Metrics> {
152        let file_id = sst_info.file_id;
153        let mut metrics = Metrics::new(WriteType::Flush);
154
155        // Create index key for the SST file
156        let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet);
157
158        // Write to cache first
159        let cache_start = Instant::now();
160        let cache_path = self.file_cache.cache_file_path(parquet_key);
161        let store = self.file_cache.local_store();
162        let cleaner = TempFileCleaner::new(region_id, store.clone());
163        let write_res = store
164            .write(&cache_path, data.clone())
165            .await
166            .context(crate::error::OpenDalSnafu);
167        if let Err(e) = write_res {
168            cleaner.clean_by_file_id(file_id).await;
169            return Err(e);
170        }
171
172        metrics.write_batch = cache_start.elapsed();
173
174        // Upload to remote store
175        let upload_start = Instant::now();
176        let region_file_id = RegionFileId::new(region_id, file_id);
177        let remote_path = upload_request
178            .dest_path_provider
179            .build_sst_file_path(region_file_id);
180
181        if let Err(e) = self
182            .upload(parquet_key, &remote_path, &upload_request.remote_store)
183            .await
184        {
185            // Clean up cache on failure
186            self.remove(parquet_key).await;
187            return Err(e);
188        }
189
190        metrics.upload_parquet = upload_start.elapsed();
191        Ok(metrics)
192    }
193
194    /// Returns the intermediate manager of the write cache.
195    pub(crate) fn intermediate_manager(&self) -> &IntermediateManager {
196        &self.intermediate_manager
197    }
198
199    /// Writes SST to the cache and then uploads it to the remote object store.
200    pub(crate) async fn write_and_upload_sst(
201        &self,
202        write_request: SstWriteRequest,
203        upload_request: SstUploadRequest,
204        write_opts: &WriteOptions,
205        metrics: &mut Metrics,
206    ) -> Result<SstInfoArray> {
207        let region_id = write_request.metadata.region_id;
208
209        let store = self.file_cache.local_store();
210        let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
211        let indexer = IndexerBuilderImpl {
212            build_type: write_request.op_type.into(),
213            metadata: write_request.metadata.clone(),
214            row_group_size: write_opts.row_group_size,
215            puffin_manager: self
216                .puffin_manager_factory
217                .build(store.clone(), path_provider.clone()),
218            write_cache_enabled: true,
219            intermediate_manager: self.intermediate_manager.clone(),
220            index_options: write_request.index_options,
221            inverted_index_config: write_request.inverted_index_config,
222            fulltext_index_config: write_request.fulltext_index_config,
223            bloom_filter_index_config: write_request.bloom_filter_index_config,
224        };
225
226        let cleaner = TempFileCleaner::new(region_id, store.clone());
227        // Write to FileCache.
228        let mut writer = ParquetWriter::new_with_object_store(
229            store.clone(),
230            write_request.metadata,
231            write_request.index_config,
232            indexer,
233            path_provider.clone(),
234            metrics,
235        )
236        .await
237        .with_file_cleaner(cleaner);
238
239        let sst_info = match write_request.source {
240            either::Left(source) => {
241                writer
242                    .write_all(source, write_request.max_sequence, write_opts)
243                    .await?
244            }
245            either::Right(flat_source) => writer.write_all_flat(flat_source, write_opts).await?,
246        };
247
248        // Upload sst file to remote object store.
249        if sst_info.is_empty() {
250            return Ok(sst_info);
251        }
252
253        let mut upload_tracker = UploadTracker::new(region_id);
254        let mut err = None;
255        let remote_store = &upload_request.remote_store;
256        for sst in &sst_info {
257            let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet);
258            let parquet_path = upload_request
259                .dest_path_provider
260                .build_sst_file_path(RegionFileId::new(region_id, sst.file_id));
261            let start = Instant::now();
262            if let Err(e) = self.upload(parquet_key, &parquet_path, remote_store).await {
263                err = Some(e);
264                break;
265            }
266            metrics.upload_parquet += start.elapsed();
267            upload_tracker.push_uploaded_file(parquet_path);
268
269            if sst.index_metadata.file_size > 0 {
270                let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin(0));
271                let puffin_path = upload_request
272                    .dest_path_provider
273                    .build_index_file_path(RegionFileId::new(region_id, sst.file_id));
274                let start = Instant::now();
275                if let Err(e) = self.upload(puffin_key, &puffin_path, remote_store).await {
276                    err = Some(e);
277                    break;
278                }
279                metrics.upload_puffin += start.elapsed();
280                upload_tracker.push_uploaded_file(puffin_path);
281            }
282        }
283
284        if let Some(err) = err {
285            // Cleans files on failure.
286            upload_tracker
287                .clean(&sst_info, &self.file_cache, remote_store)
288                .await;
289            return Err(err);
290        }
291
292        Ok(sst_info)
293    }
294
295    /// Removes a file from the cache by `index_key`.
296    pub(crate) async fn remove(&self, index_key: IndexKey) {
297        self.file_cache.remove(index_key).await
298    }
299
300    /// Downloads a file in `remote_path` from the remote object store to the local cache
301    /// (specified by `index_key`).
302    pub(crate) async fn download(
303        &self,
304        index_key: IndexKey,
305        remote_path: &str,
306        remote_store: &ObjectStore,
307        file_size: u64,
308    ) -> Result<()> {
309        self.file_cache
310            .download(index_key, remote_path, remote_store, file_size)
311            .await
312    }
313
314    /// Uploads a Parquet file or a Puffin file to the remote object store.
315    pub(crate) async fn upload(
316        &self,
317        index_key: IndexKey,
318        upload_path: &str,
319        remote_store: &ObjectStore,
320    ) -> Result<()> {
321        let region_id = index_key.region_id;
322        let file_id = index_key.file_id;
323        let file_type = index_key.file_type;
324        let cache_path = self.file_cache.cache_file_path(index_key);
325
326        let start = Instant::now();
327        let cached_value = self
328            .file_cache
329            .local_store()
330            .stat(&cache_path)
331            .await
332            .context(error::OpenDalSnafu)?;
333        let reader = self
334            .file_cache
335            .local_store()
336            .reader(&cache_path)
337            .await
338            .context(error::OpenDalSnafu)?
339            .into_futures_async_read(0..cached_value.content_length())
340            .await
341            .context(error::OpenDalSnafu)?;
342
343        let mut writer = remote_store
344            .writer_with(upload_path)
345            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
346            .concurrent(DEFAULT_WRITE_CONCURRENCY)
347            .await
348            .context(error::OpenDalSnafu)?
349            .into_futures_async_write();
350
351        let bytes_written =
352            futures::io::copy(reader, &mut writer)
353                .await
354                .context(error::UploadSnafu {
355                    region_id,
356                    file_id,
357                    file_type,
358                })?;
359
360        // Must close to upload all data.
361        writer.close().await.context(error::UploadSnafu {
362            region_id,
363            file_id,
364            file_type,
365        })?;
366
367        UPLOAD_BYTES_TOTAL.inc_by(bytes_written);
368
369        debug!(
370            "Successfully upload file to remote, region: {}, file: {}, upload_path: {}, cost: {:?}",
371            region_id,
372            file_id,
373            upload_path,
374            start.elapsed(),
375        );
376
377        let index_value = IndexValue {
378            file_size: bytes_written as _,
379        };
380        // Register to file cache
381        self.file_cache.put(index_key, index_value).await;
382
383        Ok(())
384    }
385
386    /// Sends a region load cache task to the background processing queue.
387    ///
388    /// If the receiver has been dropped, the error is ignored.
389    pub(crate) fn load_region_cache(&self, task: RegionLoadCacheTask) {
390        let _ = self.task_sender.send(task);
391    }
392}
393
394/// Request to write and upload a SST.
395pub struct SstUploadRequest {
396    /// Destination path provider of which SST files in write cache should be uploaded to.
397    pub dest_path_provider: RegionFilePathFactory,
398    /// Remote object store to upload.
399    pub remote_store: ObjectStore,
400}
401
402/// A structs to track files to upload and clean them if upload failed.
403pub(crate) struct UploadTracker {
404    /// Id of the region to track.
405    region_id: RegionId,
406    /// Paths of files uploaded successfully.
407    files_uploaded: Vec<String>,
408}
409
410impl UploadTracker {
411    /// Creates a new instance of `UploadTracker` for a given region.
412    pub(crate) fn new(region_id: RegionId) -> Self {
413        Self {
414            region_id,
415            files_uploaded: Vec::new(),
416        }
417    }
418
419    /// Add a file path to the list of uploaded files.
420    pub(crate) fn push_uploaded_file(&mut self, path: String) {
421        self.files_uploaded.push(path);
422    }
423
424    /// Cleans uploaded files and files in the file cache at best effort.
425    pub(crate) async fn clean(
426        &self,
427        sst_info: &SstInfoArray,
428        file_cache: &FileCacheRef,
429        remote_store: &ObjectStore,
430    ) {
431        common_telemetry::info!(
432            "Start cleaning files on upload failure, region: {}, num_ssts: {}",
433            self.region_id,
434            sst_info.len()
435        );
436
437        // Cleans files in the file cache first.
438        for sst in sst_info {
439            let parquet_key = IndexKey::new(self.region_id, sst.file_id, FileType::Parquet);
440            file_cache.remove(parquet_key).await;
441
442            if sst.index_metadata.file_size > 0 {
443                let puffin_key = IndexKey::new(
444                    self.region_id,
445                    sst.file_id,
446                    FileType::Puffin(sst.index_metadata.version),
447                );
448                file_cache.remove(puffin_key).await;
449            }
450        }
451
452        // Cleans uploaded files.
453        for file_path in &self.files_uploaded {
454            if let Err(e) = remote_store.delete(file_path).await {
455                common_telemetry::error!(e; "Failed to delete file {}", file_path);
456            }
457        }
458    }
459}
460
461#[cfg(test)]
462mod tests {
463    use common_test_util::temp_dir::create_temp_dir;
464    use object_store::ATOMIC_WRITE_DIR;
465    use store_api::region_request::PathType;
466
467    use super::*;
468    use crate::access_layer::OperationType;
469    use crate::cache::test_util::new_fs_store;
470    use crate::cache::{CacheManager, CacheStrategy};
471    use crate::error::InvalidBatchSnafu;
472    use crate::read::Source;
473    use crate::region::options::IndexOptions;
474    use crate::sst::parquet::reader::ParquetReaderBuilder;
475    use crate::test_util::TestEnv;
476    use crate::test_util::sst_util::{
477        assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle_with_file_id,
478        sst_region_metadata,
479    };
480
481    #[tokio::test]
482    async fn test_write_and_upload_sst() {
483        // TODO(QuenKar): maybe find a way to create some object server for testing,
484        // and now just use local file system to mock.
485        let mut env = TestEnv::new().await;
486        let mock_store = env.init_object_store_manager();
487        let path_provider = RegionFilePathFactory::new("test".to_string(), PathType::Bare);
488
489        let local_dir = create_temp_dir("");
490        let local_store = new_fs_store(local_dir.path().to_str().unwrap());
491
492        let write_cache = env
493            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
494            .await;
495
496        // Create Source
497        let metadata = Arc::new(sst_region_metadata());
498        let region_id = metadata.region_id;
499        let source = new_source(&[
500            new_batch_by_range(&["a", "d"], 0, 60),
501            new_batch_by_range(&["b", "f"], 0, 40),
502            new_batch_by_range(&["b", "h"], 100, 200),
503        ]);
504
505        let write_request = SstWriteRequest {
506            op_type: OperationType::Flush,
507            metadata,
508            source: either::Left(source),
509            storage: None,
510            max_sequence: None,
511            cache_manager: Default::default(),
512            index_options: IndexOptions::default(),
513            index_config: Default::default(),
514            inverted_index_config: Default::default(),
515            fulltext_index_config: Default::default(),
516            bloom_filter_index_config: Default::default(),
517        };
518
519        let upload_request = SstUploadRequest {
520            dest_path_provider: path_provider.clone(),
521            remote_store: mock_store.clone(),
522        };
523
524        let write_opts = WriteOptions {
525            row_group_size: 512,
526            ..Default::default()
527        };
528
529        // Write to cache and upload sst to mock remote store
530        let mut metrics = Metrics::new(WriteType::Flush);
531        let mut sst_infos = write_cache
532            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
533            .await
534            .unwrap();
535        let sst_info = sst_infos.remove(0);
536
537        let file_id = sst_info.file_id;
538        let sst_upload_path =
539            path_provider.build_sst_file_path(RegionFileId::new(region_id, file_id));
540        let index_upload_path =
541            path_provider.build_index_file_path(RegionFileId::new(region_id, file_id));
542
543        // Check write cache contains the key
544        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
545        assert!(write_cache.file_cache.contains_key(&key));
546
547        // Check file data
548        let remote_data = mock_store.read(&sst_upload_path).await.unwrap();
549        let cache_data = local_store
550            .read(&write_cache.file_cache.cache_file_path(key))
551            .await
552            .unwrap();
553        assert_eq!(remote_data.to_vec(), cache_data.to_vec());
554
555        // Check write cache contains the index key
556        let index_key = IndexKey::new(region_id, file_id, FileType::Puffin(0));
557        assert!(write_cache.file_cache.contains_key(&index_key));
558
559        let remote_index_data = mock_store.read(&index_upload_path).await.unwrap();
560        let cache_index_data = local_store
561            .read(&write_cache.file_cache.cache_file_path(index_key))
562            .await
563            .unwrap();
564        assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec());
565
566        // Removes the file from the cache.
567        let sst_index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
568        write_cache.remove(sst_index_key).await;
569        assert!(!write_cache.file_cache.contains_key(&sst_index_key));
570        write_cache.remove(index_key).await;
571        assert!(!write_cache.file_cache.contains_key(&index_key));
572    }
573
574    #[tokio::test]
575    async fn test_read_metadata_from_write_cache() {
576        common_telemetry::init_default_ut_logging();
577        let mut env = TestEnv::new().await;
578        let data_home = env.data_home().display().to_string();
579        let mock_store = env.init_object_store_manager();
580
581        let local_dir = create_temp_dir("");
582        let local_path = local_dir.path().to_str().unwrap();
583        let local_store = new_fs_store(local_path);
584
585        // Create a cache manager using only write cache
586        let write_cache = env
587            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
588            .await;
589        let cache_manager = Arc::new(
590            CacheManager::builder()
591                .write_cache(Some(write_cache.clone()))
592                .build(),
593        );
594
595        // Create source
596        let metadata = Arc::new(sst_region_metadata());
597
598        let source = new_source(&[
599            new_batch_by_range(&["a", "d"], 0, 60),
600            new_batch_by_range(&["b", "f"], 0, 40),
601            new_batch_by_range(&["b", "h"], 100, 200),
602        ]);
603
604        // Write to local cache and upload sst to mock remote store
605        let write_request = SstWriteRequest {
606            op_type: OperationType::Flush,
607            metadata,
608            source: either::Left(source),
609            storage: None,
610            max_sequence: None,
611            cache_manager: cache_manager.clone(),
612            index_options: IndexOptions::default(),
613            index_config: Default::default(),
614            inverted_index_config: Default::default(),
615            fulltext_index_config: Default::default(),
616            bloom_filter_index_config: Default::default(),
617        };
618        let write_opts = WriteOptions {
619            row_group_size: 512,
620            ..Default::default()
621        };
622        let upload_request = SstUploadRequest {
623            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
624            remote_store: mock_store.clone(),
625        };
626
627        let mut metrics = Metrics::new(WriteType::Flush);
628        let mut sst_infos = write_cache
629            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
630            .await
631            .unwrap();
632        let sst_info = sst_infos.remove(0);
633        let write_parquet_metadata = sst_info.file_metadata.unwrap();
634
635        // Read metadata from write cache
636        let handle = sst_file_handle_with_file_id(sst_info.file_id, 0, 1000);
637        let builder = ParquetReaderBuilder::new(
638            data_home,
639            PathType::Bare,
640            handle.clone(),
641            mock_store.clone(),
642        )
643        .cache(CacheStrategy::EnableAll(cache_manager.clone()));
644        let reader = builder.build().await.unwrap();
645
646        // Check parquet metadata
647        assert_parquet_metadata_eq(write_parquet_metadata, reader.parquet_metadata());
648    }
649
650    #[tokio::test]
651    async fn test_write_cache_clean_tmp_files() {
652        common_telemetry::init_default_ut_logging();
653        let mut env = TestEnv::new().await;
654        let data_home = env.data_home().display().to_string();
655        let mock_store = env.init_object_store_manager();
656
657        let write_cache_dir = create_temp_dir("");
658        let write_cache_path = write_cache_dir.path().to_str().unwrap();
659        let write_cache = env
660            .create_write_cache_from_path(write_cache_path, ReadableSize::mb(10))
661            .await;
662
663        // Create a cache manager using only write cache
664        let cache_manager = Arc::new(
665            CacheManager::builder()
666                .write_cache(Some(write_cache.clone()))
667                .build(),
668        );
669
670        // Create source
671        let metadata = Arc::new(sst_region_metadata());
672
673        // Creates a source that can return an error to abort the writer.
674        let source = Source::Iter(Box::new(
675            [
676                Ok(new_batch_by_range(&["a", "d"], 0, 60)),
677                InvalidBatchSnafu {
678                    reason: "Abort the writer",
679                }
680                .fail(),
681            ]
682            .into_iter(),
683        ));
684
685        // Write to local cache and upload sst to mock remote store
686        let write_request = SstWriteRequest {
687            op_type: OperationType::Flush,
688            metadata,
689            source: either::Left(source),
690            storage: None,
691            max_sequence: None,
692            cache_manager: cache_manager.clone(),
693            index_options: IndexOptions::default(),
694            index_config: Default::default(),
695            inverted_index_config: Default::default(),
696            fulltext_index_config: Default::default(),
697            bloom_filter_index_config: Default::default(),
698        };
699        let write_opts = WriteOptions {
700            row_group_size: 512,
701            ..Default::default()
702        };
703        let upload_request = SstUploadRequest {
704            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
705            remote_store: mock_store.clone(),
706        };
707
708        let mut metrics = Metrics::new(WriteType::Flush);
709        write_cache
710            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
711            .await
712            .unwrap_err();
713        let atomic_write_dir = write_cache_dir.path().join(ATOMIC_WRITE_DIR);
714        let mut entries = tokio::fs::read_dir(&atomic_write_dir).await.unwrap();
715        let mut has_files = false;
716        while let Some(entry) = entries.next_entry().await.unwrap() {
717            if entry.file_type().await.unwrap().is_dir() {
718                continue;
719            }
720            has_files = true;
721            common_telemetry::warn!(
722                "Found remaining temporary file in atomic dir: {}",
723                entry.path().display()
724            );
725        }
726
727        assert!(!has_files);
728    }
729}