mito2/manifest/
storage.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::iter::Iterator;
17use std::str::FromStr;
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::{Arc, RwLock};
20
21use common_datasource::compression::CompressionType;
22use common_telemetry::debug;
23use crc32fast::Hasher;
24use futures::TryStreamExt;
25use futures::future::try_join_all;
26use lazy_static::lazy_static;
27use object_store::util::join_dir;
28use object_store::{Entry, ErrorKind, Lister, ObjectStore, util};
29use regex::Regex;
30use serde::{Deserialize, Serialize};
31use snafu::{ResultExt, ensure};
32use store_api::ManifestVersion;
33use store_api::storage::RegionId;
34use tokio::sync::Semaphore;
35
36use crate::error::{
37    ChecksumMismatchSnafu, CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu,
38    OpenDalSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
39};
40
lazy_static! {
    // Matches delta file names such as `00000000000000000000.json`.
    // Deliberately not anchored at the end so compressed names like
    // `00000000000000000000.json.gz` also match.
    static ref DELTA_RE: Regex = Regex::new("^\\d+\\.json").unwrap();
    // Matches checkpoint file names such as `00000000000000000000.checkpoint`,
    // optionally followed by a compression extension.
    static ref CHECKPOINT_RE: Regex = Regex::new("^\\d+\\.checkpoint").unwrap();
}
45
/// Name of the tiny uncompressed metadata file that records the latest checkpoint.
const LAST_CHECKPOINT_FILE: &str = "_last_checkpoint";
/// Compression applied to newly written manifest files when compression is enabled.
const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Gzip;
/// Due to backward compatibility, it is possible that the user's manifest file has not been compressed.
/// So when we encounter problems, we need to fall back to `FALL_BACK_COMPRESS_TYPE` for processing.
const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed;
/// Maximum number of manifest files read from the object store concurrently.
const FETCH_MANIFEST_PARALLELISM: usize = 16;
52
/// Returns the directory to the manifest files.
///
/// The manifest directory lives directly under the region directory,
/// i.e. `region_dir/manifest`.
pub fn manifest_dir(region_dir: &str) -> String {
    join_dir(region_dir, "manifest")
}
57
58/// Returns the [CompressionType] according to whether to compress manifest files.
59pub const fn manifest_compress_type(compress: bool) -> CompressionType {
60    if compress {
61        DEFAULT_MANIFEST_COMPRESSION_TYPE
62    } else {
63        FALL_BACK_COMPRESS_TYPE
64    }
65}
66
67pub fn delta_file(version: ManifestVersion) -> String {
68    format!("{version:020}.json")
69}
70
71pub fn checkpoint_file(version: ManifestVersion) -> String {
72    format!("{version:020}.checkpoint")
73}
74
75pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> String {
76    if compress_type == CompressionType::Uncompressed {
77        format!("{}{}", path, file)
78    } else {
79        format!("{}{}.{}", path, file, compress_type.file_extension())
80    }
81}
82
83fn checkpoint_checksum(data: &[u8]) -> u32 {
84    let mut hasher = Hasher::new();
85    hasher.update(data);
86    hasher.finalize()
87}
88
89fn verify_checksum(data: &[u8], wanted: Option<u32>) -> Result<()> {
90    if let Some(checksum) = wanted {
91        let calculated_checksum = checkpoint_checksum(data);
92        ensure!(
93            checksum == calculated_checksum,
94            ChecksumMismatchSnafu {
95                actual: calculated_checksum,
96                expected: checksum,
97            }
98        );
99    }
100    Ok(())
101}
102
103/// Return's the file manifest version from path
104///
105/// # Panics
106/// If the file path is not a valid delta or checkpoint file.
107pub fn file_version(path: &str) -> ManifestVersion {
108    let s = path.split('.').next().unwrap();
109    s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}"))
110}
111
112/// Return's the file compress algorithm by file extension.
113///
114/// for example file
115/// `00000000000000000000.json.gz` -> `CompressionType::GZIP`
116pub fn file_compress_type(path: &str) -> CompressionType {
117    let s = path.rsplit('.').next().unwrap_or("");
118    CompressionType::from_str(s).unwrap_or(CompressionType::Uncompressed)
119}
120
121pub fn is_delta_file(file_name: &str) -> bool {
122    DELTA_RE.is_match(file_name)
123}
124
125pub fn is_checkpoint_file(file_name: &str) -> bool {
126    CHECKPOINT_RE.is_match(file_name)
127}
128
/// Key to identify a manifest file.
///
/// Keeps delta and checkpoint files with the same version distinct when used
/// as the key of the manifest size map.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
enum FileKey {
    /// A delta file (`.json`).
    Delta(ManifestVersion),
    /// A checkpoint file (`.checkpoint`).
    Checkpoint(ManifestVersion),
}
137
/// Object-store-backed storage for manifest delta and checkpoint files.
#[derive(Clone, Debug)]
pub struct ManifestObjectStore {
    object_store: ObjectStore,
    /// Compression applied to newly written manifest files.
    compress_type: CompressionType,
    /// Normalized manifest directory, e.g. `region_dir/manifest/`.
    path: String,
    /// Normalized staging manifest directory, e.g. `region_dir/staging/manifest/`.
    staging_path: String,
    /// Stores the size of each manifest file.
    manifest_size_map: Arc<RwLock<HashMap<FileKey, u64>>>,
    /// Running total of tracked manifest file sizes, kept alongside the map.
    total_manifest_size: Arc<AtomicU64>,
}
148
149impl ManifestObjectStore {
150    pub fn new(
151        path: &str,
152        object_store: ObjectStore,
153        compress_type: CompressionType,
154        total_manifest_size: Arc<AtomicU64>,
155    ) -> Self {
156        let path = util::normalize_dir(path);
157        let staging_path = {
158            // Convert "region_dir/manifest/" to "region_dir/staging/manifest/"
159            let parent_dir = path.trim_end_matches("manifest/").trim_end_matches('/');
160            util::normalize_dir(&format!("{}/staging/manifest", parent_dir))
161        };
162        Self {
163            object_store,
164            compress_type,
165            path,
166            staging_path,
167            manifest_size_map: Arc::new(RwLock::new(HashMap::new())),
168            total_manifest_size,
169        }
170    }
171
172    /// Returns the delta file path under the **current** compression algorithm
173    fn delta_file_path(&self, version: ManifestVersion, is_staging: bool) -> String {
174        let base_path = if is_staging {
175            &self.staging_path
176        } else {
177            &self.path
178        };
179        gen_path(base_path, &delta_file(version), self.compress_type)
180    }
181
182    /// Returns the checkpoint file path under the **current** compression algorithm
183    fn checkpoint_file_path(&self, version: ManifestVersion) -> String {
184        gen_path(&self.path, &checkpoint_file(version), self.compress_type)
185    }
186
187    /// Returns the last checkpoint path, because the last checkpoint is not compressed,
188    /// so its path name has nothing to do with the compression algorithm used by `ManifestObjectStore`
189    pub(crate) fn last_checkpoint_path(&self) -> String {
190        format!("{}{}", self.path, LAST_CHECKPOINT_FILE)
191    }
192
193    /// Returns the manifest dir
194    pub(crate) fn manifest_dir(&self) -> &str {
195        &self.path
196    }
197
198    /// Returns an iterator of manifests from normal or staging directory.
199    pub(crate) async fn manifest_lister(&self, is_staging: bool) -> Result<Option<Lister>> {
200        let path = if is_staging {
201            &self.staging_path
202        } else {
203            &self.path
204        };
205        match self.object_store.lister_with(path).await {
206            Ok(streamer) => Ok(Some(streamer)),
207            Err(e) if e.kind() == ErrorKind::NotFound => {
208                debug!("Manifest directory does not exist: {}", path);
209                Ok(None)
210            }
211            Err(e) => Err(e).context(OpenDalSnafu)?,
212        }
213    }
214
215    /// Return all `R`s in the directory that meet the `filter` conditions (that is, the `filter` closure returns `Some(R)`),
216    /// and discard `R` that does not meet the conditions (that is, the `filter` closure returns `None`)
217    /// Return an empty vector when directory is not found.
218    pub async fn get_paths<F, R>(&self, filter: F, is_staging: bool) -> Result<Vec<R>>
219    where
220        F: Fn(Entry) -> Option<R>,
221    {
222        let Some(streamer) = self.manifest_lister(is_staging).await? else {
223            return Ok(vec![]);
224        };
225
226        streamer
227            .try_filter_map(|e| async { Ok(filter(e)) })
228            .try_collect::<Vec<_>>()
229            .await
230            .context(OpenDalSnafu)
231    }
232
233    /// Sorts the manifest files.
234    fn sort_manifests(entries: &mut [(ManifestVersion, Entry)]) {
235        entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2));
236    }
237
238    /// Scans the manifest files in the range of [start, end) and return all manifest entries.
239    pub async fn scan(
240        &self,
241        start: ManifestVersion,
242        end: ManifestVersion,
243    ) -> Result<Vec<(ManifestVersion, Entry)>> {
244        ensure!(start <= end, InvalidScanIndexSnafu { start, end });
245
246        let mut entries: Vec<(ManifestVersion, Entry)> = self
247            .get_paths(
248                |entry| {
249                    let file_name = entry.name();
250                    if is_delta_file(file_name) {
251                        let version = file_version(file_name);
252                        if start <= version && version < end {
253                            return Some((version, entry));
254                        }
255                    }
256                    None
257                },
258                false,
259            )
260            .await?;
261
262        Self::sort_manifests(&mut entries);
263
264        Ok(entries)
265    }
266
267    /// Fetches manifests in range [start_version, end_version).
268    ///
269    /// This functions is guaranteed to return manifests from the `start_version` strictly (must contain `start_version`).
270    pub async fn fetch_manifests_strict_from(
271        &self,
272        start_version: ManifestVersion,
273        end_version: ManifestVersion,
274        region_id: RegionId,
275    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
276        let mut manifests = self.fetch_manifests(start_version, end_version).await?;
277        let start_index = manifests.iter().position(|(v, _)| *v == start_version);
278        debug!(
279            "Fetches manifests in range [{},{}), start_index: {:?}, region_id: {}, manifests: {:?}",
280            start_version,
281            end_version,
282            start_index,
283            region_id,
284            manifests.iter().map(|(v, _)| *v).collect::<Vec<_>>()
285        );
286        if let Some(start_index) = start_index {
287            Ok(manifests.split_off(start_index))
288        } else {
289            Ok(vec![])
290        }
291    }
292
    /// Common implementation for fetching manifests from entries in parallel.
    ///
    /// Reads and decompresses each entry's content, bounding concurrency with
    /// a semaphore. The returned `(version, bytes)` pairs follow the input
    /// order of `entries`.
    async fn fetch_manifests_from_entries(
        &self,
        entries: Vec<(ManifestVersion, Entry)>,
    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
        if entries.is_empty() {
            return Ok(vec![]);
        }

        // TODO(weny): Make it configurable.
        // Limits the number of concurrent object store reads.
        let semaphore = Semaphore::new(FETCH_MANIFEST_PARALLELISM);

        let tasks = entries.iter().map(|(v, entry)| async {
            // Safety: semaphore must exist.
            let _permit = semaphore.acquire().await.unwrap();

            // The compression type is derived from each file's own extension
            // (not `self.compress_type`), so directories containing a mix of
            // compressed and uncompressed manifests are handled correctly.
            let compress_type = file_compress_type(entry.name());
            let bytes = self
                .object_store
                .read(entry.path())
                .await
                .context(OpenDalSnafu)?;
            let data = compress_type
                .decode(bytes)
                .await
                .context(DecompressObjectSnafu {
                    compress_type,
                    path: entry.path(),
                })?;
            Ok((*v, data))
        });

        // `try_join_all` preserves input order and fails fast on the first error.
        try_join_all(tasks).await
    }
