// mito2/manifest/storage.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod checkpoint;
16pub(crate) mod delta;
17pub(crate) mod size_tracker;
18pub(crate) mod staging;
19pub(crate) mod utils;
20
21use std::iter::Iterator;
22use std::str::FromStr;
23use std::sync::Arc;
24use std::sync::atomic::AtomicU64;
25
26use common_datasource::compression::CompressionType;
27use common_telemetry::{debug, warn};
28use crc32fast::Hasher;
29use lazy_static::lazy_static;
30use object_store::util::join_dir;
31use object_store::{Lister, ObjectStore, util};
32use regex::Regex;
33#[cfg(test)]
34use snafu::ResultExt;
35use snafu::ensure;
36use store_api::ManifestVersion;
37use store_api::storage::RegionId;
38
39use crate::cache::manifest_cache::ManifestCache;
40#[cfg(test)]
41use crate::error::OpenDalSnafu;
42use crate::error::{ChecksumMismatchSnafu, Result};
43use crate::manifest::storage::checkpoint::CheckpointStorage;
44use crate::manifest::storage::delta::DeltaStorage;
45use crate::manifest::storage::size_tracker::{CheckpointTracker, DeltaTracker, SizeTracker};
46use crate::manifest::storage::staging::StagingStorage;
47use crate::manifest::storage::utils::remove_from_cache;
48
lazy_static! {
    // Matches delta manifest file names such as `00000000000000000000.json`.
    // The pattern is deliberately not end-anchored, so compressed variants
    // like `*.json.gz` also match.
    static ref DELTA_RE: Regex = Regex::new("^\\d+\\.json").unwrap();
    // Matches checkpoint file names such as `00000000000000000000.checkpoint`,
    // including compressed variants (no end anchor).
    static ref CHECKPOINT_RE: Regex = Regex::new("^\\d+\\.checkpoint").unwrap();
}
53
/// Name of the marker file that records the latest checkpoint.
pub const LAST_CHECKPOINT_FILE: &str = "_last_checkpoint";
/// Compression applied to manifest files when compression is enabled.
const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Gzip;
/// Due to backward compatibility, it is possible that the user's manifest file has not been compressed.
/// So when we encounter problems, we need to fall back to `FALL_BACK_COMPRESS_TYPE` for processing.
pub(crate) const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed;
// NOTE(review): not referenced in this file; presumably consumed by the fetch
// implementations in the storage submodules — confirm.
const FETCH_MANIFEST_PARALLELISM: usize = 16;
60
/// Returns the directory to the manifest files.
///
/// The result is `region_dir` joined with the `manifest` subdirectory.
pub fn manifest_dir(region_dir: &str) -> String {
    join_dir(region_dir, "manifest")
}
65
66/// Returns the [CompressionType] according to whether to compress manifest files.
67pub const fn manifest_compress_type(compress: bool) -> CompressionType {
68    if compress {
69        DEFAULT_MANIFEST_COMPRESSION_TYPE
70    } else {
71        FALL_BACK_COMPRESS_TYPE
72    }
73}
74
75pub fn delta_file(version: ManifestVersion) -> String {
76    format!("{version:020}.json")
77}
78
79pub fn checkpoint_file(version: ManifestVersion) -> String {
80    format!("{version:020}.checkpoint")
81}
82
83/// Returns a lexicographic `start_after` key for an object-store `list`
84/// request over the manifest directory at `path`.
85///
86/// `path` must be the same directory prefix passed to `lister_with(path)`
87/// and must end with `/`. OpenDAL resolves `start_after` against the
88/// operator root, not relative to the listed path, so the caller must
89/// supply the full prefix — otherwise the bound is compared against keys
90/// that already share a longer prefix and is silently a no-op.
91pub(crate) fn list_start_after(path: &str, version: ManifestVersion) -> String {
92    debug_assert!(
93        path.ends_with('/'),
94        "list_start_after: path must end with '/', got {path:?}",
95    );
96    // Manifest files are named `{version:020}.{json,checkpoint}[.gz]` and sort lexicographically;
97    // `{path}{version:020}` is a strict prefix of `{path}{version:020}.{json,checkpoint}[.gz]`.
98    format!("{path}{version:020}")
99}
100
101pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> String {
102    if compress_type == CompressionType::Uncompressed {
103        format!("{}{}", path, file)
104    } else {
105        format!("{}{}.{}", path, file, compress_type.file_extension())
106    }
107}
108
109pub(crate) fn checkpoint_checksum(data: &[u8]) -> u32 {
110    let mut hasher = Hasher::new();
111    hasher.update(data);
112    hasher.finalize()
113}
114
115pub(crate) fn verify_checksum(data: &[u8], wanted: Option<u32>) -> Result<()> {
116    if let Some(checksum) = wanted {
117        let calculated_checksum = checkpoint_checksum(data);
118        ensure!(
119            checksum == calculated_checksum,
120            ChecksumMismatchSnafu {
121                actual: calculated_checksum,
122                expected: checksum,
123            }
124        );
125    }
126    Ok(())
127}
128
129/// Return's the file manifest version from path
130///
131/// # Panics
132/// If the file path is not a valid delta or checkpoint file.
133pub fn file_version(path: &str) -> ManifestVersion {
134    let s = path.split('.').next().unwrap();
135    s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}"))
136}
137
138/// Return's the file compress algorithm by file extension.
139///
140/// for example file
141/// `00000000000000000000.json.gz` -> `CompressionType::GZIP`
142pub fn file_compress_type(path: &str) -> CompressionType {
143    let s = path.rsplit('.').next().unwrap_or("");
144    CompressionType::from_str(s).unwrap_or(CompressionType::Uncompressed)
145}
146
/// Returns true if `file_name` matches the delta manifest naming scheme
/// (`{version}.json`, optionally with a compression suffix).
pub fn is_delta_file(file_name: &str) -> bool {
    DELTA_RE.is_match(file_name)
}
150
/// Returns true if `file_name` matches the checkpoint naming scheme
/// (`{version}.checkpoint`, optionally with a compression suffix).
pub fn is_checkpoint_file(file_name: &str) -> bool {
    CHECKPOINT_RE.is_match(file_name)
}
154
/// Storage facade for a region's manifest directory, dispatching to the
/// checkpoint, delta and staging sub-storages.
#[derive(Clone, Debug)]
pub struct ManifestObjectStore {
    /// Underlying object store holding the manifest files.
    object_store: ObjectStore,
    /// Normalized (`/`-terminated) manifest directory path.
    path: String,
    /// Optional manifest cache for local caching.
    manifest_cache: Option<ManifestCache>,
    /// Tracks the size of each file in the manifest directory.
    size_tracker: SizeTracker,
    /// The checkpoint file storage.
    checkpoint_storage: CheckpointStorage<CheckpointTracker>,
    /// The delta file storage.
    delta_storage: DeltaStorage<DeltaTracker>,
    /// The staging file storage.
    staging_storage: StagingStorage,
}
170
171impl ManifestObjectStore {
    /// Creates a manifest store rooted at `path` (normalized to end with `/`).
    ///
    /// The checkpoint, delta and staging sub-storages share the same
    /// directory, object store and compression type; checkpoint and delta
    /// file sizes feed the shared [SizeTracker] backed by
    /// `total_manifest_size`.
    pub fn new(
        path: &str,
        object_store: ObjectStore,
        compress_type: CompressionType,
        total_manifest_size: Arc<AtomicU64>,
        manifest_cache: Option<ManifestCache>,
    ) -> Self {
        common_telemetry::info!("Create manifest store, cache: {}", manifest_cache.is_some());

        // Normalize so file names can be appended to `path` directly.
        let path = util::normalize_dir(path);
        let size_tracker = SizeTracker::new(total_manifest_size);
        let checkpoint_tracker = Arc::new(size_tracker.checkpoint_tracker());
        let delta_tracker = Arc::new(size_tracker.manifest_tracker());
        let checkpoint_storage = CheckpointStorage::new(
            path.clone(),
            object_store.clone(),
            compress_type,
            manifest_cache.clone(),
            checkpoint_tracker,
        );
        let delta_storage = DeltaStorage::new(
            path.clone(),
            object_store.clone(),
            compress_type,
            manifest_cache.clone(),
            delta_tracker,
        );
        let staging_storage =
            StagingStorage::new(path.clone(), object_store.clone(), compress_type);

        Self {
            object_store,
            path,
            manifest_cache,
            size_tracker,
            checkpoint_storage,
            delta_storage,
            staging_storage,
        }
    }
212
    /// Returns the manifest dir (the normalized path this store was built with).
    pub(crate) fn manifest_dir(&self) -> &str {
        &self.path
    }
217
218    /// Returns an iterator of manifests from normal or staging directory.
219    ///
220    /// `start_after` is forwarded to the non-staging lister to skip entries
221    /// whose name is lexicographically less than or equal to it. It is
222    /// ignored for the staging directory.
223    pub(crate) async fn manifest_lister(
224        &self,
225        is_staging: bool,
226        start_after: Option<&str>,
227    ) -> Result<Option<Lister>> {
228        if is_staging {
229            self.staging_storage.manifest_lister().await
230        } else {
231            self.delta_storage.manifest_lister(start_after).await
232        }
233    }
234
    /// Fetches manifests in range [start_version, end_version).
    /// This function is guaranteed to return manifests starting at
    /// `start_version` (the result must contain `start_version`).
    pub async fn fetch_manifests_strict_from(
        &self,
        start_version: ManifestVersion,
        end_version: ManifestVersion,
        region_id: RegionId,
    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
        self.delta_storage
            .fetch_manifests_strict_from(start_version, end_version, region_id)
            .await
    }

    /// Fetches all manifests concurrently and returns those in range
    /// [start_version, end_version).
    ///
    /// **Notes**: There is no guarantee that the result starts at `start_version`.
    /// Use [fetch_manifests_strict_from](ManifestObjectStore::fetch_manifests_strict_from)
    /// when manifests must start exactly at `start_version`.
    pub async fn fetch_manifests(
        &self,
        start_version: ManifestVersion,
        end_version: ManifestVersion,
    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
        self.delta_storage
            .fetch_manifests(start_version, end_version)
            .await
    }
261
262    /// Delete manifest files that version < end.
263    /// If keep_last_checkpoint is true, the last checkpoint file will be kept.
264    /// ### Return
265    /// The number of deleted files.
266    pub async fn delete_until(
267        &self,
268        end: ManifestVersion,
269        keep_last_checkpoint: bool,
270    ) -> Result<usize> {
271        // Stores (entry, is_checkpoint, version) in a Vec.
272        //
273        // `start_after` is intentionally `None` here: a previous deletion
274        // may have been interrupted and left stale files at versions below
275        // the current checkpoint; we need the lister to surface them so
276        // cleanup can finish.
277        let entries: Vec<_> = self
278            .delta_storage
279            .get_paths(None, |entry| {
280                let file_name = entry.name();
281                let is_checkpoint = is_checkpoint_file(file_name);
282                if is_delta_file(file_name) || is_checkpoint_file(file_name) {
283                    let version = file_version(file_name);
284                    if version < end {
285                        return Some((entry, is_checkpoint, version));
286                    }
287                }
288                None
289            })
290            .await?;
291        let checkpoint_version = if keep_last_checkpoint {
292            // Note that the order of entries is unspecific.
293            entries
294                .iter()
295                .filter_map(
296                    |(_e, is_checkpoint, version)| {
297                        if *is_checkpoint { Some(version) } else { None }
298                    },
299                )
300                .max()
301        } else {
302            None
303        };
304        let del_entries: Vec<_> = entries
305            .iter()
306            .filter(|(_e, is_checkpoint, version)| {
307                if let Some(max_version) = checkpoint_version {
308                    if *is_checkpoint {
309                        // We need to keep the checkpoint file.
310                        version < max_version
311                    } else {
312                        // We can delete the log file with max_version as the checkpoint
313                        // file contains the log file's content.
314                        version <= max_version
315                    }
316                } else {
317                    true
318                }
319            })
320            .collect();
321        let paths = del_entries
322            .iter()
323            .map(|(e, _, _)| e.path().to_string())
324            .collect::<Vec<_>>();
325        let total = paths.len();
326
327        debug!(
328            "Deleting {} logs from manifest storage path {} until {}, checkpoint_version: {:?}, paths: {:?}",
329            total, self.path, end, checkpoint_version, paths,
330        );
331
332        // Remove from cache first
333        for (entry, _, _) in &del_entries {
334            remove_from_cache(self.manifest_cache.as_ref(), entry.path()).await;
335        }
336
337        // Try batch delete first. On failure, fall back to per-file deletes.
338        // This is a workaround for S3-compatible object stores that do not support batch delete. See issue #7986.
339        let mut succeeded = vec![false; del_entries.len()];
340        match self.object_store.delete_iter(paths.clone()).await {
341            Ok(()) => succeeded.fill(true),
342            Err(batch_err) => {
343                warn!(
344                    batch_err;
345                    "Batch delete failed for manifest path {}, falling back to per-file delete for {} paths",
346                    self.path, total,
347                );
348                for (i, path) in paths.iter().enumerate() {
349                    if let Err(e) = self.object_store.delete(path).await {
350                        warn!(
351                            e;
352                            "Failed to delete manifest file {} under {}, aborting fallback, {} files will be retried on next checkpoint",
353                            path, self.path, total - i,
354                        );
355                        break;
356                    }
357                    succeeded[i] = true;
358                }
359            }
360        }
361
362        let mut deleted = 0usize;
363        for (i, (_, is_checkpoint, version)) in del_entries.iter().enumerate() {
364            if !succeeded[i] {
365                continue;
366            }
367            deleted += 1;
368            if *is_checkpoint {
369                self.size_tracker
370                    .remove(&size_tracker::FileKey::Checkpoint(*version));
371            } else {
372                self.size_tracker
373                    .remove(&size_tracker::FileKey::Delta(*version));
374            }
375        }
376
377        Ok(deleted)
378    }
379
380    /// Save the delta manifest file.
381    pub async fn save(
382        &mut self,
383        version: ManifestVersion,
384        bytes: &[u8],
385        is_staging: bool,
386    ) -> Result<()> {
387        if is_staging {
388            self.staging_storage.save(version, bytes).await
389        } else {
390            self.delta_storage.save(version, bytes).await
391        }
392    }
393
    /// Save the checkpoint manifest file.
    pub(crate) async fn save_checkpoint(
        &self,
        version: ManifestVersion,
        bytes: &[u8],
    ) -> Result<()> {
        self.checkpoint_storage
            .save_checkpoint(version, bytes)
            .await
    }

    /// Load the latest checkpoint.
    /// Returns the manifest version and the raw
    /// [RegionCheckpoint](crate::manifest::action::RegionCheckpoint) content if any.
    pub async fn load_last_checkpoint(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
        self.checkpoint_storage.load_last_checkpoint().await
    }

    /// Compute the size (Byte) in manifest size map.
    pub(crate) fn total_manifest_size(&self) -> u64 {
        self.size_tracker.total()
    }

    /// Resets the size of all files.
    pub(crate) fn reset_manifest_size(&mut self) {
        self.size_tracker.reset();
    }

    /// Set the size of the delta file by delta version.
    pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
        self.size_tracker.record_delta(version, size);
    }

    /// Set the size of the checkpoint file by checkpoint version.
    // NOTE(review): takes `&self` while `set_delta_file_size` takes `&mut self`;
    // the tracker call only needs `&self` — consider unifying the receivers.
    pub(crate) fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
        self.size_tracker.record_checkpoint(version, size);
    }

    /// Fetch all staging manifest files and return them as (version, action_list) pairs.
    pub async fn fetch_staging_manifests(&self) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
        self.staging_storage.fetch_manifests().await
    }

    /// Clear all staging manifest files.
    pub async fn clear_staging_manifests(&mut self) -> Result<()> {
        self.staging_storage.clear().await
    }

    /// Returns the staging storage.
    pub(crate) fn staging_storage(&self) -> &StagingStorage {
        &self.staging_storage
    }
