1use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::time::Instant;
18
19use common_datasource::compression::CompressionType;
20use common_telemetry::{debug, info};
21use futures::TryStreamExt;
22use object_store::ObjectStore;
23use snafu::{OptionExt, ResultExt, ensure};
24use store_api::metadata::RegionMetadataRef;
25use store_api::{MAX_VERSION, MIN_VERSION, ManifestVersion};
26
27use crate::cache::manifest_cache::ManifestCache;
28use crate::config::MitoConfig;
29use crate::error::{
30 self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
31};
32use crate::manifest::action::{
33 RegionChange, RegionCheckpoint, RegionEdit, RegionManifest, RegionManifestBuilder,
34 RegionMetaAction, RegionMetaActionList, RemovedFile,
35};
36use crate::manifest::checkpointer::Checkpointer;
37use crate::manifest::storage::{
38 ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file, list_start_after,
39 manifest_compress_type, manifest_dir,
40};
41use crate::metrics::MANIFEST_OP_ELAPSED;
42use crate::region::{ManifestStats, RegionLeaderState, RegionRoleState};
43use crate::sst::FormatType;
44
45#[derive(Debug, Clone)]
47pub struct RegionManifestOptions {
48 pub manifest_dir: String,
50 pub object_store: ObjectStore,
51 pub compress_type: CompressionType,
52 pub checkpoint_distance: u64,
55 pub remove_file_options: RemoveFileOptions,
56 pub manifest_cache: Option<ManifestCache>,
58}
59
60impl RegionManifestOptions {
61 pub fn new(config: &MitoConfig, region_dir: &str, object_store: &ObjectStore) -> Self {
63 RegionManifestOptions {
64 manifest_dir: manifest_dir(region_dir),
65 object_store: object_store.clone(),
66 compress_type: manifest_compress_type(config.compress_manifest),
69 checkpoint_distance: config.manifest_checkpoint_distance,
70 remove_file_options: RemoveFileOptions {
71 enable_gc: config.gc.enable,
72 },
73 manifest_cache: None,
74 }
75 }
76}
77
78#[derive(Debug, Clone)]
80#[cfg_attr(any(test, feature = "test"), derive(Default))]
81pub struct RemoveFileOptions {
82 pub enable_gc: bool,
84}
85
86#[cfg_attr(doc, aquamarine::aquamarine)]
92#[derive(Debug)]
153pub struct RegionManifestManager {
154 store: ManifestObjectStore,
155 last_version: Arc<AtomicU64>,
156 checkpointer: Checkpointer,
157 manifest: Arc<RegionManifest>,
158 staging_manifest: Option<Arc<RegionManifest>>,
162 stats: ManifestStats,
163 stopped: bool,
164}
165
166impl RegionManifestManager {
167 pub async fn new(
169 metadata: RegionMetadataRef,
170 flushed_entry_id: u64,
171 options: RegionManifestOptions,
172 sst_format: FormatType,
173 stats: &ManifestStats,
174 ) -> Result<Self> {
175 let mut store = ManifestObjectStore::new(
177 &options.manifest_dir,
178 options.object_store.clone(),
179 options.compress_type,
180 stats.total_manifest_size.clone(),
181 options.manifest_cache.clone(),
182 );
183 let manifest_version = stats.manifest_version.clone();
184
185 info!(
186 "Creating region manifest in {} with metadata {:?}, flushed_entry_id: {}",
187 options.manifest_dir, metadata, flushed_entry_id
188 );
189
190 let version = MIN_VERSION;
191 let mut manifest_builder = RegionManifestBuilder::default();
192 manifest_builder.apply_change(
194 version,
195 RegionChange {
196 metadata: metadata.clone(),
197 sst_format,
198 append_mode: None,
199 },
200 );
201 let manifest = manifest_builder.try_build()?;
202 let region_id = metadata.region_id;
203
204 debug!(
205 "Build region manifest in {}, manifest: {:?}",
206 options.manifest_dir, manifest
207 );
208
209 let mut actions = vec![RegionMetaAction::Change(RegionChange {
210 metadata,
211 sst_format,
212 append_mode: None,
213 })];
214 if flushed_entry_id > 0 {
215 actions.push(RegionMetaAction::Edit(RegionEdit {
216 files_to_add: vec![],
217 files_to_remove: vec![],
218 timestamp_ms: None,
219 compaction_time_window: None,
220 flushed_entry_id: Some(flushed_entry_id),
221 flushed_sequence: None,
222 committed_sequence: None,
223 }));
224 }
225
226 let action_list = RegionMetaActionList::new(actions);
228
229 store.save(version, &action_list.encode()?, false).await?;
232
233 let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION);
234 manifest_version.store(version, Ordering::Relaxed);
235 manifest
236 .removed_files
237 .update_file_removed_cnt_to_stats(stats);
238 Ok(Self {
239 store,
240 last_version: manifest_version,
241 checkpointer,
242 manifest: Arc::new(manifest),
243 staging_manifest: None,
244 stats: stats.clone(),
245 stopped: false,
246 })
247 }
248
249 pub async fn open(
253 options: RegionManifestOptions,
254 stats: &ManifestStats,
255 ) -> Result<Option<Self>> {
256 let _t = MANIFEST_OP_ELAPSED
257 .with_label_values(&["open"])
258 .start_timer();
259 let open_start = Instant::now();
260
261 let mut store = ManifestObjectStore::new(
263 &options.manifest_dir,
264 options.object_store.clone(),
265 options.compress_type,
266 stats.total_manifest_size.clone(),
267 options.manifest_cache.clone(),
268 );
269 let manifest_version = stats.manifest_version.clone();
270
271 let mut version = MIN_VERSION;
275 let checkpoint = Self::last_checkpoint(&mut store).await?;
276 let last_checkpoint_version = checkpoint
277 .as_ref()
278 .map(|(checkpoint, _)| checkpoint.last_version)
279 .unwrap_or(MIN_VERSION);
280 let mut manifest_builder = if let Some((checkpoint, _)) = checkpoint {
281 info!(
282 "Recover region manifest {} from checkpoint version {}",
283 options.manifest_dir, checkpoint.last_version
284 );
285 version = version.max(checkpoint.last_version + 1);
286 RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint)
287 } else {
288 info!(
289 "Checkpoint not found in {}, build manifest from scratch",
290 options.manifest_dir
291 );
292 RegionManifestBuilder::default()
293 };
294
295 let replay_start_version = version;
296 info!(
297 "Replaying region manifest {} from version {}, last checkpoint version: {}",
298 options.manifest_dir, replay_start_version, last_checkpoint_version,
299 );
300
301 let manifests = store.fetch_manifests(version, MAX_VERSION).await?;
303 let replayed_deltas = manifests.len();
304
305 for (manifest_version, raw_action_list) in manifests {
306 let action_list = RegionMetaActionList::decode(&raw_action_list)?;
307 store.set_delta_file_size(manifest_version, raw_action_list.len() as u64);
309 for action in action_list.actions {
310 match action {
311 RegionMetaAction::Change(action) => {
312 manifest_builder.apply_change(manifest_version, action);
313 }
314 RegionMetaAction::PartitionExprChange(action) => {
315 manifest_builder.apply_partition_expr_change(manifest_version, action);
316 }
317 RegionMetaAction::Edit(action) => {
318 manifest_builder.apply_edit(manifest_version, action);
319 }
320 RegionMetaAction::Remove(_) => {
321 debug!(
322 "Unhandled action in {}, action: {:?}",
323 options.manifest_dir, action
324 );
325 }
326 RegionMetaAction::Truncate(action) => {
327 manifest_builder.apply_truncate(manifest_version, action);
328 }
329 }
330 }
331 }
332
333 if !manifest_builder.contains_metadata() {
335 debug!("No region manifest in {}", options.manifest_dir);
336 return Ok(None);
337 }
338
339 let manifest = manifest_builder.try_build()?;
340 debug!(
341 "Recovered region manifest from {}, manifest: {:?}",
342 options.manifest_dir, manifest
343 );
344 let version = manifest.manifest_version;
345
346 let manifest_dir = options.manifest_dir.clone();
347 let checkpointer = Checkpointer::new(
348 manifest.metadata.region_id,
349 options,
350 store.clone(),
351 last_checkpoint_version,
352 );
353 manifest_version.store(version, Ordering::Relaxed);
354 manifest
355 .removed_files
356 .update_file_removed_cnt_to_stats(stats);
357 info!(
358 "Opened region manifest {}, region_id: {}, start_version: {}, last_checkpoint_version: {}, replayed_deltas: {}, final_version: {}, cost: {:?}",
359 manifest_dir,
360 manifest.metadata.region_id,
361 replay_start_version,
362 last_checkpoint_version,
363 replayed_deltas,
364 version,
365 open_start.elapsed(),
366 );
367 Ok(Some(Self {
368 store,
369 last_version: manifest_version,
370 checkpointer,
371 manifest: Arc::new(manifest),
372 staging_manifest: None,
374 stats: stats.clone(),
375 stopped: false,
376 }))
377 }
378
379 pub async fn stop(&mut self) {
381 self.stopped = true;
382 }
383
384 pub async fn install_manifest_to(
390 &mut self,
391 target_version: ManifestVersion,
392 ) -> Result<ManifestVersion> {
393 let _t = MANIFEST_OP_ELAPSED
394 .with_label_values(&["install_manifest_to"])
395 .start_timer();
396
397 let last_version = self.last_version();
398 if last_version >= target_version {
400 debug!(
401 "Target version {} is less than or equal to the current version {}, region: {}, skip install",
402 target_version, last_version, self.manifest.metadata.region_id
403 );
404 return Ok(last_version);
405 }
406
407 ensure!(
408 !self.stopped,
409 RegionStoppedSnafu {
410 region_id: self.manifest.metadata.region_id,
411 }
412 );
413
414 let region_id = self.manifest.metadata.region_id;
415 let mut manifests = self
417 .store
418 .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
420 .await?;
421
422 if manifests.is_empty() {
429 info!(
430 "Manifests are not strict from {}, region: {}, tries to install the last checkpoint",
431 last_version, self.manifest.metadata.region_id
432 );
433 let last_version = self.install_last_checkpoint().await?;
434 if last_version >= target_version {
436 return Ok(last_version);
437 }
438
439 manifests = self
441 .store
442 .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
444 .await?;
445 }
446
447 if manifests.is_empty() {
448 return NoManifestsSnafu {
449 region_id: self.manifest.metadata.region_id,
450 start_version: last_version + 1,
451 end_version: target_version + 1,
452 last_version,
453 }
454 .fail();
455 }
456
457 debug_assert_eq!(manifests.first().unwrap().0, last_version + 1);
458 let mut manifest_builder =
459 RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
460
461 for (manifest_version, raw_action_list) in manifests {
462 self.store
463 .set_delta_file_size(manifest_version, raw_action_list.len() as u64);
464 let action_list = RegionMetaActionList::decode(&raw_action_list)?;
465 for action in action_list.actions {
466 match action {
467 RegionMetaAction::Change(action) => {
468 manifest_builder.apply_change(manifest_version, action);
469 }
470 RegionMetaAction::PartitionExprChange(action) => {
471 manifest_builder.apply_partition_expr_change(manifest_version, action);
472 }
473 RegionMetaAction::Edit(action) => {
474 manifest_builder.apply_edit(manifest_version, action);
475 }
476 RegionMetaAction::Remove(_) => {
477 debug!(
478 "Unhandled action for region {}, action: {:?}",
479 self.manifest.metadata.region_id, action
480 );
481 }
482 RegionMetaAction::Truncate(action) => {
483 manifest_builder.apply_truncate(manifest_version, action);
484 }
485 }
486 }
487 }
488
489 let new_manifest = manifest_builder.try_build()?;
490 ensure!(
491 new_manifest.manifest_version >= target_version,
492 InstallManifestToSnafu {
493 region_id: self.manifest.metadata.region_id,
494 target_version,
495 available_version: new_manifest.manifest_version,
496 last_version,
497 }
498 );
499
500 let version = self.last_version();
501 new_manifest
502 .removed_files
503 .update_file_removed_cnt_to_stats(&self.stats);
504 self.manifest = Arc::new(new_manifest);
505 let last_version = self.set_version(self.manifest.manifest_version);
506 info!(
507 "Install manifest changes from {} to {}, region: {}",
508 version, last_version, self.manifest.metadata.region_id
509 );
510
511 Ok(last_version)
512 }
513
514 pub(crate) async fn install_last_checkpoint(&mut self) -> Result<ManifestVersion> {
516 let last_version = self.last_version();
517 let Some((checkpoint, checkpoint_size)) = Self::last_checkpoint(&mut self.store).await?
518 else {
519 return NoCheckpointSnafu {
520 region_id: self.manifest.metadata.region_id,
521 last_version,
522 }
523 .fail();
524 };
525 self.store.reset_manifest_size();
526 self.store
527 .set_checkpoint_file_size(checkpoint.last_version, checkpoint_size);
528 let builder = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint);
529 let manifest = builder.try_build()?;
530 let last_version = self.set_version(manifest.manifest_version);
531 manifest
532 .removed_files
533 .update_file_removed_cnt_to_stats(&self.stats);
534 self.manifest = Arc::new(manifest);
535 info!(
536 "Installed region manifest from checkpoint: {}, region: {}",
537 checkpoint.last_version, self.manifest.metadata.region_id
538 );
539
540 Ok(last_version)
541 }
542
543 pub async fn update(
545 &mut self,
546 action_list: RegionMetaActionList,
547 is_staging: bool,
548 ) -> Result<ManifestVersion> {
549 let _t = MANIFEST_OP_ELAPSED
550 .with_label_values(&["update"])
551 .start_timer();
552
553 ensure!(
554 !self.stopped,
555 RegionStoppedSnafu {
556 region_id: self.manifest.metadata.region_id,
557 }
558 );
559
560 let version = self.increase_version();
561 self.store
562 .save(version, &action_list.encode()?, is_staging)
563 .await?;
564
565 let mut manifest_builder =
568 if is_staging && let Some(staging_manifest) = self.staging_manifest.as_ref() {
569 RegionManifestBuilder::with_checkpoint(Some(staging_manifest.as_ref().clone()))
570 } else {
571 RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()))
572 };
573
574 for action in action_list.actions {
575 match action {
576 RegionMetaAction::Change(action) => {
577 manifest_builder.apply_change(version, action);
578 }
579 RegionMetaAction::PartitionExprChange(action) => {
580 manifest_builder.apply_partition_expr_change(version, action);
581 }
582 RegionMetaAction::Edit(action) => {
583 manifest_builder.apply_edit(version, action);
584 }
585 RegionMetaAction::Remove(_) => {
586 debug!(
587 "Unhandled action for region {}, action: {:?}",
588 self.manifest.metadata.region_id, action
589 );
590 }
591 RegionMetaAction::Truncate(action) => {
592 manifest_builder.apply_truncate(version, action);
593 }
594 }
595 }
596
597 if is_staging {
598 let new_manifest = manifest_builder.try_build()?;
599 self.staging_manifest = Some(Arc::new(new_manifest));
600
601 info!(
602 "Skipping checkpoint for region {} in staging mode, manifest version: {}",
603 self.manifest.metadata.region_id, self.manifest.manifest_version
604 );
605 } else {
606 let new_manifest = manifest_builder.try_build()?;
607 new_manifest
608 .removed_files
609 .update_file_removed_cnt_to_stats(&self.stats);
610 let updated_manifest = self
611 .checkpointer
612 .update_manifest_removed_files(new_manifest)?;
613 self.manifest = Arc::new(updated_manifest);
614 self.checkpointer
615 .maybe_do_checkpoint(self.manifest.as_ref());
616 }
617
618 Ok(version)
619 }
620
621 pub fn clear_deleted_files(&mut self, deleted_files: Vec<RemovedFile>) {
623 let mut manifest = (*self.manifest()).clone();
624 manifest.removed_files.clear_deleted_files(deleted_files);
625 self.set_manifest(Arc::new(manifest));
626 }
627
628 pub(crate) fn set_manifest(&mut self, manifest: Arc<RegionManifest>) {
629 self.manifest = manifest;
630 }
631
632 pub fn manifest(&self) -> Arc<RegionManifest> {
634 self.manifest.clone()
635 }
636
637 pub fn staging_manifest(&self) -> Option<Arc<RegionManifest>> {
639 self.staging_manifest.clone()
640 }
641
642 pub fn manifest_usage(&self) -> u64 {
644 self.store.total_manifest_size()
645 }
646
647 pub async fn has_update(&self) -> Result<bool> {
653 let last_version = self.last_version();
654
655 let start_after = list_start_after(self.store.manifest_dir(), last_version);
659 let streamer = self
660 .store
661 .manifest_lister(false, Some(&start_after))
662 .await?
663 .context(error::EmptyManifestDirSnafu {
664 manifest_dir: self.store.manifest_dir(),
665 })?;
666
667 let need_update = streamer
668 .try_any(|entry| async move {
669 let file_name = entry.name();
670 if is_delta_file(file_name) || is_checkpoint_file(file_name) {
671 let version = file_version(file_name);
672 if version > last_version {
673 return true;
674 }
675 }
676 false
677 })
678 .await
679 .context(error::OpenDalSnafu)?;
680
681 Ok(need_update)
682 }
683
684 fn increase_version(&mut self) -> ManifestVersion {
686 let previous = self.last_version.fetch_add(1, Ordering::Relaxed);
687 previous + 1
688 }
689
690 fn set_version(&mut self, version: ManifestVersion) -> ManifestVersion {
692 self.last_version.store(version, Ordering::Relaxed);
693 version
694 }
695
696 fn last_version(&self) -> ManifestVersion {
697 self.last_version.load(Ordering::Relaxed)
698 }
699
700 pub(crate) async fn last_checkpoint(
705 store: &mut ManifestObjectStore,
706 ) -> Result<Option<(RegionCheckpoint, u64)>> {
707 let last_checkpoint = store.load_last_checkpoint().await?;
708
709 if let Some((_, bytes)) = last_checkpoint {
710 let checkpoint = RegionCheckpoint::decode(&bytes)?;
711 Ok(Some((checkpoint, bytes.len() as u64)))
712 } else {
713 Ok(None)
714 }
715 }
716
717 pub fn store(&self) -> ManifestObjectStore {
718 self.store.clone()
719 }
720
721 #[cfg(test)]
722 pub(crate) fn checkpointer(&self) -> &Checkpointer {
723 &self.checkpointer
724 }
725
726 pub(crate) async fn merge_staged_actions(
729 &mut self,
730 region_state: RegionRoleState,
731 ) -> Result<Option<RegionMetaActionList>> {
732 if region_state != RegionRoleState::Leader(RegionLeaderState::Staging) {
734 return Ok(None);
735 }
736
737 let staging_manifests = self.store.fetch_staging_manifests().await?;
739
740 if staging_manifests.is_empty() {
741 info!(
742 "No staging manifests to merge for region {}",
743 self.manifest.metadata.region_id
744 );
745 return Ok(None);
746 }
747
748 info!(
749 "Merging {} staging manifests for region {}",
750 staging_manifests.len(),
751 self.manifest.metadata.region_id
752 );
753
754 let mut merged_actions = Vec::new();
756 let mut latest_version = self.last_version();
757
758 for (manifest_version, raw_action_list) in staging_manifests {
760 let action_list = RegionMetaActionList::decode(&raw_action_list)?;
761
762 for action in action_list.actions {
763 merged_actions.push(action);
764 }
765
766 latest_version = latest_version.max(manifest_version);
767 }
768
769 if merged_actions.is_empty() {
770 return Ok(None);
771 }
772
773 info!(
774 "Successfully merged {} actions from staging manifests for region {}, latest version: {}",
775 merged_actions.len(),
776 self.manifest.metadata.region_id,
777 latest_version
778 );
779
780 Ok(Some(RegionMetaActionList::new(merged_actions)))
781 }
782
783 pub(crate) fn unset_staging_manifest(&mut self) {
785 self.staging_manifest = None;
786 }
787
788 pub(crate) async fn clear_staging_manifest_and_dir(&mut self) -> Result<()> {
790 self.staging_manifest = None;
791 self.store.clear_staging_manifests().await?;
792 info!(
793 "Cleared all staging manifests for region {}",
794 self.manifest.metadata.region_id
795 );
796 Ok(())
797 }
798}
799
800#[cfg(test)]
801impl RegionManifestManager {
802 fn validate_manifest(&self, expect: &RegionMetadataRef, last_version: ManifestVersion) {
803 let manifest = self.manifest();
804 assert_eq!(manifest.metadata, *expect);
805 assert_eq!(self.manifest.manifest_version, self.last_version());
806 assert_eq!(last_version, self.last_version());
807 }
808}
809
810#[cfg(test)]
811mod test {
812 use std::time::Duration;
813
814 use api::v1::SemanticType;
815 use common_datasource::compression::CompressionType;
816 use common_test_util::temp_dir::create_temp_dir;
817 use datatypes::prelude::ConcreteDataType;
818 use datatypes::schema::ColumnSchema;
819 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
820
821 use super::*;
822 use crate::manifest::action::{RegionChange, RegionEdit};
823 use crate::manifest::tests::utils::basic_region_metadata;
824 use crate::test_util::TestEnv;
825
826 #[tokio::test]
827 async fn create_manifest_manager() {
828 let metadata = Arc::new(basic_region_metadata());
829 let env = TestEnv::new().await;
830 let manager = env
831 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
832 .await
833 .unwrap()
834 .unwrap();
835
836 manager.validate_manifest(&metadata, 0);
837 }
838
839 #[tokio::test]
840 async fn open_manifest_manager() {
841 let env = TestEnv::new().await;
842 assert!(
844 env.create_manifest_manager(CompressionType::Uncompressed, 10, None)
845 .await
846 .unwrap()
847 .is_none()
848 );
849
850 let metadata = Arc::new(basic_region_metadata());
852 let mut manager = env
853 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
854 .await
855 .unwrap()
856 .unwrap();
857 manager.stop().await;
859
860 let manager = env
862 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
863 .await
864 .unwrap()
865 .unwrap();
866
867 manager.validate_manifest(&metadata, 0);
868 }
869
870 #[tokio::test]
871 async fn manifest_with_partition_expr_roundtrip() {
872 let env = TestEnv::new().await;
873 let expr_json =
874 r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
875 let mut metadata = basic_region_metadata();
876 metadata.set_partition_expr(Some(expr_json.to_string()));
877 let metadata = Arc::new(metadata);
878 let mut manager = env
879 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
880 .await
881 .unwrap()
882 .unwrap();
883
884 let manifest = manager.manifest();
886 assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
887
888 manager.stop().await;
889
890 let manager = env
892 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
893 .await
894 .unwrap()
895 .unwrap();
896 let manifest = manager.manifest();
897 assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
898 }
899
900 #[tokio::test]
901 async fn region_change_add_column() {
902 let metadata = Arc::new(basic_region_metadata());
903 let env = TestEnv::new().await;
904 let mut manager = env
905 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
906 .await
907 .unwrap()
908 .unwrap();
909
910 let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
911 new_metadata_builder.push_column_metadata(ColumnMetadata {
912 column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
913 semantic_type: SemanticType::Field,
914 column_id: 252,
915 });
916 let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
917
918 let action_list =
919 RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
920 metadata: new_metadata.clone(),
921 sst_format: FormatType::PrimaryKey,
922 append_mode: None,
923 }));
924
925 let current_version = manager.update(action_list, false).await.unwrap();
926 assert_eq!(current_version, 1);
927 manager.validate_manifest(&new_metadata, 1);
928
929 manager.stop().await;
931 let manager = env
932 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
933 .await
934 .unwrap()
935 .unwrap();
936 manager.validate_manifest(&new_metadata, 1);
937 }
938
939 async fn manifest_dir_usage(path: &str) -> u64 {
941 let mut size = 0;
942 let mut read_dir = tokio::fs::read_dir(path).await.unwrap();
943 while let Ok(dir_entry) = read_dir.next_entry().await {
944 let Some(entry) = dir_entry else {
945 break;
946 };
947 if entry.file_type().await.unwrap().is_file() {
948 let file_name = entry.file_name().into_string().unwrap();
949 if file_name.contains(".checkpoint") || file_name.contains(".json") {
950 let file_size = entry.metadata().await.unwrap().len() as usize;
951 debug!("File: {file_name:?}, size: {file_size}");
952 size += file_size;
953 }
954 }
955 }
956 size as u64
957 }
958
959 #[tokio::test]
960 async fn test_manifest_size() {
961 let metadata = Arc::new(basic_region_metadata());
962 let data_home = create_temp_dir("");
963 let data_home_path = data_home.path().to_str().unwrap().to_string();
964 let env = TestEnv::with_data_home(data_home).await;
965
966 let manifest_dir = format!("{}/manifest", data_home_path);
967
968 let mut manager = env
969 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
970 .await
971 .unwrap()
972 .unwrap();
973
974 let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
975 new_metadata_builder.push_column_metadata(ColumnMetadata {
976 column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
977 semantic_type: SemanticType::Field,
978 column_id: 252,
979 });
980 let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
981
982 let action_list =
983 RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
984 metadata: new_metadata.clone(),
985 sst_format: FormatType::PrimaryKey,
986 append_mode: None,
987 }));
988
989 let current_version = manager.update(action_list, false).await.unwrap();
990 assert_eq!(current_version, 1);
991 manager.validate_manifest(&new_metadata, 1);
992
993 let manifest_size = manager.manifest_usage();
995 assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
996
997 for _ in 0..10 {
999 manager
1000 .update(
1001 RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
1002 files_to_add: vec![],
1003 files_to_remove: vec![],
1004 timestamp_ms: None,
1005 compaction_time_window: None,
1006 flushed_entry_id: None,
1007 flushed_sequence: None,
1008 committed_sequence: None,
1009 })]),
1010 false,
1011 )
1012 .await
1013 .unwrap();
1014 }
1015
1016 while manager.checkpointer.is_doing_checkpoint() {
1017 tokio::time::sleep(Duration::from_millis(10)).await;
1018 }
1019
1020 let manifest_size = manager.manifest_usage();
1022 assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
1023
1024 manager.stop().await;
1027 let manager = env
1028 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
1029 .await
1030 .unwrap()
1031 .unwrap();
1032 manager.validate_manifest(&new_metadata, 11);
1033
1034 let manifest_size = manager.manifest_usage();
1036 assert_eq!(manifest_size, 1397);
1037 }
1038}