1use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use common_datasource::compression::CompressionType;
19use common_telemetry::{debug, info};
20use futures::TryStreamExt;
21use object_store::ObjectStore;
22use snafu::{OptionExt, ResultExt, ensure};
23use store_api::metadata::RegionMetadataRef;
24use store_api::{MAX_VERSION, MIN_VERSION, ManifestVersion};
25
26use crate::error::{
27 self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
28};
29use crate::manifest::action::{
30 RegionChange, RegionCheckpoint, RegionEdit, RegionManifest, RegionManifestBuilder,
31 RegionMetaAction, RegionMetaActionList,
32};
33use crate::manifest::checkpointer::Checkpointer;
34use crate::manifest::storage::{
35 ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file,
36};
37use crate::metrics::MANIFEST_OP_ELAPSED;
38use crate::region::{RegionLeaderState, RegionRoleState};
39
/// Options for creating or opening a [`RegionManifestManager`].
#[derive(Debug, Clone)]
pub struct RegionManifestOptions {
    /// Manifest directory; passed to `ManifestObjectStore::new` together with
    /// `object_store`.
    pub manifest_dir: String,
    /// Object store that holds the manifest files.
    pub object_store: ObjectStore,
    /// Compression type handed to the manifest store.
    pub compress_type: CompressionType,
    /// Checkpoint distance. NOTE(review): interpreted by `Checkpointer` (not
    /// visible here) — presumably the number of versions between checkpoints.
    pub checkpoint_distance: u64,
    /// Options for removed-file records; consumed together with the rest of
    /// these options by `Checkpointer::new`.
    pub remove_file_options: RemoveFileOptions,
}
52
/// Options for how removed files are retained in the manifest.
///
/// NOTE(review): the retention logic lives outside this file (presumably in
/// `Checkpointer::update_manifest_removed_files`) — confirm the exact rules there.
#[derive(Debug, Clone)]
pub struct RemoveFileOptions {
    /// Number of removed-file records to keep (presumably an upper bound).
    pub keep_count: usize,
    /// How long removed-file records are kept.
    pub keep_ttl: std::time::Duration,
}
63
#[cfg(any(test, feature = "test"))]
impl Default for RemoveFileOptions {
    /// Test/feature-only defaults: `keep_count` of 256 and `keep_ttl` of one hour.
    fn default() -> Self {
        const ONE_HOUR_SECS: u64 = 60 * 60;
        RemoveFileOptions {
            keep_ttl: std::time::Duration::from_secs(ONE_HOUR_SECS),
            keep_count: 256,
        }
    }
}
73
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Manages a region's manifest.
///
/// Action lists are persisted as versioned delta files and replayed (on top of
/// the last checkpoint, if any) when the manifest is opened. The latest
/// manifest is kept in memory.
#[derive(Debug)]
pub struct RegionManifestManager {
    /// Storage for the delta and checkpoint files.
    store: ManifestObjectStore,
    /// Current manifest version. This `Arc` is supplied by the caller of
    /// `new`/`open`, so the caller can observe the version.
    last_version: Arc<AtomicU64>,
    /// Updates removed-file records (`update_manifest_removed_files`) and
    /// decides when to checkpoint (`maybe_do_checkpoint`).
    checkpointer: Checkpointer,
    /// Latest in-memory manifest.
    manifest: Arc<RegionManifest>,
    /// Set by `stop`. Once set, `update` fails with a region-stopped error, and
    /// so does `install_manifest_to` whenever it has to fetch newer versions.
    stopped: bool,
}
148
149impl RegionManifestManager {
150 pub async fn new(
152 metadata: RegionMetadataRef,
153 flushed_entry_id: u64,
154 options: RegionManifestOptions,
155 total_manifest_size: Arc<AtomicU64>,
156 manifest_version: Arc<AtomicU64>,
157 ) -> Result<Self> {
158 let mut store = ManifestObjectStore::new(
160 &options.manifest_dir,
161 options.object_store.clone(),
162 options.compress_type,
163 total_manifest_size,
164 );
165
166 info!(
167 "Creating region manifest in {} with metadata {:?}, flushed_entry_id: {}",
168 options.manifest_dir, metadata, flushed_entry_id
169 );
170
171 let version = MIN_VERSION;
172 let mut manifest_builder = RegionManifestBuilder::default();
173 manifest_builder.apply_change(
175 version,
176 RegionChange {
177 metadata: metadata.clone(),
178 },
179 );
180 let manifest = manifest_builder.try_build()?;
181 let region_id = metadata.region_id;
182
183 debug!(
184 "Build region manifest in {}, manifest: {:?}",
185 options.manifest_dir, manifest
186 );
187
188 let mut actions = vec![RegionMetaAction::Change(RegionChange { metadata })];
189 if flushed_entry_id > 0 {
190 actions.push(RegionMetaAction::Edit(RegionEdit {
191 files_to_add: vec![],
192 files_to_remove: vec![],
193 timestamp_ms: None,
194 compaction_time_window: None,
195 flushed_entry_id: Some(flushed_entry_id),
196 flushed_sequence: None,
197 committed_sequence: None,
198 }));
199 }
200
201 let action_list = RegionMetaActionList::new(actions);
203
204 store.save(version, &action_list.encode()?, false).await?;
207
208 let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION);
209 manifest_version.store(version, Ordering::Relaxed);
210 Ok(Self {
211 store,
212 last_version: manifest_version,
213 checkpointer,
214 manifest: Arc::new(manifest),
215 stopped: false,
216 })
217 }
218
219 pub async fn open(
223 options: RegionManifestOptions,
224 total_manifest_size: Arc<AtomicU64>,
225 manifest_version: Arc<AtomicU64>,
226 ) -> Result<Option<Self>> {
227 let _t = MANIFEST_OP_ELAPSED
228 .with_label_values(&["open"])
229 .start_timer();
230
231 let mut store = ManifestObjectStore::new(
233 &options.manifest_dir,
234 options.object_store.clone(),
235 options.compress_type,
236 total_manifest_size,
237 );
238
239 let mut version = MIN_VERSION;
243 let checkpoint = Self::last_checkpoint(&mut store).await?;
244 let last_checkpoint_version = checkpoint
245 .as_ref()
246 .map(|(checkpoint, _)| checkpoint.last_version)
247 .unwrap_or(MIN_VERSION);
248 let mut manifest_builder = if let Some((checkpoint, _)) = checkpoint {
249 info!(
250 "Recover region manifest {} from checkpoint version {}",
251 options.manifest_dir, checkpoint.last_version
252 );
253 version = version.max(checkpoint.last_version + 1);
254 RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint)
255 } else {
256 info!(
257 "Checkpoint not found in {}, build manifest from scratch",
258 options.manifest_dir
259 );
260 RegionManifestBuilder::default()
261 };
262
263 let manifests = store.fetch_manifests(version, MAX_VERSION).await?;
265
266 for (manifest_version, raw_action_list) in manifests {
267 let action_list = RegionMetaActionList::decode(&raw_action_list)?;
268 store.set_delta_file_size(manifest_version, raw_action_list.len() as u64);
270 for action in action_list.actions {
271 match action {
272 RegionMetaAction::Change(action) => {
273 manifest_builder.apply_change(manifest_version, action);
274 }
275 RegionMetaAction::Edit(action) => {
276 manifest_builder.apply_edit(manifest_version, action);
277 }
278 RegionMetaAction::Remove(_) => {
279 debug!(
280 "Unhandled action in {}, action: {:?}",
281 options.manifest_dir, action
282 );
283 }
284 RegionMetaAction::Truncate(action) => {
285 manifest_builder.apply_truncate(manifest_version, action);
286 }
287 }
288 }
289 }
290
291 if !manifest_builder.contains_metadata() {
293 debug!("No region manifest in {}", options.manifest_dir);
294 return Ok(None);
295 }
296
297 let manifest = manifest_builder.try_build()?;
298 debug!(
299 "Recovered region manifest from {}, manifest: {:?}",
300 options.manifest_dir, manifest
301 );
302 let version = manifest.manifest_version;
303
304 let checkpointer = Checkpointer::new(
305 manifest.metadata.region_id,
306 options,
307 store.clone(),
308 last_checkpoint_version,
309 );
310 manifest_version.store(version, Ordering::Relaxed);
311 Ok(Some(Self {
312 store,
313 last_version: manifest_version,
314 checkpointer,
315 manifest: Arc::new(manifest),
316 stopped: false,
317 }))
318 }
319
    /// Marks the manager as stopped.
    ///
    /// Afterwards `update` fails with a region-stopped error, and so does
    /// `install_manifest_to` whenever it would need to fetch newer versions.
    pub async fn stop(&mut self) {
        self.stopped = true;
    }
