1use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use common_datasource::compression::CompressionType;
19use common_telemetry::{debug, info};
20use futures::TryStreamExt;
21use object_store::ObjectStore;
22use snafu::{OptionExt, ResultExt, ensure};
23use store_api::metadata::RegionMetadataRef;
24use store_api::storage::FileId;
25use store_api::{MAX_VERSION, MIN_VERSION, ManifestVersion};
26
27use crate::cache::manifest_cache::ManifestCache;
28use crate::config::MitoConfig;
29use crate::error::{
30 self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
31};
32use crate::manifest::action::{
33 RegionChange, RegionCheckpoint, RegionEdit, RegionManifest, RegionManifestBuilder,
34 RegionMetaAction, RegionMetaActionList,
35};
36use crate::manifest::checkpointer::Checkpointer;
37use crate::manifest::storage::{
38 ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file, manifest_compress_type,
39 manifest_dir,
40};
41use crate::metrics::MANIFEST_OP_ELAPSED;
42use crate::region::{ManifestStats, RegionLeaderState, RegionRoleState};
43use crate::sst::FormatType;
44
/// Options used to create or open a [`RegionManifestManager`].
#[derive(Debug, Clone)]
pub struct RegionManifestOptions {
    /// Directory of the region's manifest files; passed to the manifest object store.
    pub manifest_dir: String,
    /// Object store handed to the manifest object store.
    pub object_store: ObjectStore,
    /// Compression type handed to the manifest object store.
    pub compress_type: CompressionType,
    /// Checkpoint distance taken from the engine config.
    // NOTE(review): presumably the number of manifest versions between two checkpoints;
    // confirm against `Checkpointer`.
    pub checkpoint_distance: u64,
    /// Options controlling how removed files are handled.
    pub remove_file_options: RemoveFileOptions,
    /// Optional manifest cache handed to the manifest object store.
    pub manifest_cache: Option<ManifestCache>,
}
59
60impl RegionManifestOptions {
61 pub fn new(config: &MitoConfig, region_dir: &str, object_store: &ObjectStore) -> Self {
63 RegionManifestOptions {
64 manifest_dir: manifest_dir(region_dir),
65 object_store: object_store.clone(),
66 compress_type: manifest_compress_type(config.compress_manifest),
69 checkpoint_distance: config.manifest_checkpoint_distance,
70 remove_file_options: RemoveFileOptions {
71 enable_gc: config.gc.enable,
72 },
73 manifest_cache: None,
74 }
75 }
76}
77
/// Options related to the handling of removed files.
///
/// `Default` is only derived for tests (or when the `test` feature is enabled).
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct RemoveFileOptions {
    /// Whether GC is enabled; copied from `MitoConfig::gc.enable` by
    /// [`RegionManifestOptions::new`].
    pub enable_gc: bool,
}
85
/// Manages the manifest of a single region.
///
/// It persists manifest actions through a [`ManifestObjectStore`] and keeps the
/// resulting [`RegionManifest`] in memory.
// NOTE(review): the `aquamarine` attribute is only applied for doc builds, presumably to
// render diagrams in this item's documentation; confirm.
#[cfg_attr(doc, aquamarine::aquamarine)]
#[derive(Debug)]
pub struct RegionManifestManager {
    /// Storage used to save and fetch manifest files.
    store: ManifestObjectStore,
    /// Latest manifest version; this is the same atomic as `ManifestStats::manifest_version`.
    last_version: Arc<AtomicU64>,
    /// Post-processes manifests after non-staging updates and may trigger checkpoints.
    checkpointer: Checkpointer,
    /// The current in-memory manifest.
    manifest: Arc<RegionManifest>,
    /// Manifest produced by staging updates; `None` until the first staging update.
    staging_manifest: Option<Arc<RegionManifest>>,
    /// Statistics updated whenever a new manifest is installed.
    stats: ManifestStats,
    /// Set by `stop()`; once set, `update()` returns a region-stopped error.
    stopped: bool,
}
165
166impl RegionManifestManager {
167 pub async fn new(
169 metadata: RegionMetadataRef,
170 flushed_entry_id: u64,
171 options: RegionManifestOptions,
172 sst_format: FormatType,
173 stats: &ManifestStats,
174 ) -> Result<Self> {
175 let mut store = ManifestObjectStore::new(
177 &options.manifest_dir,
178 options.object_store.clone(),
179 options.compress_type,
180 stats.total_manifest_size.clone(),
181 options.manifest_cache.clone(),
182 );
183 let manifest_version = stats.manifest_version.clone();
184
185 info!(
186 "Creating region manifest in {} with metadata {:?}, flushed_entry_id: {}",
187 options.manifest_dir, metadata, flushed_entry_id
188 );
189
190 let version = MIN_VERSION;
191 let mut manifest_builder = RegionManifestBuilder::default();
192 manifest_builder.apply_change(
194 version,
195 RegionChange {
196 metadata: metadata.clone(),
197 sst_format,
198 },
199 );
200 let manifest = manifest_builder.try_build()?;
201 let region_id = metadata.region_id;
202
203 debug!(
204 "Build region manifest in {}, manifest: {:?}",
205 options.manifest_dir, manifest
206 );
207
208 let mut actions = vec![RegionMetaAction::Change(RegionChange {
209 metadata,
210 sst_format,
211 })];
212 if flushed_entry_id > 0 {
213 actions.push(RegionMetaAction::Edit(RegionEdit {
214 files_to_add: vec![],
215 files_to_remove: vec![],
216 timestamp_ms: None,
217 compaction_time_window: None,
218 flushed_entry_id: Some(flushed_entry_id),
219 flushed_sequence: None,
220 committed_sequence: None,
221 }));
222 }
223
224 let action_list = RegionMetaActionList::new(actions);
226
227 store.save(version, &action_list.encode()?, false).await?;
230
231 let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION);
232 manifest_version.store(version, Ordering::Relaxed);
233 manifest
234 .removed_files
235 .update_file_removed_cnt_to_stats(stats);
236 Ok(Self {
237 store,
238 last_version: manifest_version,
239 checkpointer,
240 manifest: Arc::new(manifest),
241 staging_manifest: None,
242 stats: stats.clone(),
243 stopped: false,
244 })
245 }
246
247 pub async fn open(
251 options: RegionManifestOptions,
252 stats: &ManifestStats,
253 ) -> Result<Option<Self>> {
254 let _t = MANIFEST_OP_ELAPSED
255 .with_label_values(&["open"])
256 .start_timer();
257
258 let mut store = ManifestObjectStore::new(
260 &options.manifest_dir,
261 options.object_store.clone(),
262 options.compress_type,
263 stats.total_manifest_size.clone(),
264 options.manifest_cache.clone(),
265 );
266 let manifest_version = stats.manifest_version.clone();
267
268 let mut version = MIN_VERSION;
272 let checkpoint = Self::last_checkpoint(&mut store).await?;
273 let last_checkpoint_version = checkpoint
274 .as_ref()
275 .map(|(checkpoint, _)| checkpoint.last_version)
276 .unwrap_or(MIN_VERSION);
277 let mut manifest_builder = if let Some((checkpoint, _)) = checkpoint {
278 info!(
279 "Recover region manifest {} from checkpoint version {}",
280 options.manifest_dir, checkpoint.last_version
281 );
282 version = version.max(checkpoint.last_version + 1);
283 RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint)
284 } else {
285 info!(
286 "Checkpoint not found in {}, build manifest from scratch",
287 options.manifest_dir
288 );
289 RegionManifestBuilder::default()
290 };
291
292 let manifests = store.fetch_manifests(version, MAX_VERSION).await?;
294
295 for (manifest_version, raw_action_list) in manifests {
296 let action_list = RegionMetaActionList::decode(&raw_action_list)?;
297 store.set_delta_file_size(manifest_version, raw_action_list.len() as u64);
299 for action in action_list.actions {
300 match action {
301 RegionMetaAction::Change(action) => {
302 manifest_builder.apply_change(manifest_version, action);
303 }
304 RegionMetaAction::Edit(action) => {
305 manifest_builder.apply_edit(manifest_version, action);
306 }
307 RegionMetaAction::Remove(_) => {
308 debug!(
309 "Unhandled action in {}, action: {:?}",
310 options.manifest_dir, action
311 );
312 }
313 RegionMetaAction::Truncate(action) => {
314 manifest_builder.apply_truncate(manifest_version, action);
315 }
316 }
317 }
318 }
319
320 if !manifest_builder.contains_metadata() {
322 debug!("No region manifest in {}", options.manifest_dir);
323 return Ok(None);
324 }
325
326 let manifest = manifest_builder.try_build()?;
327 debug!(
328 "Recovered region manifest from {}, manifest: {:?}",
329 options.manifest_dir, manifest
330 );
331 let version = manifest.manifest_version;
332
333 let checkpointer = Checkpointer::new(
334 manifest.metadata.region_id,
335 options,
336 store.clone(),
337 last_checkpoint_version,
338 );
339 manifest_version.store(version, Ordering::Relaxed);
340 manifest
341 .removed_files
342 .update_file_removed_cnt_to_stats(stats);
343 Ok(Some(Self {
344 store,
345 last_version: manifest_version,
346 checkpointer,
347 manifest: Arc::new(manifest),
348 staging_manifest: None,
350 stats: stats.clone(),
351 stopped: false,
352 }))
353 }
354
    /// Marks the manager as stopped.
    ///
    /// After this call `update()` fails with a region-stopped error, and so does
    /// `install_manifest_to()` whenever the target version is newer than the current one.
    pub async fn stop(&mut self) {
        self.stopped = true;
    }
