1use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use common_datasource::compression::CompressionType;
19use common_telemetry::{debug, info};
20use futures::TryStreamExt;
21use object_store::ObjectStore;
22use snafu::{OptionExt, ResultExt, ensure};
23use store_api::metadata::RegionMetadataRef;
24use store_api::{MAX_VERSION, MIN_VERSION, ManifestVersion};
25
26use crate::cache::manifest_cache::ManifestCache;
27use crate::config::MitoConfig;
28use crate::error::{
29 self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
30};
31use crate::manifest::action::{
32 RegionChange, RegionCheckpoint, RegionEdit, RegionManifest, RegionManifestBuilder,
33 RegionMetaAction, RegionMetaActionList, RemovedFile,
34};
35use crate::manifest::checkpointer::Checkpointer;
36use crate::manifest::storage::{
37 ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file, manifest_compress_type,
38 manifest_dir,
39};
40use crate::metrics::MANIFEST_OP_ELAPSED;
41use crate::region::{ManifestStats, RegionLeaderState, RegionRoleState};
42use crate::sst::FormatType;
43
/// Options used to create or open the manifest of a region.
#[derive(Debug, Clone)]
pub struct RegionManifestOptions {
    /// Directory that holds the manifest files of the region.
    pub manifest_dir: String,
    /// Object store handle used to access the manifest files.
    pub object_store: ObjectStore,
    /// Compression type passed to the manifest object store.
    pub compress_type: CompressionType,
    /// Checkpoint distance, filled from `MitoConfig::manifest_checkpoint_distance`.
    /// NOTE(review): presumably the number of manifest versions between two
    /// checkpoints; confirm against `Checkpointer`.
    pub checkpoint_distance: u64,
    /// Options about how removed files are handled.
    pub remove_file_options: RemoveFileOptions,
    /// Optional manifest cache handed to the manifest object store.
    pub manifest_cache: Option<ManifestCache>,
}
58
59impl RegionManifestOptions {
60 pub fn new(config: &MitoConfig, region_dir: &str, object_store: &ObjectStore) -> Self {
62 RegionManifestOptions {
63 manifest_dir: manifest_dir(region_dir),
64 object_store: object_store.clone(),
65 compress_type: manifest_compress_type(config.compress_manifest),
68 checkpoint_distance: config.manifest_checkpoint_distance,
69 remove_file_options: RemoveFileOptions {
70 enable_gc: config.gc.enable,
71 },
72 manifest_cache: None,
73 }
74 }
75}
76
/// Options related to files removed from a region.
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct RemoveFileOptions {
    /// Whether GC is enabled; `RegionManifestOptions::new` fills it from `MitoConfig::gc.enable`.
    /// NOTE(review): presumably controls whether removed files are kept in the
    /// manifest until GC deletes them; confirm against the checkpointer.
    pub enable_gc: bool,
}
84
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Manages the manifest of a single region.
///
/// It keeps the latest in-memory [`RegionManifest`], persists new action lists
/// through a [`ManifestObjectStore`] and hands the manifest to a
/// [`Checkpointer`] after non-staging updates.
#[derive(Debug)]
pub struct RegionManifestManager {
    /// Storage of the manifest files.
    store: ManifestObjectStore,
    /// Latest manifest version; the counter is cloned from `ManifestStats::manifest_version`.
    last_version: Arc<AtomicU64>,
    /// Checkpointer of this region's manifest.
    checkpointer: Checkpointer,
    /// Current (non-staging) manifest.
    manifest: Arc<RegionManifest>,
    /// Manifest built from staging updates, `None` if there is no staging update.
    staging_manifest: Option<Arc<RegionManifest>>,
    /// Stats of the manifest.
    stats: ManifestStats,
    /// Set by `stop()`; `update()` and `install_manifest_to()` fail once it is `true`.
    stopped: bool,
}
164
165impl RegionManifestManager {
166 pub async fn new(
168 metadata: RegionMetadataRef,
169 flushed_entry_id: u64,
170 options: RegionManifestOptions,
171 sst_format: FormatType,
172 stats: &ManifestStats,
173 ) -> Result<Self> {
174 let mut store = ManifestObjectStore::new(
176 &options.manifest_dir,
177 options.object_store.clone(),
178 options.compress_type,
179 stats.total_manifest_size.clone(),
180 options.manifest_cache.clone(),
181 );
182 let manifest_version = stats.manifest_version.clone();
183
184 info!(
185 "Creating region manifest in {} with metadata {:?}, flushed_entry_id: {}",
186 options.manifest_dir, metadata, flushed_entry_id
187 );
188
189 let version = MIN_VERSION;
190 let mut manifest_builder = RegionManifestBuilder::default();
191 manifest_builder.apply_change(
193 version,
194 RegionChange {
195 metadata: metadata.clone(),
196 sst_format,
197 },
198 );
199 let manifest = manifest_builder.try_build()?;
200 let region_id = metadata.region_id;
201
202 debug!(
203 "Build region manifest in {}, manifest: {:?}",
204 options.manifest_dir, manifest
205 );
206
207 let mut actions = vec![RegionMetaAction::Change(RegionChange {
208 metadata,
209 sst_format,
210 })];
211 if flushed_entry_id > 0 {
212 actions.push(RegionMetaAction::Edit(RegionEdit {
213 files_to_add: vec![],
214 files_to_remove: vec![],
215 timestamp_ms: None,
216 compaction_time_window: None,
217 flushed_entry_id: Some(flushed_entry_id),
218 flushed_sequence: None,
219 committed_sequence: None,
220 }));
221 }
222
223 let action_list = RegionMetaActionList::new(actions);
225
226 store.save(version, &action_list.encode()?, false).await?;
229
230 let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION);
231 manifest_version.store(version, Ordering::Relaxed);
232 manifest
233 .removed_files
234 .update_file_removed_cnt_to_stats(stats);
235 Ok(Self {
236 store,
237 last_version: manifest_version,
238 checkpointer,
239 manifest: Arc::new(manifest),
240 staging_manifest: None,
241 stats: stats.clone(),
242 stopped: false,
243 })
244 }
245
246 pub async fn open(
250 options: RegionManifestOptions,
251 stats: &ManifestStats,
252 ) -> Result<Option<Self>> {
253 let _t = MANIFEST_OP_ELAPSED
254 .with_label_values(&["open"])
255 .start_timer();
256
257 let mut store = ManifestObjectStore::new(
259 &options.manifest_dir,
260 options.object_store.clone(),
261 options.compress_type,
262 stats.total_manifest_size.clone(),
263 options.manifest_cache.clone(),
264 );
265 let manifest_version = stats.manifest_version.clone();
266
267 let mut version = MIN_VERSION;
271 let checkpoint = Self::last_checkpoint(&mut store).await?;
272 let last_checkpoint_version = checkpoint
273 .as_ref()
274 .map(|(checkpoint, _)| checkpoint.last_version)
275 .unwrap_or(MIN_VERSION);
276 let mut manifest_builder = if let Some((checkpoint, _)) = checkpoint {
277 info!(
278 "Recover region manifest {} from checkpoint version {}",
279 options.manifest_dir, checkpoint.last_version
280 );
281 version = version.max(checkpoint.last_version + 1);
282 RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint)
283 } else {
284 info!(
285 "Checkpoint not found in {}, build manifest from scratch",
286 options.manifest_dir
287 );
288 RegionManifestBuilder::default()
289 };
290
291 let manifests = store.fetch_manifests(version, MAX_VERSION).await?;
293
294 for (manifest_version, raw_action_list) in manifests {
295 let action_list = RegionMetaActionList::decode(&raw_action_list)?;
296 store.set_delta_file_size(manifest_version, raw_action_list.len() as u64);
298 for action in action_list.actions {
299 match action {
300 RegionMetaAction::Change(action) => {
301 manifest_builder.apply_change(manifest_version, action);
302 }
303 RegionMetaAction::Edit(action) => {
304 manifest_builder.apply_edit(manifest_version, action);
305 }
306 RegionMetaAction::Remove(_) => {
307 debug!(
308 "Unhandled action in {}, action: {:?}",
309 options.manifest_dir, action
310 );
311 }
312 RegionMetaAction::Truncate(action) => {
313 manifest_builder.apply_truncate(manifest_version, action);
314 }
315 }
316 }
317 }
318
319 if !manifest_builder.contains_metadata() {
321 debug!("No region manifest in {}", options.manifest_dir);
322 return Ok(None);
323 }
324
325 let manifest = manifest_builder.try_build()?;
326 debug!(
327 "Recovered region manifest from {}, manifest: {:?}",
328 options.manifest_dir, manifest
329 );
330 let version = manifest.manifest_version;
331
332 let checkpointer = Checkpointer::new(
333 manifest.metadata.region_id,
334 options,
335 store.clone(),
336 last_checkpoint_version,
337 );
338 manifest_version.store(version, Ordering::Relaxed);
339 manifest
340 .removed_files
341 .update_file_removed_cnt_to_stats(stats);
342 Ok(Some(Self {
343 store,
344 last_version: manifest_version,
345 checkpointer,
346 manifest: Arc::new(manifest),
347 staging_manifest: None,
349 stats: stats.clone(),
350 stopped: false,
351 }))
352 }
353
    /// Marks the manager as stopped.
    ///
    /// After this call `update()` fails with a region-stopped error, and so does
    /// `install_manifest_to()` when it would have to install newer versions.
    pub async fn stop(&mut self) {
        self.stopped = true;
    }
