mito2/manifest/
manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::Arc;
17
18use common_datasource::compression::CompressionType;
19use common_telemetry::{debug, info};
20use futures::TryStreamExt;
21use object_store::ObjectStore;
22use snafu::{ensure, OptionExt, ResultExt};
23use store_api::manifest::{ManifestVersion, MAX_VERSION, MIN_VERSION};
24use store_api::metadata::RegionMetadataRef;
25
26use crate::error::{
27    self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
28};
29use crate::manifest::action::{
30    RegionChange, RegionCheckpoint, RegionManifest, RegionManifestBuilder, RegionMetaAction,
31    RegionMetaActionList,
32};
33use crate::manifest::checkpointer::Checkpointer;
34use crate::manifest::storage::{
35    file_version, is_checkpoint_file, is_delta_file, ManifestObjectStore,
36};
37use crate::metrics::MANIFEST_OP_ELAPSED;
38
39/// Options for [RegionManifestManager].
40#[derive(Debug, Clone)]
41pub struct RegionManifestOptions {
42    /// Directory to store manifest.
43    pub manifest_dir: String,
44    pub object_store: ObjectStore,
45    pub compress_type: CompressionType,
46    /// Interval of version ([ManifestVersion](store_api::manifest::ManifestVersion)) between two checkpoints.
47    /// Set to 0 to disable checkpoint.
48    pub checkpoint_distance: u64,
49}
50
51// rewrite note:
52// trait Checkpoint -> struct RegionCheckpoint
53// trait MetaAction -> struct RegionMetaActionList
54// trait MetaActionIterator -> struct MetaActionIteratorImpl
55
56#[cfg_attr(doc, aquamarine::aquamarine)]
57/// Manage region's manifest. Provide APIs to access (create/modify/recover) region's persisted
58/// metadata.
59///
60/// ```mermaid
61/// classDiagram
62/// class RegionManifestManager {
63///     -ManifestObjectStore store
64///     -RegionManifestOptions options
65///     -RegionManifest manifest
66///     +new() RegionManifestManager
67///     +open() Option~RegionManifestManager~
68///     +stop()
69///     +update(RegionMetaActionList action_list) ManifestVersion
70///     +manifest() RegionManifest
71/// }
72/// class ManifestObjectStore {
73///     -ObjectStore object_store
74/// }
75/// class RegionChange {
76///     -RegionMetadataRef metadata
77/// }
78/// class RegionEdit {
79///     -VersionNumber regoin_version
80///     -Vec~FileMeta~ files_to_add
81///     -Vec~FileMeta~ files_to_remove
82///     -SequenceNumber flushed_sequence
83/// }
84/// class RegionRemove {
85///     -RegionId region_id
86/// }
87/// RegionManifestManager o-- ManifestObjectStore
88/// RegionManifestManager o-- RegionManifest
89/// RegionManifestManager o-- RegionManifestOptions
90/// RegionManifestManager -- RegionMetaActionList
91/// RegionManifestManager -- RegionCheckpoint
92/// ManifestObjectStore o-- ObjectStore
93/// RegionMetaActionList o-- RegionMetaAction
94/// RegionMetaAction o-- ProtocolAction
95/// RegionMetaAction o-- RegionChange
96/// RegionMetaAction o-- RegionEdit
97/// RegionMetaAction o-- RegionRemove
98/// RegionChange o-- RegionMetadata
99/// RegionEdit o-- FileMeta
100///
101/// class RegionManifest {
102///     -RegionMetadataRef metadata
103///     -HashMap<FileId, FileMeta> files
104///     -ManifestVersion manifest_version
105/// }
106/// class RegionMetadata
107/// class FileMeta
108/// RegionManifest o-- RegionMetadata
109/// RegionManifest o-- FileMeta
110///
111/// class RegionCheckpoint {
112///     -ManifestVersion last_version
113///     -Option~RegionManifest~ checkpoint
114/// }
115/// RegionCheckpoint o-- RegionManifest
116/// ```
117#[derive(Debug)]
118pub struct RegionManifestManager {
119    store: ManifestObjectStore,
120    last_version: Arc<AtomicU64>,
121    checkpointer: Checkpointer,
122    manifest: Arc<RegionManifest>,
123    stopped: bool,
124}
125
126impl RegionManifestManager {
127    /// Constructs a region's manifest and persist it.
128    pub async fn new(
129        metadata: RegionMetadataRef,
130        options: RegionManifestOptions,
131        total_manifest_size: Arc<AtomicU64>,
132        manifest_version: Arc<AtomicU64>,
133    ) -> Result<Self> {
134        // construct storage
135        let mut store = ManifestObjectStore::new(
136            &options.manifest_dir,
137            options.object_store.clone(),
138            options.compress_type,
139            total_manifest_size,
140        );
141
142        info!(
143            "Creating region manifest in {} with metadata {:?}",
144            options.manifest_dir, metadata
145        );
146
147        let version = MIN_VERSION;
148        let mut manifest_builder = RegionManifestBuilder::default();
149        // set the initial metadata.
150        manifest_builder.apply_change(
151            version,
152            RegionChange {
153                metadata: metadata.clone(),
154            },
155        );
156        let manifest = manifest_builder.try_build()?;
157        let region_id = metadata.region_id;
158
159        debug!(
160            "Build region manifest in {}, manifest: {:?}",
161            options.manifest_dir, manifest
162        );
163
164        // Persist region change.
165        let action_list =
166            RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { metadata }));
167        store.save(version, &action_list.encode()?).await?;
168
169        let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION);
170        manifest_version.store(version, Ordering::Relaxed);
171        Ok(Self {
172            store,
173            last_version: manifest_version,
174            checkpointer,
175            manifest: Arc::new(manifest),
176            stopped: false,
177        })
178    }
179
180    /// Opens an existing manifest.
181    ///
182    /// Returns `Ok(None)` if no such manifest.
183    pub async fn open(
184        options: RegionManifestOptions,
185        total_manifest_size: Arc<AtomicU64>,
186        manifest_version: Arc<AtomicU64>,
187    ) -> Result<Option<Self>> {
188        let _t = MANIFEST_OP_ELAPSED
189            .with_label_values(&["open"])
190            .start_timer();
191
192        // construct storage
193        let mut store = ManifestObjectStore::new(
194            &options.manifest_dir,
195            options.object_store.clone(),
196            options.compress_type,
197            total_manifest_size,
198        );
199
200        // recover from storage
201        // construct manifest builder
202        // calculate the manifest size from the latest checkpoint
203        let mut version = MIN_VERSION;
204        let checkpoint = Self::last_checkpoint(&mut store).await?;
205        let last_checkpoint_version = checkpoint
206            .as_ref()
207            .map(|(checkpoint, _)| checkpoint.last_version)
208            .unwrap_or(MIN_VERSION);
209        let mut manifest_builder = if let Some((checkpoint, _)) = checkpoint {
210            info!(
211                "Recover region manifest {} from checkpoint version {}",
212                options.manifest_dir, checkpoint.last_version
213            );
214            version = version.max(checkpoint.last_version + 1);
215            RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint)
216        } else {
217            info!(
218                "Checkpoint not found in {}, build manifest from scratch",
219                options.manifest_dir
220            );
221            RegionManifestBuilder::default()
222        };
223
224        // apply actions from storage
225        let manifests = store.fetch_manifests(version, MAX_VERSION).await?;
226
227        for (manifest_version, raw_action_list) in manifests {
228            let action_list = RegionMetaActionList::decode(&raw_action_list)?;
229            // set manifest size after last checkpoint
230            store.set_delta_file_size(manifest_version, raw_action_list.len() as u64);
231            for action in action_list.actions {
232                match action {
233                    RegionMetaAction::Change(action) => {
234                        manifest_builder.apply_change(manifest_version, action);
235                    }
236                    RegionMetaAction::Edit(action) => {
237                        manifest_builder.apply_edit(manifest_version, action);
238                    }
239                    RegionMetaAction::Remove(_) => {
240                        debug!(
241                            "Unhandled action in {}, action: {:?}",
242                            options.manifest_dir, action
243                        );
244                    }
245                    RegionMetaAction::Truncate(action) => {
246                        manifest_builder.apply_truncate(manifest_version, action);
247                    }
248                }
249            }
250        }
251
252        // set the initial metadata if necessary
253        if !manifest_builder.contains_metadata() {
254            debug!("No region manifest in {}", options.manifest_dir);
255            return Ok(None);
256        }
257
258        let manifest = manifest_builder.try_build()?;
259        debug!(
260            "Recovered region manifest from {}, manifest: {:?}",
261            options.manifest_dir, manifest
262        );
263        let version = manifest.manifest_version;
264
265        let checkpointer = Checkpointer::new(
266            manifest.metadata.region_id,
267            options,
268            store.clone(),
269            last_checkpoint_version,
270        );
271        manifest_version.store(version, Ordering::Relaxed);
272        Ok(Some(Self {
273            store,
274            last_version: manifest_version,
275            checkpointer,
276            manifest: Arc::new(manifest),
277            stopped: false,
278        }))
279    }
280
281    /// Stops the manager.
282    pub async fn stop(&mut self) {
283        self.stopped = true;
284    }
285
286    /// Installs the manifest changes from the current version to the target version (inclusive).
287    ///
288    /// Returns installed version.
289    /// **Note**: This function is not guaranteed to install the target version strictly.
290    /// The installed version may be greater than the target version.
291    pub async fn install_manifest_to(
292        &mut self,
293        target_version: ManifestVersion,
294    ) -> Result<ManifestVersion> {
295        let _t = MANIFEST_OP_ELAPSED
296            .with_label_values(&["install_manifest_to"])
297            .start_timer();
298
299        let last_version = self.last_version();
300        // Case 1: If the target version is less than the current version, return the current version.
301        if last_version >= target_version {
302            debug!(
303                "Target version {} is less than or equal to the current version {}, region: {}, skip install",
304                target_version, last_version, self.manifest.metadata.region_id
305            );
306            return Ok(last_version);
307        }
308
309        ensure!(
310            !self.stopped,
311            RegionStoppedSnafu {
312                region_id: self.manifest.metadata.region_id,
313            }
314        );
315
316        let region_id = self.manifest.metadata.region_id;
317        // Fetches manifests from the last version strictly.
318        let mut manifests = self
319            .store
320            // Invariant: last_version < target_version.
321            .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
322            .await?;
323
324        // Case 2: No manifests in range: [current_version+1, target_version+1)
325        //
326        // |---------Has been deleted------------|     [Checkpoint Version]...[Latest Version]
327        //                                                                    [Leader region]
328        // [Current Version]......[Target Version]
329        // [Follower region]
330        if manifests.is_empty() {
331            info!(
332                "Manifests are not strict from {}, region: {}, tries to install the last checkpoint",
333                last_version, self.manifest.metadata.region_id
334            );
335            let last_version = self.install_last_checkpoint().await?;
336            // Case 2.1: If the installed checkpoint version is greater than or equal to the target version, return the last version.
337            if last_version >= target_version {
338                return Ok(last_version);
339            }
340
341            // Fetches manifests from the installed version strictly.
342            manifests = self
343                .store
344                // Invariant: last_version < target_version.
345                .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
346                .await?;
347        }
348
349        if manifests.is_empty() {
350            return NoManifestsSnafu {
351                region_id: self.manifest.metadata.region_id,
352                start_version: last_version + 1,
353                end_version: target_version + 1,
354                last_version,
355            }
356            .fail();
357        }
358
359        debug_assert_eq!(manifests.first().unwrap().0, last_version + 1);
360        let mut manifest_builder =
361            RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
362
363        for (manifest_version, raw_action_list) in manifests {
364            self.store
365                .set_delta_file_size(manifest_version, raw_action_list.len() as u64);
366            let action_list = RegionMetaActionList::decode(&raw_action_list)?;
367            for action in action_list.actions {
368                match action {
369                    RegionMetaAction::Change(action) => {
370                        manifest_builder.apply_change(manifest_version, action);
371                    }
372                    RegionMetaAction::Edit(action) => {
373                        manifest_builder.apply_edit(manifest_version, action);
374                    }
375                    RegionMetaAction::Remove(_) => {
376                        debug!(
377                            "Unhandled action for region {}, action: {:?}",
378                            self.manifest.metadata.region_id, action
379                        );
380                    }
381                    RegionMetaAction::Truncate(action) => {
382                        manifest_builder.apply_truncate(manifest_version, action);
383                    }
384                }
385            }
386        }
387
388        let new_manifest = manifest_builder.try_build()?;
389        ensure!(
390            new_manifest.manifest_version >= target_version,
391            InstallManifestToSnafu {
392                region_id: self.manifest.metadata.region_id,
393                target_version,
394                available_version: new_manifest.manifest_version,
395                last_version,
396            }
397        );
398
399        let version = self.last_version();
400        self.manifest = Arc::new(new_manifest);
401        let last_version = self.set_version(self.manifest.manifest_version);
402        info!(
403            "Install manifest changes from {} to {}, region: {}",
404            version, last_version, self.manifest.metadata.region_id
405        );
406
407        Ok(last_version)
408    }
409
410    /// Installs the last checkpoint.
411    pub(crate) async fn install_last_checkpoint(&mut self) -> Result<ManifestVersion> {
412        let last_version = self.last_version();
413        let Some((checkpoint, checkpoint_size)) = Self::last_checkpoint(&mut self.store).await?
414        else {
415            return NoCheckpointSnafu {
416                region_id: self.manifest.metadata.region_id,
417                last_version,
418            }
419            .fail();
420        };
421        self.store.reset_manifest_size();
422        self.store
423            .set_checkpoint_file_size(checkpoint.last_version, checkpoint_size);
424        let builder = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint);
425        let manifest = builder.try_build()?;
426        let last_version = self.set_version(manifest.manifest_version);
427        self.manifest = Arc::new(manifest);
428        info!(
429            "Installed region manifest from checkpoint: {}, region: {}",
430            checkpoint.last_version, self.manifest.metadata.region_id
431        );
432
433        Ok(last_version)
434    }
435
436    /// Updates the manifest. Returns the current manifest version number.
437    pub async fn update(&mut self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
438        let _t = MANIFEST_OP_ELAPSED
439            .with_label_values(&["update"])
440            .start_timer();
441
442        ensure!(
443            !self.stopped,
444            RegionStoppedSnafu {
445                region_id: self.manifest.metadata.region_id,
446            }
447        );
448
449        let version = self.increase_version();
450        self.store.save(version, &action_list.encode()?).await?;
451
452        let mut manifest_builder =
453            RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
454        for action in action_list.actions {
455            match action {
456                RegionMetaAction::Change(action) => {
457                    manifest_builder.apply_change(version, action);
458                }
459                RegionMetaAction::Edit(action) => {
460                    manifest_builder.apply_edit(version, action);
461                }
462                RegionMetaAction::Remove(_) => {
463                    debug!(
464                        "Unhandled action for region {}, action: {:?}",
465                        self.manifest.metadata.region_id, action
466                    );
467                }
468                RegionMetaAction::Truncate(action) => {
469                    manifest_builder.apply_truncate(version, action);
470                }
471            }
472        }
473        let new_manifest = manifest_builder.try_build()?;
474        self.manifest = Arc::new(new_manifest);
475
476        self.checkpointer
477            .maybe_do_checkpoint(self.manifest.as_ref());
478
479        Ok(version)
480    }
481
482    /// Retrieves the current [RegionManifest].
483    pub fn manifest(&self) -> Arc<RegionManifest> {
484        self.manifest.clone()
485    }
486
487    /// Returns total manifest size.
488    pub fn manifest_usage(&self) -> u64 {
489        self.store.total_manifest_size()
490    }
491
492    /// Returns true if a newer version manifest file is found.
493    ///
494    /// It is typically used in read-only regions to catch up with manifest.
495    /// It doesn't lock the manifest directory in the object store so the result
496    /// may be inaccurate if there are concurrent writes.
497    pub async fn has_update(&self) -> Result<bool> {
498        let last_version = self.last_version();
499
500        let streamer =
501            self.store
502                .manifest_lister()
503                .await?
504                .context(error::EmptyManifestDirSnafu {
505                    manifest_dir: self.store.manifest_dir(),
506                })?;
507
508        let need_update = streamer
509            .try_any(|entry| async move {
510                let file_name = entry.name();
511                if is_delta_file(file_name) || is_checkpoint_file(file_name) {
512                    let version = file_version(file_name);
513                    if version > last_version {
514                        return true;
515                    }
516                }
517                false
518            })
519            .await
520            .context(error::OpenDalSnafu)?;
521
522        Ok(need_update)
523    }
524
525    /// Increases last version and returns the increased version.
526    fn increase_version(&mut self) -> ManifestVersion {
527        let previous = self.last_version.fetch_add(1, Ordering::Relaxed);
528        previous + 1
529    }
530
531    /// Sets the last version.
532    fn set_version(&mut self, version: ManifestVersion) -> ManifestVersion {
533        self.last_version.store(version, Ordering::Relaxed);
534        version
535    }
536
537    fn last_version(&self) -> ManifestVersion {
538        self.last_version.load(Ordering::Relaxed)
539    }
540
541    /// Fetches the last [RegionCheckpoint] from storage.
542    ///
543    /// If the checkpoint is not found, returns `None`.
544    /// Otherwise, returns the checkpoint and the size of the checkpoint.
545    pub(crate) async fn last_checkpoint(
546        store: &mut ManifestObjectStore,
547    ) -> Result<Option<(RegionCheckpoint, u64)>> {
548        let last_checkpoint = store.load_last_checkpoint().await?;
549
550        if let Some((_, bytes)) = last_checkpoint {
551            let checkpoint = RegionCheckpoint::decode(&bytes)?;
552            Ok(Some((checkpoint, bytes.len() as u64)))
553        } else {
554            Ok(None)
555        }
556    }
557
558    #[cfg(test)]
559    pub(crate) fn checkpointer(&self) -> &Checkpointer {
560        &self.checkpointer
561    }
562}
563
564#[cfg(test)]
565impl RegionManifestManager {
566    fn validate_manifest(&self, expect: &RegionMetadataRef, last_version: ManifestVersion) {
567        let manifest = self.manifest();
568        assert_eq!(manifest.metadata, *expect);
569        assert_eq!(self.manifest.manifest_version, self.last_version());
570        assert_eq!(last_version, self.last_version());
571    }
572
573    pub fn store(&self) -> ManifestObjectStore {
574        self.store.clone()
575    }
576}
577
578#[cfg(test)]
579mod test {
580    use std::time::Duration;
581
582    use api::v1::SemanticType;
583    use common_datasource::compression::CompressionType;
584    use common_test_util::temp_dir::create_temp_dir;
585    use datatypes::prelude::ConcreteDataType;
586    use datatypes::schema::ColumnSchema;
587    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
588
589    use super::*;
590    use crate::manifest::action::{RegionChange, RegionEdit};
591    use crate::manifest::tests::utils::basic_region_metadata;
592    use crate::test_util::TestEnv;
593
594    #[tokio::test]
595    async fn create_manifest_manager() {
596        let metadata = Arc::new(basic_region_metadata());
597        let env = TestEnv::new();
598        let manager = env
599            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
600            .await
601            .unwrap()
602            .unwrap();
603
604        manager.validate_manifest(&metadata, 0);
605    }
606
607    #[tokio::test]
608    async fn open_manifest_manager() {
609        let env = TestEnv::new();
610        // Try to opens an empty manifest.
611        assert!(env
612            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
613            .await
614            .unwrap()
615            .is_none());
616
617        // Creates a manifest.
618        let metadata = Arc::new(basic_region_metadata());
619        let mut manager = env
620            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
621            .await
622            .unwrap()
623            .unwrap();
624        // Stops it.
625        manager.stop().await;
626
627        // Open it.
628        let manager = env
629            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
630            .await
631            .unwrap()
632            .unwrap();
633
634        manager.validate_manifest(&metadata, 0);
635    }
636
637    #[tokio::test]
638    async fn region_change_add_column() {
639        let metadata = Arc::new(basic_region_metadata());
640        let env = TestEnv::new();
641        let mut manager = env
642            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
643            .await
644            .unwrap()
645            .unwrap();
646
647        let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
648        new_metadata_builder.push_column_metadata(ColumnMetadata {
649            column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
650            semantic_type: SemanticType::Field,
651            column_id: 252,
652        });
653        let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
654
655        let action_list =
656            RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
657                metadata: new_metadata.clone(),
658            }));
659
660        let current_version = manager.update(action_list).await.unwrap();
661        assert_eq!(current_version, 1);
662        manager.validate_manifest(&new_metadata, 1);
663
664        // Reopen the manager.
665        manager.stop().await;
666        let manager = env
667            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
668            .await
669            .unwrap()
670            .unwrap();
671        manager.validate_manifest(&new_metadata, 1);
672    }
673
674    /// Just for test, refer to wal_dir_usage in src/store-api/src/logstore.rs.
675    async fn manifest_dir_usage(path: &str) -> u64 {
676        let mut size = 0;
677        let mut read_dir = tokio::fs::read_dir(path).await.unwrap();
678        while let Ok(dir_entry) = read_dir.next_entry().await {
679            let Some(entry) = dir_entry else {
680                break;
681            };
682            if entry.file_type().await.unwrap().is_file() {
683                let file_name = entry.file_name().into_string().unwrap();
684                if file_name.contains(".checkpoint") || file_name.contains(".json") {
685                    let file_size = entry.metadata().await.unwrap().len() as usize;
686                    debug!("File: {file_name:?}, size: {file_size}");
687                    size += file_size;
688                }
689            }
690        }
691        size as u64
692    }
693
694    #[tokio::test]
695    async fn test_manifest_size() {
696        let metadata = Arc::new(basic_region_metadata());
697        let data_home = create_temp_dir("");
698        let data_home_path = data_home.path().to_str().unwrap().to_string();
699        let env = TestEnv::with_data_home(data_home);
700
701        let manifest_dir = format!("{}/manifest", data_home_path);
702
703        let mut manager = env
704            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
705            .await
706            .unwrap()
707            .unwrap();
708
709        let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
710        new_metadata_builder.push_column_metadata(ColumnMetadata {
711            column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
712            semantic_type: SemanticType::Field,
713            column_id: 252,
714        });
715        let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
716
717        let action_list =
718            RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
719                metadata: new_metadata.clone(),
720            }));
721
722        let current_version = manager.update(action_list).await.unwrap();
723        assert_eq!(current_version, 1);
724        manager.validate_manifest(&new_metadata, 1);
725
726        // get manifest size
727        let manifest_size = manager.manifest_usage();
728        assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
729
730        // update 10 times nop_action to trigger checkpoint
731        for _ in 0..10 {
732            manager
733                .update(RegionMetaActionList::new(vec![RegionMetaAction::Edit(
734                    RegionEdit {
735                        files_to_add: vec![],
736                        files_to_remove: vec![],
737                        compaction_time_window: None,
738                        flushed_entry_id: None,
739                        flushed_sequence: None,
740                    },
741                )]))
742                .await
743                .unwrap();
744        }
745
746        while manager.checkpointer.is_doing_checkpoint() {
747            tokio::time::sleep(Duration::from_millis(10)).await;
748        }
749
750        // check manifest size again
751        let manifest_size = manager.manifest_usage();
752        assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
753
754        // Reopen the manager,
755        // we just calculate the size from the latest checkpoint file
756        manager.stop().await;
757        let manager = env
758            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
759            .await
760            .unwrap()
761            .unwrap();
762        manager.validate_manifest(&new_metadata, 11);
763
764        // get manifest size again
765        let manifest_size = manager.manifest_usage();
766        assert_eq!(manifest_size, 1204);
767    }
768}