mito2/manifest/
manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::Arc;
17
18use common_datasource::compression::CompressionType;
19use common_telemetry::{debug, info};
20use futures::TryStreamExt;
21use object_store::ObjectStore;
22use snafu::{ensure, OptionExt, ResultExt};
23use store_api::metadata::RegionMetadataRef;
24use store_api::{ManifestVersion, MAX_VERSION, MIN_VERSION};
25
26use crate::error::{
27    self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
28};
29use crate::manifest::action::{
30    RegionChange, RegionCheckpoint, RegionEdit, RegionManifest, RegionManifestBuilder,
31    RegionMetaAction, RegionMetaActionList,
32};
33use crate::manifest::checkpointer::Checkpointer;
34use crate::manifest::storage::{
35    file_version, is_checkpoint_file, is_delta_file, ManifestObjectStore,
36};
37use crate::metrics::MANIFEST_OP_ELAPSED;
38use crate::region::{RegionLeaderState, RegionRoleState};
39
40/// Options for [RegionManifestManager].
41#[derive(Debug, Clone)]
42pub struct RegionManifestOptions {
43    /// Directory to store manifest.
44    pub manifest_dir: String,
45    pub object_store: ObjectStore,
46    pub compress_type: CompressionType,
47    /// Interval of version ([ManifestVersion](store_api::manifest::ManifestVersion)) between two checkpoints.
48    /// Set to 0 to disable checkpoint.
49    pub checkpoint_distance: u64,
50    pub remove_file_options: RemoveFileOptions,
51}
52
/// Options for updating the `removed_files` field in [RegionManifest].
#[derive(Debug, Clone)]
pub struct RemoveFileOptions {
    /// Number of removed files to keep in the manifest's `removed_files` field before they are
    /// also dropped from that field. Files are only dropped once both `keep_count` and
    /// `keep_ttl` are reached.
    pub keep_count: usize,
    /// Duration to keep removed files in the manifest's `removed_files` field before they are
    /// also dropped from that field. Files are only dropped once both `keep_count` and
    /// `keep_ttl` are reached.
    pub keep_ttl: std::time::Duration,
}