358
359 pub async fn install_manifest_to(
365 &mut self,
366 target_version: ManifestVersion,
367 ) -> Result<ManifestVersion> {
368 let _t = MANIFEST_OP_ELAPSED
369 .with_label_values(&["install_manifest_to"])
370 .start_timer();
371
372 let last_version = self.last_version();
373 if last_version >= target_version {
375 debug!(
376 "Target version {} is less than or equal to the current version {}, region: {}, skip install",
377 target_version, last_version, self.manifest.metadata.region_id
378 );
379 return Ok(last_version);
380 }
381
382 ensure!(
383 !self.stopped,
384 RegionStoppedSnafu {
385 region_id: self.manifest.metadata.region_id,
386 }
387 );
388
389 let region_id = self.manifest.metadata.region_id;
390 let mut manifests = self
392 .store
393 .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
395 .await?;
396
397 if manifests.is_empty() {
404 info!(
405 "Manifests are not strict from {}, region: {}, tries to install the last checkpoint",
406 last_version, self.manifest.metadata.region_id
407 );
408 let last_version = self.install_last_checkpoint().await?;
409 if last_version >= target_version {
411 return Ok(last_version);
412 }
413
414 manifests = self
416 .store
417 .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
419 .await?;
420 }
421
422 if manifests.is_empty() {
423 return NoManifestsSnafu {
424 region_id: self.manifest.metadata.region_id,
425 start_version: last_version + 1,
426 end_version: target_version + 1,
427 last_version,
428 }
429 .fail();
430 }
431
432 debug_assert_eq!(manifests.first().unwrap().0, last_version + 1);
433 let mut manifest_builder =
434 RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
435
436 for (manifest_version, raw_action_list) in manifests {
437 self.store
438 .set_delta_file_size(manifest_version, raw_action_list.len() as u64);
439 let action_list = RegionMetaActionList::decode(&raw_action_list)?;
440 for action in action_list.actions {
441 match action {
442 RegionMetaAction::Change(action) => {
443 manifest_builder.apply_change(manifest_version, action);
444 }
445 RegionMetaAction::Edit(action) => {
446 manifest_builder.apply_edit(manifest_version, action);
447 }
448 RegionMetaAction::Remove(_) => {
449 debug!(
450 "Unhandled action for region {}, action: {:?}",
451 self.manifest.metadata.region_id, action
452 );
453 }
454 RegionMetaAction::Truncate(action) => {
455 manifest_builder.apply_truncate(manifest_version, action);
456 }
457 }
458 }
459 }
460
461 let new_manifest = manifest_builder.try_build()?;
462 ensure!(
463 new_manifest.manifest_version >= target_version,
464 InstallManifestToSnafu {
465 region_id: self.manifest.metadata.region_id,
466 target_version,
467 available_version: new_manifest.manifest_version,
468 last_version,
469 }
470 );
471
472 let version = self.last_version();
473 new_manifest
474 .removed_files
475 .update_file_removed_cnt_to_stats(&self.stats);
476 self.manifest = Arc::new(new_manifest);
477 let last_version = self.set_version(self.manifest.manifest_version);
478 info!(
479 "Install manifest changes from {} to {}, region: {}",
480 version, last_version, self.manifest.metadata.region_id
481 );
482
483 Ok(last_version)
484 }
485
486 pub(crate) async fn install_last_checkpoint(&mut self) -> Result<ManifestVersion> {
488 let last_version = self.last_version();
489 let Some((checkpoint, checkpoint_size)) = Self::last_checkpoint(&mut self.store).await?
490 else {
491 return NoCheckpointSnafu {
492 region_id: self.manifest.metadata.region_id,
493 last_version,
494 }
495 .fail();
496 };
497 self.store.reset_manifest_size();
498 self.store
499 .set_checkpoint_file_size(checkpoint.last_version, checkpoint_size);
500 let builder = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint);
501 let manifest = builder.try_build()?;
502 let last_version = self.set_version(manifest.manifest_version);
503 manifest
504 .removed_files
505 .update_file_removed_cnt_to_stats(&self.stats);
506 self.manifest = Arc::new(manifest);
507 info!(
508 "Installed region manifest from checkpoint: {}, region: {}",
509 checkpoint.last_version, self.manifest.metadata.region_id
510 );
511
512 Ok(last_version)
513 }
514
515 pub async fn update(
517 &mut self,
518 action_list: RegionMetaActionList,
519 is_staging: bool,
520 ) -> Result<ManifestVersion> {
521 let _t = MANIFEST_OP_ELAPSED
522 .with_label_values(&["update"])
523 .start_timer();
524
525 ensure!(
526 !self.stopped,
527 RegionStoppedSnafu {
528 region_id: self.manifest.metadata.region_id,
529 }
530 );
531
532 let version = self.increase_version();
533 self.store
534 .save(version, &action_list.encode()?, is_staging)
535 .await?;
536
537 let mut manifest_builder =
540 if is_staging && let Some(staging_manifest) = self.staging_manifest.as_ref() {
541 RegionManifestBuilder::with_checkpoint(Some(staging_manifest.as_ref().clone()))
542 } else {
543 RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()))
544 };
545
546 for action in action_list.actions {
547 match action {
548 RegionMetaAction::Change(action) => {
549 manifest_builder.apply_change(version, action);
550 }
551 RegionMetaAction::Edit(action) => {
552 manifest_builder.apply_edit(version, action);
553 }
554 RegionMetaAction::Remove(_) => {
555 debug!(
556 "Unhandled action for region {}, action: {:?}",
557 self.manifest.metadata.region_id, action
558 );
559 }
560 RegionMetaAction::Truncate(action) => {
561 manifest_builder.apply_truncate(version, action);
562 }
563 }
564 }
565
566 if is_staging {
567 let new_manifest = manifest_builder.try_build()?;
568 self.staging_manifest = Some(Arc::new(new_manifest));
569
570 info!(
571 "Skipping checkpoint for region {} in staging mode, manifest version: {}",
572 self.manifest.metadata.region_id, self.manifest.manifest_version
573 );
574 } else {
575 let new_manifest = manifest_builder.try_build()?;
576 new_manifest
577 .removed_files
578 .update_file_removed_cnt_to_stats(&self.stats);
579 let updated_manifest = self
580 .checkpointer
581 .update_manifest_removed_files(new_manifest)?;
582 self.manifest = Arc::new(updated_manifest);
583 self.checkpointer
584 .maybe_do_checkpoint(self.manifest.as_ref());
585 }
586
587 Ok(version)
588 }
589
590 pub fn clear_deleted_files(&mut self, deleted_files: Vec<RemovedFile>) {
592 let mut manifest = (*self.manifest()).clone();
593 manifest.removed_files.clear_deleted_files(deleted_files);
594 self.set_manifest(Arc::new(manifest));
595 }
596
    /// Replaces the current manifest with `manifest`.
    ///
    /// Only the in-memory manifest is swapped; the tracked version is not touched here.
    pub(crate) fn set_manifest(&mut self, manifest: Arc<RegionManifest>) {
        self.manifest = manifest;
    }
