mito2/manifest/
storage.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::iter::Iterator;
17use std::str::FromStr;
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::{Arc, RwLock};
20
21use common_datasource::compression::CompressionType;
22use common_telemetry::debug;
23use crc32fast::Hasher;
24use futures::TryStreamExt;
25use futures::future::try_join_all;
26use lazy_static::lazy_static;
27use object_store::util::join_dir;
28use object_store::{Entry, ErrorKind, Lister, ObjectStore, util};
29use regex::Regex;
30use serde::{Deserialize, Serialize};
31use snafu::{ResultExt, ensure};
32use store_api::ManifestVersion;
33use store_api::storage::RegionId;
34use tokio::sync::Semaphore;
35
36use crate::cache::manifest_cache::ManifestCache;
37use crate::error::{
38    ChecksumMismatchSnafu, CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu,
39    OpenDalSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
40};
41
lazy_static! {
    /// Matches delta file names: leading digits followed by `.json`.
    /// The pattern is not anchored at the end, so names carrying an extra
    /// suffix (for example a compression extension) match as well.
    static ref DELTA_RE: Regex = Regex::new("^\\d+\\.json").unwrap();
    /// Matches checkpoint file names: leading digits followed by `.checkpoint`.
    /// The pattern is not anchored at the end, so names carrying an extra
    /// suffix (for example a compression extension) match as well.
    static ref CHECKPOINT_RE: Regex = Regex::new("^\\d+\\.checkpoint").unwrap();
}

