mito2/manifest/
manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use common_datasource::compression::CompressionType;
19use common_telemetry::{debug, info};
20use futures::TryStreamExt;
21use object_store::ObjectStore;
22use snafu::{OptionExt, ResultExt, ensure};
23use store_api::metadata::RegionMetadataRef;
24use store_api::{MAX_VERSION, MIN_VERSION, ManifestVersion};
25
26use crate::error::{
27    self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
28};
29use crate::manifest::action::{
30    RegionChange, RegionCheckpoint, RegionEdit, RegionManifest, RegionManifestBuilder,
31    RegionMetaAction, RegionMetaActionList,
32};
33use crate::manifest::checkpointer::Checkpointer;
34use crate::manifest::storage::{
35    ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file,
36};
37use crate::metrics::MANIFEST_OP_ELAPSED;
38use crate::region::{RegionLeaderState, RegionRoleState};
39
40/// Options for [RegionManifestManager].
41#[derive(Debug, Clone)]
42pub struct RegionManifestOptions {
43    /// Directory to store manifest.
44    pub manifest_dir: String,
45    pub object_store: ObjectStore,
46    pub compress_type: CompressionType,
47    /// Interval of version ([ManifestVersion](store_api::manifest::ManifestVersion)) between two checkpoints.
48    /// Set to 0 to disable checkpoint.
49    pub checkpoint_distance: u64,
50    pub remove_file_options: RemoveFileOptions,
51}
52
/// Options for pruning the `removed_files` field in [RegionManifest].
///
/// An entry is only dropped from `removed_files` once **both** `keep_count` and
/// `keep_ttl` have been reached.
#[derive(Debug, Clone)]
pub struct RemoveFileOptions {
    /// Number of removed files to keep in the manifest's `removed_files` field before
    /// they may also be removed from it. Entries are only dropped once both
    /// `keep_count` and `keep_ttl` are reached.
    pub keep_count: usize,
    /// How long removed files are kept in the manifest's `removed_files` field before
    /// they may also be removed from it. Entries are only dropped once both
    /// `keep_count` and `keep_ttl` are reached.
    pub keep_ttl: std::time::Duration,
}