359
360 pub async fn install_manifest_to(
366 &mut self,
367 target_version: ManifestVersion,
368 ) -> Result<ManifestVersion> {
369 let _t = MANIFEST_OP_ELAPSED
370 .with_label_values(&["install_manifest_to"])
371 .start_timer();
372
373 let last_version = self.last_version();
374 if last_version >= target_version {
376 debug!(
377 "Target version {} is less than or equal to the current version {}, region: {}, skip install",
378 target_version, last_version, self.manifest.metadata.region_id
379 );
380 return Ok(last_version);
381 }
382
383 ensure!(
384 !self.stopped,
385 RegionStoppedSnafu {
386 region_id: self.manifest.metadata.region_id,
387 }
388 );
389
390 let region_id = self.manifest.metadata.region_id;
391 let mut manifests = self
393 .store
394 .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
396 .await?;
397
398 if manifests.is_empty() {
405 info!(
406 "Manifests are not strict from {}, region: {}, tries to install the last checkpoint",
407 last_version, self.manifest.metadata.region_id
408 );
409 let last_version = self.install_last_checkpoint().await?;
410 if last_version >= target_version {
412 return Ok(last_version);
413 }
414
415 manifests = self
417 .store
418 .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
420 .await?;
421 }
422
423 if manifests.is_empty() {
424 return NoManifestsSnafu {
425 region_id: self.manifest.metadata.region_id,
426 start_version: last_version + 1,
427 end_version: target_version + 1,
428 last_version,
429 }
430 .fail();
431 }
432
433 debug_assert_eq!(manifests.first().unwrap().0, last_version + 1);
434 let mut manifest_builder =
435 RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
436
437 for (manifest_version, raw_action_list) in manifests {
438 self.store
439 .set_delta_file_size(manifest_version, raw_action_list.len() as u64);
440 let action_list = RegionMetaActionList::decode(&raw_action_list)?;
441 for action in action_list.actions {
442 match action {
443 RegionMetaAction::Change(action) => {
444 manifest_builder.apply_change(manifest_version, action);
445 }
446 RegionMetaAction::Edit(action) => {
447 manifest_builder.apply_edit(manifest_version, action);
448 }
449 RegionMetaAction::Remove(_) => {
450 debug!(
451 "Unhandled action for region {}, action: {:?}",
452 self.manifest.metadata.region_id, action
453 );
454 }
455 RegionMetaAction::Truncate(action) => {
456 manifest_builder.apply_truncate(manifest_version, action);
457 }
458 }
459 }
460 }
461
462 let new_manifest = manifest_builder.try_build()?;
463 ensure!(
464 new_manifest.manifest_version >= target_version,
465 InstallManifestToSnafu {
466 region_id: self.manifest.metadata.region_id,
467 target_version,
468 available_version: new_manifest.manifest_version,
469 last_version,
470 }
471 );
472
473 let version = self.last_version();
474 new_manifest
475 .removed_files
476 .update_file_removed_cnt_to_stats(&self.stats);
477 self.manifest = Arc::new(new_manifest);
478 let last_version = self.set_version(self.manifest.manifest_version);
479 info!(
480 "Install manifest changes from {} to {}, region: {}",
481 version, last_version, self.manifest.metadata.region_id
482 );
483
484 Ok(last_version)
485 }
486
487 pub(crate) async fn install_last_checkpoint(&mut self) -> Result<ManifestVersion> {
489 let last_version = self.last_version();
490 let Some((checkpoint, checkpoint_size)) = Self::last_checkpoint(&mut self.store).await?
491 else {
492 return NoCheckpointSnafu {
493 region_id: self.manifest.metadata.region_id,
494 last_version,
495 }
496 .fail();
497 };
498 self.store.reset_manifest_size();
499 self.store
500 .set_checkpoint_file_size(checkpoint.last_version, checkpoint_size);
501 let builder = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint);
502 let manifest = builder.try_build()?;
503 let last_version = self.set_version(manifest.manifest_version);
504 manifest
505 .removed_files
506 .update_file_removed_cnt_to_stats(&self.stats);
507 self.manifest = Arc::new(manifest);
508 info!(
509 "Installed region manifest from checkpoint: {}, region: {}",
510 checkpoint.last_version, self.manifest.metadata.region_id
511 );
512
513 Ok(last_version)
514 }
515
516 pub async fn update(
518 &mut self,
519 action_list: RegionMetaActionList,
520 is_staging: bool,
521 ) -> Result<ManifestVersion> {
522 let _t = MANIFEST_OP_ELAPSED
523 .with_label_values(&["update"])
524 .start_timer();
525
526 ensure!(
527 !self.stopped,
528 RegionStoppedSnafu {
529 region_id: self.manifest.metadata.region_id,
530 }
531 );
532
533 let version = self.increase_version();
534 self.store
535 .save(version, &action_list.encode()?, is_staging)
536 .await?;
537
538 let mut manifest_builder =
541 if is_staging && let Some(staging_manifest) = self.staging_manifest.as_ref() {
542 RegionManifestBuilder::with_checkpoint(Some(staging_manifest.as_ref().clone()))
543 } else {
544 RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()))
545 };
546
547 for action in action_list.actions {
548 match action {
549 RegionMetaAction::Change(action) => {
550 manifest_builder.apply_change(version, action);
551 }
552 RegionMetaAction::Edit(action) => {
553 manifest_builder.apply_edit(version, action);
554 }
555 RegionMetaAction::Remove(_) => {
556 debug!(
557 "Unhandled action for region {}, action: {:?}",
558 self.manifest.metadata.region_id, action
559 );
560 }
561 RegionMetaAction::Truncate(action) => {
562 manifest_builder.apply_truncate(version, action);
563 }
564 }
565 }
566
567 if is_staging {
568 let new_manifest = manifest_builder.try_build()?;
569 self.staging_manifest = Some(Arc::new(new_manifest));
570
571 info!(
572 "Skipping checkpoint for region {} in staging mode, manifest version: {}",
573 self.manifest.metadata.region_id, self.manifest.manifest_version
574 );
575 } else {
576 let new_manifest = manifest_builder.try_build()?;
577 new_manifest
578 .removed_files
579 .update_file_removed_cnt_to_stats(&self.stats);
580 let updated_manifest = self
581 .checkpointer
582 .update_manifest_removed_files(new_manifest)?;
583 self.manifest = Arc::new(updated_manifest);
584 self.checkpointer
585 .maybe_do_checkpoint(self.manifest.as_ref());
586 }
587
588 Ok(version)
589 }
590
591 pub fn clear_deleted_files(&mut self, deleted_files: Vec<FileId>) {
593 let mut manifest = (*self.manifest()).clone();
594 manifest.removed_files.clear_deleted_files(deleted_files);
595 self.set_manifest(Arc::new(manifest));
596 }
597
    /// Replaces the current in-memory manifest with `manifest`.
    ///
    /// Only the in-memory reference is swapped; the version counter and the store are
    /// not touched.
    pub(crate) fn set_manifest(&mut self, manifest: Arc<RegionManifest>) {
        self.manifest = manifest;
    }
