mito2/cache/
write_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A write-through cache for remote object stores.
16
17use std::sync::Arc;
18use std::time::Duration;
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::{debug, info};
22use futures::AsyncWriteExt;
23use object_store::ObjectStore;
24use snafu::ResultExt;
25use store_api::storage::RegionId;
26
27use crate::access_layer::{
28    new_fs_cache_store, FilePathProvider, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
29    TempFileCleaner, WriteCachePathProvider,
30};
31use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
32use crate::error::{self, Result};
33use crate::metrics::{
34    FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL,
35    WRITE_CACHE_DOWNLOAD_ELAPSED,
36};
37use crate::sst::index::intermediate::IntermediateManager;
38use crate::sst::index::puffin_manager::PuffinManagerFactory;
39use crate::sst::index::IndexerBuilderImpl;
40use crate::sst::parquet::writer::ParquetWriter;
41use crate::sst::parquet::WriteOptions;
42use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
43
44/// A cache for uploading files to remote object stores.
45///
46/// It keeps files in local disk and then sends files to object stores.
47pub struct WriteCache {
48    /// Local file cache.
49    file_cache: FileCacheRef,
50    /// Puffin manager factory for index.
51    puffin_manager_factory: PuffinManagerFactory,
52    /// Intermediate manager for index.
53    intermediate_manager: IntermediateManager,
54}
55
56pub type WriteCacheRef = Arc<WriteCache>;
57
58impl WriteCache {
59    /// Create the cache with a `local_store` to cache files and a
60    /// `object_store_manager` for all object stores.
61    pub async fn new(
62        local_store: ObjectStore,
63        cache_capacity: ReadableSize,
64        ttl: Option<Duration>,
65        puffin_manager_factory: PuffinManagerFactory,
66        intermediate_manager: IntermediateManager,
67    ) -> Result<Self> {
68        let file_cache = Arc::new(FileCache::new(local_store, cache_capacity, ttl));
69        file_cache.recover(false).await;
70
71        Ok(Self {
72            file_cache,
73            puffin_manager_factory,
74            intermediate_manager,
75        })
76    }
77
78    /// Creates a write cache based on local fs.
79    pub async fn new_fs(
80        cache_dir: &str,
81        cache_capacity: ReadableSize,
82        ttl: Option<Duration>,
83        puffin_manager_factory: PuffinManagerFactory,
84        intermediate_manager: IntermediateManager,
85    ) -> Result<Self> {
86        info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");
87
88        let local_store = new_fs_cache_store(cache_dir).await?;
89        Self::new(
90            local_store,
91            cache_capacity,
92            ttl,
93            puffin_manager_factory,
94            intermediate_manager,
95        )
96        .await
97    }
98
99    /// Returns the file cache of the write cache.
100    pub(crate) fn file_cache(&self) -> FileCacheRef {
101        self.file_cache.clone()
102    }
103
104    /// Writes SST to the cache and then uploads it to the remote object store.
105    pub(crate) async fn write_and_upload_sst(
106        &self,
107        write_request: SstWriteRequest,
108        upload_request: SstUploadRequest,
109        write_opts: &WriteOptions,
110    ) -> Result<SstInfoArray> {
111        let timer = FLUSH_ELAPSED
112            .with_label_values(&["write_sst"])
113            .start_timer();
114
115        let region_id = write_request.metadata.region_id;
116
117        let store = self.file_cache.local_store();
118        let path_provider = WriteCachePathProvider::new(region_id, self.file_cache.clone());
119        let indexer = IndexerBuilderImpl {
120            op_type: write_request.op_type,
121            metadata: write_request.metadata.clone(),
122            row_group_size: write_opts.row_group_size,
123            puffin_manager: self
124                .puffin_manager_factory
125                .build(store.clone(), path_provider.clone()),
126            intermediate_manager: self.intermediate_manager.clone(),
127            index_options: write_request.index_options,
128            inverted_index_config: write_request.inverted_index_config,
129            fulltext_index_config: write_request.fulltext_index_config,
130            bloom_filter_index_config: write_request.bloom_filter_index_config,
131        };
132
133        let cleaner = TempFileCleaner::new(region_id, store.clone());
134        // Write to FileCache.
135        let mut writer = ParquetWriter::new_with_object_store(
136            store.clone(),
137            write_request.metadata,
138            indexer,
139            path_provider.clone(),
140        )
141        .await
142        .with_file_cleaner(cleaner);
143
144        let sst_info = writer
145            .write_all(write_request.source, write_request.max_sequence, write_opts)
146            .await?;
147
148        timer.stop_and_record();
149
150        // Upload sst file to remote object store.
151        if sst_info.is_empty() {
152            return Ok(sst_info);
153        }
154
155        let mut upload_tracker = UploadTracker::new(region_id);
156        let mut err = None;
157        let remote_store = &upload_request.remote_store;
158        for sst in &sst_info {
159            let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet);
160            let parquet_path = upload_request
161                .dest_path_provider
162                .build_sst_file_path(sst.file_id);
163            if let Err(e) = self.upload(parquet_key, &parquet_path, remote_store).await {
164                err = Some(e);
165                break;
166            }
167            upload_tracker.push_uploaded_file(parquet_path);
168
169            if sst.index_metadata.file_size > 0 {
170                let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin);
171                let puffin_path = upload_request
172                    .dest_path_provider
173                    .build_index_file_path(sst.file_id);
174                if let Err(e) = self.upload(puffin_key, &puffin_path, remote_store).await {
175                    err = Some(e);
176                    break;
177                }
178                upload_tracker.push_uploaded_file(puffin_path);
179            }
180        }
181
182        if let Some(err) = err {
183            // Cleans files on failure.
184            upload_tracker
185                .clean(&sst_info, &self.file_cache, remote_store)
186                .await;
187            return Err(err);
188        }
189
190        Ok(sst_info)
191    }
192
193    /// Removes a file from the cache by `index_key`.
194    pub(crate) async fn remove(&self, index_key: IndexKey) {
195        self.file_cache.remove(index_key).await
196    }
197
198    /// Downloads a file in `remote_path` from the remote object store to the local cache
199    /// (specified by `index_key`).
200    pub(crate) async fn download(
201        &self,
202        index_key: IndexKey,
203        remote_path: &str,
204        remote_store: &ObjectStore,
205        file_size: u64,
206    ) -> Result<()> {
207        if let Err(e) = self
208            .download_without_cleaning(index_key, remote_path, remote_store, file_size)
209            .await
210        {
211            let filename = index_key.to_string();
212            TempFileCleaner::clean_atomic_dir_files(&self.file_cache.local_store(), &[&filename])
213                .await;
214
215            return Err(e);
216        }
217        Ok(())
218    }
219
220    async fn download_without_cleaning(
221        &self,
222        index_key: IndexKey,
223        remote_path: &str,
224        remote_store: &ObjectStore,
225        file_size: u64,
226    ) -> Result<()> {
227        const DOWNLOAD_READER_CONCURRENCY: usize = 8;
228        const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);
229
230        let file_type = index_key.file_type;
231        let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
232            .with_label_values(&[match file_type {
233                FileType::Parquet => "download_parquet",
234                FileType::Puffin => "download_puffin",
235            }])
236            .start_timer();
237
238        let reader = remote_store
239            .reader_with(remote_path)
240            .concurrent(DOWNLOAD_READER_CONCURRENCY)
241            .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
242            .await
243            .context(error::OpenDalSnafu)?
244            .into_futures_async_read(0..file_size)
245            .await
246            .context(error::OpenDalSnafu)?;
247
248        let cache_path = self.file_cache.cache_file_path(index_key);
249        let mut writer = self
250            .file_cache
251            .local_store()
252            .writer(&cache_path)
253            .await
254            .context(error::OpenDalSnafu)?
255            .into_futures_async_write();
256
257        let region_id = index_key.region_id;
258        let file_id = index_key.file_id;
259        let bytes_written =
260            futures::io::copy(reader, &mut writer)
261                .await
262                .context(error::DownloadSnafu {
263                    region_id,
264                    file_id,
265                    file_type,
266                })?;
267        writer.close().await.context(error::DownloadSnafu {
268            region_id,
269            file_id,
270            file_type,
271        })?;
272
273        WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);
274
275        let elapsed = timer.stop_and_record();
276        debug!(
277            "Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
278            remote_path, cache_path, bytes_written, region_id, elapsed,
279        );
280
281        let index_value = IndexValue {
282            file_size: bytes_written as _,
283        };
284        self.file_cache.put(index_key, index_value).await;
285        Ok(())
286    }
287
288    /// Uploads a Parquet file or a Puffin file to the remote object store.
289    async fn upload(
290        &self,
291        index_key: IndexKey,
292        upload_path: &str,
293        remote_store: &ObjectStore,
294    ) -> Result<()> {
295        let region_id = index_key.region_id;
296        let file_id = index_key.file_id;
297        let file_type = index_key.file_type;
298        let cache_path = self.file_cache.cache_file_path(index_key);
299
300        let timer = FLUSH_ELAPSED
301            .with_label_values(&[match file_type {
302                FileType::Parquet => "upload_parquet",
303                FileType::Puffin => "upload_puffin",
304            }])
305            .start_timer();
306
307        let cached_value = self
308            .file_cache
309            .local_store()
310            .stat(&cache_path)
311            .await
312            .context(error::OpenDalSnafu)?;
313        let reader = self
314            .file_cache
315            .local_store()
316            .reader(&cache_path)
317            .await
318            .context(error::OpenDalSnafu)?
319            .into_futures_async_read(0..cached_value.content_length())
320            .await
321            .context(error::OpenDalSnafu)?;
322
323        let mut writer = remote_store
324            .writer_with(upload_path)
325            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
326            .concurrent(DEFAULT_WRITE_CONCURRENCY)
327            .await
328            .context(error::OpenDalSnafu)?
329            .into_futures_async_write();
330
331        let bytes_written =
332            futures::io::copy(reader, &mut writer)
333                .await
334                .context(error::UploadSnafu {
335                    region_id,
336                    file_id,
337                    file_type,
338                })?;
339
340        // Must close to upload all data.
341        writer.close().await.context(error::UploadSnafu {
342            region_id,
343            file_id,
344            file_type,
345        })?;
346
347        UPLOAD_BYTES_TOTAL.inc_by(bytes_written);
348
349        debug!(
350            "Successfully upload file to remote, region: {}, file: {}, upload_path: {}, cost: {:?}s",
351            region_id,
352            file_id,
353            upload_path,
354            timer.stop_and_record()
355        );
356
357        let index_value = IndexValue {
358            file_size: bytes_written as _,
359        };
360        // Register to file cache
361        self.file_cache.put(index_key, index_value).await;
362
363        Ok(())
364    }
365}
366
367/// Request to write and upload a SST.
368pub struct SstUploadRequest {
369    /// Destination path provider of which SST files in write cache should be uploaded to.
370    pub dest_path_provider: RegionFilePathFactory,
371    /// Remote object store to upload.
372    pub remote_store: ObjectStore,
373}
374
375/// A structs to track files to upload and clean them if upload failed.
376struct UploadTracker {
377    /// Id of the region to track.
378    region_id: RegionId,
379    /// Paths of files uploaded successfully.
380    files_uploaded: Vec<String>,
381}
382
383impl UploadTracker {
384    /// Creates a new instance of `UploadTracker` for a given region.
385    fn new(region_id: RegionId) -> Self {
386        Self {
387            region_id,
388            files_uploaded: Vec::new(),
389        }
390    }
391
392    /// Add a file path to the list of uploaded files.
393    fn push_uploaded_file(&mut self, path: String) {
394        self.files_uploaded.push(path);
395    }
396
397    /// Cleans uploaded files and files in the file cache at best effort.
398    async fn clean(
399        &self,
400        sst_info: &SstInfoArray,
401        file_cache: &FileCacheRef,
402        remote_store: &ObjectStore,
403    ) {
404        common_telemetry::info!(
405            "Start cleaning files on upload failure, region: {}, num_ssts: {}",
406            self.region_id,
407            sst_info.len()
408        );
409
410        // Cleans files in the file cache first.
411        for sst in sst_info {
412            let parquet_key = IndexKey::new(self.region_id, sst.file_id, FileType::Parquet);
413            file_cache.remove(parquet_key).await;
414
415            if sst.index_metadata.file_size > 0 {
416                let puffin_key = IndexKey::new(self.region_id, sst.file_id, FileType::Puffin);
417                file_cache.remove(puffin_key).await;
418            }
419        }
420
421        // Cleans uploaded files.
422        for file_path in &self.files_uploaded {
423            if let Err(e) = remote_store.delete(file_path).await {
424                common_telemetry::error!(e; "Failed to delete file {}", file_path);
425            }
426        }
427    }
428}
429
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ATOMIC_WRITE_DIR;

    use super::*;
    use crate::access_layer::OperationType;
    use crate::cache::test_util::new_fs_store;
    use crate::cache::{CacheManager, CacheStrategy};
    use crate::error::InvalidBatchSnafu;
    use crate::read::Source;
    use crate::region::options::IndexOptions;
    use crate::sst::parquet::reader::ParquetReaderBuilder;
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle_with_file_id,
        sst_region_metadata,
    };
    use crate::test_util::TestEnv;

    /// Source made of the three batches used by the successful write tests.
    fn sample_source() -> Source {
        new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ])
    }

    /// Flush request over the test region metadata that reads from `source`.
    fn flush_request(source: Source, cache_manager: Arc<CacheManager>) -> SstWriteRequest {
        SstWriteRequest {
            op_type: OperationType::Flush,
            metadata: Arc::new(sst_region_metadata()),
            source,
            storage: None,
            max_sequence: None,
            cache_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        }
    }

    /// Default write options with a row group size of 512.
    fn small_row_group_opts() -> WriteOptions {
        WriteOptions {
            row_group_size: 512,
            ..Default::default()
        }
    }

    /// Cache manager that only has `write_cache` configured.
    fn write_cache_only_manager(write_cache: &WriteCacheRef) -> Arc<CacheManager> {
        Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        )
    }

    #[tokio::test]
    async fn test_write_and_upload_sst() {
        // TODO(QuenKar): maybe find a way to create some object server for testing,
        // and now just use local file system to mock.
        let mut env = TestEnv::new().await;
        let mock_store = env.init_object_store_manager();
        let path_provider = RegionFilePathFactory::new("test".to_string());

        let local_dir = create_temp_dir("");
        let local_store = new_fs_store(local_dir.path().to_str().unwrap());

        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;

        // Build the write and upload requests.
        let write_request = flush_request(sample_source(), Default::default());
        let region_id = write_request.metadata.region_id;
        let upload_request = SstUploadRequest {
            dest_path_provider: path_provider.clone(),
            remote_store: mock_store.clone(),
        };
        let write_opts = small_row_group_opts();

        // Write to cache and upload sst to mock remote store
        let sst_info = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts)
            .await
            .unwrap()
            .remove(0); //todo(hl): we assume it only creates one file.

        let file_id = sst_info.file_id;
        let sst_upload_path = path_provider.build_sst_file_path(file_id);
        let index_upload_path = path_provider.build_index_file_path(file_id);

        let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin);

        // For the SST and then the index file: the write cache must contain the
        // key, and the cached bytes must equal the uploaded bytes.
        for (key, remote_path) in [
            (parquet_key, &sst_upload_path),
            (puffin_key, &index_upload_path),
        ] {
            assert!(write_cache.file_cache.contains_key(&key));

            let remote_data = mock_store.read(remote_path).await.unwrap();
            let cache_data = local_store
                .read(&write_cache.file_cache.cache_file_path(key))
                .await
                .unwrap();
            assert_eq!(remote_data.to_vec(), cache_data.to_vec());
        }

        // Removes the files from the cache.
        for key in [parquet_key, puffin_key] {
            write_cache.remove(key).await;
            assert!(!write_cache.file_cache.contains_key(&key));
        }
    }

    #[tokio::test]
    async fn test_read_metadata_from_write_cache() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let local_dir = create_temp_dir("");
        let local_path = local_dir.path().to_str().unwrap();
        let local_store = new_fs_store(local_path);

        // Create a cache manager using only write cache
        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;
        let cache_manager = write_cache_only_manager(&write_cache);

        // Write to local cache and upload sst to mock remote store
        let write_request = flush_request(sample_source(), cache_manager.clone());
        let write_opts = small_row_group_opts();
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone()),
            remote_store: mock_store.clone(),
        };

        let sst_info = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let write_parquet_metadata = sst_info.file_metadata.unwrap();

        // Read metadata from write cache
        let handle = sst_file_handle_with_file_id(sst_info.file_id, 0, 1000);
        let reader = ParquetReaderBuilder::new(data_home, handle.clone(), mock_store.clone())
            .cache(CacheStrategy::EnableAll(cache_manager.clone()))
            .build()
            .await
            .unwrap();

        // Check parquet metadata
        assert_parquet_metadata_eq(write_parquet_metadata, reader.parquet_metadata());
    }

    #[tokio::test]
    async fn test_write_cache_clean_tmp_files() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let write_cache_dir = create_temp_dir("");
        let write_cache_path = write_cache_dir.path().to_str().unwrap();
        let write_cache = env
            .create_write_cache_from_path(write_cache_path, ReadableSize::mb(10))
            .await;

        // Create a cache manager using only write cache
        let cache_manager = write_cache_only_manager(&write_cache);

        // Creates a source that can return an error to abort the writer.
        let failing_batches = [
            Ok(new_batch_by_range(&["a", "d"], 0, 60)),
            InvalidBatchSnafu {
                reason: "Abort the writer",
            }
            .fail(),
        ];
        let source = Source::Iter(Box::new(failing_batches.into_iter()));

        // Write to local cache and upload sst to mock remote store
        let write_request = flush_request(source, cache_manager.clone());
        let write_opts = small_row_group_opts();
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone()),
            remote_store: mock_store.clone(),
        };

        // The write is expected to fail because of the failing source.
        write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts)
            .await
            .unwrap_err();

        // No regular file may be left in the atomic write directory afterwards.
        let atomic_write_dir = write_cache_dir.path().join(ATOMIC_WRITE_DIR);
        let mut entries = tokio::fs::read_dir(&atomic_write_dir).await.unwrap();
        let mut has_files = false;
        while let Some(entry) = entries.next_entry().await.unwrap() {
            let is_dir = entry.file_type().await.unwrap().is_dir();
            if !is_dir {
                has_files = true;
                common_telemetry::warn!(
                    "Found remaining temporary file in atomic dir: {}",
                    entry.path().display()
                );
            }
        }

        assert!(!has_files);
    }
}