324
325 pub async fn install_manifest_to(
331 &mut self,
332 target_version: ManifestVersion,
333 ) -> Result<ManifestVersion> {
334 let _t = MANIFEST_OP_ELAPSED
335 .with_label_values(&["install_manifest_to"])
336 .start_timer();
337
338 let last_version = self.last_version();
339 if last_version >= target_version {
341 debug!(
342 "Target version {} is less than or equal to the current version {}, region: {}, skip install",
343 target_version, last_version, self.manifest.metadata.region_id
344 );
345 return Ok(last_version);
346 }
347
348 ensure!(
349 !self.stopped,
350 RegionStoppedSnafu {
351 region_id: self.manifest.metadata.region_id,
352 }
353 );
354
355 let region_id = self.manifest.metadata.region_id;
356 let mut manifests = self
358 .store
359 .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
361 .await?;
362
363 if manifests.is_empty() {
370 info!(
371 "Manifests are not strict from {}, region: {}, tries to install the last checkpoint",
372 last_version, self.manifest.metadata.region_id
373 );
374 let last_version = self.install_last_checkpoint().await?;
375 if last_version >= target_version {
377 return Ok(last_version);
378 }
379
380 manifests = self
382 .store
383 .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
385 .await?;
386 }
387
388 if manifests.is_empty() {
389 return NoManifestsSnafu {
390 region_id: self.manifest.metadata.region_id,
391 start_version: last_version + 1,
392 end_version: target_version + 1,
393 last_version,
394 }
395 .fail();
396 }
397
398 debug_assert_eq!(manifests.first().unwrap().0, last_version + 1);
399 let mut manifest_builder =
400 RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
401
402 for (manifest_version, raw_action_list) in manifests {
403 self.store
404 .set_delta_file_size(manifest_version, raw_action_list.len() as u64);
405 let action_list = RegionMetaActionList::decode(&raw_action_list)?;
406 for action in action_list.actions {
407 match action {
408 RegionMetaAction::Change(action) => {
409 manifest_builder.apply_change(manifest_version, action);
410 }
411 RegionMetaAction::Edit(action) => {
412 manifest_builder.apply_edit(manifest_version, action);
413 }
414 RegionMetaAction::Remove(_) => {
415 debug!(
416 "Unhandled action for region {}, action: {:?}",
417 self.manifest.metadata.region_id, action
418 );
419 }
420 RegionMetaAction::Truncate(action) => {
421 manifest_builder.apply_truncate(manifest_version, action);
422 }
423 }
424 }
425 }
426
427 let new_manifest = manifest_builder.try_build()?;
428 ensure!(
429 new_manifest.manifest_version >= target_version,
430 InstallManifestToSnafu {
431 region_id: self.manifest.metadata.region_id,
432 target_version,
433 available_version: new_manifest.manifest_version,
434 last_version,
435 }
436 );
437
438 let version = self.last_version();
439 self.manifest = Arc::new(new_manifest);
440 let last_version = self.set_version(self.manifest.manifest_version);
441 info!(
442 "Install manifest changes from {} to {}, region: {}",
443 version, last_version, self.manifest.metadata.region_id
444 );
445
446 Ok(last_version)
447 }
448
449 pub(crate) async fn install_last_checkpoint(&mut self) -> Result<ManifestVersion> {
451 let last_version = self.last_version();
452 let Some((checkpoint, checkpoint_size)) = Self::last_checkpoint(&mut self.store).await?
453 else {
454 return NoCheckpointSnafu {
455 region_id: self.manifest.metadata.region_id,
456 last_version,
457 }
458 .fail();
459 };
460 self.store.reset_manifest_size();
461 self.store
462 .set_checkpoint_file_size(checkpoint.last_version, checkpoint_size);
463 let builder = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint);
464 let manifest = builder.try_build()?;
465 let last_version = self.set_version(manifest.manifest_version);
466 self.manifest = Arc::new(manifest);
467 info!(
468 "Installed region manifest from checkpoint: {}, region: {}",
469 checkpoint.last_version, self.manifest.metadata.region_id
470 );
471
472 Ok(last_version)
473 }
474
475 pub async fn update(
477 &mut self,
478 action_list: RegionMetaActionList,
479 region_state: RegionRoleState,
480 ) -> Result<ManifestVersion> {
481 let _t = MANIFEST_OP_ELAPSED
482 .with_label_values(&["update"])
483 .start_timer();
484
485 ensure!(
486 !self.stopped,
487 RegionStoppedSnafu {
488 region_id: self.manifest.metadata.region_id,
489 }
490 );
491
492 let version = self.increase_version();
493 let is_staging = region_state == RegionRoleState::Leader(RegionLeaderState::Staging);
494 self.store
495 .save(version, &action_list.encode()?, is_staging)
496 .await?;
497
498 let mut manifest_builder =
499 RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
500 for action in action_list.actions {
501 match action {
502 RegionMetaAction::Change(action) => {
503 manifest_builder.apply_change(version, action);
504 }
505 RegionMetaAction::Edit(action) => {
506 manifest_builder.apply_edit(version, action);
507 }
508 RegionMetaAction::Remove(_) => {
509 debug!(
510 "Unhandled action for region {}, action: {:?}",
511 self.manifest.metadata.region_id, action
512 );
513 }
514 RegionMetaAction::Truncate(action) => {
515 manifest_builder.apply_truncate(version, action);
516 }
517 }
518 }
519 let new_manifest = manifest_builder.try_build()?;
520 let updated_manifest = self
521 .checkpointer
522 .update_manifest_removed_files(new_manifest)?;
523 self.manifest = Arc::new(updated_manifest);
524
525 self.checkpointer
526 .maybe_do_checkpoint(self.manifest.as_ref(), region_state);
527
528 Ok(version)
529 }
530
531 pub fn manifest(&self) -> Arc<RegionManifest> {
533 self.manifest.clone()
534 }
535
    /// Returns the total manifest size as tracked by the store
    /// (`ManifestObjectStore::total_manifest_size`).
    pub fn manifest_usage(&self) -> u64 {
        self.store.total_manifest_size()
    }