601
602 pub fn manifest(&self) -> Arc<RegionManifest> {
604 self.manifest.clone()
605 }
606
607 pub fn staging_manifest(&self) -> Option<Arc<RegionManifest>> {
609 self.staging_manifest.clone()
610 }
611
    /// Returns the total manifest size as tracked by the manifest store.
    pub fn manifest_usage(&self) -> u64 {
        self.store.total_manifest_size()
    }
616
617 pub async fn has_update(&self) -> Result<bool> {
623 let last_version = self.last_version();
624
625 let streamer =
626 self.store
627 .manifest_lister(false)
628 .await?
629 .context(error::EmptyManifestDirSnafu {
630 manifest_dir: self.store.manifest_dir(),
631 })?;
632
633 let need_update = streamer
634 .try_any(|entry| async move {
635 let file_name = entry.name();
636 if is_delta_file(file_name) || is_checkpoint_file(file_name) {
637 let version = file_version(file_name);
638 if version > last_version {
639 return true;
640 }
641 }
642 false
643 })
644 .await
645 .context(error::OpenDalSnafu)?;
646
647 Ok(need_update)
648 }
649
650 fn increase_version(&mut self) -> ManifestVersion {
652 let previous = self.last_version.fetch_add(1, Ordering::Relaxed);
653 previous + 1
654 }
655
    /// Stores `version` as the latest manifest version and returns it.
    fn set_version(&mut self, version: ManifestVersion) -> ManifestVersion {
        self.last_version.store(version, Ordering::Relaxed);
        version
    }
