1use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use common_datasource::compression::CompressionType;
19use common_telemetry::{debug, info};
20use futures::TryStreamExt;
21use object_store::ObjectStore;
22use snafu::{OptionExt, ResultExt, ensure};
23use store_api::metadata::RegionMetadataRef;
24use store_api::{MAX_VERSION, MIN_VERSION, ManifestVersion};
25
26use crate::error::{
27 self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
28};
29use crate::manifest::action::{
30 RegionChange, RegionCheckpoint, RegionEdit, RegionManifest, RegionManifestBuilder,
31 RegionMetaAction, RegionMetaActionList,
32};
33use crate::manifest::checkpointer::Checkpointer;
34use crate::manifest::storage::{
35 ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file,
36};
37use crate::metrics::MANIFEST_OP_ELAPSED;
38use crate::region::{RegionLeaderState, RegionRoleState};
39use crate::sst::FormatType;
40
/// Options used to create or open a [`RegionManifestManager`].
#[derive(Debug, Clone)]
pub struct RegionManifestOptions {
    /// Directory of the region manifest; handed to the [`ManifestObjectStore`]
    /// and used in log messages to identify the manifest.
    pub manifest_dir: String,
    /// Object store handed to the [`ManifestObjectStore`].
    pub object_store: ObjectStore,
    /// Compression type handed to the [`ManifestObjectStore`].
    pub compress_type: CompressionType,
    /// Checkpoint distance; the whole options value is passed to the
    /// [`Checkpointer`].
    /// NOTE(review): presumably the number of manifest versions between two
    /// checkpoints — confirm against the checkpointer.
    pub checkpoint_distance: u64,
    /// Options about removed files, see [`RemoveFileOptions`].
    pub remove_file_options: RemoveFileOptions,
}
53
/// Options about removed files carried inside [`RegionManifestOptions`].
///
/// NOTE(review): how these two limits interact is decided by the consumer of
/// the options, which is not part of this block — confirm there.
#[derive(Debug, Clone)]
pub struct RemoveFileOptions {
    /// Count limit. Presumably the number of removed-file records to keep —
    /// TODO confirm against the consumer.
    pub keep_count: usize,
    /// Time limit. Presumably how long removed-file records are kept —
    /// TODO confirm against the consumer.
    pub keep_ttl: std::time::Duration,
}

