1use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::Arc;
17
18use common_datasource::compression::CompressionType;
19use common_telemetry::{debug, info};
20use futures::TryStreamExt;
21use object_store::ObjectStore;
22use snafu::{ensure, OptionExt, ResultExt};
23use store_api::metadata::RegionMetadataRef;
24use store_api::{ManifestVersion, MAX_VERSION, MIN_VERSION};
25
26use crate::error::{
27 self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
28};
29use crate::manifest::action::{
30 RegionChange, RegionCheckpoint, RegionEdit, RegionManifest, RegionManifestBuilder,
31 RegionMetaAction, RegionMetaActionList,
32};
33use crate::manifest::checkpointer::Checkpointer;
34use crate::manifest::storage::{
35 file_version, is_checkpoint_file, is_delta_file, ManifestObjectStore,
36};
37use crate::metrics::MANIFEST_OP_ELAPSED;
38use crate::region::{RegionLeaderState, RegionRoleState};
39
/// Options used to create or open a [`RegionManifestManager`].
#[derive(Debug, Clone)]
pub struct RegionManifestOptions {
    /// Directory of the region manifest; handed to the manifest store and used in logs.
    pub manifest_dir: String,
    /// Object store handed to the manifest store.
    pub object_store: ObjectStore,
    /// Compression type handed to the manifest store.
    pub compress_type: CompressionType,
    /// NOTE(review): consumed by the checkpointer, which is not visible here —
    /// presumably the number of versions between two checkpoints; confirm.
    pub checkpoint_distance: u64,
    /// NOTE(review): forwarded together with the other options to the checkpointer —
    /// presumably controls the retention of removed files; confirm.
    pub remove_file_options: RemoveFileOptions,
}
52
/// Retention settings for removed files.
///
/// NOTE(review): the code that interprets these values is not visible here; the field
/// descriptions below are inferred from the field names — confirm against the consumer.
#[derive(Debug, Clone)]
pub struct RemoveFileOptions {
    /// Presumably the number of removed-file records to keep.
    pub keep_count: usize,
    /// Presumably how long removed-file records are kept.
    pub keep_ttl: std::time::Duration,
}
63
/// Defaults that are only compiled for tests (or with the `test` feature):
/// a keep count of 256 and a keep TTL of 3600 seconds.
#[cfg(any(test, feature = "test"))]
impl Default for RemoveFileOptions {
    fn default() -> Self {
        let keep_count = 256;
        let keep_ttl = std::time::Duration::from_secs(3600);
        RemoveFileOptions {
            keep_count,
            keep_ttl,
        }
    }
}
73
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Manages the manifest of a single region.
///
/// The manager owns the manifest store, keeps the current [`RegionManifest`] in memory
/// and publishes the latest manifest version through a shared atomic counter.
#[derive(Debug)]
pub struct RegionManifestManager {
    /// Store used to read and write manifest files.
    store: ManifestObjectStore,
    /// Latest manifest version; the counter is supplied by the caller of `new`/`open`.
    last_version: Arc<AtomicU64>,
    /// Checkpointer that is consulted after every successful `update`.
    checkpointer: Checkpointer,
    /// Current in-memory manifest.
    manifest: Arc<RegionManifest>,
    /// Set by `stop`; while `true`, `update` is rejected and `install_manifest_to`
    /// refuses to install newer versions.
    stopped: bool,
}
148
149impl RegionManifestManager {
150 pub async fn new(
152 metadata: RegionMetadataRef,
153 flushed_entry_id: u64,
154 options: RegionManifestOptions,
155 total_manifest_size: Arc<AtomicU64>,
156 manifest_version: Arc<AtomicU64>,
157 ) -> Result<Self> {
158 let mut store = ManifestObjectStore::new(
160 &options.manifest_dir,
161 options.object_store.clone(),
162 options.compress_type,
163 total_manifest_size,
164 );
165
166 info!(
167 "Creating region manifest in {} with metadata {:?}, flushed_entry_id: {}",
168 options.manifest_dir, metadata, flushed_entry_id
169 );
170
171 let version = MIN_VERSION;
172 let mut manifest_builder = RegionManifestBuilder::default();
173 manifest_builder.apply_change(
175 version,
176 RegionChange {
177 metadata: metadata.clone(),
178 },
179 );
180 let manifest = manifest_builder.try_build()?;
181 let region_id = metadata.region_id;
182
183 debug!(
184 "Build region manifest in {}, manifest: {:?}",
185 options.manifest_dir, manifest
186 );
187
188 let mut actions = vec![RegionMetaAction::Change(RegionChange { metadata })];
189 if flushed_entry_id > 0 {
190 actions.push(RegionMetaAction::Edit(RegionEdit {
191 files_to_add: vec![],
192 files_to_remove: vec![],
193 timestamp_ms: None,
194 compaction_time_window: None,
195 flushed_entry_id: Some(flushed_entry_id),
196 flushed_sequence: None,
197 }));
198 }
199
200 let action_list = RegionMetaActionList::new(actions);
202
203 store.save(version, &action_list.encode()?, false).await?;
206
207 let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION);
208 manifest_version.store(version, Ordering::Relaxed);
209 Ok(Self {
210 store,
211 last_version: manifest_version,
212 checkpointer,
213 manifest: Arc::new(manifest),
214 stopped: false,
215 })
216 }
217
218 pub async fn open(
222 options: RegionManifestOptions,
223 total_manifest_size: Arc<AtomicU64>,
224 manifest_version: Arc<AtomicU64>,
225 ) -> Result<Option<Self>> {
226 let _t = MANIFEST_OP_ELAPSED
227 .with_label_values(&["open"])
228 .start_timer();
229
230 let mut store = ManifestObjectStore::new(
232 &options.manifest_dir,
233 options.object_store.clone(),
234 options.compress_type,
235 total_manifest_size,
236 );
237
238 let mut version = MIN_VERSION;
242 let checkpoint = Self::last_checkpoint(&mut store).await?;
243 let last_checkpoint_version = checkpoint
244 .as_ref()
245 .map(|(checkpoint, _)| checkpoint.last_version)
246 .unwrap_or(MIN_VERSION);
247 let mut manifest_builder = if let Some((checkpoint, _)) = checkpoint {
248 info!(
249 "Recover region manifest {} from checkpoint version {}",
250 options.manifest_dir, checkpoint.last_version
251 );
252 version = version.max(checkpoint.last_version + 1);
253 RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint)
254 } else {
255 info!(
256 "Checkpoint not found in {}, build manifest from scratch",
257 options.manifest_dir
258 );
259 RegionManifestBuilder::default()
260 };
261
262 let manifests = store.fetch_manifests(version, MAX_VERSION).await?;
264
265 for (manifest_version, raw_action_list) in manifests {
266 let action_list = RegionMetaActionList::decode(&raw_action_list)?;
267 store.set_delta_file_size(manifest_version, raw_action_list.len() as u64);
269 for action in action_list.actions {
270 match action {
271 RegionMetaAction::Change(action) => {
272 manifest_builder.apply_change(manifest_version, action);
273 }
274 RegionMetaAction::Edit(action) => {
275 manifest_builder.apply_edit(manifest_version, action);
276 }
277 RegionMetaAction::Remove(_) => {
278 debug!(
279 "Unhandled action in {}, action: {:?}",
280 options.manifest_dir, action
281 );
282 }
283 RegionMetaAction::Truncate(action) => {
284 manifest_builder.apply_truncate(manifest_version, action);
285 }
286 }
287 }
288 }
289
290 if !manifest_builder.contains_metadata() {
292 debug!("No region manifest in {}", options.manifest_dir);
293 return Ok(None);
294 }
295
296 let manifest = manifest_builder.try_build()?;
297 debug!(
298 "Recovered region manifest from {}, manifest: {:?}",
299 options.manifest_dir, manifest
300 );
301 let version = manifest.manifest_version;
302
303 let checkpointer = Checkpointer::new(
304 manifest.metadata.region_id,
305 options,
306 store.clone(),
307 last_checkpoint_version,
308 );
309 manifest_version.store(version, Ordering::Relaxed);
310 Ok(Some(Self {
311 store,
312 last_version: manifest_version,
313 checkpointer,
314 manifest: Arc::new(manifest),
315 stopped: false,
316 }))
317 }
318
    /// Marks the manager as stopped.
    ///
    /// Only the `stopped` flag is set here; no I/O is performed.
    pub async fn stop(&mut self) {
        self.stopped = true;
    }
