mito2/manifest/
storage.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod checkpoint;
16pub(crate) mod delta;
17pub(crate) mod size_tracker;
18pub(crate) mod staging;
19pub(crate) mod utils;
20
21use std::iter::Iterator;
22use std::str::FromStr;
23use std::sync::Arc;
24use std::sync::atomic::AtomicU64;
25
26use common_datasource::compression::CompressionType;
27use common_telemetry::debug;
28use crc32fast::Hasher;
29use lazy_static::lazy_static;
30use object_store::util::join_dir;
31use object_store::{Lister, ObjectStore, util};
32use regex::Regex;
33use snafu::{ResultExt, ensure};
34use store_api::ManifestVersion;
35use store_api::storage::RegionId;
36
37use crate::cache::manifest_cache::ManifestCache;
38use crate::error::{ChecksumMismatchSnafu, OpenDalSnafu, Result};
39use crate::manifest::storage::checkpoint::CheckpointStorage;
40use crate::manifest::storage::delta::DeltaStorage;
41use crate::manifest::storage::size_tracker::{CheckpointTracker, DeltaTracker, SizeTracker};
42use crate::manifest::storage::staging::StagingStorage;
43use crate::manifest::storage::utils::remove_from_cache;
44
// Lazily-initialized regexes used to classify file names in the manifest directory.
45lazy_static! {
    // Matches names that start with one or more digits followed by `.json`.
    // The pattern is not anchored at the end, so a name carrying a further
    // suffix (e.g. `00000000000000000000.json.gz`) matches as well.
46    static ref DELTA_RE: Regex = Regex::new("^\\d+\\.json").unwrap();
    // Matches names that start with one or more digits followed by `.checkpoint`.
    // Like `DELTA_RE`, it is not anchored at the end.
47    static ref CHECKPOINT_RE: Regex = Regex::new("^\\d+\\.checkpoint").unwrap();
48}
49
/// File name `_last_checkpoint`.
///
/// NOTE(review): presumably the file that records the most recent checkpoint; it is
/// not read or written in the code visible here — confirm in the `checkpoint` module.
50pub const LAST_CHECKPOINT_FILE: &str = "_last_checkpoint";
/// Compression type returned by `manifest_compress_type` when compression is enabled.
51const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Gzip;
52/// Due to backward compatibility, it is possible that the user's manifest file has not been compressed.
53/// So when we encounter problems, we need to fall back to `FALL_BACK_COMPRESS_TYPE` for processing.
54pub(crate) const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed;
// NOTE(review): not referenced in the code visible here; presumably bounds the number of
// manifest files fetched concurrently by the storage submodules — confirm.
55const FETCH_MANIFEST_PARALLELISM: usize = 16;
56
57/// Returns the path of the manifest directory that belongs to `region_dir`.
///
/// The path is produced by joining `region_dir` with the `manifest` sub-directory
/// through [join_dir].
58pub fn manifest_dir(region_dir: &str) -> String {
59    join_dir(region_dir, "manifest")
60}
61
62/// Returns the [CompressionType] according to whether to compress manifest files.
63pub const fn manifest_compress_type(compress: bool) -> CompressionType {
64    if compress {
65        DEFAULT_MANIFEST_COMPRESSION_TYPE
66    } else {
67        FALL_BACK_COMPRESS_TYPE
68    }
69}
70
/// Returns the file name of the delta manifest for `version`: the version zero-padded
/// to 20 digits followed by `.json` (e.g. version 7 -> `00000000000000000007.json`).
71pub fn delta_file(version: ManifestVersion) -> String {
72    format!("{version:020}.json")
73}
74
/// Returns the file name of the checkpoint for `version`: the version zero-padded
/// to 20 digits followed by `.checkpoint`
/// (e.g. version 7 -> `00000000000000000007.checkpoint`).
75pub fn checkpoint_file(version: ManifestVersion) -> String {
76    format!("{version:020}.checkpoint")
77}
78
79pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> String {
80    if compress_type == CompressionType::Uncompressed {
81        format!("{}{}", path, file)
82    } else {
83        format!("{}{}.{}", path, file, compress_type.file_extension())
84    }
85}
86
/// Computes the checksum of `data` with a `crc32fast` [Hasher].
87pub(crate) fn checkpoint_checksum(data: &[u8]) -> u32 {
88    let mut hasher = Hasher::new();
89    hasher.update(data);
90    hasher.finalize()
91}
92
93pub(crate) fn verify_checksum(data: &[u8], wanted: Option<u32>) -> Result<()> {
94    if let Some(checksum) = wanted {
95        let calculated_checksum = checkpoint_checksum(data);
96        ensure!(
97            checksum == calculated_checksum,
98            ChecksumMismatchSnafu {
99                actual: calculated_checksum,
100                expected: checksum,
101            }
102        );
103    }
104    Ok(())
105}
106
107/// Return's the file manifest version from path
108///
109/// # Panics
110/// If the file path is not a valid delta or checkpoint file.
111pub fn file_version(path: &str) -> ManifestVersion {
112    let s = path.split('.').next().unwrap();
113    s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}"))
114}
115
116/// Return's the file compress algorithm by file extension.
117///
118/// for example file
119/// `00000000000000000000.json.gz` -> `CompressionType::GZIP`
120pub fn file_compress_type(path: &str) -> CompressionType {
121    let s = path.rsplit('.').next().unwrap_or("");
122    CompressionType::from_str(s).unwrap_or(CompressionType::Uncompressed)
123}
124
/// Returns `true` if `file_name` looks like a delta manifest file, i.e. it starts with
/// one or more digits followed by `.json` (a trailing suffix such as `.gz` is allowed).
125pub fn is_delta_file(file_name: &str) -> bool {
126    DELTA_RE.is_match(file_name)
127}
128
/// Returns `true` if `file_name` looks like a checkpoint file, i.e. it starts with one
/// or more digits followed by `.checkpoint` (a trailing suffix such as `.gz` is allowed).
129pub fn is_checkpoint_file(file_name: &str) -> bool {
130    CHECKPOINT_RE.is_match(file_name)
131}
132
/// Facade over the manifest files of a single manifest directory.
///
/// It bundles the checkpoint, delta and staging storages, which are all created in
/// `new` from the same directory and object store, together with the size tracker
/// that accounts for the tracked manifest files.
133#[derive(Clone, Debug)]
134pub struct ManifestObjectStore {
    // Object store handle; used directly by `delete_until` for batch deletion.
135    object_store: ObjectStore,
    // Manifest directory, normalized with `util::normalize_dir` in `new`.
136    path: String,
137    /// Optional manifest cache for local caching.
138    manifest_cache: Option<ManifestCache>,
139    // Tracks the size of each file in the manifest directory.
140    size_tracker: SizeTracker,
141    // The checkpoint file storage.
142    checkpoint_storage: CheckpointStorage<CheckpointTracker>,
143    // The delta file storage.
144    delta_storage: DeltaStorage<DeltaTracker>,
145    /// The staging file storage.
146    staging_storage: StagingStorage,
147}
148
149impl ManifestObjectStore {
150    pub fn new(
151        path: &str,
152        object_store: ObjectStore,
153        compress_type: CompressionType,
154        total_manifest_size: Arc<AtomicU64>,
155        manifest_cache: Option<ManifestCache>,
156    ) -> Self {
157        common_telemetry::info!("Create manifest store, cache: {}", manifest_cache.is_some());
158
159        let path = util::normalize_dir(path);
160        let size_tracker = SizeTracker::new(total_manifest_size);
161        let checkpoint_tracker = Arc::new(size_tracker.checkpoint_tracker());
162        let delta_tracker = Arc::new(size_tracker.manifest_tracker());
163        let checkpoint_storage = CheckpointStorage::new(
164            path.clone(),
165            object_store.clone(),
166            compress_type,
167            manifest_cache.clone(),
168            checkpoint_tracker,
169        );
170        let delta_storage = DeltaStorage::new(
171            path.clone(),
172            object_store.clone(),
173            compress_type,
174            manifest_cache.clone(),
175            delta_tracker,
176        );
177        let staging_storage =
178            StagingStorage::new(path.clone(), object_store.clone(), compress_type);
179
180        Self {
181            object_store,
182            path,
183            manifest_cache,
184            size_tracker,
185            checkpoint_storage,
186            delta_storage,
187            staging_storage,
188        }
189    }
190
191    /// Returns the manifest directory this store operates on (the normalized `path` given to `new`).
192    pub(crate) fn manifest_dir(&self) -> &str {
193        &self.path
194    }
195
196    /// Returns an iterator of manifests from normal or staging directory.
197    pub(crate) async fn manifest_lister(&self, is_staging: bool) -> Result<Option<Lister>> {
198        if is_staging {
199            self.staging_storage.manifest_lister().await
200        } else {
201            self.delta_storage.manifest_lister().await
202        }
203    }
204
205    /// Fetches manifests in range [start_version, end_version).
206    /// This function is guaranteed to return manifests starting strictly from `start_version` (the result must contain `start_version`).
    ///
    /// Delegates to the delta storage, forwarding `region_id` unchanged.
207    pub async fn fetch_manifests_strict_from(
208        &self,
209        start_version: ManifestVersion,
210        end_version: ManifestVersion,
211        region_id: RegionId,
212    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
213        self.delta_storage
214            .fetch_manifests_strict_from(start_version, end_version, region_id)
215            .await
216    }
217
218    /// Fetches manifests concurrently and returns those in range [start_version, end_version).
219    ///
220    /// **Notes**: This function does not guarantee that the returned manifests start strictly from `start_version`.
221    /// Use [fetch_manifests_strict_from](ManifestObjectStore::fetch_manifests_strict_from) when the result must contain `start_version`.
    ///
    /// Delegates to the delta storage.
222    pub async fn fetch_manifests(
223        &self,
224        start_version: ManifestVersion,
225        end_version: ManifestVersion,
226    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
227        self.delta_storage
228            .fetch_manifests(start_version, end_version)
229            .await
230    }
231
232    /// Delete manifest files that version < end.
233    /// If keep_last_checkpoint is true, the last checkpoint file will be kept.
234    /// ### Return
235    /// The number of deleted files.
236    pub async fn delete_until(
237        &self,
238        end: ManifestVersion,
239        keep_last_checkpoint: bool,
240    ) -> Result<usize> {
241        // Stores (entry, is_checkpoint, version) in a Vec.
242        let entries: Vec<_> = self
243            .delta_storage
244            .get_paths(|entry| {
245                let file_name = entry.name();
246                let is_checkpoint = is_checkpoint_file(file_name);
247                if is_delta_file(file_name) || is_checkpoint_file(file_name) {
248                    let version = file_version(file_name);
249                    if version < end {
250                        return Some((entry, is_checkpoint, version));
251                    }
252                }
253                None
254            })
255            .await?;
256        let checkpoint_version = if keep_last_checkpoint {
257            // Note that the order of entries is unspecific.
258            entries
259                .iter()
260                .filter_map(
261                    |(_e, is_checkpoint, version)| {
262                        if *is_checkpoint { Some(version) } else { None }
263                    },
264                )
265                .max()
266        } else {
267            None
268        };
269        let del_entries: Vec<_> = entries
270            .iter()
271            .filter(|(_e, is_checkpoint, version)| {
272                if let Some(max_version) = checkpoint_version {
273                    if *is_checkpoint {
274                        // We need to keep the checkpoint file.
275                        version < max_version
276                    } else {
277                        // We can delete the log file with max_version as the checkpoint
278                        // file contains the log file's content.
279                        version <= max_version
280                    }
281                } else {
282                    true
283                }
284            })
285            .collect();
286        let paths = del_entries
287            .iter()
288            .map(|(e, _, _)| e.path().to_string())
289            .collect::<Vec<_>>();
290        let ret = paths.len();
291
292        debug!(
293            "Deleting {} logs from manifest storage path {} until {}, checkpoint_version: {:?}, paths: {:?}",
294            ret, self.path, end, checkpoint_version, paths,
295        );
296
297        // Remove from cache first
298        for (entry, _, _) in &del_entries {
299            remove_from_cache(self.manifest_cache.as_ref(), entry.path()).await;
300        }
301
302        self.object_store
303            .delete_iter(paths)
304            .await
305            .context(OpenDalSnafu)?;
306
307        // delete manifest sizes
308        for (_, is_checkpoint, version) in &del_entries {
309            if *is_checkpoint {
310                self.size_tracker
311                    .remove(&size_tracker::FileKey::Checkpoint(*version));
312            } else {
313                self.size_tracker
314                    .remove(&size_tracker::FileKey::Delta(*version));
315            }
316        }
317
318        Ok(ret)
319    }
320
321    /// Save the delta manifest file.
322    pub async fn save(
323        &mut self,
324        version: ManifestVersion,
325        bytes: &[u8],
326        is_staging: bool,
327    ) -> Result<()> {
328        if is_staging {
329            self.staging_storage.save(version, bytes).await
330        } else {
331            self.delta_storage.save(version, bytes).await
332        }
333    }
334
335    /// Saves the checkpoint manifest file.
    ///
    /// Delegates to the checkpoint storage with the given `version` and raw `bytes`.
336    pub(crate) async fn save_checkpoint(
337        &self,
338        version: ManifestVersion,
339        bytes: &[u8],
340    ) -> Result<()> {
341        self.checkpoint_storage
342            .save_checkpoint(version, bytes)
343            .await
344    }
345
346    /// Loads the latest checkpoint through the checkpoint storage.
347    /// Returns the manifest version and the raw [RegionCheckpoint](crate::manifest::action::RegionCheckpoint) content, or `None` if there is no checkpoint.
348    pub async fn load_last_checkpoint(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
349        self.checkpoint_storage.load_last_checkpoint().await
350    }
351
352    /// Returns the total size (in bytes) reported by the size tracker.
353    pub(crate) fn total_manifest_size(&self) -> u64 {
354        self.size_tracker.total()
355    }
356
357    /// Resets the size of all files by resetting the size tracker.
358    pub(crate) fn reset_manifest_size(&mut self) {
359        self.size_tracker.reset();
360    }
361
362    /// Records `size` as the size of the delta file with the given `version` in the size tracker.
363    pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
364        self.size_tracker.record_delta(version, size);
365    }
366
367    /// Records `size` as the size of the checkpoint file with the given `version` in the size tracker.
368    pub(crate) fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
369        self.size_tracker.record_checkpoint(version, size);
370    }
371
372    /// Fetches all staging manifest files and returns them as (version, raw content) pairs.
    ///
    /// Delegates to the staging storage.