/// Name of the file, directly under the manifest directory, that stores the
/// metadata of the last saved checkpoint.
const LAST_CHECKPOINT_FILE: &str = "_last_checkpoint";
/// Compression type used for manifest files when compression is enabled.
const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Gzip;
/// Due to backward compatibility, it is possible that the user's manifest file has not been compressed.
/// So when we encounter problems, we need to fall back to `FALL_BACK_COMPRESS_TYPE` for processing.
const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed;
/// Maximum number of manifest files fetched from the object store at the same time.
const FETCH_MANIFEST_PARALLELISM: usize = 16;
53
54/// Returns the directory to the manifest files.
55pub fn manifest_dir(region_dir: &str) -> String {
56    join_dir(region_dir, "manifest")
57}
58
59/// Returns the [CompressionType] according to whether to compress manifest files.
60pub const fn manifest_compress_type(compress: bool) -> CompressionType {
61    if compress {
62        DEFAULT_MANIFEST_COMPRESSION_TYPE
63    } else {
64        FALL_BACK_COMPRESS_TYPE
65    }
66}
67
68pub fn delta_file(version: ManifestVersion) -> String {
69    format!("{version:020}.json")
70}
71
72pub fn checkpoint_file(version: ManifestVersion) -> String {
73    format!("{version:020}.checkpoint")
74}
75
76pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> String {
77    if compress_type == CompressionType::Uncompressed {
78        format!("{}{}", path, file)
79    } else {
80        format!("{}{}.{}", path, file, compress_type.file_extension())
81    }
82}
83
84fn checkpoint_checksum(data: &[u8]) -> u32 {
85    let mut hasher = Hasher::new();
86    hasher.update(data);
87    hasher.finalize()
88}
89
90fn verify_checksum(data: &[u8], wanted: Option<u32>) -> Result<()> {
91    if let Some(checksum) = wanted {
92        let calculated_checksum = checkpoint_checksum(data);
93        ensure!(
94            checksum == calculated_checksum,
95            ChecksumMismatchSnafu {
96                actual: calculated_checksum,
97                expected: checksum,
98            }
99        );
100    }
101    Ok(())
102}
103
104/// Return's the file manifest version from path
105///
106/// # Panics
107/// If the file path is not a valid delta or checkpoint file.
108pub fn file_version(path: &str) -> ManifestVersion {
109    let s = path.split('.').next().unwrap();
110    s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}"))
111}
112
113/// Return's the file compress algorithm by file extension.
114///
115/// for example file
116/// `00000000000000000000.json.gz` -> `CompressionType::GZIP`
117pub fn file_compress_type(path: &str) -> CompressionType {
118    let s = path.rsplit('.').next().unwrap_or("");
119    CompressionType::from_str(s).unwrap_or(CompressionType::Uncompressed)
120}
121
/// Returns true if `file_name` starts with digits followed by `.json`,
/// i.e. it matches the delta file pattern (trailing suffixes are allowed).
pub fn is_delta_file(file_name: &str) -> bool {
    DELTA_RE.is_match(file_name)
}
125
/// Returns true if `file_name` starts with digits followed by `.checkpoint`,
/// i.e. it matches the checkpoint file pattern (trailing suffixes are allowed).
pub fn is_checkpoint_file(file_name: &str) -> bool {
    CHECKPOINT_RE.is_match(file_name)
}
129
/// Key to identify a manifest file.
///
/// Used as the key of the per-file size map; a delta and a checkpoint with
/// the same version are distinct keys.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
enum FileKey {
    /// A delta file (`.json`).
    Delta(ManifestVersion),
    /// A checkpoint file (`.checkpoint`).
    Checkpoint(ManifestVersion),
}
138
/// Reads and writes manifest files (deltas, checkpoints and the
/// last-checkpoint metadata) in an [ObjectStore] and records the stored size
/// of each file.
#[derive(Clone, Debug)]
pub struct ManifestObjectStore {
    /// Object store that holds the manifest files.
    object_store: ObjectStore,
    /// Compression applied to the delta and checkpoint files written by this store.
    compress_type: CompressionType,
    /// Normalized manifest directory.
    path: String,
    /// Normalized staging manifest directory, derived from `path` in `new`.
    staging_path: String,
    /// Stores the size of each manifest file.
    manifest_size_map: Arc<RwLock<HashMap<FileKey, u64>>>,
    /// Running total of the recorded file sizes. The counter is handed in by
    /// the caller of `new`, so it may be observed outside of this store.
    total_manifest_size: Arc<AtomicU64>,
    /// Optional manifest cache for local caching.
    manifest_cache: Option<ManifestCache>,
}
151
152impl ManifestObjectStore {
153    pub fn new(
154        path: &str,
155        object_store: ObjectStore,
156        compress_type: CompressionType,
157        total_manifest_size: Arc<AtomicU64>,
158        manifest_cache: Option<ManifestCache>,
159    ) -> Self {
160        common_telemetry::info!("Create manifest store, cache: {}", manifest_cache.is_some());
161
162        let path = util::normalize_dir(path);
163        let staging_path = {
164            // Convert "region_dir/manifest/" to "region_dir/staging/manifest/"
165            let parent_dir = path.trim_end_matches("manifest/").trim_end_matches('/');
166            util::normalize_dir(&format!("{}/staging/manifest", parent_dir))
167        };
168        Self {
169            object_store,
170            compress_type,
171            path,
172            staging_path,
173            manifest_size_map: Arc::new(RwLock::new(HashMap::new())),
174            total_manifest_size,
175            manifest_cache,
176        }
177    }
178
179    /// Returns the delta file path under the **current** compression algorithm
180    fn delta_file_path(&self, version: ManifestVersion, is_staging: bool) -> String {
181        let base_path = if is_staging {
182            &self.staging_path
183        } else {
184            &self.path
185        };
186        gen_path(base_path, &delta_file(version), self.compress_type)
187    }
188
189    /// Returns the checkpoint file path under the **current** compression algorithm
190    fn checkpoint_file_path(&self, version: ManifestVersion) -> String {
191        gen_path(&self.path, &checkpoint_file(version), self.compress_type)
192    }
193
194    /// Returns the last checkpoint path, because the last checkpoint is not compressed,
195    /// so its path name has nothing to do with the compression algorithm used by `ManifestObjectStore`
196    pub(crate) fn last_checkpoint_path(&self) -> String {
197        format!("{}{}", self.path, LAST_CHECKPOINT_FILE)
198    }
199
200    /// Returns the manifest dir
201    pub(crate) fn manifest_dir(&self) -> &str {
202        &self.path
203    }
204
205    /// Returns an iterator of manifests from normal or staging directory.
206    pub(crate) async fn manifest_lister(&self, is_staging: bool) -> Result<Option<Lister>> {
207        let path = if is_staging {
208            &self.staging_path
209        } else {
210            &self.path
211        };
212        match self.object_store.lister_with(path).await {
213            Ok(streamer) => Ok(Some(streamer)),
214            Err(e) if e.kind() == ErrorKind::NotFound => {
215                debug!("Manifest directory does not exist: {}", path);
216                Ok(None)
217            }
218            Err(e) => Err(e).context(OpenDalSnafu)?,
219        }
220    }
221
222    /// Return all `R`s in the directory that meet the `filter` conditions (that is, the `filter` closure returns `Some(R)`),
223    /// and discard `R` that does not meet the conditions (that is, the `filter` closure returns `None`)
224    /// Return an empty vector when directory is not found.
225    pub async fn get_paths<F, R>(&self, filter: F, is_staging: bool) -> Result<Vec<R>>
226    where
227        F: Fn(Entry) -> Option<R>,
228    {
229        let Some(streamer) = self.manifest_lister(is_staging).await? else {
230            return Ok(vec![]);
231        };
232
233        streamer
234            .try_filter_map(|e| async { Ok(filter(e)) })
235            .try_collect::<Vec<_>>()
236            .await
237            .context(OpenDalSnafu)
238    }
239
240    /// Sorts the manifest files.
241    fn sort_manifests(entries: &mut [(ManifestVersion, Entry)]) {
242        entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2));
243    }
244
245    /// Scans the manifest files in the range of [start, end) and return all manifest entries.
246    pub async fn scan(
247        &self,
248        start: ManifestVersion,
249        end: ManifestVersion,
250    ) -> Result<Vec<(ManifestVersion, Entry)>> {
251        ensure!(start <= end, InvalidScanIndexSnafu { start, end });
252
253        let mut entries: Vec<(ManifestVersion, Entry)> = self
254            .get_paths(
255                |entry| {
256                    let file_name = entry.name();
257                    if is_delta_file(file_name) {
258                        let version = file_version(file_name);
259                        if start <= version && version < end {
260                            return Some((version, entry));
261                        }
262                    }
263                    None
264                },
265                false,
266            )
267            .await?;
268
269        Self::sort_manifests(&mut entries);
270
271        Ok(entries)
272    }
273
274    /// Fetches manifests in range [start_version, end_version).
275    ///
276    /// This functions is guaranteed to return manifests from the `start_version` strictly (must contain `start_version`).
277    pub async fn fetch_manifests_strict_from(
278        &self,
279        start_version: ManifestVersion,
280        end_version: ManifestVersion,
281        region_id: RegionId,
282    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
283        let mut manifests = self.fetch_manifests(start_version, end_version).await?;
284        let start_index = manifests.iter().position(|(v, _)| *v == start_version);
285        debug!(
286            "Fetches manifests in range [{},{}), start_index: {:?}, region_id: {}, manifests: {:?}",
287            start_version,
288            end_version,
289            start_index,
290            region_id,
291            manifests.iter().map(|(v, _)| *v).collect::<Vec<_>>()
292        );
293        if let Some(start_index) = start_index {
294            Ok(manifests.split_off(start_index))
295        } else {
296            Ok(vec![])
297        }
298    }
299
    /// Common implementation for fetching manifests from entries in parallel.
    /// If `is_staging` is true, cache is skipped.
    ///
    /// For every entry the cache is consulted first (keyed by the entry path).
    /// On a miss the file is read from the object store, decoded according to
    /// the compression type implied by its file extension, and the decoded
    /// bytes are put into the cache.
    ///
    /// Returns one `(version, decoded data)` pair per entry. The call fails as
    /// soon as any single fetch or decode fails.
    async fn fetch_manifests_from_entries(
        &self,
        entries: Vec<(ManifestVersion, Entry)>,
        is_staging: bool,
    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
        if entries.is_empty() {
            return Ok(vec![]);
        }

        // TODO(weny): Make it configurable.
        // Limits the number of fetches that run at the same time.
        let semaphore = Semaphore::new(FETCH_MANIFEST_PARALLELISM);

        let tasks = entries.iter().map(|(v, entry)| async {
            // Safety: semaphore must exist.
            // The semaphore is local to this call and is never closed here,
            // so acquiring a permit is not expected to fail.
            let _permit = semaphore.acquire().await.unwrap();

            let cache_key = entry.path();
            // Try to get from cache first
            if let Some(data) = self.get_from_cache(cache_key, is_staging).await {
                return Ok((*v, data));
            }

            // Fetch from remote object store
            // The compression type comes from the file extension, not from
            // `self.compress_type`, so files written with another setting can
            // still be read.
            let compress_type = file_compress_type(entry.name());
            let bytes = self
                .object_store
                .read(entry.path())
                .await
                .context(OpenDalSnafu)?;
            let data = compress_type
                .decode(bytes)
                .await
                .context(DecompressObjectSnafu {
                    compress_type,
                    path: entry.path(),
                })?;

            // Add to cache
            self.put_to_cache(cache_key.to_string(), &data, is_staging)
                .await;

            Ok((*v, data))
        });

        try_join_all(tasks).await
    }
