mito2/manifest/
manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use common_datasource::compression::CompressionType;
19use common_telemetry::{debug, info};
20use futures::TryStreamExt;
21use object_store::ObjectStore;
22use snafu::{OptionExt, ResultExt, ensure};
23use store_api::metadata::RegionMetadataRef;
24use store_api::{MAX_VERSION, MIN_VERSION, ManifestVersion};
25
26use crate::error::{
27    self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
28};
29use crate::manifest::action::{
30    RegionChange, RegionCheckpoint, RegionEdit, RegionManifest, RegionManifestBuilder,
31    RegionMetaAction, RegionMetaActionList,
32};
33use crate::manifest::checkpointer::Checkpointer;
34use crate::manifest::storage::{
35    ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file,
36};
37use crate::metrics::MANIFEST_OP_ELAPSED;
38use crate::region::{RegionLeaderState, RegionRoleState};
39use crate::sst::FormatType;
40
/// Options for [RegionManifestManager].
///
/// The manager's constructors use `manifest_dir`, `object_store` and `compress_type` to
/// build the manifest storage, and hand the whole options value to the checkpointer.
#[derive(Debug, Clone)]
pub struct RegionManifestOptions {
    /// Directory to store manifest.
    pub manifest_dir: String,
    /// Object store handed to the manifest storage.
    pub object_store: ObjectStore,
    /// Compression type handed to the manifest storage.
    pub compress_type: CompressionType,
    /// Interval of version ([`ManifestVersion`]) between two checkpoints.
    /// Set to 0 to disable checkpoint.
    pub checkpoint_distance: u64,
    /// Options for updating the `removed_files` field of the manifest.
    pub remove_file_options: RemoveFileOptions,
}
53
/// Options for updating `removed_files` field in [RegionManifest].
///
/// An entry is dropped from `removed_files` only when **both** limits below are
/// reached: `keep_count` and `keep_ttl`.
#[derive(Debug, Clone)]
pub struct RemoveFileOptions {
    /// Number of removed files to keep in manifest's `removed_files` field before also
    /// removing them from `removed_files`. Files are only removed when both `keep_count`
    /// and `keep_ttl` are reached.
    pub keep_count: usize,
    /// Duration to keep removed files in manifest's `removed_files` field before also
    /// removing them from `removed_files`. Files are only removed when both `keep_count`
    /// and `keep_ttl` are reached.
    pub keep_ttl: std::time::Duration,
}

