1use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use common_datasource::compression::CompressionType;
19use common_telemetry::{debug, info};
20use futures::TryStreamExt;
21use object_store::ObjectStore;
22use snafu::{OptionExt, ResultExt, ensure};
23use store_api::metadata::RegionMetadataRef;
24use store_api::storage::FileId;
25use store_api::{MAX_VERSION, MIN_VERSION, ManifestVersion};
26
27use crate::config::MitoConfig;
28use crate::error::{
29 self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
30};
31use crate::manifest::action::{
32 RegionChange, RegionCheckpoint, RegionEdit, RegionManifest, RegionManifestBuilder,
33 RegionMetaAction, RegionMetaActionList,
34};
35use crate::manifest::checkpointer::Checkpointer;
36use crate::manifest::storage::{
37 ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file, manifest_compress_type,
38 manifest_dir,
39};
40use crate::metrics::MANIFEST_OP_ELAPSED;
41use crate::region::{ManifestStats, RegionLeaderState, RegionRoleState};
42use crate::sst::FormatType;
43
/// Options used to create or open the manifest of a region.
#[derive(Debug, Clone)]
pub struct RegionManifestOptions {
    /// Directory holding the region's manifest files.
    pub manifest_dir: String,
    /// Object store the manifest files are read from and written to.
    pub object_store: ObjectStore,
    /// Compression type handed to the manifest object store.
    pub compress_type: CompressionType,
    /// Copied from `MitoConfig::manifest_checkpoint_distance`.
    /// NOTE(review): presumably the number of manifest versions between two
    /// checkpoints — confirm against `Checkpointer`.
    pub checkpoint_distance: u64,
    /// Options about removed files.
    pub remove_file_options: RemoveFileOptions,
}
56
57impl RegionManifestOptions {
58 pub fn new(config: &MitoConfig, region_dir: &str, object_store: &ObjectStore) -> Self {
60 RegionManifestOptions {
61 manifest_dir: manifest_dir(region_dir),
62 object_store: object_store.clone(),
63 compress_type: manifest_compress_type(config.compress_manifest),
66 checkpoint_distance: config.manifest_checkpoint_distance,
67 remove_file_options: RemoveFileOptions {
68 enable_gc: config.gc.enable,
69 },
70 }
71 }
72}
73
/// Options about removed files tracked by the manifest.
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct RemoveFileOptions {
    /// Set from `config.gc.enable` when built through [`RegionManifestOptions::new`].
    pub enable_gc: bool,
}
81
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Manages the manifest of a single region.
///
/// It keeps the latest [`RegionManifest`] in memory, persists every update through a
/// [`ManifestObjectStore`] and hands new manifests to a [`Checkpointer`].
#[derive(Debug)]
pub struct RegionManifestManager {
    /// Storage for the manifest files.
    store: ManifestObjectStore,
    /// Last manifest version; this counter is the one exposed by `ManifestStats`.
    last_version: Arc<AtomicU64>,
    /// Receives each new non-staging manifest so it can decide whether to checkpoint.
    checkpointer: Checkpointer,
    /// Current (non-staging) manifest.
    manifest: Arc<RegionManifest>,
    /// Manifest built from updates made with `is_staging = true`, if any.
    staging_manifest: Option<Arc<RegionManifest>>,
    /// Stats handle that removed-file counts are reported to.
    stats: ManifestStats,
    /// Once set, `update` and `install_manifest_to` reject further changes.
    stopped: bool,
}
161
162impl RegionManifestManager {
163 pub async fn new(
165 metadata: RegionMetadataRef,
166 flushed_entry_id: u64,
167 options: RegionManifestOptions,
168 sst_format: FormatType,
169 stats: &ManifestStats,
170 ) -> Result<Self> {
171 let mut store = ManifestObjectStore::new(
173 &options.manifest_dir,
174 options.object_store.clone(),
175 options.compress_type,
176 stats.total_manifest_size.clone(),
177 );
178 let manifest_version = stats.manifest_version.clone();
179
180 info!(
181 "Creating region manifest in {} with metadata {:?}, flushed_entry_id: {}",
182 options.manifest_dir, metadata, flushed_entry_id
183 );
184
185 let version = MIN_VERSION;
186 let mut manifest_builder = RegionManifestBuilder::default();
187 manifest_builder.apply_change(
189 version,
190 RegionChange {
191 metadata: metadata.clone(),
192 sst_format,
193 },
194 );
195 let manifest = manifest_builder.try_build()?;
196 let region_id = metadata.region_id;
197
198 debug!(
199 "Build region manifest in {}, manifest: {:?}",
200 options.manifest_dir, manifest
201 );
202
203 let mut actions = vec![RegionMetaAction::Change(RegionChange {
204 metadata,
205 sst_format,
206 })];
207 if flushed_entry_id > 0 {
208 actions.push(RegionMetaAction::Edit(RegionEdit {
209 files_to_add: vec![],
210 files_to_remove: vec![],
211 timestamp_ms: None,
212 compaction_time_window: None,
213 flushed_entry_id: Some(flushed_entry_id),
214 flushed_sequence: None,
215 committed_sequence: None,
216 }));
217 }
218
219 let action_list = RegionMetaActionList::new(actions);
221
222 store.save(version, &action_list.encode()?, false).await?;
225
226 let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION);
227 manifest_version.store(version, Ordering::Relaxed);
228 manifest
229 .removed_files
230 .update_file_removed_cnt_to_stats(stats);
231 Ok(Self {
232 store,
233 last_version: manifest_version,
234 checkpointer,
235 manifest: Arc::new(manifest),
236 staging_manifest: None,
237 stats: stats.clone(),
238 stopped: false,
239 })
240 }
241
242 pub async fn open(
246 options: RegionManifestOptions,
247 stats: &ManifestStats,
248 ) -> Result<Option<Self>> {
249 let _t = MANIFEST_OP_ELAPSED
250 .with_label_values(&["open"])
251 .start_timer();
252
253 let mut store = ManifestObjectStore::new(
255 &options.manifest_dir,
256 options.object_store.clone(),
257 options.compress_type,
258 stats.total_manifest_size.clone(),
259 );
260 let manifest_version = stats.manifest_version.clone();
261
262 let mut version = MIN_VERSION;
266 let checkpoint = Self::last_checkpoint(&mut store).await?;
267 let last_checkpoint_version = checkpoint
268 .as_ref()
269 .map(|(checkpoint, _)| checkpoint.last_version)
270 .unwrap_or(MIN_VERSION);
271 let mut manifest_builder = if let Some((checkpoint, _)) = checkpoint {
272 info!(
273 "Recover region manifest {} from checkpoint version {}",
274 options.manifest_dir, checkpoint.last_version
275 );
276 version = version.max(checkpoint.last_version + 1);
277 RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint)
278 } else {
279 info!(
280 "Checkpoint not found in {}, build manifest from scratch",
281 options.manifest_dir
282 );
283 RegionManifestBuilder::default()
284 };
285
286 let manifests = store.fetch_manifests(version, MAX_VERSION).await?;
288
289 for (manifest_version, raw_action_list) in manifests {
290 let action_list = RegionMetaActionList::decode(&raw_action_list)?;
291 store.set_delta_file_size(manifest_version, raw_action_list.len() as u64);
293 for action in action_list.actions {
294 match action {
295 RegionMetaAction::Change(action) => {
296 manifest_builder.apply_change(manifest_version, action);
297 }
298 RegionMetaAction::Edit(action) => {
299 manifest_builder.apply_edit(manifest_version, action);
300 }
301 RegionMetaAction::Remove(_) => {
302 debug!(
303 "Unhandled action in {}, action: {:?}",
304 options.manifest_dir, action
305 );
306 }
307 RegionMetaAction::Truncate(action) => {
308 manifest_builder.apply_truncate(manifest_version, action);
309 }
310 }
311 }
312 }
313
314 if !manifest_builder.contains_metadata() {
316 debug!("No region manifest in {}", options.manifest_dir);
317 return Ok(None);
318 }
319
320 let manifest = manifest_builder.try_build()?;
321 debug!(
322 "Recovered region manifest from {}, manifest: {:?}",
323 options.manifest_dir, manifest
324 );
325 let version = manifest.manifest_version;
326
327 let checkpointer = Checkpointer::new(
328 manifest.metadata.region_id,
329 options,
330 store.clone(),
331 last_checkpoint_version,
332 );
333 manifest_version.store(version, Ordering::Relaxed);
334 manifest
335 .removed_files
336 .update_file_removed_cnt_to_stats(stats);
337 Ok(Some(Self {
338 store,
339 last_version: manifest_version,
340 checkpointer,
341 manifest: Arc::new(manifest),
342 staging_manifest: None,
344 stats: stats.clone(),
345 stopped: false,
346 }))
347 }
348
    /// Marks the manager as stopped.
    ///
    /// After this call `update` and `install_manifest_to` fail with a region-stopped
    /// error instead of changing the manifest.
    pub async fn stop(&mut self) {
        self.stopped = true;
    }