348
349    /// Fetch all manifests in concurrent, and return the manifests in range [start_version, end_version)
350    ///
351    /// **Notes**: This function is no guarantee to return manifests from the `start_version` strictly.
352    /// Uses [fetch_manifests_strict_from](ManifestObjectStore::fetch_manifests_strict_from) to get manifests from the `start_version`.
353    pub async fn fetch_manifests(
354        &self,
355        start_version: ManifestVersion,
356        end_version: ManifestVersion,
357    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
358        let manifests = self.scan(start_version, end_version).await?;
359        self.fetch_manifests_from_entries(manifests, false).await
360    }
361
362    /// Delete manifest files that version < end.
363    /// If keep_last_checkpoint is true, the last checkpoint file will be kept.
364    /// ### Return
365    /// The number of deleted files.
366    pub async fn delete_until(
367        &self,
368        end: ManifestVersion,
369        keep_last_checkpoint: bool,
370    ) -> Result<usize> {
371        // Stores (entry, is_checkpoint, version) in a Vec.
372        let entries: Vec<_> = self
373            .get_paths(
374                |entry| {
375                    let file_name = entry.name();
376                    let is_checkpoint = is_checkpoint_file(file_name);
377                    if is_delta_file(file_name) || is_checkpoint_file(file_name) {
378                        let version = file_version(file_name);
379                        if version < end {
380                            return Some((entry, is_checkpoint, version));
381                        }
382                    }
383                    None
384                },
385                false,
386            )
387            .await?;
388        let checkpoint_version = if keep_last_checkpoint {
389            // Note that the order of entries is unspecific.
390            entries
391                .iter()
392                .filter_map(
393                    |(_e, is_checkpoint, version)| {
394                        if *is_checkpoint { Some(version) } else { None }
395                    },
396                )
397                .max()
398        } else {
399            None
400        };
401        let del_entries: Vec<_> = entries
402            .iter()
403            .filter(|(_e, is_checkpoint, version)| {
404                if let Some(max_version) = checkpoint_version {
405                    if *is_checkpoint {
406                        // We need to keep the checkpoint file.
407                        version < max_version
408                    } else {
409                        // We can delete the log file with max_version as the checkpoint
410                        // file contains the log file's content.
411                        version <= max_version
412                    }
413                } else {
414                    true
415                }
416            })
417            .collect();
418        let paths = del_entries
419            .iter()
420            .map(|(e, _, _)| e.path().to_string())
421            .collect::<Vec<_>>();
422        let ret = paths.len();
423
424        debug!(
425            "Deleting {} logs from manifest storage path {} until {}, checkpoint_version: {:?}, paths: {:?}",
426            ret, self.path, end, checkpoint_version, paths,
427        );
428
429        // Remove from cache first
430        for (entry, _, _) in &del_entries {
431            self.remove_from_cache(entry.path()).await;
432        }
433
434        self.object_store
435            .delete_iter(paths)
436            .await
437            .context(OpenDalSnafu)?;
438
439        // delete manifest sizes
440        for (_, is_checkpoint, version) in &del_entries {
441            if *is_checkpoint {
442                self.unset_file_size(&FileKey::Checkpoint(*version));
443            } else {
444                self.unset_file_size(&FileKey::Delta(*version));
445            }
446        }
447
448        Ok(ret)
449    }
450
451    /// Save the delta manifest file.
452    pub async fn save(
453        &mut self,
454        version: ManifestVersion,
455        bytes: &[u8],
456        is_staging: bool,
457    ) -> Result<()> {
458        let path = self.delta_file_path(version, is_staging);
459        debug!("Save log to manifest storage, version: {}", version);
460        let data = self
461            .compress_type
462            .encode(bytes)
463            .await
464            .context(CompressObjectSnafu {
465                compress_type: self.compress_type,
466                path: &path,
467            })?;
468        let delta_size = data.len();
469
470        self.write_and_put_cache(&path, data, is_staging).await?;
471        self.set_delta_file_size(version, delta_size as u64);
472
473        Ok(())
474    }
475
476    /// Save the checkpoint manifest file.
477    pub(crate) async fn save_checkpoint(
478        &self,
479        version: ManifestVersion,
480        bytes: &[u8],
481    ) -> Result<()> {
482        let path = self.checkpoint_file_path(version);
483        let data = self
484            .compress_type
485            .encode(bytes)
486            .await
487            .context(CompressObjectSnafu {
488                compress_type: self.compress_type,
489                path: &path,
490            })?;
491        let checkpoint_size = data.len();
492        let checksum = checkpoint_checksum(bytes);
493
494        self.write_and_put_cache(&path, data, false).await?;
495        self.set_checkpoint_file_size(version, checkpoint_size as u64);
496
497        // Because last checkpoint file only contain size and version, which is tiny, so we don't compress it.
498        let last_checkpoint_path = self.last_checkpoint_path();
499
500        let checkpoint_metadata = CheckpointMetadata {
501            size: bytes.len(),
502            version,
503            checksum: Some(checksum),
504            extend_metadata: HashMap::new(),
505        };
506
507        debug!(
508            "Save checkpoint in path: {},  metadata: {:?}",
509            last_checkpoint_path, checkpoint_metadata
510        );
511
512        let bytes = checkpoint_metadata.encode()?;
513        self.object_store
514            .write(&last_checkpoint_path, bytes)
515            .await
516            .context(OpenDalSnafu)?;
517
518        Ok(())
519    }
520
    /// Loads the checkpoint described by `metadata`.
    ///
    /// Lookup order:
    /// 1. the cache, under the checkpoint path of the current compression type;
    /// 2. the object store, under the same path;
    /// 3. if that file is not found and the current compression type is not the
    ///    fall-back type: the cache and then the object store under the
    ///    fall-back (uncompressed) path.
    ///
    /// Data from every source is verified against `metadata.checksum` (when
    /// present). Data read from the object store is additionally recorded in
    /// the size map and put into the cache; cache hits return early without
    /// touching the size map.
    ///
    /// Returns `Ok(None)` when no checkpoint file is found.
    async fn load_checkpoint(
        &mut self,
        metadata: CheckpointMetadata,
    ) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
        let version = metadata.version;
        let path = self.checkpoint_file_path(version);

        // Try to get from cache first
        if let Some(data) = self.get_from_cache(&path, false).await {
            verify_checksum(&data, metadata.checksum)?;
            return Ok(Some((version, data)));
        }

        // Due to backward compatibility, it is possible that the user's checkpoint not compressed,
        // so if we don't find file by compressed type. fall back to checkpoint not compressed find again.
        let checkpoint_data = match self.object_store.read(&path).await {
            Ok(checkpoint) => {
                // Size of the file as stored (before decoding).
                let checkpoint_size = checkpoint.len();
                let decompress_data =
                    self.compress_type
                        .decode(checkpoint)
                        .await
                        .with_context(|_| DecompressObjectSnafu {
                            compress_type: self.compress_type,
                            path: path.clone(),
                        })?;
                verify_checksum(&decompress_data, metadata.checksum)?;
                // set the checkpoint size
                self.set_checkpoint_file_size(version, checkpoint_size as u64);
                // Add to cache
                self.put_to_cache(path, &decompress_data, false).await;
                Ok(Some(decompress_data))
            }
            Err(e) => {
                if e.kind() == ErrorKind::NotFound {
                    // Only retry with the fall-back path when it differs from
                    // the path that was just tried.
                    if self.compress_type != FALL_BACK_COMPRESS_TYPE {
                        let fall_back_path = gen_path(
                            &self.path,
                            &checkpoint_file(version),
                            FALL_BACK_COMPRESS_TYPE,
                        );
                        debug!(
                            "Failed to load checkpoint from path: {}, fall back to path: {}",
                            path, fall_back_path
                        );

                        // Try to get fallback from cache first
                        if let Some(data) = self.get_from_cache(&fall_back_path, false).await {
                            verify_checksum(&data, metadata.checksum)?;
                            return Ok(Some((version, data)));
                        }

                        match self.object_store.read(&fall_back_path).await {
                            Ok(checkpoint) => {
                                let checkpoint_size = checkpoint.len();
                                let decompress_data = FALL_BACK_COMPRESS_TYPE
                                    .decode(checkpoint)
                                    .await
                                    .with_context(|_| DecompressObjectSnafu {
                                        compress_type: FALL_BACK_COMPRESS_TYPE,
                                        path: fall_back_path.clone(),
                                    })?;
                                verify_checksum(&decompress_data, metadata.checksum)?;
                                self.set_checkpoint_file_size(version, checkpoint_size as u64);
                                // Add fallback to cache
                                self.put_to_cache(fall_back_path, &decompress_data, false)
                                    .await;
                                Ok(Some(decompress_data))
                            }
                            // Neither file exists: there is no checkpoint to load.
                            Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
                            Err(e) => Err(e).context(OpenDalSnafu),
                        }
                    } else {
                        Ok(None)
                    }
                } else {
                    Err(e).context(OpenDalSnafu)
                }
            }
        }?;
        Ok(checkpoint_data.map(|data| (version, data)))
    }