327
328    /// Fetch all manifests in concurrent, and return the manifests in range [start_version, end_version)
329    ///
330    /// **Notes**: This function is no guarantee to return manifests from the `start_version` strictly.
331    /// Uses [fetch_manifests_strict_from](ManifestObjectStore::fetch_manifests_strict_from) to get manifests from the `start_version`.
332    pub async fn fetch_manifests(
333        &self,
334        start_version: ManifestVersion,
335        end_version: ManifestVersion,
336    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
337        let manifests = self.scan(start_version, end_version).await?;
338        self.fetch_manifests_from_entries(manifests).await
339    }
340
341    /// Delete manifest files that version < end.
342    /// If keep_last_checkpoint is true, the last checkpoint file will be kept.
343    /// ### Return
344    /// The number of deleted files.
345    pub async fn delete_until(
346        &self,
347        end: ManifestVersion,
348        keep_last_checkpoint: bool,
349    ) -> Result<usize> {
350        // Stores (entry, is_checkpoint, version) in a Vec.
351        let entries: Vec<_> = self
352            .get_paths(
353                |entry| {
354                    let file_name = entry.name();
355                    let is_checkpoint = is_checkpoint_file(file_name);
356                    if is_delta_file(file_name) || is_checkpoint_file(file_name) {
357                        let version = file_version(file_name);
358                        if version < end {
359                            return Some((entry, is_checkpoint, version));
360                        }
361                    }
362                    None
363                },
364                false,
365            )
366            .await?;
367        let checkpoint_version = if keep_last_checkpoint {
368            // Note that the order of entries is unspecific.
369            entries
370                .iter()
371                .filter_map(
372                    |(_e, is_checkpoint, version)| {
373                        if *is_checkpoint { Some(version) } else { None }
374                    },
375                )
376                .max()
377        } else {
378            None
379        };
380        let del_entries: Vec<_> = entries
381            .iter()
382            .filter(|(_e, is_checkpoint, version)| {
383                if let Some(max_version) = checkpoint_version {
384                    if *is_checkpoint {
385                        // We need to keep the checkpoint file.
386                        version < max_version
387                    } else {
388                        // We can delete the log file with max_version as the checkpoint
389                        // file contains the log file's content.
390                        version <= max_version
391                    }
392                } else {
393                    true
394                }
395            })
396            .collect();
397        let paths = del_entries
398            .iter()
399            .map(|(e, _, _)| e.path().to_string())
400            .collect::<Vec<_>>();
401        let ret = paths.len();
402
403        debug!(
404            "Deleting {} logs from manifest storage path {} until {}, checkpoint_version: {:?}, paths: {:?}",
405            ret, self.path, end, checkpoint_version, paths,
406        );
407
408        self.object_store
409            .delete_iter(paths)
410            .await
411            .context(OpenDalSnafu)?;
412
413        // delete manifest sizes
414        for (_, is_checkpoint, version) in &del_entries {
415            if *is_checkpoint {
416                self.unset_file_size(&FileKey::Checkpoint(*version));
417            } else {
418                self.unset_file_size(&FileKey::Delta(*version));
419            }
420        }
421
422        Ok(ret)
423    }
424
425    /// Save the delta manifest file.
426    pub async fn save(
427        &mut self,
428        version: ManifestVersion,
429        bytes: &[u8],
430        is_staging: bool,
431    ) -> Result<()> {
432        let path = self.delta_file_path(version, is_staging);
433        debug!("Save log to manifest storage, version: {}", version);
434        let data = self
435            .compress_type
436            .encode(bytes)
437            .await
438            .context(CompressObjectSnafu {
439                compress_type: self.compress_type,
440                path: &path,
441            })?;
442        let delta_size = data.len();
443        self.object_store
444            .write(&path, data)
445            .await
446            .context(OpenDalSnafu)?;
447        self.set_delta_file_size(version, delta_size as u64);
448        Ok(())
449    }
450
    /// Save the checkpoint manifest file.
    ///
    /// Writes the (possibly compressed) checkpoint file, records its size,
    /// then overwrites the uncompressed `_last_checkpoint` metadata file so
    /// the checkpoint can be located on restart.
    pub(crate) async fn save_checkpoint(
        &self,
        version: ManifestVersion,
        bytes: &[u8],
    ) -> Result<()> {
        let path = self.checkpoint_file_path(version);
        let data = self
            .compress_type
            .encode(bytes)
            .await
            .context(CompressObjectSnafu {
                compress_type: self.compress_type,
                path: &path,
            })?;
        let checkpoint_size = data.len();
        // The checksum covers the *uncompressed* bytes — the same data
        // `verify_checksum` sees after decoding on load.
        let checksum = checkpoint_checksum(bytes);
        self.object_store
            .write(&path, data)
            .await
            .context(OpenDalSnafu)?;
        self.set_checkpoint_file_size(version, checkpoint_size as u64);

        // Because last checkpoint file only contain size and version, which is tiny, so we don't compress it.
        let last_checkpoint_path = self.last_checkpoint_path();

        let checkpoint_metadata = CheckpointMetadata {
            size: bytes.len(),
            version,
            checksum: Some(checksum),
            extend_metadata: HashMap::new(),
        };

        debug!(
            "Save checkpoint in path: {},  metadata: {:?}",
            last_checkpoint_path, checkpoint_metadata
        );

        let bytes = checkpoint_metadata.encode()?;
        self.object_store
            .write(&last_checkpoint_path, bytes)
            .await
            .context(OpenDalSnafu)?;

        Ok(())
    }