445}
446
/// Test-only accessors and helpers.
#[cfg(test)]
impl ManifestObjectStore {
    /// Reads a raw file from the underlying object store (test helper).
    pub async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
        self.object_store
            .read(path)
            .await
            .context(OpenDalSnafu)
            .map(|v| v.to_vec())
    }

    /// Exposes the checkpoint storage for test assertions.
    pub(crate) fn checkpoint_storage(&self) -> &CheckpointStorage<CheckpointTracker> {
        &self.checkpoint_storage
    }

    /// Exposes the delta storage for test assertions.
    pub(crate) fn delta_storage(&self) -> &DeltaStorage<DeltaTracker> {
        &self.delta_storage
    }

    /// Switches the compression type on all sub-storages at once.
    pub(crate) fn set_compress_type(&mut self, compress_type: CompressionType) {
        self.checkpoint_storage.set_compress_type(compress_type);
        self.delta_storage.set_compress_type(compress_type);
        self.staging_storage.set_compress_type(compress_type);
    }
}
471
472#[cfg(test)]
473mod tests {
474    use common_test_util::temp_dir::create_temp_dir;
475    use object_store::ObjectStore;
476    use object_store::services::Fs;
477
478    use super::*;
479    use crate::manifest::storage::checkpoint::CheckpointMetadata;
480
    /// Builds a store over a fresh temp dir rooted at `/`.
    fn new_test_manifest_store() -> ManifestObjectStore {
        new_test_manifest_store_at("/")
    }

    /// Builds a store over a fresh temp dir at the given directory `path`.
    // NOTE(review): `tmp_dir` is dropped when this helper returns, which
    // typically removes the directory; the Fs backend appears to recreate
    // it on write — confirm this is intended.
    fn new_test_manifest_store_at(path: &str) -> ManifestObjectStore {
        common_telemetry::init_default_ut_logging();
        let tmp_dir = create_temp_dir("test_manifest_log_store");
        let builder = Fs::default().root(&tmp_dir.path().to_string_lossy());
        let object_store = ObjectStore::new(builder).unwrap().finish();
        ManifestObjectStore::new(
            path,
            object_store,
            CompressionType::Uncompressed,
            Default::default(),
            None,
        )
    }

    /// Minimal checkpoint metadata with the given version and no checksum.
    fn new_checkpoint_metadata_with_version(version: ManifestVersion) -> CheckpointMetadata {
        CheckpointMetadata {
            size: 0,
            version,
            checksum: None,
            extend_metadata: Default::default(),
        }
    }
507
    #[test]
    // Guards the on-disk naming scheme: any unintentional change here would
    // break backward compatibility with existing manifest files.
    fn test_compress_file_path_generation() {
        let path = "/foo/bar/";
        let version: ManifestVersion = 0;
        let file_path = gen_path(path, &delta_file(version), CompressionType::Gzip);
        assert_eq!(file_path.as_str(), "/foo/bar/00000000000000000000.json.gz")
    }

    // Runs the shared scenario with compression disabled.
    #[tokio::test]
    async fn test_manifest_log_store_uncompress() {
        let mut log_store = new_test_manifest_store();
        log_store.set_compress_type(CompressionType::Uncompressed);
        test_manifest_log_store_case(log_store).await;
    }

    // Runs the shared scenario with gzip compression.
    #[tokio::test]
    async fn test_manifest_log_store_compress() {
        let mut log_store = new_test_manifest_store();
        log_store.set_compress_type(CompressionType::Gzip);
        test_manifest_log_store_case(log_store).await;
    }
530
    /// Shared save/fetch/checkpoint/delete scenario, run once per compression type.
    async fn test_manifest_log_store_case(mut log_store: ManifestObjectStore) {
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }

        // Fetch a sub-range; upper bound is exclusive.
        let manifests = log_store.fetch_manifests(1, 4).await.unwrap();
        let mut it = manifests.into_iter();
        for v in 1..4 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        assert!(it.next().is_none());

        // A range wider than what exists returns only the stored manifests.
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();
        for v in 0..5 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        assert!(it.next().is_none());

        // test checkpoint
        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
        log_store
            .save_checkpoint(3, "checkpoint".as_bytes())
            .await
            .unwrap();

        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(checkpoint, "checkpoint".as_bytes());
        assert_eq!(3, v);

        // Delete versions below 4 but keep checkpoint 3.
        let _ = log_store.delete_until(4, true).await.unwrap();
        let _ = log_store
            .checkpoint_storage
            .load_checkpoint(new_checkpoint_metadata_with_version(3))
            .await
            .unwrap()
            .unwrap();
        let _ = log_store.load_last_checkpoint().await.unwrap().unwrap();
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();

        // Only delta 4 survives; deltas <= checkpoint version were subsumed.
        let (version, bytes) = it.next().unwrap();
        assert_eq!(4, version);
        assert_eq!("hello, 4".as_bytes(), bytes);
        assert!(it.next().is_none());

        // delete all logs and checkpoints
        let _ = log_store.delete_until(11, false).await.unwrap();
        assert!(
            log_store
                .checkpoint_storage
                .load_checkpoint(new_checkpoint_metadata_with_version(3))
                .await
                .unwrap()
                .is_none()
        );
        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();

        assert!(it.next().is_none());
    }