540
541 pub async fn has_update(&self) -> Result<bool> {
547 let last_version = self.last_version();
548
549 let streamer =
550 self.store
551 .manifest_lister(false)
552 .await?
553 .context(error::EmptyManifestDirSnafu {
554 manifest_dir: self.store.manifest_dir(),
555 })?;
556
557 let need_update = streamer
558 .try_any(|entry| async move {
559 let file_name = entry.name();
560 if is_delta_file(file_name) || is_checkpoint_file(file_name) {
561 let version = file_version(file_name);
562 if version > last_version {
563 return true;
564 }
565 }
566 false
567 })
568 .await
569 .context(error::OpenDalSnafu)?;
570
571 Ok(need_update)
572 }
573
574 fn increase_version(&mut self) -> ManifestVersion {
576 let previous = self.last_version.fetch_add(1, Ordering::Relaxed);
577 previous + 1
578 }
579
    /// Overwrites the shared version counter with `version` and returns it.
    fn set_version(&mut self, version: ManifestVersion) -> ManifestVersion {
        self.last_version.store(version, Ordering::Relaxed);
        version
    }
585
    /// Reads the current value of the shared version counter.
    fn last_version(&self) -> ManifestVersion {
        self.last_version.load(Ordering::Relaxed)
    }
589
590 pub(crate) async fn last_checkpoint(
595 store: &mut ManifestObjectStore,
596 ) -> Result<Option<(RegionCheckpoint, u64)>> {
597 let last_checkpoint = store.load_last_checkpoint().await?;
598
599 if let Some((_, bytes)) = last_checkpoint {
600 let checkpoint = RegionCheckpoint::decode(&bytes)?;
601 Ok(Some((checkpoint, bytes.len() as u64)))
602 } else {
603 Ok(None)
604 }
605 }
606
    /// Returns a clone of the underlying manifest store.
    pub fn store(&self) -> ManifestObjectStore {
        self.store.clone()
    }
