mito2/manifest/
storage.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::iter::Iterator;
17use std::str::FromStr;
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::{Arc, RwLock};
20
21use common_datasource::compression::CompressionType;
22use common_telemetry::debug;
23use crc32fast::Hasher;
24use futures::TryStreamExt;
25use futures::future::try_join_all;
26use lazy_static::lazy_static;
27use object_store::{Entry, ErrorKind, Lister, ObjectStore, util};
28use regex::Regex;
29use serde::{Deserialize, Serialize};
30use snafu::{ResultExt, ensure};
31use store_api::ManifestVersion;
32use store_api::storage::RegionId;
33use tokio::sync::Semaphore;
34
35use crate::error::{
36    ChecksumMismatchSnafu, CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu,
37    OpenDalSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
38};
39
lazy_static! {
    // Matches file names that start with one or more digits followed by `.json`.
    // The pattern is only anchored at the start, so names with trailing text
    // (for example a compression extension such as `.json.gz`) also match.
    static ref DELTA_RE: Regex = Regex::new("^\\d+\\.json").unwrap();
    // Matches file names that start with one or more digits followed by
    // `.checkpoint`; like `DELTA_RE`, it is not anchored at the end.
    static ref CHECKPOINT_RE: Regex = Regex::new("^\\d+\\.checkpoint").unwrap();
}
44
/// Name of the file (inside the manifest directory) that stores the metadata of the last checkpoint.
const LAST_CHECKPOINT_FILE: &str = "_last_checkpoint";
/// Compression type returned by [manifest_compress_type] when compression is enabled.
const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Gzip;
/// For backward compatibility, a user's manifest files may not have been compressed.
/// When a file cannot be found under the configured compression type, we fall back to
/// `FALL_BACK_COMPRESS_TYPE`.
const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed;
/// Maximum number of manifest files that are read concurrently.
const FETCH_MANIFEST_PARALLELISM: usize = 16;
51
52/// Returns the [CompressionType] according to whether to compress manifest files.
53pub const fn manifest_compress_type(compress: bool) -> CompressionType {
54    if compress {
55        DEFAULT_MANIFEST_COMPRESSION_TYPE
56    } else {
57        FALL_BACK_COMPRESS_TYPE
58    }
59}
60
61pub fn delta_file(version: ManifestVersion) -> String {
62    format!("{version:020}.json")
63}
64
65pub fn checkpoint_file(version: ManifestVersion) -> String {
66    format!("{version:020}.checkpoint")
67}
68
69pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> String {
70    if compress_type == CompressionType::Uncompressed {
71        format!("{}{}", path, file)
72    } else {
73        format!("{}{}.{}", path, file, compress_type.file_extension())
74    }
75}
76
77fn checkpoint_checksum(data: &[u8]) -> u32 {
78    let mut hasher = Hasher::new();
79    hasher.update(data);
80    hasher.finalize()
81}
82
83fn verify_checksum(data: &[u8], wanted: Option<u32>) -> Result<()> {
84    if let Some(checksum) = wanted {
85        let calculated_checksum = checkpoint_checksum(data);
86        ensure!(
87            checksum == calculated_checksum,
88            ChecksumMismatchSnafu {
89                actual: calculated_checksum,
90                expected: checksum,
91            }
92        );
93    }
94    Ok(())
95}
96
97/// Return's the file manifest version from path
98///
99/// # Panics
100/// If the file path is not a valid delta or checkpoint file.
101pub fn file_version(path: &str) -> ManifestVersion {
102    let s = path.split('.').next().unwrap();
103    s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}"))
104}
105
106/// Return's the file compress algorithm by file extension.
107///
108/// for example file
109/// `00000000000000000000.json.gz` -> `CompressionType::GZIP`
110pub fn file_compress_type(path: &str) -> CompressionType {
111    let s = path.rsplit('.').next().unwrap_or("");
112    CompressionType::from_str(s).unwrap_or(CompressionType::Uncompressed)
113}
114
/// Returns `true` if `file_name` matches `DELTA_RE`, i.e. it starts with digits
/// followed by `.json` (trailing text such as a compression extension is allowed).
pub fn is_delta_file(file_name: &str) -> bool {
    DELTA_RE.is_match(file_name)
}
118
/// Returns `true` if `file_name` matches `CHECKPOINT_RE`, i.e. it starts with digits
/// followed by `.checkpoint` (trailing text such as a compression extension is allowed).
pub fn is_checkpoint_file(file_name: &str) -> bool {
    CHECKPOINT_RE.is_match(file_name)
}
122
/// Key to identify a manifest file in the size map of [ManifestObjectStore].
///
/// Delta and checkpoint files are distinct variants, so a delta and a checkpoint
/// with the same version are tracked as separate entries.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
enum FileKey {
    /// A delta file (`.json`), identified by its version.
    Delta(ManifestVersion),
    /// A checkpoint file (`.checkpoint`), identified by its version.
    Checkpoint(ManifestVersion),
}
131
/// Stores manifest delta and checkpoint files in an [ObjectStore] and keeps
/// track of the sizes of the files it has written or loaded.
///
/// Cloning shares the size map and the total size counter, since both are
/// behind `Arc`.
#[derive(Clone, Debug)]
pub struct ManifestObjectStore {
    /// Object store that holds the manifest files.
    object_store: ObjectStore,
    /// Compression type used when writing files and when building their paths.
    compress_type: CompressionType,
    /// Manifest directory, normalized with `util::normalize_dir`.
    path: String,
    /// Staging manifest directory derived from `path` (`<parent>/staging/manifest/`).
    staging_path: String,
    /// Stores the size of each manifest file.
    manifest_size_map: Arc<RwLock<HashMap<FileKey, u64>>>,
    /// Counter supplied by the creator; updated whenever a file size is set or unset.
    total_manifest_size: Arc<AtomicU64>,
}
142
143impl ManifestObjectStore {
144    pub fn new(
145        path: &str,
146        object_store: ObjectStore,
147        compress_type: CompressionType,
148        total_manifest_size: Arc<AtomicU64>,
149    ) -> Self {
150        let path = util::normalize_dir(path);
151        let staging_path = {
152            // Convert "region_dir/manifest/" to "region_dir/staging/manifest/"
153            let parent_dir = path.trim_end_matches("manifest/").trim_end_matches('/');
154            util::normalize_dir(&format!("{}/staging/manifest", parent_dir))
155        };
156        Self {
157            object_store,
158            compress_type,
159            path,
160            staging_path,
161            manifest_size_map: Arc::new(RwLock::new(HashMap::new())),
162            total_manifest_size,
163        }
164    }
165
166    /// Returns the delta file path under the **current** compression algorithm
167    fn delta_file_path(&self, version: ManifestVersion, is_staging: bool) -> String {
168        let base_path = if is_staging {
169            &self.staging_path
170        } else {
171            &self.path
172        };
173        gen_path(base_path, &delta_file(version), self.compress_type)
174    }
175
176    /// Returns the checkpoint file path under the **current** compression algorithm
177    fn checkpoint_file_path(&self, version: ManifestVersion) -> String {
178        gen_path(&self.path, &checkpoint_file(version), self.compress_type)
179    }
180
181    /// Returns the last checkpoint path, because the last checkpoint is not compressed,
182    /// so its path name has nothing to do with the compression algorithm used by `ManifestObjectStore`
183    pub(crate) fn last_checkpoint_path(&self) -> String {
184        format!("{}{}", self.path, LAST_CHECKPOINT_FILE)
185    }
186
187    /// Returns the manifest dir
188    pub(crate) fn manifest_dir(&self) -> &str {
189        &self.path
190    }
191
192    /// Returns an iterator of manifests from normal or staging directory.
193    pub(crate) async fn manifest_lister(&self, is_staging: bool) -> Result<Option<Lister>> {
194        let path = if is_staging {
195            &self.staging_path
196        } else {
197            &self.path
198        };
199        match self.object_store.lister_with(path).await {
200            Ok(streamer) => Ok(Some(streamer)),
201            Err(e) if e.kind() == ErrorKind::NotFound => {
202                debug!("Manifest directory does not exist: {}", path);
203                Ok(None)
204            }
205            Err(e) => Err(e).context(OpenDalSnafu)?,
206        }
207    }
208
209    /// Return all `R`s in the directory that meet the `filter` conditions (that is, the `filter` closure returns `Some(R)`),
210    /// and discard `R` that does not meet the conditions (that is, the `filter` closure returns `None`)
211    /// Return an empty vector when directory is not found.
212    pub async fn get_paths<F, R>(&self, filter: F, is_staging: bool) -> Result<Vec<R>>
213    where
214        F: Fn(Entry) -> Option<R>,
215    {
216        let Some(streamer) = self.manifest_lister(is_staging).await? else {
217            return Ok(vec![]);
218        };
219
220        streamer
221            .try_filter_map(|e| async { Ok(filter(e)) })
222            .try_collect::<Vec<_>>()
223            .await
224            .context(OpenDalSnafu)
225    }
226
227    /// Sorts the manifest files.
228    fn sort_manifests(entries: &mut [(ManifestVersion, Entry)]) {
229        entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2));
230    }
231
232    /// Scans the manifest files in the range of [start, end) and return all manifest entries.
233    pub async fn scan(
234        &self,
235        start: ManifestVersion,
236        end: ManifestVersion,
237    ) -> Result<Vec<(ManifestVersion, Entry)>> {
238        ensure!(start <= end, InvalidScanIndexSnafu { start, end });
239
240        let mut entries: Vec<(ManifestVersion, Entry)> = self
241            .get_paths(
242                |entry| {
243                    let file_name = entry.name();
244                    if is_delta_file(file_name) {
245                        let version = file_version(file_name);
246                        if start <= version && version < end {
247                            return Some((version, entry));
248                        }
249                    }
250                    None
251                },
252                false,
253            )
254            .await?;
255
256        Self::sort_manifests(&mut entries);
257
258        Ok(entries)
259    }
260
261    /// Fetches manifests in range [start_version, end_version).
262    ///
263    /// This functions is guaranteed to return manifests from the `start_version` strictly (must contain `start_version`).
264    pub async fn fetch_manifests_strict_from(
265        &self,
266        start_version: ManifestVersion,
267        end_version: ManifestVersion,
268        region_id: RegionId,
269    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
270        let mut manifests = self.fetch_manifests(start_version, end_version).await?;
271        let start_index = manifests.iter().position(|(v, _)| *v == start_version);
272        debug!(
273            "Fetches manifests in range [{},{}), start_index: {:?}, region_id: {}, manifests: {:?}",
274            start_version,
275            end_version,
276            start_index,
277            region_id,
278            manifests.iter().map(|(v, _)| *v).collect::<Vec<_>>()
279        );
280        if let Some(start_index) = start_index {
281            Ok(manifests.split_off(start_index))
282        } else {
283            Ok(vec![])
284        }
285    }
286
    /// Common implementation for fetching manifests from entries in parallel.
    ///
    /// Each entry is read from the object store and decompressed according to the
    /// compression type derived from its file name (not the store's configured
    /// type). At most `FETCH_MANIFEST_PARALLELISM` reads are in flight at a time.
    /// The result is produced by `try_join_all` and pairs each entry's version with
    /// its decompressed content.
    ///
    /// # Errors
    /// Fails if any read (`OpenDal`) or decompression (`DecompressObject`) fails.
    async fn fetch_manifests_from_entries(
        &self,
        entries: Vec<(ManifestVersion, Entry)>,
    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
        if entries.is_empty() {
            return Ok(vec![]);
        }

        // TODO(weny): Make it configurable.
        let semaphore = Semaphore::new(FETCH_MANIFEST_PARALLELISM);

        let tasks = entries.iter().map(|(v, entry)| async {
            // The semaphore is a local that is never closed, so acquiring a permit
            // is not expected to fail.
            let _permit = semaphore.acquire().await.unwrap();

            // The file name decides how to decode, so files written under a
            // different compression setting can still be read.
            let compress_type = file_compress_type(entry.name());
            let bytes = self
                .object_store
                .read(entry.path())
                .await
                .context(OpenDalSnafu)?;
            let data = compress_type
                .decode(bytes)
                .await
                .context(DecompressObjectSnafu {
                    compress_type,
                    path: entry.path(),
                })?;
            Ok((*v, data))
        });

        try_join_all(tasks).await
    }