/// Defaults used by tests (and the `test` feature): keep 256 removed files for one hour.
#[cfg(any(test, feature = "test"))]
impl Default for RemoveFileOptions {
    fn default() -> Self {
        Self {
            keep_count: 256,
            keep_ttl: std::time::Duration::from_secs(3600),
        }
    }
}
73
74// rewrite note:
75// trait Checkpoint -> struct RegionCheckpoint
76// trait MetaAction -> struct RegionMetaActionList
77// trait MetaActionIterator -> struct MetaActionIteratorImpl
78
79#[cfg_attr(doc, aquamarine::aquamarine)]
80/// Manage region's manifest. Provide APIs to access (create/modify/recover) region's persisted
81/// metadata.
82///
83/// ```mermaid
84/// classDiagram
85/// class RegionManifestManager {
86///     -ManifestObjectStore store
87///     -RegionManifestOptions options
88///     -RegionManifest manifest
89///     +new() RegionManifestManager
90///     +open() Option~RegionManifestManager~
91///     +stop()
92///     +update(RegionMetaActionList action_list) ManifestVersion
93///     +manifest() RegionManifest
94/// }
95/// class ManifestObjectStore {
96///     -ObjectStore object_store
97/// }
98/// class RegionChange {
99///     -RegionMetadataRef metadata
100/// }
101/// class RegionEdit {
102///     -VersionNumber regoin_version
103///     -Vec~FileMeta~ files_to_add
104///     -Vec~FileMeta~ files_to_remove
105///     -SequenceNumber flushed_sequence
106/// }
107/// class RegionRemove {
108///     -RegionId region_id
109/// }
110/// RegionManifestManager o-- ManifestObjectStore
111/// RegionManifestManager o-- RegionManifest
112/// RegionManifestManager o-- RegionManifestOptions
113/// RegionManifestManager -- RegionMetaActionList
114/// RegionManifestManager -- RegionCheckpoint
115/// ManifestObjectStore o-- ObjectStore
116/// RegionMetaActionList o-- RegionMetaAction
117/// RegionMetaAction o-- ProtocolAction
118/// RegionMetaAction o-- RegionChange
119/// RegionMetaAction o-- RegionEdit
120/// RegionMetaAction o-- RegionRemove
121/// RegionChange o-- RegionMetadata
122/// RegionEdit o-- FileMeta
123///
124/// class RegionManifest {
125///     -RegionMetadataRef metadata
126///     -HashMap<FileId, FileMeta> files
127///     -ManifestVersion manifest_version
128/// }
129/// class RegionMetadata
130/// class FileMeta
131/// RegionManifest o-- RegionMetadata
132/// RegionManifest o-- FileMeta
133///
134/// class RegionCheckpoint {
135///     -ManifestVersion last_version
136///     -Option~RegionManifest~ checkpoint
137/// }
138/// RegionCheckpoint o-- RegionManifest
139/// ```
140#[derive(Debug)]
141pub struct RegionManifestManager {
142    store: ManifestObjectStore,
143    last_version: Arc<AtomicU64>,
144    checkpointer: Checkpointer,
145    manifest: Arc<RegionManifest>,
146    stopped: bool,
147}
148
149impl RegionManifestManager {
150    /// Constructs a region's manifest and persist it.
151    pub async fn new(
152        metadata: RegionMetadataRef,
153        flushed_entry_id: u64,
154        options: RegionManifestOptions,
155        total_manifest_size: Arc<AtomicU64>,
156        manifest_version: Arc<AtomicU64>,
157    ) -> Result<Self> {
158        // construct storage
159        let mut store = ManifestObjectStore::new(
160            &options.manifest_dir,
161            options.object_store.clone(),
162            options.compress_type,
163            total_manifest_size,
164        );
165
166        info!(
167            "Creating region manifest in {} with metadata {:?}, flushed_entry_id: {}",
168            options.manifest_dir, metadata, flushed_entry_id
169        );
170
171        let version = MIN_VERSION;
172        let mut manifest_builder = RegionManifestBuilder::default();
173        // set the initial metadata.
174        manifest_builder.apply_change(
175            version,
176            RegionChange {
177                metadata: metadata.clone(),
178            },
179        );
180        let manifest = manifest_builder.try_build()?;
181        let region_id = metadata.region_id;
182
183        debug!(
184            "Build region manifest in {}, manifest: {:?}",
185            options.manifest_dir, manifest
186        );
187
188        let mut actions = vec![RegionMetaAction::Change(RegionChange { metadata })];
189        if flushed_entry_id > 0 {
190            actions.push(RegionMetaAction::Edit(RegionEdit {
191                files_to_add: vec![],
192                files_to_remove: vec![],
193                timestamp_ms: None,
194                compaction_time_window: None,
195                flushed_entry_id: Some(flushed_entry_id),
196                flushed_sequence: None,
197            }));
198        }
199
200        // Persist region change.
201        let action_list = RegionMetaActionList::new(actions);
202
203        // New region is not in staging mode.
204        // TODO(ruihang): add staging mode support if needed.
205        store.save(version, &action_list.encode()?, false).await?;
206
207        let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION);
208        manifest_version.store(version, Ordering::Relaxed);
209        Ok(Self {
210            store,
211            last_version: manifest_version,
212            checkpointer,
213            manifest: Arc::new(manifest),
214            stopped: false,
215        })
216    }
217
218    /// Opens an existing manifest.
219    ///
220    /// Returns `Ok(None)` if no such manifest.
221    pub async fn open(
222        options: RegionManifestOptions,
223        total_manifest_size: Arc<AtomicU64>,
224        manifest_version: Arc<AtomicU64>,
225    ) -> Result<Option<Self>> {
226        let _t = MANIFEST_OP_ELAPSED
227            .with_label_values(&["open"])
228            .start_timer();
229
230        // construct storage
231        let mut store = ManifestObjectStore::new(
232            &options.manifest_dir,
233            options.object_store.clone(),
234            options.compress_type,
235            total_manifest_size,
236        );
237
238        // recover from storage
239        // construct manifest builder
240        // calculate the manifest size from the latest checkpoint
241        let mut version = MIN_VERSION;
242        let checkpoint = Self::last_checkpoint(&mut store).await?;
243        let last_checkpoint_version = checkpoint
244            .as_ref()
245            .map(|(checkpoint, _)| checkpoint.last_version)
246            .unwrap_or(MIN_VERSION);
247        let mut manifest_builder = if let Some((checkpoint, _)) = checkpoint {
248            info!(
249                "Recover region manifest {} from checkpoint version {}",
250                options.manifest_dir, checkpoint.last_version
251            );
252            version = version.max(checkpoint.last_version + 1);
253            RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint)
254        } else {
255            info!(
256                "Checkpoint not found in {}, build manifest from scratch",
257                options.manifest_dir
258            );
259            RegionManifestBuilder::default()
260        };
261
262        // apply actions from storage
263        let manifests = store.fetch_manifests(version, MAX_VERSION).await?;
264
265        for (manifest_version, raw_action_list) in manifests {
266            let action_list = RegionMetaActionList::decode(&raw_action_list)?;
267            // set manifest size after last checkpoint
268            store.set_delta_file_size(manifest_version, raw_action_list.len() as u64);
269            for action in action_list.actions {
270                match action {
271                    RegionMetaAction::Change(action) => {
272                        manifest_builder.apply_change(manifest_version, action);
273                    }
274                    RegionMetaAction::Edit(action) => {
275                        manifest_builder.apply_edit(manifest_version, action);
276                    }
277                    RegionMetaAction::Remove(_) => {
278                        debug!(
279                            "Unhandled action in {}, action: {:?}",
280                            options.manifest_dir, action
281                        );
282                    }
283                    RegionMetaAction::Truncate(action) => {
284                        manifest_builder.apply_truncate(manifest_version, action);
285                    }
286                }
287            }
288        }
289
290        // set the initial metadata if necessary
291        if !manifest_builder.contains_metadata() {
292            debug!("No region manifest in {}", options.manifest_dir);
293            return Ok(None);
294        }
295
296        let manifest = manifest_builder.try_build()?;
297        debug!(
298            "Recovered region manifest from {}, manifest: {:?}",
299            options.manifest_dir, manifest
300        );
301        let version = manifest.manifest_version;
302
303        let checkpointer = Checkpointer::new(
304            manifest.metadata.region_id,
305            options,
306            store.clone(),
307            last_checkpoint_version,
308        );
309        manifest_version.store(version, Ordering::Relaxed);
310        Ok(Some(Self {
311            store,
312            last_version: manifest_version,
313            checkpointer,
314            manifest: Arc::new(manifest),
315            stopped: false,
316        }))
317    }
318
319    /// Stops the manager.
320    pub async fn stop(&mut self) {
321        self.stopped = true;
322    }
323
324    /// Installs the manifest changes from the current version to the target version (inclusive).
325    ///
326    /// Returns installed version.
327    /// **Note**: This function is not guaranteed to install the target version strictly.
328    /// The installed version may be greater than the target version.
329    pub async fn install_manifest_to(
330        &mut self,
331        target_version: ManifestVersion,
332    ) -> Result<ManifestVersion> {
333        let _t = MANIFEST_OP_ELAPSED
334            .with_label_values(&["install_manifest_to"])
335            .start_timer();
336
337        let last_version = self.last_version();
338        // Case 1: If the target version is less than the current version, return the current version.
339        if last_version >= target_version {
340            debug!(
341                "Target version {} is less than or equal to the current version {}, region: {}, skip install",
342                target_version, last_version, self.manifest.metadata.region_id
343            );
344            return Ok(last_version);
345        }
346
347        ensure!(
348            !self.stopped,
349            RegionStoppedSnafu {
350                region_id: self.manifest.metadata.region_id,
351            }
352        );
353
354        let region_id = self.manifest.metadata.region_id;
355        // Fetches manifests from the last version strictly.
356        let mut manifests = self
357            .store
358            // Invariant: last_version < target_version.
359            .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
360            .await?;
361
362        // Case 2: No manifests in range: [current_version+1, target_version+1)
363        //
364        // |---------Has been deleted------------|     [Checkpoint Version]...[Latest Version]
365        //                                                                    [Leader region]
366        // [Current Version]......[Target Version]
367        // [Follower region]
368        if manifests.is_empty() {
369            info!(
370                "Manifests are not strict from {}, region: {}, tries to install the last checkpoint",
371                last_version, self.manifest.metadata.region_id
372            );
373            let last_version = self.install_last_checkpoint().await?;
374            // Case 2.1: If the installed checkpoint version is greater than or equal to the target version, return the last version.
375            if last_version >= target_version {
376                return Ok(last_version);
377            }
378
379            // Fetches manifests from the installed version strictly.
380            manifests = self
381                .store
382                // Invariant: last_version < target_version.
383                .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
384                .await?;
385        }
386
387        if manifests.is_empty() {
388            return NoManifestsSnafu {
389                region_id: self.manifest.metadata.region_id,
390                start_version: last_version + 1,
391                end_version: target_version + 1,
392                last_version,
393            }
394            .fail();
395        }
396
397        debug_assert_eq!(manifests.first().unwrap().0, last_version + 1);
398        let mut manifest_builder =
399            RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
400
401        for (manifest_version, raw_action_list) in manifests {
402            self.store
403                .set_delta_file_size(manifest_version, raw_action_list.len() as u64);
404            let action_list = RegionMetaActionList::decode(&raw_action_list)?;
405            for action in action_list.actions {
406                match action {
407                    RegionMetaAction::Change(action) => {
408                        manifest_builder.apply_change(manifest_version, action);
409                    }
410                    RegionMetaAction::Edit(action) => {
411                        manifest_builder.apply_edit(manifest_version, action);
412                    }
413                    RegionMetaAction::Remove(_) => {
414                        debug!(
415                            "Unhandled action for region {}, action: {:?}",
416                            self.manifest.metadata.region_id, action
417                        );
418                    }
419                    RegionMetaAction::Truncate(action) => {
420                        manifest_builder.apply_truncate(manifest_version, action);
421                    }
422                }
423            }
424        }
425
426        let new_manifest = manifest_builder.try_build()?;
427        ensure!(
428            new_manifest.manifest_version >= target_version,
429            InstallManifestToSnafu {
430                region_id: self.manifest.metadata.region_id,
431                target_version,
432                available_version: new_manifest.manifest_version,
433                last_version,
434            }
435        );
436
437        let version = self.last_version();
438        self.manifest = Arc::new(new_manifest);
439        let last_version = self.set_version(self.manifest.manifest_version);
440        info!(
441            "Install manifest changes from {} to {}, region: {}",
442            version, last_version, self.manifest.metadata.region_id
443        );
444
445        Ok(last_version)
446    }
447
448    /// Installs the last checkpoint.
449    pub(crate) async fn install_last_checkpoint(&mut self) -> Result<ManifestVersion> {
450        let last_version = self.last_version();
451        let Some((checkpoint, checkpoint_size)) = Self::last_checkpoint(&mut self.store).await?
452        else {
453            return NoCheckpointSnafu {
454                region_id: self.manifest.metadata.region_id,
455                last_version,
456            }
457            .fail();
458        };
459        self.store.reset_manifest_size();
460        self.store
461            .set_checkpoint_file_size(checkpoint.last_version, checkpoint_size);
462        let builder = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint);
463        let manifest = builder.try_build()?;
464        let last_version = self.set_version(manifest.manifest_version);
465        self.manifest = Arc::new(manifest);
466        info!(
467            "Installed region manifest from checkpoint: {}, region: {}",
468            checkpoint.last_version, self.manifest.metadata.region_id
469        );
470
471        Ok(last_version)
472    }
473
474    /// Updates the manifest. Returns the current manifest version number.
475    pub async fn update(
476        &mut self,
477        action_list: RegionMetaActionList,
478        region_state: RegionRoleState,
479    ) -> Result<ManifestVersion> {
480        let _t = MANIFEST_OP_ELAPSED
481            .with_label_values(&["update"])
482            .start_timer();
483
484        ensure!(
485            !self.stopped,
486            RegionStoppedSnafu {
487                region_id: self.manifest.metadata.region_id,
488            }
489        );
490
491        let version = self.increase_version();
492        let is_staging = region_state == RegionRoleState::Leader(RegionLeaderState::Staging);
493        self.store
494            .save(version, &action_list.encode()?, is_staging)
495            .await?;
496
497        let mut manifest_builder =
498            RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
499        for action in action_list.actions {
500            match action {
501                RegionMetaAction::Change(action) => {
502                    manifest_builder.apply_change(version, action);
503                }
504                RegionMetaAction::Edit(action) => {
505                    manifest_builder.apply_edit(version, action);
506                }
507                RegionMetaAction::Remove(_) => {
508                    debug!(
509                        "Unhandled action for region {}, action: {:?}",
510                        self.manifest.metadata.region_id, action
511                    );
512                }
513                RegionMetaAction::Truncate(action) => {
514                    manifest_builder.apply_truncate(version, action);
515                }
516            }
517        }
518        let new_manifest = manifest_builder.try_build()?;
519        let updated_manifest = self
520            .checkpointer
521            .update_manifest_removed_files(new_manifest)?;
522        self.manifest = Arc::new(updated_manifest);
523
524        self.checkpointer
525            .maybe_do_checkpoint(self.manifest.as_ref(), region_state);
526
527        Ok(version)
528    }
529
530    /// Retrieves the current [RegionManifest].
531    pub fn manifest(&self) -> Arc<RegionManifest> {
532        self.manifest.clone()
533    }
534
535    /// Returns total manifest size.
536    pub fn manifest_usage(&self) -> u64 {
537        self.store.total_manifest_size()
538    }
539
540    /// Returns true if a newer version manifest file is found.
541    ///
542    /// It is typically used in read-only regions to catch up with manifest.
543    /// It doesn't lock the manifest directory in the object store so the result
544    /// may be inaccurate if there are concurrent writes.
545    pub async fn has_update(&self) -> Result<bool> {
546        let last_version = self.last_version();
547
548        let streamer =
549            self.store
550                .manifest_lister()
551                .await?
552                .context(error::EmptyManifestDirSnafu {
553                    manifest_dir: self.store.manifest_dir(),
554                })?;
555
556        let need_update = streamer
557            .try_any(|entry| async move {
558                let file_name = entry.name();
559                if is_delta_file(file_name) || is_checkpoint_file(file_name) {
560                    let version = file_version(file_name);
561                    if version > last_version {
562                        return true;
563                    }
564                }
565                false
566            })
567            .await
568            .context(error::OpenDalSnafu)?;
569
570        Ok(need_update)
571    }
572
573    /// Increases last version and returns the increased version.
574    fn increase_version(&mut self) -> ManifestVersion {
575        let previous = self.last_version.fetch_add(1, Ordering::Relaxed);
576        previous + 1
577    }
578
579    /// Sets the last version.
580    fn set_version(&mut self, version: ManifestVersion) -> ManifestVersion {
581        self.last_version.store(version, Ordering::Relaxed);
582        version
583    }
584
585    fn last_version(&self) -> ManifestVersion {
586        self.last_version.load(Ordering::Relaxed)
587    }
588
589    /// Fetches the last [RegionCheckpoint] from storage.
590    ///
591    /// If the checkpoint is not found, returns `None`.
592    /// Otherwise, returns the checkpoint and the size of the checkpoint.
593    pub(crate) async fn last_checkpoint(
594        store: &mut ManifestObjectStore,
595    ) -> Result<Option<(RegionCheckpoint, u64)>> {
596        let last_checkpoint = store.load_last_checkpoint().await?;
597
598        if let Some((_, bytes)) = last_checkpoint {
599            let checkpoint = RegionCheckpoint::decode(&bytes)?;
600            Ok(Some((checkpoint, bytes.len() as u64)))
601        } else {
602            Ok(None)
603        }
604    }
605
606    pub fn store(&self) -> ManifestObjectStore {
607        self.store.clone()
608    }
609
610    #[cfg(test)]
611    pub(crate) fn checkpointer(&self) -> &Checkpointer {
612        &self.checkpointer
613    }
614}
615
#[cfg(test)]
impl RegionManifestManager {
    /// Test helper: asserts that the manager holds `expect` as its region metadata, and that
    /// both the in-memory manifest version and the tracked last version equal `last_version`.
    fn validate_manifest(&self, expect: &RegionMetadataRef, last_version: ManifestVersion) {
        let current = self.last_version();
        assert_eq!(self.manifest().metadata, *expect);
        assert_eq!(self.manifest.manifest_version, current);
        assert_eq!(last_version, current);
    }
}
625
#[cfg(test)]
mod test {
    use std::time::Duration;