/// Defaults available only in tests or with the `test` feature: keep 256 removed
/// files for one hour.
#[cfg(any(test, feature = "test"))]
impl Default for RemoveFileOptions {
    fn default() -> Self {
        Self {
            keep_count: 256,
            keep_ttl: std::time::Duration::from_secs(3600),
        }
    }
}
73
74// rewrite note:
75// trait Checkpoint -> struct RegionCheckpoint
76// trait MetaAction -> struct RegionMetaActionList
77// trait MetaActionIterator -> struct MetaActionIteratorImpl
78
79#[cfg_attr(doc, aquamarine::aquamarine)]
80/// Manage region's manifest. Provide APIs to access (create/modify/recover) region's persisted
81/// metadata.
82///
83/// ```mermaid
84/// classDiagram
85/// class RegionManifestManager {
86///     -ManifestObjectStore store
87///     -RegionManifestOptions options
88///     -RegionManifest manifest
89///     +new() RegionManifestManager
90///     +open() Option~RegionManifestManager~
91///     +stop()
92///     +update(RegionMetaActionList action_list) ManifestVersion
93///     +manifest() RegionManifest
94/// }
95/// class ManifestObjectStore {
96///     -ObjectStore object_store
97/// }
98/// class RegionChange {
99///     -RegionMetadataRef metadata
100/// }
101/// class RegionEdit {
102///     -VersionNumber regoin_version
103///     -Vec~FileMeta~ files_to_add
104///     -Vec~FileMeta~ files_to_remove
105///     -SequenceNumber flushed_sequence
106/// }
107/// class RegionRemove {
108///     -RegionId region_id
109/// }
110/// RegionManifestManager o-- ManifestObjectStore
111/// RegionManifestManager o-- RegionManifest
112/// RegionManifestManager o-- RegionManifestOptions
113/// RegionManifestManager -- RegionMetaActionList
114/// RegionManifestManager -- RegionCheckpoint
115/// ManifestObjectStore o-- ObjectStore
116/// RegionMetaActionList o-- RegionMetaAction
117/// RegionMetaAction o-- ProtocolAction
118/// RegionMetaAction o-- RegionChange
119/// RegionMetaAction o-- RegionEdit
120/// RegionMetaAction o-- RegionRemove
121/// RegionChange o-- RegionMetadata
122/// RegionEdit o-- FileMeta
123///
124/// class RegionManifest {
125///     -RegionMetadataRef metadata
126///     -HashMap<FileId, FileMeta> files
127///     -ManifestVersion manifest_version
128/// }
129/// class RegionMetadata
130/// class FileMeta
131/// RegionManifest o-- RegionMetadata
132/// RegionManifest o-- FileMeta
133///
134/// class RegionCheckpoint {
135///     -ManifestVersion last_version
136///     -Option~RegionManifest~ checkpoint
137/// }
138/// RegionCheckpoint o-- RegionManifest
139/// ```
140#[derive(Debug)]
141pub struct RegionManifestManager {
142    store: ManifestObjectStore,
143    last_version: Arc<AtomicU64>,
144    checkpointer: Checkpointer,
145    manifest: Arc<RegionManifest>,
146    stopped: bool,
147}
148
149impl RegionManifestManager {
150    /// Constructs a region's manifest and persist it.
151    pub async fn new(
152        metadata: RegionMetadataRef,
153        flushed_entry_id: u64,
154        options: RegionManifestOptions,
155        total_manifest_size: Arc<AtomicU64>,
156        manifest_version: Arc<AtomicU64>,
157    ) -> Result<Self> {
158        // construct storage
159        let mut store = ManifestObjectStore::new(
160            &options.manifest_dir,
161            options.object_store.clone(),
162            options.compress_type,
163            total_manifest_size,
164        );
165
166        info!(
167            "Creating region manifest in {} with metadata {:?}, flushed_entry_id: {}",
168            options.manifest_dir, metadata, flushed_entry_id
169        );
170
171        let version = MIN_VERSION;
172        let mut manifest_builder = RegionManifestBuilder::default();
173        // set the initial metadata.
174        manifest_builder.apply_change(
175            version,
176            RegionChange {
177                metadata: metadata.clone(),
178            },
179        );
180        let manifest = manifest_builder.try_build()?;
181        let region_id = metadata.region_id;
182
183        debug!(
184            "Build region manifest in {}, manifest: {:?}",
185            options.manifest_dir, manifest
186        );
187
188        let mut actions = vec![RegionMetaAction::Change(RegionChange { metadata })];
189        if flushed_entry_id > 0 {
190            actions.push(RegionMetaAction::Edit(RegionEdit {
191                files_to_add: vec![],
192                files_to_remove: vec![],
193                timestamp_ms: None,
194                compaction_time_window: None,
195                flushed_entry_id: Some(flushed_entry_id),
196                flushed_sequence: None,
197                committed_sequence: None,
198            }));
199        }
200
201        // Persist region change.
202        let action_list = RegionMetaActionList::new(actions);
203
204        // New region is not in staging mode.
205        // TODO(ruihang): add staging mode support if needed.
206        store.save(version, &action_list.encode()?, false).await?;
207
208        let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION);
209        manifest_version.store(version, Ordering::Relaxed);
210        Ok(Self {
211            store,
212            last_version: manifest_version,
213            checkpointer,
214            manifest: Arc::new(manifest),
215            stopped: false,
216        })
217    }
218
219    /// Opens an existing manifest.
220    ///
221    /// Returns `Ok(None)` if no such manifest.
222    pub async fn open(
223        options: RegionManifestOptions,
224        total_manifest_size: Arc<AtomicU64>,
225        manifest_version: Arc<AtomicU64>,
226    ) -> Result<Option<Self>> {
227        let _t = MANIFEST_OP_ELAPSED
228            .with_label_values(&["open"])
229            .start_timer();
230
231        // construct storage
232        let mut store = ManifestObjectStore::new(
233            &options.manifest_dir,
234            options.object_store.clone(),
235            options.compress_type,
236            total_manifest_size,
237        );
238
239        // recover from storage
240        // construct manifest builder
241        // calculate the manifest size from the latest checkpoint
242        let mut version = MIN_VERSION;
243        let checkpoint = Self::last_checkpoint(&mut store).await?;
244        let last_checkpoint_version = checkpoint
245            .as_ref()
246            .map(|(checkpoint, _)| checkpoint.last_version)
247            .unwrap_or(MIN_VERSION);
248        let mut manifest_builder = if let Some((checkpoint, _)) = checkpoint {
249            info!(
250                "Recover region manifest {} from checkpoint version {}",
251                options.manifest_dir, checkpoint.last_version
252            );
253            version = version.max(checkpoint.last_version + 1);
254            RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint)
255        } else {
256            info!(
257                "Checkpoint not found in {}, build manifest from scratch",
258                options.manifest_dir
259            );
260            RegionManifestBuilder::default()
261        };
262
263        // apply actions from storage
264        let manifests = store.fetch_manifests(version, MAX_VERSION).await?;
265
266        for (manifest_version, raw_action_list) in manifests {
267            let action_list = RegionMetaActionList::decode(&raw_action_list)?;
268            // set manifest size after last checkpoint
269            store.set_delta_file_size(manifest_version, raw_action_list.len() as u64);
270            for action in action_list.actions {
271                match action {
272                    RegionMetaAction::Change(action) => {
273                        manifest_builder.apply_change(manifest_version, action);
274                    }
275                    RegionMetaAction::Edit(action) => {
276                        manifest_builder.apply_edit(manifest_version, action);
277                    }
278                    RegionMetaAction::Remove(_) => {
279                        debug!(
280                            "Unhandled action in {}, action: {:?}",
281                            options.manifest_dir, action
282                        );
283                    }
284                    RegionMetaAction::Truncate(action) => {
285                        manifest_builder.apply_truncate(manifest_version, action);
286                    }
287                }
288            }
289        }
290
291        // set the initial metadata if necessary
292        if !manifest_builder.contains_metadata() {
293            debug!("No region manifest in {}", options.manifest_dir);
294            return Ok(None);
295        }
296
297        let manifest = manifest_builder.try_build()?;
298        debug!(
299            "Recovered region manifest from {}, manifest: {:?}",
300            options.manifest_dir, manifest
301        );
302        let version = manifest.manifest_version;
303
304        let checkpointer = Checkpointer::new(
305            manifest.metadata.region_id,
306            options,
307            store.clone(),
308            last_checkpoint_version,
309        );
310        manifest_version.store(version, Ordering::Relaxed);
311        Ok(Some(Self {
312            store,
313            last_version: manifest_version,
314            checkpointer,
315            manifest: Arc::new(manifest),
316            stopped: false,
317        }))
318    }
319
320    /// Stops the manager.
321    pub async fn stop(&mut self) {
322        self.stopped = true;
323    }
324
325    /// Installs the manifest changes from the current version to the target version (inclusive).
326    ///
327    /// Returns installed version.
328    /// **Note**: This function is not guaranteed to install the target version strictly.
329    /// The installed version may be greater than the target version.
330    pub async fn install_manifest_to(
331        &mut self,
332        target_version: ManifestVersion,
333    ) -> Result<ManifestVersion> {
334        let _t = MANIFEST_OP_ELAPSED
335            .with_label_values(&["install_manifest_to"])
336            .start_timer();
337
338        let last_version = self.last_version();
339        // Case 1: If the target version is less than the current version, return the current version.
340        if last_version >= target_version {
341            debug!(
342                "Target version {} is less than or equal to the current version {}, region: {}, skip install",
343                target_version, last_version, self.manifest.metadata.region_id
344            );
345            return Ok(last_version);
346        }
347
348        ensure!(
349            !self.stopped,
350            RegionStoppedSnafu {
351                region_id: self.manifest.metadata.region_id,
352            }
353        );
354
355        let region_id = self.manifest.metadata.region_id;
356        // Fetches manifests from the last version strictly.
357        let mut manifests = self
358            .store
359            // Invariant: last_version < target_version.
360            .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
361            .await?;
362
363        // Case 2: No manifests in range: [current_version+1, target_version+1)
364        //
365        // |---------Has been deleted------------|     [Checkpoint Version]...[Latest Version]
366        //                                                                    [Leader region]
367        // [Current Version]......[Target Version]
368        // [Follower region]
369        if manifests.is_empty() {
370            info!(
371                "Manifests are not strict from {}, region: {}, tries to install the last checkpoint",
372                last_version, self.manifest.metadata.region_id
373            );
374            let last_version = self.install_last_checkpoint().await?;
375            // Case 2.1: If the installed checkpoint version is greater than or equal to the target version, return the last version.
376            if last_version >= target_version {
377                return Ok(last_version);
378            }
379
380            // Fetches manifests from the installed version strictly.
381            manifests = self
382                .store
383                // Invariant: last_version < target_version.
384                .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
385                .await?;
386        }
387
388        if manifests.is_empty() {
389            return NoManifestsSnafu {
390                region_id: self.manifest.metadata.region_id,
391                start_version: last_version + 1,
392                end_version: target_version + 1,
393                last_version,
394            }
395            .fail();
396        }
397
398        debug_assert_eq!(manifests.first().unwrap().0, last_version + 1);
399        let mut manifest_builder =
400            RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
401
402        for (manifest_version, raw_action_list) in manifests {
403            self.store
404                .set_delta_file_size(manifest_version, raw_action_list.len() as u64);
405            let action_list = RegionMetaActionList::decode(&raw_action_list)?;
406            for action in action_list.actions {
407                match action {
408                    RegionMetaAction::Change(action) => {
409                        manifest_builder.apply_change(manifest_version, action);
410                    }
411                    RegionMetaAction::Edit(action) => {
412                        manifest_builder.apply_edit(manifest_version, action);
413                    }
414                    RegionMetaAction::Remove(_) => {
415                        debug!(
416                            "Unhandled action for region {}, action: {:?}",
417                            self.manifest.metadata.region_id, action
418                        );
419                    }
420                    RegionMetaAction::Truncate(action) => {
421                        manifest_builder.apply_truncate(manifest_version, action);
422                    }
423                }
424            }
425        }
426
427        let new_manifest = manifest_builder.try_build()?;
428        ensure!(
429            new_manifest.manifest_version >= target_version,
430            InstallManifestToSnafu {
431                region_id: self.manifest.metadata.region_id,
432                target_version,
433                available_version: new_manifest.manifest_version,
434                last_version,
435            }
436        );
437
438        let version = self.last_version();
439        self.manifest = Arc::new(new_manifest);
440        let last_version = self.set_version(self.manifest.manifest_version);
441        info!(
442            "Install manifest changes from {} to {}, region: {}",
443            version, last_version, self.manifest.metadata.region_id
444        );
445
446        Ok(last_version)
447    }
448
449    /// Installs the last checkpoint.
450    pub(crate) async fn install_last_checkpoint(&mut self) -> Result<ManifestVersion> {
451        let last_version = self.last_version();
452        let Some((checkpoint, checkpoint_size)) = Self::last_checkpoint(&mut self.store).await?
453        else {
454            return NoCheckpointSnafu {
455                region_id: self.manifest.metadata.region_id,
456                last_version,
457            }
458            .fail();
459        };
460        self.store.reset_manifest_size();
461        self.store
462            .set_checkpoint_file_size(checkpoint.last_version, checkpoint_size);
463        let builder = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint);
464        let manifest = builder.try_build()?;
465        let last_version = self.set_version(manifest.manifest_version);
466        self.manifest = Arc::new(manifest);
467        info!(
468            "Installed region manifest from checkpoint: {}, region: {}",
469            checkpoint.last_version, self.manifest.metadata.region_id
470        );
471
472        Ok(last_version)
473    }
474
475    /// Updates the manifest. Returns the current manifest version number.
476    pub async fn update(
477        &mut self,
478        action_list: RegionMetaActionList,
479        region_state: RegionRoleState,
480    ) -> Result<ManifestVersion> {
481        let _t = MANIFEST_OP_ELAPSED
482            .with_label_values(&["update"])
483            .start_timer();
484
485        ensure!(
486            !self.stopped,
487            RegionStoppedSnafu {
488                region_id: self.manifest.metadata.region_id,
489            }
490        );
491
492        let version = self.increase_version();
493        let is_staging = region_state == RegionRoleState::Leader(RegionLeaderState::Staging);
494        self.store
495            .save(version, &action_list.encode()?, is_staging)
496            .await?;
497
498        let mut manifest_builder =
499            RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
500        for action in action_list.actions {
501            match action {
502                RegionMetaAction::Change(action) => {
503                    manifest_builder.apply_change(version, action);
504                }
505                RegionMetaAction::Edit(action) => {
506                    manifest_builder.apply_edit(version, action);
507                }
508                RegionMetaAction::Remove(_) => {
509                    debug!(
510                        "Unhandled action for region {}, action: {:?}",
511                        self.manifest.metadata.region_id, action
512                    );
513                }
514                RegionMetaAction::Truncate(action) => {
515                    manifest_builder.apply_truncate(version, action);
516                }
517            }
518        }
519        let new_manifest = manifest_builder.try_build()?;
520        let updated_manifest = self
521            .checkpointer
522            .update_manifest_removed_files(new_manifest)?;
523        self.manifest = Arc::new(updated_manifest);
524
525        self.checkpointer
526            .maybe_do_checkpoint(self.manifest.as_ref(), region_state);
527
528        Ok(version)
529    }
530
531    /// Retrieves the current [RegionManifest].
532    pub fn manifest(&self) -> Arc<RegionManifest> {
533        self.manifest.clone()
534    }
535
536    /// Returns total manifest size.
537    pub fn manifest_usage(&self) -> u64 {
538        self.store.total_manifest_size()
539    }
540
541    /// Returns true if a newer version manifest file is found.
542    ///
543    /// It is typically used in read-only regions to catch up with manifest.
544    /// It doesn't lock the manifest directory in the object store so the result
545    /// may be inaccurate if there are concurrent writes.
546    pub async fn has_update(&self) -> Result<bool> {
547        let last_version = self.last_version();
548
549        let streamer =
550            self.store
551                .manifest_lister(false)
552                .await?
553                .context(error::EmptyManifestDirSnafu {
554                    manifest_dir: self.store.manifest_dir(),
555                })?;
556
557        let need_update = streamer
558            .try_any(|entry| async move {
559                let file_name = entry.name();
560                if is_delta_file(file_name) || is_checkpoint_file(file_name) {
561                    let version = file_version(file_name);
562                    if version > last_version {
563                        return true;
564                    }
565                }
566                false
567            })
568            .await
569            .context(error::OpenDalSnafu)?;
570
571        Ok(need_update)
572    }
573
574    /// Increases last version and returns the increased version.
575    fn increase_version(&mut self) -> ManifestVersion {
576        let previous = self.last_version.fetch_add(1, Ordering::Relaxed);
577        previous + 1
578    }
579
580    /// Sets the last version.
581    fn set_version(&mut self, version: ManifestVersion) -> ManifestVersion {
582        self.last_version.store(version, Ordering::Relaxed);
583        version
584    }
585
586    fn last_version(&self) -> ManifestVersion {
587        self.last_version.load(Ordering::Relaxed)
588    }
589
590    /// Fetches the last [RegionCheckpoint] from storage.
591    ///
592    /// If the checkpoint is not found, returns `None`.
593    /// Otherwise, returns the checkpoint and the size of the checkpoint.
594    pub(crate) async fn last_checkpoint(
595        store: &mut ManifestObjectStore,
596    ) -> Result<Option<(RegionCheckpoint, u64)>> {
597        let last_checkpoint = store.load_last_checkpoint().await?;
598
599        if let Some((_, bytes)) = last_checkpoint {
600            let checkpoint = RegionCheckpoint::decode(&bytes)?;
601            Ok(Some((checkpoint, bytes.len() as u64)))
602        } else {
603            Ok(None)
604        }
605    }
606
607    pub fn store(&self) -> ManifestObjectStore {
608        self.store.clone()
609    }
610
611    #[cfg(test)]
612    pub(crate) fn checkpointer(&self) -> &Checkpointer {
613        &self.checkpointer
614    }
615
616    /// Merge all staged manifest actions into a single action list ready for submission.
617    /// This collects all staging manifests, applies them sequentially, and returns the merged actions.
618    pub(crate) async fn merge_staged_actions(
619        &mut self,
620        region_state: RegionRoleState,
621    ) -> Result<Option<RegionMetaActionList>> {
622        // Only merge if we're in staging mode
623        if region_state != RegionRoleState::Leader(RegionLeaderState::Staging) {
624            return Ok(None);
625        }
626
627        // Fetch all staging manifests
628        let staging_manifests = self.store.fetch_staging_manifests().await?;
629
630        if staging_manifests.is_empty() {
631            info!(
632                "No staging manifests to merge for region {}",
633                self.manifest.metadata.region_id
634            );
635            return Ok(None);
636        }
637
638        info!(
639            "Merging {} staging manifests for region {}",
640            staging_manifests.len(),
641            self.manifest.metadata.region_id
642        );
643
644        // Start with current manifest state as the base
645        let mut merged_actions = Vec::new();
646        let mut latest_version = self.last_version();
647
648        // Apply all staging actions in order
649        for (manifest_version, raw_action_list) in staging_manifests {
650            let action_list = RegionMetaActionList::decode(&raw_action_list)?;
651
652            for action in action_list.actions {
653                merged_actions.push(action);
654            }
655
656            latest_version = latest_version.max(manifest_version);
657        }
658
659        if merged_actions.is_empty() {
660            return Ok(None);
661        }
662
663        info!(
664            "Successfully merged {} actions from staging manifests for region {}, latest version: {}",
665            merged_actions.len(),
666            self.manifest.metadata.region_id,
667            latest_version
668        );
669
670        Ok(Some(RegionMetaActionList::new(merged_actions)))
671    }
672}
673
#[cfg(test)]
impl RegionManifestManager {
    /// Asserts three things: the manager's metadata equals `expect`; the in-memory
    /// manifest version equals the shared version; and the shared version equals
    /// `last_version`.
    fn validate_manifest(&self, expect: &RegionMetadataRef, last_version: ManifestVersion) {
        let current = self.last_version();
        let manifest = self.manifest();
        assert_eq!(manifest.metadata, *expect);
        assert_eq!(manifest.manifest_version, current);
        assert_eq!(last_version, current);
    }
}
683
#[cfg(test)]
mod test {
    use std::time::Duration;