661
    /// Returns the latest manifest version recorded by this manager.
    fn last_version(&self) -> ManifestVersion {
        self.last_version.load(Ordering::Relaxed)
    }
665
666 pub(crate) async fn last_checkpoint(
671 store: &mut ManifestObjectStore,
672 ) -> Result<Option<(RegionCheckpoint, u64)>> {
673 let last_checkpoint = store.load_last_checkpoint().await?;
674
675 if let Some((_, bytes)) = last_checkpoint {
676 let checkpoint = RegionCheckpoint::decode(&bytes)?;
677 Ok(Some((checkpoint, bytes.len() as u64)))
678 } else {
679 Ok(None)
680 }
681 }
682
    /// Returns a clone of the underlying manifest store.
    pub fn store(&self) -> ManifestObjectStore {
        self.store.clone()
    }
686
    /// Returns the checkpointer; only available in tests.
    #[cfg(test)]
    pub(crate) fn checkpointer(&self) -> &Checkpointer {
        &self.checkpointer
    }
691
692 pub(crate) async fn merge_staged_actions(
695 &mut self,
696 region_state: RegionRoleState,
697 ) -> Result<Option<RegionMetaActionList>> {
698 if region_state != RegionRoleState::Leader(RegionLeaderState::Staging) {
700 return Ok(None);
701 }
702
703 let staging_manifests = self.store.fetch_staging_manifests().await?;
705
706 if staging_manifests.is_empty() {
707 info!(
708 "No staging manifests to merge for region {}",
709 self.manifest.metadata.region_id
710 );
711 return Ok(None);
712 }
713
714 info!(
715 "Merging {} staging manifests for region {}",
716 staging_manifests.len(),
717 self.manifest.metadata.region_id
718 );
719
720 let mut merged_actions = Vec::new();
722 let mut latest_version = self.last_version();
723
724 for (manifest_version, raw_action_list) in staging_manifests {
726 let action_list = RegionMetaActionList::decode(&raw_action_list)?;
727
728 for action in action_list.actions {
729 merged_actions.push(action);
730 }
731
732 latest_version = latest_version.max(manifest_version);
733 }
734
735 if merged_actions.is_empty() {
736 return Ok(None);
737 }
738
739 info!(
740 "Successfully merged {} actions from staging manifests for region {}, latest version: {}",
741 merged_actions.len(),
742 self.manifest.metadata.region_id,
743 latest_version
744 );
745
746 Ok(Some(RegionMetaActionList::new(merged_actions)))
747 }
748
    /// Drops the in-memory staging manifest; staging files in the store are not touched.
    pub(crate) fn unset_staging_manifest(&mut self) {
        self.staging_manifest = None;
    }
