mito2/manifest/
storage.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16use std::iter::Iterator;
17use std::str::FromStr;
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::{Arc, RwLock};
20
21use common_datasource::compression::CompressionType;
22use common_telemetry::debug;
23use crc32fast::Hasher;
24use futures::future::try_join_all;
25use futures::TryStreamExt;
26use lazy_static::lazy_static;
27use object_store::{util, Entry, ErrorKind, Lister, ObjectStore};
28use regex::Regex;
29use serde::{Deserialize, Serialize};
30use snafu::{ensure, ResultExt};
31use store_api::storage::RegionId;
32use store_api::ManifestVersion;
33use tokio::sync::Semaphore;
34
35use crate::error::{
36    ChecksumMismatchSnafu, CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu,
37    OpenDalSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
38};
39
40lazy_static! {
41    static ref DELTA_RE: Regex = Regex::new("^\\d+\\.json").unwrap();
42    static ref CHECKPOINT_RE: Regex = Regex::new("^\\d+\\.checkpoint").unwrap();
43}
44
45const LAST_CHECKPOINT_FILE: &str = "_last_checkpoint";
46const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Gzip;
47/// Due to backward compatibility, it is possible that the user's manifest file has not been compressed.
48/// So when we encounter problems, we need to fall back to `FALL_BACK_COMPRESS_TYPE` for processing.
49const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed;
50const FETCH_MANIFEST_PARALLELISM: usize = 16;
51
52/// Returns the [CompressionType] according to whether to compress manifest files.
53pub const fn manifest_compress_type(compress: bool) -> CompressionType {
54    if compress {
55        DEFAULT_MANIFEST_COMPRESSION_TYPE
56    } else {
57        FALL_BACK_COMPRESS_TYPE
58    }
59}
60
61pub fn delta_file(version: ManifestVersion) -> String {
62    format!("{version:020}.json")
63}
64
65pub fn checkpoint_file(version: ManifestVersion) -> String {
66    format!("{version:020}.checkpoint")
67}
68
69pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> String {
70    if compress_type == CompressionType::Uncompressed {
71        format!("{}{}", path, file)
72    } else {
73        format!("{}{}.{}", path, file, compress_type.file_extension())
74    }
75}
76
77fn checkpoint_checksum(data: &[u8]) -> u32 {
78    let mut hasher = Hasher::new();
79    hasher.update(data);
80    hasher.finalize()
81}
82
83fn verify_checksum(data: &[u8], wanted: Option<u32>) -> Result<()> {
84    if let Some(checksum) = wanted {
85        let calculated_checksum = checkpoint_checksum(data);
86        ensure!(
87            checksum == calculated_checksum,
88            ChecksumMismatchSnafu {
89                actual: calculated_checksum,
90                expected: checksum,
91            }
92        );
93    }
94    Ok(())
95}
96
97/// Return's the file manifest version from path
98///
99/// # Panics
100/// If the file path is not a valid delta or checkpoint file.
101pub fn file_version(path: &str) -> ManifestVersion {
102    let s = path.split('.').next().unwrap();
103    s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}"))
104}
105
106/// Return's the file compress algorithm by file extension.
107///
108/// for example file
109/// `00000000000000000000.json.gz` -> `CompressionType::GZIP`
110pub fn file_compress_type(path: &str) -> CompressionType {
111    let s = path.rsplit('.').next().unwrap_or("");
112    CompressionType::from_str(s).unwrap_or(CompressionType::Uncompressed)
113}
114
115pub fn is_delta_file(file_name: &str) -> bool {
116    DELTA_RE.is_match(file_name)
117}
118
119pub fn is_checkpoint_file(file_name: &str) -> bool {
120    CHECKPOINT_RE.is_match(file_name)
121}
122
123/// Key to identify a manifest file.
124#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
125enum FileKey {
126    /// A delta file (`.json`).
127    Delta(ManifestVersion),
128    /// A checkpoint file (`.checkpoint`).
129    Checkpoint(ManifestVersion),
130}
131
132#[derive(Clone, Debug)]
133pub struct ManifestObjectStore {
134    object_store: ObjectStore,
135    compress_type: CompressionType,
136    path: String,
137    staging_path: String,
138    /// Stores the size of each manifest file.
139    manifest_size_map: Arc<RwLock<HashMap<FileKey, u64>>>,
140    total_manifest_size: Arc<AtomicU64>,
141}
142
143impl ManifestObjectStore {
144    pub fn new(
145        path: &str,
146        object_store: ObjectStore,
147        compress_type: CompressionType,
148        total_manifest_size: Arc<AtomicU64>,
149    ) -> Self {
150        let path = util::normalize_dir(path);
151        let staging_path = {
152            // Convert "region_dir/manifest/" to "region_dir/staging/manifest/"
153            let parent_dir = path.trim_end_matches("manifest/").trim_end_matches('/');
154            util::normalize_dir(&format!("{}/staging/manifest", parent_dir))
155        };
156        Self {
157            object_store,
158            compress_type,
159            path,
160            staging_path,
161            manifest_size_map: Arc::new(RwLock::new(HashMap::new())),
162            total_manifest_size,
163        }
164    }
165
166    /// Returns the delta file path under the **current** compression algorithm
167    fn delta_file_path(&self, version: ManifestVersion, is_staging: bool) -> String {
168        let base_path = if is_staging {
169            &self.staging_path
170        } else {
171            &self.path
172        };
173        gen_path(base_path, &delta_file(version), self.compress_type)
174    }
175
176    /// Returns the checkpoint file path under the **current** compression algorithm
177    fn checkpoint_file_path(&self, version: ManifestVersion) -> String {
178        gen_path(&self.path, &checkpoint_file(version), self.compress_type)
179    }
180
181    /// Returns the last checkpoint path, because the last checkpoint is not compressed,
182    /// so its path name has nothing to do with the compression algorithm used by `ManifestObjectStore`
183    pub(crate) fn last_checkpoint_path(&self) -> String {
184        format!("{}{}", self.path, LAST_CHECKPOINT_FILE)
185    }
186
187    /// Returns the manifest dir
188    pub(crate) fn manifest_dir(&self) -> &str {
189        &self.path
190    }
191
192    /// Returns a iterator of manifests.
193    pub(crate) async fn manifest_lister(&self) -> Result<Option<Lister>> {
194        match self.object_store.lister_with(&self.path).await {
195            Ok(streamer) => Ok(Some(streamer)),
196            Err(e) if e.kind() == ErrorKind::NotFound => {
197                debug!("Manifest directory does not exists: {}", self.path);
198                Ok(None)
199            }
200            Err(e) => Err(e).context(OpenDalSnafu)?,
201        }
202    }
203
204    /// Return all `R`s in the root directory that meet the `filter` conditions (that is, the `filter` closure returns `Some(R)`),
205    /// and discard `R` that does not meet the conditions (that is, the `filter` closure returns `None`)
206    /// Return an empty vector when directory is not found.
207    pub async fn get_paths<F, R>(&self, filter: F) -> Result<Vec<R>>
208    where
209        F: Fn(Entry) -> Option<R>,
210    {
211        let Some(streamer) = self.manifest_lister().await? else {
212            return Ok(vec![]);
213        };
214
215        streamer
216            .try_filter_map(|e| async { Ok(filter(e)) })
217            .try_collect::<Vec<_>>()
218            .await
219            .context(OpenDalSnafu)
220    }
221
222    /// Sorts the manifest files.
223    fn sort_manifests(entries: &mut [(ManifestVersion, Entry)]) {
224        entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2));
225    }
226
227    /// Scans the manifest files in the range of [start, end) and return all manifest entries.
228    pub async fn scan(
229        &self,
230        start: ManifestVersion,
231        end: ManifestVersion,
232    ) -> Result<Vec<(ManifestVersion, Entry)>> {
233        ensure!(start <= end, InvalidScanIndexSnafu { start, end });
234
235        let mut entries: Vec<(ManifestVersion, Entry)> = self
236            .get_paths(|entry| {
237                let file_name = entry.name();
238                if is_delta_file(file_name) {
239                    let version = file_version(file_name);
240                    if start <= version && version < end {
241                        return Some((version, entry));
242                    }
243                }
244                None
245            })
246            .await?;
247
248        Self::sort_manifests(&mut entries);
249
250        Ok(entries)
251    }
252
253    /// Fetches manifests in range [start_version, end_version).
254    ///
255    /// This functions is guaranteed to return manifests from the `start_version` strictly (must contain `start_version`).
256    pub async fn fetch_manifests_strict_from(
257        &self,
258        start_version: ManifestVersion,
259        end_version: ManifestVersion,
260        region_id: RegionId,
261    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
262        let mut manifests = self.fetch_manifests(start_version, end_version).await?;
263        let start_index = manifests.iter().position(|(v, _)| *v == start_version);
264        debug!(
265            "Fetches manifests in range [{},{}), start_index: {:?}, region_id: {}, manifests: {:?}",
266            start_version,
267            end_version,
268            start_index,
269            region_id,
270            manifests.iter().map(|(v, _)| *v).collect::<Vec<_>>()
271        );
272        if let Some(start_index) = start_index {
273            Ok(manifests.split_off(start_index))
274        } else {
275            Ok(vec![])
276        }
277    }
278
279    /// Fetch all manifests in concurrent, and return the manifests in range [start_version, end_version)
280    ///
281    /// **Notes**: This function is no guarantee to return manifests from the `start_version` strictly.
282    /// Uses [fetch_manifests_strict_from](ManifestObjectStore::fetch_manifests_strict_from) to get manifests from the `start_version`.
283    pub async fn fetch_manifests(
284        &self,
285        start_version: ManifestVersion,
286        end_version: ManifestVersion,
287    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
288        let manifests = self.scan(start_version, end_version).await?;
289
290        // TODO(weny): Make it configurable.
291        let semaphore = Semaphore::new(FETCH_MANIFEST_PARALLELISM);
292
293        let tasks = manifests.iter().map(|(v, entry)| async {
294            // Safety: semaphore must exist.
295            let _permit = semaphore.acquire().await.unwrap();
296
297            let compress_type = file_compress_type(entry.name());
298            let bytes = self
299                .object_store
300                .read(entry.path())
301                .await
302                .context(OpenDalSnafu)?;
303            let data = compress_type
304                .decode(bytes)
305                .await
306                .context(DecompressObjectSnafu {
307                    compress_type,
308                    path: entry.path(),
309                })?;
310            Ok((*v, data))
311        });
312
313        try_join_all(tasks).await
314    }
315
316    /// Delete manifest files that version < end.
317    /// If keep_last_checkpoint is true, the last checkpoint file will be kept.
318    /// ### Return
319    /// The number of deleted files.
320    pub async fn delete_until(
321        &self,
322        end: ManifestVersion,
323        keep_last_checkpoint: bool,
324    ) -> Result<usize> {
325        // Stores (entry, is_checkpoint, version) in a Vec.
326        let entries: Vec<_> = self
327            .get_paths(|entry| {
328                let file_name = entry.name();
329                let is_checkpoint = is_checkpoint_file(file_name);
330                if is_delta_file(file_name) || is_checkpoint_file(file_name) {
331                    let version = file_version(file_name);
332                    if version < end {
333                        return Some((entry, is_checkpoint, version));
334                    }
335                }
336                None
337            })
338            .await?;
339        let checkpoint_version = if keep_last_checkpoint {
340            // Note that the order of entries is unspecific.
341            entries
342                .iter()
343                .filter_map(
344                    |(_e, is_checkpoint, version)| {
345                        if *is_checkpoint {
346                            Some(version)
347                        } else {
348                            None
349                        }
350                    },
351                )
352                .max()
353        } else {
354            None
355        };
356        let del_entries: Vec<_> = entries
357            .iter()
358            .filter(|(_e, is_checkpoint, version)| {
359                if let Some(max_version) = checkpoint_version {
360                    if *is_checkpoint {
361                        // We need to keep the checkpoint file.
362                        version < max_version
363                    } else {
364                        // We can delete the log file with max_version as the checkpoint
365                        // file contains the log file's content.
366                        version <= max_version
367                    }
368                } else {
369                    true
370                }
371            })
372            .collect();
373        let paths = del_entries
374            .iter()
375            .map(|(e, _, _)| e.path().to_string())
376            .collect::<Vec<_>>();
377        let ret = paths.len();
378
379        debug!(
380            "Deleting {} logs from manifest storage path {} until {}, checkpoint_version: {:?}, paths: {:?}",
381            ret,
382            self.path,
383            end,
384            checkpoint_version,
385            paths,
386        );
387
388        self.object_store
389            .delete_iter(paths)
390            .await
391            .context(OpenDalSnafu)?;
392
393        // delete manifest sizes
394        for (_, is_checkpoint, version) in &del_entries {
395            if *is_checkpoint {
396                self.unset_file_size(&FileKey::Checkpoint(*version));
397            } else {
398                self.unset_file_size(&FileKey::Delta(*version));
399            }
400        }
401
402        Ok(ret)
403    }
404
405    /// Save the delta manifest file.
406    pub async fn save(
407        &mut self,
408        version: ManifestVersion,
409        bytes: &[u8],
410        is_staging: bool,
411    ) -> Result<()> {
412        let path = self.delta_file_path(version, is_staging);
413        debug!("Save log to manifest storage, version: {}", version);
414        let data = self
415            .compress_type
416            .encode(bytes)
417            .await
418            .context(CompressObjectSnafu {
419                compress_type: self.compress_type,
420                path: &path,
421            })?;
422        let delta_size = data.len();
423        self.object_store
424            .write(&path, data)
425            .await
426            .context(OpenDalSnafu)?;
427        self.set_delta_file_size(version, delta_size as u64);
428        Ok(())
429    }
430
431    /// Save the checkpoint manifest file.
432    pub(crate) async fn save_checkpoint(
433        &self,
434        version: ManifestVersion,
435        bytes: &[u8],
436    ) -> Result<()> {
437        let path = self.checkpoint_file_path(version);
438        let data = self
439            .compress_type
440            .encode(bytes)
441            .await
442            .context(CompressObjectSnafu {
443                compress_type: self.compress_type,
444                path: &path,
445            })?;
446        let checkpoint_size = data.len();
447        let checksum = checkpoint_checksum(bytes);
448        self.object_store
449            .write(&path, data)
450            .await
451            .context(OpenDalSnafu)?;
452        self.set_checkpoint_file_size(version, checkpoint_size as u64);
453
454        // Because last checkpoint file only contain size and version, which is tiny, so we don't compress it.
455        let last_checkpoint_path = self.last_checkpoint_path();
456
457        let checkpoint_metadata = CheckpointMetadata {
458            size: bytes.len(),
459            version,
460            checksum: Some(checksum),
461            extend_metadata: HashMap::new(),
462        };
463
464        debug!(
465            "Save checkpoint in path: {},  metadata: {:?}",
466            last_checkpoint_path, checkpoint_metadata
467        );
468
469        let bytes = checkpoint_metadata.encode()?;
470        self.object_store
471            .write(&last_checkpoint_path, bytes)
472            .await
473            .context(OpenDalSnafu)?;
474
475        Ok(())
476    }
477
478    async fn load_checkpoint(
479        &mut self,
480        metadata: CheckpointMetadata,
481    ) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
482        let version = metadata.version;
483        let path = self.checkpoint_file_path(version);
484        // Due to backward compatibility, it is possible that the user's checkpoint not compressed,
485        // so if we don't find file by compressed type. fall back to checkpoint not compressed find again.
486        let checkpoint_data =
487            match self.object_store.read(&path).await {
488                Ok(checkpoint) => {
489                    let checkpoint_size = checkpoint.len();
490                    let decompress_data = self.compress_type.decode(checkpoint).await.context(
491                        DecompressObjectSnafu {
492                            compress_type: self.compress_type,
493                            path,
494                        },
495                    )?;
496                    verify_checksum(&decompress_data, metadata.checksum)?;
497                    // set the checkpoint size
498                    self.set_checkpoint_file_size(version, checkpoint_size as u64);
499                    Ok(Some(decompress_data))
500                }
501                Err(e) => {
502                    if e.kind() == ErrorKind::NotFound {
503                        if self.compress_type != FALL_BACK_COMPRESS_TYPE {
504                            let fall_back_path = gen_path(
505                                &self.path,
506                                &checkpoint_file(version),
507                                FALL_BACK_COMPRESS_TYPE,
508                            );
509                            debug!(
510                                "Failed to load checkpoint from path: {}, fall back to path: {}",
511                                path, fall_back_path
512                            );
513                            match self.object_store.read(&fall_back_path).await {
514                                Ok(checkpoint) => {
515                                    let checkpoint_size = checkpoint.len();
516                                    let decompress_data = FALL_BACK_COMPRESS_TYPE
517                                        .decode(checkpoint)
518                                        .await
519                                        .context(DecompressObjectSnafu {
520                                            compress_type: FALL_BACK_COMPRESS_TYPE,
521                                            path,
522                                        })?;
523                                    verify_checksum(&decompress_data, metadata.checksum)?;
524                                    self.set_checkpoint_file_size(version, checkpoint_size as u64);
525                                    Ok(Some(decompress_data))
526                                }
527                                Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
528                                Err(e) => Err(e).context(OpenDalSnafu),
529                            }
530                        } else {
531                            Ok(None)
532                        }
533                    } else {
534                        Err(e).context(OpenDalSnafu)
535                    }
536                }
537            }?;
538        Ok(checkpoint_data.map(|data| (version, data)))
539    }
540
541    /// Load the latest checkpoint.
542    /// Return manifest version and the raw [RegionCheckpoint](crate::manifest::action::RegionCheckpoint) content if any
543    pub async fn load_last_checkpoint(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
544        let last_checkpoint_path = self.last_checkpoint_path();
545        let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await {
546            Ok(data) => data,
547            Err(e) if e.kind() == ErrorKind::NotFound => {
548                return Ok(None);
549            }
550            Err(e) => {
551                return Err(e).context(OpenDalSnafu)?;
552            }
553        };
554
555        let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data.to_vec())?;
556
557        debug!(
558            "Load checkpoint in path: {}, metadata: {:?}",
559            last_checkpoint_path, checkpoint_metadata
560        );
561
562        self.load_checkpoint(checkpoint_metadata).await
563    }
564
565    #[cfg(test)]
566    pub async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
567        self.object_store
568            .read(path)
569            .await
570            .context(OpenDalSnafu)
571            .map(|v| v.to_vec())
572    }
573
574    #[cfg(test)]
575    pub async fn write_last_checkpoint(
576        &mut self,
577        version: ManifestVersion,
578        bytes: &[u8],
579    ) -> Result<()> {
580        let path = self.checkpoint_file_path(version);
581        let data = self
582            .compress_type
583            .encode(bytes)
584            .await
585            .context(CompressObjectSnafu {
586                compress_type: self.compress_type,
587                path: &path,
588            })?;
589
590        let checkpoint_size = data.len();
591
592        self.object_store
593            .write(&path, data)
594            .await
595            .context(OpenDalSnafu)?;
596
597        self.set_checkpoint_file_size(version, checkpoint_size as u64);
598
599        let last_checkpoint_path = self.last_checkpoint_path();
600        let checkpoint_metadata = CheckpointMetadata {
601            size: bytes.len(),
602            version,
603            checksum: Some(1218259706),
604            extend_metadata: HashMap::new(),
605        };
606
607        debug!(
608            "Rewrite checkpoint in path: {},  metadata: {:?}",
609            last_checkpoint_path, checkpoint_metadata
610        );
611
612        let bytes = checkpoint_metadata.encode()?;
613
614        // Overwrite the last checkpoint with the modified content
615        self.object_store
616            .write(&last_checkpoint_path, bytes.clone())
617            .await
618            .context(OpenDalSnafu)?;
619        Ok(())
620    }
621
622    /// Compute the size(Byte) in manifest size map.
623    pub(crate) fn total_manifest_size(&self) -> u64 {
624        self.manifest_size_map.read().unwrap().values().sum()
625    }
626
627    /// Resets the size of all files.
628    pub(crate) fn reset_manifest_size(&mut self) {
629        self.manifest_size_map.write().unwrap().clear();
630        self.total_manifest_size.store(0, Ordering::Relaxed);
631    }
632
633    /// Set the size of the delta file by delta version.
634    pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
635        let mut m = self.manifest_size_map.write().unwrap();
636        m.insert(FileKey::Delta(version), size);
637
638        self.inc_total_manifest_size(size);
639    }
640
641    /// Set the size of the checkpoint file by checkpoint version.
642    pub(crate) fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
643        let mut m = self.manifest_size_map.write().unwrap();
644        m.insert(FileKey::Checkpoint(version), size);
645
646        self.inc_total_manifest_size(size);
647    }
648
649    fn unset_file_size(&self, key: &FileKey) {
650        let mut m = self.manifest_size_map.write().unwrap();
651        if let Some(val) = m.remove(key) {
652            debug!("Unset file size: {:?}, size: {}", key, val);
653            self.dec_total_manifest_size(val);
654        }
655    }
656
657    fn inc_total_manifest_size(&self, val: u64) {
658        self.total_manifest_size.fetch_add(val, Ordering::Relaxed);
659    }
660
661    fn dec_total_manifest_size(&self, val: u64) {
662        self.total_manifest_size.fetch_sub(val, Ordering::Relaxed);
663    }
664}
665
666#[derive(Serialize, Deserialize, Debug)]
667pub(crate) struct CheckpointMetadata {
668    pub size: usize,
669    /// The latest version this checkpoint contains.
670    pub version: ManifestVersion,
671    pub checksum: Option<u32>,
672    pub extend_metadata: HashMap<String, String>,
673}
674
675impl CheckpointMetadata {
676    fn encode(&self) -> Result<Vec<u8>> {
677        Ok(serde_json::to_string(self)
678            .context(SerdeJsonSnafu)?
679            .into_bytes())
680    }
681
682    fn decode(bs: &[u8]) -> Result<Self> {
683        let data = std::str::from_utf8(bs).context(Utf8Snafu)?;
684
685        serde_json::from_str(data).context(SerdeJsonSnafu)
686    }
687}
688
689#[cfg(test)]
690mod tests {
691    use common_test_util::temp_dir::create_temp_dir;
692    use object_store::services::Fs;
693    use object_store::ObjectStore;
694
695    use super::*;
696
697    fn new_test_manifest_store() -> ManifestObjectStore {
698        common_telemetry::init_default_ut_logging();
699        let tmp_dir = create_temp_dir("test_manifest_log_store");
700        let builder = Fs::default().root(&tmp_dir.path().to_string_lossy());
701        let object_store = ObjectStore::new(builder).unwrap().finish();
702        ManifestObjectStore::new(
703            "/",
704            object_store,
705            CompressionType::Uncompressed,
706            Default::default(),
707        )
708    }
709
710    fn new_checkpoint_metadata_with_version(version: ManifestVersion) -> CheckpointMetadata {
711        CheckpointMetadata {
712            size: 0,
713            version,
714            checksum: None,
715            extend_metadata: Default::default(),
716        }
717    }
718
719    #[test]
720    // Define this test mainly to prevent future unintentional changes may break the backward compatibility.
721    fn test_compress_file_path_generation() {
722        let path = "/foo/bar/";
723        let version: ManifestVersion = 0;
724        let file_path = gen_path(path, &delta_file(version), CompressionType::Gzip);
725        assert_eq!(file_path.as_str(), "/foo/bar/00000000000000000000.json.gz")
726    }
727
728    #[tokio::test]
729    async fn test_manifest_log_store_uncompress() {
730        let mut log_store = new_test_manifest_store();
731        log_store.compress_type = CompressionType::Uncompressed;
732        test_manifest_log_store_case(log_store).await;
733    }
734
735    #[tokio::test]
736    async fn test_manifest_log_store_compress() {
737        let mut log_store = new_test_manifest_store();
738        log_store.compress_type = CompressionType::Gzip;
739        test_manifest_log_store_case(log_store).await;
740    }
741
742    async fn test_manifest_log_store_case(mut log_store: ManifestObjectStore) {
743        for v in 0..5 {
744            log_store
745                .save(v, format!("hello, {v}").as_bytes(), false)
746                .await
747                .unwrap();
748        }
749
750        let manifests = log_store.fetch_manifests(1, 4).await.unwrap();
751        let mut it = manifests.into_iter();
752        for v in 1..4 {
753            let (version, bytes) = it.next().unwrap();
754            assert_eq!(v, version);
755            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
756        }
757        assert!(it.next().is_none());
758
759        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
760        let mut it = manifests.into_iter();
761        for v in 0..5 {
762            let (version, bytes) = it.next().unwrap();
763            assert_eq!(v, version);
764            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
765        }
766        assert!(it.next().is_none());
767
768        // test checkpoint
769        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
770        log_store
771            .save_checkpoint(3, "checkpoint".as_bytes())
772            .await
773            .unwrap();
774
775        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
776        assert_eq!(checkpoint, "checkpoint".as_bytes());
777        assert_eq!(3, v);
778
779        //delete (,4) logs and keep checkpoint 3.
780        let _ = log_store.delete_until(4, true).await.unwrap();
781        let _ = log_store
782            .load_checkpoint(new_checkpoint_metadata_with_version(3))
783            .await
784            .unwrap()
785            .unwrap();
786        let _ = log_store.load_last_checkpoint().await.unwrap().unwrap();
787        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
788        let mut it = manifests.into_iter();
789
790        let (version, bytes) = it.next().unwrap();
791        assert_eq!(4, version);
792        assert_eq!("hello, 4".as_bytes(), bytes);
793        assert!(it.next().is_none());
794
795        // delete all logs and checkpoints
796        let _ = log_store.delete_until(11, false).await.unwrap();
797        assert!(log_store
798            .load_checkpoint(new_checkpoint_metadata_with_version(3))
799            .await
800            .unwrap()
801            .is_none());
802        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
803        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
804        let mut it = manifests.into_iter();
805
806        assert!(it.next().is_none());
807    }
808
809    #[tokio::test]
810    // test ManifestObjectStore can read/delete previously uncompressed data correctly
811    async fn test_compress_backward_compatible() {
812        let mut log_store = new_test_manifest_store();
813
814        // write uncompress data to stimulate previously uncompressed data
815        log_store.compress_type = CompressionType::Uncompressed;
816        for v in 0..5 {
817            log_store
818                .save(v, format!("hello, {v}").as_bytes(), false)
819                .await
820                .unwrap();
821        }
822        log_store
823            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
824            .await
825            .unwrap();
826
827        // change compress type
828        log_store.compress_type = CompressionType::Gzip;
829
830        // test load_last_checkpoint work correctly for previously uncompressed data
831        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
832        assert_eq!(v, 5);
833        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
834
835        // write compressed data to stimulate compress algorithm take effect
836        for v in 5..10 {
837            log_store
838                .save(v, format!("hello, {v}").as_bytes(), false)
839                .await
840                .unwrap();
841        }
842        log_store
843            .save_checkpoint(10, "checkpoint_compressed".as_bytes())
844            .await
845            .unwrap();
846
847        // test data reading
848        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
849        let mut it = manifests.into_iter();
850
851        for v in 0..10 {
852            let (version, bytes) = it.next().unwrap();
853            assert_eq!(v, version);
854            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
855        }
856        let (v, checkpoint) = log_store
857            .load_checkpoint(new_checkpoint_metadata_with_version(5))
858            .await
859            .unwrap()
860            .unwrap();
861        assert_eq!(v, 5);
862        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
863        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
864        assert_eq!(v, 10);
865        assert_eq!(checkpoint, "checkpoint_compressed".as_bytes());
866
867        // Delete util 10, contain uncompressed/compressed data
868        // log 0, 1, 2, 7, 8, 9 will be delete
869        assert_eq!(11, log_store.delete_until(10, false).await.unwrap());
870        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
871        let mut it = manifests.into_iter();
872        assert!(it.next().is_none());
873    }
874
875    #[tokio::test]
876    async fn test_file_version() {
877        let version = file_version("00000000000000000007.checkpoint");
878        assert_eq!(version, 7);
879
880        let name = delta_file(version);
881        assert_eq!(name, "00000000000000000007.json");
882
883        let name = checkpoint_file(version);
884        assert_eq!(name, "00000000000000000007.checkpoint");
885    }
886
887    #[tokio::test]
888    async fn test_uncompressed_manifest_files_size() {
889        let mut log_store = new_test_manifest_store();
890        // write 5 manifest files with uncompressed(8B per file)
891        log_store.compress_type = CompressionType::Uncompressed;
892        for v in 0..5 {
893            log_store
894                .save(v, format!("hello, {v}").as_bytes(), false)
895                .await
896                .unwrap();
897        }
898        // write 1 checkpoint file with uncompressed(23B)
899        log_store
900            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
901            .await
902            .unwrap();
903
904        // manifest files size
905        assert_eq!(log_store.total_manifest_size(), 63);
906
907        // delete 3 manifest files
908        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);
909
910        // manifest files size after delete
911        assert_eq!(log_store.total_manifest_size(), 39);
912
913        // delete all manifest files
914        assert_eq!(
915            log_store
916                .delete_until(ManifestVersion::MAX, false)
917                .await
918                .unwrap(),
919            3
920        );
921
922        assert_eq!(log_store.total_manifest_size(), 0);
923    }
924
925    #[tokio::test]
926    async fn test_compressed_manifest_files_size() {
927        let mut log_store = new_test_manifest_store();
928        // Test with compressed manifest files
929        log_store.compress_type = CompressionType::Gzip;
930        // write 5 manifest files
931        for v in 0..5 {
932            log_store
933                .save(v, format!("hello, {v}").as_bytes(), false)
934                .await
935                .unwrap();
936        }
937        log_store
938            .save_checkpoint(5, "checkpoint_compressed".as_bytes())
939            .await
940            .unwrap();
941
942        // manifest files size
943        assert_eq!(log_store.total_manifest_size(), 181);
944
945        // delete 3 manifest files
946        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);
947
948        // manifest files size after delete
949        assert_eq!(log_store.total_manifest_size(), 97);
950
951        // delete all manifest files
952        assert_eq!(
953            log_store
954                .delete_until(ManifestVersion::MAX, false)
955                .await
956                .unwrap(),
957            3
958        );
959
960        assert_eq!(log_store.total_manifest_size(), 0);
961    }
962}