    use api::v1::SemanticType;
    use common_datasource::compression::CompressionType;
    use common_test_util::temp_dir::create_temp_dir;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};

    use super::*;
    use crate::manifest::action::{RegionChange, RegionEdit};
    use crate::manifest::tests::utils::basic_region_metadata;
    use crate::test_util::TestEnv;

    /// Returns a copy of `metadata` with an extra `val2` float64 field column (id 252).
    fn metadata_with_val2(metadata: &RegionMetadataRef) -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::from_existing((**metadata).clone());
        builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
            semantic_type: SemanticType::Field,
            column_id: 252,
        });
        Arc::new(builder.build().unwrap())
    }

    /// Builds an action list holding a single edit that changes nothing; used to
    /// advance the manifest version.
    fn noop_edit() -> RegionMetaActionList {
        RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
            files_to_add: vec![],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        })])
    }

    #[tokio::test]
    async fn create_manifest_manager() {
        let metadata = Arc::new(basic_region_metadata());
        let env = TestEnv::new().await;
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();

        manager.validate_manifest(&metadata, 0);
    }

    #[tokio::test]
    async fn open_manifest_manager() {
        let env = TestEnv::new().await;
        // Opening an empty manifest yields no manager.
        assert!(
            env.create_manifest_manager(CompressionType::Uncompressed, 10, None)
                .await
                .unwrap()
                .is_none()
        );

        // Creates a manifest.
        let metadata = Arc::new(basic_region_metadata());
        let mut manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();
        // Stops it.
        manager.stop().await;

        // Open it.
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
            .await
            .unwrap()
            .unwrap();

        manager.validate_manifest(&metadata, 0);
    }

    #[tokio::test]
    async fn manifest_with_partition_expr_roundtrip() {
        let env = TestEnv::new().await;
        let expr_json =
            r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
        let mut metadata = basic_region_metadata();
        metadata.partition_expr = Some(expr_json.to_string());
        let metadata = Arc::new(metadata);
        let mut manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();

        // The persisted manifest keeps the same partition_expr JSON.
        let manifest = manager.manifest();
        assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));

        manager.stop().await;

        // Reopen and check again.
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
            .await
            .unwrap()
            .unwrap();
        let manifest = manager.manifest();
        assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
    }

    #[tokio::test]
    async fn region_change_add_column() {
        let metadata = Arc::new(basic_region_metadata());
        let env = TestEnv::new().await;
        let mut manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();

        let new_metadata = metadata_with_val2(&metadata);
        let action_list =
            RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
                metadata: new_metadata.clone(),
            }));

        let current_version = manager
            .update(
                action_list,
                RegionRoleState::Leader(RegionLeaderState::Writable),
            )
            .await
            .unwrap();
        assert_eq!(current_version, 1);
        manager.validate_manifest(&new_metadata, 1);

        // Reopen the manager.
        manager.stop().await;
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
            .await
            .unwrap()
            .unwrap();
        manager.validate_manifest(&new_metadata, 1);
    }

    /// Sums the sizes of the regular files directly under `path` whose names contain
    /// `.checkpoint` or `.json`.
    ///
    /// Just for test, refer to wal_dir_usage in src/store-api/src/logstore.rs.
    async fn manifest_dir_usage(path: &str) -> u64 {
        let mut size = 0;
        let mut read_dir = tokio::fs::read_dir(path).await.unwrap();
        // Fail loudly on listing errors instead of silently stopping, which would
        // under-count the directory and make the size assertions meaningless.
        while let Some(entry) = read_dir.next_entry().await.unwrap() {
            if !entry.file_type().await.unwrap().is_file() {
                continue;
            }
            let file_name = entry.file_name().into_string().unwrap();
            if file_name.contains(".checkpoint") || file_name.contains(".json") {
                // Keep the size as u64; no lossy round-trip through usize.
                let file_size = entry.metadata().await.unwrap().len();
                debug!("File: {file_name:?}, size: {file_size}");
                size += file_size;
            }
        }
        size
    }

    #[tokio::test]
    async fn test_manifest_size() {
        let metadata = Arc::new(basic_region_metadata());
        let data_home = create_temp_dir("");
        let data_home_path = data_home.path().to_str().unwrap().to_string();
        let env = TestEnv::with_data_home(data_home).await;

        let manifest_dir = format!("{}/manifest", data_home_path);

        let mut manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();

        let new_metadata = metadata_with_val2(&metadata);
        let action_list =
            RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
                metadata: new_metadata.clone(),
            }));

        let current_version = manager
            .update(
                action_list,
                RegionRoleState::Leader(RegionLeaderState::Writable),
            )
            .await
            .unwrap();
        assert_eq!(current_version, 1);
        manager.validate_manifest(&new_metadata, 1);

        // get manifest size
        let manifest_size = manager.manifest_usage();
        assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);

        // update 10 times nop_action to trigger checkpoint
        for _ in 0..10 {
            manager
                .update(
                    noop_edit(),
                    RegionRoleState::Leader(RegionLeaderState::Writable),
                )
                .await
                .unwrap();
        }

        // Wait for the in-flight checkpoint; fail instead of hanging forever.
        tokio::time::timeout(Duration::from_secs(30), async {
            while manager.checkpointer.is_doing_checkpoint() {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        })
        .await
        .expect("checkpoint did not finish within 30s");

        // check manifest size again
        let manifest_size = manager.manifest_usage();
        assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);

        // Reopen the manager,
        // we just calculate the size from the latest checkpoint file
        manager.stop().await;
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
            .await
            .unwrap()
            .unwrap();
        manager.validate_manifest(&new_metadata, 11);

        // get manifest size again
        let manifest_size = manager.manifest_usage();
        assert_eq!(manifest_size, 1721);
    }
}