610
    /// Test-only accessor for the checkpointer.
    #[cfg(test)]
    pub(crate) fn checkpointer(&self) -> &Checkpointer {
        &self.checkpointer
    }
615
616 pub(crate) async fn merge_staged_actions(
619 &mut self,
620 region_state: RegionRoleState,
621 ) -> Result<Option<RegionMetaActionList>> {
622 if region_state != RegionRoleState::Leader(RegionLeaderState::Staging) {
624 return Ok(None);
625 }
626
627 let staging_manifests = self.store.fetch_staging_manifests().await?;
629
630 if staging_manifests.is_empty() {
631 info!(
632 "No staging manifests to merge for region {}",
633 self.manifest.metadata.region_id
634 );
635 return Ok(None);
636 }
637
638 info!(
639 "Merging {} staging manifests for region {}",
640 staging_manifests.len(),
641 self.manifest.metadata.region_id
642 );
643
644 let mut merged_actions = Vec::new();
646 let mut latest_version = self.last_version();
647
648 for (manifest_version, raw_action_list) in staging_manifests {
650 let action_list = RegionMetaActionList::decode(&raw_action_list)?;
651
652 for action in action_list.actions {
653 merged_actions.push(action);
654 }
655
656 latest_version = latest_version.max(manifest_version);
657 }
658
659 if merged_actions.is_empty() {
660 return Ok(None);
661 }
662
663 info!(
664 "Successfully merged {} actions from staging manifests for region {}, latest version: {}",
665 merged_actions.len(),
666 self.manifest.metadata.region_id,
667 latest_version
668 );
669
670 Ok(Some(RegionMetaActionList::new(merged_actions)))
671 }
672}
673
#[cfg(test)]
impl RegionManifestManager {
    /// Test helper that asserts three things: the current manifest carries
    /// `expect` as its metadata, the manifest's version equals the manager's
    /// version, and both equal `last_version`.
    fn validate_manifest(&self, expect: &RegionMetadataRef, last_version: ManifestVersion) {
        let current = self.last_version();
        assert_eq!(self.manifest().metadata, *expect);
        assert_eq!(self.manifest.manifest_version, current);
        assert_eq!(last_version, current);
    }
}
683
684#[cfg(test)]
685mod test {
686 use std::time::Duration;
687
688 use api::v1::SemanticType;
689 use common_datasource::compression::CompressionType;
690 use common_test_util::temp_dir::create_temp_dir;
691 use datatypes::prelude::ConcreteDataType;
692 use datatypes::schema::ColumnSchema;
693 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
694
695 use super::*;
696 use crate::manifest::action::{RegionChange, RegionEdit};
697 use crate::manifest::tests::utils::basic_region_metadata;
698 use crate::test_util::TestEnv;
699
700 #[tokio::test]
701 async fn create_manifest_manager() {
702 let metadata = Arc::new(basic_region_metadata());
703 let env = TestEnv::new().await;
704 let manager = env
705 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
706 .await
707 .unwrap()
708 .unwrap();
709
710 manager.validate_manifest(&metadata, 0);
711 }
712
713 #[tokio::test]
714 async fn open_manifest_manager() {
715 let env = TestEnv::new().await;
716 assert!(
718 env.create_manifest_manager(CompressionType::Uncompressed, 10, None)
719 .await
720 .unwrap()
721 .is_none()
722 );
723
724 let metadata = Arc::new(basic_region_metadata());
726 let mut manager = env
727 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
728 .await
729 .unwrap()
730 .unwrap();
731 manager.stop().await;
733
734 let manager = env
736 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
737 .await
738 .unwrap()
739 .unwrap();
740
741 manager.validate_manifest(&metadata, 0);
742 }
743
744 #[tokio::test]
745 async fn manifest_with_partition_expr_roundtrip() {
746 let env = TestEnv::new().await;
747 let expr_json =
748 r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
749 let mut metadata = basic_region_metadata();
750 metadata.partition_expr = Some(expr_json.to_string());
751 let metadata = Arc::new(metadata);
752 let mut manager = env
753 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
754 .await
755 .unwrap()
756 .unwrap();
757
758 let manifest = manager.manifest();
760 assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
761
762 manager.stop().await;
763
764 let manager = env
766 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
767 .await
768 .unwrap()
769 .unwrap();
770 let manifest = manager.manifest();
771 assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
772 }
773
774 #[tokio::test]
775 async fn region_change_add_column() {
776 let metadata = Arc::new(basic_region_metadata());
777 let env = TestEnv::new().await;
778 let mut manager = env
779 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
780 .await
781 .unwrap()
782 .unwrap();
783
784 let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
785 new_metadata_builder.push_column_metadata(ColumnMetadata {
786 column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
787 semantic_type: SemanticType::Field,
788 column_id: 252,
789 });
790 let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
791
792 let action_list =
793 RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
794 metadata: new_metadata.clone(),
795 }));
796
797 let current_version = manager
798 .update(
799 action_list,
800 RegionRoleState::Leader(RegionLeaderState::Writable),
801 )
802 .await
803 .unwrap();
804 assert_eq!(current_version, 1);
805 manager.validate_manifest(&new_metadata, 1);
806
807 manager.stop().await;
809 let manager = env
810 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
811 .await
812 .unwrap()
813 .unwrap();
814 manager.validate_manifest(&new_metadata, 1);
815 }
816
817 async fn manifest_dir_usage(path: &str) -> u64 {
819 let mut size = 0;
820 let mut read_dir = tokio::fs::read_dir(path).await.unwrap();
821 while let Ok(dir_entry) = read_dir.next_entry().await {
822 let Some(entry) = dir_entry else {
823 break;
824 };
825 if entry.file_type().await.unwrap().is_file() {
826 let file_name = entry.file_name().into_string().unwrap();
827 if file_name.contains(".checkpoint") || file_name.contains(".json") {
828 let file_size = entry.metadata().await.unwrap().len() as usize;
829 debug!("File: {file_name:?}, size: {file_size}");
830 size += file_size;
831 }
832 }
833 }
834 size as u64
835 }
836
    /// Checks that the store's tracked manifest size matches the bytes on disk,
    /// both before and after checkpointing, and after reopening.
    #[tokio::test]
    async fn test_manifest_size() {
        let metadata = Arc::new(basic_region_metadata());
        let data_home = create_temp_dir("");
        let data_home_path = data_home.path().to_str().unwrap().to_string();
        let env = TestEnv::with_data_home(data_home).await;

        // Where the manifest files are expected to land under the temp data home.
        let manifest_dir = format!("{}/manifest", data_home_path);

        let mut manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
            .await
            .unwrap()
            .unwrap();

        let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
        new_metadata_builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
            semantic_type: SemanticType::Field,
            column_id: 252,
        });
        let new_metadata = Arc::new(new_metadata_builder.build().unwrap());

        let action_list =
            RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
                metadata: new_metadata.clone(),
            }));

        let current_version = manager
            .update(
                action_list,
                RegionRoleState::Leader(RegionLeaderState::Writable),
            )
            .await
            .unwrap();
        assert_eq!(current_version, 1);
        manager.validate_manifest(&new_metadata, 1);

        // The tracked size must match the manifest bytes actually on disk.
        let manifest_size = manager.manifest_usage();
        assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);

        // Ten empty edits take the version from 1 to 11. NOTE(review): the `10`
        // passed to `create_manifest_manager` is presumably the checkpoint
        // distance, so this should trigger a checkpoint.
        for _ in 0..10 {
            manager
                .update(
                    RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
                        files_to_add: vec![],
                        files_to_remove: vec![],
                        timestamp_ms: None,
                        compaction_time_window: None,
                        flushed_entry_id: None,
                        flushed_sequence: None,
                        committed_sequence: None,
                    })]),
                    RegionRoleState::Leader(RegionLeaderState::Writable),
                )
                .await
                .unwrap();
        }

        // Checkpointing presumably runs in the background; wait until it is done.
        // NOTE(review): no timeout — this hangs if a checkpoint never finishes.
        while manager.checkpointer.is_doing_checkpoint() {
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        // After checkpointing, the tracked size still matches the disk.
        let manifest_size = manager.manifest_usage();
        assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);

        manager.stop().await;

        // Reopen from storage; recovery must land on version 11 with the new metadata.
        let manager = env
            .create_manifest_manager(CompressionType::Uncompressed, 10, None)
            .await
            .unwrap()
            .unwrap();
        manager.validate_manifest(&new_metadata, 11);

        // The size is rebuilt on open. NOTE(review): hard-coded byte count that
        // depends on the exact encoded sizes of the checkpoint and delta files;
        // update it if the manifest encoding changes.
        let manifest_size = manager.manifest_usage();
        assert_eq!(manifest_size, 1721);
    }
920}