/// Test-only defaults: keep 256 entries for one hour (3600 seconds).
#[cfg(any(test, feature = "test"))]
impl Default for RemoveFileOptions {
    fn default() -> Self {
        let keep_ttl = std::time::Duration::from_secs(3600);
        RemoveFileOptions {
            keep_count: 256,
            keep_ttl,
        }
    }
}
74
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Manages the manifest of one region.
///
/// The manager keeps the latest [`RegionManifest`] in memory, persists new
/// action lists through the [`ManifestObjectStore`], and hands the manifest to
/// the [`Checkpointer`] after every update.
#[derive(Debug)]
pub struct RegionManifestManager {
    /// Store used to save and fetch manifest files.
    store: ManifestObjectStore,
    /// Latest manifest version known to this manager. The counter is supplied
    /// by the caller of `new`/`open`, so it may be shared with other holders.
    last_version: Arc<AtomicU64>,
    /// Checkpointer invoked after each successful update.
    checkpointer: Checkpointer,
    /// Current in-memory manifest.
    manifest: Arc<RegionManifest>,
    /// Set by `stop()`; once set, `update` and `install_manifest_to` (when it
    /// has work to do) fail with a region-stopped error.
    stopped: bool,
}
149
150impl RegionManifestManager {
151 pub async fn new(
153 metadata: RegionMetadataRef,
154 flushed_entry_id: u64,
155 options: RegionManifestOptions,
156 total_manifest_size: Arc<AtomicU64>,
157 manifest_version: Arc<AtomicU64>,
158 sst_format: FormatType,
159 ) -> Result<Self> {
160 let mut store = ManifestObjectStore::new(
162 &options.manifest_dir,
163 options.object_store.clone(),
164 options.compress_type,
165 total_manifest_size,
166 );
167
168 info!(
169 "Creating region manifest in {} with metadata {:?}, flushed_entry_id: {}",
170 options.manifest_dir, metadata, flushed_entry_id
171 );
172
173 let version = MIN_VERSION;
174 let mut manifest_builder = RegionManifestBuilder::default();
175 manifest_builder.apply_change(
177 version,
178 RegionChange {
179 metadata: metadata.clone(),
180 sst_format,
181 },
182 );
183 let manifest = manifest_builder.try_build()?;
184 let region_id = metadata.region_id;
185
186 debug!(
187 "Build region manifest in {}, manifest: {:?}",
188 options.manifest_dir, manifest
189 );
190
191 let mut actions = vec![RegionMetaAction::Change(RegionChange {
192 metadata,
193 sst_format,
194 })];
195 if flushed_entry_id > 0 {
196 actions.push(RegionMetaAction::Edit(RegionEdit {
197 files_to_add: vec![],
198 files_to_remove: vec![],
199 timestamp_ms: None,
200 compaction_time_window: None,
201 flushed_entry_id: Some(flushed_entry_id),
202 flushed_sequence: None,
203 committed_sequence: None,
204 }));
205 }
206
207 let action_list = RegionMetaActionList::new(actions);
209
210 store.save(version, &action_list.encode()?, false).await?;
213
214 let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION);
215 manifest_version.store(version, Ordering::Relaxed);
216 Ok(Self {
217 store,
218 last_version: manifest_version,
219 checkpointer,
220 manifest: Arc::new(manifest),
221 stopped: false,
222 })
223 }
224
225 pub async fn open(
229 options: RegionManifestOptions,
230 total_manifest_size: Arc<AtomicU64>,
231 manifest_version: Arc<AtomicU64>,
232 ) -> Result<Option<Self>> {
233 let _t = MANIFEST_OP_ELAPSED
234 .with_label_values(&["open"])
235 .start_timer();
236
237 let mut store = ManifestObjectStore::new(
239 &options.manifest_dir,
240 options.object_store.clone(),
241 options.compress_type,
242 total_manifest_size,
243 );
244
245 let mut version = MIN_VERSION;
249 let checkpoint = Self::last_checkpoint(&mut store).await?;
250 let last_checkpoint_version = checkpoint
251 .as_ref()
252 .map(|(checkpoint, _)| checkpoint.last_version)
253 .unwrap_or(MIN_VERSION);
254 let mut manifest_builder = if let Some((checkpoint, _)) = checkpoint {
255 info!(
256 "Recover region manifest {} from checkpoint version {}",
257 options.manifest_dir, checkpoint.last_version
258 );
259 version = version.max(checkpoint.last_version + 1);
260 RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint)
261 } else {
262 info!(
263 "Checkpoint not found in {}, build manifest from scratch",
264 options.manifest_dir
265 );
266 RegionManifestBuilder::default()
267 };
268
269 let manifests = store.fetch_manifests(version, MAX_VERSION).await?;
271
272 for (manifest_version, raw_action_list) in manifests {
273 let action_list = RegionMetaActionList::decode(&raw_action_list)?;
274 store.set_delta_file_size(manifest_version, raw_action_list.len() as u64);
276 for action in action_list.actions {
277 match action {
278 RegionMetaAction::Change(action) => {
279 manifest_builder.apply_change(manifest_version, action);
280 }
281 RegionMetaAction::Edit(action) => {
282 manifest_builder.apply_edit(manifest_version, action);
283 }
284 RegionMetaAction::Remove(_) => {
285 debug!(
286 "Unhandled action in {}, action: {:?}",
287 options.manifest_dir, action
288 );
289 }
290 RegionMetaAction::Truncate(action) => {
291 manifest_builder.apply_truncate(manifest_version, action);
292 }
293 }
294 }
295 }
296
297 if !manifest_builder.contains_metadata() {
299 debug!("No region manifest in {}", options.manifest_dir);
300 return Ok(None);
301 }
302
303 let manifest = manifest_builder.try_build()?;
304 debug!(
305 "Recovered region manifest from {}, manifest: {:?}",
306 options.manifest_dir, manifest
307 );
308 let version = manifest.manifest_version;
309
310 let checkpointer = Checkpointer::new(
311 manifest.metadata.region_id,
312 options,
313 store.clone(),
314 last_checkpoint_version,
315 );
316 manifest_version.store(version, Ordering::Relaxed);
317 Ok(Some(Self {
318 store,
319 last_version: manifest_version,
320 checkpointer,
321 manifest: Arc::new(manifest),
322 stopped: false,
323 }))
324 }
325
    /// Marks the manager as stopped.
    ///
    /// Only the `stopped` flag is set here; later calls to `update` (and to
    /// `install_manifest_to` when it has versions to install) are rejected.
    pub async fn stop(&mut self) {
        self.stopped = true;
    }