497
498    async fn load_checkpoint(
499        &mut self,
500        metadata: CheckpointMetadata,
501    ) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
502        let version = metadata.version;
503        let path = self.checkpoint_file_path(version);
504        // Due to backward compatibility, it is possible that the user's checkpoint not compressed,
505        // so if we don't find file by compressed type. fall back to checkpoint not compressed find again.
506        let checkpoint_data =
507            match self.object_store.read(&path).await {
508                Ok(checkpoint) => {
509                    let checkpoint_size = checkpoint.len();
510                    let decompress_data = self.compress_type.decode(checkpoint).await.context(
511                        DecompressObjectSnafu {
512                            compress_type: self.compress_type,
513                            path,
514                        },
515                    )?;
516                    verify_checksum(&decompress_data, metadata.checksum)?;
517                    // set the checkpoint size
518                    self.set_checkpoint_file_size(version, checkpoint_size as u64);
519                    Ok(Some(decompress_data))
520                }
521                Err(e) => {
522                    if e.kind() == ErrorKind::NotFound {
523                        if self.compress_type != FALL_BACK_COMPRESS_TYPE {
524                            let fall_back_path = gen_path(
525                                &self.path,
526                                &checkpoint_file(version),
527                                FALL_BACK_COMPRESS_TYPE,
528                            );
529                            debug!(
530                                "Failed to load checkpoint from path: {}, fall back to path: {}",
531                                path, fall_back_path
532                            );
533                            match self.object_store.read(&fall_back_path).await {
534                                Ok(checkpoint) => {
535                                    let checkpoint_size = checkpoint.len();
536                                    let decompress_data = FALL_BACK_COMPRESS_TYPE
537                                        .decode(checkpoint)
538                                        .await
539                                        .context(DecompressObjectSnafu {
540                                            compress_type: FALL_BACK_COMPRESS_TYPE,
541                                            path,
542                                        })?;
543                                    verify_checksum(&decompress_data, metadata.checksum)?;
544                                    self.set_checkpoint_file_size(version, checkpoint_size as u64);
545                                    Ok(Some(decompress_data))
546                                }
547                                Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
548                                Err(e) => Err(e).context(OpenDalSnafu),
549                            }
550                        } else {
551                            Ok(None)
552                        }
553                    } else {
554                        Err(e).context(OpenDalSnafu)
555                    }
556                }
557            }?;
558        Ok(checkpoint_data.map(|data| (version, data)))
559    }
560
561    /// Load the latest checkpoint.
562    /// Return manifest version and the raw [RegionCheckpoint](crate::manifest::action::RegionCheckpoint) content if any
563    pub async fn load_last_checkpoint(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
564        let last_checkpoint_path = self.last_checkpoint_path();
565        let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await {
566            Ok(data) => data,
567            Err(e) if e.kind() == ErrorKind::NotFound => {
568                return Ok(None);
569            }
570            Err(e) => {
571                return Err(e).context(OpenDalSnafu)?;
572            }
573        };
574
575        let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data.to_vec())?;
576
577        debug!(
578            "Load checkpoint in path: {}, metadata: {:?}",
579            last_checkpoint_path, checkpoint_metadata
580        );
581
582        self.load_checkpoint(checkpoint_metadata).await
583    }
584
585    #[cfg(test)]
586    pub async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
587        self.object_store
588            .read(path)
589            .await
590            .context(OpenDalSnafu)
591            .map(|v| v.to_vec())
592    }
593
    /// Test helper: writes a checkpoint file for `version` and overwrites the
    /// `_last_checkpoint` metadata to point at it.
    #[cfg(test)]
    pub async fn write_last_checkpoint(
        &mut self,
        version: ManifestVersion,
        bytes: &[u8],
    ) -> Result<()> {
        let path = self.checkpoint_file_path(version);
        let data = self
            .compress_type
            .encode(bytes)
            .await
            .context(CompressObjectSnafu {
                compress_type: self.compress_type,
                path: &path,
            })?;

        let checkpoint_size = data.len();

        self.object_store
            .write(&path, data)
            .await
            .context(OpenDalSnafu)?;

        self.set_checkpoint_file_size(version, checkpoint_size as u64);

        let last_checkpoint_path = self.last_checkpoint_path();
        let checkpoint_metadata = CheckpointMetadata {
            size: bytes.len(),
            version,
            // NOTE(review): hard-coded checksum rather than
            // `checkpoint_checksum(bytes)` — presumably a fixed value some test
            // expects (e.g. to exercise checksum verification); confirm against
            // callers before changing.
            checksum: Some(1218259706),
            extend_metadata: HashMap::new(),
        };

        debug!(
            "Rewrite checkpoint in path: {},  metadata: {:?}",
            last_checkpoint_path, checkpoint_metadata
        );

        let bytes = checkpoint_metadata.encode()?;

        // Overwrite the last checkpoint with the modified content
        self.object_store
            .write(&last_checkpoint_path, bytes.clone())
            .await
            .context(OpenDalSnafu)?;
        Ok(())
    }
