1use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use common_datasource::compression::CompressionType;
19use common_telemetry::{debug, info};
20use futures::TryStreamExt;
21use object_store::ObjectStore;
22use snafu::{OptionExt, ResultExt, ensure};
23use store_api::metadata::RegionMetadataRef;
24use store_api::{MAX_VERSION, MIN_VERSION, ManifestVersion};
25
26use crate::cache::manifest_cache::ManifestCache;
27use crate::config::MitoConfig;
28use crate::error::{
29 self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
30};
31use crate::manifest::action::{
32 RegionChange, RegionCheckpoint, RegionEdit, RegionManifest, RegionManifestBuilder,
33 RegionMetaAction, RegionMetaActionList, RemovedFile,
34};
35use crate::manifest::checkpointer::Checkpointer;
36use crate::manifest::storage::{
37 ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file, manifest_compress_type,
38 manifest_dir,
39};
40use crate::metrics::MANIFEST_OP_ELAPSED;
41use crate::region::{ManifestStats, RegionLeaderState, RegionRoleState};
42use crate::sst::FormatType;
43
/// Options used to construct a [`RegionManifestManager`].
///
/// The manager's constructors read `manifest_dir`, `object_store`,
/// `compress_type` and `manifest_cache` to build the underlying
/// `ManifestObjectStore`, then pass the whole struct on to the `Checkpointer`.
#[derive(Debug, Clone)]
pub struct RegionManifestOptions {
    /// Directory that holds the region's manifest files.
    pub manifest_dir: String,
    /// Object store handed to the manifest storage layer.
    pub object_store: ObjectStore,
    /// Compression type handed to the manifest storage layer.
    pub compress_type: CompressionType,
    /// Checkpoint distance, copied from `MitoConfig::manifest_checkpoint_distance`.
    // NOTE(review): presumably the number of manifest versions between two
    // checkpoints; the consumer (`Checkpointer`) is not visible here — confirm.
    pub checkpoint_distance: u64,
    /// Options describing how removed files are handled.
    pub remove_file_options: RemoveFileOptions,
    /// Optional cache for manifest files.
    pub manifest_cache: Option<ManifestCache>,
}
58
59impl RegionManifestOptions {
60 pub fn new(config: &MitoConfig, region_dir: &str, object_store: &ObjectStore) -> Self {
62 RegionManifestOptions {
63 manifest_dir: manifest_dir(region_dir),
64 object_store: object_store.clone(),
65 compress_type: manifest_compress_type(config.compress_manifest),
68 checkpoint_distance: config.manifest_checkpoint_distance,
69 remove_file_options: RemoveFileOptions {
70 enable_gc: config.gc.enable,
71 },
72 manifest_cache: None,
73 }
74 }
75}
76
/// Options describing how files removed from a region are handled.
///
/// `Default` is only derived for tests (or with the `test` feature).
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct RemoveFileOptions {
    /// GC switch; `RegionManifestOptions::new` fills it from `config.gc.enable`.
    pub enable_gc: bool,
}
84
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Manages the manifest of a single region.
///
/// The manager keeps the current [`RegionManifest`] in memory, persists every
/// update as a versioned action list through a `ManifestObjectStore`, and
/// delegates checkpointing to a `Checkpointer`.
#[derive(Debug)]
pub struct RegionManifestManager {
    /// Storage layer used to save and fetch manifest files.
    store: ManifestObjectStore,
    /// Latest manifest version. The constructors initialise it as a clone of
    /// `ManifestStats::manifest_version`, so both handles share one counter.
    last_version: Arc<AtomicU64>,
    /// Checkpoint helper, notified after every non-staging update.
    checkpointer: Checkpointer,
    /// Current in-memory manifest.
    manifest: Arc<RegionManifest>,
    /// Manifest produced by staging updates; `None` until the first staging
    /// update and after the staging manifest is unset or cleared.
    staging_manifest: Option<Arc<RegionManifest>>,
    /// Statistics handle updated with removed-file counts.
    stats: ManifestStats,
    /// Set by `stop`; a stopped manager rejects further updates.
    stopped: bool,
}
164
165impl RegionManifestManager {
166 pub async fn new(
168 metadata: RegionMetadataRef,
169 flushed_entry_id: u64,
170 options: RegionManifestOptions,
171 sst_format: FormatType,
172 stats: &ManifestStats,
173 ) -> Result<Self> {
174 let mut store = ManifestObjectStore::new(
176 &options.manifest_dir,
177 options.object_store.clone(),
178 options.compress_type,
179 stats.total_manifest_size.clone(),
180 options.manifest_cache.clone(),
181 );
182 let manifest_version = stats.manifest_version.clone();
183
184 info!(
185 "Creating region manifest in {} with metadata {:?}, flushed_entry_id: {}",
186 options.manifest_dir, metadata, flushed_entry_id
187 );
188
189 let version = MIN_VERSION;
190 let mut manifest_builder = RegionManifestBuilder::default();
191 manifest_builder.apply_change(
193 version,
194 RegionChange {
195 metadata: metadata.clone(),
196 sst_format,
197 append_mode: None,
198 },
199 );
200 let manifest = manifest_builder.try_build()?;
201 let region_id = metadata.region_id;
202
203 debug!(
204 "Build region manifest in {}, manifest: {:?}",
205 options.manifest_dir, manifest
206 );
207
208 let mut actions = vec![RegionMetaAction::Change(RegionChange {
209 metadata,
210 sst_format,
211 append_mode: None,
212 })];
213 if flushed_entry_id > 0 {
214 actions.push(RegionMetaAction::Edit(RegionEdit {
215 files_to_add: vec![],
216 files_to_remove: vec![],
217 timestamp_ms: None,
218 compaction_time_window: None,
219 flushed_entry_id: Some(flushed_entry_id),
220 flushed_sequence: None,
221 committed_sequence: None,
222 }));
223 }
224
225 let action_list = RegionMetaActionList::new(actions);
227
228 store.save(version, &action_list.encode()?, false).await?;
231
232 let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION);
233 manifest_version.store(version, Ordering::Relaxed);
234 manifest
235 .removed_files
236 .update_file_removed_cnt_to_stats(stats);
237 Ok(Self {
238 store,
239 last_version: manifest_version,
240 checkpointer,
241 manifest: Arc::new(manifest),
242 staging_manifest: None,
243 stats: stats.clone(),
244 stopped: false,
245 })
246 }
247
248 pub async fn open(
252 options: RegionManifestOptions,
253 stats: &ManifestStats,
254 ) -> Result<Option<Self>> {
255 let _t = MANIFEST_OP_ELAPSED
256 .with_label_values(&["open"])
257 .start_timer();
258
259 let mut store = ManifestObjectStore::new(
261 &options.manifest_dir,
262 options.object_store.clone(),
263 options.compress_type,
264 stats.total_manifest_size.clone(),
265 options.manifest_cache.clone(),
266 );
267 let manifest_version = stats.manifest_version.clone();
268
269 let mut version = MIN_VERSION;
273 let checkpoint = Self::last_checkpoint(&mut store).await?;
274 let last_checkpoint_version = checkpoint
275 .as_ref()
276 .map(|(checkpoint, _)| checkpoint.last_version)
277 .unwrap_or(MIN_VERSION);
278 let mut manifest_builder = if let Some((checkpoint, _)) = checkpoint {
279 info!(
280 "Recover region manifest {} from checkpoint version {}",
281 options.manifest_dir, checkpoint.last_version
282 );
283 version = version.max(checkpoint.last_version + 1);
284 RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint)
285 } else {
286 info!(
287 "Checkpoint not found in {}, build manifest from scratch",
288 options.manifest_dir
289 );
290 RegionManifestBuilder::default()
291 };
292
293 let manifests = store.fetch_manifests(version, MAX_VERSION).await?;
295
296 for (manifest_version, raw_action_list) in manifests {
297 let action_list = RegionMetaActionList::decode(&raw_action_list)?;
298 store.set_delta_file_size(manifest_version, raw_action_list.len() as u64);
300 for action in action_list.actions {
301 match action {
302 RegionMetaAction::Change(action) => {
303 manifest_builder.apply_change(manifest_version, action);
304 }
305 RegionMetaAction::PartitionExprChange(action) => {
306 manifest_builder.apply_partition_expr_change(manifest_version, action);
307 }
308 RegionMetaAction::Edit(action) => {
309 manifest_builder.apply_edit(manifest_version, action);
310 }
311 RegionMetaAction::Remove(_) => {
312 debug!(
313 "Unhandled action in {}, action: {:?}",
314 options.manifest_dir, action
315 );
316 }
317 RegionMetaAction::Truncate(action) => {
318 manifest_builder.apply_truncate(manifest_version, action);
319 }
320 }
321 }
322 }
323
324 if !manifest_builder.contains_metadata() {
326 debug!("No region manifest in {}", options.manifest_dir);
327 return Ok(None);
328 }
329
330 let manifest = manifest_builder.try_build()?;
331 debug!(
332 "Recovered region manifest from {}, manifest: {:?}",
333 options.manifest_dir, manifest
334 );
335 let version = manifest.manifest_version;
336
337 let checkpointer = Checkpointer::new(
338 manifest.metadata.region_id,
339 options,
340 store.clone(),
341 last_checkpoint_version,
342 );
343 manifest_version.store(version, Ordering::Relaxed);
344 manifest
345 .removed_files
346 .update_file_removed_cnt_to_stats(stats);
347 Ok(Some(Self {
348 store,
349 last_version: manifest_version,
350 checkpointer,
351 manifest: Arc::new(manifest),
352 staging_manifest: None,
354 stats: stats.clone(),
355 stopped: false,
356 }))
357 }
358
    /// Marks the manager as stopped.
    ///
    /// Once stopped, `update` fails with a region-stopped error, and so does
    /// `install_manifest_to` whenever it actually has versions to install.
    /// The function is `async` but does not await anything.
    pub async fn stop(&mut self) {
        self.stopped = true;
    }