601
    #[tokio::test]
    // Tests that ManifestObjectStore can read/delete previously uncompressed data correctly.
    async fn test_compress_backward_compatible() {
        let mut log_store = new_test_manifest_store();

        // Write uncompressed data to simulate previously uncompressed files.
        log_store.set_compress_type(CompressionType::Uncompressed);
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
            .await
            .unwrap();

        // change compress type
        log_store.set_compress_type(CompressionType::Gzip);

        // load_last_checkpoint must still read the previously uncompressed data.
        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(v, 5);
        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());

        // Write compressed data so the new compression actually takes effect.
        for v in 5..10 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(10, "checkpoint_compressed".as_bytes())
            .await
            .unwrap();

        // Reading must transparently span both compressed and uncompressed files.
        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
        let mut it = manifests.into_iter();

        for v in 0..10 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        let (v, checkpoint) = log_store
            .checkpoint_storage
            .load_checkpoint(new_checkpoint_metadata_with_version(5))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(v, 5);
        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(v, 10);
        assert_eq!(checkpoint, "checkpoint_compressed".as_bytes());

        // Delete until 10, covering both uncompressed and compressed data:
        // deltas 0..=9 plus the version-5 checkpoint — 11 files in total.
        assert_eq!(11, log_store.delete_until(10, false).await.unwrap());
        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
        let mut it = manifests.into_iter();
        assert!(it.next().is_none());
    }