321
322    /// Fetch all manifests in concurrent, and return the manifests in range [start_version, end_version)
323    ///
324    /// **Notes**: This function is no guarantee to return manifests from the `start_version` strictly.
325    /// Uses [fetch_manifests_strict_from](ManifestObjectStore::fetch_manifests_strict_from) to get manifests from the `start_version`.
326    pub async fn fetch_manifests(
327        &self,
328        start_version: ManifestVersion,
329        end_version: ManifestVersion,
330    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
331        let manifests = self.scan(start_version, end_version).await?;
332        self.fetch_manifests_from_entries(manifests).await
333    }
334
335    /// Delete manifest files that version < end.
336    /// If keep_last_checkpoint is true, the last checkpoint file will be kept.
337    /// ### Return
338    /// The number of deleted files.
339    pub async fn delete_until(
340        &self,
341        end: ManifestVersion,
342        keep_last_checkpoint: bool,
343    ) -> Result<usize> {
344        // Stores (entry, is_checkpoint, version) in a Vec.
345        let entries: Vec<_> = self
346            .get_paths(
347                |entry| {
348                    let file_name = entry.name();
349                    let is_checkpoint = is_checkpoint_file(file_name);
350                    if is_delta_file(file_name) || is_checkpoint_file(file_name) {
351                        let version = file_version(file_name);
352                        if version < end {
353                            return Some((entry, is_checkpoint, version));
354                        }
355                    }
356                    None
357                },
358                false,
359            )
360            .await?;
361        let checkpoint_version = if keep_last_checkpoint {
362            // Note that the order of entries is unspecific.
363            entries
364                .iter()
365                .filter_map(
366                    |(_e, is_checkpoint, version)| {
367                        if *is_checkpoint { Some(version) } else { None }
368                    },
369                )
370                .max()
371        } else {
372            None
373        };
374        let del_entries: Vec<_> = entries
375            .iter()
376            .filter(|(_e, is_checkpoint, version)| {
377                if let Some(max_version) = checkpoint_version {
378                    if *is_checkpoint {
379                        // We need to keep the checkpoint file.
380                        version < max_version
381                    } else {
382                        // We can delete the log file with max_version as the checkpoint
383                        // file contains the log file's content.
384                        version <= max_version
385                    }
386                } else {
387                    true
388                }
389            })
390            .collect();
391        let paths = del_entries
392            .iter()
393            .map(|(e, _, _)| e.path().to_string())
394            .collect::<Vec<_>>();
395        let ret = paths.len();
396
397        debug!(
398            "Deleting {} logs from manifest storage path {} until {}, checkpoint_version: {:?}, paths: {:?}",
399            ret, self.path, end, checkpoint_version, paths,
400        );
401
402        self.object_store
403            .delete_iter(paths)
404            .await
405            .context(OpenDalSnafu)?;
406
407        // delete manifest sizes
408        for (_, is_checkpoint, version) in &del_entries {
409            if *is_checkpoint {
410                self.unset_file_size(&FileKey::Checkpoint(*version));
411            } else {
412                self.unset_file_size(&FileKey::Delta(*version));
413            }
414        }
415
416        Ok(ret)
417    }
418
419    /// Save the delta manifest file.
420    pub async fn save(
421        &mut self,
422        version: ManifestVersion,
423        bytes: &[u8],
424        is_staging: bool,
425    ) -> Result<()> {
426        let path = self.delta_file_path(version, is_staging);
427        debug!("Save log to manifest storage, version: {}", version);
428        let data = self
429            .compress_type
430            .encode(bytes)
431            .await
432            .context(CompressObjectSnafu {
433                compress_type: self.compress_type,
434                path: &path,
435            })?;
436        let delta_size = data.len();
437        self.object_store
438            .write(&path, data)
439            .await
440            .context(OpenDalSnafu)?;
441        self.set_delta_file_size(version, delta_size as u64);
442        Ok(())
443    }
444
445    /// Save the checkpoint manifest file.
446    pub(crate) async fn save_checkpoint(
447        &self,
448        version: ManifestVersion,
449        bytes: &[u8],
450    ) -> Result<()> {
451        let path = self.checkpoint_file_path(version);
452        let data = self
453            .compress_type
454            .encode(bytes)
455            .await
456            .context(CompressObjectSnafu {
457                compress_type: self.compress_type,
458                path: &path,
459            })?;
460        let checkpoint_size = data.len();
461        let checksum = checkpoint_checksum(bytes);
462        self.object_store
463            .write(&path, data)
464            .await
465            .context(OpenDalSnafu)?;
466        self.set_checkpoint_file_size(version, checkpoint_size as u64);
467
468        // Because last checkpoint file only contain size and version, which is tiny, so we don't compress it.
469        let last_checkpoint_path = self.last_checkpoint_path();
470
471        let checkpoint_metadata = CheckpointMetadata {
472            size: bytes.len(),
473            version,
474            checksum: Some(checksum),
475            extend_metadata: HashMap::new(),
476        };
477
478        debug!(
479            "Save checkpoint in path: {},  metadata: {:?}",
480            last_checkpoint_path, checkpoint_metadata
481        );
482
483        let bytes = checkpoint_metadata.encode()?;
484        self.object_store
485            .write(&last_checkpoint_path, bytes)
486            .await
487            .context(OpenDalSnafu)?;
488
489        Ok(())
490    }
491
492    async fn load_checkpoint(
493        &mut self,
494        metadata: CheckpointMetadata,
495    ) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
496        let version = metadata.version;
497        let path = self.checkpoint_file_path(version);
498        // Due to backward compatibility, it is possible that the user's checkpoint not compressed,
499        // so if we don't find file by compressed type. fall back to checkpoint not compressed find again.
500        let checkpoint_data =
501            match self.object_store.read(&path).await {
502                Ok(checkpoint) => {
503                    let checkpoint_size = checkpoint.len();
504                    let decompress_data = self.compress_type.decode(checkpoint).await.context(
505                        DecompressObjectSnafu {
506                            compress_type: self.compress_type,
507                            path,
508                        },
509                    )?;
510                    verify_checksum(&decompress_data, metadata.checksum)?;
511                    // set the checkpoint size
512                    self.set_checkpoint_file_size(version, checkpoint_size as u64);
513                    Ok(Some(decompress_data))
514                }
515                Err(e) => {
516                    if e.kind() == ErrorKind::NotFound {
517                        if self.compress_type != FALL_BACK_COMPRESS_TYPE {
518                            let fall_back_path = gen_path(
519                                &self.path,
520                                &checkpoint_file(version),
521                                FALL_BACK_COMPRESS_TYPE,
522                            );
523                            debug!(
524                                "Failed to load checkpoint from path: {}, fall back to path: {}",
525                                path, fall_back_path
526                            );
527                            match self.object_store.read(&fall_back_path).await {
528                                Ok(checkpoint) => {
529                                    let checkpoint_size = checkpoint.len();
530                                    let decompress_data = FALL_BACK_COMPRESS_TYPE
531                                        .decode(checkpoint)
532                                        .await
533                                        .context(DecompressObjectSnafu {
534                                            compress_type: FALL_BACK_COMPRESS_TYPE,
535                                            path,
536                                        })?;
537                                    verify_checksum(&decompress_data, metadata.checksum)?;
538                                    self.set_checkpoint_file_size(version, checkpoint_size as u64);
539                                    Ok(Some(decompress_data))
540                                }
541                                Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
542                                Err(e) => Err(e).context(OpenDalSnafu),
543                            }
544                        } else {
545                            Ok(None)
546                        }
547                    } else {
548                        Err(e).context(OpenDalSnafu)
549                    }
550                }
551            }?;
552        Ok(checkpoint_data.map(|data| (version, data)))
553    }
554
555    /// Load the latest checkpoint.
556    /// Return manifest version and the raw [RegionCheckpoint](crate::manifest::action::RegionCheckpoint) content if any
557    pub async fn load_last_checkpoint(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
558        let last_checkpoint_path = self.last_checkpoint_path();
559        let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await {
560            Ok(data) => data,
561            Err(e) if e.kind() == ErrorKind::NotFound => {
562                return Ok(None);
563            }
564            Err(e) => {
565                return Err(e).context(OpenDalSnafu)?;
566            }
567        };
568
569        let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data.to_vec())?;
570
571        debug!(
572            "Load checkpoint in path: {}, metadata: {:?}",
573            last_checkpoint_path, checkpoint_metadata
574        );
575
576        self.load_checkpoint(checkpoint_metadata).await
577    }
578
579    #[cfg(test)]
580    pub async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
581        self.object_store
582            .read(path)
583            .await
584            .context(OpenDalSnafu)
585            .map(|v| v.to_vec())
586    }
587
588    #[cfg(test)]
589    pub async fn write_last_checkpoint(
590        &mut self,
591        version: ManifestVersion,
592        bytes: &[u8],
593    ) -> Result<()> {
594        let path = self.checkpoint_file_path(version);
595        let data = self
596            .compress_type
597            .encode(bytes)
598            .await
599            .context(CompressObjectSnafu {
600                compress_type: self.compress_type,
601                path: &path,
602            })?;
603
604        let checkpoint_size = data.len();
605
606        self.object_store
607            .write(&path, data)
608            .await
609            .context(OpenDalSnafu)?;
610
611        self.set_checkpoint_file_size(version, checkpoint_size as u64);
612
613        let last_checkpoint_path = self.last_checkpoint_path();
614        let checkpoint_metadata = CheckpointMetadata {
615            size: bytes.len(),
616            version,
617            checksum: Some(1218259706),
618            extend_metadata: HashMap::new(),
619        };
620
621        debug!(
622            "Rewrite checkpoint in path: {},  metadata: {:?}",
623            last_checkpoint_path, checkpoint_metadata
624        );
625
626        let bytes = checkpoint_metadata.encode()?;
627
628        // Overwrite the last checkpoint with the modified content
629        self.object_store
630            .write(&last_checkpoint_path, bytes.clone())
631            .await
632            .context(OpenDalSnafu)?;
633        Ok(())
634    }
635
636    /// Compute the size(Byte) in manifest size map.
637    pub(crate) fn total_manifest_size(&self) -> u64 {
638        self.manifest_size_map.read().unwrap().values().sum()
639    }
640
641    /// Resets the size of all files.
642    pub(crate) fn reset_manifest_size(&mut self) {
643        self.manifest_size_map.write().unwrap().clear();
644        self.total_manifest_size.store(0, Ordering::Relaxed);
645    }
646
647    /// Set the size of the delta file by delta version.
648    pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
649        let mut m = self.manifest_size_map.write().unwrap();
650        m.insert(FileKey::Delta(version), size);
651
652        self.inc_total_manifest_size(size);
653    }
654
655    /// Set the size of the checkpoint file by checkpoint version.
656    pub(crate) fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
657        let mut m = self.manifest_size_map.write().unwrap();
658        m.insert(FileKey::Checkpoint(version), size);
659
660        self.inc_total_manifest_size(size);
661    }
662
663    fn unset_file_size(&self, key: &FileKey) {
664        let mut m = self.manifest_size_map.write().unwrap();
665        if let Some(val) = m.remove(key) {
666            debug!("Unset file size: {:?}, size: {}", key, val);
667            self.dec_total_manifest_size(val);
668        }
669    }
670
    /// Adds `val` to the shared total manifest size counter.
    fn inc_total_manifest_size(&self, val: u64) {
        self.total_manifest_size.fetch_add(val, Ordering::Relaxed);
    }