353
354 pub async fn install_manifest_to(
360 &mut self,
361 target_version: ManifestVersion,
362 ) -> Result<ManifestVersion> {
363 let _t = MANIFEST_OP_ELAPSED
364 .with_label_values(&["install_manifest_to"])
365 .start_timer();
366
367 let last_version = self.last_version();
368 if last_version >= target_version {
370 debug!(
371 "Target version {} is less than or equal to the current version {}, region: {}, skip install",
372 target_version, last_version, self.manifest.metadata.region_id
373 );
374 return Ok(last_version);
375 }
376
377 ensure!(
378 !self.stopped,
379 RegionStoppedSnafu {
380 region_id: self.manifest.metadata.region_id,
381 }
382 );
383
384 let region_id = self.manifest.metadata.region_id;
385 let mut manifests = self
387 .store
388 .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
390 .await?;
391
392 if manifests.is_empty() {
399 info!(
400 "Manifests are not strict from {}, region: {}, tries to install the last checkpoint",
401 last_version, self.manifest.metadata.region_id
402 );
403 let last_version = self.install_last_checkpoint().await?;
404 if last_version >= target_version {
406 return Ok(last_version);
407 }
408
409 manifests = self
411 .store
412 .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
414 .await?;
415 }
416
417 if manifests.is_empty() {
418 return NoManifestsSnafu {
419 region_id: self.manifest.metadata.region_id,
420 start_version: last_version + 1,
421 end_version: target_version + 1,
422 last_version,
423 }
424 .fail();
425 }
426
427 debug_assert_eq!(manifests.first().unwrap().0, last_version + 1);
428 let mut manifest_builder =
429 RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
430
431 for (manifest_version, raw_action_list) in manifests {
432 self.store
433 .set_delta_file_size(manifest_version, raw_action_list.len() as u64);
434 let action_list = RegionMetaActionList::decode(&raw_action_list)?;
435 for action in action_list.actions {
436 match action {
437 RegionMetaAction::Change(action) => {
438 manifest_builder.apply_change(manifest_version, action);
439 }
440 RegionMetaAction::Edit(action) => {
441 manifest_builder.apply_edit(manifest_version, action);
442 }
443 RegionMetaAction::Remove(_) => {
444 debug!(
445 "Unhandled action for region {}, action: {:?}",
446 self.manifest.metadata.region_id, action
447 );
448 }
449 RegionMetaAction::Truncate(action) => {
450 manifest_builder.apply_truncate(manifest_version, action);
451 }
452 }
453 }
454 }
455
456 let new_manifest = manifest_builder.try_build()?;
457 ensure!(
458 new_manifest.manifest_version >= target_version,
459 InstallManifestToSnafu {
460 region_id: self.manifest.metadata.region_id,
461 target_version,
462 available_version: new_manifest.manifest_version,
463 last_version,
464 }
465 );
466
467 let version = self.last_version();
468 new_manifest
469 .removed_files
470 .update_file_removed_cnt_to_stats(&self.stats);
471 self.manifest = Arc::new(new_manifest);
472 let last_version = self.set_version(self.manifest.manifest_version);
473 info!(
474 "Install manifest changes from {} to {}, region: {}",
475 version, last_version, self.manifest.metadata.region_id
476 );
477
478 Ok(last_version)
479 }
480
481 pub(crate) async fn install_last_checkpoint(&mut self) -> Result<ManifestVersion> {
483 let last_version = self.last_version();
484 let Some((checkpoint, checkpoint_size)) = Self::last_checkpoint(&mut self.store).await?
485 else {
486 return NoCheckpointSnafu {
487 region_id: self.manifest.metadata.region_id,
488 last_version,
489 }
490 .fail();
491 };
492 self.store.reset_manifest_size();
493 self.store
494 .set_checkpoint_file_size(checkpoint.last_version, checkpoint_size);
495 let builder = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint);
496 let manifest = builder.try_build()?;
497 let last_version = self.set_version(manifest.manifest_version);
498 manifest
499 .removed_files
500 .update_file_removed_cnt_to_stats(&self.stats);
501 self.manifest = Arc::new(manifest);
502 info!(
503 "Installed region manifest from checkpoint: {}, region: {}",
504 checkpoint.last_version, self.manifest.metadata.region_id
505 );
506
507 Ok(last_version)
508 }
509
510 pub async fn update(
512 &mut self,
513 action_list: RegionMetaActionList,
514 is_staging: bool,
515 ) -> Result<ManifestVersion> {
516 let _t = MANIFEST_OP_ELAPSED
517 .with_label_values(&["update"])
518 .start_timer();
519
520 ensure!(
521 !self.stopped,
522 RegionStoppedSnafu {
523 region_id: self.manifest.metadata.region_id,
524 }
525 );
526
527 let version = self.increase_version();
528 self.store
529 .save(version, &action_list.encode()?, is_staging)
530 .await?;
531
532 let mut manifest_builder =
535 if is_staging && let Some(staging_manifest) = self.staging_manifest.as_ref() {
536 RegionManifestBuilder::with_checkpoint(Some(staging_manifest.as_ref().clone()))
537 } else {
538 RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()))
539 };
540
541 for action in action_list.actions {
542 match action {
543 RegionMetaAction::Change(action) => {
544 manifest_builder.apply_change(version, action);
545 }
546 RegionMetaAction::Edit(action) => {
547 manifest_builder.apply_edit(version, action);
548 }
549 RegionMetaAction::Remove(_) => {
550 debug!(
551 "Unhandled action for region {}, action: {:?}",
552 self.manifest.metadata.region_id, action
553 );
554 }
555 RegionMetaAction::Truncate(action) => {
556 manifest_builder.apply_truncate(version, action);
557 }
558 }
559 }
560
561 if is_staging {
562 let new_manifest = manifest_builder.try_build()?;
563 self.staging_manifest = Some(Arc::new(new_manifest));
564
565 info!(
566 "Skipping checkpoint for region {} in staging mode, manifest version: {}",
567 self.manifest.metadata.region_id, self.manifest.manifest_version
568 );
569 } else {
570 let new_manifest = manifest_builder.try_build()?;
571 new_manifest
572 .removed_files
573 .update_file_removed_cnt_to_stats(&self.stats);
574 let updated_manifest = self
575 .checkpointer
576 .update_manifest_removed_files(new_manifest)?;
577 self.manifest = Arc::new(updated_manifest);
578 self.checkpointer
579 .maybe_do_checkpoint(self.manifest.as_ref());
580 }
581
582 Ok(version)
583 }
584
585 pub fn clear_deleted_files(&mut self, deleted_files: Vec<FileId>) {
587 let mut manifest = (*self.manifest()).clone();
588 manifest.removed_files.clear_deleted_files(deleted_files);
589 self.set_manifest(Arc::new(manifest));
590 }
591
    /// Replaces the current in-memory manifest with `manifest`.
    ///
    /// Only the in-memory value changes; nothing is written to the store and the
    /// version counter is left as it is.
    pub(crate) fn set_manifest(&mut self, manifest: Arc<RegionManifest>) {
        self.manifest = manifest;
    }