330
331 pub async fn install_manifest_to(
337 &mut self,
338 target_version: ManifestVersion,
339 ) -> Result<ManifestVersion> {
340 let _t = MANIFEST_OP_ELAPSED
341 .with_label_values(&["install_manifest_to"])
342 .start_timer();
343
344 let last_version = self.last_version();
345 if last_version >= target_version {
347 debug!(
348 "Target version {} is less than or equal to the current version {}, region: {}, skip install",
349 target_version, last_version, self.manifest.metadata.region_id
350 );
351 return Ok(last_version);
352 }
353
354 ensure!(
355 !self.stopped,
356 RegionStoppedSnafu {
357 region_id: self.manifest.metadata.region_id,
358 }
359 );
360
361 let region_id = self.manifest.metadata.region_id;
362 let mut manifests = self
364 .store
365 .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
367 .await?;
368
369 if manifests.is_empty() {
376 info!(
377 "Manifests are not strict from {}, region: {}, tries to install the last checkpoint",
378 last_version, self.manifest.metadata.region_id
379 );
380 let last_version = self.install_last_checkpoint().await?;
381 if last_version >= target_version {
383 return Ok(last_version);
384 }
385
386 manifests = self
388 .store
389 .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
391 .await?;
392 }
393
394 if manifests.is_empty() {
395 return NoManifestsSnafu {
396 region_id: self.manifest.metadata.region_id,
397 start_version: last_version + 1,
398 end_version: target_version + 1,
399 last_version,
400 }
401 .fail();
402 }
403
404 debug_assert_eq!(manifests.first().unwrap().0, last_version + 1);
405 let mut manifest_builder =
406 RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
407
408 for (manifest_version, raw_action_list) in manifests {
409 self.store
410 .set_delta_file_size(manifest_version, raw_action_list.len() as u64);
411 let action_list = RegionMetaActionList::decode(&raw_action_list)?;
412 for action in action_list.actions {
413 match action {
414 RegionMetaAction::Change(action) => {
415 manifest_builder.apply_change(manifest_version, action);
416 }
417 RegionMetaAction::Edit(action) => {
418 manifest_builder.apply_edit(manifest_version, action);
419 }
420 RegionMetaAction::Remove(_) => {
421 debug!(
422 "Unhandled action for region {}, action: {:?}",
423 self.manifest.metadata.region_id, action
424 );
425 }
426 RegionMetaAction::Truncate(action) => {
427 manifest_builder.apply_truncate(manifest_version, action);
428 }
429 }
430 }
431 }
432
433 let new_manifest = manifest_builder.try_build()?;
434 ensure!(
435 new_manifest.manifest_version >= target_version,
436 InstallManifestToSnafu {
437 region_id: self.manifest.metadata.region_id,
438 target_version,
439 available_version: new_manifest.manifest_version,
440 last_version,
441 }
442 );
443
444 let version = self.last_version();
445 self.manifest = Arc::new(new_manifest);
446 let last_version = self.set_version(self.manifest.manifest_version);
447 info!(
448 "Install manifest changes from {} to {}, region: {}",
449 version, last_version, self.manifest.metadata.region_id
450 );
451
452 Ok(last_version)
453 }
454
455 pub(crate) async fn install_last_checkpoint(&mut self) -> Result<ManifestVersion> {
457 let last_version = self.last_version();
458 let Some((checkpoint, checkpoint_size)) = Self::last_checkpoint(&mut self.store).await?
459 else {
460 return NoCheckpointSnafu {
461 region_id: self.manifest.metadata.region_id,
462 last_version,
463 }
464 .fail();
465 };
466 self.store.reset_manifest_size();
467 self.store
468 .set_checkpoint_file_size(checkpoint.last_version, checkpoint_size);
469 let builder = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint);
470 let manifest = builder.try_build()?;
471 let last_version = self.set_version(manifest.manifest_version);
472 self.manifest = Arc::new(manifest);
473 info!(
474 "Installed region manifest from checkpoint: {}, region: {}",
475 checkpoint.last_version, self.manifest.metadata.region_id
476 );
477
478 Ok(last_version)
479 }
480
481 pub async fn update(
483 &mut self,
484 action_list: RegionMetaActionList,
485 region_state: RegionRoleState,
486 ) -> Result<ManifestVersion> {
487 let _t = MANIFEST_OP_ELAPSED
488 .with_label_values(&["update"])
489 .start_timer();
490
491 ensure!(
492 !self.stopped,
493 RegionStoppedSnafu {
494 region_id: self.manifest.metadata.region_id,
495 }
496 );
497
498 let version = self.increase_version();
499 let is_staging = region_state == RegionRoleState::Leader(RegionLeaderState::Staging);
500 self.store
501 .save(version, &action_list.encode()?, is_staging)
502 .await?;
503
504 let mut manifest_builder =
505 RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
506 for action in action_list.actions {
507 match action {
508 RegionMetaAction::Change(action) => {
509 manifest_builder.apply_change(version, action);
510 }
511 RegionMetaAction::Edit(action) => {
512 manifest_builder.apply_edit(version, action);
513 }
514 RegionMetaAction::Remove(_) => {
515 debug!(
516 "Unhandled action for region {}, action: {:?}",
517 self.manifest.metadata.region_id, action
518 );
519 }
520 RegionMetaAction::Truncate(action) => {
521 manifest_builder.apply_truncate(version, action);
522 }
523 }
524 }
525 let new_manifest = manifest_builder.try_build()?;
526 let updated_manifest = self
527 .checkpointer
528 .update_manifest_removed_files(new_manifest)?;
529 self.manifest = Arc::new(updated_manifest);
530
531 self.checkpointer
532 .maybe_do_checkpoint(self.manifest.as_ref(), region_state);
533
534 Ok(version)
535 }
536
537 pub fn manifest(&self) -> Arc<RegionManifest> {
539 self.manifest.clone()
540 }
541
    /// Returns the total manifest size as reported by the underlying store.
    pub fn manifest_usage(&self) -> u64 {
        self.store.total_manifest_size()
    }