603
604    /// Load the latest checkpoint.
605    /// Return manifest version and the raw [RegionCheckpoint](crate::manifest::action::RegionCheckpoint) content if any
606    pub async fn load_last_checkpoint(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
607        let last_checkpoint_path = self.last_checkpoint_path();
608
609        // Fetch from remote object store without cache
610        let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await {
611            Ok(data) => data.to_vec(),
612            Err(e) if e.kind() == ErrorKind::NotFound => {
613                return Ok(None);
614            }
615            Err(e) => {
616                return Err(e).context(OpenDalSnafu)?;
617            }
618        };
619
620        let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data)?;
621
622        debug!(
623            "Load checkpoint in path: {}, metadata: {:?}",
624            last_checkpoint_path, checkpoint_metadata
625        );
626
627        self.load_checkpoint(checkpoint_metadata).await
628    }
629
630    #[cfg(test)]
631    pub async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
632        self.object_store
633            .read(path)
634            .await
635            .context(OpenDalSnafu)
636            .map(|v| v.to_vec())
637    }
638
639    #[cfg(test)]
640    pub async fn write_last_checkpoint(
641        &mut self,
642        version: ManifestVersion,
643        bytes: &[u8],
644    ) -> Result<()> {
645        let path = self.checkpoint_file_path(version);
646        let data = self
647            .compress_type
648            .encode(bytes)
649            .await
650            .context(CompressObjectSnafu {
651                compress_type: self.compress_type,
652                path: &path,
653            })?;
654
655        let checkpoint_size = data.len();
656
657        self.object_store
658            .write(&path, data)
659            .await
660            .context(OpenDalSnafu)?;
661
662        self.set_checkpoint_file_size(version, checkpoint_size as u64);
663
664        let last_checkpoint_path = self.last_checkpoint_path();
665        let checkpoint_metadata = CheckpointMetadata {
666            size: bytes.len(),
667            version,
668            checksum: Some(1218259706),
669            extend_metadata: HashMap::new(),
670        };
671
672        debug!(
673            "Rewrite checkpoint in path: {},  metadata: {:?}",
674            last_checkpoint_path, checkpoint_metadata
675        );
676
677        let bytes = checkpoint_metadata.encode()?;
678
679        // Overwrite the last checkpoint with the modified content
680        self.object_store
681            .write(&last_checkpoint_path, bytes.clone())
682            .await
683            .context(OpenDalSnafu)?;
684        Ok(())
685    }
686
687    /// Compute the size(Byte) in manifest size map.
688    pub(crate) fn total_manifest_size(&self) -> u64 {
689        self.manifest_size_map.read().unwrap().values().sum()
690    }
691
692    /// Resets the size of all files.
693    pub(crate) fn reset_manifest_size(&mut self) {
694        self.manifest_size_map.write().unwrap().clear();
695        self.total_manifest_size.store(0, Ordering::Relaxed);
696    }
697
698    /// Set the size of the delta file by delta version.
699    pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
700        let mut m = self.manifest_size_map.write().unwrap();
701        m.insert(FileKey::Delta(version), size);
702
703        self.inc_total_manifest_size(size);
704    }
705
706    /// Set the size of the checkpoint file by checkpoint version.
707    pub(crate) fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
708        let mut m = self.manifest_size_map.write().unwrap();
709        m.insert(FileKey::Checkpoint(version), size);
710
711        self.inc_total_manifest_size(size);
712    }
713
714    fn unset_file_size(&self, key: &FileKey) {
715        let mut m = self.manifest_size_map.write().unwrap();
716        if let Some(val) = m.remove(key) {
717            debug!("Unset file size: {:?}, size: {}", key, val);
718            self.dec_total_manifest_size(val);
719        }
720    }
721
    /// Adds `val` bytes to the total manifest size counter.
    fn inc_total_manifest_size(&self, val: u64) {
        self.total_manifest_size.fetch_add(val, Ordering::Relaxed);
    }
