mito2/cache/
write_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A write-through cache for remote object stores.
16
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::{debug, info};
22use futures::AsyncWriteExt;
23use object_store::ObjectStore;
24use snafu::ResultExt;
25use store_api::storage::RegionId;
26
27use crate::access_layer::{
28    new_fs_cache_store, FilePathProvider, Metrics, RegionFilePathFactory, SstInfoArray,
29    SstWriteRequest, TempFileCleaner, WriteCachePathProvider, WriteType,
30};
31use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
32use crate::error::{self, Result};
33use crate::metrics::{
34    UPLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_ELAPSED,
35};
36use crate::sst::file::RegionFileId;
37use crate::sst::index::intermediate::IntermediateManager;
38use crate::sst::index::puffin_manager::PuffinManagerFactory;
39use crate::sst::index::IndexerBuilderImpl;
40use crate::sst::parquet::writer::ParquetWriter;
41use crate::sst::parquet::WriteOptions;
42use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
43
/// A cache for uploading files to remote object stores.
///
/// It keeps files on the local disk (in a local file cache) and then sends them to object
/// stores. Files downloaded from remote object stores are also placed into the same local
/// file cache.
pub struct WriteCache {
    /// Local file cache that holds the cached SST (parquet) and index (puffin) files.
    file_cache: FileCacheRef,
    /// Puffin manager factory used to build index files for SSTs written through this cache.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager used by the indexer when building indexes.
    intermediate_manager: IntermediateManager,
}