546
547 pub async fn has_update(&self) -> Result<bool> {
553 let last_version = self.last_version();
554
555 let streamer =
556 self.store
557 .manifest_lister(false)
558 .await?
559 .context(error::EmptyManifestDirSnafu {
560 manifest_dir: self.store.manifest_dir(),
561 })?;
562
563 let need_update = streamer
564 .try_any(|entry| async move {
565 let file_name = entry.name();
566 if is_delta_file(file_name) || is_checkpoint_file(file_name) {
567 let version = file_version(file_name);
568 if version > last_version {
569 return true;
570 }
571 }
572 false
573 })
574 .await
575 .context(error::OpenDalSnafu)?;
576
577 Ok(need_update)
578 }
579
580 fn increase_version(&mut self) -> ManifestVersion {
582 let previous = self.last_version.fetch_add(1, Ordering::Relaxed);
583 previous + 1
584 }
585
    /// Overwrites the version counter with `version` and returns that same value.
    fn set_version(&mut self, version: ManifestVersion) -> ManifestVersion {
        self.last_version.store(version, Ordering::Relaxed);
        version
    }
591
    /// Reads the current value of the version counter.
    fn last_version(&self) -> ManifestVersion {
        self.last_version.load(Ordering::Relaxed)
    }
595
596 pub(crate) async fn last_checkpoint(
601 store: &mut ManifestObjectStore,
602 ) -> Result<Option<(RegionCheckpoint, u64)>> {
603 let last_checkpoint = store.load_last_checkpoint().await?;
604
605 if let Some((_, bytes)) = last_checkpoint {
606 let checkpoint = RegionCheckpoint::decode(&bytes)?;
607 Ok(Some((checkpoint, bytes.len() as u64)))
608 } else {
609 Ok(None)
610 }
611 }
612
    /// Returns a clone of the underlying manifest object store.
    pub fn store(&self) -> ManifestObjectStore {
        self.store.clone()
    }
