mito2/cache/
write_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A write-through cache for remote object stores.
16
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::{debug, info};
22use futures::AsyncWriteExt;
23use object_store::ObjectStore;
24use snafu::ResultExt;
25use store_api::storage::RegionId;
26use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
27
28use crate::access_layer::{
29    FilePathProvider, Metrics, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
30    TempFileCleaner, WriteCachePathProvider, WriteType, new_fs_cache_store,
31};
32use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
33use crate::error::{self, Result};
34use crate::metrics::UPLOAD_BYTES_TOTAL;
35use crate::region::opener::RegionLoadCacheTask;
36use crate::sst::file::RegionFileId;
37use crate::sst::index::IndexerBuilderImpl;
38use crate::sst::index::intermediate::IntermediateManager;
39use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};
40use crate::sst::parquet::writer::ParquetWriter;
41use crate::sst::parquet::{SstInfo, WriteOptions};
42use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
43
44/// A cache for uploading files to remote object stores.
45///
46/// It keeps files in local disk and then sends files to object stores.
47pub struct WriteCache {
48    /// Local file cache.
49    file_cache: FileCacheRef,
50    /// Puffin manager factory for index.
51    puffin_manager_factory: PuffinManagerFactory,
52    /// Intermediate manager for index.
53    intermediate_manager: IntermediateManager,
54    /// Sender for region load cache tasks.
55    task_sender: UnboundedSender<RegionLoadCacheTask>,
56}
57
58pub type WriteCacheRef = Arc<WriteCache>;
59
60impl WriteCache {
61    /// Create the cache with a `local_store` to cache files and a
62    /// `object_store_manager` for all object stores.
63    pub async fn new(
64        local_store: ObjectStore,
65        cache_capacity: ReadableSize,
66        ttl: Option<Duration>,
67        index_cache_percent: Option<u8>,
68        puffin_manager_factory: PuffinManagerFactory,
69        intermediate_manager: IntermediateManager,
70    ) -> Result<Self> {
71        let (task_sender, task_receiver) = unbounded_channel();
72
73        let file_cache = Arc::new(FileCache::new(
74            local_store,
75            cache_capacity,
76            ttl,
77            index_cache_percent,
78        ));
79        file_cache.recover(false, Some(task_receiver)).await;
80
81        Ok(Self {
82            file_cache,
83            puffin_manager_factory,
84            intermediate_manager,
85            task_sender,
86        })
87    }
88
89    /// Creates a write cache based on local fs.
90    pub async fn new_fs(
91        cache_dir: &str,
92        cache_capacity: ReadableSize,
93        ttl: Option<Duration>,
94        index_cache_percent: Option<u8>,
95        puffin_manager_factory: PuffinManagerFactory,
96        intermediate_manager: IntermediateManager,
97    ) -> Result<Self> {
98        info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");
99
100        let local_store = new_fs_cache_store(cache_dir).await?;
101        Self::new(
102            local_store,
103            cache_capacity,
104            ttl,
105            index_cache_percent,
106            puffin_manager_factory,
107            intermediate_manager,
108        )
109        .await
110    }
111
112    /// Returns the file cache of the write cache.
113    pub(crate) fn file_cache(&self) -> FileCacheRef {
114        self.file_cache.clone()
115    }
116
117    /// Build the puffin manager
118    pub(crate) fn build_puffin_manager(&self) -> SstPuffinManager {
119        let store = self.file_cache.local_store();
120        let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
121        self.puffin_manager_factory.build(store, path_provider)
122    }
123
124    /// Put encoded SST data to the cache and upload to the remote object store.
125    pub(crate) async fn put_and_upload_sst(
126        &self,
127        data: &bytes::Bytes,
128        region_id: RegionId,
129        sst_info: &SstInfo,
130        upload_request: SstUploadRequest,
131    ) -> Result<Metrics> {
132        let file_id = sst_info.file_id;
133        let mut metrics = Metrics::new(WriteType::Flush);
134
135        // Create index key for the SST file
136        let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet);
137
138        // Write to cache first
139        let cache_start = Instant::now();
140        let cache_path = self.file_cache.cache_file_path(parquet_key);
141        let store = self.file_cache.local_store();
142        let cleaner = TempFileCleaner::new(region_id, store.clone());
143        let write_res = store
144            .write(&cache_path, data.clone())
145            .await
146            .context(crate::error::OpenDalSnafu);
147        if let Err(e) = write_res {
148            cleaner.clean_by_file_id(file_id).await;
149            return Err(e);
150        }
151
152        metrics.write_batch = cache_start.elapsed();
153
154        // Upload to remote store
155        let upload_start = Instant::now();
156        let region_file_id = RegionFileId::new(region_id, file_id);
157        let remote_path = upload_request
158            .dest_path_provider
159            .build_sst_file_path(region_file_id);
160
161        if let Err(e) = self
162            .upload(parquet_key, &remote_path, &upload_request.remote_store)
163            .await
164        {
165            // Clean up cache on failure
166            self.remove(parquet_key).await;
167            return Err(e);
168        }
169
170        metrics.upload_parquet = upload_start.elapsed();
171        Ok(metrics)
172    }
173
174    /// Returns the intermediate manager of the write cache.
175    pub(crate) fn intermediate_manager(&self) -> &IntermediateManager {
176        &self.intermediate_manager
177    }
178
179    /// Writes SST to the cache and then uploads it to the remote object store.
180    pub(crate) async fn write_and_upload_sst(
181        &self,
182        write_request: SstWriteRequest,
183        upload_request: SstUploadRequest,
184        write_opts: &WriteOptions,
185        metrics: &mut Metrics,
186    ) -> Result<SstInfoArray> {
187        let region_id = write_request.metadata.region_id;
188
189        let store = self.file_cache.local_store();
190        let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
191        let indexer = IndexerBuilderImpl {
192            build_type: write_request.op_type.into(),
193            metadata: write_request.metadata.clone(),
194            row_group_size: write_opts.row_group_size,
195            puffin_manager: self
196                .puffin_manager_factory
197                .build(store.clone(), path_provider.clone()),
198            intermediate_manager: self.intermediate_manager.clone(),
199            index_options: write_request.index_options,
200            inverted_index_config: write_request.inverted_index_config,
201            fulltext_index_config: write_request.fulltext_index_config,
202            bloom_filter_index_config: write_request.bloom_filter_index_config,
203        };
204
205        let cleaner = TempFileCleaner::new(region_id, store.clone());
206        // Write to FileCache.
207        let mut writer = ParquetWriter::new_with_object_store(
208            store.clone(),
209            write_request.metadata,
210            write_request.index_config,
211            indexer,
212            path_provider.clone(),
213            metrics,
214        )
215        .await
216        .with_file_cleaner(cleaner);
217
218        let sst_info = match write_request.source {
219            either::Left(source) => {
220                writer
221                    .write_all(source, write_request.max_sequence, write_opts)
222                    .await?
223            }
224            either::Right(flat_source) => writer.write_all_flat(flat_source, write_opts).await?,
225        };
226
227        // Upload sst file to remote object store.
228        if sst_info.is_empty() {
229            return Ok(sst_info);
230        }
231
232        let mut upload_tracker = UploadTracker::new(region_id);
233        let mut err = None;
234        let remote_store = &upload_request.remote_store;
235        for sst in &sst_info {
236            let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet);
237            let parquet_path = upload_request
238                .dest_path_provider
239                .build_sst_file_path(RegionFileId::new(region_id, sst.file_id));
240            let start = Instant::now();
241            if let Err(e) = self.upload(parquet_key, &parquet_path, remote_store).await {
242                err = Some(e);
243                break;
244            }
245            metrics.upload_parquet += start.elapsed();
246            upload_tracker.push_uploaded_file(parquet_path);
247
248            if sst.index_metadata.file_size > 0 {
249                let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin);
250                let puffin_path = upload_request
251                    .dest_path_provider
252                    .build_index_file_path(RegionFileId::new(region_id, sst.file_id));
253                let start = Instant::now();
254                if let Err(e) = self.upload(puffin_key, &puffin_path, remote_store).await {
255                    err = Some(e);
256                    break;
257                }
258                metrics.upload_puffin += start.elapsed();
259                upload_tracker.push_uploaded_file(puffin_path);
260            }
261        }
262
263        if let Some(err) = err {
264            // Cleans files on failure.
265            upload_tracker
266                .clean(&sst_info, &self.file_cache, remote_store)
267                .await;
268            return Err(err);
269        }
270
271        Ok(sst_info)
272    }
273
274    /// Removes a file from the cache by `index_key`.
275    pub(crate) async fn remove(&self, index_key: IndexKey) {
276        self.file_cache.remove(index_key).await
277    }
278
279    /// Downloads a file in `remote_path` from the remote object store to the local cache
280    /// (specified by `index_key`).
281    pub(crate) async fn download(
282        &self,
283        index_key: IndexKey,
284        remote_path: &str,
285        remote_store: &ObjectStore,
286        file_size: u64,
287    ) -> Result<()> {
288        self.file_cache
289            .download(index_key, remote_path, remote_store, file_size)
290            .await
291    }
292
293    /// Uploads a Parquet file or a Puffin file to the remote object store.
294    pub(crate) async fn upload(
295        &self,
296        index_key: IndexKey,
297        upload_path: &str,
298        remote_store: &ObjectStore,
299    ) -> Result<()> {
300        let region_id = index_key.region_id;
301        let file_id = index_key.file_id;
302        let file_type = index_key.file_type;
303        let cache_path = self.file_cache.cache_file_path(index_key);
304
305        let start = Instant::now();
306        let cached_value = self
307            .file_cache
308            .local_store()
309            .stat(&cache_path)
310            .await
311            .context(error::OpenDalSnafu)?;
312        let reader = self
313            .file_cache
314            .local_store()
315            .reader(&cache_path)
316            .await
317            .context(error::OpenDalSnafu)?
318            .into_futures_async_read(0..cached_value.content_length())
319            .await
320            .context(error::OpenDalSnafu)?;
321
322        let mut writer = remote_store
323            .writer_with(upload_path)
324            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
325            .concurrent(DEFAULT_WRITE_CONCURRENCY)
326            .await
327            .context(error::OpenDalSnafu)?
328            .into_futures_async_write();
329
330        let bytes_written =
331            futures::io::copy(reader, &mut writer)
332                .await
333                .context(error::UploadSnafu {
334                    region_id,
335                    file_id,
336                    file_type,
337                })?;
338
339        // Must close to upload all data.
340        writer.close().await.context(error::UploadSnafu {
341            region_id,
342            file_id,
343            file_type,
344        })?;
345
346        UPLOAD_BYTES_TOTAL.inc_by(bytes_written);
347
348        debug!(
349            "Successfully upload file to remote, region: {}, file: {}, upload_path: {}, cost: {:?}",
350            region_id,
351            file_id,
352            upload_path,
353            start.elapsed(),
354        );
355
356        let index_value = IndexValue {
357            file_size: bytes_written as _,
358        };
359        // Register to file cache
360        self.file_cache.put(index_key, index_value).await;
361
362        Ok(())
363    }
364
365    /// Sends a region load cache task to the background processing queue.
366    ///
367    /// If the receiver has been dropped, the error is ignored.
368    pub(crate) fn load_region_cache(&self, task: RegionLoadCacheTask) {
369        let _ = self.task_sender.send(task);
370    }
371}
372
373/// Request to write and upload a SST.
374pub struct SstUploadRequest {
375    /// Destination path provider of which SST files in write cache should be uploaded to.
376    pub dest_path_provider: RegionFilePathFactory,
377    /// Remote object store to upload.
378    pub remote_store: ObjectStore,
379}
380
381/// A structs to track files to upload and clean them if upload failed.
382pub(crate) struct UploadTracker {
383    /// Id of the region to track.
384    region_id: RegionId,
385    /// Paths of files uploaded successfully.
386    files_uploaded: Vec<String>,
387}
388
389impl UploadTracker {
390    /// Creates a new instance of `UploadTracker` for a given region.
391    pub(crate) fn new(region_id: RegionId) -> Self {
392        Self {
393            region_id,
394            files_uploaded: Vec::new(),
395        }
396    }
397
398    /// Add a file path to the list of uploaded files.
399    pub(crate) fn push_uploaded_file(&mut self, path: String) {
400        self.files_uploaded.push(path);
401    }
402
403    /// Cleans uploaded files and files in the file cache at best effort.
404    pub(crate) async fn clean(
405        &self,
406        sst_info: &SstInfoArray,
407        file_cache: &FileCacheRef,
408        remote_store: &ObjectStore,
409    ) {
410        common_telemetry::info!(
411            "Start cleaning files on upload failure, region: {}, num_ssts: {}",
412            self.region_id,
413            sst_info.len()
414        );
415
416        // Cleans files in the file cache first.
417        for sst in sst_info {
418            let parquet_key = IndexKey::new(self.region_id, sst.file_id, FileType::Parquet);
419            file_cache.remove(parquet_key).await;
420
421            if sst.index_metadata.file_size > 0 {
422                let puffin_key = IndexKey::new(self.region_id, sst.file_id, FileType::Puffin);
423                file_cache.remove(puffin_key).await;
424            }
425        }
426
427        // Cleans uploaded files.
428        for file_path in &self.files_uploaded {
429            if let Err(e) = remote_store.delete(file_path).await {
430                common_telemetry::error!(e; "Failed to delete file {}", file_path);
431            }
432        }
433    }
434}
435
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ATOMIC_WRITE_DIR;
    use store_api::region_request::PathType;

    use super::*;
    use crate::access_layer::OperationType;
    use crate::cache::test_util::new_fs_store;
    use crate::cache::{CacheManager, CacheStrategy};
    use crate::error::InvalidBatchSnafu;
    use crate::read::Source;
    use crate::region::options::IndexOptions;
    use crate::sst::parquet::reader::ParquetReaderBuilder;
    use crate::test_util::TestEnv;
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle_with_file_id,
        sst_region_metadata,
    };

    /// Builds the three-batch source shared by the happy-path tests.
    fn new_test_source() -> Source {
        new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ])
    }

    /// Builds a flush request over `sst_region_metadata()` that reads from `source`.
    ///
    /// All index settings use their defaults.
    fn new_flush_request(source: Source, cache_manager: Arc<CacheManager>) -> SstWriteRequest {
        SstWriteRequest {
            op_type: OperationType::Flush,
            metadata: Arc::new(sst_region_metadata()),
            source: either::Left(source),
            storage: None,
            max_sequence: None,
            cache_manager,
            index_options: IndexOptions::default(),
            index_config: Default::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        }
    }

    /// Write options used by every test: 512 rows per row group, defaults otherwise.
    fn test_write_opts() -> WriteOptions {
        WriteOptions {
            row_group_size: 512,
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn test_write_and_upload_sst() {
        // TODO(QuenKar): maybe find a way to create some object server for testing,
        // and now just use local file system to mock.
        let mut env = TestEnv::new().await;
        let mock_store = env.init_object_store_manager();
        let path_provider = RegionFilePathFactory::new("test".to_string(), PathType::Bare);

        let local_dir = create_temp_dir("");
        let local_store = new_fs_store(local_dir.path().to_str().unwrap());

        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;

        let write_request = new_flush_request(new_test_source(), Default::default());
        let region_id = write_request.metadata.region_id;

        let upload_request = SstUploadRequest {
            dest_path_provider: path_provider.clone(),
            remote_store: mock_store.clone(),
        };

        // Write to cache and upload sst to mock remote store
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut sst_infos = write_cache
            .write_and_upload_sst(
                write_request,
                upload_request,
                &test_write_opts(),
                &mut metrics,
            )
            .await
            .unwrap();
        let sst_info = sst_infos.remove(0);

        let file_id = sst_info.file_id;
        let sst_upload_path =
            path_provider.build_sst_file_path(RegionFileId::new(region_id, file_id));
        let index_upload_path =
            path_provider.build_index_file_path(RegionFileId::new(region_id, file_id));

        // Check write cache contains the key
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        assert!(write_cache.file_cache.contains_key(&key));

        // Check file data
        let remote_data = mock_store.read(&sst_upload_path).await.unwrap();
        let cache_data = local_store
            .read(&write_cache.file_cache.cache_file_path(key))
            .await
            .unwrap();
        assert_eq!(remote_data.to_vec(), cache_data.to_vec());

        // Check write cache contains the index key
        let index_key = IndexKey::new(region_id, file_id, FileType::Puffin);
        assert!(write_cache.file_cache.contains_key(&index_key));

        let remote_index_data = mock_store.read(&index_upload_path).await.unwrap();
        let cache_index_data = local_store
            .read(&write_cache.file_cache.cache_file_path(index_key))
            .await
            .unwrap();
        assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec());

        // Removes both files from the cache.
        write_cache.remove(key).await;
        assert!(!write_cache.file_cache.contains_key(&key));
        write_cache.remove(index_key).await;
        assert!(!write_cache.file_cache.contains_key(&index_key));
    }

    #[tokio::test]
    async fn test_read_metadata_from_write_cache() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let local_dir = create_temp_dir("");
        let local_path = local_dir.path().to_str().unwrap();
        let local_store = new_fs_store(local_path);

        // Create a cache manager using only write cache
        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        // Write to local cache and upload sst to mock remote store
        let write_request = new_flush_request(new_test_source(), cache_manager.clone());
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
            remote_store: mock_store.clone(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut sst_infos = write_cache
            .write_and_upload_sst(
                write_request,
                upload_request,
                &test_write_opts(),
                &mut metrics,
            )
            .await
            .unwrap();
        let sst_info = sst_infos.remove(0);
        let write_parquet_metadata = sst_info.file_metadata.unwrap();

        // Read metadata from write cache
        let handle = sst_file_handle_with_file_id(sst_info.file_id, 0, 1000);
        let builder = ParquetReaderBuilder::new(
            data_home,
            PathType::Bare,
            handle.clone(),
            mock_store.clone(),
        )
        .cache(CacheStrategy::EnableAll(cache_manager.clone()));
        let reader = builder.build().await.unwrap();

        // Check parquet metadata
        assert_parquet_metadata_eq(write_parquet_metadata, reader.parquet_metadata());
    }

    #[tokio::test]
    async fn test_write_cache_clean_tmp_files() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let write_cache_dir = create_temp_dir("");
        let write_cache_path = write_cache_dir.path().to_str().unwrap();
        let write_cache = env
            .create_write_cache_from_path(write_cache_path, ReadableSize::mb(10))
            .await;

        // Create a cache manager using only write cache
        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        // Creates a source that can return an error to abort the writer.
        let source = Source::Iter(Box::new(
            [
                Ok(new_batch_by_range(&["a", "d"], 0, 60)),
                InvalidBatchSnafu {
                    reason: "Abort the writer",
                }
                .fail(),
            ]
            .into_iter(),
        ));

        // Write to local cache and upload sst to mock remote store
        let write_request = new_flush_request(source, cache_manager.clone());
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
            remote_store: mock_store.clone(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        write_cache
            .write_and_upload_sst(
                write_request,
                upload_request,
                &test_write_opts(),
                &mut metrics,
            )
            .await
            .unwrap_err();

        // The aborted write must not leave temporary files behind in the atomic write dir.
        let atomic_write_dir = write_cache_dir.path().join(ATOMIC_WRITE_DIR);
        let mut entries = tokio::fs::read_dir(&atomic_write_dir).await.unwrap();
        let mut has_files = false;
        while let Some(entry) = entries.next_entry().await.unwrap() {
            if entry.file_type().await.unwrap().is_dir() {
                continue;
            }
            has_files = true;
            common_telemetry::warn!(
                "Found remaining temporary file in atomic dir: {}",
                entry.path().display()
            );
        }

        assert!(!has_files);
    }
}