600
601 pub fn manifest(&self) -> Arc<RegionManifest> {
603 self.manifest.clone()
604 }
605
606 pub fn staging_manifest(&self) -> Option<Arc<RegionManifest>> {
608 self.staging_manifest.clone()
609 }
610
    /// Returns the total manifest size reported by the manifest store.
    pub fn manifest_usage(&self) -> u64 {
        self.store.total_manifest_size()
    }
615
616 pub async fn has_update(&self) -> Result<bool> {
622 let last_version = self.last_version();
623
624 let streamer =
625 self.store
626 .manifest_lister(false)
627 .await?
628 .context(error::EmptyManifestDirSnafu {
629 manifest_dir: self.store.manifest_dir(),
630 })?;
631
632 let need_update = streamer
633 .try_any(|entry| async move {
634 let file_name = entry.name();
635 if is_delta_file(file_name) || is_checkpoint_file(file_name) {
636 let version = file_version(file_name);
637 if version > last_version {
638 return true;
639 }
640 }
641 false
642 })
643 .await
644 .context(error::OpenDalSnafu)?;
645
646 Ok(need_update)
647 }
648
649 fn increase_version(&mut self) -> ManifestVersion {
651 let previous = self.last_version.fetch_add(1, Ordering::Relaxed);
652 previous + 1
653 }
654
    /// Overwrites the tracked version with `version` and returns it.
    fn set_version(&mut self, version: ManifestVersion) -> ManifestVersion {
        self.last_version.store(version, Ordering::Relaxed);
        version
    }
