mito2/manifest/
storage.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::iter::Iterator;
17use std::str::FromStr;
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::{Arc, RwLock};
20
21use common_datasource::compression::CompressionType;
22use common_telemetry::debug;
23use crc32fast::Hasher;
24use futures::TryStreamExt;
25use futures::future::try_join_all;
26use lazy_static::lazy_static;
27use object_store::util::join_dir;
28use object_store::{Entry, ErrorKind, Lister, ObjectStore, util};
29use regex::Regex;
30use serde::{Deserialize, Serialize};
31use snafu::{ResultExt, ensure};
32use store_api::ManifestVersion;
33use store_api::storage::RegionId;
34use tokio::sync::Semaphore;
35
36use crate::cache::manifest_cache::ManifestCache;
37use crate::error::{
38    ChecksumMismatchSnafu, CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu,
39    OpenDalSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
40};
41
42lazy_static! {
43    static ref DELTA_RE: Regex = Regex::new("^\\d+\\.json").unwrap();
44    static ref CHECKPOINT_RE: Regex = Regex::new("^\\d+\\.checkpoint").unwrap();
45}
46
47const LAST_CHECKPOINT_FILE: &str = "_last_checkpoint";
48const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Gzip;
49/// Due to backward compatibility, it is possible that the user's manifest file has not been compressed.
50/// So when we encounter problems, we need to fall back to `FALL_BACK_COMPRESS_TYPE` for processing.
51const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed;
52const FETCH_MANIFEST_PARALLELISM: usize = 16;
53
54/// Returns the directory to the manifest files.
55pub fn manifest_dir(region_dir: &str) -> String {
56    join_dir(region_dir, "manifest")
57}
58
59/// Returns the [CompressionType] according to whether to compress manifest files.
60pub const fn manifest_compress_type(compress: bool) -> CompressionType {
61    if compress {
62        DEFAULT_MANIFEST_COMPRESSION_TYPE
63    } else {
64        FALL_BACK_COMPRESS_TYPE
65    }
66}
67
68pub fn delta_file(version: ManifestVersion) -> String {
69    format!("{version:020}.json")
70}
71
72pub fn checkpoint_file(version: ManifestVersion) -> String {
73    format!("{version:020}.checkpoint")
74}
75
76pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> String {
77    if compress_type == CompressionType::Uncompressed {
78        format!("{}{}", path, file)
79    } else {
80        format!("{}{}.{}", path, file, compress_type.file_extension())
81    }
82}
83
84fn checkpoint_checksum(data: &[u8]) -> u32 {
85    let mut hasher = Hasher::new();
86    hasher.update(data);
87    hasher.finalize()
88}
89
90fn verify_checksum(data: &[u8], wanted: Option<u32>) -> Result<()> {
91    if let Some(checksum) = wanted {
92        let calculated_checksum = checkpoint_checksum(data);
93        ensure!(
94            checksum == calculated_checksum,
95            ChecksumMismatchSnafu {
96                actual: calculated_checksum,
97                expected: checksum,
98            }
99        );
100    }
101    Ok(())
102}
103
104/// Return's the file manifest version from path
105///
106/// # Panics
107/// If the file path is not a valid delta or checkpoint file.
108pub fn file_version(path: &str) -> ManifestVersion {
109    let s = path.split('.').next().unwrap();
110    s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}"))
111}
112
113/// Return's the file compress algorithm by file extension.
114///
115/// for example file
116/// `00000000000000000000.json.gz` -> `CompressionType::GZIP`
117pub fn file_compress_type(path: &str) -> CompressionType {
118    let s = path.rsplit('.').next().unwrap_or("");
119    CompressionType::from_str(s).unwrap_or(CompressionType::Uncompressed)
120}
121
122pub fn is_delta_file(file_name: &str) -> bool {
123    DELTA_RE.is_match(file_name)
124}
125
126pub fn is_checkpoint_file(file_name: &str) -> bool {
127    CHECKPOINT_RE.is_match(file_name)
128}
129
130/// Key to identify a manifest file.
131#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
132enum FileKey {
133    /// A delta file (`.json`).
134    Delta(ManifestVersion),
135    /// A checkpoint file (`.checkpoint`).
136    Checkpoint(ManifestVersion),
137}
138
/// Reads and writes manifest files (delta files, checkpoint files and the
/// `_last_checkpoint` pointer) in an object store, optionally through a local cache.
#[derive(Clone, Debug)]
pub struct ManifestObjectStore {
    /// Object store that holds the manifest files.
    object_store: ObjectStore,
    /// Compression applied when writing delta and checkpoint files.
    compress_type: CompressionType,
    /// Normalized manifest directory.
    path: String,
    /// Directory for staging delta files, derived from `path` in `new`.
    staging_path: String,
    /// Stores the size of each manifest file as written to the object store
    /// (i.e. after compression).
    manifest_size_map: Arc<RwLock<HashMap<FileKey, u64>>>,
    /// Running total of the sizes tracked in `manifest_size_map`; supplied by the
    /// caller of `new`, so it may be observed outside this store.
    total_manifest_size: Arc<AtomicU64>,
    /// Optional manifest cache for local caching.
    manifest_cache: Option<ManifestCache>,
}
151
152impl ManifestObjectStore {
153    pub fn new(
154        path: &str,
155        object_store: ObjectStore,
156        compress_type: CompressionType,
157        total_manifest_size: Arc<AtomicU64>,
158        manifest_cache: Option<ManifestCache>,
159    ) -> Self {
160        let path = util::normalize_dir(path);
161        let staging_path = {
162            // Convert "region_dir/manifest/" to "region_dir/staging/manifest/"
163            let parent_dir = path.trim_end_matches("manifest/").trim_end_matches('/');
164            util::normalize_dir(&format!("{}/staging/manifest", parent_dir))
165        };
166        Self {
167            object_store,
168            compress_type,
169            path,
170            staging_path,
171            manifest_size_map: Arc::new(RwLock::new(HashMap::new())),
172            total_manifest_size,
173            manifest_cache,
174        }
175    }
176
177    /// Returns the delta file path under the **current** compression algorithm
178    fn delta_file_path(&self, version: ManifestVersion, is_staging: bool) -> String {
179        let base_path = if is_staging {
180            &self.staging_path
181        } else {
182            &self.path
183        };
184        gen_path(base_path, &delta_file(version), self.compress_type)
185    }
186
187    /// Returns the checkpoint file path under the **current** compression algorithm
188    fn checkpoint_file_path(&self, version: ManifestVersion) -> String {
189        gen_path(&self.path, &checkpoint_file(version), self.compress_type)
190    }
191
192    /// Returns the last checkpoint path, because the last checkpoint is not compressed,
193    /// so its path name has nothing to do with the compression algorithm used by `ManifestObjectStore`
194    pub(crate) fn last_checkpoint_path(&self) -> String {
195        format!("{}{}", self.path, LAST_CHECKPOINT_FILE)
196    }
197
198    /// Returns the manifest dir
199    pub(crate) fn manifest_dir(&self) -> &str {
200        &self.path
201    }
202
203    /// Returns an iterator of manifests from normal or staging directory.
204    pub(crate) async fn manifest_lister(&self, is_staging: bool) -> Result<Option<Lister>> {
205        let path = if is_staging {
206            &self.staging_path
207        } else {
208            &self.path
209        };
210        match self.object_store.lister_with(path).await {
211            Ok(streamer) => Ok(Some(streamer)),
212            Err(e) if e.kind() == ErrorKind::NotFound => {
213                debug!("Manifest directory does not exist: {}", path);
214                Ok(None)
215            }
216            Err(e) => Err(e).context(OpenDalSnafu)?,
217        }
218    }
219
220    /// Return all `R`s in the directory that meet the `filter` conditions (that is, the `filter` closure returns `Some(R)`),
221    /// and discard `R` that does not meet the conditions (that is, the `filter` closure returns `None`)
222    /// Return an empty vector when directory is not found.
223    pub async fn get_paths<F, R>(&self, filter: F, is_staging: bool) -> Result<Vec<R>>
224    where
225        F: Fn(Entry) -> Option<R>,
226    {
227        let Some(streamer) = self.manifest_lister(is_staging).await? else {
228            return Ok(vec![]);
229        };
230
231        streamer
232            .try_filter_map(|e| async { Ok(filter(e)) })
233            .try_collect::<Vec<_>>()
234            .await
235            .context(OpenDalSnafu)
236    }
237
238    /// Sorts the manifest files.
239    fn sort_manifests(entries: &mut [(ManifestVersion, Entry)]) {
240        entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2));
241    }
242
243    /// Scans the manifest files in the range of [start, end) and return all manifest entries.
244    pub async fn scan(
245        &self,
246        start: ManifestVersion,
247        end: ManifestVersion,
248    ) -> Result<Vec<(ManifestVersion, Entry)>> {
249        ensure!(start <= end, InvalidScanIndexSnafu { start, end });
250
251        let mut entries: Vec<(ManifestVersion, Entry)> = self
252            .get_paths(
253                |entry| {
254                    let file_name = entry.name();
255                    if is_delta_file(file_name) {
256                        let version = file_version(file_name);
257                        if start <= version && version < end {
258                            return Some((version, entry));
259                        }
260                    }
261                    None
262                },
263                false,
264            )
265            .await?;
266
267        Self::sort_manifests(&mut entries);
268
269        Ok(entries)
270    }
271
272    /// Fetches manifests in range [start_version, end_version).
273    ///
274    /// This functions is guaranteed to return manifests from the `start_version` strictly (must contain `start_version`).
275    pub async fn fetch_manifests_strict_from(
276        &self,
277        start_version: ManifestVersion,
278        end_version: ManifestVersion,
279        region_id: RegionId,
280    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
281        let mut manifests = self.fetch_manifests(start_version, end_version).await?;
282        let start_index = manifests.iter().position(|(v, _)| *v == start_version);
283        debug!(
284            "Fetches manifests in range [{},{}), start_index: {:?}, region_id: {}, manifests: {:?}",
285            start_version,
286            end_version,
287            start_index,
288            region_id,
289            manifests.iter().map(|(v, _)| *v).collect::<Vec<_>>()
290        );
291        if let Some(start_index) = start_index {
292            Ok(manifests.split_off(start_index))
293        } else {
294            Ok(vec![])
295        }
296    }
297
298    /// Common implementation for fetching manifests from entries in parallel.
299    /// If `is_staging` is true, cache is skipped.
300    async fn fetch_manifests_from_entries(
301        &self,
302        entries: Vec<(ManifestVersion, Entry)>,
303        is_staging: bool,
304    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
305        if entries.is_empty() {
306            return Ok(vec![]);
307        }
308
309        // TODO(weny): Make it configurable.
310        let semaphore = Semaphore::new(FETCH_MANIFEST_PARALLELISM);
311
312        let tasks = entries.iter().map(|(v, entry)| async {
313            // Safety: semaphore must exist.
314            let _permit = semaphore.acquire().await.unwrap();
315
316            let cache_key = entry.path();
317            // Try to get from cache first
318            if let Some(data) = self.get_from_cache(cache_key, is_staging).await {
319                return Ok((*v, data));
320            }
321
322            // Fetch from remote object store
323            let compress_type = file_compress_type(entry.name());
324            let bytes = self
325                .object_store
326                .read(entry.path())
327                .await
328                .context(OpenDalSnafu)?;
329            let data = compress_type
330                .decode(bytes)
331                .await
332                .context(DecompressObjectSnafu {
333                    compress_type,
334                    path: entry.path(),
335                })?;
336
337            // Add to cache
338            self.put_to_cache(cache_key.to_string(), &data, is_staging)
339                .await;
340
341            Ok((*v, data))
342        });
343
344        try_join_all(tasks).await
345    }
346
347    /// Fetch all manifests in concurrent, and return the manifests in range [start_version, end_version)
348    ///
349    /// **Notes**: This function is no guarantee to return manifests from the `start_version` strictly.
350    /// Uses [fetch_manifests_strict_from](ManifestObjectStore::fetch_manifests_strict_from) to get manifests from the `start_version`.
351    pub async fn fetch_manifests(
352        &self,
353        start_version: ManifestVersion,
354        end_version: ManifestVersion,
355    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
356        let manifests = self.scan(start_version, end_version).await?;
357        self.fetch_manifests_from_entries(manifests, false).await
358    }
359
360    /// Delete manifest files that version < end.
361    /// If keep_last_checkpoint is true, the last checkpoint file will be kept.
362    /// ### Return
363    /// The number of deleted files.
364    pub async fn delete_until(
365        &self,
366        end: ManifestVersion,
367        keep_last_checkpoint: bool,
368    ) -> Result<usize> {
369        // Stores (entry, is_checkpoint, version) in a Vec.
370        let entries: Vec<_> = self
371            .get_paths(
372                |entry| {
373                    let file_name = entry.name();
374                    let is_checkpoint = is_checkpoint_file(file_name);
375                    if is_delta_file(file_name) || is_checkpoint_file(file_name) {
376                        let version = file_version(file_name);
377                        if version < end {
378                            return Some((entry, is_checkpoint, version));
379                        }
380                    }
381                    None
382                },
383                false,
384            )
385            .await?;
386        let checkpoint_version = if keep_last_checkpoint {
387            // Note that the order of entries is unspecific.
388            entries
389                .iter()
390                .filter_map(
391                    |(_e, is_checkpoint, version)| {
392                        if *is_checkpoint { Some(version) } else { None }
393                    },
394                )
395                .max()
396        } else {
397            None
398        };
399        let del_entries: Vec<_> = entries
400            .iter()
401            .filter(|(_e, is_checkpoint, version)| {
402                if let Some(max_version) = checkpoint_version {
403                    if *is_checkpoint {
404                        // We need to keep the checkpoint file.
405                        version < max_version
406                    } else {
407                        // We can delete the log file with max_version as the checkpoint
408                        // file contains the log file's content.
409                        version <= max_version
410                    }
411                } else {
412                    true
413                }
414            })
415            .collect();
416        let paths = del_entries
417            .iter()
418            .map(|(e, _, _)| e.path().to_string())
419            .collect::<Vec<_>>();
420        let ret = paths.len();
421
422        debug!(
423            "Deleting {} logs from manifest storage path {} until {}, checkpoint_version: {:?}, paths: {:?}",
424            ret, self.path, end, checkpoint_version, paths,
425        );
426
427        // Remove from cache first
428        for (entry, _, _) in &del_entries {
429            self.remove_from_cache(entry.path()).await;
430        }
431
432        self.object_store
433            .delete_iter(paths)
434            .await
435            .context(OpenDalSnafu)?;
436
437        // delete manifest sizes
438        for (_, is_checkpoint, version) in &del_entries {
439            if *is_checkpoint {
440                self.unset_file_size(&FileKey::Checkpoint(*version));
441            } else {
442                self.unset_file_size(&FileKey::Delta(*version));
443            }
444        }
445
446        Ok(ret)
447    }
448
449    /// Save the delta manifest file.
450    pub async fn save(
451        &mut self,
452        version: ManifestVersion,
453        bytes: &[u8],
454        is_staging: bool,
455    ) -> Result<()> {
456        let path = self.delta_file_path(version, is_staging);
457        debug!("Save log to manifest storage, version: {}", version);
458        let data = self
459            .compress_type
460            .encode(bytes)
461            .await
462            .context(CompressObjectSnafu {
463                compress_type: self.compress_type,
464                path: &path,
465            })?;
466        let delta_size = data.len();
467
468        self.write_and_put_cache(&path, data, is_staging).await?;
469        self.set_delta_file_size(version, delta_size as u64);
470
471        Ok(())
472    }
473
474    /// Save the checkpoint manifest file.
475    pub(crate) async fn save_checkpoint(
476        &self,
477        version: ManifestVersion,
478        bytes: &[u8],
479    ) -> Result<()> {
480        let path = self.checkpoint_file_path(version);
481        let data = self
482            .compress_type
483            .encode(bytes)
484            .await
485            .context(CompressObjectSnafu {
486                compress_type: self.compress_type,
487                path: &path,
488            })?;
489        let checkpoint_size = data.len();
490        let checksum = checkpoint_checksum(bytes);
491
492        self.write_and_put_cache(&path, data, false).await?;
493        self.set_checkpoint_file_size(version, checkpoint_size as u64);
494
495        // Because last checkpoint file only contain size and version, which is tiny, so we don't compress it.
496        let last_checkpoint_path = self.last_checkpoint_path();
497
498        let checkpoint_metadata = CheckpointMetadata {
499            size: bytes.len(),
500            version,
501            checksum: Some(checksum),
502            extend_metadata: HashMap::new(),
503        };
504
505        debug!(
506            "Save checkpoint in path: {},  metadata: {:?}",
507            last_checkpoint_path, checkpoint_metadata
508        );
509
510        let bytes = checkpoint_metadata.encode()?;
511        self.object_store
512            .write(&last_checkpoint_path, bytes)
513            .await
514            .context(OpenDalSnafu)?;
515
516        Ok(())
517    }
518
519    async fn load_checkpoint(
520        &mut self,
521        metadata: CheckpointMetadata,
522    ) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
523        let version = metadata.version;
524        let path = self.checkpoint_file_path(version);
525
526        // Try to get from cache first
527        if let Some(data) = self.get_from_cache(&path, false).await {
528            verify_checksum(&data, metadata.checksum)?;
529            return Ok(Some((version, data)));
530        }
531
532        // Due to backward compatibility, it is possible that the user's checkpoint not compressed,
533        // so if we don't find file by compressed type. fall back to checkpoint not compressed find again.
534        let checkpoint_data = match self.object_store.read(&path).await {
535            Ok(checkpoint) => {
536                let checkpoint_size = checkpoint.len();
537                let decompress_data =
538                    self.compress_type
539                        .decode(checkpoint)
540                        .await
541                        .with_context(|_| DecompressObjectSnafu {
542                            compress_type: self.compress_type,
543                            path: path.clone(),
544                        })?;
545                verify_checksum(&decompress_data, metadata.checksum)?;
546                // set the checkpoint size
547                self.set_checkpoint_file_size(version, checkpoint_size as u64);
548                // Add to cache
549                self.put_to_cache(path, &decompress_data, false).await;
550                Ok(Some(decompress_data))
551            }
552            Err(e) => {
553                if e.kind() == ErrorKind::NotFound {
554                    if self.compress_type != FALL_BACK_COMPRESS_TYPE {
555                        let fall_back_path = gen_path(
556                            &self.path,
557                            &checkpoint_file(version),
558                            FALL_BACK_COMPRESS_TYPE,
559                        );
560                        debug!(
561                            "Failed to load checkpoint from path: {}, fall back to path: {}",
562                            path, fall_back_path
563                        );
564
565                        // Try to get fallback from cache first
566                        if let Some(data) = self.get_from_cache(&fall_back_path, false).await {
567                            verify_checksum(&data, metadata.checksum)?;
568                            return Ok(Some((version, data)));
569                        }
570
571                        match self.object_store.read(&fall_back_path).await {
572                            Ok(checkpoint) => {
573                                let checkpoint_size = checkpoint.len();
574                                let decompress_data = FALL_BACK_COMPRESS_TYPE
575                                    .decode(checkpoint)
576                                    .await
577                                    .with_context(|_| DecompressObjectSnafu {
578                                        compress_type: FALL_BACK_COMPRESS_TYPE,
579                                        path: fall_back_path.clone(),
580                                    })?;
581                                verify_checksum(&decompress_data, metadata.checksum)?;
582                                self.set_checkpoint_file_size(version, checkpoint_size as u64);
583                                // Add fallback to cache
584                                self.put_to_cache(fall_back_path, &decompress_data, false)
585                                    .await;
586                                Ok(Some(decompress_data))
587                            }
588                            Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
589                            Err(e) => Err(e).context(OpenDalSnafu),
590                        }
591                    } else {
592                        Ok(None)
593                    }
594                } else {
595                    Err(e).context(OpenDalSnafu)
596                }
597            }
598        }?;
599        Ok(checkpoint_data.map(|data| (version, data)))
600    }
601
602    /// Load the latest checkpoint.
603    /// Return manifest version and the raw [RegionCheckpoint](crate::manifest::action::RegionCheckpoint) content if any
604    pub async fn load_last_checkpoint(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
605        let last_checkpoint_path = self.last_checkpoint_path();
606
607        // Fetch from remote object store without cache
608        let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await {
609            Ok(data) => data.to_vec(),
610            Err(e) if e.kind() == ErrorKind::NotFound => {
611                return Ok(None);
612            }
613            Err(e) => {
614                return Err(e).context(OpenDalSnafu)?;
615            }
616        };
617
618        let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data)?;
619
620        debug!(
621            "Load checkpoint in path: {}, metadata: {:?}",
622            last_checkpoint_path, checkpoint_metadata
623        );
624
625        self.load_checkpoint(checkpoint_metadata).await
626    }
627
628    #[cfg(test)]
629    pub async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
630        self.object_store
631            .read(path)
632            .await
633            .context(OpenDalSnafu)
634            .map(|v| v.to_vec())
635    }
636
637    #[cfg(test)]
638    pub async fn write_last_checkpoint(
639        &mut self,
640        version: ManifestVersion,
641        bytes: &[u8],
642    ) -> Result<()> {
643        let path = self.checkpoint_file_path(version);
644        let data = self
645            .compress_type
646            .encode(bytes)
647            .await
648            .context(CompressObjectSnafu {
649                compress_type: self.compress_type,
650                path: &path,
651            })?;
652
653        let checkpoint_size = data.len();
654
655        self.object_store
656            .write(&path, data)
657            .await
658            .context(OpenDalSnafu)?;
659
660        self.set_checkpoint_file_size(version, checkpoint_size as u64);
661
662        let last_checkpoint_path = self.last_checkpoint_path();
663        let checkpoint_metadata = CheckpointMetadata {
664            size: bytes.len(),
665            version,
666            checksum: Some(1218259706),
667            extend_metadata: HashMap::new(),
668        };
669
670        debug!(
671            "Rewrite checkpoint in path: {},  metadata: {:?}",
672            last_checkpoint_path, checkpoint_metadata
673        );
674
675        let bytes = checkpoint_metadata.encode()?;
676
677        // Overwrite the last checkpoint with the modified content
678        self.object_store
679            .write(&last_checkpoint_path, bytes.clone())
680            .await
681            .context(OpenDalSnafu)?;
682        Ok(())
683    }
684
685    /// Compute the size(Byte) in manifest size map.
686    pub(crate) fn total_manifest_size(&self) -> u64 {
687        self.manifest_size_map.read().unwrap().values().sum()
688    }
689
690    /// Resets the size of all files.
691    pub(crate) fn reset_manifest_size(&mut self) {
692        self.manifest_size_map.write().unwrap().clear();
693        self.total_manifest_size.store(0, Ordering::Relaxed);
694    }
695
696    /// Set the size of the delta file by delta version.
697    pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
698        let mut m = self.manifest_size_map.write().unwrap();
699        m.insert(FileKey::Delta(version), size);
700
701        self.inc_total_manifest_size(size);
702    }
703
704    /// Set the size of the checkpoint file by checkpoint version.
705    pub(crate) fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
706        let mut m = self.manifest_size_map.write().unwrap();
707        m.insert(FileKey::Checkpoint(version), size);
708
709        self.inc_total_manifest_size(size);
710    }
711
712    fn unset_file_size(&self, key: &FileKey) {
713        let mut m = self.manifest_size_map.write().unwrap();
714        if let Some(val) = m.remove(key) {
715            debug!("Unset file size: {:?}, size: {}", key, val);
716            self.dec_total_manifest_size(val);
717        }
718    }
719
720    fn inc_total_manifest_size(&self, val: u64) {
721        self.total_manifest_size.fetch_add(val, Ordering::Relaxed);
722    }
723
724    fn dec_total_manifest_size(&self, val: u64) {
725        self.total_manifest_size.fetch_sub(val, Ordering::Relaxed);
726    }
727
728    /// Fetch all staging manifest files and return them as (version, action_list) pairs.
729    pub async fn fetch_staging_manifests(&self) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
730        let manifest_entries = self
731            .get_paths(
732                |entry| {
733                    let file_name = entry.name();
734                    if is_delta_file(file_name) {
735                        let version = file_version(file_name);
736                        Some((version, entry))
737                    } else {
738                        None
739                    }
740                },
741                true,
742            )
743            .await?;
744
745        let mut sorted_entries = manifest_entries;
746        Self::sort_manifests(&mut sorted_entries);
747
748        self.fetch_manifests_from_entries(sorted_entries, true)
749            .await
750    }
751
752    /// Clear all staging manifest files.
753    pub async fn clear_staging_manifests(&mut self) -> Result<()> {
754        self.object_store
755            .remove_all(&self.staging_path)
756            .await
757            .context(OpenDalSnafu)?;
758
759        debug!(
760            "Cleared all staging manifest files from {}",
761            self.staging_path
762        );
763
764        Ok(())
765    }
766
767    /// Gets a manifest file from cache.
768    /// Returns the file data if found in cache, None otherwise.
769    /// If `is_staging` is true, always returns None.
770    async fn get_from_cache(&self, key: &str, is_staging: bool) -> Option<Vec<u8>> {
771        if is_staging {
772            return None;
773        }
774        let cache = self.manifest_cache.as_ref()?;
775        cache.get_file(key).await
776    }
777
778    /// Puts a manifest file into cache.
779    /// If `is_staging` is true, does nothing.
780    async fn put_to_cache(&self, key: String, data: &[u8], is_staging: bool) {
781        if is_staging {
782            return;
783        }
784        let Some(cache) = &self.manifest_cache else {
785            return;
786        };
787
788        cache.put_file(key, data.to_vec()).await;
789    }
790
791    /// Writes data to object store and puts it into cache.
792    /// If `is_staging` is true, cache is skipped.
793    async fn write_and_put_cache(&self, path: &str, data: Vec<u8>, is_staging: bool) -> Result<()> {
794        // Clone data for cache before writing, only if cache is enabled and not staging
795        let cache_data = if !is_staging && self.manifest_cache.is_some() {
796            Some(data.clone())
797        } else {
798            None
799        };
800
801        // Write to object store
802        self.object_store
803            .write(path, data)
804            .await
805            .context(OpenDalSnafu)?;
806
807        // Put to cache if we cloned the data
808        if let Some(data) = cache_data {
809            self.put_to_cache(path.to_string(), &data, is_staging).await;
810        }
811
812        Ok(())
813    }
814
815    /// Removes a manifest file from cache.
816    async fn remove_from_cache(&self, key: &str) {
817        let Some(cache) = &self.manifest_cache else {
818            return;
819        };
820
821        cache.remove(key).await;
822    }
823}
824
825#[derive(Serialize, Deserialize, Debug)]
826pub(crate) struct CheckpointMetadata {
827    pub size: usize,
828    /// The latest version this checkpoint contains.
829    pub version: ManifestVersion,
830    pub checksum: Option<u32>,
831    pub extend_metadata: HashMap<String, String>,
832}
833
834impl CheckpointMetadata {
835    fn encode(&self) -> Result<Vec<u8>> {
836        Ok(serde_json::to_string(self)
837            .context(SerdeJsonSnafu)?
838            .into_bytes())
839    }
840
841    fn decode(bs: &[u8]) -> Result<Self> {
842        let data = std::str::from_utf8(bs).context(Utf8Snafu)?;
843
844        serde_json::from_str(data).context(SerdeJsonSnafu)
845    }
846}
847
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ObjectStore;
    use object_store::services::Fs;

    use super::*;

    /// Builds a `ManifestObjectStore` rooted at a fresh temp directory, using
    /// `CompressionType::Uncompressed` by default.
    fn new_test_manifest_store() -> ManifestObjectStore {
        common_telemetry::init_default_ut_logging();
        // NOTE(review): `tmp_dir` is dropped when this function returns, which presumably
        // removes the directory while the returned store still points at it. Writes then
        // appear to recreate it under the Fs root. Consider keeping the guard alive for the
        // duration of each test; TODO confirm.
        let tmp_dir = create_temp_dir("test_manifest_log_store");
        let builder = Fs::default().root(&tmp_dir.path().to_string_lossy());
        let object_store = ObjectStore::new(builder).unwrap().finish();
        ManifestObjectStore::new(
            "/",
            object_store,
            CompressionType::Uncompressed,
            Default::default(),
            None,
        )
    }

    /// Creates checkpoint metadata that only carries `version`; all other fields are empty.
    fn new_checkpoint_metadata_with_version(version: ManifestVersion) -> CheckpointMetadata {
        CheckpointMetadata {
            size: 0,
            version,
            checksum: None,
            extend_metadata: Default::default(),
        }
    }

    #[test]
    // Guards against future changes that would break backward compatibility of the
    // compressed file path format.
    fn test_compress_file_path_generation() {
        let path = "/foo/bar/";
        let version: ManifestVersion = 0;
        let file_path = gen_path(path, &delta_file(version), CompressionType::Gzip);
        assert_eq!(file_path.as_str(), "/foo/bar/00000000000000000000.json.gz")
    }

    #[tokio::test]
    async fn test_manifest_log_store_uncompress() {
        let mut log_store = new_test_manifest_store();
        log_store.compress_type = CompressionType::Uncompressed;
        test_manifest_log_store_case(log_store).await;
    }

    #[tokio::test]
    async fn test_manifest_log_store_compress() {
        let mut log_store = new_test_manifest_store();
        log_store.compress_type = CompressionType::Gzip;
        test_manifest_log_store_case(log_store).await;
    }

    async fn test_manifest_log_store_case(mut log_store: ManifestObjectStore) {
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }

        let manifests = log_store.fetch_manifests(1, 4).await.unwrap();
        let mut it = manifests.into_iter();
        for v in 1..4 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        assert!(it.next().is_none());

        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();
        for v in 0..5 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        assert!(it.next().is_none());

        // test checkpoint
        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
        log_store
            .save_checkpoint(3, "checkpoint".as_bytes())
            .await
            .unwrap();

        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(checkpoint, "checkpoint".as_bytes());
        assert_eq!(3, v);

        // Delete logs below version 4 but keep checkpoint 3.
        let _ = log_store.delete_until(4, true).await.unwrap();
        let _ = log_store
            .load_checkpoint(new_checkpoint_metadata_with_version(3))
            .await
            .unwrap()
            .unwrap();
        let _ = log_store.load_last_checkpoint().await.unwrap().unwrap();
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();

        let (version, bytes) = it.next().unwrap();
        assert_eq!(4, version);
        assert_eq!("hello, 4".as_bytes(), bytes);
        assert!(it.next().is_none());

        // delete all logs and checkpoints
        let _ = log_store.delete_until(11, false).await.unwrap();
        assert!(
            log_store
                .load_checkpoint(new_checkpoint_metadata_with_version(3))
                .await
                .unwrap()
                .is_none()
        );
        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();

        assert!(it.next().is_none());
    }

    #[tokio::test]
    // test ManifestObjectStore can read/delete previously uncompressed data correctly
    async fn test_compress_backward_compatible() {
        let mut log_store = new_test_manifest_store();

        // write uncompressed data to simulate previously uncompressed data
        log_store.compress_type = CompressionType::Uncompressed;
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
            .await
            .unwrap();

        // change compress type
        log_store.compress_type = CompressionType::Gzip;

        // test load_last_checkpoint work correctly for previously uncompressed data
        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(v, 5);
        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());

        // write compressed data to simulate the compression algorithm taking effect
        for v in 5..10 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(10, "checkpoint_compressed".as_bytes())
            .await
            .unwrap();

        // test data reading
        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
        let mut it = manifests.into_iter();

        for v in 0..10 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        let (v, checkpoint) = log_store
            .load_checkpoint(new_checkpoint_metadata_with_version(5))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(v, 5);
        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(v, 10);
        assert_eq!(checkpoint, "checkpoint_compressed".as_bytes());

        // Delete until 10, covering both uncompressed and compressed files:
        // delta logs 0..=9 plus checkpoint 5 are deleted (11 files in total).
        assert_eq!(11, log_store.delete_until(10, false).await.unwrap());
        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
        let mut it = manifests.into_iter();
        assert!(it.next().is_none());
    }

    #[test]
    fn test_file_version() {
        let version = file_version("00000000000000000007.checkpoint");
        assert_eq!(version, 7);

        let name = delta_file(version);
        assert_eq!(name, "00000000000000000007.json");

        let name = checkpoint_file(version);
        assert_eq!(name, "00000000000000000007.checkpoint");
    }

    #[tokio::test]
    async fn test_uncompressed_manifest_files_size() {
        let mut log_store = new_test_manifest_store();
        // write 5 uncompressed delta files (8 bytes each)
        log_store.compress_type = CompressionType::Uncompressed;
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        // write 1 uncompressed checkpoint file (23 bytes)
        log_store
            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
            .await
            .unwrap();

        // manifest files size: 5 * 8 + 23
        assert_eq!(log_store.total_manifest_size(), 63);

        // delete 3 manifest files
        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);

        // manifest files size after delete
        assert_eq!(log_store.total_manifest_size(), 39);

        // delete all manifest files
        assert_eq!(
            log_store
                .delete_until(ManifestVersion::MAX, false)
                .await
                .unwrap(),
            3
        );

        assert_eq!(log_store.total_manifest_size(), 0);
    }

    #[tokio::test]
    async fn test_compressed_manifest_files_size() {
        let mut log_store = new_test_manifest_store();
        // Test with compressed manifest files
        log_store.compress_type = CompressionType::Gzip;
        // write 5 manifest files
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(5, "checkpoint_compressed".as_bytes())
            .await
            .unwrap();

        // manifest files size
        assert_eq!(log_store.total_manifest_size(), 181);

        // delete 3 manifest files
        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);

        // manifest files size after delete
        assert_eq!(log_store.total_manifest_size(), 97);

        // delete all manifest files
        assert_eq!(
            log_store
                .delete_until(ManifestVersion::MAX, false)
                .await
                .unwrap(),
            3
        );

        assert_eq!(log_store.total_manifest_size(), 0);
    }
}