363
364 pub async fn install_manifest_to(
370 &mut self,
371 target_version: ManifestVersion,
372 ) -> Result<ManifestVersion> {
373 let _t = MANIFEST_OP_ELAPSED
374 .with_label_values(&["install_manifest_to"])
375 .start_timer();
376
377 let last_version = self.last_version();
378 if last_version >= target_version {
380 debug!(
381 "Target version {} is less than or equal to the current version {}, region: {}, skip install",
382 target_version, last_version, self.manifest.metadata.region_id
383 );
384 return Ok(last_version);
385 }
386
387 ensure!(
388 !self.stopped,
389 RegionStoppedSnafu {
390 region_id: self.manifest.metadata.region_id,
391 }
392 );
393
394 let region_id = self.manifest.metadata.region_id;
395 let mut manifests = self
397 .store
398 .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
400 .await?;
401
402 if manifests.is_empty() {
409 info!(
410 "Manifests are not strict from {}, region: {}, tries to install the last checkpoint",
411 last_version, self.manifest.metadata.region_id
412 );
413 let last_version = self.install_last_checkpoint().await?;
414 if last_version >= target_version {
416 return Ok(last_version);
417 }
418
419 manifests = self
421 .store
422 .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
424 .await?;
425 }
426
427 if manifests.is_empty() {
428 return NoManifestsSnafu {
429 region_id: self.manifest.metadata.region_id,
430 start_version: last_version + 1,
431 end_version: target_version + 1,
432 last_version,
433 }
434 .fail();
435 }
436
437 debug_assert_eq!(manifests.first().unwrap().0, last_version + 1);
438 let mut manifest_builder =
439 RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
440
441 for (manifest_version, raw_action_list) in manifests {
442 self.store
443 .set_delta_file_size(manifest_version, raw_action_list.len() as u64);
444 let action_list = RegionMetaActionList::decode(&raw_action_list)?;
445 for action in action_list.actions {
446 match action {
447 RegionMetaAction::Change(action) => {
448 manifest_builder.apply_change(manifest_version, action);
449 }
450 RegionMetaAction::PartitionExprChange(action) => {
451 manifest_builder.apply_partition_expr_change(manifest_version, action);
452 }
453 RegionMetaAction::Edit(action) => {
454 manifest_builder.apply_edit(manifest_version, action);
455 }
456 RegionMetaAction::Remove(_) => {
457 debug!(
458 "Unhandled action for region {}, action: {:?}",
459 self.manifest.metadata.region_id, action
460 );
461 }
462 RegionMetaAction::Truncate(action) => {
463 manifest_builder.apply_truncate(manifest_version, action);
464 }
465 }
466 }
467 }
468
469 let new_manifest = manifest_builder.try_build()?;
470 ensure!(
471 new_manifest.manifest_version >= target_version,
472 InstallManifestToSnafu {
473 region_id: self.manifest.metadata.region_id,
474 target_version,
475 available_version: new_manifest.manifest_version,
476 last_version,
477 }
478 );
479
480 let version = self.last_version();
481 new_manifest
482 .removed_files
483 .update_file_removed_cnt_to_stats(&self.stats);
484 self.manifest = Arc::new(new_manifest);
485 let last_version = self.set_version(self.manifest.manifest_version);
486 info!(
487 "Install manifest changes from {} to {}, region: {}",
488 version, last_version, self.manifest.metadata.region_id
489 );
490
491 Ok(last_version)
492 }
493
494 pub(crate) async fn install_last_checkpoint(&mut self) -> Result<ManifestVersion> {
496 let last_version = self.last_version();
497 let Some((checkpoint, checkpoint_size)) = Self::last_checkpoint(&mut self.store).await?
498 else {
499 return NoCheckpointSnafu {
500 region_id: self.manifest.metadata.region_id,
501 last_version,
502 }
503 .fail();
504 };
505 self.store.reset_manifest_size();
506 self.store
507 .set_checkpoint_file_size(checkpoint.last_version, checkpoint_size);
508 let builder = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint);
509 let manifest = builder.try_build()?;
510 let last_version = self.set_version(manifest.manifest_version);
511 manifest
512 .removed_files
513 .update_file_removed_cnt_to_stats(&self.stats);
514 self.manifest = Arc::new(manifest);
515 info!(
516 "Installed region manifest from checkpoint: {}, region: {}",
517 checkpoint.last_version, self.manifest.metadata.region_id
518 );
519
520 Ok(last_version)
521 }
522
523 pub async fn update(
525 &mut self,
526 action_list: RegionMetaActionList,
527 is_staging: bool,
528 ) -> Result<ManifestVersion> {
529 let _t = MANIFEST_OP_ELAPSED
530 .with_label_values(&["update"])
531 .start_timer();
532
533 ensure!(
534 !self.stopped,
535 RegionStoppedSnafu {
536 region_id: self.manifest.metadata.region_id,
537 }
538 );
539
540 let version = self.increase_version();
541 self.store
542 .save(version, &action_list.encode()?, is_staging)
543 .await?;
544
545 let mut manifest_builder =
548 if is_staging && let Some(staging_manifest) = self.staging_manifest.as_ref() {
549 RegionManifestBuilder::with_checkpoint(Some(staging_manifest.as_ref().clone()))
550 } else {
551 RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()))
552 };
553
554 for action in action_list.actions {
555 match action {
556 RegionMetaAction::Change(action) => {
557 manifest_builder.apply_change(version, action);
558 }
559 RegionMetaAction::PartitionExprChange(action) => {
560 manifest_builder.apply_partition_expr_change(version, action);
561 }
562 RegionMetaAction::Edit(action) => {
563 manifest_builder.apply_edit(version, action);
564 }
565 RegionMetaAction::Remove(_) => {
566 debug!(
567 "Unhandled action for region {}, action: {:?}",
568 self.manifest.metadata.region_id, action
569 );
570 }
571 RegionMetaAction::Truncate(action) => {
572 manifest_builder.apply_truncate(version, action);
573 }
574 }
575 }
576
577 if is_staging {
578 let new_manifest = manifest_builder.try_build()?;
579 self.staging_manifest = Some(Arc::new(new_manifest));
580
581 info!(
582 "Skipping checkpoint for region {} in staging mode, manifest version: {}",
583 self.manifest.metadata.region_id, self.manifest.manifest_version
584 );
585 } else {
586 let new_manifest = manifest_builder.try_build()?;
587 new_manifest
588 .removed_files
589 .update_file_removed_cnt_to_stats(&self.stats);
590 let updated_manifest = self
591 .checkpointer
592 .update_manifest_removed_files(new_manifest)?;
593 self.manifest = Arc::new(updated_manifest);
594 self.checkpointer
595 .maybe_do_checkpoint(self.manifest.as_ref());
596 }
597
598 Ok(version)
599 }
600
601 pub fn clear_deleted_files(&mut self, deleted_files: Vec<RemovedFile>) {
603 let mut manifest = (*self.manifest()).clone();
604 manifest.removed_files.clear_deleted_files(deleted_files);
605 self.set_manifest(Arc::new(manifest));
606 }
607
    /// Replaces the in-memory manifest with `manifest`.
    ///
    /// Only the in-memory pointer is swapped; the version counter and the
    /// storage are left untouched.
    pub(crate) fn set_manifest(&mut self, manifest: Arc<RegionManifest>) {
        self.manifest = manifest;
    }