674
    /// Subtracts `val` from the shared total manifest size counter.
    ///
    /// NOTE(review): `fetch_sub` wraps around on underflow, so callers must only
    /// subtract sizes that were previously added.
    fn dec_total_manifest_size(&self, val: u64) {
        self.total_manifest_size.fetch_sub(val, Ordering::Relaxed);
    }
678
679    /// Fetch all staging manifest files and return them as (version, action_list) pairs.
680    pub async fn fetch_staging_manifests(&self) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
681        let manifest_entries = self
682            .get_paths(
683                |entry| {
684                    let file_name = entry.name();
685                    if is_delta_file(file_name) {
686                        let version = file_version(file_name);
687                        Some((version, entry))
688                    } else {
689                        None
690                    }
691                },
692                true,
693            )
694            .await?;
695
696        let mut sorted_entries = manifest_entries;
697        Self::sort_manifests(&mut sorted_entries);
698
699        self.fetch_manifests_from_entries(sorted_entries).await
700    }
701
702    /// Clear all staging manifest files.
703    pub async fn clear_staging_manifests(&mut self) -> Result<()> {
704        self.object_store
705            .remove_all(&self.staging_path)
706            .await
707            .context(OpenDalSnafu)?;
708
709        debug!(
710            "Cleared all staging manifest files from {}",
711            self.staging_path
712        );
713
714        Ok(())
715    }
716}
717
/// Metadata describing the last checkpoint; stored as JSON in the
/// `_last_checkpoint` file.
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct CheckpointMetadata {
    /// Length in bytes of the checkpoint content before compression.
    pub size: usize,
    /// The latest version this checkpoint contains.
    pub version: ManifestVersion,
    /// CRC32 checksum of the checkpoint content before compression; `None` skips
    /// verification when loading.
    pub checksum: Option<u32>,
    /// Additional key-value metadata; written empty by this file.
    pub extend_metadata: HashMap<String, String>,
}
726
727impl CheckpointMetadata {
728    fn encode(&self) -> Result<Vec<u8>> {
729        Ok(serde_json::to_string(self)
730            .context(SerdeJsonSnafu)?
731            .into_bytes())
732    }
733
734    fn decode(bs: &[u8]) -> Result<Self> {
735        let data = std::str::from_utf8(bs).context(Utf8Snafu)?;
736
737        serde_json::from_str(data).context(SerdeJsonSnafu)
738    }
739}
740
741#[cfg(test)]
742mod tests {
743    use common_test_util::temp_dir::create_temp_dir;
744    use object_store::ObjectStore;
745    use object_store::services::Fs;
746
747    use super::*;
748
    /// Creates an uncompressed `ManifestObjectStore` backed by a filesystem object
    /// store rooted at a fresh temporary directory, with a zeroed size counter.
    fn new_test_manifest_store() -> ManifestObjectStore {
        common_telemetry::init_default_ut_logging();
        // NOTE(review): `tmp_dir` is dropped when this function returns — confirm the
        // store is expected to keep working after the temp dir guard is gone.
        let tmp_dir = create_temp_dir("test_manifest_log_store");
        let builder = Fs::default().root(&tmp_dir.path().to_string_lossy());
        let object_store = ObjectStore::new(builder).unwrap().finish();
        ManifestObjectStore::new(
            "/",
            object_store,
            CompressionType::Uncompressed,
            Default::default(),
        )
    }
761
762    fn new_checkpoint_metadata_with_version(version: ManifestVersion) -> CheckpointMetadata {
763        CheckpointMetadata {
764            size: 0,
765            version,
766            checksum: None,
767            extend_metadata: Default::default(),
768        }
769    }
770
771    #[test]
772    // Define this test mainly to prevent future unintentional changes may break the backward compatibility.
773    fn test_compress_file_path_generation() {
774        let path = "/foo/bar/";
775        let version: ManifestVersion = 0;
776        let file_path = gen_path(path, &delta_file(version), CompressionType::Gzip);
777        assert_eq!(file_path.as_str(), "/foo/bar/00000000000000000000.json.gz")
778    }
779
    // Runs the shared manifest store scenario with compression disabled.
    #[tokio::test]
    async fn test_manifest_log_store_uncompress() {
        let mut log_store = new_test_manifest_store();
        log_store.compress_type = CompressionType::Uncompressed;
        test_manifest_log_store_case(log_store).await;
    }
786
    // Runs the shared manifest store scenario with gzip compression.
    #[tokio::test]
    async fn test_manifest_log_store_compress() {
        let mut log_store = new_test_manifest_store();
        log_store.compress_type = CompressionType::Gzip;
        test_manifest_log_store_case(log_store).await;
    }
793
    /// Shared scenario: saves delta files, fetches ranges, saves and loads a
    /// checkpoint, then exercises `delete_until` with and without keeping the last
    /// checkpoint.
    async fn test_manifest_log_store_case(mut log_store: ManifestObjectStore) {
        // Write delta files for versions 0..5.
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }

        // A sub-range returns exactly the versions in [1, 4), in order.
        let manifests = log_store.fetch_manifests(1, 4).await.unwrap();
        let mut it = manifests.into_iter();
        for v in 1..4 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        assert!(it.next().is_none());

        // A range wider than what exists returns all existing versions.
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();
        for v in 0..5 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        assert!(it.next().is_none());

        // test checkpoint
        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
        log_store
            .save_checkpoint(3, "checkpoint".as_bytes())
            .await
            .unwrap();

        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(checkpoint, "checkpoint".as_bytes());
        assert_eq!(3, v);

        //delete (,4) logs and keep checkpoint 3.
        let _ = log_store.delete_until(4, true).await.unwrap();
        let _ = log_store
            .load_checkpoint(new_checkpoint_metadata_with_version(3))
            .await
            .unwrap()
            .unwrap();
        let _ = log_store.load_last_checkpoint().await.unwrap().unwrap();
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();

        // Only delta version 4 survives the deletion.
        let (version, bytes) = it.next().unwrap();
        assert_eq!(4, version);
        assert_eq!("hello, 4".as_bytes(), bytes);
        assert!(it.next().is_none());

        // delete all logs and checkpoints
        let _ = log_store.delete_until(11, false).await.unwrap();
        assert!(
            log_store
                .load_checkpoint(new_checkpoint_metadata_with_version(3))
                .await
                .unwrap()
                .is_none()
        );
        // The metadata file still exists, but the checkpoint file it refers to is gone.
        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();

        assert!(it.next().is_none());
    }