753
754 pub(crate) async fn clear_staging_manifest_and_dir(&mut self) -> Result<()> {
756 self.staging_manifest = None;
757 self.store.clear_staging_manifests().await?;
758 info!(
759 "Cleared all staging manifests for region {}",
760 self.manifest.metadata.region_id
761 );
762 Ok(())
763 }
764}
765
#[cfg(test)]
impl RegionManifestManager {
    /// Asserts that the manager holds `expect` as its metadata and that both the manifest
    /// and the version counter are at `last_version`.
    fn validate_manifest(&self, expect: &RegionMetadataRef, last_version: ManifestVersion) {
        let current = self.manifest();
        assert_eq!(current.metadata, *expect);

        let tracked_version = self.last_version();
        assert_eq!(self.manifest.manifest_version, tracked_version);
        assert_eq!(last_version, tracked_version);
    }
}
775
776#[cfg(test)]
777mod test {
778 use std::time::Duration;
779
780 use api::v1::SemanticType;
781 use common_datasource::compression::CompressionType;
782 use common_test_util::temp_dir::create_temp_dir;
783 use datatypes::prelude::ConcreteDataType;
784 use datatypes::schema::ColumnSchema;
785 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
786
787 use super::*;
788 use crate::manifest::action::{RegionChange, RegionEdit};
789 use crate::manifest::tests::utils::basic_region_metadata;
790 use crate::test_util::TestEnv;
791
792 #[tokio::test]
793 async fn create_manifest_manager() {
794 let metadata = Arc::new(basic_region_metadata());
795 let env = TestEnv::new().await;
796 let manager = env
797 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
798 .await
799 .unwrap()
800 .unwrap();
801
802 manager.validate_manifest(&metadata, 0);
803 }
804
805 #[tokio::test]
806 async fn open_manifest_manager() {
807 let env = TestEnv::new().await;
808 assert!(
810 env.create_manifest_manager(CompressionType::Uncompressed, 10, None)
811 .await
812 .unwrap()
813 .is_none()
814 );
815
816 let metadata = Arc::new(basic_region_metadata());
818 let mut manager = env
819 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
820 .await
821 .unwrap()
822 .unwrap();
823 manager.stop().await;
825
826 let manager = env
828 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
829 .await
830 .unwrap()
831 .unwrap();
832
833 manager.validate_manifest(&metadata, 0);
834 }
835
836 #[tokio::test]
837 async fn manifest_with_partition_expr_roundtrip() {
838 let env = TestEnv::new().await;
839 let expr_json =
840 r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
841 let mut metadata = basic_region_metadata();
842 metadata.partition_expr = Some(expr_json.to_string());
843 let metadata = Arc::new(metadata);
844 let mut manager = env
845 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
846 .await
847 .unwrap()
848 .unwrap();
849
850 let manifest = manager.manifest();
852 assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
853
854 manager.stop().await;
855
856 let manager = env
858 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
859 .await
860 .unwrap()
861 .unwrap();
862 let manifest = manager.manifest();
863 assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
864 }
865
866 #[tokio::test]
867 async fn region_change_add_column() {
868 let metadata = Arc::new(basic_region_metadata());
869 let env = TestEnv::new().await;
870 let mut manager = env
871 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
872 .await
873 .unwrap()
874 .unwrap();
875
876 let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
877 new_metadata_builder.push_column_metadata(ColumnMetadata {
878 column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
879 semantic_type: SemanticType::Field,
880 column_id: 252,
881 });
882 let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
883
884 let action_list =
885 RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
886 metadata: new_metadata.clone(),
887 sst_format: FormatType::PrimaryKey,
888 }));
889
890 let current_version = manager.update(action_list, false).await.unwrap();
891 assert_eq!(current_version, 1);
892 manager.validate_manifest(&new_metadata, 1);
893
894 manager.stop().await;
896 let manager = env
897 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
898 .await
899 .unwrap()
900 .unwrap();
901 manager.validate_manifest(&new_metadata, 1);
902 }
903
904 async fn manifest_dir_usage(path: &str) -> u64 {
906 let mut size = 0;
907 let mut read_dir = tokio::fs::read_dir(path).await.unwrap();
908 while let Ok(dir_entry) = read_dir.next_entry().await {
909 let Some(entry) = dir_entry else {
910 break;
911 };
912 if entry.file_type().await.unwrap().is_file() {
913 let file_name = entry.file_name().into_string().unwrap();
914 if file_name.contains(".checkpoint") || file_name.contains(".json") {
915 let file_size = entry.metadata().await.unwrap().len() as usize;
916 debug!("File: {file_name:?}, size: {file_size}");
917 size += file_size;
918 }
919 }
920 }
921 size as u64
922 }
923
924 #[tokio::test]
925 async fn test_manifest_size() {
926 let metadata = Arc::new(basic_region_metadata());
927 let data_home = create_temp_dir("");
928 let data_home_path = data_home.path().to_str().unwrap().to_string();
929 let env = TestEnv::with_data_home(data_home).await;
930
931 let manifest_dir = format!("{}/manifest", data_home_path);
932
933 let mut manager = env
934 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
935 .await
936 .unwrap()
937 .unwrap();
938
939 let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
940 new_metadata_builder.push_column_metadata(ColumnMetadata {
941 column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
942 semantic_type: SemanticType::Field,
943 column_id: 252,
944 });
945 let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
946
947 let action_list =
948 RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
949 metadata: new_metadata.clone(),
950 sst_format: FormatType::PrimaryKey,
951 }));
952
953 let current_version = manager.update(action_list, false).await.unwrap();
954 assert_eq!(current_version, 1);
955 manager.validate_manifest(&new_metadata, 1);
956
957 let manifest_size = manager.manifest_usage();
959 assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
960
961 for _ in 0..10 {
963 manager
964 .update(
965 RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
966 files_to_add: vec![],
967 files_to_remove: vec![],
968 timestamp_ms: None,
969 compaction_time_window: None,
970 flushed_entry_id: None,
971 flushed_sequence: None,
972 committed_sequence: None,
973 })]),
974 false,
975 )
976 .await
977 .unwrap();
978 }
979
980 while manager.checkpointer.is_doing_checkpoint() {
981 tokio::time::sleep(Duration::from_millis(10)).await;
982 }
983
984 let manifest_size = manager.manifest_usage();
986 assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
987
988 manager.stop().await;
991 let manager = env
992 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
993 .await
994 .unwrap()
995 .unwrap();
996 manager.validate_manifest(&new_metadata, 11);
997
998 let manifest_size = manager.manifest_usage();
1000 assert_eq!(manifest_size, 1378);
1001 }
1002}