641
642    /// Compute the size(Byte) in manifest size map.
643    pub(crate) fn total_manifest_size(&self) -> u64 {
644        self.manifest_size_map.read().unwrap().values().sum()
645    }
646
647    /// Resets the size of all files.
648    pub(crate) fn reset_manifest_size(&mut self) {
649        self.manifest_size_map.write().unwrap().clear();
650        self.total_manifest_size.store(0, Ordering::Relaxed);
651    }
652
653    /// Set the size of the delta file by delta version.
654    pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
655        let mut m = self.manifest_size_map.write().unwrap();
656        m.insert(FileKey::Delta(version), size);
657
658        self.inc_total_manifest_size(size);
659    }
660
661    /// Set the size of the checkpoint file by checkpoint version.
662    pub(crate) fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
663        let mut m = self.manifest_size_map.write().unwrap();
664        m.insert(FileKey::Checkpoint(version), size);
665
666        self.inc_total_manifest_size(size);
667    }
668
669    fn unset_file_size(&self, key: &FileKey) {
670        let mut m = self.manifest_size_map.write().unwrap();
671        if let Some(val) = m.remove(key) {
672            debug!("Unset file size: {:?}, size: {}", key, val);
673            self.dec_total_manifest_size(val);
674        }
675    }
676
677    fn inc_total_manifest_size(&self, val: u64) {
678        self.total_manifest_size.fetch_add(val, Ordering::Relaxed);
679    }
680
681    fn dec_total_manifest_size(&self, val: u64) {
682        self.total_manifest_size.fetch_sub(val, Ordering::Relaxed);
683    }
684
685    /// Fetch all staging manifest files and return them as (version, action_list) pairs.
686    pub async fn fetch_staging_manifests(&self) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
687        let manifest_entries = self
688            .get_paths(
689                |entry| {
690                    let file_name = entry.name();
691                    if is_delta_file(file_name) {
692                        let version = file_version(file_name);
693                        Some((version, entry))
694                    } else {
695                        None
696                    }
697                },
698                true,
699            )
700            .await?;
701
702        let mut sorted_entries = manifest_entries;
703        Self::sort_manifests(&mut sorted_entries);
704
705        self.fetch_manifests_from_entries(sorted_entries).await
706    }
707
708    /// Clear all staging manifest files.
709    pub async fn clear_staging_manifests(&mut self) -> Result<()> {
710        self.object_store
711            .remove_all(&self.staging_path)
712            .await
713            .context(OpenDalSnafu)?;
714
715        debug!(
716            "Cleared all staging manifest files from {}",
717            self.staging_path
718        );
719
720        Ok(())
721    }
722}
723
/// Metadata stored in the `_last_checkpoint` file, pointing at the most
/// recent checkpoint.
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct CheckpointMetadata {
    /// Size (in bytes) of the uncompressed checkpoint content.
    pub size: usize,
    /// The latest version this checkpoint contains.
    pub version: ManifestVersion,
    /// CRC32 checksum of the uncompressed checkpoint content, if recorded.
    pub checksum: Option<u32>,
    /// Extra metadata entries; always written empty in this module.
    pub extend_metadata: HashMap<String, String>,
}
732
733impl CheckpointMetadata {
734    fn encode(&self) -> Result<Vec<u8>> {
735        Ok(serde_json::to_string(self)
736            .context(SerdeJsonSnafu)?
737            .into_bytes())
738    }
739
740    fn decode(bs: &[u8]) -> Result<Self> {
741        let data = std::str::from_utf8(bs).context(Utf8Snafu)?;
742
743        serde_json::from_str(data).context(SerdeJsonSnafu)
744    }
745}
746
747#[cfg(test)]
748mod tests {
749    use common_test_util::temp_dir::create_temp_dir;
750    use object_store::ObjectStore;
751    use object_store::services::Fs;
752
753    use super::*;
754
    /// Builds a `ManifestObjectStore` backed by a fresh temporary directory.
    fn new_test_manifest_store() -> ManifestObjectStore {
        common_telemetry::init_default_ut_logging();
        // NOTE(review): the `tmp_dir` guard is dropped when this function
        // returns, which typically deletes the directory; the Fs backend
        // appears to recreate paths on write — confirm.
        let tmp_dir = create_temp_dir("test_manifest_log_store");
        let builder = Fs::default().root(&tmp_dir.path().to_string_lossy());
        let object_store = ObjectStore::new(builder).unwrap().finish();
        ManifestObjectStore::new(
            "/",
            object_store,
            CompressionType::Uncompressed,
            Default::default(),
        )
    }
767
768    fn new_checkpoint_metadata_with_version(version: ManifestVersion) -> CheckpointMetadata {
769        CheckpointMetadata {
770            size: 0,
771            version,
772            checksum: None,
773            extend_metadata: Default::default(),
774        }
775    }
776
777    #[test]
778    // Define this test mainly to prevent future unintentional changes may break the backward compatibility.
779    fn test_compress_file_path_generation() {
780        let path = "/foo/bar/";
781        let version: ManifestVersion = 0;
782        let file_path = gen_path(path, &delta_file(version), CompressionType::Gzip);
783        assert_eq!(file_path.as_str(), "/foo/bar/00000000000000000000.json.gz")
784    }
785
786    #[tokio::test]
787    async fn test_manifest_log_store_uncompress() {
788        let mut log_store = new_test_manifest_store();
789        log_store.compress_type = CompressionType::Uncompressed;
790        test_manifest_log_store_case(log_store).await;
791    }
792
793    #[tokio::test]
794    async fn test_manifest_log_store_compress() {
795        let mut log_store = new_test_manifest_store();
796        log_store.compress_type = CompressionType::Gzip;
797        test_manifest_log_store_case(log_store).await;
798    }
799
    /// Shared scenario for both compression modes: save deltas, fetch ranges,
    /// write/load a checkpoint, then delete with and without keeping the
    /// last checkpoint.
    async fn test_manifest_log_store_case(mut log_store: ManifestObjectStore) {
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }

        // Fetch a sub-range [1, 4).
        let manifests = log_store.fetch_manifests(1, 4).await.unwrap();
        let mut it = manifests.into_iter();
        for v in 1..4 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        assert!(it.next().is_none());

        // Fetch a range larger than what exists; only versions 0..5 come back.
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();
        for v in 0..5 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        assert!(it.next().is_none());

        // test checkpoint
        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
        log_store
            .save_checkpoint(3, "checkpoint".as_bytes())
            .await
            .unwrap();

        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(checkpoint, "checkpoint".as_bytes());
        assert_eq!(3, v);

        //delete (,4) logs and keep checkpoint 3.
        let _ = log_store.delete_until(4, true).await.unwrap();
        let _ = log_store
            .load_checkpoint(new_checkpoint_metadata_with_version(3))
            .await
            .unwrap()
            .unwrap();
        let _ = log_store.load_last_checkpoint().await.unwrap().unwrap();
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();

        // Only delta 4 survives: deltas <= checkpoint version were pruned.
        let (version, bytes) = it.next().unwrap();
        assert_eq!(4, version);
        assert_eq!("hello, 4".as_bytes(), bytes);
        assert!(it.next().is_none());

        // delete all logs and checkpoints
        let _ = log_store.delete_until(11, false).await.unwrap();
        assert!(
            log_store
                .load_checkpoint(new_checkpoint_metadata_with_version(3))
                .await
                .unwrap()
                .is_none()
        );
        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();

        assert!(it.next().is_none());
    }