611
612 pub fn manifest(&self) -> Arc<RegionManifest> {
614 self.manifest.clone()
615 }
616
617 pub fn staging_manifest(&self) -> Option<Arc<RegionManifest>> {
619 self.staging_manifest.clone()
620 }
621
    /// Returns the total manifest size as tracked by the underlying store.
    pub fn manifest_usage(&self) -> u64 {
        self.store.total_manifest_size()
    }
626
627 pub async fn has_update(&self) -> Result<bool> {
633 let last_version = self.last_version();
634
635 let streamer =
636 self.store
637 .manifest_lister(false)
638 .await?
639 .context(error::EmptyManifestDirSnafu {
640 manifest_dir: self.store.manifest_dir(),
641 })?;
642
643 let need_update = streamer
644 .try_any(|entry| async move {
645 let file_name = entry.name();
646 if is_delta_file(file_name) || is_checkpoint_file(file_name) {
647 let version = file_version(file_name);
648 if version > last_version {
649 return true;
650 }
651 }
652 false
653 })
654 .await
655 .context(error::OpenDalSnafu)?;
656
657 Ok(need_update)
658 }
659
660 fn increase_version(&mut self) -> ManifestVersion {
662 let previous = self.last_version.fetch_add(1, Ordering::Relaxed);
663 previous + 1
664 }
665
    /// Overwrites the shared version counter with `version` and returns the
    /// same value for convenience.
    fn set_version(&mut self, version: ManifestVersion) -> ManifestVersion {
        self.last_version.store(version, Ordering::Relaxed);
        version
    }