660
    /// Returns the tracked manifest version.
    fn last_version(&self) -> ManifestVersion {
        self.last_version.load(Ordering::Relaxed)
    }
664
665 pub(crate) async fn last_checkpoint(
670 store: &mut ManifestObjectStore,
671 ) -> Result<Option<(RegionCheckpoint, u64)>> {
672 let last_checkpoint = store.load_last_checkpoint().await?;
673
674 if let Some((_, bytes)) = last_checkpoint {
675 let checkpoint = RegionCheckpoint::decode(&bytes)?;
676 Ok(Some((checkpoint, bytes.len() as u64)))
677 } else {
678 Ok(None)
679 }
680 }
681
    /// Returns a clone of the manifest store.
    pub fn store(&self) -> ManifestObjectStore {
        self.store.clone()
    }
685
    /// Returns the checkpointer. Only available in tests.
    #[cfg(test)]
    pub(crate) fn checkpointer(&self) -> &Checkpointer {
        &self.checkpointer
    }
690
691 pub(crate) async fn merge_staged_actions(
694 &mut self,
695 region_state: RegionRoleState,
696 ) -> Result<Option<RegionMetaActionList>> {
697 if region_state != RegionRoleState::Leader(RegionLeaderState::Staging) {
699 return Ok(None);
700 }
701
702 let staging_manifests = self.store.fetch_staging_manifests().await?;
704
705 if staging_manifests.is_empty() {
706 info!(
707 "No staging manifests to merge for region {}",
708 self.manifest.metadata.region_id
709 );
710 return Ok(None);
711 }
712
713 info!(
714 "Merging {} staging manifests for region {}",
715 staging_manifests.len(),
716 self.manifest.metadata.region_id
717 );
718
719 let mut merged_actions = Vec::new();
721 let mut latest_version = self.last_version();
722
723 for (manifest_version, raw_action_list) in staging_manifests {
725 let action_list = RegionMetaActionList::decode(&raw_action_list)?;
726
727 for action in action_list.actions {
728 merged_actions.push(action);
729 }
730
731 latest_version = latest_version.max(manifest_version);
732 }
733
734 if merged_actions.is_empty() {
735 return Ok(None);
736 }
737
738 info!(
739 "Successfully merged {} actions from staging manifests for region {}, latest version: {}",
740 merged_actions.len(),
741 self.manifest.metadata.region_id,
742 latest_version
743 );
744
745 Ok(Some(RegionMetaActionList::new(merged_actions)))
746 }
747
    /// Drops the in-memory staging manifest without touching the staging files.
    pub(crate) fn unset_staging_manifest(&mut self) {
        self.staging_manifest = None;
    }