373    pub async fn fetch_staging_manifests(&self) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
374        self.staging_storage.fetch_manifests().await
375    }
376
377    /// Clears all staging manifest files by delegating to the staging storage.
378    pub async fn clear_staging_manifests(&mut self) -> Result<()> {
379        self.staging_storage.clear().await
380    }
381
382    /// Returns a reference to the staging storage.
383    pub(crate) fn staging_storage(&self) -> &StagingStorage {
384        &self.staging_storage
385    }
386}
387
// Test-only helpers: raw file access, accessors to the inner storages and a way to
// switch the compression type after construction.
388#[cfg(test)]
389impl ManifestObjectStore {
    /// Reads the whole object at `path` from the underlying object store and returns
    /// its bytes.
390    pub async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
391        self.object_store
392            .read(path)
393            .await
394            .context(OpenDalSnafu)
395            .map(|v| v.to_vec())
396    }
397
    /// Returns a reference to the checkpoint storage.
398    pub(crate) fn checkpoint_storage(&self) -> &CheckpointStorage<CheckpointTracker> {
399        &self.checkpoint_storage
400    }
401
    /// Returns a reference to the delta storage.
402    pub(crate) fn delta_storage(&self) -> &DeltaStorage<DeltaTracker> {
403        &self.delta_storage
404    }
405
    /// Sets `compress_type` on the checkpoint, delta and staging storages.
