1pub mod catchup;
18pub mod opener;
19pub mod options;
20pub mod utils;
21pub(crate) mod version;
22
23use std::collections::hash_map::Entry;
24use std::collections::{HashMap, HashSet};
25use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
26use std::sync::{Arc, Mutex, RwLock};
27
28use common_base::hash::partition_expr_version;
29use common_telemetry::{error, info, warn};
30use crossbeam_utils::atomic::AtomicCell;
31use partition::expr::PartitionExpr;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::PrimaryKeyEncoding;
35use store_api::logstore::provider::Provider;
36use store_api::metadata::RegionMetadataRef;
37use store_api::metrics::{REGION_QUERY_CPU_TIME, REGION_QUERY_SCANNED_BYTES};
38use store_api::region_engine::{
39 RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
40};
41use store_api::region_info::RegionInfoEntry;
42use store_api::region_request::{PathType, StagingPartitionDirective};
43use store_api::sst_entry::ManifestSstEntry;
44use store_api::storage::{FileId, RegionId, SequenceNumber};
45use tokio::sync::RwLockWriteGuard;
46pub use utils::*;
47
48use crate::access_layer::AccessLayerRef;
49use crate::engine::region_hook::{PendingManifestHook, RegionHookRef};
50use crate::error::{
51 FlushableRegionStateSnafu, InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu,
52 RegionTruncatedSnafu, Result, UnexpectedSnafu, UpdateManifestSnafu,
53};
54use crate::manifest::action::{
55 RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
56};
57use crate::manifest::manager::RegionManifestManager;
58use crate::region::version::{VersionControlRef, VersionRef};
59use crate::request::{OnFailure, OptionOutputTx};
60use crate::sst::file::FileMeta;
61use crate::sst::file_purger::FilePurgerRef;
62use crate::sst::location::{index_file_path, sst_file_path};
63use crate::time_provider::TimeProviderRef;
64
/// Multiplier applied to a region's memtable usage to estimate its WAL usage
/// (used by `MitoRegion::estimated_wal_usage`).
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;
67
68#[derive(Debug)]
70pub struct RegionUsage {
71 pub region_id: RegionId,
72 pub wal_usage: u64,
73 pub sst_usage: u64,
74 pub manifest_usage: u64,
75}
76
77impl RegionUsage {
78 pub fn disk_usage(&self) -> u64 {
79 self.wal_usage + self.sst_usage + self.manifest_usage
80 }
81}
82
/// Fine-grained state of a region while it holds the leader role.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// Normal leader mode.
    Writable,
    /// The region is in staging mode.
    Staging,
    /// The region is on its way into staging mode.
    EnteringStaging,
    /// An alter operation is in progress.
    Altering,
    /// The region is being dropped.
    Dropping,
    /// The region is being truncated.
    Truncating,
    /// An edit operation is in progress.
    Editing,
    /// The leader is being downgraded.
    Downgrading,
}

/// Role of a region together with its leader sub-state, if any.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
    /// The region is a leader in the given state.
    Leader(RegionLeaderState),
    /// The region is a follower.
    Follower,
}

impl RegionRoleState {
    /// Returns the leader sub-state, or `None` when the region is a follower.
    pub fn into_leader_state(self) -> Option<RegionLeaderState> {
        if let RegionRoleState::Leader(leader_state) = self {
            Some(leader_state)
        } else {
            None
        }
    }

    /// Returns a static, human-readable name for the state.
    pub(crate) fn as_str(&self) -> &'static str {
        let leader_state = match self {
            RegionRoleState::Follower => return "Follower",
            RegionRoleState::Leader(leader_state) => leader_state,
        };

        match leader_state {
            RegionLeaderState::Writable => "Leader(Writable)",
            RegionLeaderState::Staging => "Leader(Staging)",
            RegionLeaderState::EnteringStaging => "Leader(EnteringStaging)",
            RegionLeaderState::Altering => "Leader(Altering)",
            RegionLeaderState::Dropping => "Leader(Dropping)",
            RegionLeaderState::Truncating => "Leader(Truncating)",
            RegionLeaderState::Editing => "Leader(Editing)",
            RegionLeaderState::Downgrading => "Leader(Downgrading)",
        }
    }
}
134
/// In-memory handle of a region: its versioned data, manifest context and
/// bookkeeping counters.
#[derive(Debug)]
pub struct MitoRegion {
    /// Id of this region.
    pub(crate) region_id: RegionId,

    /// Source of the current version (metadata, memtables, SSTs) and of the
    /// committed sequence.
    pub(crate) version_control: VersionControlRef,
    /// Access layer; provides the table directory and path type.
    pub(crate) access_layer: AccessLayerRef,
    /// Manifest manager plus the role state of the region.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// Purger handed to the version control when edits are applied.
    pub(crate) file_purger: FilePurgerRef,
    /// Log store provider of this region.
    // NOTE(review): presumably identifies the WAL backing the region — confirm.
    pub(crate) provider: Provider,
    /// Timestamp recorded by `update_flush_millis`.
    last_flush_millis: AtomicI64,
    /// Timestamp recorded by `update_schedule_compaction_millis`.
    last_schedule_compaction_millis: AtomicI64,
    /// Clock used to obtain the timestamps above.
    time_provider: TimeProviderRef,
    /// Latest entry id of the topic; reported in the region statistic.
    pub(crate) topic_latest_entry_id: AtomicU64,
    /// Written bytes counter; reported in the region statistic.
    pub(crate) written_bytes: Arc<AtomicU64>,
    /// Manifest statistics (size, version, removed file count).
    stats: ManifestStats,
}

