1pub mod catchup;
18pub mod opener;
19pub mod options;
20pub mod utils;
21pub(crate) mod version;
22
23use std::collections::hash_map::Entry;
24use std::collections::{HashMap, HashSet};
25use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
26use std::sync::{Arc, Mutex, RwLock};
27
28use common_base::hash::partition_expr_version;
29use common_telemetry::{error, info, warn};
30use crossbeam_utils::atomic::AtomicCell;
31use partition::expr::PartitionExpr;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::PrimaryKeyEncoding;
35use store_api::logstore::provider::Provider;
36use store_api::metadata::RegionMetadataRef;
37use store_api::region_engine::{
38 RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
39};
40use store_api::region_request::{PathType, StagingPartitionDirective};
41use store_api::sst_entry::ManifestSstEntry;
42use store_api::storage::{FileId, RegionId, SequenceNumber};
43use tokio::sync::RwLockWriteGuard;
44pub use utils::*;
45
46use crate::access_layer::AccessLayerRef;
47use crate::error::{
48 InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
49 UnexpectedSnafu, UpdateManifestSnafu,
50};
51use crate::manifest::action::{
52 RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
53};
54use crate::manifest::manager::RegionManifestManager;
55use crate::region::version::{VersionControlRef, VersionRef};
56use crate::request::{OnFailure, OptionOutputTx};
57use crate::sst::file::FileMeta;
58use crate::sst::file_purger::FilePurgerRef;
59use crate::sst::location::{index_file_path, sst_file_path};
60use crate::time_provider::TimeProviderRef;
61
/// Multiplier applied to a region's memtable usage to estimate its WAL usage
/// (see `MitoRegion::estimated_wal_usage`).
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;
64
/// Storage usage figures of a single region.
#[derive(Debug)]
pub struct RegionUsage {
    /// Id of the region the figures belong to.
    pub region_id: RegionId,
    /// Usage attributed to the WAL (presumably in bytes — confirm with the producer).
    pub wal_usage: u64,
    /// Usage attributed to SST files (presumably in bytes — confirm with the producer).
    pub sst_usage: u64,
    /// Usage attributed to the manifest (presumably in bytes — confirm with the producer).
    pub manifest_usage: u64,
}
73
74impl RegionUsage {
75 pub fn disk_usage(&self) -> u64 {
76 self.wal_usage + self.sst_usage + self.manifest_usage
77 }
78}
79
/// State of a region while it holds the leader role.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// Normal leader state; the other leader states in this file are entered from here.
    Writable,
    /// The region is in staging mode.
    Staging,
    /// The region is on its way into staging mode.
    EnteringStaging,
    /// The region is being altered.
    Altering,
    /// The region is being dropped.
    Dropping,
    /// The region is being truncated.
    Truncating,
    /// The region is being edited.
    Editing,
    /// The leader is being downgraded.
    Downgrading,
}

/// Role of a region together with its leader state, if it is a leader.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
    /// The region is a leader in the given leader state.
    Leader(RegionLeaderState),
    /// The region is a follower.
    Follower,
}

impl RegionRoleState {
    /// Extracts the leader state.
    ///
    /// Returns `Some` with the inner state for a leader and `None` for a follower.
    pub fn into_leader_state(self) -> Option<RegionLeaderState> {
        if let Self::Leader(leader_state) = self {
            Some(leader_state)
        } else {
            None
        }
    }
}
115
/// In-memory handle of a region: its versions, storage access, manifest context
/// and bookkeeping counters.
#[derive(Debug)]
pub struct MitoRegion {
    /// Id of this region.
    pub(crate) region_id: RegionId,