/// Shared reference to a [`WriteCache`].
pub type WriteCacheRef = Arc<WriteCache>;
57
58impl WriteCache {
59    /// Create the cache with a `local_store` to cache files and a
60    /// `object_store_manager` for all object stores.
61    pub async fn new(
62        local_store: ObjectStore,
63        cache_capacity: ReadableSize,
64        ttl: Option<Duration>,
65        puffin_manager_factory: PuffinManagerFactory,
66        intermediate_manager: IntermediateManager,
67    ) -> Result<Self> {
68        let file_cache = Arc::new(FileCache::new(local_store, cache_capacity, ttl));
69        file_cache.recover(false).await;
70
71        Ok(Self {
72            file_cache,
73            puffin_manager_factory,
74            intermediate_manager,
75        })
76    }
77
78    /// Creates a write cache based on local fs.
79    pub async fn new_fs(
80        cache_dir: &str,
81        cache_capacity: ReadableSize,
82        ttl: Option<Duration>,
83        puffin_manager_factory: PuffinManagerFactory,
84        intermediate_manager: IntermediateManager,
85    ) -> Result<Self> {
86        info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");
87
88        let local_store = new_fs_cache_store(cache_dir).await?;
89        Self::new(
90            local_store,
91            cache_capacity,
92            ttl,
93            puffin_manager_factory,
94            intermediate_manager,
95        )
96        .await
97    }
98
99    /// Returns the file cache of the write cache.
100    pub(crate) fn file_cache(&self) -> FileCacheRef {
101        self.file_cache.clone()
102    }
103
104    /// Writes SST to the cache and then uploads it to the remote object store.
105    pub(crate) async fn write_and_upload_sst(
106        &self,
107        write_request: SstWriteRequest,
108        upload_request: SstUploadRequest,
109        write_opts: &WriteOptions,
110        write_type: WriteType,
111    ) -> Result<(SstInfoArray, Metrics)> {
112        let region_id = write_request.metadata.region_id;
113
114        let store = self.file_cache.local_store();
115        let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
116        let indexer = IndexerBuilderImpl {
117            op_type: write_request.op_type,
118            metadata: write_request.metadata.clone(),
119            row_group_size: write_opts.row_group_size,
120            puffin_manager: self
121                .puffin_manager_factory
122                .build(store.clone(), path_provider.clone()),
123            intermediate_manager: self.intermediate_manager.clone(),
124            index_options: write_request.index_options,
125            inverted_index_config: write_request.inverted_index_config,
126            fulltext_index_config: write_request.fulltext_index_config,
127            bloom_filter_index_config: write_request.bloom_filter_index_config,
128        };
129
130        let cleaner = TempFileCleaner::new(region_id, store.clone());
131        // Write to FileCache.
132        let mut writer = ParquetWriter::new_with_object_store(
133            store.clone(),
134            write_request.metadata,
135            indexer,
136            path_provider.clone(),
137            Metrics::new(write_type),
138        )
139        .await
140        .with_file_cleaner(cleaner);
141
142        let sst_info = writer
143            .write_all(write_request.source, write_request.max_sequence, write_opts)
144            .await?;
145        let mut metrics = writer.into_metrics();
146
147        // Upload sst file to remote object store.
148        if sst_info.is_empty() {
149            return Ok((sst_info, metrics));
150        }
151
152        let mut upload_tracker = UploadTracker::new(region_id);
153        let mut err = None;
154        let remote_store = &upload_request.remote_store;
155        for sst in &sst_info {
156            let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet);
157            let parquet_path = upload_request
158                .dest_path_provider
159                .build_sst_file_path(RegionFileId::new(region_id, sst.file_id));
160            let start = Instant::now();
161            if let Err(e) = self.upload(parquet_key, &parquet_path, remote_store).await {
162                err = Some(e);
163                break;
164            }
165            metrics.upload_parquet += start.elapsed();
166            upload_tracker.push_uploaded_file(parquet_path);
167
168            if sst.index_metadata.file_size > 0 {
169                let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin);
170                let puffin_path = upload_request
171                    .dest_path_provider
172                    .build_index_file_path(RegionFileId::new(region_id, sst.file_id));
173                let start = Instant::now();
174                if let Err(e) = self.upload(puffin_key, &puffin_path, remote_store).await {
175                    err = Some(e);
176                    break;
177                }
178                metrics.upload_puffin += start.elapsed();
179                upload_tracker.push_uploaded_file(puffin_path);
180            }
181        }
182
183        if let Some(err) = err {
184            // Cleans files on failure.
185            upload_tracker
186                .clean(&sst_info, &self.file_cache, remote_store)
187                .await;
188            return Err(err);
189        }
190
191        Ok((sst_info, metrics))
192    }
193
194    /// Removes a file from the cache by `index_key`.
195    pub(crate) async fn remove(&self, index_key: IndexKey) {
196        self.file_cache.remove(index_key).await
197    }
198
199    /// Downloads a file in `remote_path` from the remote object store to the local cache
200    /// (specified by `index_key`).
201    pub(crate) async fn download(
202        &self,
203        index_key: IndexKey,
204        remote_path: &str,
205        remote_store: &ObjectStore,
206        file_size: u64,
207    ) -> Result<()> {
208        if let Err(e) = self
209            .download_without_cleaning(index_key, remote_path, remote_store, file_size)
210            .await
211        {
212            let filename = index_key.to_string();
213            TempFileCleaner::clean_atomic_dir_files(&self.file_cache.local_store(), &[&filename])
214                .await;
215
216            return Err(e);
217        }
218        Ok(())
219    }
220
221    async fn download_without_cleaning(
222        &self,
223        index_key: IndexKey,
224        remote_path: &str,
225        remote_store: &ObjectStore,
226        file_size: u64,
227    ) -> Result<()> {
228        const DOWNLOAD_READER_CONCURRENCY: usize = 8;
229        const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);
230
231        let file_type = index_key.file_type;
232        let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
233            .with_label_values(&[match file_type {
234                FileType::Parquet => "download_parquet",
235                FileType::Puffin => "download_puffin",
236            }])
237            .start_timer();
238
239        let reader = remote_store
240            .reader_with(remote_path)
241            .concurrent(DOWNLOAD_READER_CONCURRENCY)
242            .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
243            .await
244            .context(error::OpenDalSnafu)?
245            .into_futures_async_read(0..file_size)
246            .await
247            .context(error::OpenDalSnafu)?;
248
249        let cache_path = self.file_cache.cache_file_path(index_key);
250        let mut writer = self
251            .file_cache
252            .local_store()
253            .writer(&cache_path)
254            .await
255            .context(error::OpenDalSnafu)?
256            .into_futures_async_write();
257
258        let region_id = index_key.region_id;
259        let file_id = index_key.file_id;
260        let bytes_written =
261            futures::io::copy(reader, &mut writer)
262                .await
263                .context(error::DownloadSnafu {
264                    region_id,
265                    file_id,
266                    file_type,
267                })?;
268        writer.close().await.context(error::DownloadSnafu {
269            region_id,
270            file_id,
271            file_type,
272        })?;
273
274        WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);
275
276        let elapsed = timer.stop_and_record();
277        debug!(
278            "Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
279            remote_path, cache_path, bytes_written, region_id, elapsed,
280        );
281
282        let index_value = IndexValue {
283            file_size: bytes_written as _,
284        };
285        self.file_cache.put(index_key, index_value).await;
286        Ok(())
287    }
288
289    /// Uploads a Parquet file or a Puffin file to the remote object store.
290    async fn upload(
291        &self,
292        index_key: IndexKey,
293        upload_path: &str,
294        remote_store: &ObjectStore,
295    ) -> Result<()> {
296        let region_id = index_key.region_id;
297        let file_id = index_key.file_id;
298        let file_type = index_key.file_type;
299        let cache_path = self.file_cache.cache_file_path(index_key);
300
301        let start = Instant::now();
302        let cached_value = self
303            .file_cache
304            .local_store()
305            .stat(&cache_path)
306            .await
307            .context(error::OpenDalSnafu)?;
308        let reader = self
309            .file_cache
310            .local_store()
311            .reader(&cache_path)
312            .await
313            .context(error::OpenDalSnafu)?
314            .into_futures_async_read(0..cached_value.content_length())
315            .await
316            .context(error::OpenDalSnafu)?;
317
318        let mut writer = remote_store
319            .writer_with(upload_path)
320            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
321            .concurrent(DEFAULT_WRITE_CONCURRENCY)
322            .await
323            .context(error::OpenDalSnafu)?
324            .into_futures_async_write();
325
326        let bytes_written =
327            futures::io::copy(reader, &mut writer)
328                .await
329                .context(error::UploadSnafu {
330                    region_id,
331                    file_id,
332                    file_type,
333                })?;
334
335        // Must close to upload all data.
336        writer.close().await.context(error::UploadSnafu {
337            region_id,
338            file_id,
339            file_type,
340        })?;
341
342        UPLOAD_BYTES_TOTAL.inc_by(bytes_written);
343
344        debug!(
345            "Successfully upload file to remote, region: {}, file: {}, upload_path: {}, cost: {:?}",
346            region_id,
347            file_id,
348            upload_path,
349            start.elapsed(),
350        );
351
352        let index_value = IndexValue {
353            file_size: bytes_written as _,
354        };
355        // Register to file cache
356        self.file_cache.put(index_key, index_value).await;
357
358        Ok(())
359    }
360}
361
/// Request to upload the SSTs written to the write cache (the upload half of
/// `WriteCache::write_and_upload_sst`).
pub struct SstUploadRequest {
    /// Destination path provider that builds the remote paths the SST and index files in the
    /// write cache are uploaded to.
    pub dest_path_provider: RegionFilePathFactory,
    /// Remote object store to upload the files to.
    pub remote_store: ObjectStore,
}
369
/// A struct to track files uploaded to the remote store, so they can be cleaned up if a
/// later upload fails.
struct UploadTracker {
    /// Id of the region to track.
    region_id: RegionId,
    /// Remote paths of files uploaded successfully.
    files_uploaded: Vec<String>,
}
377
378impl UploadTracker {
379    /// Creates a new instance of `UploadTracker` for a given region.
380    fn new(region_id: RegionId) -> Self {
381        Self {
382            region_id,
383            files_uploaded: Vec::new(),
384        }
385    }
386
387    /// Add a file path to the list of uploaded files.
388    fn push_uploaded_file(&mut self, path: String) {
389        self.files_uploaded.push(path);
390    }
391
392    /// Cleans uploaded files and files in the file cache at best effort.
393    async fn clean(
394        &self,
395        sst_info: &SstInfoArray,
396        file_cache: &FileCacheRef,
397        remote_store: &ObjectStore,
398    ) {
399        common_telemetry::info!(
400            "Start cleaning files on upload failure, region: {}, num_ssts: {}",
401            self.region_id,
402            sst_info.len()
403        );
404
405        // Cleans files in the file cache first.
406        for sst in sst_info {
407            let parquet_key = IndexKey::new(self.region_id, sst.file_id, FileType::Parquet);
408            file_cache.remove(parquet_key).await;
409
410            if sst.index_metadata.file_size > 0 {
411                let puffin_key = IndexKey::new(self.region_id, sst.file_id, FileType::Puffin);
412                file_cache.remove(puffin_key).await;
413            }
414        }
415
416        // Cleans uploaded files.
417        for file_path in &self.files_uploaded {
418            if let Err(e) = remote_store.delete(file_path).await {
419                common_telemetry::error!(e; "Failed to delete file {}", file_path);
420            }
421        }
422    }
423}
424
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ATOMIC_WRITE_DIR;
    use store_api::region_request::PathType;

    use super::*;
    use crate::access_layer::OperationType;
    use crate::cache::test_util::new_fs_store;
    use crate::cache::{CacheManager, CacheStrategy};
    use crate::error::InvalidBatchSnafu;
    use crate::read::Source;
    use crate::region::options::IndexOptions;
    use crate::sst::parquet::reader::ParquetReaderBuilder;
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle_with_file_id,
        sst_region_metadata,
    };
    use crate::test_util::TestEnv;

    /// Writes an SST through the write cache and uploads it to a mock remote store.
    ///
    /// Checks three things:
    /// - Both the parquet and puffin files are in the file cache.
    /// - Their cached contents equal the uploaded remote contents.
    /// - Removing the keys evicts them from the cache.
    #[tokio::test]
    async fn test_write_and_upload_sst() {
        // TODO(QuenKar): maybe find a way to create some object server for testing,
        // and now just use local file system to mock.
        let mut env = TestEnv::new().await;
        let mock_store = env.init_object_store_manager();
        let path_provider = RegionFilePathFactory::new("test".to_string(), PathType::Bare);

        let local_dir = create_temp_dir("");
        let local_store = new_fs_store(local_dir.path().to_str().unwrap());

        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;

        // Create Source
        let metadata = Arc::new(sst_region_metadata());
        let region_id = metadata.region_id;
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);

        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source,
            storage: None,
            max_sequence: None,
            cache_manager: Default::default(),
            index_options: IndexOptions::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let upload_request = SstUploadRequest {
            dest_path_provider: path_provider.clone(),
            remote_store: mock_store.clone(),
        };

        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };

        // Write to cache and upload sst to mock remote store
        let (mut sst_infos, _) = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush)
            .await
            .unwrap();
        let sst_info = sst_infos.remove(0);

        let file_id = sst_info.file_id;
        let sst_upload_path =
            path_provider.build_sst_file_path(RegionFileId::new(region_id, file_id));
        let index_upload_path =
            path_provider.build_index_file_path(RegionFileId::new(region_id, file_id));

        // Check write cache contains the key
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        assert!(write_cache.file_cache.contains_key(&key));

        // Check file data
        let remote_data = mock_store.read(&sst_upload_path).await.unwrap();
        let cache_data = local_store
            .read(&write_cache.file_cache.cache_file_path(key))
            .await
            .unwrap();
        assert_eq!(remote_data.to_vec(), cache_data.to_vec());

        // Check write cache contains the index key
        let index_key = IndexKey::new(region_id, file_id, FileType::Puffin);
        assert!(write_cache.file_cache.contains_key(&index_key));

        let remote_index_data = mock_store.read(&index_upload_path).await.unwrap();
        let cache_index_data = local_store
            .read(&write_cache.file_cache.cache_file_path(index_key))
            .await
            .unwrap();
        assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec());

        // Removes the file from the cache.
        // `sst_index_key` refers to the same parquet file as `key` above.
        let sst_index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
        write_cache.remove(sst_index_key).await;
        assert!(!write_cache.file_cache.contains_key(&sst_index_key));
        write_cache.remove(index_key).await;
        assert!(!write_cache.file_cache.contains_key(&index_key));
    }

    /// Checks that a reader whose cache manager has only the write cache returns parquet
    /// metadata equal to the metadata produced by the writer.
    #[tokio::test]
    async fn test_read_metadata_from_write_cache() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let local_dir = create_temp_dir("");
        let local_path = local_dir.path().to_str().unwrap();
        let local_store = new_fs_store(local_path);

        // Create a cache manager using only write cache
        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        // Create source
        let metadata = Arc::new(sst_region_metadata());

        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);

        // Write to local cache and upload sst to mock remote store
        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source,
            storage: None,
            max_sequence: None,
            cache_manager: cache_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };
        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
            remote_store: mock_store.clone(),
        };

        let (mut sst_infos, _) = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush)
            .await
            .unwrap();
        let sst_info = sst_infos.remove(0);
        let write_parquet_metadata = sst_info.file_metadata.unwrap();

        // Read metadata from write cache
        let handle = sst_file_handle_with_file_id(sst_info.file_id, 0, 1000);
        let builder = ParquetReaderBuilder::new(
            data_home,
            PathType::Bare,
            handle.clone(),
            mock_store.clone(),
        )
        .cache(CacheStrategy::EnableAll(cache_manager.clone()));
        let reader = builder.build().await.unwrap();

        // Check parquet metadata
        assert_parquet_metadata_eq(write_parquet_metadata, reader.parquet_metadata());
    }

    /// Checks that no temporary files remain in the write cache's atomic write directory
    /// after a write that the source aborts with an error.
    #[tokio::test]
    async fn test_write_cache_clean_tmp_files() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let write_cache_dir = create_temp_dir("");
        let write_cache_path = write_cache_dir.path().to_str().unwrap();
        let write_cache = env
            .create_write_cache_from_path(write_cache_path, ReadableSize::mb(10))
            .await;

        // Create a cache manager using only write cache
        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        // Create source
        let metadata = Arc::new(sst_region_metadata());

        // Creates a source that can return an error to abort the writer.
        let source = Source::Iter(Box::new(
            [
                Ok(new_batch_by_range(&["a", "d"], 0, 60)),
                InvalidBatchSnafu {
                    reason: "Abort the writer",
                }
                .fail(),
            ]
            .into_iter(),
        ));

        // Write to local cache and upload sst to mock remote store
        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source,
            storage: None,
            max_sequence: None,
            cache_manager: cache_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };
        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
            remote_store: mock_store.clone(),
        };

        write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush)
            .await
            .unwrap_err();
        // Scan the atomic write dir; sub-directories are ignored, only regular files count as
        // leftovers.
        let atomic_write_dir = write_cache_dir.path().join(ATOMIC_WRITE_DIR);
        let mut entries = tokio::fs::read_dir(&atomic_write_dir).await.unwrap();
        let mut has_files = false;
        while let Some(entry) = entries.next_entry().await.unwrap() {
            if entry.file_type().await.unwrap().is_dir() {
                continue;
            }
            has_files = true;
            common_telemetry::warn!(
                "Found remaining temporary file in atomic dir: {}",
                entry.path().display()
            );
        }

        assert!(!has_files);
    }
}