725
    /// Subtracts `val` bytes from the total manifest size counter.
    ///
    /// `AtomicU64::fetch_sub` wraps around on underflow, so callers must only
    /// subtract sizes that were added before.
    fn dec_total_manifest_size(&self, val: u64) {
        self.total_manifest_size.fetch_sub(val, Ordering::Relaxed);
    }
729
730    /// Fetch all staging manifest files and return them as (version, action_list) pairs.
731    pub async fn fetch_staging_manifests(&self) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
732        let manifest_entries = self
733            .get_paths(
734                |entry| {
735                    let file_name = entry.name();
736                    if is_delta_file(file_name) {
737                        let version = file_version(file_name);
738                        Some((version, entry))
739                    } else {
740                        None
741                    }
742                },
743                true,
744            )
745            .await?;
746
747        let mut sorted_entries = manifest_entries;
748        Self::sort_manifests(&mut sorted_entries);
749
750        self.fetch_manifests_from_entries(sorted_entries, true)
751            .await
752    }
753
754    /// Clear all staging manifest files.
755    pub async fn clear_staging_manifests(&mut self) -> Result<()> {
756        self.object_store
757            .remove_all(&self.staging_path)
758            .await
759            .context(OpenDalSnafu)?;
760
761        debug!(
762            "Cleared all staging manifest files from {}",
763            self.staging_path
764        );
765
766        Ok(())
767    }
768
769    /// Gets a manifest file from cache.
770    /// Returns the file data if found in cache, None otherwise.
771    /// If `is_staging` is true, always returns None.
772    async fn get_from_cache(&self, key: &str, is_staging: bool) -> Option<Vec<u8>> {
773        if is_staging {
774            return None;
775        }
776        let cache = self.manifest_cache.as_ref()?;
777        cache.get_file(key).await
778    }
779
780    /// Puts a manifest file into cache.
781    /// If `is_staging` is true, does nothing.
782    async fn put_to_cache(&self, key: String, data: &[u8], is_staging: bool) {
783        if is_staging {
784            return;
785        }
786        let Some(cache) = &self.manifest_cache else {
787            return;
788        };
789
790        cache.put_file(key, data.to_vec()).await;
791    }
792
793    /// Writes data to object store and puts it into cache.
794    /// If `is_staging` is true, cache is skipped.
795    async fn write_and_put_cache(&self, path: &str, data: Vec<u8>, is_staging: bool) -> Result<()> {
796        // Clone data for cache before writing, only if cache is enabled and not staging
797        let cache_data = if !is_staging && self.manifest_cache.is_some() {
798            Some(data.clone())
799        } else {
800            None
801        };
802
803        // Write to object store
804        self.object_store
805            .write(path, data)
806            .await
807            .context(OpenDalSnafu)?;
808
809        // Put to cache if we cloned the data
810        if let Some(data) = cache_data {
811            self.put_to_cache(path.to_string(), &data, is_staging).await;
812        }
813
814        Ok(())
815    }
816
817    /// Removes a manifest file from cache.
818    async fn remove_from_cache(&self, key: &str) {
819        let Some(cache) = &self.manifest_cache else {
820            return;
821        };
822
823        cache.remove(key).await;
824    }
825}
826
/// Content of the last checkpoint metadata file, serialized as JSON.
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct CheckpointMetadata {
    /// Size in bytes of the checkpoint content before compression.
    pub size: usize,
    /// The latest version this checkpoint contains.
    pub version: ManifestVersion,
    /// Checksum of the checkpoint content before compression.
    /// Verification is skipped when this is `None`.
    pub checksum: Option<u32>,
    /// Additional key-value metadata; written empty by `save_checkpoint`.
    pub extend_metadata: HashMap<String, String>,
}
835
836impl CheckpointMetadata {
837    fn encode(&self) -> Result<Vec<u8>> {
838        Ok(serde_json::to_string(self)
839            .context(SerdeJsonSnafu)?
840            .into_bytes())
841    }
842
843    fn decode(bs: &[u8]) -> Result<Self> {
844        let data = std::str::from_utf8(bs).context(Utf8Snafu)?;
845
846        serde_json::from_str(data).context(SerdeJsonSnafu)
847    }
848}
849
850#[cfg(test)]
851mod tests {
852    use common_test_util::temp_dir::create_temp_dir;
853    use object_store::ObjectStore;
854    use object_store::services::Fs;
855
856    use super::*;
857
858    fn new_test_manifest_store() -> ManifestObjectStore {
859        common_telemetry::init_default_ut_logging();
860        let tmp_dir = create_temp_dir("test_manifest_log_store");
861        let builder = Fs::default().root(&tmp_dir.path().to_string_lossy());
862        let object_store = ObjectStore::new(builder).unwrap().finish();
863        ManifestObjectStore::new(
864            "/",
865            object_store,
866            CompressionType::Uncompressed,
867            Default::default(),
868            None,
869        )
870    }
871
872    fn new_checkpoint_metadata_with_version(version: ManifestVersion) -> CheckpointMetadata {
873        CheckpointMetadata {
874            size: 0,
875            version,
876            checksum: None,
877            extend_metadata: Default::default(),
878        }
879    }
880
881    #[test]
882    // Define this test mainly to prevent future unintentional changes may break the backward compatibility.
883    fn test_compress_file_path_generation() {
884        let path = "/foo/bar/";
885        let version: ManifestVersion = 0;
886        let file_path = gen_path(path, &delta_file(version), CompressionType::Gzip);
887        assert_eq!(file_path.as_str(), "/foo/bar/00000000000000000000.json.gz")
888    }
889
890    #[tokio::test]
891    async fn test_manifest_log_store_uncompress() {
892        let mut log_store = new_test_manifest_store();
893        log_store.compress_type = CompressionType::Uncompressed;
894        test_manifest_log_store_case(log_store).await;
895    }
896
897    #[tokio::test]
898    async fn test_manifest_log_store_compress() {
899        let mut log_store = new_test_manifest_store();
900        log_store.compress_type = CompressionType::Gzip;
901        test_manifest_log_store_case(log_store).await;
902    }
903
904    async fn test_manifest_log_store_case(mut log_store: ManifestObjectStore) {
905        for v in 0..5 {
906            log_store
907                .save(v, format!("hello, {v}").as_bytes(), false)
908                .await
909                .unwrap();
910        }
911
912        let manifests = log_store.fetch_manifests(1, 4).await.unwrap();
913        let mut it = manifests.into_iter();
914        for v in 1..4 {
915            let (version, bytes) = it.next().unwrap();
916            assert_eq!(v, version);
917            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
918        }
919        assert!(it.next().is_none());
920
921        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
922        let mut it = manifests.into_iter();
923        for v in 0..5 {
924            let (version, bytes) = it.next().unwrap();
925            assert_eq!(v, version);
926            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
927        }
928        assert!(it.next().is_none());
929
930        // test checkpoint
931        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
932        log_store
933            .save_checkpoint(3, "checkpoint".as_bytes())
934            .await
935            .unwrap();
936
937        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
938        assert_eq!(checkpoint, "checkpoint".as_bytes());
939        assert_eq!(3, v);
940
941        //delete (,4) logs and keep checkpoint 3.
942        let _ = log_store.delete_until(4, true).await.unwrap();
943        let _ = log_store
944            .load_checkpoint(new_checkpoint_metadata_with_version(3))
945            .await
946            .unwrap()
947            .unwrap();
948        let _ = log_store.load_last_checkpoint().await.unwrap().unwrap();
949        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
950        let mut it = manifests.into_iter();
951
952        let (version, bytes) = it.next().unwrap();
953        assert_eq!(4, version);
954        assert_eq!("hello, 4".as_bytes(), bytes);
955        assert!(it.next().is_none());
956
957        // delete all logs and checkpoints
958        let _ = log_store.delete_until(11, false).await.unwrap();
959        assert!(
960            log_store
961                .load_checkpoint(new_checkpoint_metadata_with_version(3))
962                .await
963                .unwrap()
964                .is_none()
965        );
966        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
967        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
968        let mut it = manifests.into_iter();
969
970        assert!(it.next().is_none());
971    }
972
973    #[tokio::test]
974    // test ManifestObjectStore can read/delete previously uncompressed data correctly
975    async fn test_compress_backward_compatible() {
976        let mut log_store = new_test_manifest_store();
977
978        // write uncompress data to stimulate previously uncompressed data
979        log_store.compress_type = CompressionType::Uncompressed;
980        for v in 0..5 {
981            log_store
982                .save(v, format!("hello, {v}").as_bytes(), false)
983                .await
984                .unwrap();
985        }
986        log_store
987            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
988            .await
989            .unwrap();
990
991        // change compress type
992        log_store.compress_type = CompressionType::Gzip;
993
994        // test load_last_checkpoint work correctly for previously uncompressed data
995        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
996        assert_eq!(v, 5);
997        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
998
999        // write compressed data to stimulate compress algorithm take effect
1000        for v in 5..10 {
1001            log_store
1002                .save(v, format!("hello, {v}").as_bytes(), false)
1003                .await
1004                .unwrap();
1005        }
1006        log_store
1007            .save_checkpoint(10, "checkpoint_compressed".as_bytes())
1008            .await
1009            .unwrap();
1010
1011        // test data reading
1012        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
1013        let mut it = manifests.into_iter();
1014
1015        for v in 0..10 {
1016            let (version, bytes) = it.next().unwrap();
1017            assert_eq!(v, version);
1018            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
1019        }
1020        let (v, checkpoint) = log_store
1021            .load_checkpoint(new_checkpoint_metadata_with_version(5))
1022            .await
1023            .unwrap()
1024            .unwrap();
1025        assert_eq!(v, 5);
1026        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
1027        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
1028        assert_eq!(v, 10);
1029        assert_eq!(checkpoint, "checkpoint_compressed".as_bytes());
1030
1031        // Delete util 10, contain uncompressed/compressed data
1032        // log 0, 1, 2, 7, 8, 9 will be delete
1033        assert_eq!(11, log_store.delete_until(10, false).await.unwrap());
1034        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
1035        let mut it = manifests.into_iter();
1036        assert!(it.next().is_none());
1037    }
1038
1039    #[tokio::test]
1040    async fn test_file_version() {
1041        let version = file_version("00000000000000000007.checkpoint");
1042        assert_eq!(version, 7);
1043
1044        let name = delta_file(version);
1045        assert_eq!(name, "00000000000000000007.json");
1046
1047        let name = checkpoint_file(version);
1048        assert_eq!(name, "00000000000000000007.checkpoint");
1049    }
1050
1051    #[tokio::test]
1052    async fn test_uncompressed_manifest_files_size() {
1053        let mut log_store = new_test_manifest_store();
1054        // write 5 manifest files with uncompressed(8B per file)
1055        log_store.compress_type = CompressionType::Uncompressed;
1056        for v in 0..5 {
1057            log_store
1058                .save(v, format!("hello, {v}").as_bytes(), false)
1059                .await
1060                .unwrap();
1061        }
1062        // write 1 checkpoint file with uncompressed(23B)
1063        log_store
1064            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
1065            .await
1066            .unwrap();
1067
1068        // manifest files size
1069        assert_eq!(log_store.total_manifest_size(), 63);
1070
1071        // delete 3 manifest files
1072        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);
1073
1074        // manifest files size after delete
1075        assert_eq!(log_store.total_manifest_size(), 39);
1076
1077        // delete all manifest files
1078        assert_eq!(
1079            log_store
1080                .delete_until(ManifestVersion::MAX, false)
1081                .await
1082                .unwrap(),
1083            3
1084        );
1085
1086        assert_eq!(log_store.total_manifest_size(), 0);
1087    }
1088
1089    #[tokio::test]
1090    async fn test_compressed_manifest_files_size() {
1091        let mut log_store = new_test_manifest_store();
1092        // Test with compressed manifest files
1093        log_store.compress_type = CompressionType::Gzip;
1094        // write 5 manifest files
1095        for v in 0..5 {
1096            log_store
1097                .save(v, format!("hello, {v}").as_bytes(), false)
1098                .await
1099                .unwrap();
1100        }
1101        log_store
1102            .save_checkpoint(5, "checkpoint_compressed".as_bytes())
1103            .await
1104            .unwrap();
1105
1106        // manifest files size
1107        assert_eq!(log_store.total_manifest_size(), 181);
1108
1109        // delete 3 manifest files
1110        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);
1111
1112        // manifest files size after delete
1113        assert_eq!(log_store.total_manifest_size(), 97);
1114
1115        // delete all manifest files
1116        assert_eq!(
1117            log_store
1118                .delete_until(ManifestVersion::MAX, false)
1119                .await
1120                .unwrap(),
1121            3
1122        );
1123
1124        assert_eq!(log_store.total_manifest_size(), 0);
1125    }
1126}