595
596 pub fn manifest(&self) -> Arc<RegionManifest> {
598 self.manifest.clone()
599 }
600
601 pub fn staging_manifest(&self) -> Option<Arc<RegionManifest>> {
603 self.staging_manifest.clone()
604 }
605
    /// Returns the total manifest size as tracked by the manifest store.
    pub fn manifest_usage(&self) -> u64 {
        self.store.total_manifest_size()
    }
610
611 pub async fn has_update(&self) -> Result<bool> {
617 let last_version = self.last_version();
618
619 let streamer =
620 self.store
621 .manifest_lister(false)
622 .await?
623 .context(error::EmptyManifestDirSnafu {
624 manifest_dir: self.store.manifest_dir(),
625 })?;
626
627 let need_update = streamer
628 .try_any(|entry| async move {
629 let file_name = entry.name();
630 if is_delta_file(file_name) || is_checkpoint_file(file_name) {
631 let version = file_version(file_name);
632 if version > last_version {
633 return true;
634 }
635 }
636 false
637 })
638 .await
639 .context(error::OpenDalSnafu)?;
640
641 Ok(need_update)
642 }
643
644 fn increase_version(&mut self) -> ManifestVersion {
646 let previous = self.last_version.fetch_add(1, Ordering::Relaxed);
647 previous + 1
648 }
649
    /// Overwrites the version counter with `version` and returns that same value.
    fn set_version(&mut self, version: ManifestVersion) -> ManifestVersion {
        self.last_version.store(version, Ordering::Relaxed);
        version
    }
