mito2/manifest/storage.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::iter::Iterator;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

use common_datasource::compression::CompressionType;
use common_telemetry::debug;
use crc32fast::Hasher;
use futures::future::try_join_all;
use futures::TryStreamExt;
use lazy_static::lazy_static;
use object_store::{util, Entry, ErrorKind, Lister, ObjectStore};
use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use store_api::manifest::ManifestVersion;
use store_api::storage::RegionId;
use tokio::sync::Semaphore;

use crate::error::{
    ChecksumMismatchSnafu, CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu,
    OpenDalSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
};

lazy_static! {
    static ref DELTA_RE: Regex = Regex::new("^\\d+\\.json").unwrap();
    static ref CHECKPOINT_RE: Regex = Regex::new("^\\d+\\.checkpoint").unwrap();
}

const LAST_CHECKPOINT_FILE: &str = "_last_checkpoint";
const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Gzip;
/// For backward compatibility, a user's manifest files may not be compressed.
/// When reading such a file fails, we fall back to `FALL_BACK_COMPRESS_TYPE` and retry.
const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed;
const FETCH_MANIFEST_PARALLELISM: usize = 16;

/// Returns the [CompressionType] according to whether to compress manifest files.
pub const fn manifest_compress_type(compress: bool) -> CompressionType {
    if compress {
        DEFAULT_MANIFEST_COMPRESSION_TYPE
    } else {
        FALL_BACK_COMPRESS_TYPE
    }
}

pub fn delta_file(version: ManifestVersion) -> String {
    format!("{version:020}.json")
}

pub fn checkpoint_file(version: ManifestVersion) -> String {
    format!("{version:020}.checkpoint")
}

pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> String {
    if compress_type == CompressionType::Uncompressed {
        format!("{}{}", path, file)
    } else {
        format!("{}{}.{}", path, file, compress_type.file_extension())
    }
}
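
// Illustration (not called anywhere): how manifest file names and paths are built.
// Assuming manifest version 7 under the directory "/foo/bar/", the helpers above
// produce the following strings (the compressed case follows the same format asserted
// in `test_compress_file_path_generation` below):
//
//   delta_file(7)                                        -> "00000000000000000007.json"
//   checkpoint_file(7)                                   -> "00000000000000000007.checkpoint"
//   gen_path("/foo/bar/", &delta_file(7), CompressionType::Gzip)
//                                                        -> "/foo/bar/00000000000000000007.json.gz"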

fn checkpoint_checksum(data: &[u8]) -> u32 {
    let mut hasher = Hasher::new();
    hasher.update(data);
    hasher.finalize()
}

fn verify_checksum(data: &[u8], wanted: Option<u32>) -> Result<()> {
    if let Some(checksum) = wanted {
        let calculated_checksum = checkpoint_checksum(data);
        ensure!(
            checksum == calculated_checksum,
            ChecksumMismatchSnafu {
                actual: calculated_checksum,
                expected: checksum,
            }
        );
    }
    Ok(())
}

/// Returns the manifest version parsed from the file path.
///
/// # Panics
/// Panics if the file path is not a valid delta or checkpoint file.
pub fn file_version(path: &str) -> ManifestVersion {
    let s = path.split('.').next().unwrap();
    s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}"))
}

/// Returns the file's compression algorithm, determined by its file extension.
///
/// For example, `00000000000000000000.json.gz` -> `CompressionType::Gzip`.
pub fn file_compress_type(path: &str) -> CompressionType {
    let s = path.rsplit('.').next().unwrap_or("");
    CompressionType::from_str(s).unwrap_or(CompressionType::Uncompressed)
}

pub fn is_delta_file(file_name: &str) -> bool {
    DELTA_RE.is_match(file_name)
}

pub fn is_checkpoint_file(file_name: &str) -> bool {
    CHECKPOINT_RE.is_match(file_name)
}
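
// Illustration (not called anywhere): parsing a manifest file name. The regexes above
// are anchored at the start only, so compressed names match as well. For a name such
// as "00000000000000000007.json.gz":
//
//   is_delta_file(name)      -> true
//   file_version(name)       -> 7
//   file_compress_type(name) -> CompressionType::Gzip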

/// Key to identify a manifest file.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
enum FileKey {
    /// A delta file (`.json`).
    Delta(ManifestVersion),
    /// A checkpoint file (`.checkpoint`).
    Checkpoint(ManifestVersion),
}

#[derive(Clone, Debug)]
pub struct ManifestObjectStore {
    object_store: ObjectStore,
    compress_type: CompressionType,
    path: String,
    /// Stores the size of each manifest file.
    manifest_size_map: Arc<RwLock<HashMap<FileKey, u64>>>,
    total_manifest_size: Arc<AtomicU64>,
}

impl ManifestObjectStore {
    pub fn new(
        path: &str,
        object_store: ObjectStore,
        compress_type: CompressionType,
        total_manifest_size: Arc<AtomicU64>,
    ) -> Self {
        Self {
            object_store,
            compress_type,
            path: util::normalize_dir(path),
            manifest_size_map: Arc::new(RwLock::new(HashMap::new())),
            total_manifest_size,
        }
    }

    /// Returns the delta file path under the **current** compression algorithm.
    fn delta_file_path(&self, version: ManifestVersion) -> String {
        gen_path(&self.path, &delta_file(version), self.compress_type)
    }

    /// Returns the checkpoint file path under the **current** compression algorithm.
    fn checkpoint_file_path(&self, version: ManifestVersion) -> String {
        gen_path(&self.path, &checkpoint_file(version), self.compress_type)
    }

    /// Returns the last checkpoint path. The last checkpoint file is never compressed,
    /// so its name does not depend on the compression algorithm used by `ManifestObjectStore`.
    pub(crate) fn last_checkpoint_path(&self) -> String {
        format!("{}{}", self.path, LAST_CHECKPOINT_FILE)
    }

    /// Returns the manifest directory.
    pub(crate) fn manifest_dir(&self) -> &str {
        &self.path
    }

    /// Returns an iterator over the manifest files, or `None` if the manifest
    /// directory does not exist.
    pub(crate) async fn manifest_lister(&self) -> Result<Option<Lister>> {
        match self.object_store.lister_with(&self.path).await {
            Ok(streamer) => Ok(Some(streamer)),
            Err(e) if e.kind() == ErrorKind::NotFound => {
                debug!("Manifest directory does not exist: {}", self.path);
                Ok(None)
            }
            Err(e) => Err(e).context(OpenDalSnafu)?,
        }
    }

    /// Returns every `R` in the manifest directory for which the `filter` closure
    /// returns `Some(R)`; entries for which `filter` returns `None` are discarded.
    /// Returns an empty vector if the directory is not found.
    pub async fn get_paths<F, R>(&self, filter: F) -> Result<Vec<R>>
    where
        F: Fn(Entry) -> Option<R>,
    {
        let Some(streamer) = self.manifest_lister().await? else {
            return Ok(vec![]);
        };

        streamer
            .try_filter_map(|e| async { Ok(filter(e)) })
            .try_collect::<Vec<_>>()
            .await
            .context(OpenDalSnafu)
    }

    /// Sorts the manifest files.
    fn sort_manifests(entries: &mut [(ManifestVersion, Entry)]) {
        entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2));
    }

    /// Scans the manifest files in the range `[start, end)` and returns all matching entries.
    pub async fn scan(
        &self,
        start: ManifestVersion,
        end: ManifestVersion,
    ) -> Result<Vec<(ManifestVersion, Entry)>> {
        ensure!(start <= end, InvalidScanIndexSnafu { start, end });

        let mut entries: Vec<(ManifestVersion, Entry)> = self
            .get_paths(|entry| {
                let file_name = entry.name();
                if is_delta_file(file_name) {
                    let version = file_version(file_name);
                    if start <= version && version < end {
                        return Some((version, entry));
                    }
                }
                None
            })
            .await?;

        Self::sort_manifests(&mut entries);

        Ok(entries)
    }

    /// Fetches manifests in the range `[start_version, end_version)`.
    ///
    /// Unlike [fetch_manifests](ManifestObjectStore::fetch_manifests), this function guarantees
    /// that the result starts exactly at `start_version`; if `start_version` is not found,
    /// an empty vector is returned.
    pub async fn fetch_manifests_strict_from(
        &self,
        start_version: ManifestVersion,
        end_version: ManifestVersion,
        region_id: RegionId,
    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
        let mut manifests = self.fetch_manifests(start_version, end_version).await?;
        let start_index = manifests.iter().position(|(v, _)| *v == start_version);
        debug!(
            "Fetches manifests in range [{},{}), start_index: {:?}, region_id: {}, manifests: {:?}",
            start_version,
            end_version,
            start_index,
            region_id,
            manifests.iter().map(|(v, _)| *v).collect::<Vec<_>>()
        );
        if let Some(start_index) = start_index {
            Ok(manifests.split_off(start_index))
        } else {
            Ok(vec![])
        }
    }
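
    // Illustration (not called anywhere): if the store holds manifest versions 3, 4 and 5,
    // fetch_manifests_strict_from(3, 6, ..) returns versions [3, 4, 5], while
    // fetch_manifests_strict_from(2, 6, ..) returns an empty vector because version 2
    // does not exist, even though versions inside the range do.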

    /// Fetches all manifests in the range `[start_version, end_version)` concurrently.
    ///
    /// **Note**: this function does not guarantee that the result starts at `start_version`.
    /// Use [fetch_manifests_strict_from](ManifestObjectStore::fetch_manifests_strict_from)
    /// when the result must start at `start_version`.
    pub async fn fetch_manifests(
        &self,
        start_version: ManifestVersion,
        end_version: ManifestVersion,
    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
        let manifests = self.scan(start_version, end_version).await?;

        // TODO(weny): Make it configurable.
        let semaphore = Semaphore::new(FETCH_MANIFEST_PARALLELISM);

        let tasks = manifests.iter().map(|(v, entry)| async {
            // Safety: the semaphore is never closed, so `acquire` cannot fail.
            let _permit = semaphore.acquire().await.unwrap();

            let compress_type = file_compress_type(entry.name());
            let bytes = self
                .object_store
                .read(entry.path())
                .await
                .context(OpenDalSnafu)?;
            let data = compress_type
                .decode(bytes)
                .await
                .context(DecompressObjectSnafu {
                    compress_type,
                    path: entry.path(),
                })?;
            Ok((*v, data))
        });

        try_join_all(tasks).await
    }

    /// Deletes manifest files whose version is less than `end`.
    /// If `keep_last_checkpoint` is true, the latest checkpoint file and any delta files
    /// newer than it are kept.
    /// ### Return
    /// The number of deleted files.
    pub async fn delete_until(
        &self,
        end: ManifestVersion,
        keep_last_checkpoint: bool,
    ) -> Result<usize> {
        // Stores (entry, is_checkpoint, version) in a Vec.
        let entries: Vec<_> = self
            .get_paths(|entry| {
                let file_name = entry.name();
                let is_checkpoint = is_checkpoint_file(file_name);
                if is_delta_file(file_name) || is_checkpoint_file(file_name) {
                    let version = file_version(file_name);
                    if version < end {
                        return Some((entry, is_checkpoint, version));
                    }
                }
                None
            })
            .await?;
        let checkpoint_version = if keep_last_checkpoint {
            // Note that the order of entries is unspecified.
            entries
                .iter()
                .filter_map(
                    |(_e, is_checkpoint, version)| {
                        if *is_checkpoint {
                            Some(version)
                        } else {
                            None
                        }
                    },
                )
                .max()
        } else {
            None
        };
        let del_entries: Vec<_> = entries
            .iter()
            .filter(|(_e, is_checkpoint, version)| {
                if let Some(max_version) = checkpoint_version {
                    if *is_checkpoint {
                        // We need to keep the checkpoint file.
                        version < max_version
                    } else {
                        // We can delete the log file with max_version as the checkpoint
                        // file contains the log file's content.
                        version <= max_version
                    }
                } else {
                    true
                }
            })
            .collect();
        let paths = del_entries
            .iter()
            .map(|(e, _, _)| e.path().to_string())
            .collect::<Vec<_>>();
        let ret = paths.len();

        debug!(
            "Deleting {} logs from manifest storage path {} until {}, checkpoint_version: {:?}, paths: {:?}",
            ret,
            self.path,
            end,
            checkpoint_version,
            paths,
        );

        self.object_store
            .delete_iter(paths)
            .await
            .context(OpenDalSnafu)?;

        // delete manifest sizes
        for (_, is_checkpoint, version) in &del_entries {
            if *is_checkpoint {
                self.unset_file_size(&FileKey::Checkpoint(*version));
            } else {
                self.unset_file_size(&FileKey::Delta(*version));
            }
        }

        Ok(ret)
    }
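
    // Illustration (not called anywhere): with delta files 0..=4 and a checkpoint at
    // version 3, delete_until(4, true) removes deltas 0..=3 (their content is covered by
    // the checkpoint) but keeps checkpoint 3 and delta 4, as exercised in
    // `test_manifest_log_store_case` below.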

    /// Saves the delta manifest file.
    pub async fn save(&mut self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
        let path = self.delta_file_path(version);
        debug!("Save log to manifest storage, version: {}", version);
        let data = self
            .compress_type
            .encode(bytes)
            .await
            .context(CompressObjectSnafu {
                compress_type: self.compress_type,
                path: &path,
            })?;
        let delta_size = data.len();
        self.object_store
            .write(&path, data)
            .await
            .context(OpenDalSnafu)?;
        self.set_delta_file_size(version, delta_size as u64);
        Ok(())
    }

    /// Saves the checkpoint manifest file.
    pub(crate) async fn save_checkpoint(
        &self,
        version: ManifestVersion,
        bytes: &[u8],
    ) -> Result<()> {
        let path = self.checkpoint_file_path(version);
        let data = self
            .compress_type
            .encode(bytes)
            .await
            .context(CompressObjectSnafu {
                compress_type: self.compress_type,
                path: &path,
            })?;
        let checkpoint_size = data.len();
        let checksum = checkpoint_checksum(bytes);
        self.object_store
            .write(&path, data)
            .await
            .context(OpenDalSnafu)?;
        self.set_checkpoint_file_size(version, checkpoint_size as u64);

        // The last checkpoint file only contains the size and version, which is tiny,
        // so we don't compress it.
        let last_checkpoint_path = self.last_checkpoint_path();

        let checkpoint_metadata = CheckpointMetadata {
            size: bytes.len(),
            version,
            checksum: Some(checksum),
            extend_metadata: HashMap::new(),
        };

        debug!(
            "Save checkpoint in path: {}, metadata: {:?}",
            last_checkpoint_path, checkpoint_metadata
        );

        let bytes = checkpoint_metadata.encode()?;
        self.object_store
            .write(&last_checkpoint_path, bytes)
            .await
            .context(OpenDalSnafu)?;

        Ok(())
    }
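
    // Illustration (not part of the format definition): the `_last_checkpoint` payload is
    // plain, uncompressed JSON serialized from `CheckpointMetadata`, roughly of the form
    //
    //   {"size":10,"version":3,"checksum":<crc32 of the uncompressed checkpoint bytes>,"extend_metadata":{}}
    //
    // where `size` is the length of the uncompressed checkpoint content.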

    async fn load_checkpoint(
        &mut self,
        metadata: CheckpointMetadata,
    ) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
        let version = metadata.version;
        let path = self.checkpoint_file_path(version);
        // For backward compatibility, the checkpoint may not be compressed. If the file is
        // not found under the current compression type, fall back to the uncompressed
        // checkpoint path and try again.
        let checkpoint_data =
            match self.object_store.read(&path).await {
                Ok(checkpoint) => {
                    let checkpoint_size = checkpoint.len();
                    let decompress_data = self.compress_type.decode(checkpoint).await.context(
                        DecompressObjectSnafu {
                            compress_type: self.compress_type,
                            path,
                        },
                    )?;
                    verify_checksum(&decompress_data, metadata.checksum)?;
                    // set the checkpoint size
                    self.set_checkpoint_file_size(version, checkpoint_size as u64);
                    Ok(Some(decompress_data))
                }
                Err(e) => {
                    if e.kind() == ErrorKind::NotFound {
                        if self.compress_type != FALL_BACK_COMPRESS_TYPE {
                            let fall_back_path = gen_path(
                                &self.path,
                                &checkpoint_file(version),
                                FALL_BACK_COMPRESS_TYPE,
                            );
                            debug!(
                                "Failed to load checkpoint from path: {}, fall back to path: {}",
                                path, fall_back_path
                            );
                            match self.object_store.read(&fall_back_path).await {
                                Ok(checkpoint) => {
                                    let checkpoint_size = checkpoint.len();
                                    let decompress_data = FALL_BACK_COMPRESS_TYPE
                                        .decode(checkpoint)
                                        .await
                                        .context(DecompressObjectSnafu {
                                            compress_type: FALL_BACK_COMPRESS_TYPE,
                                            path,
                                        })?;
                                    verify_checksum(&decompress_data, metadata.checksum)?;
                                    self.set_checkpoint_file_size(version, checkpoint_size as u64);
                                    Ok(Some(decompress_data))
                                }
                                Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
                                Err(e) => Err(e).context(OpenDalSnafu),
                            }
                        } else {
                            Ok(None)
                        }
                    } else {
                        Err(e).context(OpenDalSnafu)
                    }
                }
            }?;
        Ok(checkpoint_data.map(|data| (version, data)))
    }

    /// Loads the latest checkpoint.
    /// Returns the manifest version and the raw
    /// [RegionCheckpoint](crate::manifest::action::RegionCheckpoint) content, if any.
    pub async fn load_last_checkpoint(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
        let last_checkpoint_path = self.last_checkpoint_path();
        let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await {
            Ok(data) => data,
            Err(e) if e.kind() == ErrorKind::NotFound => {
                return Ok(None);
            }
            Err(e) => {
                return Err(e).context(OpenDalSnafu)?;
            }
        };

        let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data.to_vec())?;

        debug!(
            "Load checkpoint in path: {}, metadata: {:?}",
            last_checkpoint_path, checkpoint_metadata
        );

        self.load_checkpoint(checkpoint_metadata).await
    }

    #[cfg(test)]
    pub async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
        self.object_store
            .read(path)
            .await
            .context(OpenDalSnafu)
            .map(|v| v.to_vec())
    }

    #[cfg(test)]
    pub async fn write_last_checkpoint(
        &mut self,
        version: ManifestVersion,
        bytes: &[u8],
    ) -> Result<()> {
        let path = self.checkpoint_file_path(version);
        let data = self
            .compress_type
            .encode(bytes)
            .await
            .context(CompressObjectSnafu {
                compress_type: self.compress_type,
                path: &path,
            })?;

        let checkpoint_size = data.len();

        self.object_store
            .write(&path, data)
            .await
            .context(OpenDalSnafu)?;

        self.set_checkpoint_file_size(version, checkpoint_size as u64);

        let last_checkpoint_path = self.last_checkpoint_path();
        let checkpoint_metadata = CheckpointMetadata {
            size: bytes.len(),
            version,
            checksum: Some(1218259706),
            extend_metadata: HashMap::new(),
        };

        debug!(
            "Rewrite checkpoint in path: {},  metadata: {:?}",
            last_checkpoint_path, checkpoint_metadata
        );

        let bytes = checkpoint_metadata.encode()?;

        // Overwrite the last checkpoint with the modified content
        self.object_store
            .write(&last_checkpoint_path, bytes.clone())
            .await
            .context(OpenDalSnafu)?;
        Ok(())
    }

    /// Computes the total size (in bytes) recorded in the manifest size map.
    pub(crate) fn total_manifest_size(&self) -> u64 {
        self.manifest_size_map.read().unwrap().values().sum()
    }

    /// Resets the size of all files.
    pub(crate) fn reset_manifest_size(&mut self) {
        self.manifest_size_map.write().unwrap().clear();
        self.total_manifest_size.store(0, Ordering::Relaxed);
    }

    /// Sets the size of the delta file with the given version.
    pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
        let mut m = self.manifest_size_map.write().unwrap();
        m.insert(FileKey::Delta(version), size);

        self.inc_total_manifest_size(size);
    }

    /// Sets the size of the checkpoint file with the given version.
    pub(crate) fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
        let mut m = self.manifest_size_map.write().unwrap();
        m.insert(FileKey::Checkpoint(version), size);

        self.inc_total_manifest_size(size);
    }

    fn unset_file_size(&self, key: &FileKey) {
        let mut m = self.manifest_size_map.write().unwrap();
        if let Some(val) = m.remove(key) {
            debug!("Unset file size: {:?}, size: {}", key, val);
            self.dec_total_manifest_size(val);
        }
    }

    fn inc_total_manifest_size(&self, val: u64) {
        self.total_manifest_size.fetch_add(val, Ordering::Relaxed);
    }

    fn dec_total_manifest_size(&self, val: u64) {
        self.total_manifest_size.fetch_sub(val, Ordering::Relaxed);
    }
}

#[derive(Serialize, Deserialize, Debug)]
struct CheckpointMetadata {
    pub size: usize,
    /// The latest version this checkpoint contains.
    pub version: ManifestVersion,
    pub checksum: Option<u32>,
    pub extend_metadata: HashMap<String, String>,
}

impl CheckpointMetadata {
    fn encode(&self) -> Result<Vec<u8>> {
        Ok(serde_json::to_string(self)
            .context(SerdeJsonSnafu)?
            .into_bytes())
    }

    fn decode(bs: &[u8]) -> Result<Self> {
        let data = std::str::from_utf8(bs).context(Utf8Snafu)?;

        serde_json::from_str(data).context(SerdeJsonSnafu)
    }
}

#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;
    use object_store::ObjectStore;

    use super::*;

    fn new_test_manifest_store() -> ManifestObjectStore {
        common_telemetry::init_default_ut_logging();
        let tmp_dir = create_temp_dir("test_manifest_log_store");
        let builder = Fs::default().root(&tmp_dir.path().to_string_lossy());
        let object_store = ObjectStore::new(builder).unwrap().finish();
        ManifestObjectStore::new(
            "/",
            object_store,
            CompressionType::Uncompressed,
            Default::default(),
        )
    }

    fn new_checkpoint_metadata_with_version(version: ManifestVersion) -> CheckpointMetadata {
        CheckpointMetadata {
            size: 0,
            version,
            checksum: None,
            extend_metadata: Default::default(),
        }
    }

    #[test]
    // This test mainly guards against unintentional future changes breaking the
    // backward compatibility of generated file paths.
    fn test_compress_file_path_generation() {
        let path = "/foo/bar/";
        let version: ManifestVersion = 0;
        let file_path = gen_path(path, &delta_file(version), CompressionType::Gzip);
        assert_eq!(file_path.as_str(), "/foo/bar/00000000000000000000.json.gz")
    }

    #[tokio::test]
    async fn test_manifest_log_store_uncompress() {
        let mut log_store = new_test_manifest_store();
        log_store.compress_type = CompressionType::Uncompressed;
        test_manifest_log_store_case(log_store).await;
    }

    #[tokio::test]
    async fn test_manifest_log_store_compress() {
        let mut log_store = new_test_manifest_store();
        log_store.compress_type = CompressionType::Gzip;
        test_manifest_log_store_case(log_store).await;
    }

    async fn test_manifest_log_store_case(mut log_store: ManifestObjectStore) {
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes())
                .await
                .unwrap();
        }

        let manifests = log_store.fetch_manifests(1, 4).await.unwrap();
        let mut it = manifests.into_iter();
        for v in 1..4 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        assert!(it.next().is_none());

        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();
        for v in 0..5 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        assert!(it.next().is_none());

        // test checkpoint
        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
        log_store
            .save_checkpoint(3, "checkpoint".as_bytes())
            .await
            .unwrap();

        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(checkpoint, "checkpoint".as_bytes());
        assert_eq!(3, v);

        // delete logs with version < 4 and keep checkpoint 3.
        let _ = log_store.delete_until(4, true).await.unwrap();
        let _ = log_store
            .load_checkpoint(new_checkpoint_metadata_with_version(3))
            .await
            .unwrap()
            .unwrap();
        let _ = log_store.load_last_checkpoint().await.unwrap().unwrap();
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();

        let (version, bytes) = it.next().unwrap();
        assert_eq!(4, version);
        assert_eq!("hello, 4".as_bytes(), bytes);
        assert!(it.next().is_none());

        // delete all logs and checkpoints
        let _ = log_store.delete_until(11, false).await.unwrap();
        assert!(log_store
            .load_checkpoint(new_checkpoint_metadata_with_version(3))
            .await
            .unwrap()
            .is_none());
        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();

        assert!(it.next().is_none());
    }

    #[tokio::test]
    // Tests that ManifestObjectStore can read/delete previously uncompressed data correctly.
    async fn test_compress_backward_compatible() {
        let mut log_store = new_test_manifest_store();

        // write uncompressed data to simulate previously uncompressed data
        log_store.compress_type = CompressionType::Uncompressed;
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes())
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
            .await
            .unwrap();

        // change compress type
        log_store.compress_type = CompressionType::Gzip;

        // test load_last_checkpoint works correctly for previously uncompressed data
        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(v, 5);
        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());

        // write compressed data so the compression algorithm takes effect
        for v in 5..10 {
            log_store
                .save(v, format!("hello, {v}").as_bytes())
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(10, "checkpoint_compressed".as_bytes())
            .await
            .unwrap();

        // test data reading
        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
        let mut it = manifests.into_iter();

        for v in 0..10 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        let (v, checkpoint) = log_store
            .load_checkpoint(new_checkpoint_metadata_with_version(5))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(v, 5);
        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(v, 10);
        assert_eq!(checkpoint, "checkpoint_compressed".as_bytes());

        // Delete until 10, covering both uncompressed and compressed data:
        // deltas 0..=9 and checkpoint 5 are deleted.
        assert_eq!(11, log_store.delete_until(10, false).await.unwrap());
        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
        let mut it = manifests.into_iter();
        assert!(it.next().is_none());
    }

    #[tokio::test]
    async fn test_file_version() {
        let version = file_version("00000000000000000007.checkpoint");
        assert_eq!(version, 7);

        let name = delta_file(version);
        assert_eq!(name, "00000000000000000007.json");

        let name = checkpoint_file(version);
        assert_eq!(name, "00000000000000000007.checkpoint");
    }

    #[tokio::test]
    async fn test_uncompressed_manifest_files_size() {
        let mut log_store = new_test_manifest_store();
        // write 5 uncompressed manifest files (8 bytes per file)
        log_store.compress_type = CompressionType::Uncompressed;
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes())
                .await
                .unwrap();
        }
        // write 1 uncompressed checkpoint file (23 bytes)
        log_store
            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
            .await
            .unwrap();

        // manifest files size
        assert_eq!(log_store.total_manifest_size(), 63);

        // delete 3 manifest files
        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);

        // manifest files size after delete
        assert_eq!(log_store.total_manifest_size(), 39);

        // delete all manifest files
        assert_eq!(
            log_store
                .delete_until(ManifestVersion::MAX, false)
                .await
                .unwrap(),
            3
        );

        assert_eq!(log_store.total_manifest_size(), 0);
    }

    #[tokio::test]
    async fn test_compressed_manifest_files_size() {
        let mut log_store = new_test_manifest_store();
        // Test with compressed manifest files
        log_store.compress_type = CompressionType::Gzip;
        // write 5 manifest files
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes())
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(5, "checkpoint_compressed".as_bytes())
            .await
            .unwrap();

        // manifest files size
        assert_eq!(log_store.total_manifest_size(), 181);

        // delete 3 manifest files
        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);

        // manifest files size after delete
        assert_eq!(log_store.total_manifest_size(), 97);

        // delete all manifest files
        assert_eq!(
            log_store
                .delete_until(ManifestVersion::MAX, false)
                .await
                .unwrap(),
            3
        );

        assert_eq!(log_store.total_manifest_size(), 0);
    }
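
    // The following two tests are illustrative sketches added alongside the original suite.
    // They only exercise helpers defined in this module and restate behavior already implied
    // above (file-name round trips and checksum verification). The compressed case assumes
    // `CompressionType::from_str` accepts the "gz" extension produced by `file_extension()`,
    // which the backward-compatibility test above relies on.

    #[test]
    fn test_file_name_round_trip_sketch() {
        // A compressed delta name parses back to its version and compression type.
        let name = gen_path("", &delta_file(7), CompressionType::Gzip);
        assert_eq!(name, "00000000000000000007.json.gz");
        assert!(is_delta_file(&name));
        assert_eq!(file_version(&name), 7);
        assert_eq!(file_compress_type(&name), CompressionType::Gzip);

        // An uncompressed checkpoint name keeps its plain extension.
        let name = gen_path("", &checkpoint_file(7), CompressionType::Uncompressed);
        assert_eq!(name, "00000000000000000007.checkpoint");
        assert!(is_checkpoint_file(&name));
        assert_eq!(file_compress_type(&name), CompressionType::Uncompressed);
    }

    #[test]
    fn test_checksum_round_trip_sketch() {
        // The checksum stored in `_last_checkpoint` must match the uncompressed content.
        let data = b"checkpoint";
        let checksum = checkpoint_checksum(data);
        assert!(verify_checksum(data, Some(checksum)).is_ok());
        // A missing checksum skips verification.
        assert!(verify_checksum(data, None).is_ok());
        // A mismatching checksum is rejected.
        assert!(verify_checksum(data, Some(checksum.wrapping_add(1))).is_err());
    }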
}