mito2/cache/write_cache.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A write-through cache for remote object stores.

use std::sync::Arc;
use std::time::Duration;

use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, info};
use futures::AsyncWriteExt;
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::storage::RegionId;

use crate::access_layer::{
    new_fs_cache_store, FilePathProvider, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
    WriteCachePathProvider,
};
use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
use crate::error::{self, Result};
use crate::metrics::{
    FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL,
    WRITE_CACHE_DOWNLOAD_ELAPSED,
};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::index::IndexerBuilderImpl;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::WriteOptions;
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};

/// A cache for uploading files to remote object stores.
///
/// It keeps files on local disk and then uploads them to remote object stores.
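///
/// # Example
///
/// A minimal sketch of the intended call sequence (not a runnable doctest;
/// construction of the factories, requests, and options is elided):
///
/// ```ignore
/// let cache = WriteCache::new_fs(
///     "/path/to/write_cache",
///     ReadableSize::gb(1),
///     None, // no TTL
///     puffin_manager_factory,
///     intermediate_manager,
/// )
/// .await?;
/// // Writes the SST to the local cache first, then uploads it to the remote store.
/// let sst_info = cache
///     .write_and_upload_sst(write_request, upload_request, &write_opts)
///     .await?;
/// ```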
pub struct WriteCache {
    /// Local file cache.
    file_cache: FileCacheRef,
    /// Puffin manager factory for index.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager for index.
    intermediate_manager: IntermediateManager,
}

pub type WriteCacheRef = Arc<WriteCache>;

impl WriteCache {
    /// Creates the cache with a `local_store` to cache files, a capacity limit,
    /// an optional TTL, and the index managers.
    pub async fn new(
        local_store: ObjectStore,
        cache_capacity: ReadableSize,
        ttl: Option<Duration>,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
    ) -> Result<Self> {
        let file_cache = Arc::new(FileCache::new(local_store, cache_capacity, ttl));
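        // Rebuild the cache index from files already present on local disk so
        // cached entries survive a process restart.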
        file_cache.recover(false).await;

        Ok(Self {
            file_cache,
            puffin_manager_factory,
            intermediate_manager,
        })
    }

    /// Creates a write cache backed by the local file system.
    pub async fn new_fs(
        cache_dir: &str,
        cache_capacity: ReadableSize,
        ttl: Option<Duration>,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
    ) -> Result<Self> {
        info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");

        let local_store = new_fs_cache_store(cache_dir).await?;
        Self::new(
            local_store,
            cache_capacity,
            ttl,
            puffin_manager_factory,
            intermediate_manager,
        )
        .await
    }

    /// Returns the file cache of the write cache.
    pub(crate) fn file_cache(&self) -> FileCacheRef {
        self.file_cache.clone()
    }

    /// Writes an SST to the cache and then uploads it to the remote object store.
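    ///
    /// On upload failure, files already uploaded for this request are deleted
    /// from the remote store and the cached copies are dropped on a best-effort
    /// basis.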
    pub(crate) async fn write_and_upload_sst(
        &self,
        write_request: SstWriteRequest,
        upload_request: SstUploadRequest,
        write_opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let timer = FLUSH_ELAPSED
            .with_label_values(&["write_sst"])
            .start_timer();

        let region_id = write_request.metadata.region_id;

        let store = self.file_cache.local_store();
        let path_provider = WriteCachePathProvider::new(region_id, self.file_cache.clone());
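        // Build the indexer against the local cache store so index files are
        // created in the write cache alongside the SST data.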
        let indexer = IndexerBuilderImpl {
            op_type: write_request.op_type,
            metadata: write_request.metadata.clone(),
            row_group_size: write_opts.row_group_size,
            puffin_manager: self
                .puffin_manager_factory
                .build(store, path_provider.clone()),
            intermediate_manager: self.intermediate_manager.clone(),
            index_options: write_request.index_options,
            inverted_index_config: write_request.inverted_index_config,
            fulltext_index_config: write_request.fulltext_index_config,
            bloom_filter_index_config: write_request.bloom_filter_index_config,
        };

        // Write to FileCache.
        let mut writer = ParquetWriter::new_with_object_store(
            self.file_cache.local_store(),
            write_request.metadata,
            indexer,
            path_provider,
        )
        .await;

        let sst_info = writer
            .write_all(write_request.source, write_request.max_sequence, write_opts)
            .await?;

        timer.stop_and_record();

        // Upload sst file to remote object store.
        if sst_info.is_empty() {
            return Ok(sst_info);
        }

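        // Upload each SST's data file and, if an index was created, its puffin
        // file. Remember each successful upload so a failure can roll back both
        // the remote files and the cached copies.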
        let mut upload_tracker = UploadTracker::new(region_id);
        let mut err = None;
        let remote_store = &upload_request.remote_store;
        for sst in &sst_info {
            let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet);
            let parquet_path = upload_request
                .dest_path_provider
                .build_sst_file_path(sst.file_id);
            if let Err(e) = self.upload(parquet_key, &parquet_path, remote_store).await {
                err = Some(e);
                break;
            }
            upload_tracker.push_uploaded_file(parquet_path);

            if sst.index_metadata.file_size > 0 {
                let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin);
                let puffin_path = upload_request
                    .dest_path_provider
                    .build_index_file_path(sst.file_id);
                if let Err(e) = self.upload(puffin_key, &puffin_path, remote_store).await {
                    err = Some(e);
                    break;
                }
                upload_tracker.push_uploaded_file(puffin_path);
            }
        }

        if let Some(err) = err {
            // Cleans files on failure.
            upload_tracker
                .clean(&sst_info, &self.file_cache, remote_store)
                .await;
            return Err(err);
        }

        Ok(sst_info)
    }

    /// Removes a file from the cache by `index_key`.
    pub(crate) async fn remove(&self, index_key: IndexKey) {
        self.file_cache.remove(index_key).await
    }

    /// Downloads the file at `remote_path` from the remote object store to the
    /// local cache entry identified by `index_key`.
    pub(crate) async fn download(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
    ) -> Result<()> {
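        // Read the remote file in concurrent, fixed-size chunks to speed up the
        // download.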
        const DOWNLOAD_READER_CONCURRENCY: usize = 8;
        const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);

        let file_type = index_key.file_type;
        let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
            .with_label_values(&[match file_type {
                FileType::Parquet => "download_parquet",
                FileType::Puffin => "download_puffin",
            }])
            .start_timer();

        let reader = remote_store
            .reader_with(remote_path)
            .concurrent(DOWNLOAD_READER_CONCURRENCY)
            .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_read(0..file_size)
            .await
            .context(error::OpenDalSnafu)?;

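        // Stream the downloaded bytes into the local cache file.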
        let cache_path = self.file_cache.cache_file_path(index_key);
        let mut writer = self
            .file_cache
            .local_store()
            .writer(&cache_path)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_write();

        let region_id = index_key.region_id;
        let file_id = index_key.file_id;
        let bytes_written =
            futures::io::copy(reader, &mut writer)
                .await
                .context(error::DownloadSnafu {
                    region_id,
                    file_id,
                    file_type,
                })?;
        writer.close().await.context(error::DownloadSnafu {
            region_id,
            file_id,
            file_type,
        })?;

        WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);

        let elapsed = timer.stop_and_record();
        debug!(
            "Successfully downloaded file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
            remote_path, cache_path, bytes_written, region_id, elapsed,
        );

        let index_value = IndexValue {
            file_size: bytes_written as _,
        };
        self.file_cache.put(index_key, index_value).await;
        Ok(())
    }

    /// Uploads a Parquet file or a Puffin file to the remote object store.
    async fn upload(
        &self,
        index_key: IndexKey,
        upload_path: &str,
        remote_store: &ObjectStore,
    ) -> Result<()> {
        let region_id = index_key.region_id;
        let file_id = index_key.file_id;
        let file_type = index_key.file_type;
        let cache_path = self.file_cache.cache_file_path(index_key);

        let timer = FLUSH_ELAPSED
            .with_label_values(&[match file_type {
                FileType::Parquet => "upload_parquet",
                FileType::Puffin => "upload_puffin",
            }])
            .start_timer();

        let cached_value = self
            .file_cache
            .local_store()
            .stat(&cache_path)
            .await
            .context(error::OpenDalSnafu)?;
        let reader = self
            .file_cache
            .local_store()
            .reader(&cache_path)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_read(0..cached_value.content_length())
            .await
            .context(error::OpenDalSnafu)?;

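        // Stream the cached file to the remote store using the default write
        // buffer size and concurrency.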
        let mut writer = remote_store
            .writer_with(upload_path)
            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .concurrent(DEFAULT_WRITE_CONCURRENCY)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_write();

        let bytes_written =
            futures::io::copy(reader, &mut writer)
                .await
                .context(error::UploadSnafu {
                    region_id,
                    file_id,
                    file_type,
                })?;

        // Must close to upload all data.
        writer.close().await.context(error::UploadSnafu {
            region_id,
            file_id,
            file_type,
        })?;

        UPLOAD_BYTES_TOTAL.inc_by(bytes_written);

        debug!(
            "Successfully uploaded file to remote, region: {}, file: {}, upload_path: {}, cost: {:?}s",
            region_id,
            file_id,
            upload_path,
            timer.stop_and_record()
        );

        let index_value = IndexValue {
            file_size: bytes_written as _,
        };
        // Register the file to the file cache.
        self.file_cache.put(index_key, index_value).await;

        Ok(())
    }
}

/// Request to write and upload an SST.
pub struct SstUploadRequest {
    /// Provider that builds the destination paths to which SST files in the
    /// write cache are uploaded.
    pub dest_path_provider: RegionFilePathFactory,
    /// Remote object store to upload to.
    pub remote_store: ObjectStore,
}

/// A struct that tracks uploaded files and cleans them up if the upload fails.
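///
/// Call `push_uploaded_file` after each successful upload; on the first
/// failure, `clean` removes both the uploaded remote files and the locally
/// cached copies.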
struct UploadTracker {
    /// Id of the region to track.
    region_id: RegionId,
    /// Paths of files uploaded successfully.
    files_uploaded: Vec<String>,
}

impl UploadTracker {
    /// Creates a new instance of `UploadTracker` for a given region.
    fn new(region_id: RegionId) -> Self {
        Self {
            region_id,
            files_uploaded: Vec::new(),
        }
    }

    /// Adds a file path to the list of uploaded files.
    fn push_uploaded_file(&mut self, path: String) {
        self.files_uploaded.push(path);
    }

    /// Cleans up uploaded files and files in the file cache on a best-effort basis.
    async fn clean(
        &self,
        sst_info: &SstInfoArray,
        file_cache: &FileCacheRef,
        remote_store: &ObjectStore,
    ) {
        common_telemetry::info!(
            "Start cleaning files on upload failure, region: {}, num_ssts: {}",
            self.region_id,
            sst_info.len()
        );

        // Cleans files in the file cache first.
        for sst in sst_info {
            let parquet_key = IndexKey::new(self.region_id, sst.file_id, FileType::Parquet);
            file_cache.remove(parquet_key).await;

            if sst.index_metadata.file_size > 0 {
                let puffin_key = IndexKey::new(self.region_id, sst.file_id, FileType::Puffin);
                file_cache.remove(puffin_key).await;
            }
        }

        // Cleans uploaded files.
        for file_path in &self.files_uploaded {
            if let Err(e) = remote_store.delete(file_path).await {
                common_telemetry::error!(e; "Failed to delete file {}", file_path);
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;

    use super::*;
    use crate::access_layer::OperationType;
    use crate::cache::test_util::new_fs_store;
    use crate::cache::{CacheManager, CacheStrategy};
    use crate::region::options::IndexOptions;
    use crate::sst::parquet::reader::ParquetReaderBuilder;
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle_with_file_id,
        sst_region_metadata,
    };
    use crate::test_util::TestEnv;

    #[tokio::test]
    async fn test_write_and_upload_sst() {
        // TODO(QuenKar): maybe find a way to create a mock object server for
        // testing; for now we use the local file system as a mock.
        let mut env = TestEnv::new();
        let mock_store = env.init_object_store_manager();
        let path_provider = RegionFilePathFactory::new("test".to_string());

        let local_dir = create_temp_dir("");
        let local_store = new_fs_store(local_dir.path().to_str().unwrap());

        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;

        // Create the source.
        let metadata = Arc::new(sst_region_metadata());
        let region_id = metadata.region_id;
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);

        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source,
            storage: None,
            max_sequence: None,
            cache_manager: Default::default(),
            index_options: IndexOptions::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let upload_request = SstUploadRequest {
            dest_path_provider: path_provider.clone(),
            remote_store: mock_store.clone(),
        };

        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };

        // Write to the cache and upload the SST to the mock remote store.
        let sst_info = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts)
            .await
            .unwrap()
            .remove(0); // TODO(hl): we assume it only creates one file.

        let file_id = sst_info.file_id;
        let sst_upload_path = path_provider.build_sst_file_path(file_id);
        let index_upload_path = path_provider.build_index_file_path(file_id);

        // Check that the write cache contains the key.
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        assert!(write_cache.file_cache.contains_key(&key));

        // Check the file data.
        let remote_data = mock_store.read(&sst_upload_path).await.unwrap();
        let cache_data = local_store
            .read(&write_cache.file_cache.cache_file_path(key))
            .await
            .unwrap();
        assert_eq!(remote_data.to_vec(), cache_data.to_vec());

        // Check that the write cache contains the index key.
        let index_key = IndexKey::new(region_id, file_id, FileType::Puffin);
        assert!(write_cache.file_cache.contains_key(&index_key));

        let remote_index_data = mock_store.read(&index_upload_path).await.unwrap();
        let cache_index_data = local_store
            .read(&write_cache.file_cache.cache_file_path(index_key))
            .await
            .unwrap();
        assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec());

        // Remove the files from the cache.
        let sst_index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
        write_cache.remove(sst_index_key).await;
        assert!(!write_cache.file_cache.contains_key(&sst_index_key));
        write_cache.remove(index_key).await;
        assert!(!write_cache.file_cache.contains_key(&index_key));
    }

    #[tokio::test]
    async fn test_read_metadata_from_write_cache() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new();
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let local_dir = create_temp_dir("");
        let local_path = local_dir.path().to_str().unwrap();
        let local_store = new_fs_store(local_path);

        // Create a cache manager using only the write cache.
        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        // Create the source.
        let metadata = Arc::new(sst_region_metadata());

        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);

        // Write to the local cache and upload the SST to the mock remote store.
        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source,
            storage: None,
            max_sequence: None,
            cache_manager: cache_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };
        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone()),
            remote_store: mock_store.clone(),
        };

        let sst_info = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let write_parquet_metadata = sst_info.file_metadata.unwrap();

        // Read metadata from the write cache.
        let handle = sst_file_handle_with_file_id(sst_info.file_id, 0, 1000);
        let builder = ParquetReaderBuilder::new(data_home, handle.clone(), mock_store.clone())
            .cache(CacheStrategy::EnableAll(cache_manager.clone()));
        let reader = builder.build().await.unwrap();

        // Check the parquet metadata.
        assert_parquet_metadata_eq(write_parquet_metadata, reader.parquet_metadata());
    }
}