    /// Version control handle; the source of the current version and sequences.
    pub(crate) version_control: VersionControlRef,
    /// Access layer; provides the table directory and path type of the region.
    pub(crate) access_layer: AccessLayerRef,
    /// Manifest context holding the manifest manager and the role state.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// File purger handed to version control when edits are applied.
    pub(crate) file_purger: FilePurgerRef,
    /// Log store provider of the region (NOTE(review): semantics defined in `store_api` — confirm).
    pub(crate) provider: Provider,
    /// Timestamp in milliseconds recorded by `update_flush_millis`.
    last_flush_millis: AtomicI64,
    /// Timestamp in milliseconds recorded by `update_compaction_millis`.
    last_compaction_millis: AtomicI64,
    /// Source of the current time used for the timestamps above.
    time_provider: TimeProviderRef,
    /// Latest entry id of the topic; reported in the region statistic.
    pub(crate) topic_latest_entry_id: AtomicU64,
    /// Counter of written bytes; reported in the region statistic.
    pub(crate) written_bytes: Arc<AtomicU64>,
    /// Manifest statistics (total size, version, removed-file count).
    stats: ManifestStats,
}
162
/// Shared, reference-counted handle to a [`MitoRegion`].
pub type MitoRegionRef = Arc<MitoRegion>;
164
/// Partition information attached to a region while it is in staging mode.
#[derive(Debug, Clone)]
pub(crate) struct StagingPartitionInfo {
    /// Directive describing how partitioning is handled during staging.
    pub(crate) partition_directive: StagingPartitionDirective,
    /// Version derived from the directive's partition expression
    /// (`0` for `RejectAllWrites`, see `from_partition_directive`).
    pub(crate) partition_rule_version: u64,
}
170
171impl StagingPartitionInfo {
172 pub(crate) fn partition_expr(&self) -> Option<&str> {
174 self.partition_directive.partition_expr()
175 }
176
177 pub(crate) fn from_partition_directive(partition_directive: StagingPartitionDirective) -> Self {
179 let partition_rule_version = match &partition_directive {
180 StagingPartitionDirective::UpdatePartitionExpr(expr) => {
181 partition_expr_version(Some(expr))
182 }
183 StagingPartitionDirective::RejectAllWrites => 0,
184 };
185 Self {
186 partition_directive,
187 partition_rule_version,
188 }
189 }
190}
191
192impl MitoRegion {
193 pub(crate) async fn stop(&self) {
195 self.manifest_ctx
196 .manifest_manager
197 .write()
198 .await
199 .stop()
200 .await;
201
202 info!(
203 "Stopped region manifest manager, region_id: {}",
204 self.region_id
205 );
206 }
207
208 pub fn metadata(&self) -> RegionMetadataRef {
210 let version_data = self.version_control.current();
211 version_data.version.metadata.clone()
212 }
213
214 pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
216 let version_data = self.version_control.current();
217 version_data.version.metadata.primary_key_encoding
218 }
219
220 pub(crate) fn version(&self) -> VersionRef {
222 let version_data = self.version_control.current();
223 version_data.version
224 }
225
226 pub(crate) fn last_flush_millis(&self) -> i64 {
228 self.last_flush_millis.load(Ordering::Relaxed)
229 }
230
231 pub(crate) fn update_flush_millis(&self) {
233 let now = self.time_provider.current_time_millis();
234 self.last_flush_millis.store(now, Ordering::Relaxed);
235 }
236
237 pub(crate) fn last_compaction_millis(&self) -> i64 {
239 self.last_compaction_millis.load(Ordering::Relaxed)
240 }
241
242 pub(crate) fn update_compaction_millis(&self) {
244 let now = self.time_provider.current_time_millis();
245 self.last_compaction_millis.store(now, Ordering::Relaxed);
246 }
247
248 pub(crate) fn table_dir(&self) -> &str {
250 self.access_layer.table_dir()
251 }
252
253 pub(crate) fn path_type(&self) -> PathType {
255 self.access_layer.path_type()
256 }
257
258 pub(crate) fn is_writable(&self) -> bool {
260 matches!(
261 self.manifest_ctx.state.load(),
262 RegionRoleState::Leader(RegionLeaderState::Writable)
263 | RegionRoleState::Leader(RegionLeaderState::Staging)
264 )
265 }
266
267 pub(crate) fn is_flushable(&self) -> bool {
269 matches!(
270 self.manifest_ctx.state.load(),
271 RegionRoleState::Leader(RegionLeaderState::Writable)
272 | RegionRoleState::Leader(RegionLeaderState::Staging)
273 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
274 )
275 }
276
277 pub(crate) fn should_abort_index(&self) -> bool {
279 matches!(
280 self.manifest_ctx.state.load(),
281 RegionRoleState::Follower
282 | RegionRoleState::Leader(RegionLeaderState::Dropping)
283 | RegionRoleState::Leader(RegionLeaderState::Truncating)
284 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
285 | RegionRoleState::Leader(RegionLeaderState::Staging)
286 )
287 }
288
289 pub(crate) fn is_downgrading(&self) -> bool {
291 matches!(
292 self.manifest_ctx.state.load(),
293 RegionRoleState::Leader(RegionLeaderState::Downgrading)
294 )
295 }
296
297 pub(crate) fn is_staging(&self) -> bool {
299 self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
300 }
301
302 pub(crate) fn is_enter_staging(&self) -> bool {
304 self.manifest_ctx.state.load()
305 == RegionRoleState::Leader(RegionLeaderState::EnteringStaging)
306 }
307
308 pub fn region_id(&self) -> RegionId {
309 self.region_id
310 }
311
312 pub fn find_committed_sequence(&self) -> SequenceNumber {
313 self.version_control.committed_sequence()
314 }
315
316 pub fn flushed_sequence(&self) -> SequenceNumber {
322 self.version_control.current().version.flushed_sequence
323 }
324
325 pub fn is_follower(&self) -> bool {
327 self.manifest_ctx.state.load() == RegionRoleState::Follower
328 }
329
330 pub(crate) fn state(&self) -> RegionRoleState {
332 self.manifest_ctx.state.load()
333 }
334
335 pub(crate) fn set_role(&self, next_role: RegionRole) {
337 self.manifest_ctx.set_role(next_role, self.region_id);
338 }
339
340 pub(crate) fn region_role(&self) -> RegionRole {
341 match self.state() {
342 RegionRoleState::Follower => RegionRole::Follower,
343 RegionRoleState::Leader(RegionLeaderState::Staging) => RegionRole::StagingLeader,
344 RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
345 RegionRole::DowngradingLeader
346 }
347 RegionRoleState::Leader(_) => RegionRole::Leader,
348 }
349 }
350
351 pub(crate) fn set_altering(&self) -> Result<()> {
354 self.compare_exchange_state(
355 RegionLeaderState::Writable,
356 RegionRoleState::Leader(RegionLeaderState::Altering),
357 )
358 }
359
360 pub(crate) fn set_dropping(&self, expect: RegionLeaderState) -> Result<()> {
363 self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Dropping))
364 }
365
366 pub(crate) fn set_truncating(&self) -> Result<()> {
369 self.compare_exchange_state(
370 RegionLeaderState::Writable,
371 RegionRoleState::Leader(RegionLeaderState::Truncating),
372 )
373 }
374
375 pub(crate) fn set_editing(&self, expect: RegionLeaderState) -> Result<()> {
378 self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Editing))
379 }
380
381 pub(crate) async fn set_staging(
387 &self,
388 manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
389 ) -> Result<()> {
390 manager.store().clear_staging_manifests().await?;
391
392 self.compare_exchange_state(
393 RegionLeaderState::Writable,
394 RegionRoleState::Leader(RegionLeaderState::Staging),
395 )
396 }
397
398 pub(crate) fn set_entering_staging(&self) -> Result<()> {
400 self.compare_exchange_state(
401 RegionLeaderState::Writable,
402 RegionRoleState::Leader(RegionLeaderState::EnteringStaging),
403 )
404 }
405
406 pub fn exit_staging(&self) -> Result<()> {
411 self.manifest_ctx.exit_staging(
412 self.region_id,
413 RegionRoleState::Leader(RegionLeaderState::Writable),
414 )
415 }
416
417 pub(crate) async fn set_role_state_gracefully(
419 &self,
420 state: SettableRegionRoleState,
421 ) -> Result<()> {
422 let mut manager: RwLockWriteGuard<'_, RegionManifestManager> =
423 self.manifest_ctx.manifest_manager.write().await;
424 let current_state = self.state();
425
426 match state {
427 SettableRegionRoleState::Leader => {
428 match current_state {
431 RegionRoleState::Leader(RegionLeaderState::Staging) => {
432 info!("Exiting staging mode for region {}", self.region_id);
433 self.exit_staging_on_success(&mut manager).await?;
435 }
436 RegionRoleState::Leader(RegionLeaderState::Writable) => {
437 info!("Region {} already in normal leader mode", self.region_id);
439 }
440 _ => {
441 return Err(RegionStateSnafu {
443 region_id: self.region_id,
444 state: current_state,
445 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
446 }
447 .build());
448 }
449 }
450 }
451
452 SettableRegionRoleState::StagingLeader => {
453 match current_state {
456 RegionRoleState::Leader(RegionLeaderState::Writable) => {
457 info!("Entering staging mode for region {}", self.region_id);
458 self.set_staging(&mut manager).await?;
459 }
460 RegionRoleState::Leader(RegionLeaderState::Staging) => {
461 info!("Region {} already in staging mode", self.region_id);
463 }
464 _ => {
465 return Err(RegionStateSnafu {
466 region_id: self.region_id,
467 state: current_state,
468 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
469 }
470 .build());
471 }
472 }
473 }
474
475 SettableRegionRoleState::Follower => {
476 match current_state {
478 RegionRoleState::Leader(RegionLeaderState::Staging) => {
479 info!(
480 "Exiting staging and demoting region {} to follower",
481 self.region_id
482 );
483 self.exit_staging()?;
484 self.set_role(RegionRole::Follower);
485 }
486 RegionRoleState::Leader(_) => {
487 info!("Demoting region {} from leader to follower", self.region_id);
488 self.set_role(RegionRole::Follower);
489 }
490 RegionRoleState::Follower => {
491 info!("Region {} already in follower mode", self.region_id);
493 }
494 }
495 }
496
497 SettableRegionRoleState::DowngradingLeader => {
498 match current_state {
500 RegionRoleState::Leader(RegionLeaderState::Staging) => {
501 info!(
502 "Exiting staging and entering downgrade for region {}",
503 self.region_id
504 );
505 self.exit_staging()?;
506 self.set_role(RegionRole::DowngradingLeader);
507 }
508 RegionRoleState::Leader(RegionLeaderState::Writable) => {
509 info!("Starting downgrade for region {}", self.region_id);
510 self.set_role(RegionRole::DowngradingLeader);
511 }
512 RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
513 info!("Region {} already in downgrading mode", self.region_id);
515 }
516 _ => {
517 warn!(
518 "Cannot start downgrade for region {} from state {:?}",
519 self.region_id, current_state
520 );
521 }
522 }
523 }
524 }
525
526 if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
528 let manifest_meta = &manager.manifest().metadata;
530 let current_version = self.version();
531 let current_meta = ¤t_version.metadata;
532 if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
533 let action = RegionMetaAction::Change(RegionChange {
534 metadata: current_meta.clone(),
535 sst_format: current_version.options.sst_format.unwrap_or_default(),
536 append_mode: None,
537 });
538 let result = manager
539 .update(RegionMetaActionList::with_action(action), false)
540 .await;
541
542 match result {
543 Ok(version) => {
544 info!(
545 "Successfully persisted backfilled metadata for region {}, version: {}",
546 self.region_id, version
547 );
548 }
549 Err(e) => {
550 warn!(e; "Failed to persist backfilled metadata for region {}", self.region_id);
551 }
552 }
553 }
554 }
555
556 drop(manager);
557
558 Ok(())
559 }
560
561 pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
564 if let Err(e) = self
565 .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
566 {
567 error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
568 }
569 }
570
571 pub(crate) fn switch_state_to_staging(&self, expect: RegionLeaderState) {
574 if let Err(e) =
575 self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Staging))
576 {
577 error!(e; "failed to switch region state to staging, expect state is {:?}", expect);
578 }
579 }
580
581 pub(crate) fn region_statistic(&self) -> RegionStatistic {
583 let version = self.version();
584 let memtables = &version.memtables;
585 let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;
586
587 let sst_usage = version.ssts.sst_usage();
588 let index_usage = version.ssts.index_usage();
589 let flushed_entry_id = version.flushed_entry_id;
590
591 let wal_usage = self.estimated_wal_usage(memtable_usage);
592 let manifest_usage = self.stats.total_manifest_size();
593 let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
594 let num_files = version.ssts.num_files();
595 let manifest_version = self.stats.manifest_version();
596 let file_removed_cnt = self.stats.file_removed_cnt();
597
598 let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
599 let written_bytes = self.written_bytes.load(Ordering::Relaxed);
600
601 RegionStatistic {
602 num_rows,
603 memtable_size: memtable_usage,
604 wal_size: wal_usage,
605 manifest_size: manifest_usage,
606 sst_size: sst_usage,
607 sst_num: num_files,
608 index_size: index_usage,
609 manifest: RegionManifestInfo::Mito {
610 manifest_version,
611 flushed_entry_id,
612 file_removed_cnt,
613 },
614 data_topic_latest_entry_id: topic_latest_entry_id,
615 metadata_topic_latest_entry_id: topic_latest_entry_id,
616 written_bytes,
617 }
618 }
619
620 fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
623 ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
624 }
625
626 fn compare_exchange_state(
629 &self,
630 expect: RegionLeaderState,
631 state: RegionRoleState,
632 ) -> Result<()> {
633 self.manifest_ctx
634 .state
635 .compare_exchange(RegionRoleState::Leader(expect), state)
636 .map_err(|actual| {
637 RegionStateSnafu {
638 region_id: self.region_id,
639 state: actual,
640 expect: RegionRoleState::Leader(expect),
641 }
642 .build()
643 })?;
644 Ok(())
645 }
646
647 pub fn access_layer(&self) -> AccessLayerRef {
648 self.access_layer.clone()
649 }
650
651 pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
653 let table_dir = self.table_dir();
654 let path_type = self.access_layer.path_type();
655
656 let visible_ssts = self
657 .version()
658 .ssts
659 .levels()
660 .iter()
661 .flat_map(|level| level.files().map(|file| file.file_id().file_id()))
662 .collect::<HashSet<_>>();
663
664 let manifest_files = self.manifest_ctx.manifest().await.files.clone();
665 let staging_files = self
666 .manifest_ctx
667 .staging_manifest()
668 .await
669 .map(|m| m.files.clone())
670 .unwrap_or_default();
671 let files = manifest_files
672 .into_iter()
673 .chain(staging_files)
674 .collect::<HashMap<_, _>>();
675
676 files
677 .values()
678 .map(|meta| {
679 let region_id = self.region_id;
680 let origin_region_id = meta.region_id;
681 let (index_version, index_file_path, index_file_size) = if meta.index_file_size > 0
682 {
683 let index_file_path = index_file_path(table_dir, meta.index_id(), path_type);
684 (
685 meta.index_version,
686 Some(index_file_path),
687 Some(meta.index_file_size),
688 )
689 } else {
690 (0, None, None)
691 };
692 let visible = visible_ssts.contains(&meta.file_id);
693 ManifestSstEntry {
694 table_dir: table_dir.to_string(),
695 region_id,
696 table_id: region_id.table_id(),
697 region_number: region_id.region_number(),
698 region_group: region_id.region_group(),
699 region_sequence: region_id.region_sequence(),
700 file_id: meta.file_id.to_string(),
701 index_version,
702 level: meta.level,
703 file_path: sst_file_path(table_dir, meta.file_id(), path_type),
704 file_size: meta.file_size,
705 index_file_path,
706 index_file_size,
707 num_rows: meta.num_rows,
708 num_row_groups: meta.num_row_groups,
709 num_series: Some(meta.num_series),
710 min_ts: meta.time_range.0,
711 max_ts: meta.time_range.1,
712 sequence: meta.sequence.map(|s| s.get()),
713 origin_region_id,
714 node_id: None,
715 visible,
716 }
717 })
718 .collect()
719 }
720
721 pub async fn file_metas(&self, file_ids: &[FileId]) -> Vec<Option<FileMeta>> {
723 let manifest_files = self.manifest_ctx.manifest().await.files.clone();
724
725 file_ids
726 .iter()
727 .map(|file_id| manifest_files.get(file_id).cloned())
728 .collect::<Vec<_>>()
729 }
730
/// Merges the staged manifest actions into the manifest, applies them to the
/// in-memory version and then leaves staging mode.
///
/// If there is nothing staged, the region just exits staging.
///
/// # Errors
/// Returns a region state error if the region is not in the `Staging` state,
/// an unexpected error if the merged actions do not have the expected shape
/// (exactly one of change / partition-expr change, plus an edit) or if a change
/// action alters the column metadata, and propagates manifest errors.
pub(crate) async fn exit_staging_on_success(
    &self,
    manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
) -> Result<()> {
    let current_state = self.manifest_ctx.current_state();
    ensure!(
        current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
        RegionStateSnafu {
            region_id: self.region_id,
            state: current_state,
            expect: RegionRoleState::Leader(RegionLeaderState::Staging),
        }
    );

    let merged_actions = match manager.merge_staged_actions(current_state).await? {
        Some(actions) => actions,
        None => {
            info!(
                "No staged manifests to merge for region {}, exiting staging mode without changes",
                self.region_id
            );
            self.exit_staging()?;
            return Ok(());
        }
    };
    // Validate the shape of the merged actions before touching the manifest.
    let expect_change = merged_actions.actions.iter().any(|a| a.is_change());
    let expect_partition_expr_change = merged_actions
        .actions
        .iter()
        .any(|a| a.is_partition_expr_change());
    let expect_edit = merged_actions.actions.iter().any(|a| a.is_edit());
    ensure!(
        !(expect_change && expect_partition_expr_change),
        UnexpectedSnafu {
            reason: "unexpected both change and partition expr change actions in merged actions"
        }
    );
    ensure!(
        expect_change || expect_partition_expr_change,
        UnexpectedSnafu {
            reason: "expect a change or partition expr change action in merged actions"
        }
    );
    ensure!(
        expect_edit,
        UnexpectedSnafu {
            reason: "expect an edit action in merged actions"
        }
    );

    let (merged_partition_expr_change, merged_change, merged_edit) =
        merged_actions.clone().split_region_change_and_edit();
    if let Some(change) = &merged_change {
        // A change action must keep the column metadata of the current version.
        let current_column_metadatas = &self.version().metadata.column_metadatas;
        ensure!(
            change.metadata.column_metadatas == *current_column_metadatas,
            UnexpectedSnafu {
                reason: "change action alters column metadata in staging exit"
            }
        );
    }

    // Write the merged actions to the manifest first; the in-memory version is
    // only updated after this succeeds.
    let new_version = manager.update(merged_actions, false).await?;
    info!(
        "Successfully submitted merged staged manifests for region {}, new version: {}",
        self.region_id, new_version
    );

    if let Some(change) = merged_partition_expr_change {
        let mut new_metadata = self.version().metadata.as_ref().clone();
        new_metadata.set_partition_expr(change.partition_expr);
        self.version_control.alter_metadata(new_metadata.into());
    }
    if let Some(change) = merged_change {
        self.version_control.alter_metadata(change.metadata);
    }
    self.version_control
        .apply_edit(Some(merged_edit), &[], self.file_purger.clone());

    // Cleanup failure is logged only; the region still exits staging.
    if let Err(e) = manager.clear_staging_manifest_and_dir().await {
        error!(e; "Failed to clear staging manifest dir for region {}", self.region_id);
    }
    self.exit_staging()?;

    Ok(())
}
827
828 pub fn maybe_staging_partition_expr_str(&self) -> Option<String> {
834 let is_staging = self.is_staging();
835 if is_staging {
836 let staging_partition_info = self.manifest_ctx.staging_partition_info();
837 if staging_partition_info.is_none() {
838 warn!(
839 "Staging partition expr is none for region {} in staging state",
840 self.region_id
841 );
842 }
843 staging_partition_info
844 .as_ref()
845 .and_then(|info| info.partition_expr().map(ToString::to_string))
846 } else {
847 let version = self.version();
848 version.metadata.partition_expr.clone()
849 }
850 }
851
852 pub fn expected_partition_expr_version(&self) -> u64 {
853 if self.is_staging() {
854 self.manifest_ctx
855 .staging_partition_info()
856 .as_ref()
857 .map(|info| info.partition_rule_version)
858 .unwrap_or_default()
859 } else {
860 self.version().metadata.partition_expr_version
861 }
862 }
863
864 pub(crate) fn reject_all_writes_in_staging(&self) -> bool {
866 if !self.is_staging() {
867 return false;
868 }
869 self.manifest_ctx
870 .staging_partition_info()
871 .as_ref()
872 .map(|info| {
873 matches!(
874 info.partition_directive,
875 StagingPartitionDirective::RejectAllWrites
876 )
877 })
878 .unwrap_or(false)
879 }
880}
881
/// Shared context of a region's manifest: the manifest manager, the region's
/// role state and the staging partition info.
#[derive(Debug)]
pub(crate) struct ManifestContext {
    /// Manifest manager guarded by an async read-write lock.
    pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
    /// Current role state of the region, read and updated atomically.
    state: AtomicCell<RegionRoleState>,
    /// Partition info set for staging; cleared whenever staging is exited.
    staging_partition_info: Mutex<Option<StagingPartitionInfo>>,
}
896
897impl ManifestContext {
898 pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
899 ManifestContext {
900 manifest_manager: tokio::sync::RwLock::new(manager),
901 state: AtomicCell::new(state),
902 staging_partition_info: Mutex::new(None),
903 }
904 }
905
906 pub(crate) fn staging_partition_info(&self) -> Option<StagingPartitionInfo> {
907 self.staging_partition_info.lock().unwrap().clone()
908 }
909
910 pub(crate) fn set_staging_partition_info(&self, staging_partition_info: StagingPartitionInfo) {
911 let mut current = self.staging_partition_info.lock().unwrap();
912 debug_assert!(current.is_none());
913 *current = Some(staging_partition_info);
914 }
915
916 fn clear_staging_partition_info(&self) {
917 *self.staging_partition_info.lock().unwrap() = None;
918 }
919
920 pub(crate) fn exit_staging(
921 &self,
922 region_id: RegionId,
923 next_state: RegionRoleState,
924 ) -> Result<()> {
925 self.state
926 .compare_exchange(
927 RegionRoleState::Leader(RegionLeaderState::Staging),
928 next_state,
929 )
930 .map_err(|actual| {
931 RegionStateSnafu {
932 region_id,
933 state: actual,
934 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
935 }
936 .build()
937 })?;
938 self.clear_staging_partition_info();
939 Ok(())
940 }
941
942 pub(crate) async fn manifest_version(&self) -> ManifestVersion {
943 self.manifest_manager
944 .read()
945 .await
946 .manifest()
947 .manifest_version
948 }
949
950 pub(crate) async fn has_update(&self) -> Result<bool> {
951 self.manifest_manager.read().await.has_update().await
952 }
953
954 pub(crate) fn current_state(&self) -> RegionRoleState {
956 self.state.load()
957 }
958
959 pub(crate) async fn install_manifest_to(
965 &self,
966 version: ManifestVersion,
967 ) -> Result<Arc<RegionManifest>> {
968 let mut manager = self.manifest_manager.write().await;
969 manager.install_manifest_to(version).await?;
970
971 Ok(manager.manifest())
972 }
973
/// Validates the region state and the given actions, then writes `action_list`
/// to the manifest (or the staging manifest when `is_staging` is set).
///
/// Returns the manifest version produced by the update.
///
/// # Errors
/// - An update-manifest / region state error when the current state does not
///   allow the update for `expect_state`.
/// - A region-truncated error when an edit conflicts with a truncation recorded
///   in the manifest.
/// - Any error from the manifest manager's update.
pub(crate) async fn update_manifest(
    &self,
    expect_state: RegionLeaderState,
    action_list: RegionMetaActionList,
    is_staging: bool,
) -> Result<ManifestVersion> {
    // The write lock is taken before the state is read and held until return.
    let mut manager = self.manifest_manager.write().await;
    let manifest = manager.manifest();
    let current_state = self.state.load();

    if expect_state != RegionLeaderState::Downgrading {
        // Besides `expect_state`, a downgrading leader is also accepted here.
        if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
            info!(
                "Region {} is in downgrading leader state, updating manifest. state is {:?}",
                manifest.metadata.region_id, expect_state
            );
        }
        ensure!(
            current_state == RegionRoleState::Leader(expect_state)
                || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
            UpdateManifestSnafu {
                region_id: manifest.metadata.region_id,
                state: current_state,
            }
        );
    } else {
        // When downgrading is expected, the state must match exactly.
        ensure!(
            current_state == RegionRoleState::Leader(expect_state),
            RegionStateSnafu {
                region_id: manifest.metadata.region_id,
                state: current_state,
                expect: RegionRoleState::Leader(expect_state),
            }
        );
    }

    for action in &action_list.actions {
        // Only edit actions are checked.
        let RegionMetaAction::Edit(edit) = &action else {
            continue;
        };

        // Nothing to check unless the manifest records a truncation.
        let Some(truncated_entry_id) = manifest.truncated_entry_id else {
            continue;
        };

        if let Some(flushed_entry_id) = edit.flushed_entry_id {
            // The edit must be newer than the truncation: either a larger entry
            // id, or the same entry id with a flushed sequence beyond the
            // manifest's flushed sequence.
            let is_newer_entry = truncated_entry_id < flushed_entry_id;
            let is_same_entry_with_newer_sequence = truncated_entry_id == flushed_entry_id
                && edit.flushed_sequence.is_some_and(|flushed_sequence| {
                    manifest.flushed_sequence < flushed_sequence
                });

            ensure!(
                is_newer_entry || is_same_entry_with_newer_sequence,
                RegionTruncatedSnafu {
                    region_id: manifest.metadata.region_id,
                }
            );
        }

        if !edit.files_to_remove.is_empty() {
            // Every file the edit removes must still exist in the manifest.
            for file in &edit.files_to_remove {
                ensure!(
                    manifest.files.contains_key(&file.file_id),
                    RegionTruncatedSnafu {
                        region_id: manifest.metadata.region_id,
                    }
                );
            }
        }
    }

    let version = manager.update(action_list, is_staging).await.inspect_err(
        |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
    )?;

    // The role may have changed concurrently; this is only reported.
    if self.state.load() == RegionRoleState::Follower {
        warn!(
            "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
            manifest.metadata.region_id
        );
    }

    Ok(version)
}
1083
/// Switches the region to `next_role`, logging the outcome.
///
/// For every role except `StagingLeader`, a direct transition out of `Staging`
/// is tried first via `exit_staging`, which also clears the staging partition
/// info. Failed transitions are logged and not reported to the caller.
///
/// - `Follower`: any non-follower state becomes `Follower`.
/// - `Leader`: `Follower` and `Downgrading` become `Writable`; other states are kept.
/// - `StagingLeader`: ignored; only a log line is written.
/// - `DowngradingLeader`: `Writable` becomes `Downgrading`; other states are kept.
pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
    match next_role {
        RegionRole::Follower => {
            // Staging -> Follower, clearing the staging partition info.
            if self
                .exit_staging(region_id, RegionRoleState::Follower)
                .is_ok()
            {
                info!(
                    "Convert region {} to follower, previous role state: {:?}",
                    region_id,
                    RegionRoleState::Leader(RegionLeaderState::Staging)
                );
                return;
            }
            match self.state.fetch_update(|state| {
                if !matches!(state, RegionRoleState::Follower) {
                    Some(RegionRoleState::Follower)
                } else {
                    None
                }
            }) {
                Ok(state) => info!(
                    "Convert region {} to follower, previous role state: {:?}",
                    region_id, state
                ),
                Err(state) => {
                    // No warning when the region already is a follower.
                    if state != RegionRoleState::Follower {
                        warn!(
                            "Failed to convert region {} to follower, current role state: {:?}",
                            region_id, state
                        )
                    }
                }
            }
        }
        RegionRole::Leader => {
            // Staging -> Writable, clearing the staging partition info.
            if self
                .exit_staging(
                    region_id,
                    RegionRoleState::Leader(RegionLeaderState::Writable),
                )
                .is_ok()
            {
                info!(
                    "Convert region {} to leader, previous role state: {:?}",
                    region_id,
                    RegionRoleState::Leader(RegionLeaderState::Staging)
                );
                return;
            }
            // Only followers and downgrading leaders are promoted to writable.
            match self.state.fetch_update(|state| {
                if matches!(
                    state,
                    RegionRoleState::Follower
                        | RegionRoleState::Leader(RegionLeaderState::Downgrading)
                ) {
                    Some(RegionRoleState::Leader(RegionLeaderState::Writable))
                } else {
                    None
                }
            }) {
                Ok(state) => info!(
                    "Convert region {} to leader, previous role state: {:?}",
                    region_id, state
                ),
                Err(state) => {
                    // No warning when the region already is a writable leader.
                    if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
                        warn!(
                            "Failed to convert region {} to leader, current role state: {:?}",
                            region_id, state
                        )
                    }
                }
            }
        }
        RegionRole::StagingLeader => {
            info!(
                "Ignore direct conversion of region {} to staging leader; staging requires the dedicated workflow",
                region_id
            );
        }
        RegionRole::DowngradingLeader => {
            // Staging -> Downgrading, clearing the staging partition info.
            if self
                .exit_staging(
                    region_id,
                    RegionRoleState::Leader(RegionLeaderState::Downgrading),
                )
                .is_ok()
            {
                info!(
                    "Convert region {} to downgrading region, previous role state: {:?}",
                    region_id,
                    RegionRoleState::Leader(RegionLeaderState::Staging)
                );
                return;
            }
            // Only a writable leader can start downgrading here.
            match self.state.compare_exchange(
                RegionRoleState::Leader(RegionLeaderState::Writable),
                RegionRoleState::Leader(RegionLeaderState::Downgrading),
            ) {
                Ok(state) => info!(
                    "Convert region {} to downgrading region, previous role state: {:?}",
                    region_id, state
                ),
                Err(state) => {
                    // No warning when the region already is downgrading.
                    if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                        warn!(
                            "Failed to convert region {} to downgrading leader, current role state: {:?}",
                            region_id, state
                        )
                    }
                }
            }
        }
    }
}
1233
1234 pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
1236 self.manifest_manager.read().await.manifest()
1237 }
1238
1239 pub(crate) async fn staging_manifest(
1241 &self,
1242 ) -> Option<Arc<crate::manifest::action::RegionManifest>> {
1243 self.manifest_manager.read().await.staging_manifest()
1244 }
1245}
1246
/// Shared, reference-counted handle to a [`ManifestContext`].
pub(crate) type ManifestContextRef = Arc<ManifestContext>;
1248
/// Map from region id to region handle, guarded by a read-write lock.
#[derive(Debug, Default)]
pub(crate) struct RegionMap {
    /// Regions keyed by their id.
    regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
}
1254
1255impl RegionMap {
1256 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1258 let regions = self.regions.read().unwrap();
1259 regions.contains_key(®ion_id)
1260 }
1261
1262 pub(crate) fn insert_region(&self, region: MitoRegionRef) {
1264 let mut regions = self.regions.write().unwrap();
1265 regions.insert(region.region_id, region);
1266 }
1267
1268 pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
1270 let regions = self.regions.read().unwrap();
1271 regions.get(®ion_id).cloned()
1272 }
1273
1274 pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1278 let region = self
1279 .get_region(region_id)
1280 .context(RegionNotFoundSnafu { region_id })?;
1281 ensure!(
1282 region.is_writable(),
1283 RegionStateSnafu {
1284 region_id,
1285 state: region.state(),
1286 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1287 }
1288 );
1289 Ok(region)
1290 }
1291
1292 pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1296 let region = self
1297 .get_region(region_id)
1298 .context(RegionNotFoundSnafu { region_id })?;
1299 ensure!(
1300 region.is_follower(),
1301 RegionStateSnafu {
1302 region_id,
1303 state: region.state(),
1304 expect: RegionRoleState::Follower,
1305 }
1306 );
1307
1308 Ok(region)
1309 }
1310
1311 pub(crate) fn get_region_or<F: OnFailure>(
1315 &self,
1316 region_id: RegionId,
1317 cb: &mut F,
1318 ) -> Option<MitoRegionRef> {
1319 match self
1320 .get_region(region_id)
1321 .context(RegionNotFoundSnafu { region_id })
1322 {
1323 Ok(region) => Some(region),
1324 Err(e) => {
1325 cb.on_failure(e);
1326 None
1327 }
1328 }
1329 }
1330
1331 pub(crate) fn writable_region_or<F: OnFailure>(
1335 &self,
1336 region_id: RegionId,
1337 cb: &mut F,
1338 ) -> Option<MitoRegionRef> {
1339 match self.writable_region(region_id) {
1340 Ok(region) => Some(region),
1341 Err(e) => {
1342 cb.on_failure(e);
1343 None
1344 }
1345 }
1346 }
1347
1348 pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1352 let region = self.writable_region(region_id)?;
1353 if region.is_staging() {
1354 return Err(crate::error::RegionStateSnafu {
1355 region_id,
1356 state: region.state(),
1357 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1358 }
1359 .build());
1360 }
1361 Ok(region)
1362 }
1363
1364 pub(crate) fn staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1368 let region = self
1369 .get_region(region_id)
1370 .context(RegionNotFoundSnafu { region_id })?;
1371 ensure!(
1372 region.is_staging(),
1373 RegionStateSnafu {
1374 region_id,
1375 state: region.state(),
1376 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
1377 }
1378 );
1379 Ok(region)
1380 }
1381
1382 fn flushable_region(&self, region_id: RegionId) -> Result<Option<MitoRegionRef>> {
1387 let region = self
1388 .get_region(region_id)
1389 .context(RegionNotFoundSnafu { region_id })?;
1390 if region.is_flushable() {
1391 Ok(Some(region))
1392 } else {
1393 Ok(None)
1394 }
1395 }
1396
1397 pub(crate) fn flushable_region_or<F: OnFailure>(
1402 &self,
1403 region_id: RegionId,
1404 cb: &mut F,
1405 ) -> Option<MitoRegionRef> {
1406 match self.flushable_region(region_id) {
1407 Ok(region) => region,
1408 Err(e) => {
1409 cb.on_failure(e);
1410 None
1411 }
1412 }
1413 }
1414
1415 pub(crate) fn remove_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
1417 let mut regions = self.regions.write().unwrap();
1418 regions.remove(®ion_id)
1419 }
1420
1421 pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
1423 let regions = self.regions.read().unwrap();
1424 regions.values().cloned().collect()
1425 }
1426
1427 pub(crate) fn clear(&self) {
1429 self.regions.write().unwrap().clear();
1430 }
1431}
1432
1433pub(crate) type RegionMapRef = Arc<RegionMap>;
1434
1435#[derive(Debug, Default)]
1437pub(crate) struct OpeningRegions {
1438 regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
1439}
1440
1441impl OpeningRegions {
1442 pub(crate) fn wait_for_opening_region(
1444 &self,
1445 region_id: RegionId,
1446 sender: OptionOutputTx,
1447 ) -> Option<OptionOutputTx> {
1448 let mut regions = self.regions.write().unwrap();
1449 match regions.entry(region_id) {
1450 Entry::Occupied(mut senders) => {
1451 senders.get_mut().push(sender);
1452 None
1453 }
1454 Entry::Vacant(_) => Some(sender),
1455 }
1456 }
1457
1458 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1460 let regions = self.regions.read().unwrap();
1461 regions.contains_key(®ion_id)
1462 }
1463
1464 pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
1466 let mut regions = self.regions.write().unwrap();
1467 regions.insert(region, vec![sender]);
1468 }
1469
1470 pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
1472 let mut regions = self.regions.write().unwrap();
1473 regions.remove(®ion_id).unwrap_or_default()
1474 }
1475
1476 #[cfg(test)]
1477 pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
1478 let regions = self.regions.read().unwrap();
1479 if let Some(senders) = regions.get(®ion_id) {
1480 senders.len()
1481 } else {
1482 0
1483 }
1484 }
1485}
1486
1487pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;
1488
1489#[derive(Debug, Default)]
1491pub(crate) struct CatchupRegions {
1492 regions: RwLock<HashSet<RegionId>>,
1493}
1494
1495impl CatchupRegions {
1496 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1498 let regions = self.regions.read().unwrap();
1499 regions.contains(®ion_id)
1500 }
1501
1502 pub(crate) fn insert_region(&self, region_id: RegionId) {
1504 let mut regions = self.regions.write().unwrap();
1505 regions.insert(region_id);
1506 }
1507
1508 pub(crate) fn remove_region(&self, region_id: RegionId) {
1510 let mut regions = self.regions.write().unwrap();
1511 regions.remove(®ion_id);
1512 }
1513}
1514
1515pub(crate) type CatchupRegionsRef = Arc<CatchupRegions>;
1516
/// Manifest-related counters.
///
/// Each counter lives behind an [`Arc`], so clones of a `ManifestStats`
/// observe and update the same underlying values.
#[derive(Default, Debug, Clone)]
pub struct ManifestStats {
    pub(crate) total_manifest_size: Arc<AtomicU64>,
    pub(crate) manifest_version: Arc<AtomicU64>,
    pub(crate) file_removed_cnt: Arc<AtomicU64>,
}