671
    /// Reads the current value of the shared version counter.
    fn last_version(&self) -> ManifestVersion {
        self.last_version.load(Ordering::Relaxed)
    }
675
676 pub(crate) async fn last_checkpoint(
681 store: &mut ManifestObjectStore,
682 ) -> Result<Option<(RegionCheckpoint, u64)>> {
683 let last_checkpoint = store.load_last_checkpoint().await?;
684
685 if let Some((_, bytes)) = last_checkpoint {
686 let checkpoint = RegionCheckpoint::decode(&bytes)?;
687 Ok(Some((checkpoint, bytes.len() as u64)))
688 } else {
689 Ok(None)
690 }
691 }
692
    /// Returns a clone of the underlying manifest store handle.
    pub fn store(&self) -> ManifestObjectStore {
        self.store.clone()
    }
696
    /// Test-only access to the manager's checkpointer.
    #[cfg(test)]
    pub(crate) fn checkpointer(&self) -> &Checkpointer {
        &self.checkpointer
    }
701
702 pub(crate) async fn merge_staged_actions(
705 &mut self,
706 region_state: RegionRoleState,
707 ) -> Result<Option<RegionMetaActionList>> {
708 if region_state != RegionRoleState::Leader(RegionLeaderState::Staging) {
710 return Ok(None);
711 }
712
713 let staging_manifests = self.store.fetch_staging_manifests().await?;
715
716 if staging_manifests.is_empty() {
717 info!(
718 "No staging manifests to merge for region {}",
719 self.manifest.metadata.region_id
720 );
721 return Ok(None);
722 }
723
724 info!(
725 "Merging {} staging manifests for region {}",
726 staging_manifests.len(),
727 self.manifest.metadata.region_id
728 );
729
730 let mut merged_actions = Vec::new();
732 let mut latest_version = self.last_version();
733
734 for (manifest_version, raw_action_list) in staging_manifests {
736 let action_list = RegionMetaActionList::decode(&raw_action_list)?;
737
738 for action in action_list.actions {
739 merged_actions.push(action);
740 }
741
742 latest_version = latest_version.max(manifest_version);
743 }
744
745 if merged_actions.is_empty() {
746 return Ok(None);
747 }
748
749 info!(
750 "Successfully merged {} actions from staging manifests for region {}, latest version: {}",
751 merged_actions.len(),
752 self.manifest.metadata.region_id,
753 latest_version
754 );
755
756 Ok(Some(RegionMetaActionList::new(merged_actions)))
757 }
758
    /// Drops the in-memory staging manifest; storage is not touched.
    pub(crate) fn unset_staging_manifest(&mut self) {
        self.staging_manifest = None;
    }