668
669    #[tokio::test]
670    async fn test_file_version() {
671        let version = file_version("00000000000000000007.checkpoint");
672        assert_eq!(version, 7);
673
674        let name = delta_file(version);
675        assert_eq!(name, "00000000000000000007.json");
676
677        let name = checkpoint_file(version);
678        assert_eq!(name, "00000000000000000007.checkpoint");
679    }
680
    #[tokio::test]
    async fn test_uncompressed_manifest_files_size() {
        let mut log_store = new_test_manifest_store();
        // write 5 manifest files with uncompressed (8B per file)
        log_store.set_compress_type(CompressionType::Uncompressed);
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        // write 1 checkpoint file with uncompressed (23B)
        log_store
            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
            .await
            .unwrap();

        // manifest files size: 5 * 8 + 23 = 63 bytes
        assert_eq!(log_store.total_manifest_size(), 63);

        // delete 3 manifest files
        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);

        // manifest files size after delete: 63 - 3 * 8 = 39 bytes
        assert_eq!(log_store.total_manifest_size(), 39);

        // delete all remaining files (2 deltas + 1 checkpoint)
        assert_eq!(
            log_store
                .delete_until(ManifestVersion::MAX, false)
                .await
                .unwrap(),
            3
        );

        assert_eq!(log_store.total_manifest_size(), 0);
    }

    #[tokio::test]
    async fn test_compressed_manifest_files_size() {
        let mut log_store = new_test_manifest_store();
        // Test with compressed manifest files
        log_store.set_compress_type(CompressionType::Gzip);
        // write 5 manifest files
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(5, "checkpoint_compressed".as_bytes())
            .await
            .unwrap();

        // manifest files size (gzip-compressed sizes)
        assert_eq!(log_store.total_manifest_size(), 181);

        // delete 3 manifest files
        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);

        // manifest files size after delete
        assert_eq!(log_store.total_manifest_size(), 97);

        // delete all remaining files (2 deltas + 1 checkpoint)
        assert_eq!(
            log_store
                .delete_until(ManifestVersion::MAX, false)
                .await
                .unwrap(),
            3
        );

        assert_eq!(log_store.total_manifest_size(), 0);
    }