868
    #[tokio::test]
    // Verify a gzip-enabled ManifestObjectStore can still read and delete
    // files that were written uncompressed (backward-compatibility fallback).
    async fn test_compress_backward_compatible() {
        let mut log_store = new_test_manifest_store();

        // Write uncompressed data to simulate manifests created before
        // compression was enabled.
        log_store.compress_type = CompressionType::Uncompressed;
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
            .await
            .unwrap();

        // Switch the store to gzip; the older uncompressed files stay on disk.
        log_store.compress_type = CompressionType::Gzip;

        // load_last_checkpoint must still find the uncompressed checkpoint.
        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(v, 5);
        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());

        // Write more data with compression now in effect.
        for v in 5..10 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(10, "checkpoint_compressed".as_bytes())
            .await
            .unwrap();

        // Reads must transparently cover both compressed and uncompressed files.
        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
        let mut it = manifests.into_iter();

        for v in 0..10 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        let (v, checkpoint) = log_store
            .load_checkpoint(new_checkpoint_metadata_with_version(5))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(v, 5);
        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(v, 10);
        assert_eq!(checkpoint, "checkpoint_compressed".as_bytes());

        // Delete until version 10: removes the 10 deltas (0..=9, a mix of
        // uncompressed and compressed files) plus checkpoint 5 — 11 files in
        // total; checkpoint 10 is the only file retained.
        assert_eq!(11, log_store.delete_until(10, false).await.unwrap());
        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
        let mut it = manifests.into_iter();
        assert!(it.next().is_none());
    }