616
    /// Test-only accessor for the checkpointer.
    #[cfg(test)]
    pub(crate) fn checkpointer(&self) -> &Checkpointer {
        &self.checkpointer
    }
621
622 pub(crate) async fn merge_staged_actions(
625 &mut self,
626 region_state: RegionRoleState,
627 ) -> Result<Option<RegionMetaActionList>> {
628 if region_state != RegionRoleState::Leader(RegionLeaderState::Staging) {
630 return Ok(None);
631 }
632
633 let staging_manifests = self.store.fetch_staging_manifests().await?;
635
636 if staging_manifests.is_empty() {
637 info!(
638 "No staging manifests to merge for region {}",
639 self.manifest.metadata.region_id
640 );
641 return Ok(None);
642 }
643
644 info!(
645 "Merging {} staging manifests for region {}",
646 staging_manifests.len(),
647 self.manifest.metadata.region_id
648 );
649
650 let mut merged_actions = Vec::new();
652 let mut latest_version = self.last_version();
653
654 for (manifest_version, raw_action_list) in staging_manifests {
656 let action_list = RegionMetaActionList::decode(&raw_action_list)?;
657
658 for action in action_list.actions {
659 merged_actions.push(action);
660 }
661
662 latest_version = latest_version.max(manifest_version);
663 }
664
665 if merged_actions.is_empty() {
666 return Ok(None);
667 }
668
669 info!(
670 "Successfully merged {} actions from staging manifests for region {}, latest version: {}",
671 merged_actions.len(),
672 self.manifest.metadata.region_id,
673 latest_version
674 );
675
676 Ok(Some(RegionMetaActionList::new(merged_actions)))
677 }
678}
679
#[cfg(test)]
impl RegionManifestManager {
    /// Asserts that the in-memory manifest holds `expect` as its metadata and
    /// that both the manifest version and the version counter equal
    /// `last_version`.
    fn validate_manifest(&self, expect: &RegionMetadataRef, last_version: ManifestVersion) {
        let current = self.manifest();
        assert_eq!(current.metadata, *expect);
        assert_eq!(current.manifest_version, self.last_version());
        assert_eq!(last_version, self.last_version());
    }
}
689
690#[cfg(test)]
691mod test {
692 use std::time::Duration;
693
694 use api::v1::SemanticType;
695 use common_datasource::compression::CompressionType;
696 use common_test_util::temp_dir::create_temp_dir;
697 use datatypes::prelude::ConcreteDataType;
698 use datatypes::schema::ColumnSchema;
699 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
700
701 use super::*;
702 use crate::manifest::action::{RegionChange, RegionEdit};
703 use crate::manifest::tests::utils::basic_region_metadata;
704 use crate::test_util::TestEnv;
705
706 #[tokio::test]
707 async fn create_manifest_manager() {
708 let metadata = Arc::new(basic_region_metadata());
709 let env = TestEnv::new().await;
710 let manager = env
711 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
712 .await
713 .unwrap()
714 .unwrap();
715
716 manager.validate_manifest(&metadata, 0);
717 }
718
719 #[tokio::test]
720 async fn open_manifest_manager() {
721 let env = TestEnv::new().await;
722 assert!(
724 env.create_manifest_manager(CompressionType::Uncompressed, 10, None)
725 .await
726 .unwrap()
727 .is_none()
728 );
729
730 let metadata = Arc::new(basic_region_metadata());
732 let mut manager = env
733 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
734 .await
735 .unwrap()
736 .unwrap();
737 manager.stop().await;
739
740 let manager = env
742 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
743 .await
744 .unwrap()
745 .unwrap();
746
747 manager.validate_manifest(&metadata, 0);
748 }
749
750 #[tokio::test]
751 async fn manifest_with_partition_expr_roundtrip() {
752 let env = TestEnv::new().await;
753 let expr_json =
754 r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
755 let mut metadata = basic_region_metadata();
756 metadata.partition_expr = Some(expr_json.to_string());
757 let metadata = Arc::new(metadata);
758 let mut manager = env
759 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
760 .await
761 .unwrap()
762 .unwrap();
763
764 let manifest = manager.manifest();
766 assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
767
768 manager.stop().await;
769
770 let manager = env
772 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
773 .await
774 .unwrap()
775 .unwrap();
776 let manifest = manager.manifest();
777 assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
778 }
779
780 #[tokio::test]
781 async fn region_change_add_column() {
782 let metadata = Arc::new(basic_region_metadata());
783 let env = TestEnv::new().await;
784 let mut manager = env
785 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
786 .await
787 .unwrap()
788 .unwrap();
789
790 let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
791 new_metadata_builder.push_column_metadata(ColumnMetadata {
792 column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
793 semantic_type: SemanticType::Field,
794 column_id: 252,
795 });
796 let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
797
798 let action_list =
799 RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
800 metadata: new_metadata.clone(),
801 sst_format: FormatType::PrimaryKey,
802 }));
803
804 let current_version = manager
805 .update(
806 action_list,
807 RegionRoleState::Leader(RegionLeaderState::Writable),
808 )
809 .await
810 .unwrap();
811 assert_eq!(current_version, 1);
812 manager.validate_manifest(&new_metadata, 1);
813
814 manager.stop().await;
816 let manager = env
817 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
818 .await
819 .unwrap()
820 .unwrap();
821 manager.validate_manifest(&new_metadata, 1);
822 }
823
824 async fn manifest_dir_usage(path: &str) -> u64 {
826 let mut size = 0;
827 let mut read_dir = tokio::fs::read_dir(path).await.unwrap();
828 while let Ok(dir_entry) = read_dir.next_entry().await {
829 let Some(entry) = dir_entry else {
830 break;
831 };
832 if entry.file_type().await.unwrap().is_file() {
833 let file_name = entry.file_name().into_string().unwrap();
834 if file_name.contains(".checkpoint") || file_name.contains(".json") {
835 let file_size = entry.metadata().await.unwrap().len() as usize;
836 debug!("File: {file_name:?}, size: {file_size}");
837 size += file_size;
838 }
839 }
840 }
841 size as u64
842 }
843
844 #[tokio::test]
845 async fn test_manifest_size() {
846 let metadata = Arc::new(basic_region_metadata());
847 let data_home = create_temp_dir("");
848 let data_home_path = data_home.path().to_str().unwrap().to_string();
849 let env = TestEnv::with_data_home(data_home).await;
850
851 let manifest_dir = format!("{}/manifest", data_home_path);
852
853 let mut manager = env
854 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
855 .await
856 .unwrap()
857 .unwrap();
858
859 let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
860 new_metadata_builder.push_column_metadata(ColumnMetadata {
861 column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
862 semantic_type: SemanticType::Field,
863 column_id: 252,
864 });
865 let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
866
867 let action_list =
868 RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
869 metadata: new_metadata.clone(),
870 sst_format: FormatType::PrimaryKey,
871 }));
872
873 let current_version = manager
874 .update(
875 action_list,
876 RegionRoleState::Leader(RegionLeaderState::Writable),
877 )
878 .await
879 .unwrap();
880 assert_eq!(current_version, 1);
881 manager.validate_manifest(&new_metadata, 1);
882
883 let manifest_size = manager.manifest_usage();
885 assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
886
887 for _ in 0..10 {
889 manager
890 .update(
891 RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
892 files_to_add: vec![],
893 files_to_remove: vec![],
894 timestamp_ms: None,
895 compaction_time_window: None,
896 flushed_entry_id: None,
897 flushed_sequence: None,
898 committed_sequence: None,
899 })]),
900 RegionRoleState::Leader(RegionLeaderState::Writable),
901 )
902 .await
903 .unwrap();
904 }
905
906 while manager.checkpointer.is_doing_checkpoint() {
907 tokio::time::sleep(Duration::from_millis(10)).await;
908 }
909
910 let manifest_size = manager.manifest_usage();
912 assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
913
914 manager.stop().await;
917 let manager = env
918 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
919 .await
920 .unwrap()
921 .unwrap();
922 manager.validate_manifest(&new_metadata, 11);
923
924 let manifest_size = manager.manifest_usage();
926 assert_eq!(manifest_size, 1748);
927 }
928}