// mito2/manifest/storage.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15pub(crate) mod checkpoint;
16pub(crate) mod delta;
17pub(crate) mod size_tracker;
18pub(crate) mod staging;
19pub(crate) mod utils;
20
21use std::iter::Iterator;
22use std::str::FromStr;
23use std::sync::Arc;
24use std::sync::atomic::AtomicU64;
25
26use common_datasource::compression::CompressionType;
27use common_telemetry::debug;
28use crc32fast::Hasher;
29use lazy_static::lazy_static;
30use object_store::util::join_dir;
31use object_store::{Lister, ObjectStore, util};
32use regex::Regex;
33use snafu::{ResultExt, ensure};
34use store_api::ManifestVersion;
35use store_api::storage::RegionId;
36
37use crate::cache::manifest_cache::ManifestCache;
38use crate::error::{ChecksumMismatchSnafu, OpenDalSnafu, Result};
39use crate::manifest::storage::checkpoint::CheckpointStorage;
40use crate::manifest::storage::delta::DeltaStorage;
41use crate::manifest::storage::size_tracker::{CheckpointTracker, DeltaTracker, SizeTracker};
42use crate::manifest::storage::staging::StagingStorage;
43use crate::manifest::storage::utils::remove_from_cache;
44
45lazy_static! {
46    static ref DELTA_RE: Regex = Regex::new("^\\d+\\.json").unwrap();
47    static ref CHECKPOINT_RE: Regex = Regex::new("^\\d+\\.checkpoint").unwrap();
48}
49
/// Name of the `_last_checkpoint` file kept in the manifest directory.
///
/// It starts with `_` rather than a digit, so it matches neither the delta nor the
/// checkpoint file-name pattern and is never selected for deletion by `delete_until`.
pub const LAST_CHECKPOINT_FILE: &str = "_last_checkpoint";
/// Compression applied to manifest files when compression is enabled
/// (see `manifest_compress_type`).
const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Gzip;
/// Manifest files written before compression was enabled may be stored uncompressed, so
/// readers that fail with the configured compression type are expected to fall back to
/// this type. It is also the type used when compression is disabled
/// (see `manifest_compress_type`).
pub(crate) const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed;
/// Presumably the number of manifest files fetched concurrently.
// NOTE(review): not referenced in this file; assumed to be used by the storage
// submodules (e.g. `delta`) through `super::` — confirm.
const FETCH_MANIFEST_PARALLELISM: usize = 16;
56
/// Returns the directory holding a region's manifest files: the `manifest`
/// sub-directory of `region_dir`, joined with `join_dir`.
pub fn manifest_dir(region_dir: &str) -> String {
    join_dir(region_dir, "manifest")
}
61
62/// Returns the [CompressionType] according to whether to compress manifest files.
63pub const fn manifest_compress_type(compress: bool) -> CompressionType {
64    if compress {
65        DEFAULT_MANIFEST_COMPRESSION_TYPE
66    } else {
67        FALL_BACK_COMPRESS_TYPE
68    }
69}
70
71pub fn delta_file(version: ManifestVersion) -> String {
72    format!("{version:020}.json")
73}
74
75pub fn checkpoint_file(version: ManifestVersion) -> String {
76    format!("{version:020}.checkpoint")
77}
78
79pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> String {
80    if compress_type == CompressionType::Uncompressed {
81        format!("{}{}", path, file)
82    } else {
83        format!("{}{}.{}", path, file, compress_type.file_extension())
84    }
85}
86
87pub(crate) fn checkpoint_checksum(data: &[u8]) -> u32 {
88    let mut hasher = Hasher::new();
89    hasher.update(data);
90    hasher.finalize()
91}
92
93pub(crate) fn verify_checksum(data: &[u8], wanted: Option<u32>) -> Result<()> {
94    if let Some(checksum) = wanted {
95        let calculated_checksum = checkpoint_checksum(data);
96        ensure!(
97            checksum == calculated_checksum,
98            ChecksumMismatchSnafu {
99                actual: calculated_checksum,
100                expected: checksum,
101            }
102        );
103    }
104    Ok(())
105}
106
107/// Return's the file manifest version from path
108///
109/// # Panics
110/// If the file path is not a valid delta or checkpoint file.
111pub fn file_version(path: &str) -> ManifestVersion {
112    let s = path.split('.').next().unwrap();
113    s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}"))
114}
115
116/// Return's the file compress algorithm by file extension.
117///
118/// for example file
119/// `00000000000000000000.json.gz` -> `CompressionType::GZIP`
120pub fn file_compress_type(path: &str) -> CompressionType {
121    let s = path.rsplit('.').next().unwrap_or("");
122    CompressionType::from_str(s).unwrap_or(CompressionType::Uncompressed)
123}
124
/// Returns whether `file_name` is a delta manifest file name: a decimal version followed
/// by `.json`. The pattern is anchored only at the start, so compressed names such as
/// `00000000000000000000.json.gz` match as well.
pub fn is_delta_file(file_name: &str) -> bool {
    DELTA_RE.is_match(file_name)
}
128
/// Returns whether `file_name` is a checkpoint file name: a decimal version followed by
/// `.checkpoint`. The pattern is anchored only at the start, so compressed names such as
/// `00000000000000000003.checkpoint.gz` match as well, while `_last_checkpoint` does not.
pub fn is_checkpoint_file(file_name: &str) -> bool {
    CHECKPOINT_RE.is_match(file_name)
}
132
/// Object-store backed storage for a region's manifest files.
///
/// Checkpoint, delta and staging files are handled by dedicated storages that share the
/// same manifest directory; the sizes of tracked files are kept in `size_tracker`.
#[derive(Clone, Debug)]
pub struct ManifestObjectStore {
    /// Underlying object store; also used directly to delete files in `delete_until`.
    object_store: ObjectStore,
    /// Normalized manifest directory path.
    path: String,
    /// Optional manifest cache for local caching.
    manifest_cache: Option<ManifestCache>,
    /// Tracks the size of each file in the manifest directory.
    size_tracker: SizeTracker,
    /// The checkpoint file storage.
    checkpoint_storage: CheckpointStorage<CheckpointTracker>,
    /// The delta file storage.
    delta_storage: DeltaStorage<DeltaTracker>,
    /// The staging file storage.
    staging_storage: StagingStorage,
}
148
149impl ManifestObjectStore {
150    pub fn new(
151        path: &str,
152        object_store: ObjectStore,
153        compress_type: CompressionType,
154        total_manifest_size: Arc<AtomicU64>,
155        manifest_cache: Option<ManifestCache>,
156    ) -> Self {
157        common_telemetry::info!("Create manifest store, cache: {}", manifest_cache.is_some());
158
159        let path = util::normalize_dir(path);
160        let size_tracker = SizeTracker::new(total_manifest_size);
161        let checkpoint_tracker = Arc::new(size_tracker.checkpoint_tracker());
162        let delta_tracker = Arc::new(size_tracker.manifest_tracker());
163        let checkpoint_storage = CheckpointStorage::new(
164            path.clone(),
165            object_store.clone(),
166            compress_type,
167            manifest_cache.clone(),
168            checkpoint_tracker,
169        );
170        let delta_storage = DeltaStorage::new(
171            path.clone(),
172            object_store.clone(),
173            compress_type,
174            manifest_cache.clone(),
175            delta_tracker,
176        );
177        let staging_storage =
178            StagingStorage::new(path.clone(), object_store.clone(), compress_type);
179
180        Self {
181            object_store,
182            path,
183            manifest_cache,
184            size_tracker,
185            checkpoint_storage,
186            delta_storage,
187            staging_storage,
188        }
189    }
190
191    /// Returns the manifest dir
192    pub(crate) fn manifest_dir(&self) -> &str {
193        &self.path
194    }
195
196    /// Returns an iterator of manifests from normal or staging directory.
197    pub(crate) async fn manifest_lister(&self, is_staging: bool) -> Result<Option<Lister>> {
198        if is_staging {
199            self.staging_storage.manifest_lister().await
200        } else {
201            self.delta_storage.manifest_lister().await
202        }
203    }
204
205    /// Fetches manifests in range [start_version, end_version).
206    /// This functions is guaranteed to return manifests from the `start_version` strictly (must contain `start_version`).
207    pub async fn fetch_manifests_strict_from(
208        &self,
209        start_version: ManifestVersion,
210        end_version: ManifestVersion,
211        region_id: RegionId,
212    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
213        self.delta_storage
214            .fetch_manifests_strict_from(start_version, end_version, region_id)
215            .await
216    }
217
218    /// Fetch all manifests in concurrent, and return the manifests in range [start_version, end_version)
219    ///
220    /// **Notes**: This function is no guarantee to return manifests from the `start_version` strictly.
221    /// Uses [fetch_manifests_strict_from](ManifestObjectStore::fetch_manifests_strict_from) to get manifests from the `start_version`.
222    pub async fn fetch_manifests(
223        &self,
224        start_version: ManifestVersion,
225        end_version: ManifestVersion,
226    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
227        self.delta_storage
228            .fetch_manifests(start_version, end_version)
229            .await
230    }
231
232    /// Delete manifest files that version < end.
233    /// If keep_last_checkpoint is true, the last checkpoint file will be kept.
234    /// ### Return
235    /// The number of deleted files.
236    pub async fn delete_until(
237        &self,
238        end: ManifestVersion,
239        keep_last_checkpoint: bool,
240    ) -> Result<usize> {
241        // Stores (entry, is_checkpoint, version) in a Vec.
242        let entries: Vec<_> = self
243            .delta_storage
244            .get_paths(|entry| {
245                let file_name = entry.name();
246                let is_checkpoint = is_checkpoint_file(file_name);
247                if is_delta_file(file_name) || is_checkpoint_file(file_name) {
248                    let version = file_version(file_name);
249                    if version < end {
250                        return Some((entry, is_checkpoint, version));
251                    }
252                }
253                None
254            })
255            .await?;
256        let checkpoint_version = if keep_last_checkpoint {
257            // Note that the order of entries is unspecific.
258            entries
259                .iter()
260                .filter_map(
261                    |(_e, is_checkpoint, version)| {
262                        if *is_checkpoint { Some(version) } else { None }
263                    },
264                )
265                .max()
266        } else {
267            None
268        };
269        let del_entries: Vec<_> = entries
270            .iter()
271            .filter(|(_e, is_checkpoint, version)| {
272                if let Some(max_version) = checkpoint_version {
273                    if *is_checkpoint {
274                        // We need to keep the checkpoint file.
275                        version < max_version
276                    } else {
277                        // We can delete the log file with max_version as the checkpoint
278                        // file contains the log file's content.
279                        version <= max_version
280                    }
281                } else {
282                    true
283                }
284            })
285            .collect();
286        let paths = del_entries
287            .iter()
288            .map(|(e, _, _)| e.path().to_string())
289            .collect::<Vec<_>>();
290        let ret = paths.len();
291
292        debug!(
293            "Deleting {} logs from manifest storage path {} until {}, checkpoint_version: {:?}, paths: {:?}",
294            ret, self.path, end, checkpoint_version, paths,
295        );
296
297        // Remove from cache first
298        for (entry, _, _) in &del_entries {
299            remove_from_cache(self.manifest_cache.as_ref(), entry.path()).await;
300        }
301
302        self.object_store
303            .delete_iter(paths)
304            .await
305            .context(OpenDalSnafu)?;
306
307        // delete manifest sizes
308        for (_, is_checkpoint, version) in &del_entries {
309            if *is_checkpoint {
310                self.size_tracker
311                    .remove(&size_tracker::FileKey::Checkpoint(*version));
312            } else {
313                self.size_tracker
314                    .remove(&size_tracker::FileKey::Delta(*version));
315            }
316        }
317
318        Ok(ret)
319    }
320
321    /// Save the delta manifest file.
322    pub async fn save(
323        &mut self,
324        version: ManifestVersion,
325        bytes: &[u8],
326        is_staging: bool,
327    ) -> Result<()> {
328        if is_staging {
329            self.staging_storage.save(version, bytes).await
330        } else {
331            self.delta_storage.save(version, bytes).await
332        }
333    }
334
335    /// Save the checkpoint manifest file.
336    pub(crate) async fn save_checkpoint(
337        &self,
338        version: ManifestVersion,
339        bytes: &[u8],
340    ) -> Result<()> {
341        self.checkpoint_storage
342            .save_checkpoint(version, bytes)
343            .await
344    }
345
346    /// Load the latest checkpoint.
347    /// Return manifest version and the raw [RegionCheckpoint](crate::manifest::action::RegionCheckpoint) content if any
348    pub async fn load_last_checkpoint(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
349        self.checkpoint_storage.load_last_checkpoint().await
350    }
351
352    /// Compute the size(Byte) in manifest size map.
353    pub(crate) fn total_manifest_size(&self) -> u64 {
354        self.size_tracker.total()
355    }
356
357    /// Resets the size of all files.
358    pub(crate) fn reset_manifest_size(&mut self) {
359        self.size_tracker.reset();
360    }
361
362    /// Set the size of the delta file by delta version.
363    pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
364        self.size_tracker.record_delta(version, size);
365    }
366
367    /// Set the size of the checkpoint file by checkpoint version.
368    pub(crate) fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
369        self.size_tracker.record_checkpoint(version, size);
370    }
371
372    /// Fetch all staging manifest files and return them as (version, action_list) pairs.
373    pub async fn fetch_staging_manifests(&self) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
374        self.staging_storage.fetch_manifests().await
375    }
376
377    /// Clear all staging manifest files.
378    pub async fn clear_staging_manifests(&mut self) -> Result<()> {
379        self.staging_storage.clear().await
380    }
381}
382
#[cfg(test)]
impl ManifestObjectStore {
    /// Test helper: reads the object at `path` from the object store and returns its bytes.
    pub async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
        let buffer = self.object_store.read(path).await.context(OpenDalSnafu)?;
        Ok(buffer.to_vec())
    }

    /// Test helper: exposes the checkpoint storage.
    pub(crate) fn checkpoint_storage(&self) -> &CheckpointStorage<CheckpointTracker> {
        &self.checkpoint_storage
    }

    /// Test helper: exposes the delta storage.
    pub(crate) fn delta_storage(&self) -> &DeltaStorage<DeltaTracker> {
        &self.delta_storage
    }

    /// Test helper: switches the compression type of the staging, delta and checkpoint
    /// storages together.
    pub(crate) fn set_compress_type(&mut self, compress_type: CompressionType) {
        self.staging_storage.set_compress_type(compress_type);
        self.delta_storage.set_compress_type(compress_type);
        self.checkpoint_storage.set_compress_type(compress_type);
    }
}
407
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ObjectStore;
    use object_store::services::Fs;

    use super::*;
    use crate::manifest::storage::checkpoint::CheckpointMetadata;

    /// Builds an uncompressed [ManifestObjectStore] rooted at "/" of an Fs-backed object
    /// store in a temporary directory, without a manifest cache.
    // NOTE(review): `tmp_dir` is dropped when this function returns. If the guard returned
    // by `create_temp_dir` removes the directory on drop (TODO confirm), the store keeps
    // writing to a path whose guard is gone and its files are never cleaned up.
    fn new_test_manifest_store() -> ManifestObjectStore {
        common_telemetry::init_default_ut_logging();
        let tmp_dir = create_temp_dir("test_manifest_log_store");
        let builder = Fs::default().root(&tmp_dir.path().to_string_lossy());
        let object_store = ObjectStore::new(builder).unwrap().finish();
        ManifestObjectStore::new(
            "/",
            object_store,
            CompressionType::Uncompressed,
            Default::default(),
            None,
        )
    }

    /// Returns checkpoint metadata that addresses the checkpoint file of `version`
    /// (zero size, no checksum, default extended metadata).
    fn new_checkpoint_metadata_with_version(version: ManifestVersion) -> CheckpointMetadata {
        CheckpointMetadata {
            size: 0,
            version,
            checksum: None,
            extend_metadata: Default::default(),
        }
    }

    #[test]
    // Pins the compressed file path format so that future changes cannot silently break backward compatibility.
    fn test_compress_file_path_generation() {
        let path = "/foo/bar/";
        let version: ManifestVersion = 0;
        let file_path = gen_path(path, &delta_file(version), CompressionType::Gzip);
        assert_eq!(file_path.as_str(), "/foo/bar/00000000000000000000.json.gz")
    }

    #[tokio::test]
    async fn test_manifest_log_store_uncompress() {
        let mut log_store = new_test_manifest_store();
        log_store.set_compress_type(CompressionType::Uncompressed);
        test_manifest_log_store_case(log_store).await;
    }

    #[tokio::test]
    async fn test_manifest_log_store_compress() {
        let mut log_store = new_test_manifest_store();
        log_store.set_compress_type(CompressionType::Gzip);
        test_manifest_log_store_case(log_store).await;
    }

    /// Shared scenario: save deltas 0..5, fetch ranges, save/load checkpoint 3, then delete
    /// with and without keeping the last checkpoint.
    async fn test_manifest_log_store_case(mut log_store: ManifestObjectStore) {
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }

        let manifests = log_store.fetch_manifests(1, 4).await.unwrap();
        let mut it = manifests.into_iter();
        for v in 1..4 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        assert!(it.next().is_none());

        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();
        for v in 0..5 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        assert!(it.next().is_none());

        // test checkpoint
        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
        log_store
            .save_checkpoint(3, "checkpoint".as_bytes())
            .await
            .unwrap();

        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(checkpoint, "checkpoint".as_bytes());
        assert_eq!(3, v);

        // Delete files with version < 4 but keep checkpoint 3: deltas 0..=3 are removed,
        // delta 4 is not < 4 and stays.
        let _ = log_store.delete_until(4, true).await.unwrap();
        let _ = log_store
            .checkpoint_storage
            .load_checkpoint(new_checkpoint_metadata_with_version(3))
            .await
            .unwrap()
            .unwrap();
        let _ = log_store.load_last_checkpoint().await.unwrap().unwrap();
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();

        let (version, bytes) = it.next().unwrap();
        assert_eq!(4, version);
        assert_eq!("hello, 4".as_bytes(), bytes);
        assert!(it.next().is_none());

        // delete all logs and checkpoints
        let _ = log_store.delete_until(11, false).await.unwrap();
        assert!(
            log_store
                .checkpoint_storage
                .load_checkpoint(new_checkpoint_metadata_with_version(3))
                .await
                .unwrap()
                .is_none()
        );
        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();

        assert!(it.next().is_none());
    }

    #[tokio::test]
    // Tests that ManifestObjectStore can read/delete previously uncompressed data correctly
    // after switching to compression.
    async fn test_compress_backward_compatible() {
        let mut log_store = new_test_manifest_store();

        // Write uncompressed data to simulate manifests written before compression was enabled.
        log_store.set_compress_type(CompressionType::Uncompressed);
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
            .await
            .unwrap();

        // change compress type
        log_store.set_compress_type(CompressionType::Gzip);

        // test load_last_checkpoint work correctly for previously uncompressed data
        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(v, 5);
        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());

        // Write compressed data now that compression is enabled.
        for v in 5..10 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(10, "checkpoint_compressed".as_bytes())
            .await
            .unwrap();

        // test data reading
        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
        let mut it = manifests.into_iter();

        for v in 0..10 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        let (v, checkpoint) = log_store
            .checkpoint_storage
            .load_checkpoint(new_checkpoint_metadata_with_version(5))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(v, 5);
        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(v, 10);
        assert_eq!(checkpoint, "checkpoint_compressed".as_bytes());

        // Delete everything with version < 10, covering uncompressed and compressed files:
        // deltas 0..=9 and checkpoint 5 (11 files). Checkpoint 10 is not < 10 and is kept.
        assert_eq!(11, log_store.delete_until(10, false).await.unwrap());
        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
        let mut it = manifests.into_iter();
        assert!(it.next().is_none());
    }

    #[tokio::test]
    async fn test_file_version() {
        let version = file_version("00000000000000000007.checkpoint");
        assert_eq!(version, 7);

        let name = delta_file(version);
        assert_eq!(name, "00000000000000000007.json");

        let name = checkpoint_file(version);
        assert_eq!(name, "00000000000000000007.checkpoint");
    }

    #[tokio::test]
    async fn test_uncompressed_manifest_files_size() {
        let mut log_store = new_test_manifest_store();
        // write 5 manifest files with uncompressed(8B per file)
        log_store.set_compress_type(CompressionType::Uncompressed);
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        // write 1 checkpoint file with uncompressed(23B)
        log_store
            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
            .await
            .unwrap();

        // manifest files size: 5 * 8B + 23B
        assert_eq!(log_store.total_manifest_size(), 63);

        // delete 3 manifest files (deltas 0, 1, 2)
        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);

        // manifest files size after delete: 63B - 3 * 8B
        assert_eq!(log_store.total_manifest_size(), 39);

        // delete all remaining manifest files (deltas 3, 4 and checkpoint 5)
        assert_eq!(
            log_store
                .delete_until(ManifestVersion::MAX, false)
                .await
                .unwrap(),
            3
        );

        assert_eq!(log_store.total_manifest_size(), 0);
    }

    #[tokio::test]
    async fn test_compressed_manifest_files_size() {
        let mut log_store = new_test_manifest_store();
        // Test with compressed manifest files
        log_store.set_compress_type(CompressionType::Gzip);
        // write 5 manifest files
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(5, "checkpoint_compressed".as_bytes())
            .await
            .unwrap();

        // manifest files size
        assert_eq!(log_store.total_manifest_size(), 181);

        // delete 3 manifest files (deltas 0, 1, 2)
        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);

        // manifest files size after delete
        assert_eq!(log_store.total_manifest_size(), 97);

        // delete all remaining manifest files (deltas 3, 4 and checkpoint 5)
        assert_eq!(
            log_store
                .delete_until(ManifestVersion::MAX, false)
                .await
                .unwrap(),
            3
        );

        assert_eq!(log_store.total_manifest_size(), 0);
    }
}