934
935    #[tokio::test]
936    async fn test_file_version() {
937        let version = file_version("00000000000000000007.checkpoint");
938        assert_eq!(version, 7);
939
940        let name = delta_file(version);
941        assert_eq!(name, "00000000000000000007.json");
942
943        let name = checkpoint_file(version);
944        assert_eq!(name, "00000000000000000007.checkpoint");
945    }
946
947    #[tokio::test]
948    async fn test_uncompressed_manifest_files_size() {
949        let mut log_store = new_test_manifest_store();
950        // write 5 manifest files with uncompressed(8B per file)
951        log_store.compress_type = CompressionType::Uncompressed;
952        for v in 0..5 {
953            log_store
954                .save(v, format!("hello, {v}").as_bytes(), false)
955                .await
956                .unwrap();
957        }
958        // write 1 checkpoint file with uncompressed(23B)
959        log_store
960            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
961            .await
962            .unwrap();
963
964        // manifest files size
965        assert_eq!(log_store.total_manifest_size(), 63);
966
967        // delete 3 manifest files
968        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);
969
970        // manifest files size after delete
971        assert_eq!(log_store.total_manifest_size(), 39);
972
973        // delete all manifest files
974        assert_eq!(
975            log_store
976                .delete_until(ManifestVersion::MAX, false)
977                .await
978                .unwrap(),
979            3
980        );
981
982        assert_eq!(log_store.total_manifest_size(), 0);
983    }
984
985    #[tokio::test]
986    async fn test_compressed_manifest_files_size() {
987        let mut log_store = new_test_manifest_store();
988        // Test with compressed manifest files
989        log_store.compress_type = CompressionType::Gzip;
990        // write 5 manifest files
991        for v in 0..5 {
992            log_store
993                .save(v, format!("hello, {v}").as_bytes(), false)
994                .await
995                .unwrap();
996        }
997        log_store
998            .save_checkpoint(5, "checkpoint_compressed".as_bytes())
999            .await
1000            .unwrap();
1001
1002        // manifest files size
1003        assert_eq!(log_store.total_manifest_size(), 181);
1004
1005        // delete 3 manifest files
1006        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);
1007
1008        // manifest files size after delete
1009        assert_eq!(log_store.total_manifest_size(), 97);
1010
1011        // delete all manifest files
1012        assert_eq!(
1013            log_store
1014                .delete_until(ManifestVersion::MAX, false)
1015                .await
1016                .unwrap(),
1017            3
1018        );
1019
1020        assert_eq!(log_store.total_manifest_size(), 0);
1021    }
1022}