862
863    #[tokio::test]
864    // test ManifestObjectStore can read/delete previously uncompressed data correctly
865    async fn test_compress_backward_compatible() {
866        let mut log_store = new_test_manifest_store();
867
868        // write uncompress data to stimulate previously uncompressed data
869        log_store.compress_type = CompressionType::Uncompressed;
870        for v in 0..5 {
871            log_store
872                .save(v, format!("hello, {v}").as_bytes(), false)
873                .await
874                .unwrap();
875        }
876        log_store
877            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
878            .await
879            .unwrap();
880
881        // change compress type
882        log_store.compress_type = CompressionType::Gzip;
883
884        // test load_last_checkpoint work correctly for previously uncompressed data
885        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
886        assert_eq!(v, 5);
887        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
888
889        // write compressed data to stimulate compress algorithm take effect
890        for v in 5..10 {
891            log_store
892                .save(v, format!("hello, {v}").as_bytes(), false)
893                .await
894                .unwrap();
895        }
896        log_store
897            .save_checkpoint(10, "checkpoint_compressed".as_bytes())
898            .await
899            .unwrap();
900
901        // test data reading
902        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
903        let mut it = manifests.into_iter();
904
905        for v in 0..10 {
906            let (version, bytes) = it.next().unwrap();
907            assert_eq!(v, version);
908            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
909        }
910        let (v, checkpoint) = log_store
911            .load_checkpoint(new_checkpoint_metadata_with_version(5))
912            .await
913            .unwrap()
914            .unwrap();
915        assert_eq!(v, 5);
916        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
917        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
918        assert_eq!(v, 10);
919        assert_eq!(checkpoint, "checkpoint_compressed".as_bytes());
920
921        // Delete util 10, contain uncompressed/compressed data
922        // log 0, 1, 2, 7, 8, 9 will be delete
923        assert_eq!(11, log_store.delete_until(10, false).await.unwrap());
924        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
925        let mut it = manifests.into_iter();
926        assert!(it.next().is_none());
927    }
928
929    #[tokio::test]
930    async fn test_file_version() {
931        let version = file_version("00000000000000000007.checkpoint");
932        assert_eq!(version, 7);
933
934        let name = delta_file(version);
935        assert_eq!(name, "00000000000000000007.json");
936
937        let name = checkpoint_file(version);
938        assert_eq!(name, "00000000000000000007.checkpoint");
939    }
940
941    #[tokio::test]
942    async fn test_uncompressed_manifest_files_size() {
943        let mut log_store = new_test_manifest_store();
944        // write 5 manifest files with uncompressed(8B per file)
945        log_store.compress_type = CompressionType::Uncompressed;
946        for v in 0..5 {
947            log_store
948                .save(v, format!("hello, {v}").as_bytes(), false)
949                .await
950                .unwrap();
951        }
952        // write 1 checkpoint file with uncompressed(23B)
953        log_store
954            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
955            .await
956            .unwrap();
957
958        // manifest files size
959        assert_eq!(log_store.total_manifest_size(), 63);
960
961        // delete 3 manifest files
962        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);
963
964        // manifest files size after delete
965        assert_eq!(log_store.total_manifest_size(), 39);
966
967        // delete all manifest files
968        assert_eq!(
969            log_store
970                .delete_until(ManifestVersion::MAX, false)
971                .await
972                .unwrap(),
973            3
974        );
975
976        assert_eq!(log_store.total_manifest_size(), 0);
977    }
978
979    #[tokio::test]
980    async fn test_compressed_manifest_files_size() {
981        let mut log_store = new_test_manifest_store();
982        // Test with compressed manifest files
983        log_store.compress_type = CompressionType::Gzip;
984        // write 5 manifest files
985        for v in 0..5 {
986            log_store
987                .save(v, format!("hello, {v}").as_bytes(), false)
988                .await
989                .unwrap();
990        }
991        log_store
992            .save_checkpoint(5, "checkpoint_compressed".as_bytes())
993            .await
994            .unwrap();
995
996        // manifest files size
997        assert_eq!(log_store.total_manifest_size(), 181);
998
999        // delete 3 manifest files
1000        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);
1001
1002        // manifest files size after delete
1003        assert_eq!(log_store.total_manifest_size(), 97);
1004
1005        // delete all manifest files
1006        assert_eq!(
1007            log_store
1008                .delete_until(ManifestVersion::MAX, false)
1009                .await
1010                .unwrap(),
1011            3
1012        );
1013
1014        assert_eq!(log_store.total_manifest_size(), 0);
1015    }
1016}