/// Shared reference to a [`MitoRegion`].
pub type MitoRegionRef = Arc<MitoRegion>;
183
184#[derive(Debug, Clone)]
185pub(crate) struct StagingPartitionInfo {
186 pub(crate) partition_directive: StagingPartitionDirective,
187 pub(crate) partition_rule_version: u64,
188}
189
190impl StagingPartitionInfo {
191 pub(crate) fn partition_expr(&self) -> Option<&str> {
193 self.partition_directive.partition_expr()
194 }
195
196 pub(crate) fn from_partition_directive(partition_directive: StagingPartitionDirective) -> Self {
198 let partition_rule_version = match &partition_directive {
199 StagingPartitionDirective::UpdatePartitionExpr(expr) => {
200 partition_expr_version(Some(expr))
201 }
202 StagingPartitionDirective::RejectAllWrites => 0,
203 };
204 Self {
205 partition_directive,
206 partition_rule_version,
207 }
208 }
209}
210
211impl MitoRegion {
212 fn remove_region_metrics(&self) {
213 let region_id = self.region_id.as_u64().to_string();
214 let labels = &[region_id.as_str()];
215 let _ = REGION_QUERY_CPU_TIME.remove_label_values(labels);
216 let _ = REGION_QUERY_SCANNED_BYTES.remove_label_values(labels);
217 }
218
219 pub(crate) async fn stop(&self) {
221 self.manifest_ctx
222 .manifest_manager
223 .write()
224 .await
225 .stop()
226 .await;
227
228 info!(
229 "Stopped region manifest manager, region_id: {}",
230 self.region_id
231 );
232 }
233
234 pub fn metadata(&self) -> RegionMetadataRef {
236 let version_data = self.version_control.current();
237 version_data.version.metadata.clone()
238 }
239
240 pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
242 let version_data = self.version_control.current();
243 version_data.version.metadata.primary_key_encoding
244 }
245
246 pub(crate) fn version(&self) -> VersionRef {
248 let version_data = self.version_control.current();
249 version_data.version
250 }
251
252 pub(crate) fn last_flush_millis(&self) -> i64 {
254 self.last_flush_millis.load(Ordering::Relaxed)
255 }
256
257 pub(crate) fn update_flush_millis(&self) {
259 let now = self.time_provider.current_time_millis();
260 self.last_flush_millis.store(now, Ordering::Relaxed);
261 }
262
263 pub(crate) fn last_schedule_compaction_millis(&self) -> i64 {
265 self.last_schedule_compaction_millis.load(Ordering::Relaxed)
266 }
267
268 pub(crate) fn update_schedule_compaction_millis(&self) {
270 let now = self.time_provider.current_time_millis();
271 self.last_schedule_compaction_millis
272 .store(now, Ordering::Relaxed);
273 }
274
275 pub(crate) fn table_dir(&self) -> &str {
277 self.access_layer.table_dir()
278 }
279
280 pub(crate) fn path_type(&self) -> PathType {
282 self.access_layer.path_type()
283 }
284
285 pub(crate) fn is_writable(&self) -> bool {
287 matches!(
288 self.manifest_ctx.state.load(),
289 RegionRoleState::Leader(RegionLeaderState::Writable)
290 | RegionRoleState::Leader(RegionLeaderState::Staging)
291 )
292 }
293
294 pub(crate) fn is_flushable(&self) -> bool {
296 matches!(
297 self.manifest_ctx.state.load(),
298 RegionRoleState::Leader(RegionLeaderState::Writable)
299 | RegionRoleState::Leader(RegionLeaderState::Staging)
300 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
301 )
302 }
303
304 pub(crate) fn should_abort_index(&self) -> bool {
306 matches!(
307 self.manifest_ctx.state.load(),
308 RegionRoleState::Follower
309 | RegionRoleState::Leader(RegionLeaderState::Dropping)
310 | RegionRoleState::Leader(RegionLeaderState::Truncating)
311 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
312 | RegionRoleState::Leader(RegionLeaderState::Staging)
313 )
314 }
315
316 pub(crate) fn is_downgrading(&self) -> bool {
318 matches!(
319 self.manifest_ctx.state.load(),
320 RegionRoleState::Leader(RegionLeaderState::Downgrading)
321 )
322 }
323
324 pub(crate) fn is_staging(&self) -> bool {
326 self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
327 }
328
329 pub(crate) fn is_enter_staging(&self) -> bool {
331 self.manifest_ctx.state.load()
332 == RegionRoleState::Leader(RegionLeaderState::EnteringStaging)
333 }
334
335 pub fn region_id(&self) -> RegionId {
336 self.region_id
337 }
338
339 pub fn find_committed_sequence(&self) -> SequenceNumber {
340 self.version_control.committed_sequence()
341 }
342
343 pub fn flushed_sequence(&self) -> SequenceNumber {
349 self.version_control.current().version.flushed_sequence
350 }
351
352 pub fn is_follower(&self) -> bool {
354 self.manifest_ctx.state.load() == RegionRoleState::Follower
355 }
356
357 pub(crate) fn state(&self) -> RegionRoleState {
359 self.manifest_ctx.state.load()
360 }
361
362 pub(crate) fn set_role(&self, next_role: RegionRole) {
364 self.manifest_ctx.set_role(next_role, self.region_id);
365 }
366
367 pub(crate) fn region_role(&self) -> RegionRole {
368 match self.state() {
369 RegionRoleState::Follower => RegionRole::Follower,
370 RegionRoleState::Leader(RegionLeaderState::Staging) => RegionRole::StagingLeader,
371 RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
372 RegionRole::DowngradingLeader
373 }
374 RegionRoleState::Leader(_) => RegionRole::Leader,
375 }
376 }
377
378 pub(crate) fn set_altering(&self) -> Result<()> {
381 self.compare_exchange_state(
382 RegionLeaderState::Writable,
383 RegionRoleState::Leader(RegionLeaderState::Altering),
384 )
385 }
386
387 pub(crate) fn set_dropping(&self, expect: RegionLeaderState) -> Result<()> {
390 self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Dropping))
391 }
392
393 pub(crate) fn set_truncating(&self) -> Result<()> {
396 self.compare_exchange_state(
397 RegionLeaderState::Writable,
398 RegionRoleState::Leader(RegionLeaderState::Truncating),
399 )
400 }
401
402 pub(crate) fn set_editing(&self, expect: RegionLeaderState) -> Result<()> {
405 self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Editing))
406 }
407
408 pub(crate) async fn set_staging(
414 &self,
415 manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
416 ) -> Result<()> {
417 manager.store().clear_staging_manifests().await?;
418
419 self.compare_exchange_state(
420 RegionLeaderState::Writable,
421 RegionRoleState::Leader(RegionLeaderState::Staging),
422 )
423 }
424
425 pub(crate) fn set_entering_staging(&self) -> Result<()> {
427 self.compare_exchange_state(
428 RegionLeaderState::Writable,
429 RegionRoleState::Leader(RegionLeaderState::EnteringStaging),
430 )
431 }
432
433 pub fn exit_staging(&self) -> Result<()> {
438 self.manifest_ctx.exit_staging(
439 self.region_id,
440 RegionRoleState::Leader(RegionLeaderState::Writable),
441 )
442 }
443
444 pub(crate) async fn set_role_state_gracefully(
446 &self,
447 state: SettableRegionRoleState,
448 ) -> Result<()> {
449 let mut manager: RwLockWriteGuard<'_, RegionManifestManager> =
450 self.manifest_ctx.manifest_manager.write().await;
451 let current_state = self.state();
452
453 let hook_payload: Option<PendingManifestHook> = match state {
454 SettableRegionRoleState::Leader => {
455 match current_state {
458 RegionRoleState::Leader(RegionLeaderState::Staging) => {
459 info!("Exiting staging mode for region {}", self.region_id);
460 self.exit_staging_on_success(&mut manager).await?
462 }
463 RegionRoleState::Leader(RegionLeaderState::Writable) => {
464 info!("Region {} already in normal leader mode", self.region_id);
466 None
467 }
468 _ => {
469 return Err(RegionStateSnafu {
471 region_id: self.region_id,
472 state: current_state,
473 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
474 }
475 .build());
476 }
477 }
478 }
479
480 SettableRegionRoleState::StagingLeader => {
481 match current_state {
484 RegionRoleState::Leader(RegionLeaderState::Writable) => {
485 info!("Entering staging mode for region {}", self.region_id);
486 self.set_staging(&mut manager).await?;
487 }
488 RegionRoleState::Leader(RegionLeaderState::Staging) => {
489 info!("Region {} already in staging mode", self.region_id);
491 }
492 _ => {
493 return Err(RegionStateSnafu {
494 region_id: self.region_id,
495 state: current_state,
496 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
497 }
498 .build());
499 }
500 }
501 None
502 }
503
504 SettableRegionRoleState::Follower => {
505 match current_state {
507 RegionRoleState::Leader(RegionLeaderState::Staging) => {
508 info!(
509 "Exiting staging and demoting region {} to follower",
510 self.region_id
511 );
512 self.exit_staging()?;
513 self.set_role(RegionRole::Follower);
514 }
515 RegionRoleState::Leader(_) => {
516 info!("Demoting region {} from leader to follower", self.region_id);
517 self.set_role(RegionRole::Follower);
518 }
519 RegionRoleState::Follower => {
520 info!("Region {} already in follower mode", self.region_id);
522 }
523 }
524 None
525 }
526
527 SettableRegionRoleState::DowngradingLeader => {
528 match current_state {
530 RegionRoleState::Leader(RegionLeaderState::Staging) => {
531 info!(
532 "Exiting staging and entering downgrade for region {}",
533 self.region_id
534 );
535 self.exit_staging()?;
536 self.set_role(RegionRole::DowngradingLeader);
537 }
538 RegionRoleState::Leader(RegionLeaderState::Writable) => {
539 info!("Starting downgrade for region {}", self.region_id);
540 self.set_role(RegionRole::DowngradingLeader);
541 }
542 RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
543 info!("Region {} already in downgrading mode", self.region_id);
545 }
546 _ => {
547 warn!(
548 "Cannot start downgrade for region {} from state {:?}",
549 self.region_id, current_state
550 );
551 }
552 }
553 None
554 }
555 };
556
557 let mut backfill_hook_payload: Option<PendingManifestHook> = None;
559 if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
560 let manifest_meta = &manager.manifest().metadata;
562 let current_version = self.version();
563 let current_meta = ¤t_version.metadata;
564 if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
565 let action = RegionMetaAction::Change(RegionChange {
566 metadata: current_meta.clone(),
567 sst_format: current_version.options.sst_format.unwrap_or_default(),
568 append_mode: None,
569 });
570 let action_list = RegionMetaActionList::with_action(action);
571 match self
572 .manifest_ctx
573 .update_locked(&mut manager, action_list, false)
574 .await
575 {
576 Ok(pending) => {
577 info!(
578 "Successfully persisted backfilled metadata for region {}, version: {}",
579 self.region_id,
580 pending.version()
581 );
582 backfill_hook_payload = Some(pending);
583 }
584 Err(e) => {
585 warn!(e; "Failed to persist backfilled metadata for region {}", self.region_id);
586 }
587 }
588 }
589 }
590
591 drop(manager);
592
593 let merged = match (hook_payload, backfill_hook_payload) {
596 (Some(staging), Some(backfill)) => Some(staging.merge(backfill)),
597 (Some(payload), None) => Some(payload),
598 (None, Some(payload)) => Some(payload),
599 (None, None) => None,
600 };
601
602 if let Some(pending) = merged {
603 pending.fire().await;
604 }
605
606 Ok(())
607 }
608
609 pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
612 if let Err(e) = self
613 .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
614 {
615 error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
616 }
617 }
618
619 pub(crate) fn switch_state_to_staging(&self, expect: RegionLeaderState) {
622 if let Err(e) =
623 self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Staging))
624 {
625 error!(e; "failed to switch region state to staging, expect state is {:?}", expect);
626 }
627 }
628
629 pub(crate) fn region_statistic(&self) -> RegionStatistic {
631 let version = self.version();
632 let memtables = &version.memtables;
633 let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;
634
635 let sst_usage = version.ssts.owned_sst_usage(self.region_id);
636 let index_usage = version.ssts.owned_index_usage(self.region_id);
637 let flushed_entry_id = version.flushed_entry_id;
638
639 let wal_usage = self.estimated_wal_usage(memtable_usage);
640 let manifest_usage = self.stats.total_manifest_size();
641 let num_rows = version.ssts.owned_num_rows(self.region_id) + version.memtables.num_rows();
642 let num_files = version.ssts.owned_num_files(self.region_id);
643 let manifest_version = self.stats.manifest_version();
644 let file_removed_cnt = self.stats.file_removed_cnt();
645
646 let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
647 let written_bytes = self.written_bytes.load(Ordering::Relaxed);
648
649 RegionStatistic {
650 num_rows,
651 memtable_size: memtable_usage,
652 wal_size: wal_usage,
653 manifest_size: manifest_usage,
654 sst_size: sst_usage,
655 sst_num: num_files,
656 index_size: index_usage,
657 manifest: RegionManifestInfo::Mito {
658 manifest_version,
659 flushed_entry_id,
660 file_removed_cnt,
661 },
662 data_topic_latest_entry_id: topic_latest_entry_id,
663 metadata_topic_latest_entry_id: topic_latest_entry_id,
664 written_bytes,
665 }
666 }
667
668 fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
671 ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
672 }
673
674 fn compare_exchange_state(
677 &self,
678 expect: RegionLeaderState,
679 state: RegionRoleState,
680 ) -> Result<()> {
681 self.manifest_ctx
682 .state
683 .compare_exchange(RegionRoleState::Leader(expect), state)
684 .map_err(|actual| {
685 RegionStateSnafu {
686 region_id: self.region_id,
687 state: actual,
688 expect: RegionRoleState::Leader(expect),
689 }
690 .build()
691 })?;
692 Ok(())
693 }
694
695 pub fn access_layer(&self) -> AccessLayerRef {
696 self.access_layer.clone()
697 }
698
699 pub(crate) fn region_info_entry(&self, node_id: Option<u64>) -> RegionInfoEntry {
701 let region_id = self.region_id;
702 let version = self.version();
703 let state = self.state();
704 let role = self.region_role();
705 let region_options = serde_json::to_string(&version.options)
706 .unwrap_or_else(|err| serde_json::json!({ "error": err.to_string() }).to_string());
707 let sst_format = match version.options.sst_format.unwrap_or_default() {
708 crate::sst::FormatType::PrimaryKey => "primary_key",
709 crate::sst::FormatType::Flat => "flat",
710 }
711 .to_string();
712
713 RegionInfoEntry {
714 region_id,
715 table_id: region_id.table_id(),
716 region_number: region_id.region_number(),
717 region_group: region_id.region_group(),
718 region_sequence: region_id.region_sequence(),
719 state: state.as_str().to_string(),
720 role: role.to_string(),
721 writable: self.is_writable(),
722 committed_sequence: self.find_committed_sequence(),
723 flushed_sequence: Some(self.flushed_sequence()).filter(|sequence| *sequence > 0),
724 manifest_version: self.stats.manifest_version(),
725 compaction_time_window: version
726 .compaction_time_window
727 .map(|duration| humantime::format_duration(duration).to_string()),
728 region_options,
729 sst_format,
730 node_id,
731 }
732 }
733
734 pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
736 let table_dir = self.table_dir();
737 let path_type = self.access_layer.path_type();
738
739 let visible_ssts = self
740 .version()
741 .ssts
742 .levels()
743 .iter()
744 .flat_map(|level| level.files().map(|file| file.file_id().file_id()))
745 .collect::<HashSet<_>>();
746
747 let manifest_files = self.manifest_ctx.manifest().await.files.clone();
748 let staging_files = self
749 .manifest_ctx
750 .staging_manifest()
751 .await
752 .map(|m| m.files.clone())
753 .unwrap_or_default();
754 let files = manifest_files
755 .into_iter()
756 .chain(staging_files)
757 .collect::<HashMap<_, _>>();
758
759 files
760 .values()
761 .map(|meta| {
762 let region_id = self.region_id;
763 let origin_region_id = meta.region_id;
764 let (index_version, index_file_path, index_file_size) = if meta.index_file_size > 0
765 {
766 let index_file_path = index_file_path(table_dir, meta.index_id(), path_type);
767 (
768 meta.index_version,
769 Some(index_file_path),
770 Some(meta.index_file_size),
771 )
772 } else {
773 (0, None, None)
774 };
775 let visible = visible_ssts.contains(&meta.file_id);
776 ManifestSstEntry {
777 table_dir: table_dir.to_string(),
778 region_id,
779 table_id: region_id.table_id(),
780 region_number: region_id.region_number(),
781 region_group: region_id.region_group(),
782 region_sequence: region_id.region_sequence(),
783 file_id: meta.file_id.to_string(),
784 index_version,
785 level: meta.level,
786 file_path: sst_file_path(table_dir, meta.file_id(), path_type),
787 file_size: meta.file_size,
788 index_file_path,
789 index_file_size,
790 num_rows: meta.num_rows,
791 num_row_groups: meta.num_row_groups,
792 num_series: Some(meta.num_series),
793 min_ts: meta.time_range.0,
794 max_ts: meta.time_range.1,
795 sequence: meta.sequence.map(|s| s.get()),
796 origin_region_id,
797 node_id: None,
798 visible,
799 primary_key_min: meta.primary_key_min.clone(),
800 primary_key_max: meta.primary_key_max.clone(),
801 }
802 })
803 .collect()
804 }
805
806 pub async fn file_metas(&self, file_ids: &[FileId]) -> Vec<Option<FileMeta>> {
808 let manifest_files = self.manifest_ctx.manifest().await.files.clone();
809
810 file_ids
811 .iter()
812 .map(|file_id| manifest_files.get(file_id).cloned())
813 .collect::<Vec<_>>()
814 }
815
816 pub(crate) async fn exit_staging_on_success(
826 &self,
827 manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
828 ) -> Result<Option<PendingManifestHook>> {
829 let current_state = self.manifest_ctx.current_state();
830 ensure!(
831 current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
832 RegionStateSnafu {
833 region_id: self.region_id,
834 state: current_state,
835 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
836 }
837 );
838
839 let merged_actions = match manager.merge_staged_actions(current_state).await? {
841 Some(actions) => actions,
842 None => {
843 info!(
844 "No staged manifests to merge for region {}, exiting staging mode without changes",
845 self.region_id
846 );
847 self.exit_staging()?;
849 return Ok(None);
850 }
851 };
852 let expect_change = merged_actions.actions.iter().any(|a| a.is_change());
853 let expect_partition_expr_change = merged_actions
854 .actions
855 .iter()
856 .any(|a| a.is_partition_expr_change());
857 let expect_edit = merged_actions.actions.iter().any(|a| a.is_edit());
858 ensure!(
859 !(expect_change && expect_partition_expr_change),
860 UnexpectedSnafu {
861 reason: "unexpected both change and partition expr change actions in merged actions"
862 }
863 );
864 ensure!(
865 expect_change || expect_partition_expr_change,
866 UnexpectedSnafu {
867 reason: "expect a change or partition expr change action in merged actions"
868 }
869 );
870 ensure!(
871 expect_edit,
872 UnexpectedSnafu {
873 reason: "expect an edit action in merged actions"
874 }
875 );
876
877 let (merged_partition_expr_change, merged_change, merged_edit) =
878 merged_actions.clone().split_region_change_and_edit();
879 if let Some(change) = &merged_change {
880 let current_column_metadatas = &self.version().metadata.column_metadatas;
884 ensure!(
885 change.metadata.column_metadatas == *current_column_metadatas,
886 UnexpectedSnafu {
887 reason: "change action alters column metadata in staging exit"
888 }
889 );
890 }
891
892 let pending = self
895 .manifest_ctx
896 .update_locked(manager, merged_actions, false)
897 .await?;
898 let new_version = pending.version();
899 info!(
900 "Successfully submitted merged staged manifests for region {}, new version: {}",
901 self.region_id, new_version
902 );
903
904 if let Some(change) = merged_partition_expr_change {
906 let mut new_metadata = self.version().metadata.as_ref().clone();
907 new_metadata.set_partition_expr(change.partition_expr);
908 self.version_control.alter_metadata(new_metadata.into());
909 }
910 if let Some(change) = merged_change {
911 self.version_control.alter_metadata(change.metadata);
912 }
913 self.version_control
914 .apply_edit(Some(merged_edit), &[], self.file_purger.clone());
915
916 if let Err(e) = manager.clear_staging_manifest_and_dir().await {
918 error!(e; "Failed to clear staging manifest dir for region {}", self.region_id);
919 }
920 self.exit_staging()?;
921
922 Ok(Some(pending))
924 }
925
926 pub fn maybe_staging_partition_expr_str(&self) -> Option<String> {
932 let is_staging = self.is_staging();
933 if is_staging {
934 let staging_partition_info = self.manifest_ctx.staging_partition_info();
935 if staging_partition_info.is_none() {
936 warn!(
937 "Staging partition expr is none for region {} in staging state",
938 self.region_id
939 );
940 }
941 staging_partition_info
942 .as_ref()
943 .and_then(|info| info.partition_expr().map(ToString::to_string))
944 } else {
945 let version = self.version();
946 version.metadata.partition_expr.clone()
947 }
948 }
949
950 pub fn expected_partition_expr_version(&self) -> u64 {
951 if self.is_staging() {
952 self.manifest_ctx
953 .staging_partition_info()
954 .as_ref()
955 .map(|info| info.partition_rule_version)
956 .unwrap_or_default()
957 } else {
958 self.version().metadata.partition_expr_version
959 }
960 }
961
962 pub(crate) fn reject_all_writes_in_staging(&self) -> bool {
964 if !self.is_staging() {
965 return false;
966 }
967 self.manifest_ctx
968 .staging_partition_info()
969 .as_ref()
970 .map(|info| {
971 matches!(
972 info.partition_directive,
973 StagingPartitionDirective::RejectAllWrites
974 )
975 })
976 .unwrap_or(false)
977 }
978}
979
980impl Drop for MitoRegion {
981 fn drop(&mut self) {
982 self.remove_region_metrics();
983 }
984}
985
/// Manifest manager of a region together with the region's role state.
#[derive(Debug)]
pub(crate) struct ManifestContext {
    /// Manifest manager, guarded by an async read-write lock.
    pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
    /// Current role state of the region.
    state: AtomicCell<RegionRoleState>,
    /// Partition info used in staging mode; cleared by `exit_staging`.
    staging_partition_info: Mutex<Option<StagingPartitionInfo>>,
    /// Optional hook handed to every pending manifest hook created by
    /// `update_locked`.
    hook: Option<RegionHookRef>,
}
1004
1005impl ManifestContext {
1006 pub(crate) fn new(
1007 manager: RegionManifestManager,
1008 state: RegionRoleState,
1009 hook: Option<RegionHookRef>,
1010 ) -> Self {
1011 ManifestContext {
1012 manifest_manager: tokio::sync::RwLock::new(manager),
1013 state: AtomicCell::new(state),
1014 staging_partition_info: Mutex::new(None),
1015 hook,
1016 }
1017 }
1018
1019 pub(crate) fn hook(&self) -> Option<RegionHookRef> {
1021 self.hook.clone()
1022 }
1023
1024 pub(crate) fn staging_partition_info(&self) -> Option<StagingPartitionInfo> {
1025 self.staging_partition_info.lock().unwrap().clone()
1026 }
1027
1028 pub(crate) fn set_staging_partition_info(&self, staging_partition_info: StagingPartitionInfo) {
1029 let mut current = self.staging_partition_info.lock().unwrap();
1030 debug_assert!(current.is_none());
1031 *current = Some(staging_partition_info);
1032 }
1033
1034 fn clear_staging_partition_info(&self) {
1035 *self.staging_partition_info.lock().unwrap() = None;
1036 }
1037
1038 pub(crate) fn exit_staging(
1039 &self,
1040 region_id: RegionId,
1041 next_state: RegionRoleState,
1042 ) -> Result<()> {
1043 self.state
1044 .compare_exchange(
1045 RegionRoleState::Leader(RegionLeaderState::Staging),
1046 next_state,
1047 )
1048 .map_err(|actual| {
1049 RegionStateSnafu {
1050 region_id,
1051 state: actual,
1052 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
1053 }
1054 .build()
1055 })?;
1056 self.clear_staging_partition_info();
1057 Ok(())
1058 }
1059
1060 pub(crate) async fn manifest_version(&self) -> ManifestVersion {
1061 self.manifest_manager
1062 .read()
1063 .await
1064 .manifest()
1065 .manifest_version
1066 }
1067
1068 pub(crate) async fn has_update(&self) -> Result<bool> {
1069 self.manifest_manager.read().await.has_update().await
1070 }
1071
1072 pub(crate) fn current_state(&self) -> RegionRoleState {
1074 self.state.load()
1075 }
1076
1077 pub(crate) async fn install_manifest_to(
1083 &self,
1084 version: ManifestVersion,
1085 ) -> Result<Arc<RegionManifest>> {
1086 let mut manager = self.manifest_manager.write().await;
1087 manager.install_manifest_to(version).await?;
1088
1089 Ok(manager.manifest())
1090 }
1091
1092 pub(crate) async fn update_manifest(
1094 &self,
1095 expect_state: RegionLeaderState,
1096 action_list: RegionMetaActionList,
1097 is_staging: bool,
1098 ) -> Result<ManifestVersion> {
1099 self.update_manifest_with_state_check(action_list, is_staging, |current_state, region_id| {
1100 if expect_state != RegionLeaderState::Downgrading {
1105 if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
1106 info!(
1107 "Region {} is in downgrading leader state, updating manifest. Expect state is {:?}",
1108 region_id, expect_state
1109 );
1110 }
1111 ensure!(
1112 current_state == RegionRoleState::Leader(expect_state)
1113 || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
1114 UpdateManifestSnafu {
1115 region_id,
1116 state: current_state,
1117 }
1118 );
1119 } else {
1120 ensure!(
1121 current_state == RegionRoleState::Leader(expect_state),
1122 RegionStateSnafu {
1123 region_id,
1124 state: current_state,
1125 expect: RegionRoleState::Leader(expect_state),
1126 }
1127 );
1128 }
1129
1130 Ok(())
1131 })
1132 .await
1133 }
1134
1135 pub(crate) async fn update_manifest_for_compaction(
1152 &self,
1153 action_list: RegionMetaActionList,
1154 ) -> Result<ManifestVersion> {
1155 self.update_manifest_with_state_check(action_list, false, |current_state, region_id| {
1156 ensure!(
1157 matches!(
1158 current_state,
1159 RegionRoleState::Leader(RegionLeaderState::Writable)
1160 | RegionRoleState::Leader(RegionLeaderState::Editing)
1161 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
1162 ),
1163 UpdateManifestSnafu {
1164 region_id,
1165 state: current_state,
1166 }
1167 );
1168
1169 Ok(())
1170 })
1171 .await
1172 }
1173
1174 pub(crate) async fn update_locked(
1184 &self,
1185 manager: &mut RegionManifestManager,
1186 action_list: RegionMetaActionList,
1187 is_staging: bool,
1188 ) -> Result<PendingManifestHook> {
1189 let region_id = manager.manifest().metadata.region_id;
1190 let action_list_for_hook = self.hook.as_ref().map(|_| action_list.clone());
1193 let version = manager
1194 .update(action_list, is_staging)
1195 .await
1196 .inspect_err(|e| error!(e; "Failed to update manifest, region_id: {}", region_id))?;
1197
1198 Ok(PendingManifestHook::new(
1199 region_id,
1200 action_list_for_hook,
1201 version,
1202 self.hook.clone(),
1203 ))
1204 }
1205
1206 async fn update_manifest_with_state_check(
1207 &self,
1208 action_list: RegionMetaActionList,
1209 is_staging: bool,
1210 check_state: impl FnOnce(RegionRoleState, RegionId) -> Result<()>,
1211 ) -> Result<ManifestVersion> {
1212 let mut manager = self.manifest_manager.write().await;
1214 let manifest = manager.manifest();
1216 let current_state = self.state.load();
1219 check_state(current_state, manifest.metadata.region_id)?;
1220
1221 for action in &action_list.actions {
1222 let RegionMetaAction::Edit(edit) = &action else {
1224 continue;
1225 };
1226
1227 let Some(truncated_entry_id) = manifest.truncated_entry_id else {
1229 continue;
1230 };
1231
1232 if let Some(flushed_entry_id) = edit.flushed_entry_id {
1234 let is_newer_entry = truncated_entry_id < flushed_entry_id;
1244 let is_same_entry_with_newer_sequence = truncated_entry_id == flushed_entry_id
1245 && edit.flushed_sequence.is_some_and(|flushed_sequence| {
1246 manifest.flushed_sequence < flushed_sequence
1247 });
1248
1249 ensure!(
1250 is_newer_entry || is_same_entry_with_newer_sequence,
1251 RegionTruncatedSnafu {
1252 region_id: manifest.metadata.region_id,
1253 }
1254 );
1255 }
1256
1257 if !edit.files_to_remove.is_empty() {
1259 for file in &edit.files_to_remove {
1261 ensure!(
1262 manifest.files.contains_key(&file.file_id),
1263 RegionTruncatedSnafu {
1264 region_id: manifest.metadata.region_id,
1265 }
1266 );
1267 }
1268 }
1269 }
1270
1271 let region_id = manifest.metadata.region_id;
1273 let pending = self
1274 .update_locked(&mut manager, action_list, is_staging)
1275 .await?;
1276 let version = pending.version();
1277
1278 drop(manager);
1282
1283 if self.state.load() == RegionRoleState::Follower {
1284 warn!(
1285 "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
1286 region_id
1287 );
1288 }
1289
1290 pending.fire().await;
1291
1292 Ok(version)
1293 }
1294
    /// Moves the region role state towards `next_role` and logs the outcome.
    ///
    /// For every target except `StagingLeader`, `exit_staging` is tried first with the
    /// target state; when it succeeds the transition is logged and the function returns.
    /// NOTE(review): the log line assumes the previous state was `Leader(Staging)` in
    /// that case - confirm against `exit_staging`.
    ///
    /// Otherwise the state is updated atomically:
    /// - `Follower`: any non-follower state becomes `Follower`.
    /// - `Leader`: `Follower` or `Leader(Downgrading)` becomes `Leader(Writable)`.
    /// - `StagingLeader`: the state is left untouched; only an info log is emitted.
    /// - `DowngradingLeader`: only `Leader(Writable)` becomes `Leader(Downgrading)`.
    ///
    /// A rejected transition is logged as a warning unless the region is already in
    /// the requested state. The function never returns an error.
    pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
        match next_role {
            RegionRole::Follower => {
                // Try to leave staging first.
                if self
                    .exit_staging(region_id, RegionRoleState::Follower)
                    .is_ok()
                {
                    info!(
                        "Convert region {} to follower, previous role state: {:?}",
                        region_id,
                        RegionRoleState::Leader(RegionLeaderState::Staging)
                    );
                    return;
                }
                // Returning `None` from the closure leaves the state unchanged and
                // surfaces the current state through `Err`.
                match self.state.fetch_update(|state| {
                    if !matches!(state, RegionRoleState::Follower) {
                        Some(RegionRoleState::Follower)
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to follower, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        // Already a follower: nothing to report.
                        if state != RegionRoleState::Follower {
                            warn!(
                                "Failed to convert region {} to follower, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::Leader => {
                // Try to leave staging first.
                if self
                    .exit_staging(
                        region_id,
                        RegionRoleState::Leader(RegionLeaderState::Writable),
                    )
                    .is_ok()
                {
                    info!(
                        "Convert region {} to leader, previous role state: {:?}",
                        region_id,
                        RegionRoleState::Leader(RegionLeaderState::Staging)
                    );
                    return;
                }
                // Only followers and downgrading leaders are promoted to writable.
                match self.state.fetch_update(|state| {
                    if matches!(
                        state,
                        RegionRoleState::Follower
                            | RegionRoleState::Leader(RegionLeaderState::Downgrading)
                    ) {
                        Some(RegionRoleState::Leader(RegionLeaderState::Writable))
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to leader, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        // Already writable: nothing to report.
                        if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
                            warn!(
                                "Failed to convert region {} to leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::StagingLeader => {
                // Staging is never entered through this function.
                info!(
                    "Ignore direct conversion of region {} to staging leader; staging requires the dedicated workflow",
                    region_id
                );
            }
            RegionRole::DowngradingLeader => {
                // Try to leave staging first.
                if self
                    .exit_staging(
                        region_id,
                        RegionRoleState::Leader(RegionLeaderState::Downgrading),
                    )
                    .is_ok()
                {
                    info!(
                        "Convert region {} to downgrading region, previous role state: {:?}",
                        region_id,
                        RegionRoleState::Leader(RegionLeaderState::Staging)
                    );
                    return;
                }
                // Only a writable leader can start downgrading.
                match self.state.compare_exchange(
                    RegionRoleState::Leader(RegionLeaderState::Writable),
                    RegionRoleState::Leader(RegionLeaderState::Downgrading),
                ) {
                    Ok(state) => info!(
                        "Convert region {} to downgrading region, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        // Already downgrading: nothing to report.
                        if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                            warn!(
                                "Failed to convert region {} to downgrading leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
        }
    }
1444
1445 pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
1447 self.manifest_manager.read().await.manifest()
1448 }
1449
1450 pub(crate) async fn staging_manifest(
1452 &self,
1453 ) -> Option<Arc<crate::manifest::action::RegionManifest>> {
1454 self.manifest_manager.read().await.staging_manifest()
1455 }
1456}
1457
/// Reference-counted handle to a [`ManifestContext`].
pub(crate) type ManifestContextRef = Arc<ManifestContext>;
1459
1460#[derive(Debug, Default)]
1462pub(crate) struct RegionMap {
1463 regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
1464}
1465
1466impl RegionMap {
1467 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1469 let regions = self.regions.read().unwrap();
1470 regions.contains_key(®ion_id)
1471 }
1472
1473 pub(crate) fn insert_region(&self, region: MitoRegionRef) {
1475 let mut regions = self.regions.write().unwrap();
1476 regions.insert(region.region_id, region);
1477 }
1478
1479 pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
1481 let regions = self.regions.read().unwrap();
1482 regions.get(®ion_id).cloned()
1483 }
1484
1485 pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1489 let region = self
1490 .get_region(region_id)
1491 .context(RegionNotFoundSnafu { region_id })?;
1492 ensure!(
1493 region.is_writable(),
1494 RegionStateSnafu {
1495 region_id,
1496 state: region.state(),
1497 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1498 }
1499 );
1500 Ok(region)
1501 }
1502
1503 pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1507 let region = self
1508 .get_region(region_id)
1509 .context(RegionNotFoundSnafu { region_id })?;
1510 ensure!(
1511 region.is_follower(),
1512 RegionStateSnafu {
1513 region_id,
1514 state: region.state(),
1515 expect: RegionRoleState::Follower,
1516 }
1517 );
1518
1519 Ok(region)
1520 }
1521
1522 pub(crate) fn get_region_or<F: OnFailure>(
1526 &self,
1527 region_id: RegionId,
1528 cb: &mut F,
1529 ) -> Option<MitoRegionRef> {
1530 match self
1531 .get_region(region_id)
1532 .context(RegionNotFoundSnafu { region_id })
1533 {
1534 Ok(region) => Some(region),
1535 Err(e) => {
1536 cb.on_failure(e);
1537 None
1538 }
1539 }
1540 }
1541
1542 pub(crate) fn writable_region_or<F: OnFailure>(
1546 &self,
1547 region_id: RegionId,
1548 cb: &mut F,
1549 ) -> Option<MitoRegionRef> {
1550 match self.writable_region(region_id) {
1551 Ok(region) => Some(region),
1552 Err(e) => {
1553 cb.on_failure(e);
1554 None
1555 }
1556 }
1557 }
1558
1559 pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1563 let region = self.writable_region(region_id)?;
1564 if region.is_staging() {
1565 return Err(crate::error::RegionStateSnafu {
1566 region_id,
1567 state: region.state(),
1568 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1569 }
1570 .build());
1571 }
1572 Ok(region)
1573 }
1574
1575 pub(crate) fn staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1579 let region = self
1580 .get_region(region_id)
1581 .context(RegionNotFoundSnafu { region_id })?;
1582 ensure!(
1583 region.is_staging(),
1584 RegionStateSnafu {
1585 region_id,
1586 state: region.state(),
1587 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
1588 }
1589 );
1590 Ok(region)
1591 }
1592
1593 pub(crate) fn flushable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1597 let region = self
1598 .get_region(region_id)
1599 .context(RegionNotFoundSnafu { region_id })?;
1600 ensure!(
1601 region.is_flushable(),
1602 FlushableRegionStateSnafu {
1603 region_id,
1604 state: region.state(),
1605 }
1606 );
1607 Ok(region)
1608 }
1609
1610 pub(crate) fn remove_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
1612 let mut regions = self.regions.write().unwrap();
1613 regions.remove(®ion_id)
1614 }
1615
1616 pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
1618 let regions = self.regions.read().unwrap();
1619 regions.values().cloned().collect()
1620 }
1621
1622 pub(crate) fn clear(&self) {
1624 self.regions.write().unwrap().clear();
1625 }
1626}
1627
/// Reference-counted handle to a [`RegionMap`].
pub(crate) type RegionMapRef = Arc<RegionMap>;
1629
1630#[derive(Debug, Default)]
1632pub(crate) struct OpeningRegions {
1633 regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
1634}
1635
1636impl OpeningRegions {
1637 pub(crate) fn wait_for_opening_region(
1639 &self,
1640 region_id: RegionId,
1641 sender: OptionOutputTx,
1642 ) -> Option<OptionOutputTx> {
1643 let mut regions = self.regions.write().unwrap();
1644 match regions.entry(region_id) {
1645 Entry::Occupied(mut senders) => {
1646 senders.get_mut().push(sender);
1647 None
1648 }
1649 Entry::Vacant(_) => Some(sender),
1650 }
1651 }
1652
1653 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1655 let regions = self.regions.read().unwrap();
1656 regions.contains_key(®ion_id)
1657 }
1658
1659 pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
1661 let mut regions = self.regions.write().unwrap();
1662 regions.insert(region, vec![sender]);
1663 }
1664
1665 pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
1667 let mut regions = self.regions.write().unwrap();
1668 regions.remove(®ion_id).unwrap_or_default()
1669 }
1670
1671 #[cfg(test)]
1672 pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
1673 let regions = self.regions.read().unwrap();
1674 if let Some(senders) = regions.get(®ion_id) {
1675 senders.len()
1676 } else {
1677 0
1678 }
1679 }
1680}
1681
/// Reference-counted handle to an [`OpeningRegions`].
pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;
1683
1684#[derive(Debug, Default)]
1686pub(crate) struct CatchupRegions {
1687 regions: RwLock<HashSet<RegionId>>,
1688}
1689
1690impl CatchupRegions {
1691 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1693 let regions = self.regions.read().unwrap();
1694 regions.contains(®ion_id)
1695 }
1696
1697 pub(crate) fn insert_region(&self, region_id: RegionId) {
1699 let mut regions = self.regions.write().unwrap();
1700 regions.insert(region_id);
1701 }
1702
1703 pub(crate) fn remove_region(&self, region_id: RegionId) {
1705 let mut regions = self.regions.write().unwrap();
1706 regions.remove(®ion_id);
1707 }
1708}
1709
/// Reference-counted handle to a [`CatchupRegions`].
pub(crate) type CatchupRegionsRef = Arc<CatchupRegions>;
1711
/// Manifest counters held in shared atomics.
///
/// Cloning copies the `Arc`s, so every clone reads and writes the same counters.
#[derive(Default, Debug, Clone)]
pub struct ManifestStats {
    // NOTE(review): presumably the total manifest size in bytes - confirm with the writer.
    pub(crate) total_manifest_size: Arc<AtomicU64>,
    // Last manifest version stored in the counter.
    pub(crate) manifest_version: Arc<AtomicU64>,
    // NOTE(review): presumably the number of removed files tracked by the manifest - confirm.
    pub(crate) file_removed_cnt: Arc<AtomicU64>,
}