323
324 pub async fn install_manifest_to(
330 &mut self,
331 target_version: ManifestVersion,
332 ) -> Result<ManifestVersion> {
333 let _t = MANIFEST_OP_ELAPSED
334 .with_label_values(&["install_manifest_to"])
335 .start_timer();
336
337 let last_version = self.last_version();
338 if last_version >= target_version {
340 debug!(
341 "Target version {} is less than or equal to the current version {}, region: {}, skip install",
342 target_version, last_version, self.manifest.metadata.region_id
343 );
344 return Ok(last_version);
345 }
346
347 ensure!(
348 !self.stopped,
349 RegionStoppedSnafu {
350 region_id: self.manifest.metadata.region_id,
351 }
352 );
353
354 let region_id = self.manifest.metadata.region_id;
355 let mut manifests = self
357 .store
358 .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
360 .await?;
361
362 if manifests.is_empty() {
369 info!(
370 "Manifests are not strict from {}, region: {}, tries to install the last checkpoint",
371 last_version, self.manifest.metadata.region_id
372 );
373 let last_version = self.install_last_checkpoint().await?;
374 if last_version >= target_version {
376 return Ok(last_version);
377 }
378
379 manifests = self
381 .store
382 .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
384 .await?;
385 }
386
387 if manifests.is_empty() {
388 return NoManifestsSnafu {
389 region_id: self.manifest.metadata.region_id,
390 start_version: last_version + 1,
391 end_version: target_version + 1,
392 last_version,
393 }
394 .fail();
395 }
396
397 debug_assert_eq!(manifests.first().unwrap().0, last_version + 1);
398 let mut manifest_builder =
399 RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
400
401 for (manifest_version, raw_action_list) in manifests {
402 self.store
403 .set_delta_file_size(manifest_version, raw_action_list.len() as u64);
404 let action_list = RegionMetaActionList::decode(&raw_action_list)?;
405 for action in action_list.actions {
406 match action {
407 RegionMetaAction::Change(action) => {
408 manifest_builder.apply_change(manifest_version, action);
409 }
410 RegionMetaAction::Edit(action) => {
411 manifest_builder.apply_edit(manifest_version, action);
412 }
413 RegionMetaAction::Remove(_) => {
414 debug!(
415 "Unhandled action for region {}, action: {:?}",
416 self.manifest.metadata.region_id, action
417 );
418 }
419 RegionMetaAction::Truncate(action) => {
420 manifest_builder.apply_truncate(manifest_version, action);
421 }
422 }
423 }
424 }
425
426 let new_manifest = manifest_builder.try_build()?;
427 ensure!(
428 new_manifest.manifest_version >= target_version,
429 InstallManifestToSnafu {
430 region_id: self.manifest.metadata.region_id,
431 target_version,
432 available_version: new_manifest.manifest_version,
433 last_version,
434 }
435 );
436
437 let version = self.last_version();
438 self.manifest = Arc::new(new_manifest);
439 let last_version = self.set_version(self.manifest.manifest_version);
440 info!(
441 "Install manifest changes from {} to {}, region: {}",
442 version, last_version, self.manifest.metadata.region_id
443 );
444
445 Ok(last_version)
446 }
447
448 pub(crate) async fn install_last_checkpoint(&mut self) -> Result<ManifestVersion> {
450 let last_version = self.last_version();
451 let Some((checkpoint, checkpoint_size)) = Self::last_checkpoint(&mut self.store).await?
452 else {
453 return NoCheckpointSnafu {
454 region_id: self.manifest.metadata.region_id,
455 last_version,
456 }
457 .fail();
458 };
459 self.store.reset_manifest_size();
460 self.store
461 .set_checkpoint_file_size(checkpoint.last_version, checkpoint_size);
462 let builder = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint);
463 let manifest = builder.try_build()?;
464 let last_version = self.set_version(manifest.manifest_version);
465 self.manifest = Arc::new(manifest);
466 info!(
467 "Installed region manifest from checkpoint: {}, region: {}",
468 checkpoint.last_version, self.manifest.metadata.region_id
469 );
470
471 Ok(last_version)
472 }
473
474 pub async fn update(
476 &mut self,
477 action_list: RegionMetaActionList,
478 region_state: RegionRoleState,
479 ) -> Result<ManifestVersion> {
480 let _t = MANIFEST_OP_ELAPSED
481 .with_label_values(&["update"])
482 .start_timer();
483
484 ensure!(
485 !self.stopped,
486 RegionStoppedSnafu {
487 region_id: self.manifest.metadata.region_id,
488 }
489 );
490
491 let version = self.increase_version();
492 let is_staging = region_state == RegionRoleState::Leader(RegionLeaderState::Staging);
493 self.store
494 .save(version, &action_list.encode()?, is_staging)
495 .await?;
496
497 let mut manifest_builder =
498 RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
499 for action in action_list.actions {
500 match action {
501 RegionMetaAction::Change(action) => {
502 manifest_builder.apply_change(version, action);
503 }
504 RegionMetaAction::Edit(action) => {
505 manifest_builder.apply_edit(version, action);
506 }
507 RegionMetaAction::Remove(_) => {
508 debug!(
509 "Unhandled action for region {}, action: {:?}",
510 self.manifest.metadata.region_id, action
511 );
512 }
513 RegionMetaAction::Truncate(action) => {
514 manifest_builder.apply_truncate(version, action);
515 }
516 }
517 }
518 let new_manifest = manifest_builder.try_build()?;
519 let updated_manifest = self
520 .checkpointer
521 .update_manifest_removed_files(new_manifest)?;
522 self.manifest = Arc::new(updated_manifest);
523
524 self.checkpointer
525 .maybe_do_checkpoint(self.manifest.as_ref(), region_state);
526
527 Ok(version)
528 }
529
530 pub fn manifest(&self) -> Arc<RegionManifest> {
532 self.manifest.clone()
533 }
534
    /// Returns the total manifest size as tracked by the manifest store.
    pub fn manifest_usage(&self) -> u64 {
        self.store.total_manifest_size()
    }