406    pub(crate) fn set_compress_type(&mut self, compress_type: CompressionType) {
407        self.checkpoint_storage.set_compress_type(compress_type);
408        self.delta_storage.set_compress_type(compress_type);
409        self.staging_storage.set_compress_type(compress_type);
410    }
411}
412
413#[cfg(test)]
414mod tests {
415    use common_test_util::temp_dir::create_temp_dir;
416    use object_store::ObjectStore;
417    use object_store::services::Fs;
418
419    use super::*;
420    use crate::manifest::storage::checkpoint::CheckpointMetadata;
421
    /// Builds a [ManifestObjectStore] for tests: a filesystem-backed object store rooted
    /// at a temporary directory, manifest path `/`, uncompressed files, a default size
    /// counter and no manifest cache.
    // NOTE(review): `tmp_dir` is dropped when this function returns; presumably the
    // tests still work because files are written afterwards — confirm the lifetime
    // of the temp dir guard is intended.
422    fn new_test_manifest_store() -> ManifestObjectStore {
423        common_telemetry::init_default_ut_logging();
424        let tmp_dir = create_temp_dir("test_manifest_log_store");
425        let builder = Fs::default().root(&tmp_dir.path().to_string_lossy());
426        let object_store = ObjectStore::new(builder).unwrap().finish();
427        ManifestObjectStore::new(
428            "/",
429            object_store,
430            CompressionType::Uncompressed,
431            Default::default(),
432            None,
433        )
434    }
435
    /// Builds a [CheckpointMetadata] for `version` with size 0, no checksum and default
    /// extended metadata, for use with `load_checkpoint` in tests.
436    fn new_checkpoint_metadata_with_version(version: ManifestVersion) -> CheckpointMetadata {
437        CheckpointMetadata {
438            size: 0,
439            version,
440            checksum: None,
441            extend_metadata: Default::default(),
442        }
443    }
444
445    #[test]
446    // This test pins the generated path format so that future changes do not unintentionally break backward compatibility.
447    fn test_compress_file_path_generation() {
448        let path = "/foo/bar/";
449        let version: ManifestVersion = 0;
450        let file_path = gen_path(path, &delta_file(version), CompressionType::Gzip);
        // Directory + zero-padded version + `.json` + the gzip extension.
451        assert_eq!(file_path.as_str(), "/foo/bar/00000000000000000000.json.gz")
452    }
453
    // Runs the shared save/fetch/checkpoint/delete scenario with uncompressed files.
454    #[tokio::test]
455    async fn test_manifest_log_store_uncompress() {
456        let mut log_store = new_test_manifest_store();
457        log_store.set_compress_type(CompressionType::Uncompressed);
458        test_manifest_log_store_case(log_store).await;
459    }
460
    // Runs the shared save/fetch/checkpoint/delete scenario with gzip-compressed files.
461    #[tokio::test]
462    async fn test_manifest_log_store_compress() {
463        let mut log_store = new_test_manifest_store();
464        log_store.set_compress_type(CompressionType::Gzip);
465        test_manifest_log_store_case(log_store).await;
466    }
467
    /// Shared scenario for the compressed and uncompressed variants: saves delta files
    /// 0..5, checks range fetching, saves and loads a checkpoint, then verifies
    /// `delete_until` both with and without keeping the last checkpoint.
468    async fn test_manifest_log_store_case(mut log_store: ManifestObjectStore) {
469        for v in 0..5 {
470            log_store
471                .save(v, format!("hello, {v}").as_bytes(), false)
472                .await
473                .unwrap();
474        }
475
        // The range [1, 4) yields exactly versions 1, 2 and 3.
476        let manifests = log_store.fetch_manifests(1, 4).await.unwrap();
477        let mut it = manifests.into_iter();
478        for v in 1..4 {
479            let (version, bytes) = it.next().unwrap();
480            assert_eq!(v, version);
481            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
482        }
483        assert!(it.next().is_none());
484
        // An end version beyond the last saved file returns all existing files.
485        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
486        let mut it = manifests.into_iter();
487        for v in 0..5 {
488            let (version, bytes) = it.next().unwrap();
489            assert_eq!(v, version);
490            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
491        }
492        assert!(it.next().is_none());
493
494        // test checkpoint
495        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
496        log_store
497            .save_checkpoint(3, "checkpoint".as_bytes())
498            .await
499            .unwrap();
500
501        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
502        assert_eq!(checkpoint, "checkpoint".as_bytes());
503        assert_eq!(3, v);
504
505        // delete logs with version < 4 and keep checkpoint 3.
506        let _ = log_store.delete_until(4, true).await.unwrap();
        // Checkpoint 3 must still be loadable, both directly and as the last checkpoint.
507        let _ = log_store
508            .checkpoint_storage
509            .load_checkpoint(new_checkpoint_metadata_with_version(3))
510            .await
511            .unwrap()
512            .unwrap();
513        let _ = log_store.load_last_checkpoint().await.unwrap().unwrap();
514        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
515        let mut it = manifests.into_iter();
516
        // Only delta file 4 survives the deletion.
517        let (version, bytes) = it.next().unwrap();
518        assert_eq!(4, version);
519        assert_eq!("hello, 4".as_bytes(), bytes);
520        assert!(it.next().is_none());
521
522        // delete all logs and checkpoints
523        let _ = log_store.delete_until(11, false).await.unwrap();
524        assert!(
525            log_store
526                .checkpoint_storage
527                .load_checkpoint(new_checkpoint_metadata_with_version(3))
528                .await
529                .unwrap()
530                .is_none()
531        );
532        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
533        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
534        let mut it = manifests.into_iter();
535
536        assert!(it.next().is_none());
537    }
538
539    #[tokio::test]
540    // Tests that ManifestObjectStore can read/delete previously written uncompressed data correctly.
541    async fn test_compress_backward_compatible() {
542        let mut log_store = new_test_manifest_store();
543
544        // write uncompressed data to simulate files written before compression was enabled
545        log_store.set_compress_type(CompressionType::Uncompressed);
546        for v in 0..5 {
547            log_store
548                .save(v, format!("hello, {v}").as_bytes(), false)
549                .await
550                .unwrap();
551        }
552        log_store
553            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
554            .await
555            .unwrap();
556
557        // change compress type
558        log_store.set_compress_type(CompressionType::Gzip);
559
560        // test that load_last_checkpoint works correctly for previously uncompressed data
561        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
562        assert_eq!(v, 5);
563        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
564
565        // write compressed data so that the compression algorithm takes effect
566        for v in 5..10 {
567            log_store
568                .save(v, format!("hello, {v}").as_bytes(), false)
569                .await
570                .unwrap();
571        }
572        log_store
573            .save_checkpoint(10, "checkpoint_compressed".as_bytes())
574            .await
575            .unwrap();
576
577        // test data reading
578        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
579        let mut it = manifests.into_iter();
580
581        for v in 0..10 {
582            let (version, bytes) = it.next().unwrap();
583            assert_eq!(v, version);
584            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
585        }
        // The old uncompressed checkpoint must still be loadable by version.
586        let (v, checkpoint) = log_store
587            .checkpoint_storage
588            .load_checkpoint(new_checkpoint_metadata_with_version(5))
589            .await
590            .unwrap()
591            .unwrap();
592        assert_eq!(v, 5);
593        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
594        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
595        assert_eq!(v, 10);
596        assert_eq!(checkpoint, "checkpoint_compressed".as_bytes());
597
598        // Delete until 10; this covers both uncompressed and compressed files.
599        // 11 files are deleted: delta files 0..=9 and checkpoint 5 (checkpoint 10 is not < 10).
600        assert_eq!(11, log_store.delete_until(10, false).await.unwrap());
601        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
602        let mut it = manifests.into_iter();
603        assert!(it.next().is_none());
604    }
605
606    #[tokio::test]
607    async fn test_file_version() {
608        let version = file_version("00000000000000000007.checkpoint");
609        assert_eq!(version, 7);
610
611        let name = delta_file(version);
612        assert_eq!(name, "00000000000000000007.json");
613
614        let name = checkpoint_file(version);
615        assert_eq!(name, "00000000000000000007.checkpoint");
616    }
617
    // Verifies that the tracked total size follows saves and deletions of uncompressed files.
618    #[tokio::test]
619    async fn test_uncompressed_manifest_files_size() {
620        let mut log_store = new_test_manifest_store();
621        // write 5 uncompressed manifest files (8B per file)
622        log_store.set_compress_type(CompressionType::Uncompressed);
623        for v in 0..5 {
624            log_store
625                .save(v, format!("hello, {v}").as_bytes(), false)
626                .await
627                .unwrap();
628        }
629        // write 1 uncompressed checkpoint file (23B)
630        log_store
631            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
632            .await
633            .unwrap();
634
635        // total size of manifest files: 5 * 8B + 23B
636        assert_eq!(log_store.total_manifest_size(), 63);
637
638        // delete 3 manifest files (versions 0, 1 and 2)
639        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);
640
641        // total size after the deletion: 63B - 3 * 8B
642        assert_eq!(log_store.total_manifest_size(), 39);
643
644        // delete all remaining files: delta files 3 and 4 plus checkpoint 5
645        assert_eq!(
646            log_store
647                .delete_until(ManifestVersion::MAX, false)
648                .await
649                .unwrap(),
650            3
651        );
652
653        assert_eq!(log_store.total_manifest_size(), 0);
654    }
655
    // Verifies that the tracked total size follows saves and deletions of gzip-compressed
    // files. The expected byte counts are the sizes asserted for the compressed output.
656    #[tokio::test]
657    async fn test_compressed_manifest_files_size() {
658        let mut log_store = new_test_manifest_store();
659        // Test with compressed manifest files
660        log_store.set_compress_type(CompressionType::Gzip);
661        // write 5 manifest files
662        for v in 0..5 {
663            log_store
664                .save(v, format!("hello, {v}").as_bytes(), false)
665                .await
666                .unwrap();
667        }
        // write 1 checkpoint file
668        log_store
669            .save_checkpoint(5, "checkpoint_compressed".as_bytes())
670            .await
671            .unwrap();
672
673        // total size of manifest files
674        assert_eq!(log_store.total_manifest_size(), 181);
675
676        // delete 3 manifest files (versions 0, 1 and 2)
677        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);
678
679        // total size after the deletion
680        assert_eq!(log_store.total_manifest_size(), 97);
681
682        // delete all remaining files: delta files 3 and 4 plus checkpoint 5
683        assert_eq!(
684            log_store
685                .delete_until(ManifestVersion::MAX, false)
686                .await
687                .unwrap(),
688            3
689        );
690
691        assert_eq!(log_store.total_manifest_size(), 0);
692    }
693}