/// Test-only defaults: keep 256 removed files for one hour.
#[cfg(any(test, feature = "test"))]
impl Default for RemoveFileOptions {
    fn default() -> Self {
        Self {
            keep_count: 256,
            keep_ttl: std::time::Duration::from_secs(3600),
        }
    }
}
74
75// rewrite note:
76// trait Checkpoint -> struct RegionCheckpoint
77// trait MetaAction -> struct RegionMetaActionList
78// trait MetaActionIterator -> struct MetaActionIteratorImpl
79
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Manage region's manifest. Provide APIs to access (create/modify/recover) region's persisted
/// metadata.
///
/// ```mermaid
/// classDiagram
/// class RegionManifestManager {
///     -ManifestObjectStore store
///     -Arc~AtomicU64~ last_version
///     -Checkpointer checkpointer
///     -Arc~RegionManifest~ manifest
///     -bool stopped
///     +new() RegionManifestManager
///     +open() Option~RegionManifestManager~
///     +stop()
///     +update(RegionMetaActionList action_list, RegionRoleState region_state) ManifestVersion
///     +manifest() RegionManifest
/// }
/// class ManifestObjectStore {
///     -ObjectStore object_store
/// }
/// class RegionChange {
///     -RegionMetadataRef metadata
/// }
/// class RegionEdit {
///     -VersionNumber region_version
///     -Vec~FileMeta~ files_to_add
///     -Vec~FileMeta~ files_to_remove
///     -SequenceNumber flushed_sequence
/// }
/// class RegionRemove {
///     -RegionId region_id
/// }
/// RegionManifestManager o-- ManifestObjectStore
/// RegionManifestManager o-- RegionManifest
/// RegionManifestManager o-- Checkpointer
/// RegionManifestManager -- RegionManifestOptions
/// RegionManifestManager -- RegionMetaActionList
/// RegionManifestManager -- RegionCheckpoint
/// ManifestObjectStore o-- ObjectStore
/// RegionMetaActionList o-- RegionMetaAction
/// RegionMetaAction o-- ProtocolAction
/// RegionMetaAction o-- RegionChange
/// RegionMetaAction o-- RegionEdit
/// RegionMetaAction o-- RegionRemove
/// RegionChange o-- RegionMetadata
/// RegionEdit o-- FileMeta
///
/// class RegionManifest {
///     -RegionMetadataRef metadata
///     -HashMap<FileId, FileMeta> files
///     -ManifestVersion manifest_version
/// }
/// class RegionMetadata
/// class FileMeta
/// RegionManifest o-- RegionMetadata
/// RegionManifest o-- FileMeta
///
/// class RegionCheckpoint {
///     -ManifestVersion last_version
///     -Option~RegionManifest~ checkpoint
/// }
/// RegionCheckpoint o-- RegionManifest
/// ```
// NOTE(review): apart from the `RegionManifestManager` box, the diagram above predates the
// current action types (e.g. the manager only matches on `Change`, `Edit`, `Remove` and
// `Truncate` actions) — confirm against `manifest::action` before relying on it.
#[derive(Debug)]
pub struct RegionManifestManager {
    /// Storage used to save, list and fetch manifest files.
    store: ManifestObjectStore,
    /// Latest manifest version known to this manager. The counter is the `Arc` handed to
    /// `new`/`open`, so the caller observes every change made here.
    last_version: Arc<AtomicU64>,
    /// Prunes `removed_files` and possibly triggers a checkpoint after each `update`.
    checkpointer: Checkpointer,
    /// Current in-memory manifest; replaced as a whole on every change.
    manifest: Arc<RegionManifest>,
    /// Set by `stop`; while true the manager refuses to apply further changes.
    stopped: bool,
}
149
150impl RegionManifestManager {
151    /// Constructs a region's manifest and persist it.
152    pub async fn new(
153        metadata: RegionMetadataRef,
154        flushed_entry_id: u64,
155        options: RegionManifestOptions,
156        total_manifest_size: Arc<AtomicU64>,
157        manifest_version: Arc<AtomicU64>,
158        sst_format: FormatType,
159    ) -> Result<Self> {
160        // construct storage
161        let mut store = ManifestObjectStore::new(
162            &options.manifest_dir,
163            options.object_store.clone(),
164            options.compress_type,
165            total_manifest_size,
166        );
167
168        info!(
169            "Creating region manifest in {} with metadata {:?}, flushed_entry_id: {}",
170            options.manifest_dir, metadata, flushed_entry_id
171        );
172
173        let version = MIN_VERSION;
174        let mut manifest_builder = RegionManifestBuilder::default();
175        // set the initial metadata.
176        manifest_builder.apply_change(
177            version,
178            RegionChange {
179                metadata: metadata.clone(),
180                sst_format,
181            },
182        );
183        let manifest = manifest_builder.try_build()?;
184        let region_id = metadata.region_id;
185
186        debug!(
187            "Build region manifest in {}, manifest: {:?}",
188            options.manifest_dir, manifest
189        );
190
191        let mut actions = vec![RegionMetaAction::Change(RegionChange {
192            metadata,
193            sst_format,
194        })];
195        if flushed_entry_id > 0 {
196            actions.push(RegionMetaAction::Edit(RegionEdit {
197                files_to_add: vec![],
198                files_to_remove: vec![],
199                timestamp_ms: None,
200                compaction_time_window: None,
201                flushed_entry_id: Some(flushed_entry_id),
202                flushed_sequence: None,
203                committed_sequence: None,
204            }));
205        }
206
207        // Persist region change.
208        let action_list = RegionMetaActionList::new(actions);
209
210        // New region is not in staging mode.
211        // TODO(ruihang): add staging mode support if needed.
212        store.save(version, &action_list.encode()?, false).await?;
213
214        let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION);
215        manifest_version.store(version, Ordering::Relaxed);
216        Ok(Self {
217            store,
218            last_version: manifest_version,
219            checkpointer,
220            manifest: Arc::new(manifest),
221            stopped: false,
222        })
223    }
224
225    /// Opens an existing manifest.
226    ///
227    /// Returns `Ok(None)` if no such manifest.
228    pub async fn open(
229        options: RegionManifestOptions,
230        total_manifest_size: Arc<AtomicU64>,
231        manifest_version: Arc<AtomicU64>,
232    ) -> Result<Option<Self>> {
233        let _t = MANIFEST_OP_ELAPSED
234            .with_label_values(&["open"])
235            .start_timer();
236
237        // construct storage
238        let mut store = ManifestObjectStore::new(
239            &options.manifest_dir,
240            options.object_store.clone(),
241            options.compress_type,
242            total_manifest_size,
243        );
244
245        // recover from storage
246        // construct manifest builder
247        // calculate the manifest size from the latest checkpoint
248        let mut version = MIN_VERSION;
249        let checkpoint = Self::last_checkpoint(&mut store).await?;
250        let last_checkpoint_version = checkpoint
251            .as_ref()
252            .map(|(checkpoint, _)| checkpoint.last_version)
253            .unwrap_or(MIN_VERSION);
254        let mut manifest_builder = if let Some((checkpoint, _)) = checkpoint {
255            info!(
256                "Recover region manifest {} from checkpoint version {}",
257                options.manifest_dir, checkpoint.last_version
258            );
259            version = version.max(checkpoint.last_version + 1);
260            RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint)
261        } else {
262            info!(
263                "Checkpoint not found in {}, build manifest from scratch",
264                options.manifest_dir
265            );
266            RegionManifestBuilder::default()
267        };
268
269        // apply actions from storage
270        let manifests = store.fetch_manifests(version, MAX_VERSION).await?;
271
272        for (manifest_version, raw_action_list) in manifests {
273            let action_list = RegionMetaActionList::decode(&raw_action_list)?;
274            // set manifest size after last checkpoint
275            store.set_delta_file_size(manifest_version, raw_action_list.len() as u64);
276            for action in action_list.actions {
277                match action {
278                    RegionMetaAction::Change(action) => {
279                        manifest_builder.apply_change(manifest_version, action);
280                    }
281                    RegionMetaAction::Edit(action) => {
282                        manifest_builder.apply_edit(manifest_version, action);
283                    }
284                    RegionMetaAction::Remove(_) => {
285                        debug!(
286                            "Unhandled action in {}, action: {:?}",
287                            options.manifest_dir, action
288                        );
289                    }
290                    RegionMetaAction::Truncate(action) => {
291                        manifest_builder.apply_truncate(manifest_version, action);
292                    }
293                }
294            }
295        }
296
297        // set the initial metadata if necessary
298        if !manifest_builder.contains_metadata() {
299            debug!("No region manifest in {}", options.manifest_dir);
300            return Ok(None);
301        }
302
303        let manifest = manifest_builder.try_build()?;
304        debug!(
305            "Recovered region manifest from {}, manifest: {:?}",
306            options.manifest_dir, manifest
307        );
308        let version = manifest.manifest_version;
309
310        let checkpointer = Checkpointer::new(
311            manifest.metadata.region_id,
312            options,
313            store.clone(),
314            last_checkpoint_version,
315        );
316        manifest_version.store(version, Ordering::Relaxed);
317        Ok(Some(Self {
318            store,
319            last_version: manifest_version,
320            checkpointer,
321            manifest: Arc::new(manifest),
322            stopped: false,
323        }))
324    }
325
326    /// Stops the manager.
327    pub async fn stop(&mut self) {
328        self.stopped = true;
329    }
330
331    /// Installs the manifest changes from the current version to the target version (inclusive).
332    ///
333    /// Returns installed version.
334    /// **Note**: This function is not guaranteed to install the target version strictly.
335    /// The installed version may be greater than the target version.
336    pub async fn install_manifest_to(
337        &mut self,
338        target_version: ManifestVersion,
339    ) -> Result<ManifestVersion> {
340        let _t = MANIFEST_OP_ELAPSED
341            .with_label_values(&["install_manifest_to"])
342            .start_timer();
343
344        let last_version = self.last_version();
345        // Case 1: If the target version is less than the current version, return the current version.
346        if last_version >= target_version {
347            debug!(
348                "Target version {} is less than or equal to the current version {}, region: {}, skip install",
349                target_version, last_version, self.manifest.metadata.region_id
350            );
351            return Ok(last_version);
352        }
353
354        ensure!(
355            !self.stopped,
356            RegionStoppedSnafu {
357                region_id: self.manifest.metadata.region_id,
358            }
359        );
360
361        let region_id = self.manifest.metadata.region_id;
362        // Fetches manifests from the last version strictly.
363        let mut manifests = self
364            .store
365            // Invariant: last_version < target_version.
366            .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
367            .await?;
368
369        // Case 2: No manifests in range: [current_version+1, target_version+1)
370        //
371        // |---------Has been deleted------------|     [Checkpoint Version]...[Latest Version]
372        //                                                                    [Leader region]
373        // [Current Version]......[Target Version]
374        // [Follower region]
375        if manifests.is_empty() {
376            info!(
377                "Manifests are not strict from {}, region: {}, tries to install the last checkpoint",
378                last_version, self.manifest.metadata.region_id
379            );
380            let last_version = self.install_last_checkpoint().await?;
381            // Case 2.1: If the installed checkpoint version is greater than or equal to the target version, return the last version.
382            if last_version >= target_version {
383                return Ok(last_version);
384            }
385
386            // Fetches manifests from the installed version strictly.
387            manifests = self
388                .store
389                // Invariant: last_version < target_version.
390                .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
391                .await?;
392        }
393
394        if manifests.is_empty() {
395            return NoManifestsSnafu {
396                region_id: self.manifest.metadata.region_id,
397                start_version: last_version + 1,
398                end_version: target_version + 1,
399                last_version,
400            }
401            .fail();
402        }
403
404        debug_assert_eq!(manifests.first().unwrap().0, last_version + 1);
405        let mut manifest_builder =
406            RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
407
408        for (manifest_version, raw_action_list) in manifests {
409            self.store
410                .set_delta_file_size(manifest_version, raw_action_list.len() as u64);
411            let action_list = RegionMetaActionList::decode(&raw_action_list)?;
412            for action in action_list.actions {
413                match action {
414                    RegionMetaAction::Change(action) => {
415                        manifest_builder.apply_change(manifest_version, action);
416                    }
417                    RegionMetaAction::Edit(action) => {
418                        manifest_builder.apply_edit(manifest_version, action);
419                    }
420                    RegionMetaAction::Remove(_) => {
421                        debug!(
422                            "Unhandled action for region {}, action: {:?}",
423                            self.manifest.metadata.region_id, action
424                        );
425                    }
426                    RegionMetaAction::Truncate(action) => {
427                        manifest_builder.apply_truncate(manifest_version, action);
428                    }
429                }
430            }
431        }
432
433        let new_manifest = manifest_builder.try_build()?;
434        ensure!(
435            new_manifest.manifest_version >= target_version,
436            InstallManifestToSnafu {
437                region_id: self.manifest.metadata.region_id,
438                target_version,
439                available_version: new_manifest.manifest_version,
440                last_version,
441            }
442        );
443
444        let version = self.last_version();
445        self.manifest = Arc::new(new_manifest);
446        let last_version = self.set_version(self.manifest.manifest_version);
447        info!(
448            "Install manifest changes from {} to {}, region: {}",
449            version, last_version, self.manifest.metadata.region_id
450        );
451
452        Ok(last_version)
453    }
454
455    /// Installs the last checkpoint.
456    pub(crate) async fn install_last_checkpoint(&mut self) -> Result<ManifestVersion> {
457        let last_version = self.last_version();
458        let Some((checkpoint, checkpoint_size)) = Self::last_checkpoint(&mut self.store).await?
459        else {
460            return NoCheckpointSnafu {
461                region_id: self.manifest.metadata.region_id,
462                last_version,
463            }
464            .fail();
465        };
466        self.store.reset_manifest_size();
467        self.store
468            .set_checkpoint_file_size(checkpoint.last_version, checkpoint_size);
469        let builder = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint);
470        let manifest = builder.try_build()?;
471        let last_version = self.set_version(manifest.manifest_version);
472        self.manifest = Arc::new(manifest);
473        info!(
474            "Installed region manifest from checkpoint: {}, region: {}",
475            checkpoint.last_version, self.manifest.metadata.region_id
476        );
477
478        Ok(last_version)
479    }
480
481    /// Updates the manifest. Returns the current manifest version number.
482    pub async fn update(
483        &mut self,
484        action_list: RegionMetaActionList,
485        region_state: RegionRoleState,
486    ) -> Result<ManifestVersion> {
487        let _t = MANIFEST_OP_ELAPSED
488            .with_label_values(&["update"])
489            .start_timer();
490
491        ensure!(
492            !self.stopped,
493            RegionStoppedSnafu {
494                region_id: self.manifest.metadata.region_id,
495            }
496        );
497
498        let version = self.increase_version();
499        let is_staging = region_state == RegionRoleState::Leader(RegionLeaderState::Staging);
500        self.store
501            .save(version, &action_list.encode()?, is_staging)
502            .await?;
503
504        let mut manifest_builder =
505            RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
506        for action in action_list.actions {
507            match action {
508                RegionMetaAction::Change(action) => {
509                    manifest_builder.apply_change(version, action);
510                }
511                RegionMetaAction::Edit(action) => {
512                    manifest_builder.apply_edit(version, action);
513                }
514                RegionMetaAction::Remove(_) => {
515                    debug!(
516                        "Unhandled action for region {}, action: {:?}",
517                        self.manifest.metadata.region_id, action
518                    );
519                }
520                RegionMetaAction::Truncate(action) => {
521                    manifest_builder.apply_truncate(version, action);
522                }
523            }
524        }
525        let new_manifest = manifest_builder.try_build()?;
526        let updated_manifest = self
527            .checkpointer
528            .update_manifest_removed_files(new_manifest)?;
529        self.manifest = Arc::new(updated_manifest);
530
531        self.checkpointer
532            .maybe_do_checkpoint(self.manifest.as_ref(), region_state);
533
534        Ok(version)
535    }
536
537    /// Retrieves the current [RegionManifest].
538    pub fn manifest(&self) -> Arc<RegionManifest> {
539        self.manifest.clone()
540    }
541
542    /// Returns total manifest size.
543    pub fn manifest_usage(&self) -> u64 {
544        self.store.total_manifest_size()
545    }
546
547    /// Returns true if a newer version manifest file is found.
548    ///
549    /// It is typically used in read-only regions to catch up with manifest.
550    /// It doesn't lock the manifest directory in the object store so the result
551    /// may be inaccurate if there are concurrent writes.
552    pub async fn has_update(&self) -> Result<bool> {
553        let last_version = self.last_version();
554
555        let streamer =
556            self.store
557                .manifest_lister(false)
558                .await?
559                .context(error::EmptyManifestDirSnafu {
560                    manifest_dir: self.store.manifest_dir(),
561                })?;
562
563        let need_update = streamer
564            .try_any(|entry| async move {
565                let file_name = entry.name();
566                if is_delta_file(file_name) || is_checkpoint_file(file_name) {
567                    let version = file_version(file_name);
568                    if version > last_version {
569                        return true;
570                    }
571                }
572                false
573            })
574            .await
575            .context(error::OpenDalSnafu)?;
576
577        Ok(need_update)
578    }
579
580    /// Increases last version and returns the increased version.
581    fn increase_version(&mut self) -> ManifestVersion {
582        let previous = self.last_version.fetch_add(1, Ordering::Relaxed);
583        previous + 1
584    }
585
586    /// Sets the last version.
587    fn set_version(&mut self, version: ManifestVersion) -> ManifestVersion {
588        self.last_version.store(version, Ordering::Relaxed);
589        version
590    }
591
592    fn last_version(&self) -> ManifestVersion {
593        self.last_version.load(Ordering::Relaxed)
594    }
595
596    /// Fetches the last [RegionCheckpoint] from storage.
597    ///
598    /// If the checkpoint is not found, returns `None`.
599    /// Otherwise, returns the checkpoint and the size of the checkpoint.
600    pub(crate) async fn last_checkpoint(
601        store: &mut ManifestObjectStore,
602    ) -> Result<Option<(RegionCheckpoint, u64)>> {
603        let last_checkpoint = store.load_last_checkpoint().await?;
604
605        if let Some((_, bytes)) = last_checkpoint {
606            let checkpoint = RegionCheckpoint::decode(&bytes)?;
607            Ok(Some((checkpoint, bytes.len() as u64)))
608        } else {
609            Ok(None)
610        }
611    }
612
613    pub fn store(&self) -> ManifestObjectStore {
614        self.store.clone()
615    }
616
617    #[cfg(test)]
618    pub(crate) fn checkpointer(&self) -> &Checkpointer {
619        &self.checkpointer
620    }
621
622    /// Merge all staged manifest actions into a single action list ready for submission.
623    /// This collects all staging manifests, applies them sequentially, and returns the merged actions.
624    pub(crate) async fn merge_staged_actions(
625        &mut self,
626        region_state: RegionRoleState,
627    ) -> Result<Option<RegionMetaActionList>> {
628        // Only merge if we're in staging mode
629        if region_state != RegionRoleState::Leader(RegionLeaderState::Staging) {
630            return Ok(None);
631        }
632
633        // Fetch all staging manifests
634        let staging_manifests = self.store.fetch_staging_manifests().await?;
635
636        if staging_manifests.is_empty() {
637            info!(
638                "No staging manifests to merge for region {}",
639                self.manifest.metadata.region_id
640            );
641            return Ok(None);
642        }
643
644        info!(
645            "Merging {} staging manifests for region {}",
646            staging_manifests.len(),
647            self.manifest.metadata.region_id
648        );
649
650        // Start with current manifest state as the base
651        let mut merged_actions = Vec::new();
652        let mut latest_version = self.last_version();
653
654        // Apply all staging actions in order
655        for (manifest_version, raw_action_list) in staging_manifests {
656            let action_list = RegionMetaActionList::decode(&raw_action_list)?;
657
658            for action in action_list.actions {
659                merged_actions.push(action);
660            }
661
662            latest_version = latest_version.max(manifest_version);
663        }
664
665        if merged_actions.is_empty() {
666            return Ok(None);
667        }
668
669        info!(
670            "Successfully merged {} actions from staging manifests for region {}, latest version: {}",
671            merged_actions.len(),
672            self.manifest.metadata.region_id,
673            latest_version
674        );
675
676        Ok(Some(RegionMetaActionList::new(merged_actions)))
677    }
678}
679
#[cfg(test)]
impl RegionManifestManager {
    /// Asserts that the manager holds `expect` as metadata and that both the manifest and
    /// the version counter are at `last_version`.
    fn validate_manifest(&self, expect: &RegionMetadataRef, last_version: ManifestVersion) {
        let current_version = self.last_version();
        assert_eq!(self.manifest.metadata, *expect);
        assert_eq!(self.manifest.manifest_version, current_version);
        assert_eq!(last_version, current_version);
    }
}
689
690#[cfg(test)]
691mod test {
692    use std::time::Duration;
693
694    use api::v1::SemanticType;
695    use common_datasource::compression::CompressionType;
696    use common_test_util::temp_dir::create_temp_dir;
697    use datatypes::prelude::ConcreteDataType;
698    use datatypes::schema::ColumnSchema;
699    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
700
701    use super::*;
702    use crate::manifest::action::{RegionChange, RegionEdit};
703    use crate::manifest::tests::utils::basic_region_metadata;
704    use crate::test_util::TestEnv;
705
706    #[tokio::test]
707    async fn create_manifest_manager() {
708        let metadata = Arc::new(basic_region_metadata());
709        let env = TestEnv::new().await;
710        let manager = env
711            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
712            .await
713            .unwrap()
714            .unwrap();
715
716        manager.validate_manifest(&metadata, 0);
717    }
718
719    #[tokio::test]
720    async fn open_manifest_manager() {
721        let env = TestEnv::new().await;
722        // Try to opens an empty manifest.
723        assert!(
724            env.create_manifest_manager(CompressionType::Uncompressed, 10, None)
725                .await
726                .unwrap()
727                .is_none()
728        );
729
730        // Creates a manifest.
731        let metadata = Arc::new(basic_region_metadata());
732        let mut manager = env
733            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
734            .await
735            .unwrap()
736            .unwrap();
737        // Stops it.
738        manager.stop().await;
739
740        // Open it.
741        let manager = env
742            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
743            .await
744            .unwrap()
745            .unwrap();
746
747        manager.validate_manifest(&metadata, 0);
748    }
749
750    #[tokio::test]
751    async fn manifest_with_partition_expr_roundtrip() {
752        let env = TestEnv::new().await;
753        let expr_json =
754            r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
755        let mut metadata = basic_region_metadata();
756        metadata.partition_expr = Some(expr_json.to_string());
757        let metadata = Arc::new(metadata);
758        let mut manager = env
759            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
760            .await
761            .unwrap()
762            .unwrap();
763
764        // persisted manifest should contain the same partition_expr JSON
765        let manifest = manager.manifest();
766        assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
767
768        manager.stop().await;
769
770        // Reopen and check again
771        let manager = env
772            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
773            .await
774            .unwrap()
775            .unwrap();
776        let manifest = manager.manifest();
777        assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
778    }
779
780    #[tokio::test]
781    async fn region_change_add_column() {
782        let metadata = Arc::new(basic_region_metadata());
783        let env = TestEnv::new().await;
784        let mut manager = env
785            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
786            .await
787            .unwrap()
788            .unwrap();
789
790        let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
791        new_metadata_builder.push_column_metadata(ColumnMetadata {
792            column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
793            semantic_type: SemanticType::Field,
794            column_id: 252,
795        });
796        let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
797
798        let action_list =
799            RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
800                metadata: new_metadata.clone(),
801                sst_format: FormatType::PrimaryKey,
802            }));
803
804        let current_version = manager
805            .update(
806                action_list,
807                RegionRoleState::Leader(RegionLeaderState::Writable),
808            )
809            .await
810            .unwrap();
811        assert_eq!(current_version, 1);
812        manager.validate_manifest(&new_metadata, 1);
813
814        // Reopen the manager.
815        manager.stop().await;
816        let manager = env
817            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
818            .await
819            .unwrap()
820            .unwrap();
821        manager.validate_manifest(&new_metadata, 1);
822    }
823
824    /// Just for test, refer to wal_dir_usage in src/store-api/src/logstore.rs.
825    async fn manifest_dir_usage(path: &str) -> u64 {
826        let mut size = 0;
827        let mut read_dir = tokio::fs::read_dir(path).await.unwrap();
828        while let Ok(dir_entry) = read_dir.next_entry().await {
829            let Some(entry) = dir_entry else {
830                break;
831            };
832            if entry.file_type().await.unwrap().is_file() {
833                let file_name = entry.file_name().into_string().unwrap();
834                if file_name.contains(".checkpoint") || file_name.contains(".json") {
835                    let file_size = entry.metadata().await.unwrap().len() as usize;
836                    debug!("File: {file_name:?}, size: {file_size}");
837                    size += file_size;
838                }
839            }
840        }
841        size as u64
842    }
843
844    #[tokio::test]
845    async fn test_manifest_size() {
846        let metadata = Arc::new(basic_region_metadata());
847        let data_home = create_temp_dir("");
848        let data_home_path = data_home.path().to_str().unwrap().to_string();
849        let env = TestEnv::with_data_home(data_home).await;
850
851        let manifest_dir = format!("{}/manifest", data_home_path);
852
853        let mut manager = env
854            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
855            .await
856            .unwrap()
857            .unwrap();
858
859        let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
860        new_metadata_builder.push_column_metadata(ColumnMetadata {
861            column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
862            semantic_type: SemanticType::Field,
863            column_id: 252,
864        });
865        let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
866
867        let action_list =
868            RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
869                metadata: new_metadata.clone(),
870                sst_format: FormatType::PrimaryKey,
871            }));
872
873        let current_version = manager
874            .update(
875                action_list,
876                RegionRoleState::Leader(RegionLeaderState::Writable),
877            )
878            .await
879            .unwrap();
880        assert_eq!(current_version, 1);
881        manager.validate_manifest(&new_metadata, 1);
882
883        // get manifest size
884        let manifest_size = manager.manifest_usage();
885        assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
886
887        // update 10 times nop_action to trigger checkpoint
888        for _ in 0..10 {
889            manager
890                .update(
891                    RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
892                        files_to_add: vec![],
893                        files_to_remove: vec![],
894                        timestamp_ms: None,
895                        compaction_time_window: None,
896                        flushed_entry_id: None,
897                        flushed_sequence: None,
898                        committed_sequence: None,
899                    })]),
900                    RegionRoleState::Leader(RegionLeaderState::Writable),
901                )
902                .await
903                .unwrap();
904        }
905
906        while manager.checkpointer.is_doing_checkpoint() {
907            tokio::time::sleep(Duration::from_millis(10)).await;
908        }
909
910        // check manifest size again
911        let manifest_size = manager.manifest_usage();
912        assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
913
914        // Reopen the manager,
915        // we just calculate the size from the latest checkpoint file
916        manager.stop().await;
917        let manager = env
918            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
919            .await
920            .unwrap()
921            .unwrap();
922        manager.validate_manifest(&new_metadata, 11);
923
924        // get manifest size again
925        let manifest_size = manager.manifest_usage();
926        assert_eq!(manifest_size, 1748);
927    }
928}