539
540 pub async fn has_update(&self) -> Result<bool> {
546 let last_version = self.last_version();
547
548 let streamer =
549 self.store
550 .manifest_lister()
551 .await?
552 .context(error::EmptyManifestDirSnafu {
553 manifest_dir: self.store.manifest_dir(),
554 })?;
555
556 let need_update = streamer
557 .try_any(|entry| async move {
558 let file_name = entry.name();
559 if is_delta_file(file_name) || is_checkpoint_file(file_name) {
560 let version = file_version(file_name);
561 if version > last_version {
562 return true;
563 }
564 }
565 false
566 })
567 .await
568 .context(error::OpenDalSnafu)?;
569
570 Ok(need_update)
571 }
572
573 fn increase_version(&mut self) -> ManifestVersion {
575 let previous = self.last_version.fetch_add(1, Ordering::Relaxed);
576 previous + 1
577 }
578
    /// Overwrites the version counter with `version` and returns that same value.
    fn set_version(&mut self, version: ManifestVersion) -> ManifestVersion {
        self.last_version.store(version, Ordering::Relaxed);
        version
    }
584
    /// Reads the current value of the version counter.
    fn last_version(&self) -> ManifestVersion {
        self.last_version.load(Ordering::Relaxed)
    }
588
589 pub(crate) async fn last_checkpoint(
594 store: &mut ManifestObjectStore,
595 ) -> Result<Option<(RegionCheckpoint, u64)>> {
596 let last_checkpoint = store.load_last_checkpoint().await?;
597
598 if let Some((_, bytes)) = last_checkpoint {
599 let checkpoint = RegionCheckpoint::decode(&bytes)?;
600 Ok(Some((checkpoint, bytes.len() as u64)))
601 } else {
602 Ok(None)
603 }
604 }
605
    /// Returns a clone of the underlying manifest store.
    pub fn store(&self) -> ManifestObjectStore {
        self.store.clone()
    }