655
    /// Reads the current value of the version counter.
    fn last_version(&self) -> ManifestVersion {
        self.last_version.load(Ordering::Relaxed)
    }
659
660 pub(crate) async fn last_checkpoint(
665 store: &mut ManifestObjectStore,
666 ) -> Result<Option<(RegionCheckpoint, u64)>> {
667 let last_checkpoint = store.load_last_checkpoint().await?;
668
669 if let Some((_, bytes)) = last_checkpoint {
670 let checkpoint = RegionCheckpoint::decode(&bytes)?;
671 Ok(Some((checkpoint, bytes.len() as u64)))
672 } else {
673 Ok(None)
674 }
675 }
676
    /// Returns a clone of the underlying manifest store.
    pub fn store(&self) -> ManifestObjectStore {
        self.store.clone()
    }
680
    /// Test-only access to the checkpointer.
    #[cfg(test)]
    pub(crate) fn checkpointer(&self) -> &Checkpointer {
        &self.checkpointer
    }
685
686 pub(crate) async fn merge_staged_actions(
689 &mut self,
690 region_state: RegionRoleState,
691 ) -> Result<Option<RegionMetaActionList>> {
692 if region_state != RegionRoleState::Leader(RegionLeaderState::Staging) {
694 return Ok(None);
695 }
696
697 let staging_manifests = self.store.fetch_staging_manifests().await?;
699
700 if staging_manifests.is_empty() {
701 info!(
702 "No staging manifests to merge for region {}",
703 self.manifest.metadata.region_id
704 );
705 return Ok(None);
706 }
707
708 info!(
709 "Merging {} staging manifests for region {}",
710 staging_manifests.len(),
711 self.manifest.metadata.region_id
712 );
713
714 let mut merged_actions = Vec::new();
716 let mut latest_version = self.last_version();
717
718 for (manifest_version, raw_action_list) in staging_manifests {
720 let action_list = RegionMetaActionList::decode(&raw_action_list)?;
721
722 for action in action_list.actions {
723 merged_actions.push(action);
724 }
725
726 latest_version = latest_version.max(manifest_version);
727 }
728
729 if merged_actions.is_empty() {
730 return Ok(None);
731 }
732
733 info!(
734 "Successfully merged {} actions from staging manifests for region {}, latest version: {}",
735 merged_actions.len(),
736 self.manifest.metadata.region_id,
737 latest_version
738 );
739
740 Ok(Some(RegionMetaActionList::new(merged_actions)))
741 }
742
    /// Forgets the in-memory staging manifest. Staging files in the store are not touched.
    pub(crate) fn unset_staging_manifest(&mut self) {
        self.staging_manifest = None;
    }