752
753 pub(crate) async fn clear_staging_manifest_and_dir(&mut self) -> Result<()> {
755 self.staging_manifest = None;
756 self.store.clear_staging_manifests().await?;
757 info!(
758 "Cleared all staging manifests for region {}",
759 self.manifest.metadata.region_id
760 );
761 Ok(())
762 }
763}
764
#[cfg(test)]
impl RegionManifestManager {
    /// Asserts that the current manifest holds the `expect` metadata and that both
    /// the manifest and the manager are at `last_version`.
    fn validate_manifest(&self, expect: &RegionMetadataRef, last_version: ManifestVersion) {
        let tracked_version = self.last_version();
        let current = self.manifest();

        assert_eq!(current.metadata, *expect);
        assert_eq!(self.manifest.manifest_version, tracked_version);
        assert_eq!(last_version, tracked_version);
    }
}
774
775#[cfg(test)]
776mod test {
777 use std::time::Duration;
778
779 use api::v1::SemanticType;
780 use common_datasource::compression::CompressionType;
781 use common_test_util::temp_dir::create_temp_dir;
782 use datatypes::prelude::ConcreteDataType;
783 use datatypes::schema::ColumnSchema;
784 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
785
786 use super::*;
787 use crate::manifest::action::{RegionChange, RegionEdit};
788 use crate::manifest::tests::utils::basic_region_metadata;
789 use crate::test_util::TestEnv;
790
791 #[tokio::test]
792 async fn create_manifest_manager() {
793 let metadata = Arc::new(basic_region_metadata());
794 let env = TestEnv::new().await;
795 let manager = env
796 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
797 .await
798 .unwrap()
799 .unwrap();
800
801 manager.validate_manifest(&metadata, 0);
802 }
803
804 #[tokio::test]
805 async fn open_manifest_manager() {
806 let env = TestEnv::new().await;
807 assert!(
809 env.create_manifest_manager(CompressionType::Uncompressed, 10, None)
810 .await
811 .unwrap()
812 .is_none()
813 );
814
815 let metadata = Arc::new(basic_region_metadata());
817 let mut manager = env
818 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
819 .await
820 .unwrap()
821 .unwrap();
822 manager.stop().await;
824
825 let manager = env
827 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
828 .await
829 .unwrap()
830 .unwrap();
831
832 manager.validate_manifest(&metadata, 0);
833 }
834
835 #[tokio::test]
836 async fn manifest_with_partition_expr_roundtrip() {
837 let env = TestEnv::new().await;
838 let expr_json =
839 r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
840 let mut metadata = basic_region_metadata();
841 metadata.partition_expr = Some(expr_json.to_string());
842 let metadata = Arc::new(metadata);
843 let mut manager = env
844 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
845 .await
846 .unwrap()
847 .unwrap();
848
849 let manifest = manager.manifest();
851 assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
852
853 manager.stop().await;
854
855 let manager = env
857 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
858 .await
859 .unwrap()
860 .unwrap();
861 let manifest = manager.manifest();
862 assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
863 }
864
865 #[tokio::test]
866 async fn region_change_add_column() {
867 let metadata = Arc::new(basic_region_metadata());
868 let env = TestEnv::new().await;
869 let mut manager = env
870 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
871 .await
872 .unwrap()
873 .unwrap();
874
875 let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
876 new_metadata_builder.push_column_metadata(ColumnMetadata {
877 column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
878 semantic_type: SemanticType::Field,
879 column_id: 252,
880 });
881 let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
882
883 let action_list =
884 RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
885 metadata: new_metadata.clone(),
886 sst_format: FormatType::PrimaryKey,
887 }));
888
889 let current_version = manager.update(action_list, false).await.unwrap();
890 assert_eq!(current_version, 1);
891 manager.validate_manifest(&new_metadata, 1);
892
893 manager.stop().await;
895 let manager = env
896 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
897 .await
898 .unwrap()
899 .unwrap();
900 manager.validate_manifest(&new_metadata, 1);
901 }
902
903 async fn manifest_dir_usage(path: &str) -> u64 {
905 let mut size = 0;
906 let mut read_dir = tokio::fs::read_dir(path).await.unwrap();
907 while let Ok(dir_entry) = read_dir.next_entry().await {
908 let Some(entry) = dir_entry else {
909 break;
910 };
911 if entry.file_type().await.unwrap().is_file() {
912 let file_name = entry.file_name().into_string().unwrap();
913 if file_name.contains(".checkpoint") || file_name.contains(".json") {
914 let file_size = entry.metadata().await.unwrap().len() as usize;
915 debug!("File: {file_name:?}, size: {file_size}");
916 size += file_size;
917 }
918 }
919 }
920 size as u64
921 }
922
923 #[tokio::test]
924 async fn test_manifest_size() {
925 let metadata = Arc::new(basic_region_metadata());
926 let data_home = create_temp_dir("");
927 let data_home_path = data_home.path().to_str().unwrap().to_string();
928 let env = TestEnv::with_data_home(data_home).await;
929
930 let manifest_dir = format!("{}/manifest", data_home_path);
931
932 let mut manager = env
933 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
934 .await
935 .unwrap()
936 .unwrap();
937
938 let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
939 new_metadata_builder.push_column_metadata(ColumnMetadata {
940 column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
941 semantic_type: SemanticType::Field,
942 column_id: 252,
943 });
944 let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
945
946 let action_list =
947 RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
948 metadata: new_metadata.clone(),
949 sst_format: FormatType::PrimaryKey,
950 }));
951
952 let current_version = manager.update(action_list, false).await.unwrap();
953 assert_eq!(current_version, 1);
954 manager.validate_manifest(&new_metadata, 1);
955
956 let manifest_size = manager.manifest_usage();
958 assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
959
960 for _ in 0..10 {
962 manager
963 .update(
964 RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
965 files_to_add: vec![],
966 files_to_remove: vec![],
967 timestamp_ms: None,
968 compaction_time_window: None,
969 flushed_entry_id: None,
970 flushed_sequence: None,
971 committed_sequence: None,
972 })]),
973 false,
974 )
975 .await
976 .unwrap();
977 }
978
979 while manager.checkpointer.is_doing_checkpoint() {
980 tokio::time::sleep(Duration::from_millis(10)).await;
981 }
982
983 let manifest_size = manager.manifest_usage();
985 assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
986
987 manager.stop().await;
990 let manager = env
991 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
992 .await
993 .unwrap()
994 .unwrap();
995 manager.validate_manifest(&new_metadata, 11);
996
997 let manifest_size = manager.manifest_usage();
999 assert_eq!(manifest_size, 1378);
1000 }
1001}