609
    /// Test-only accessor for the manager's checkpointer.
    #[cfg(test)]
    pub(crate) fn checkpointer(&self) -> &Checkpointer {
        &self.checkpointer
    }
614}
615
#[cfg(test)]
impl RegionManifestManager {
    /// Test helper asserting that the manager holds `expect` as its metadata and that
    /// both the manifest and the version counter are at `last_version`.
    fn validate_manifest(&self, expect: &RegionMetadataRef, last_version: ManifestVersion) {
        let current_version = self.last_version();
        let manifest = self.manifest();
        assert_eq!(manifest.metadata, *expect);
        assert_eq!(self.manifest.manifest_version, current_version);
        assert_eq!(last_version, current_version);
    }
}
625
626#[cfg(test)]
627mod test {
628 use std::time::Duration;
629
630 use api::v1::SemanticType;
631 use common_datasource::compression::CompressionType;
632 use common_test_util::temp_dir::create_temp_dir;
633 use datatypes::prelude::ConcreteDataType;
634 use datatypes::schema::ColumnSchema;
635 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
636
637 use super::*;
638 use crate::manifest::action::{RegionChange, RegionEdit};
639 use crate::manifest::tests::utils::basic_region_metadata;
640 use crate::test_util::TestEnv;
641
642 #[tokio::test]
643 async fn create_manifest_manager() {
644 let metadata = Arc::new(basic_region_metadata());
645 let env = TestEnv::new().await;
646 let manager = env
647 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
648 .await
649 .unwrap()
650 .unwrap();
651
652 manager.validate_manifest(&metadata, 0);
653 }
654
655 #[tokio::test]
656 async fn open_manifest_manager() {
657 let env = TestEnv::new().await;
658 assert!(env
660 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
661 .await
662 .unwrap()
663 .is_none());
664
665 let metadata = Arc::new(basic_region_metadata());
667 let mut manager = env
668 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
669 .await
670 .unwrap()
671 .unwrap();
672 manager.stop().await;
674
675 let manager = env
677 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
678 .await
679 .unwrap()
680 .unwrap();
681
682 manager.validate_manifest(&metadata, 0);
683 }
684
685 #[tokio::test]
686 async fn manifest_with_partition_expr_roundtrip() {
687 let env = TestEnv::new().await;
688 let expr_json =
689 r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
690 let mut metadata = basic_region_metadata();
691 metadata.partition_expr = Some(expr_json.to_string());
692 let metadata = Arc::new(metadata);
693 let mut manager = env
694 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
695 .await
696 .unwrap()
697 .unwrap();
698
699 let manifest = manager.manifest();
701 assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
702
703 manager.stop().await;
704
705 let manager = env
707 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
708 .await
709 .unwrap()
710 .unwrap();
711 let manifest = manager.manifest();
712 assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
713 }
714
715 #[tokio::test]
716 async fn region_change_add_column() {
717 let metadata = Arc::new(basic_region_metadata());
718 let env = TestEnv::new().await;
719 let mut manager = env
720 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
721 .await
722 .unwrap()
723 .unwrap();
724
725 let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
726 new_metadata_builder.push_column_metadata(ColumnMetadata {
727 column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
728 semantic_type: SemanticType::Field,
729 column_id: 252,
730 });
731 let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
732
733 let action_list =
734 RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
735 metadata: new_metadata.clone(),
736 }));
737
738 let current_version = manager
739 .update(
740 action_list,
741 RegionRoleState::Leader(RegionLeaderState::Writable),
742 )
743 .await
744 .unwrap();
745 assert_eq!(current_version, 1);
746 manager.validate_manifest(&new_metadata, 1);
747
748 manager.stop().await;
750 let manager = env
751 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
752 .await
753 .unwrap()
754 .unwrap();
755 manager.validate_manifest(&new_metadata, 1);
756 }
757
758 async fn manifest_dir_usage(path: &str) -> u64 {
760 let mut size = 0;
761 let mut read_dir = tokio::fs::read_dir(path).await.unwrap();
762 while let Ok(dir_entry) = read_dir.next_entry().await {
763 let Some(entry) = dir_entry else {
764 break;
765 };
766 if entry.file_type().await.unwrap().is_file() {
767 let file_name = entry.file_name().into_string().unwrap();
768 if file_name.contains(".checkpoint") || file_name.contains(".json") {
769 let file_size = entry.metadata().await.unwrap().len() as usize;
770 debug!("File: {file_name:?}, size: {file_size}");
771 size += file_size;
772 }
773 }
774 }
775 size as u64
776 }
777
778 #[tokio::test]
779 async fn test_manifest_size() {
780 let metadata = Arc::new(basic_region_metadata());
781 let data_home = create_temp_dir("");
782 let data_home_path = data_home.path().to_str().unwrap().to_string();
783 let env = TestEnv::with_data_home(data_home).await;
784
785 let manifest_dir = format!("{}/manifest", data_home_path);
786
787 let mut manager = env
788 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
789 .await
790 .unwrap()
791 .unwrap();
792
793 let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
794 new_metadata_builder.push_column_metadata(ColumnMetadata {
795 column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
796 semantic_type: SemanticType::Field,
797 column_id: 252,
798 });
799 let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
800
801 let action_list =
802 RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
803 metadata: new_metadata.clone(),
804 }));
805
806 let current_version = manager
807 .update(
808 action_list,
809 RegionRoleState::Leader(RegionLeaderState::Writable),
810 )
811 .await
812 .unwrap();
813 assert_eq!(current_version, 1);
814 manager.validate_manifest(&new_metadata, 1);
815
816 let manifest_size = manager.manifest_usage();
818 assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
819
820 for _ in 0..10 {
822 manager
823 .update(
824 RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
825 files_to_add: vec![],
826 files_to_remove: vec![],
827 timestamp_ms: None,
828 compaction_time_window: None,
829 flushed_entry_id: None,
830 flushed_sequence: None,
831 })]),
832 RegionRoleState::Leader(RegionLeaderState::Writable),
833 )
834 .await
835 .unwrap();
836 }
837
838 while manager.checkpointer.is_doing_checkpoint() {
839 tokio::time::sleep(Duration::from_millis(10)).await;
840 }
841
842 let manifest_size = manager.manifest_usage();
844 assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
845
846 manager.stop().await;
849 let manager = env
850 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
851 .await
852 .unwrap()
853 .unwrap();
854 manager.validate_manifest(&new_metadata, 11);
855
856 let manifest_size = manager.manifest_usage();
858 assert_eq!(manifest_size, 1669);
859 }
860}