    use api::v1::SemanticType;
    use common_datasource::compression::CompressionType;
    use common_test_util::temp_dir::create_temp_dir;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};

    use super::*;
    use crate::manifest::action::{RegionChange, RegionEdit};
    use crate::manifest::tests::utils::basic_region_metadata;
    use crate::test_util::TestEnv;

    #[tokio::test]
    async fn create_manifest_manager() {
        let metadata = Arc::new(basic_region_metadata());
        let env = TestEnv::new().await;
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();

        manager.validate_manifest(&metadata, 0);
    }

    #[tokio::test]
    async fn open_manifest_manager() {
        let env = TestEnv::new().await;
        // Try to open an empty manifest.
        assert!(env
            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
            .await
            .unwrap()
            .is_none());

        // Creates a manifest.
        let metadata = Arc::new(basic_region_metadata());
        let mut manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();
        // Stops it.
        manager.stop().await;

        // Open it.
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
            .await
            .unwrap()
            .unwrap();

        manager.validate_manifest(&metadata, 0);
    }

    #[tokio::test]
    async fn manifest_with_partition_expr_roundtrip() {
        let env = TestEnv::new().await;
        let expr_json =
            r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
        let mut metadata = basic_region_metadata();
        metadata.partition_expr = Some(expr_json.to_string());
        let metadata = Arc::new(metadata);
        let mut manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();

        // persisted manifest should contain the same partition_expr JSON
        let manifest = manager.manifest();
        assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));

        manager.stop().await;

        // Reopen and check again
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
            .await
            .unwrap()
            .unwrap();
        let manifest = manager.manifest();
        assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
    }

    #[tokio::test]
    async fn region_change_add_column() {
        let metadata = Arc::new(basic_region_metadata());
        let env = TestEnv::new().await;
        let mut manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();

        let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
        new_metadata_builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
            semantic_type: SemanticType::Field,
            column_id: 252,
        });
        let new_metadata = Arc::new(new_metadata_builder.build().unwrap());

        let action_list =
            RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
                metadata: new_metadata.clone(),
            }));

        let current_version = manager
            .update(
                action_list,
                RegionRoleState::Leader(RegionLeaderState::Writable),
            )
            .await
            .unwrap();
        assert_eq!(current_version, 1);
        manager.validate_manifest(&new_metadata, 1);

        // Reopen the manager.
        manager.stop().await;
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
            .await
            .unwrap()
            .unwrap();
        manager.validate_manifest(&new_metadata, 1);
    }

    /// Sums the sizes of the manifest files (`.checkpoint` / `.json`) directly under `path`.
    ///
    /// Just for test, refer to wal_dir_usage in src/store-api/src/logstore.rs.
    /// Any I/O error panics, so a failed directory read cannot silently report a smaller
    /// size than the manager does.
    async fn manifest_dir_usage(path: &str) -> u64 {
        let mut size = 0;
        let mut read_dir = tokio::fs::read_dir(path).await.unwrap();
        while let Some(entry) = read_dir.next_entry().await.unwrap() {
            if !entry.file_type().await.unwrap().is_file() {
                continue;
            }
            let file_name = entry.file_name().into_string().unwrap();
            if file_name.contains(".checkpoint") || file_name.contains(".json") {
                let file_size = entry.metadata().await.unwrap().len();
                debug!("File: {file_name:?}, size: {file_size}");
                size += file_size;
            }
        }
        size
    }

    #[tokio::test]
    async fn test_manifest_size() {
        let metadata = Arc::new(basic_region_metadata());
        let data_home = create_temp_dir("");
        let data_home_path = data_home.path().to_str().unwrap().to_string();
        let env = TestEnv::with_data_home(data_home).await;

        let manifest_dir = format!("{}/manifest", data_home_path);

        let mut manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();

        let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
        new_metadata_builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
            semantic_type: SemanticType::Field,
            column_id: 252,
        });
        let new_metadata = Arc::new(new_metadata_builder.build().unwrap());

        let action_list =
            RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
                metadata: new_metadata.clone(),
            }));

        let current_version = manager
            .update(
                action_list,
                RegionRoleState::Leader(RegionLeaderState::Writable),
            )
            .await
            .unwrap();
        assert_eq!(current_version, 1);
        manager.validate_manifest(&new_metadata, 1);

        // get manifest size
        let manifest_size = manager.manifest_usage();
        assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);

        // Apply 10 no-op edits to trigger a checkpoint.
        for _ in 0..10 {
            manager
                .update(
                    RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
                        files_to_add: vec![],
                        files_to_remove: vec![],
                        timestamp_ms: None,
                        compaction_time_window: None,
                        flushed_entry_id: None,
                        flushed_sequence: None,
                    })]),
                    RegionRoleState::Leader(RegionLeaderState::Writable),
                )
                .await
                .unwrap();
        }

        // Wait for the background checkpoint, but fail instead of hanging forever.
        tokio::time::timeout(Duration::from_secs(60), async {
            while manager.checkpointer.is_doing_checkpoint() {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        })
        .await
        .expect("checkpoint did not finish within 60s");

        // check manifest size again
        let manifest_size = manager.manifest_usage();
        assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);

        // Reopen the manager,
        // we just calculate the size from the latest checkpoint file
        manager.stop().await;
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
            .await
            .unwrap()
            .unwrap();
        manager.validate_manifest(&new_metadata, 11);

        // get manifest size again
        let manifest_size = manager.manifest_usage();
        assert_eq!(manifest_size, 1669);
    }
}