impl ManifestStats {
    /// Reads `counter` with relaxed ordering; shared by the getters below.
    fn load_relaxed(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Current value of `total_manifest_size`.
    fn total_manifest_size(&self) -> u64 {
        Self::load_relaxed(&self.total_manifest_size)
    }

    /// Current value of `manifest_version`.
    fn manifest_version(&self) -> u64 {
        Self::load_relaxed(&self.manifest_version)
    }

    /// Current value of `file_removed_cnt`.
    fn file_removed_cnt(&self) -> u64 {
        Self::load_relaxed(&self.file_removed_cnt)
    }
}
1538
1539pub fn parse_partition_expr(partition_expr_str: Option<&str>) -> Result<Option<PartitionExpr>> {
1541 match partition_expr_str {
1542 None => Ok(None),
1543 Some("") => Ok(None),
1544 Some(json_str) => {
1545 let expr = partition::expr::PartitionExpr::from_json_str(json_str)
1546 .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?;
1547 Ok(expr)
1548 }
1549 }
1550}
1551
1552#[cfg(test)]
1553mod tests {
1554 use std::sync::Arc;
1555 use std::sync::atomic::AtomicU64;
1556
1557 use common_datasource::compression::CompressionType;
1558 use common_test_util::temp_dir::create_temp_dir;
1559 use crossbeam_utils::atomic::AtomicCell;
1560 use object_store::ObjectStore;
1561 use object_store::services::Fs;
1562 use store_api::logstore::provider::Provider;
1563 use store_api::region_engine::RegionRole;
1564 use store_api::region_request::PathType;
1565 use store_api::storage::RegionId;
1566
1567 use crate::access_layer::AccessLayer;
1568 use crate::error::Error;
1569 use crate::manifest::action::{
1570 RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionPartitionExprChange,
1571 };
1572 use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
1573 use crate::region::{
1574 ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
1575 };
1576 use crate::sst::FormatType;
1577 use crate::sst::index::intermediate::IntermediateManager;
1578 use crate::sst::index::puffin_manager::PuffinManagerFactory;
1579 use crate::test_util::scheduler_util::SchedulerEnv;
1580 use crate::test_util::version_util::VersionControlBuilder;
1581 use crate::time_provider::StdTimeProvider;
1582
/// `AtomicCell<RegionRoleState>` must report itself as lock-free.
#[test]
fn test_region_state_lock_free() {
    let lock_free = AtomicCell::<RegionRoleState>::is_lock_free();
    assert!(lock_free);
}
1587
1588 async fn build_test_region(env: &SchedulerEnv) -> MitoRegion {
1589 let builder = VersionControlBuilder::new();
1590 let version_control = Arc::new(builder.build());
1591 let metadata = version_control.current().version.metadata.clone();
1592
1593 let manager = RegionManifestManager::new(
1594 metadata.clone(),
1595 0,
1596 RegionManifestOptions {
1597 manifest_dir: "".to_string(),
1598 object_store: env.access_layer.object_store().clone(),
1599 compress_type: CompressionType::Uncompressed,
1600 checkpoint_distance: 10,
1601 remove_file_options: Default::default(),
1602 manifest_cache: None,
1603 },
1604 FormatType::PrimaryKey,
1605 &Default::default(),
1606 )
1607 .await
1608 .unwrap();
1609
1610 let manifest_ctx = Arc::new(ManifestContext::new(
1611 manager,
1612 RegionRoleState::Leader(RegionLeaderState::Writable),
1613 ));
1614
1615 MitoRegion {
1616 region_id: metadata.region_id,
1617 version_control,
1618 access_layer: env.access_layer.clone(),
1619 manifest_ctx,
1620 file_purger: crate::test_util::new_noop_file_purger(),
1621 provider: Provider::noop_provider(),
1622 last_flush_millis: Default::default(),
1623 last_compaction_millis: Default::default(),
1624 time_provider: Arc::new(StdTimeProvider),
1625 topic_latest_entry_id: Default::default(),
1626 written_bytes: Arc::new(AtomicU64::new(0)),
1627 stats: ManifestStats::default(),
1628 }
1629 }
1630
1631 fn empty_edit() -> RegionEdit {
1632 RegionEdit {
1633 files_to_add: Vec::new(),
1634 files_to_remove: Vec::new(),
1635 timestamp_ms: None,
1636 compaction_time_window: None,
1637 flushed_entry_id: None,
1638 flushed_sequence: None,
1639 committed_sequence: None,
1640 }
1641 }
1642
1643 #[tokio::test]
1644 async fn test_exit_staging_partition_expr_change_and_edit_success() {
1645 let env = SchedulerEnv::new().await;
1646 let region = build_test_region(&env).await;
1647
1648 let mut manager = region.manifest_ctx.manifest_manager.write().await;
1649 region.set_staging(&mut manager).await.unwrap();
1650 manager
1651 .update(
1652 RegionMetaActionList::new(vec![
1653 RegionMetaAction::PartitionExprChange(RegionPartitionExprChange {
1654 partition_expr: Some("expr_a".to_string()),
1655 }),
1656 RegionMetaAction::Edit(empty_edit()),
1657 ]),
1658 true,
1659 )
1660 .await
1661 .unwrap();
1662
1663 region.exit_staging_on_success(&mut manager).await.unwrap();
1664 drop(manager);
1665
1666 assert_eq!(
1667 region.version().metadata.partition_expr.as_deref(),
1668 Some("expr_a")
1669 );
1670 assert_eq!(
1671 region.state(),
1672 RegionRoleState::Leader(RegionLeaderState::Writable)
1673 );
1674 }
1675
1676 #[tokio::test]
1677 async fn test_exit_staging_change_with_same_columns_success() {
1678 let env = SchedulerEnv::new().await;
1679 let region = build_test_region(&env).await;
1680
1681 let mut manager = region.manifest_ctx.manifest_manager.write().await;
1682 region.set_staging(&mut manager).await.unwrap();
1683
1684 let mut changed_metadata = region.version().metadata.as_ref().clone();
1685 changed_metadata.set_partition_expr(Some("expr_b".to_string()));
1686
1687 manager
1688 .update(
1689 RegionMetaActionList::new(vec![
1690 RegionMetaAction::Change(RegionChange {
1691 metadata: Arc::new(changed_metadata),
1692 sst_format: FormatType::PrimaryKey,
1693 append_mode: None,
1694 }),
1695 RegionMetaAction::Edit(empty_edit()),
1696 ]),
1697 true,
1698 )
1699 .await
1700 .unwrap();
1701
1702 region.exit_staging_on_success(&mut manager).await.unwrap();
1703 drop(manager);
1704
1705 assert_eq!(
1706 region.version().metadata.partition_expr.as_deref(),
1707 Some("expr_b")
1708 );
1709 assert_eq!(
1710 region.state(),
1711 RegionRoleState::Leader(RegionLeaderState::Writable)
1712 );
1713 }
1714
1715 #[tokio::test]
1716 async fn test_exit_staging_change_with_different_columns_fails() {
1717 let env = SchedulerEnv::new().await;
1718 let region = build_test_region(&env).await;
1719
1720 let mut manager = region.manifest_ctx.manifest_manager.write().await;
1721 region.set_staging(&mut manager).await.unwrap();
1722
1723 let mut changed_metadata = region.version().metadata.as_ref().clone();
1724 changed_metadata.column_metadatas.rotate_left(1);
1725
1726 manager
1727 .update(
1728 RegionMetaActionList::new(vec![
1729 RegionMetaAction::Change(RegionChange {
1730 metadata: Arc::new(changed_metadata),
1731 sst_format: FormatType::PrimaryKey,
1732 append_mode: None,
1733 }),
1734 RegionMetaAction::Edit(empty_edit()),
1735 ]),
1736 true,
1737 )
1738 .await
1739 .unwrap();
1740
1741 let result = region.exit_staging_on_success(&mut manager).await;
1742 assert!(matches!(result, Err(Error::Unexpected { .. })));
1743 }
1744
1745 #[tokio::test]
1746 async fn test_exit_staging_partition_expr_change_and_change_conflict_fails() {
1747 let env = SchedulerEnv::new().await;
1748 let region = build_test_region(&env).await;
1749
1750 let mut manager = region.manifest_ctx.manifest_manager.write().await;
1751 region.set_staging(&mut manager).await.unwrap();
1752
1753 let mut changed_metadata = region.version().metadata.as_ref().clone();
1754 changed_metadata.set_partition_expr(Some("expr_c".to_string()));
1755
1756 manager
1757 .update(
1758 RegionMetaActionList::new(vec![
1759 RegionMetaAction::PartitionExprChange(RegionPartitionExprChange {
1760 partition_expr: Some("expr_c".to_string()),
1761 }),
1762 RegionMetaAction::Change(RegionChange {
1763 metadata: Arc::new(changed_metadata),
1764 sst_format: FormatType::PrimaryKey,
1765 append_mode: None,
1766 }),
1767 RegionMetaAction::Edit(empty_edit()),
1768 ]),
1769 true,
1770 )
1771 .await
1772 .unwrap();
1773
1774 let result = region.exit_staging_on_success(&mut manager).await;
1775 assert!(matches!(result, Err(Error::Unexpected { .. })));
1776 }
1777
1778 #[tokio::test]
1779 async fn test_set_region_state() {
1780 let env = SchedulerEnv::new().await;
1781 let builder = VersionControlBuilder::new();
1782 let version_control = Arc::new(builder.build());
1783 let manifest_ctx = env
1784 .mock_manifest_context(version_control.current().version.metadata.clone())
1785 .await;
1786
1787 let region_id = RegionId::new(1024, 0);
1788 manifest_ctx.set_role(RegionRole::Follower, region_id);
1790 assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1791
1792 manifest_ctx.set_role(RegionRole::Leader, region_id);
1794 assert_eq!(
1795 manifest_ctx.state.load(),
1796 RegionRoleState::Leader(RegionLeaderState::Writable)
1797 );
1798
1799 manifest_ctx.set_role(RegionRole::StagingLeader, region_id);
1801 assert_eq!(
1802 manifest_ctx.state.load(),
1803 RegionRoleState::Leader(RegionLeaderState::Writable)
1804 );
1805
1806 manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1808 assert_eq!(
1809 manifest_ctx.state.load(),
1810 RegionRoleState::Leader(RegionLeaderState::Downgrading)
1811 );
1812
1813 manifest_ctx.set_role(RegionRole::Follower, region_id);
1815 assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1816
1817 manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1819 assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1820
1821 manifest_ctx.set_role(RegionRole::Leader, region_id);
1823 manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1824 assert_eq!(
1825 manifest_ctx.state.load(),
1826 RegionRoleState::Leader(RegionLeaderState::Downgrading)
1827 );
1828
1829 manifest_ctx.set_role(RegionRole::Leader, region_id);
1831 assert_eq!(
1832 manifest_ctx.state.load(),
1833 RegionRoleState::Leader(RegionLeaderState::Writable)
1834 );
1835 }
1836
1837 #[tokio::test]
1838 async fn test_staging_state_validation() {
1839 let env = SchedulerEnv::new().await;
1840 let builder = VersionControlBuilder::new();
1841 let version_control = Arc::new(builder.build());
1842
1843 let staging_ctx = {
1845 let manager = RegionManifestManager::new(
1846 version_control.current().version.metadata.clone(),
1847 0,
1848 RegionManifestOptions {
1849 manifest_dir: "".to_string(),
1850 object_store: env.access_layer.object_store().clone(),
1851 compress_type: CompressionType::Uncompressed,
1852 checkpoint_distance: 10,
1853 remove_file_options: Default::default(),
1854 manifest_cache: None,
1855 },
1856 FormatType::PrimaryKey,
1857 &Default::default(),
1858 )
1859 .await
1860 .unwrap();
1861 Arc::new(ManifestContext::new(
1862 manager,
1863 RegionRoleState::Leader(RegionLeaderState::Staging),
1864 ))
1865 };
1866
1867 assert_eq!(
1869 staging_ctx.current_state(),
1870 RegionRoleState::Leader(RegionLeaderState::Staging)
1871 );
1872
1873 let writable_ctx = env
1875 .mock_manifest_context(version_control.current().version.metadata.clone())
1876 .await;
1877
1878 assert_eq!(
1879 writable_ctx.current_state(),
1880 RegionRoleState::Leader(RegionLeaderState::Writable)
1881 );
1882 }
1883
1884 #[tokio::test]
1885 async fn test_staging_state_transitions() {
1886 let builder = VersionControlBuilder::new();
1887 let version_control = Arc::new(builder.build());
1888 let metadata = version_control.current().version.metadata.clone();
1889
1890 let temp_dir = create_temp_dir("");
1892 let path_str = temp_dir.path().display().to_string();
1893 let fs_builder = Fs::default().root(&path_str);
1894 let object_store = ObjectStore::new(fs_builder).unwrap().finish();
1895
1896 let index_aux_path = temp_dir.path().join("index_aux");
1897 let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
1898 .await
1899 .unwrap();
1900 let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
1901 .await
1902 .unwrap();
1903
1904 let access_layer = Arc::new(AccessLayer::new(
1905 "",
1906 PathType::Bare,
1907 object_store,
1908 puffin_mgr,
1909 intm_mgr,
1910 ));
1911
1912 let manager = RegionManifestManager::new(
1913 metadata.clone(),
1914 0,
1915 RegionManifestOptions {
1916 manifest_dir: "".to_string(),
1917 object_store: access_layer.object_store().clone(),
1918 compress_type: CompressionType::Uncompressed,
1919 checkpoint_distance: 10,
1920 remove_file_options: Default::default(),
1921 manifest_cache: None,
1922 },
1923 FormatType::PrimaryKey,
1924 &Default::default(),
1925 )
1926 .await
1927 .unwrap();
1928
1929 let manifest_ctx = Arc::new(ManifestContext::new(
1930 manager,
1931 RegionRoleState::Leader(RegionLeaderState::Writable),
1932 ));
1933
1934 let region = MitoRegion {
1935 region_id: metadata.region_id,
1936 version_control,
1937 access_layer,
1938 manifest_ctx: manifest_ctx.clone(),
1939 file_purger: crate::test_util::new_noop_file_purger(),
1940 provider: Provider::noop_provider(),
1941 last_flush_millis: Default::default(),
1942 last_compaction_millis: Default::default(),
1943 time_provider: Arc::new(StdTimeProvider),
1944 topic_latest_entry_id: Default::default(),
1945 written_bytes: Arc::new(AtomicU64::new(0)),
1946 stats: ManifestStats::default(),
1947 };
1948
1949 assert_eq!(
1951 region.state(),
1952 RegionRoleState::Leader(RegionLeaderState::Writable)
1953 );
1954 assert!(!region.is_staging());
1955
1956 let mut manager = manifest_ctx.manifest_manager.write().await;
1958 region.set_staging(&mut manager).await.unwrap();
1959 drop(manager);
1960 assert_eq!(
1961 region.state(),
1962 RegionRoleState::Leader(RegionLeaderState::Staging)
1963 );
1964 assert!(region.is_staging());
1965
1966 region.exit_staging().unwrap();
1968 assert_eq!(
1969 region.state(),
1970 RegionRoleState::Leader(RegionLeaderState::Writable)
1971 );
1972 assert!(!region.is_staging());
1973
1974 {
1976 let manager = manifest_ctx.manifest_manager.write().await;
1978 let dummy_actions = RegionMetaActionList::new(vec![]);
1979 let dummy_bytes = dummy_actions.encode().unwrap();
1980
1981 manager.store().save(100, &dummy_bytes, true).await.unwrap();
1983 manager.store().save(101, &dummy_bytes, true).await.unwrap();
1984 drop(manager);
1985
1986 let manager = manifest_ctx.manifest_manager.read().await;
1988 let dirty_manifests = manager.store().fetch_staging_manifests().await.unwrap();
1989 assert_eq!(
1990 dirty_manifests.len(),
1991 2,
1992 "Should have 2 dirty staging files"
1993 );
1994 drop(manager);
1995
1996 let mut manager = manifest_ctx.manifest_manager.write().await;
1998 region.set_staging(&mut manager).await.unwrap();
1999 drop(manager);
2000
2001 let manager = manifest_ctx.manifest_manager.read().await;
2003 let cleaned_manifests = manager.store().fetch_staging_manifests().await.unwrap();
2004 assert_eq!(
2005 cleaned_manifests.len(),
2006 0,
2007 "Dirty staging files should be cleaned up"
2008 );
2009 drop(manager);
2010
2011 region.exit_staging().unwrap();
2013 }
2014
2015 let mut manager = manifest_ctx.manifest_manager.write().await;
2017 assert!(region.set_staging(&mut manager).await.is_ok()); drop(manager);
2019 let mut manager = manifest_ctx.manifest_manager.write().await;
2020 assert!(region.set_staging(&mut manager).await.is_err()); drop(manager);
2022 assert!(region.exit_staging().is_ok()); assert!(region.exit_staging().is_err()); }
2025}