747
748 pub(crate) async fn clear_staging_manifest_and_dir(&mut self) -> Result<()> {
750 self.staging_manifest = None;
751 self.store.clear_staging_manifests().await?;
752 info!(
753 "Cleared all staging manifests for region {}",
754 self.manifest.metadata.region_id
755 );
756 Ok(())
757 }
758}
759
#[cfg(test)]
impl RegionManifestManager {
    /// Asserts that the manager holds the metadata `expect` and that both the manifest
    /// and the version counter are at `last_version`.
    fn validate_manifest(&self, expect: &RegionMetadataRef, last_version: ManifestVersion) {
        let current = self.manifest();
        assert_eq!(current.metadata, *expect);

        let counter_version = self.last_version();
        assert_eq!(self.manifest.manifest_version, counter_version);
        assert_eq!(last_version, counter_version);
    }
}
769
770#[cfg(test)]
771mod test {
772 use std::time::Duration;
773
774 use api::v1::SemanticType;
775 use common_datasource::compression::CompressionType;
776 use common_test_util::temp_dir::create_temp_dir;
777 use datatypes::prelude::ConcreteDataType;
778 use datatypes::schema::ColumnSchema;
779 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
780
781 use super::*;
782 use crate::manifest::action::{RegionChange, RegionEdit};
783 use crate::manifest::tests::utils::basic_region_metadata;
784 use crate::test_util::TestEnv;
785
786 #[tokio::test]
787 async fn create_manifest_manager() {
788 let metadata = Arc::new(basic_region_metadata());
789 let env = TestEnv::new().await;
790 let manager = env
791 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
792 .await
793 .unwrap()
794 .unwrap();
795
796 manager.validate_manifest(&metadata, 0);
797 }
798
799 #[tokio::test]
800 async fn open_manifest_manager() {
801 let env = TestEnv::new().await;
802 assert!(
804 env.create_manifest_manager(CompressionType::Uncompressed, 10, None)
805 .await
806 .unwrap()
807 .is_none()
808 );
809
810 let metadata = Arc::new(basic_region_metadata());
812 let mut manager = env
813 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
814 .await
815 .unwrap()
816 .unwrap();
817 manager.stop().await;
819
820 let manager = env
822 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
823 .await
824 .unwrap()
825 .unwrap();
826
827 manager.validate_manifest(&metadata, 0);
828 }
829
830 #[tokio::test]
831 async fn manifest_with_partition_expr_roundtrip() {
832 let env = TestEnv::new().await;
833 let expr_json =
834 r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
835 let mut metadata = basic_region_metadata();
836 metadata.partition_expr = Some(expr_json.to_string());
837 let metadata = Arc::new(metadata);
838 let mut manager = env
839 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
840 .await
841 .unwrap()
842 .unwrap();
843
844 let manifest = manager.manifest();
846 assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
847
848 manager.stop().await;
849
850 let manager = env
852 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
853 .await
854 .unwrap()
855 .unwrap();
856 let manifest = manager.manifest();
857 assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
858 }
859
860 #[tokio::test]
861 async fn region_change_add_column() {
862 let metadata = Arc::new(basic_region_metadata());
863 let env = TestEnv::new().await;
864 let mut manager = env
865 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
866 .await
867 .unwrap()
868 .unwrap();
869
870 let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
871 new_metadata_builder.push_column_metadata(ColumnMetadata {
872 column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
873 semantic_type: SemanticType::Field,
874 column_id: 252,
875 });
876 let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
877
878 let action_list =
879 RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
880 metadata: new_metadata.clone(),
881 sst_format: FormatType::PrimaryKey,
882 }));
883
884 let current_version = manager.update(action_list, false).await.unwrap();
885 assert_eq!(current_version, 1);
886 manager.validate_manifest(&new_metadata, 1);
887
888 manager.stop().await;
890 let manager = env
891 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
892 .await
893 .unwrap()
894 .unwrap();
895 manager.validate_manifest(&new_metadata, 1);
896 }
897
898 async fn manifest_dir_usage(path: &str) -> u64 {
900 let mut size = 0;
901 let mut read_dir = tokio::fs::read_dir(path).await.unwrap();
902 while let Ok(dir_entry) = read_dir.next_entry().await {
903 let Some(entry) = dir_entry else {
904 break;
905 };
906 if entry.file_type().await.unwrap().is_file() {
907 let file_name = entry.file_name().into_string().unwrap();
908 if file_name.contains(".checkpoint") || file_name.contains(".json") {
909 let file_size = entry.metadata().await.unwrap().len() as usize;
910 debug!("File: {file_name:?}, size: {file_size}");
911 size += file_size;
912 }
913 }
914 }
915 size as u64
916 }
917
918 #[tokio::test]
919 async fn test_manifest_size() {
920 let metadata = Arc::new(basic_region_metadata());
921 let data_home = create_temp_dir("");
922 let data_home_path = data_home.path().to_str().unwrap().to_string();
923 let env = TestEnv::with_data_home(data_home).await;
924
925 let manifest_dir = format!("{}/manifest", data_home_path);
926
927 let mut manager = env
928 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
929 .await
930 .unwrap()
931 .unwrap();
932
933 let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
934 new_metadata_builder.push_column_metadata(ColumnMetadata {
935 column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
936 semantic_type: SemanticType::Field,
937 column_id: 252,
938 });
939 let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
940
941 let action_list =
942 RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
943 metadata: new_metadata.clone(),
944 sst_format: FormatType::PrimaryKey,
945 }));
946
947 let current_version = manager.update(action_list, false).await.unwrap();
948 assert_eq!(current_version, 1);
949 manager.validate_manifest(&new_metadata, 1);
950
951 let manifest_size = manager.manifest_usage();
953 assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
954
955 for _ in 0..10 {
957 manager
958 .update(
959 RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
960 files_to_add: vec![],
961 files_to_remove: vec![],
962 timestamp_ms: None,
963 compaction_time_window: None,
964 flushed_entry_id: None,
965 flushed_sequence: None,
966 committed_sequence: None,
967 })]),
968 false,
969 )
970 .await
971 .unwrap();
972 }
973
974 while manager.checkpointer.is_doing_checkpoint() {
975 tokio::time::sleep(Duration::from_millis(10)).await;
976 }
977
978 let manifest_size = manager.manifest_usage();
980 assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
981
982 manager.stop().await;
985 let manager = env
986 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
987 .await
988 .unwrap()
989 .unwrap();
990 manager.validate_manifest(&new_metadata, 11);
991
992 let manifest_size = manager.manifest_usage();
994 assert_eq!(manifest_size, 1378);
995 }
996}