756
    // Runs the start_after scan scenario with compression disabled.
    #[tokio::test]
    async fn test_scan_with_start_after_uncompress() {
        let mut log_store = new_test_manifest_store();
        log_store.set_compress_type(CompressionType::Uncompressed);
        test_scan_with_start_after_case(log_store).await;
    }

    // Runs the start_after scan scenario with gzip compression.
    #[tokio::test]
    async fn test_scan_with_start_after_compress() {
        let mut log_store = new_test_manifest_store();
        log_store.set_compress_type(CompressionType::Gzip);
        test_scan_with_start_after_case(log_store).await;
    }

    // OpenDAL resolves `start_after` against the operator
    // root, so the bound must embed the manifest directory prefix. Running the
    // same assertions against a non-root path exercises that composition.
    #[tokio::test]
    async fn test_scan_with_start_after_nested_path() {
        let mut log_store = new_test_manifest_store_at("/nested/region-1/");
        log_store.set_compress_type(CompressionType::Uncompressed);
        test_scan_with_start_after_case(log_store).await;
    }
780
    /// Shared scenario validating `delta_storage.scan` range semantics with a
    /// `start_after` bound.
    async fn test_scan_with_start_after_case(mut log_store: ManifestObjectStore) {
        for v in 0..10 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        // A checkpoint at version 5 shares the directory; scan must still
        // return only delta files in range.
        log_store
            .save_checkpoint(5, "checkpoint".as_bytes())
            .await
            .unwrap();

        // start > 0: `start_after` must skip pre-start deltas without losing any.
        let entries = log_store.delta_storage.scan(3, 10).await.unwrap();
        let versions: Vec<_> = entries.iter().map(|(v, _)| *v).collect();
        assert_eq!(versions, vec![3, 4, 5, 6, 7, 8, 9]);

        // start == 0: `start_after` is skipped; every delta is returned.
        let entries = log_store.delta_storage.scan(0, 10).await.unwrap();
        let versions: Vec<_> = entries.iter().map(|(v, _)| *v).collect();
        assert_eq!(versions, (0..10).collect::<Vec<_>>());

        // Upper bound exclusive.
        let entries = log_store.delta_storage.scan(7, 9).await.unwrap();
        let versions: Vec<_> = entries.iter().map(|(v, _)| *v).collect();
        assert_eq!(versions, vec![7, 8]);

        // Start beyond any existing file returns empty.
        let entries = log_store
            .delta_storage
            .scan(10, ManifestVersion::MAX)
            .await
            .unwrap();
        assert!(entries.is_empty());
    }
818}