763
764 pub(crate) async fn clear_staging_manifest_and_dir(&mut self) -> Result<()> {
766 self.staging_manifest = None;
767 self.store.clear_staging_manifests().await?;
768 info!(
769 "Cleared all staging manifests for region {}",
770 self.manifest.metadata.region_id
771 );
772 Ok(())
773 }
774}
775
#[cfg(test)]
impl RegionManifestManager {
    /// Test helper: asserts that the in-memory manifest carries the metadata
    /// `expect`, and that both the manifest's own version and the shared
    /// version counter equal `last_version`.
    fn validate_manifest(&self, expect: &RegionMetadataRef, last_version: ManifestVersion) {
        let current = self.manifest();
        let counter = self.last_version();

        assert_eq!(current.metadata, *expect);
        assert_eq!(current.manifest_version, counter);
        assert_eq!(last_version, counter);
    }
}
785
786#[cfg(test)]
787mod test {
788 use std::time::Duration;
789
790 use api::v1::SemanticType;
791 use common_datasource::compression::CompressionType;
792 use common_test_util::temp_dir::create_temp_dir;
793 use datatypes::prelude::ConcreteDataType;
794 use datatypes::schema::ColumnSchema;
795 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
796
797 use super::*;
798 use crate::manifest::action::{RegionChange, RegionEdit};
799 use crate::manifest::tests::utils::basic_region_metadata;
800 use crate::test_util::TestEnv;
801
802 #[tokio::test]
803 async fn create_manifest_manager() {
804 let metadata = Arc::new(basic_region_metadata());
805 let env = TestEnv::new().await;
806 let manager = env
807 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
808 .await
809 .unwrap()
810 .unwrap();
811
812 manager.validate_manifest(&metadata, 0);
813 }
814
815 #[tokio::test]
816 async fn open_manifest_manager() {
817 let env = TestEnv::new().await;
818 assert!(
820 env.create_manifest_manager(CompressionType::Uncompressed, 10, None)
821 .await
822 .unwrap()
823 .is_none()
824 );
825
826 let metadata = Arc::new(basic_region_metadata());
828 let mut manager = env
829 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
830 .await
831 .unwrap()
832 .unwrap();
833 manager.stop().await;
835
836 let manager = env
838 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
839 .await
840 .unwrap()
841 .unwrap();
842
843 manager.validate_manifest(&metadata, 0);
844 }
845
846 #[tokio::test]
847 async fn manifest_with_partition_expr_roundtrip() {
848 let env = TestEnv::new().await;
849 let expr_json =
850 r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
851 let mut metadata = basic_region_metadata();
852 metadata.set_partition_expr(Some(expr_json.to_string()));
853 let metadata = Arc::new(metadata);
854 let mut manager = env
855 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
856 .await
857 .unwrap()
858 .unwrap();
859
860 let manifest = manager.manifest();
862 assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
863
864 manager.stop().await;
865
866 let manager = env
868 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
869 .await
870 .unwrap()
871 .unwrap();
872 let manifest = manager.manifest();
873 assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
874 }
875
876 #[tokio::test]
877 async fn region_change_add_column() {
878 let metadata = Arc::new(basic_region_metadata());
879 let env = TestEnv::new().await;
880 let mut manager = env
881 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
882 .await
883 .unwrap()
884 .unwrap();
885
886 let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
887 new_metadata_builder.push_column_metadata(ColumnMetadata {
888 column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
889 semantic_type: SemanticType::Field,
890 column_id: 252,
891 });
892 let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
893
894 let action_list =
895 RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
896 metadata: new_metadata.clone(),
897 sst_format: FormatType::PrimaryKey,
898 append_mode: None,
899 }));
900
901 let current_version = manager.update(action_list, false).await.unwrap();
902 assert_eq!(current_version, 1);
903 manager.validate_manifest(&new_metadata, 1);
904
905 manager.stop().await;
907 let manager = env
908 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
909 .await
910 .unwrap()
911 .unwrap();
912 manager.validate_manifest(&new_metadata, 1);
913 }
914
915 async fn manifest_dir_usage(path: &str) -> u64 {
917 let mut size = 0;
918 let mut read_dir = tokio::fs::read_dir(path).await.unwrap();
919 while let Ok(dir_entry) = read_dir.next_entry().await {
920 let Some(entry) = dir_entry else {
921 break;
922 };
923 if entry.file_type().await.unwrap().is_file() {
924 let file_name = entry.file_name().into_string().unwrap();
925 if file_name.contains(".checkpoint") || file_name.contains(".json") {
926 let file_size = entry.metadata().await.unwrap().len() as usize;
927 debug!("File: {file_name:?}, size: {file_size}");
928 size += file_size;
929 }
930 }
931 }
932 size as u64
933 }
934
935 #[tokio::test]
936 async fn test_manifest_size() {
937 let metadata = Arc::new(basic_region_metadata());
938 let data_home = create_temp_dir("");
939 let data_home_path = data_home.path().to_str().unwrap().to_string();
940 let env = TestEnv::with_data_home(data_home).await;
941
942 let manifest_dir = format!("{}/manifest", data_home_path);
943
944 let mut manager = env
945 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
946 .await
947 .unwrap()
948 .unwrap();
949
950 let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
951 new_metadata_builder.push_column_metadata(ColumnMetadata {
952 column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
953 semantic_type: SemanticType::Field,
954 column_id: 252,
955 });
956 let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
957
958 let action_list =
959 RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
960 metadata: new_metadata.clone(),
961 sst_format: FormatType::PrimaryKey,
962 append_mode: None,
963 }));
964
965 let current_version = manager.update(action_list, false).await.unwrap();
966 assert_eq!(current_version, 1);
967 manager.validate_manifest(&new_metadata, 1);
968
969 let manifest_size = manager.manifest_usage();
971 assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
972
973 for _ in 0..10 {
975 manager
976 .update(
977 RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
978 files_to_add: vec![],
979 files_to_remove: vec![],
980 timestamp_ms: None,
981 compaction_time_window: None,
982 flushed_entry_id: None,
983 flushed_sequence: None,
984 committed_sequence: None,
985 })]),
986 false,
987 )
988 .await
989 .unwrap();
990 }
991
992 while manager.checkpointer.is_doing_checkpoint() {
993 tokio::time::sleep(Duration::from_millis(10)).await;
994 }
995
996 let manifest_size = manager.manifest_usage();
998 assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
999
1000 manager.stop().await;
1003 let manager = env
1004 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
1005 .await
1006 .unwrap()
1007 .unwrap();
1008 manager.validate_manifest(&new_metadata, 11);
1009
1010 let manifest_size = manager.manifest_usage();
1012 assert_eq!(manifest_size, 1397);
1013 }
1014}