impl ManifestStats {
    /// Reads the `total_manifest_size` counter with relaxed ordering.
    fn total_manifest_size(&self) -> u64 {
        let counter: &AtomicU64 = &self.total_manifest_size;
        counter.load(Ordering::Relaxed)
    }

    /// Reads the `manifest_version` counter with relaxed ordering.
    fn manifest_version(&self) -> u64 {
        let counter: &AtomicU64 = &self.manifest_version;
        counter.load(Ordering::Relaxed)
    }

    /// Reads the `file_removed_cnt` counter with relaxed ordering.
    fn file_removed_cnt(&self) -> u64 {
        let counter: &AtomicU64 = &self.file_removed_cnt;
        counter.load(Ordering::Relaxed)
    }
}
1733
1734pub fn parse_partition_expr(partition_expr_str: Option<&str>) -> Result<Option<PartitionExpr>> {
1736 match partition_expr_str {
1737 None => Ok(None),
1738 Some("") => Ok(None),
1739 Some(json_str) => {
1740 let expr = partition::expr::PartitionExpr::from_json_str(json_str)
1741 .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?;
1742 Ok(expr)
1743 }
1744 }
1745}
1746
1747#[cfg(test)]
1748mod tests {
1749 use std::sync::Arc;
1750 use std::sync::atomic::AtomicU64;
1751
1752 use common_datasource::compression::CompressionType;
1753 use common_test_util::temp_dir::create_temp_dir;
1754 use crossbeam_utils::atomic::AtomicCell;
1755 use object_store::ObjectStore;
1756 use object_store::services::Fs;
1757 use store_api::logstore::provider::Provider;
1758 use store_api::region_engine::RegionRole;
1759 use store_api::region_request::PathType;
1760 use store_api::storage::{FileId, RegionId};
1761
1762 use crate::access_layer::AccessLayer;
1763 use crate::error::Error;
1764 use crate::manifest::action::{
1765 RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionPartitionExprChange,
1766 };
1767 use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
1768 use crate::region::{
1769 ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
1770 };
1771 use crate::sst::FormatType;
1772 use crate::sst::index::intermediate::IntermediateManager;
1773 use crate::sst::index::puffin_manager::PuffinManagerFactory;
1774 use crate::test_util::scheduler_util::SchedulerEnv;
1775 use crate::test_util::version_util::VersionControlBuilder;
1776 use crate::time_provider::StdTimeProvider;
1777
1778 #[test]
1779 fn test_region_state_lock_free() {
1780 assert!(AtomicCell::<RegionRoleState>::is_lock_free());
1781 }
1782
1783 #[test]
1784 fn test_region_role_state_as_str() {
1785 assert_eq!("Follower", RegionRoleState::Follower.as_str());
1786 assert_eq!(
1787 "Leader(Writable)",
1788 RegionRoleState::Leader(RegionLeaderState::Writable).as_str()
1789 );
1790 assert_eq!(
1791 "Leader(Staging)",
1792 RegionRoleState::Leader(RegionLeaderState::Staging).as_str()
1793 );
1794 assert_eq!(
1795 "Leader(Downgrading)",
1796 RegionRoleState::Leader(RegionLeaderState::Downgrading).as_str()
1797 );
1798 }
1799
1800 async fn build_test_region(env: &SchedulerEnv) -> MitoRegion {
1801 let builder = VersionControlBuilder::new();
1802 let version_control = Arc::new(builder.build());
1803 let metadata = version_control.current().version.metadata.clone();
1804
1805 let manager = RegionManifestManager::new(
1806 metadata.clone(),
1807 0,
1808 RegionManifestOptions {
1809 manifest_dir: "".to_string(),
1810 object_store: env.access_layer.object_store().clone(),
1811 compress_type: CompressionType::Uncompressed,
1812 checkpoint_distance: 10,
1813 remove_file_options: Default::default(),
1814 manifest_cache: None,
1815 },
1816 FormatType::PrimaryKey,
1817 &Default::default(),
1818 )
1819 .await
1820 .unwrap();
1821
1822 let manifest_ctx = Arc::new(ManifestContext::new(
1823 manager,
1824 RegionRoleState::Leader(RegionLeaderState::Writable),
1825 None,
1826 ));
1827
1828 MitoRegion {
1829 region_id: metadata.region_id,
1830 version_control,
1831 access_layer: env.access_layer.clone(),
1832 manifest_ctx,
1833 file_purger: crate::test_util::new_noop_file_purger(),
1834 provider: Provider::noop_provider(),
1835 last_flush_millis: Default::default(),
1836 last_schedule_compaction_millis: Default::default(),
1837 time_provider: Arc::new(StdTimeProvider),
1838 topic_latest_entry_id: Default::default(),
1839 written_bytes: Arc::new(AtomicU64::new(0)),
1840 stats: ManifestStats::default(),
1841 }
1842 }
1843
1844 fn empty_edit() -> RegionEdit {
1845 RegionEdit {
1846 files_to_add: Vec::new(),
1847 files_to_remove: Vec::new(),
1848 timestamp_ms: None,
1849 compaction_time_window: None,
1850 flushed_entry_id: None,
1851 flushed_sequence: None,
1852 committed_sequence: None,
1853 }
1854 }
1855
1856 #[tokio::test]
1857 async fn test_compaction_update_manifest_allows_editing_state() {
1858 let env = SchedulerEnv::new().await;
1859 let region = build_test_region(&env).await;
1860 region.set_editing(RegionLeaderState::Writable).unwrap();
1861
1862 let file_id = FileId::random();
1863 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(RegionEdit {
1864 files_to_add: vec![crate::sst::file::FileMeta {
1865 region_id: region.region_id,
1866 file_id,
1867 level: 1,
1868 ..Default::default()
1869 }],
1870 files_to_remove: Vec::new(),
1871 timestamp_ms: None,
1872 compaction_time_window: None,
1873 flushed_entry_id: None,
1874 flushed_sequence: None,
1875 committed_sequence: None,
1876 }));
1877
1878 region
1879 .manifest_ctx
1880 .update_manifest_for_compaction(action_list)
1881 .await
1882 .unwrap();
1883
1884 assert!(
1885 region
1886 .manifest_ctx
1887 .manifest()
1888 .await
1889 .files
1890 .contains_key(&file_id)
1891 );
1892 }
1893
1894 #[tokio::test]
1895 async fn test_exit_staging_partition_expr_change_and_edit_success() {
1896 let env = SchedulerEnv::new().await;
1897 let region = build_test_region(&env).await;
1898
1899 let mut manager = region.manifest_ctx.manifest_manager.write().await;
1900 region.set_staging(&mut manager).await.unwrap();
1901 manager
1902 .update(
1903 RegionMetaActionList::new(vec![
1904 RegionMetaAction::PartitionExprChange(RegionPartitionExprChange {
1905 partition_expr: Some("expr_a".to_string()),
1906 }),
1907 RegionMetaAction::Edit(empty_edit()),
1908 ]),
1909 true,
1910 )
1911 .await
1912 .unwrap();
1913
1914 let _hook_payload = region.exit_staging_on_success(&mut manager).await.unwrap();
1915 drop(manager);
1916
1917 assert_eq!(
1918 region.version().metadata.partition_expr.as_deref(),
1919 Some("expr_a")
1920 );
1921 assert_eq!(
1922 region.state(),
1923 RegionRoleState::Leader(RegionLeaderState::Writable)
1924 );
1925 }
1926
1927 #[tokio::test]
1928 async fn test_exit_staging_change_with_same_columns_success() {
1929 let env = SchedulerEnv::new().await;
1930 let region = build_test_region(&env).await;
1931
1932 let mut manager = region.manifest_ctx.manifest_manager.write().await;
1933 region.set_staging(&mut manager).await.unwrap();
1934
1935 let mut changed_metadata = region.version().metadata.as_ref().clone();
1936 changed_metadata.set_partition_expr(Some("expr_b".to_string()));
1937
1938 manager
1939 .update(
1940 RegionMetaActionList::new(vec![
1941 RegionMetaAction::Change(RegionChange {
1942 metadata: Arc::new(changed_metadata),
1943 sst_format: FormatType::PrimaryKey,
1944 append_mode: None,
1945 }),
1946 RegionMetaAction::Edit(empty_edit()),
1947 ]),
1948 true,
1949 )
1950 .await
1951 .unwrap();
1952
1953 let _hook_payload = region.exit_staging_on_success(&mut manager).await.unwrap();
1954 drop(manager);
1955
1956 assert_eq!(
1957 region.version().metadata.partition_expr.as_deref(),
1958 Some("expr_b")
1959 );
1960 assert_eq!(
1961 region.state(),
1962 RegionRoleState::Leader(RegionLeaderState::Writable)
1963 );
1964 }
1965
1966 #[tokio::test]
1967 async fn test_exit_staging_change_with_different_columns_fails() {
1968 let env = SchedulerEnv::new().await;
1969 let region = build_test_region(&env).await;
1970
1971 let mut manager = region.manifest_ctx.manifest_manager.write().await;
1972 region.set_staging(&mut manager).await.unwrap();
1973
1974 let mut changed_metadata = region.version().metadata.as_ref().clone();
1975 changed_metadata.column_metadatas.rotate_left(1);
1976
1977 manager
1978 .update(
1979 RegionMetaActionList::new(vec![
1980 RegionMetaAction::Change(RegionChange {
1981 metadata: Arc::new(changed_metadata),
1982 sst_format: FormatType::PrimaryKey,
1983 append_mode: None,
1984 }),
1985 RegionMetaAction::Edit(empty_edit()),
1986 ]),
1987 true,
1988 )
1989 .await
1990 .unwrap();
1991
1992 let result = region.exit_staging_on_success(&mut manager).await;
1993 assert!(matches!(result, Err(Error::Unexpected { .. })));
1994 }
1995
1996 #[tokio::test]
1997 async fn test_exit_staging_partition_expr_change_and_change_conflict_fails() {
1998 let env = SchedulerEnv::new().await;
1999 let region = build_test_region(&env).await;
2000
2001 let mut manager = region.manifest_ctx.manifest_manager.write().await;
2002 region.set_staging(&mut manager).await.unwrap();
2003
2004 let mut changed_metadata = region.version().metadata.as_ref().clone();
2005 changed_metadata.set_partition_expr(Some("expr_c".to_string()));
2006
2007 manager
2008 .update(
2009 RegionMetaActionList::new(vec![
2010 RegionMetaAction::PartitionExprChange(RegionPartitionExprChange {
2011 partition_expr: Some("expr_c".to_string()),
2012 }),
2013 RegionMetaAction::Change(RegionChange {
2014 metadata: Arc::new(changed_metadata),
2015 sst_format: FormatType::PrimaryKey,
2016 append_mode: None,
2017 }),
2018 RegionMetaAction::Edit(empty_edit()),
2019 ]),
2020 true,
2021 )
2022 .await
2023 .unwrap();
2024
2025 let result = region.exit_staging_on_success(&mut manager).await;
2026 assert!(matches!(result, Err(Error::Unexpected { .. })));
2027 }
2028
2029 #[tokio::test]
2030 async fn test_set_region_state() {
2031 let env = SchedulerEnv::new().await;
2032 let builder = VersionControlBuilder::new();
2033 let version_control = Arc::new(builder.build());
2034 let manifest_ctx = env
2035 .mock_manifest_context(version_control.current().version.metadata.clone())
2036 .await;
2037
2038 let region_id = RegionId::new(1024, 0);
2039 manifest_ctx.set_role(RegionRole::Follower, region_id);
2041 assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
2042
2043 manifest_ctx.set_role(RegionRole::Leader, region_id);
2045 assert_eq!(
2046 manifest_ctx.state.load(),
2047 RegionRoleState::Leader(RegionLeaderState::Writable)
2048 );
2049
2050 manifest_ctx.set_role(RegionRole::StagingLeader, region_id);
2052 assert_eq!(
2053 manifest_ctx.state.load(),
2054 RegionRoleState::Leader(RegionLeaderState::Writable)
2055 );
2056
2057 manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
2059 assert_eq!(
2060 manifest_ctx.state.load(),
2061 RegionRoleState::Leader(RegionLeaderState::Downgrading)
2062 );
2063
2064 manifest_ctx.set_role(RegionRole::Follower, region_id);
2066 assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
2067
2068 manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
2070 assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
2071
2072 manifest_ctx.set_role(RegionRole::Leader, region_id);
2074 manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
2075 assert_eq!(
2076 manifest_ctx.state.load(),
2077 RegionRoleState::Leader(RegionLeaderState::Downgrading)
2078 );
2079
2080 manifest_ctx.set_role(RegionRole::Leader, region_id);
2082 assert_eq!(
2083 manifest_ctx.state.load(),
2084 RegionRoleState::Leader(RegionLeaderState::Writable)
2085 );
2086 }
2087
2088 #[tokio::test]
2089 async fn test_staging_state_validation() {
2090 let env = SchedulerEnv::new().await;
2091 let builder = VersionControlBuilder::new();
2092 let version_control = Arc::new(builder.build());
2093
2094 let staging_ctx = {
2096 let manager = RegionManifestManager::new(
2097 version_control.current().version.metadata.clone(),
2098 0,
2099 RegionManifestOptions {
2100 manifest_dir: "".to_string(),
2101 object_store: env.access_layer.object_store().clone(),
2102 compress_type: CompressionType::Uncompressed,
2103 checkpoint_distance: 10,
2104 remove_file_options: Default::default(),
2105 manifest_cache: None,
2106 },
2107 FormatType::PrimaryKey,
2108 &Default::default(),
2109 )
2110 .await
2111 .unwrap();
2112 Arc::new(ManifestContext::new(
2113 manager,
2114 RegionRoleState::Leader(RegionLeaderState::Staging),
2115 None,
2116 ))
2117 };
2118
2119 assert_eq!(
2121 staging_ctx.current_state(),
2122 RegionRoleState::Leader(RegionLeaderState::Staging)
2123 );
2124
2125 let writable_ctx = env
2127 .mock_manifest_context(version_control.current().version.metadata.clone())
2128 .await;
2129
2130 assert_eq!(
2131 writable_ctx.current_state(),
2132 RegionRoleState::Leader(RegionLeaderState::Writable)
2133 );
2134 }
2135
    /// Walks a region through entering and exiting staging, checks that entering
    /// staging removes leftover staging manifests, and checks that repeating either
    /// transition fails.
    #[tokio::test]
    async fn test_staging_state_transitions() {
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let metadata = version_control.current().version.metadata.clone();

        // Back the region with a filesystem object store rooted in a temp directory.
        let temp_dir = create_temp_dir("");
        let path_str = temp_dir.path().display().to_string();
        let fs_builder = Fs::default().root(&path_str);
        let object_store = ObjectStore::new(fs_builder).unwrap().finish();

        let index_aux_path = temp_dir.path().join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let access_layer = Arc::new(AccessLayer::new(
            "",
            PathType::Bare,
            object_store,
            puffin_mgr,
            intm_mgr,
        ));

        let manager = RegionManifestManager::new(
            metadata.clone(),
            0,
            RegionManifestOptions {
                manifest_dir: "".to_string(),
                object_store: access_layer.object_store().clone(),
                compress_type: CompressionType::Uncompressed,
                checkpoint_distance: 10,
                remove_file_options: Default::default(),
                manifest_cache: None,
            },
            FormatType::PrimaryKey,
            &Default::default(),
        )
        .await
        .unwrap();

        let manifest_ctx = Arc::new(ManifestContext::new(
            manager,
            RegionRoleState::Leader(RegionLeaderState::Writable),
            None,
        ));

        let region = MitoRegion {
            region_id: metadata.region_id,
            version_control,
            access_layer,
            manifest_ctx: manifest_ctx.clone(),
            file_purger: crate::test_util::new_noop_file_purger(),
            provider: Provider::noop_provider(),
            last_flush_millis: Default::default(),
            last_schedule_compaction_millis: Default::default(),
            time_provider: Arc::new(StdTimeProvider),
            topic_latest_entry_id: Default::default(),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: ManifestStats::default(),
        };

        // The region starts out writable and not staging.
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Writable -> staging; the manifest write guard is released before asserting.
        let mut manager = manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();
        drop(manager);
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );
        assert!(region.is_staging());

        // Staging -> writable.
        region.exit_staging().unwrap();
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Leftover staging manifests must be removed when staging is entered again.
        {
            let manager = manifest_ctx.manifest_manager.write().await;
            let dummy_actions = RegionMetaActionList::new(vec![]);
            let dummy_bytes = dummy_actions.encode().unwrap();

            // NOTE(review): the trailing `true` presumably selects the staging area -
            // confirm against the store's `save` signature.
            manager.store().save(100, &dummy_bytes, true).await.unwrap();
            manager.store().save(101, &dummy_bytes, true).await.unwrap();
            drop(manager);

            // Both saved files are listed as staging manifests.
            let manager = manifest_ctx.manifest_manager.read().await;
            let dirty_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                dirty_manifests.len(),
                2,
                "Should have 2 dirty staging files"
            );
            drop(manager);

            let mut manager = manifest_ctx.manifest_manager.write().await;
            region.set_staging(&mut manager).await.unwrap();
            drop(manager);

            // No staging manifests remain after entering staging.
            let manager = manifest_ctx.manifest_manager.read().await;
            let cleaned_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                cleaned_manifests.len(),
                0,
                "Dirty staging files should be cleaned up"
            );
            drop(manager);

            region.exit_staging().unwrap();
        }

        // Entering staging succeeds from writable and fails when already staging.
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_ok());
        drop(manager);
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_err());
        drop(manager);
        // Exiting staging succeeds once and fails when no longer staging.
        assert!(region.exit_staging().is_ok());
        assert!(region.exit_staging().is_err());
    }
2278}