1pub mod catchup;
18pub mod opener;
19pub mod options;
20pub mod utils;
21pub(crate) mod version;
22
23use std::collections::hash_map::Entry;
24use std::collections::{HashMap, HashSet};
25use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
26use std::sync::{Arc, Mutex, RwLock};
27
28use common_base::hash::partition_expr_version;
29use common_telemetry::{error, info, warn};
30use crossbeam_utils::atomic::AtomicCell;
31use partition::expr::PartitionExpr;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::PrimaryKeyEncoding;
35use store_api::logstore::provider::Provider;
36use store_api::metadata::RegionMetadataRef;
37use store_api::region_engine::{
38 RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
39};
40use store_api::region_request::{PathType, StagingPartitionDirective};
41use store_api::sst_entry::ManifestSstEntry;
42use store_api::storage::{FileId, RegionId, SequenceNumber};
43use tokio::sync::RwLockWriteGuard;
44pub use utils::*;
45
46use crate::access_layer::AccessLayerRef;
47use crate::error::{
48 InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
49 UnexpectedSnafu, UpdateManifestSnafu,
50};
51use crate::manifest::action::{
52 RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
53};
54use crate::manifest::manager::RegionManifestManager;
55use crate::region::version::{VersionControlRef, VersionRef};
56use crate::request::{OnFailure, OptionOutputTx};
57use crate::sst::file::FileMeta;
58use crate::sst::file_purger::FilePurgerRef;
59use crate::sst::location::{index_file_path, sst_file_path};
60use crate::time_provider::TimeProviderRef;
61
/// Multiplier applied to the memtable usage to estimate how much WAL space the
/// region occupies (see `MitoRegion::estimated_wal_usage`).
const ESTIMATED_WAL_FACTOR: f32 = 0.428_25;
64
65#[derive(Debug)]
67pub struct RegionUsage {
68 pub region_id: RegionId,
69 pub wal_usage: u64,
70 pub sst_usage: u64,
71 pub manifest_usage: u64,
72}
73
74impl RegionUsage {
75 pub fn disk_usage(&self) -> u64 {
76 self.wal_usage + self.sst_usage + self.manifest_usage
77 }
78}
79
/// Sub-state of a region whose role is leader.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// Normal leader; accepts writes.
    Writable,
    /// Staging leader; still treated as writable and flushable by `MitoRegion`.
    Staging,
    /// Transitional state entered from `Writable` via `set_entering_staging`.
    EnteringStaging,
    /// Entered from `Writable` via `set_altering`.
    Altering,
    /// Entered via `set_dropping`.
    Dropping,
    /// Entered from `Writable` via `set_truncating`.
    Truncating,
    /// Entered via `set_editing`.
    Editing,
    /// Leader being downgraded; still flushable, reported as a downgrading leader.
    Downgrading,
}

/// Role of a region together with its leader sub-state, if any.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
    Leader(RegionLeaderState),
    Follower,
}

impl RegionRoleState {
    /// Extracts the leader sub-state, or `None` for a follower.
    pub fn into_leader_state(self) -> Option<RegionLeaderState> {
        match self {
            Self::Leader(inner) => Some(inner),
            Self::Follower => None,
        }
    }
}
115
/// In-memory handle of a mito region: its version, storage access, manifest
/// context and runtime statistics.
#[derive(Debug)]
pub struct MitoRegion {
    /// Id of this region.
    pub(crate) region_id: RegionId,

    /// Holds the current version (metadata, memtables, SSTs) of the region.
    pub(crate) version_control: VersionControlRef,
    /// Access layer; provides the table directory and path type.
    pub(crate) access_layer: AccessLayerRef,
    /// Manifest manager plus the role state of the region.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// Purger handed to version control when applying edits.
    pub(crate) file_purger: FilePurgerRef,
    /// Log store provider of the region.
    pub(crate) provider: Provider,
    /// Millis (from `time_provider`) of the last flush.
    last_flush_millis: AtomicI64,
    /// Millis (from `time_provider`) of the last compaction.
    last_compaction_millis: AtomicI64,
    /// Clock used to stamp flush/compaction times.
    time_provider: TimeProviderRef,
    /// Reported as both data and metadata topic latest entry id in region statistics.
    pub(crate) topic_latest_entry_id: AtomicU64,
    /// Written-bytes counter reported in region statistics; shared via `Arc`.
    pub(crate) written_bytes: Arc<AtomicU64>,
    /// Manifest statistics (size, version, removed file count).
    stats: ManifestStats,
}

/// Shared reference to a [`MitoRegion`].
pub type MitoRegionRef = Arc<MitoRegion>;
164
165#[derive(Debug, Clone)]
166pub(crate) struct StagingPartitionInfo {
167 pub(crate) partition_directive: StagingPartitionDirective,
168 pub(crate) partition_rule_version: u64,
169}
170
171impl StagingPartitionInfo {
172 pub(crate) fn partition_expr(&self) -> Option<&str> {
174 self.partition_directive.partition_expr()
175 }
176
177 pub(crate) fn from_partition_directive(partition_directive: StagingPartitionDirective) -> Self {
179 let partition_rule_version = match &partition_directive {
180 StagingPartitionDirective::UpdatePartitionExpr(expr) => {
181 partition_expr_version(Some(expr))
182 }
183 StagingPartitionDirective::RejectAllWrites => 0,
184 };
185 Self {
186 partition_directive,
187 partition_rule_version,
188 }
189 }
190}
191
192impl MitoRegion {
193 pub(crate) async fn stop(&self) {
195 self.manifest_ctx
196 .manifest_manager
197 .write()
198 .await
199 .stop()
200 .await;
201
202 info!(
203 "Stopped region manifest manager, region_id: {}",
204 self.region_id
205 );
206 }
207
208 pub fn metadata(&self) -> RegionMetadataRef {
210 let version_data = self.version_control.current();
211 version_data.version.metadata.clone()
212 }
213
214 pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
216 let version_data = self.version_control.current();
217 version_data.version.metadata.primary_key_encoding
218 }
219
220 pub(crate) fn version(&self) -> VersionRef {
222 let version_data = self.version_control.current();
223 version_data.version
224 }
225
226 pub(crate) fn last_flush_millis(&self) -> i64 {
228 self.last_flush_millis.load(Ordering::Relaxed)
229 }
230
231 pub(crate) fn update_flush_millis(&self) {
233 let now = self.time_provider.current_time_millis();
234 self.last_flush_millis.store(now, Ordering::Relaxed);
235 }
236
237 pub(crate) fn last_compaction_millis(&self) -> i64 {
239 self.last_compaction_millis.load(Ordering::Relaxed)
240 }
241
242 pub(crate) fn update_compaction_millis(&self) {
244 let now = self.time_provider.current_time_millis();
245 self.last_compaction_millis.store(now, Ordering::Relaxed);
246 }
247
248 pub(crate) fn table_dir(&self) -> &str {
250 self.access_layer.table_dir()
251 }
252
253 pub(crate) fn path_type(&self) -> PathType {
255 self.access_layer.path_type()
256 }
257
258 pub(crate) fn is_writable(&self) -> bool {
260 matches!(
261 self.manifest_ctx.state.load(),
262 RegionRoleState::Leader(RegionLeaderState::Writable)
263 | RegionRoleState::Leader(RegionLeaderState::Staging)
264 )
265 }
266
267 pub(crate) fn is_flushable(&self) -> bool {
269 matches!(
270 self.manifest_ctx.state.load(),
271 RegionRoleState::Leader(RegionLeaderState::Writable)
272 | RegionRoleState::Leader(RegionLeaderState::Staging)
273 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
274 )
275 }
276
277 pub(crate) fn should_abort_index(&self) -> bool {
279 matches!(
280 self.manifest_ctx.state.load(),
281 RegionRoleState::Follower
282 | RegionRoleState::Leader(RegionLeaderState::Dropping)
283 | RegionRoleState::Leader(RegionLeaderState::Truncating)
284 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
285 | RegionRoleState::Leader(RegionLeaderState::Staging)
286 )
287 }
288
289 pub(crate) fn is_downgrading(&self) -> bool {
291 matches!(
292 self.manifest_ctx.state.load(),
293 RegionRoleState::Leader(RegionLeaderState::Downgrading)
294 )
295 }
296
297 pub(crate) fn is_staging(&self) -> bool {
299 self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
300 }
301
302 pub(crate) fn is_enter_staging(&self) -> bool {
304 self.manifest_ctx.state.load()
305 == RegionRoleState::Leader(RegionLeaderState::EnteringStaging)
306 }
307
308 pub fn region_id(&self) -> RegionId {
309 self.region_id
310 }
311
312 pub fn find_committed_sequence(&self) -> SequenceNumber {
313 self.version_control.committed_sequence()
314 }
315
316 pub fn is_follower(&self) -> bool {
318 self.manifest_ctx.state.load() == RegionRoleState::Follower
319 }
320
321 pub(crate) fn state(&self) -> RegionRoleState {
323 self.manifest_ctx.state.load()
324 }
325
326 pub(crate) fn set_role(&self, next_role: RegionRole) {
328 self.manifest_ctx.set_role(next_role, self.region_id);
329 }
330
331 pub(crate) fn region_role(&self) -> RegionRole {
332 match self.state() {
333 RegionRoleState::Follower => RegionRole::Follower,
334 RegionRoleState::Leader(RegionLeaderState::Staging) => RegionRole::StagingLeader,
335 RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
336 RegionRole::DowngradingLeader
337 }
338 RegionRoleState::Leader(_) => RegionRole::Leader,
339 }
340 }
341
342 pub(crate) fn set_altering(&self) -> Result<()> {
345 self.compare_exchange_state(
346 RegionLeaderState::Writable,
347 RegionRoleState::Leader(RegionLeaderState::Altering),
348 )
349 }
350
351 pub(crate) fn set_dropping(&self, expect: RegionLeaderState) -> Result<()> {
354 self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Dropping))
355 }
356
357 pub(crate) fn set_truncating(&self) -> Result<()> {
360 self.compare_exchange_state(
361 RegionLeaderState::Writable,
362 RegionRoleState::Leader(RegionLeaderState::Truncating),
363 )
364 }
365
366 pub(crate) fn set_editing(&self, expect: RegionLeaderState) -> Result<()> {
369 self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Editing))
370 }
371
372 pub(crate) async fn set_staging(
378 &self,
379 manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
380 ) -> Result<()> {
381 manager.store().clear_staging_manifests().await?;
382
383 self.compare_exchange_state(
384 RegionLeaderState::Writable,
385 RegionRoleState::Leader(RegionLeaderState::Staging),
386 )
387 }
388
389 pub(crate) fn set_entering_staging(&self) -> Result<()> {
391 self.compare_exchange_state(
392 RegionLeaderState::Writable,
393 RegionRoleState::Leader(RegionLeaderState::EnteringStaging),
394 )
395 }
396
397 pub fn exit_staging(&self) -> Result<()> {
402 self.manifest_ctx.exit_staging(
403 self.region_id,
404 RegionRoleState::Leader(RegionLeaderState::Writable),
405 )
406 }
407
408 pub(crate) async fn set_role_state_gracefully(
410 &self,
411 state: SettableRegionRoleState,
412 ) -> Result<()> {
413 let mut manager: RwLockWriteGuard<'_, RegionManifestManager> =
414 self.manifest_ctx.manifest_manager.write().await;
415 let current_state = self.state();
416
417 match state {
418 SettableRegionRoleState::Leader => {
419 match current_state {
422 RegionRoleState::Leader(RegionLeaderState::Staging) => {
423 info!("Exiting staging mode for region {}", self.region_id);
424 self.exit_staging_on_success(&mut manager).await?;
426 }
427 RegionRoleState::Leader(RegionLeaderState::Writable) => {
428 info!("Region {} already in normal leader mode", self.region_id);
430 }
431 _ => {
432 return Err(RegionStateSnafu {
434 region_id: self.region_id,
435 state: current_state,
436 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
437 }
438 .build());
439 }
440 }
441 }
442
443 SettableRegionRoleState::StagingLeader => {
444 match current_state {
447 RegionRoleState::Leader(RegionLeaderState::Writable) => {
448 info!("Entering staging mode for region {}", self.region_id);
449 self.set_staging(&mut manager).await?;
450 }
451 RegionRoleState::Leader(RegionLeaderState::Staging) => {
452 info!("Region {} already in staging mode", self.region_id);
454 }
455 _ => {
456 return Err(RegionStateSnafu {
457 region_id: self.region_id,
458 state: current_state,
459 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
460 }
461 .build());
462 }
463 }
464 }
465
466 SettableRegionRoleState::Follower => {
467 match current_state {
469 RegionRoleState::Leader(RegionLeaderState::Staging) => {
470 info!(
471 "Exiting staging and demoting region {} to follower",
472 self.region_id
473 );
474 self.exit_staging()?;
475 self.set_role(RegionRole::Follower);
476 }
477 RegionRoleState::Leader(_) => {
478 info!("Demoting region {} from leader to follower", self.region_id);
479 self.set_role(RegionRole::Follower);
480 }
481 RegionRoleState::Follower => {
482 info!("Region {} already in follower mode", self.region_id);
484 }
485 }
486 }
487
488 SettableRegionRoleState::DowngradingLeader => {
489 match current_state {
491 RegionRoleState::Leader(RegionLeaderState::Staging) => {
492 info!(
493 "Exiting staging and entering downgrade for region {}",
494 self.region_id
495 );
496 self.exit_staging()?;
497 self.set_role(RegionRole::DowngradingLeader);
498 }
499 RegionRoleState::Leader(RegionLeaderState::Writable) => {
500 info!("Starting downgrade for region {}", self.region_id);
501 self.set_role(RegionRole::DowngradingLeader);
502 }
503 RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
504 info!("Region {} already in downgrading mode", self.region_id);
506 }
507 _ => {
508 warn!(
509 "Cannot start downgrade for region {} from state {:?}",
510 self.region_id, current_state
511 );
512 }
513 }
514 }
515 }
516
517 if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
519 let manifest_meta = &manager.manifest().metadata;
521 let current_version = self.version();
522 let current_meta = ¤t_version.metadata;
523 if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
524 let action = RegionMetaAction::Change(RegionChange {
525 metadata: current_meta.clone(),
526 sst_format: current_version.options.sst_format.unwrap_or_default(),
527 append_mode: None,
528 });
529 let result = manager
530 .update(RegionMetaActionList::with_action(action), false)
531 .await;
532
533 match result {
534 Ok(version) => {
535 info!(
536 "Successfully persisted backfilled metadata for region {}, version: {}",
537 self.region_id, version
538 );
539 }
540 Err(e) => {
541 warn!(e; "Failed to persist backfilled metadata for region {}", self.region_id);
542 }
543 }
544 }
545 }
546
547 drop(manager);
548
549 Ok(())
550 }
551
552 pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
555 if let Err(e) = self
556 .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
557 {
558 error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
559 }
560 }
561
562 pub(crate) fn switch_state_to_staging(&self, expect: RegionLeaderState) {
565 if let Err(e) =
566 self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Staging))
567 {
568 error!(e; "failed to switch region state to staging, expect state is {:?}", expect);
569 }
570 }
571
572 pub(crate) fn region_statistic(&self) -> RegionStatistic {
574 let version = self.version();
575 let memtables = &version.memtables;
576 let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;
577
578 let sst_usage = version.ssts.sst_usage();
579 let index_usage = version.ssts.index_usage();
580 let flushed_entry_id = version.flushed_entry_id;
581
582 let wal_usage = self.estimated_wal_usage(memtable_usage);
583 let manifest_usage = self.stats.total_manifest_size();
584 let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
585 let num_files = version.ssts.num_files();
586 let manifest_version = self.stats.manifest_version();
587 let file_removed_cnt = self.stats.file_removed_cnt();
588
589 let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
590 let written_bytes = self.written_bytes.load(Ordering::Relaxed);
591
592 RegionStatistic {
593 num_rows,
594 memtable_size: memtable_usage,
595 wal_size: wal_usage,
596 manifest_size: manifest_usage,
597 sst_size: sst_usage,
598 sst_num: num_files,
599 index_size: index_usage,
600 manifest: RegionManifestInfo::Mito {
601 manifest_version,
602 flushed_entry_id,
603 file_removed_cnt,
604 },
605 data_topic_latest_entry_id: topic_latest_entry_id,
606 metadata_topic_latest_entry_id: topic_latest_entry_id,
607 written_bytes,
608 }
609 }
610
611 fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
614 ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
615 }
616
617 fn compare_exchange_state(
620 &self,
621 expect: RegionLeaderState,
622 state: RegionRoleState,
623 ) -> Result<()> {
624 self.manifest_ctx
625 .state
626 .compare_exchange(RegionRoleState::Leader(expect), state)
627 .map_err(|actual| {
628 RegionStateSnafu {
629 region_id: self.region_id,
630 state: actual,
631 expect: RegionRoleState::Leader(expect),
632 }
633 .build()
634 })?;
635 Ok(())
636 }
637
638 pub fn access_layer(&self) -> AccessLayerRef {
639 self.access_layer.clone()
640 }
641
642 pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
644 let table_dir = self.table_dir();
645 let path_type = self.access_layer.path_type();
646
647 let visible_ssts = self
648 .version()
649 .ssts
650 .levels()
651 .iter()
652 .flat_map(|level| level.files().map(|file| file.file_id().file_id()))
653 .collect::<HashSet<_>>();
654
655 let manifest_files = self.manifest_ctx.manifest().await.files.clone();
656 let staging_files = self
657 .manifest_ctx
658 .staging_manifest()
659 .await
660 .map(|m| m.files.clone())
661 .unwrap_or_default();
662 let files = manifest_files
663 .into_iter()
664 .chain(staging_files)
665 .collect::<HashMap<_, _>>();
666
667 files
668 .values()
669 .map(|meta| {
670 let region_id = self.region_id;
671 let origin_region_id = meta.region_id;
672 let (index_version, index_file_path, index_file_size) = if meta.index_file_size > 0
673 {
674 let index_file_path = index_file_path(table_dir, meta.index_id(), path_type);
675 (
676 meta.index_version,
677 Some(index_file_path),
678 Some(meta.index_file_size),
679 )
680 } else {
681 (0, None, None)
682 };
683 let visible = visible_ssts.contains(&meta.file_id);
684 ManifestSstEntry {
685 table_dir: table_dir.to_string(),
686 region_id,
687 table_id: region_id.table_id(),
688 region_number: region_id.region_number(),
689 region_group: region_id.region_group(),
690 region_sequence: region_id.region_sequence(),
691 file_id: meta.file_id.to_string(),
692 index_version,
693 level: meta.level,
694 file_path: sst_file_path(table_dir, meta.file_id(), path_type),
695 file_size: meta.file_size,
696 index_file_path,
697 index_file_size,
698 num_rows: meta.num_rows,
699 num_row_groups: meta.num_row_groups,
700 num_series: Some(meta.num_series),
701 min_ts: meta.time_range.0,
702 max_ts: meta.time_range.1,
703 sequence: meta.sequence.map(|s| s.get()),
704 origin_region_id,
705 node_id: None,
706 visible,
707 }
708 })
709 .collect()
710 }
711
712 pub async fn file_metas(&self, file_ids: &[FileId]) -> Vec<Option<FileMeta>> {
714 let manifest_files = self.manifest_ctx.manifest().await.files.clone();
715
716 file_ids
717 .iter()
718 .map(|file_id| manifest_files.get(file_id).cloned())
719 .collect::<Vec<_>>()
720 }
721
722 pub(crate) async fn exit_staging_on_success(
724 &self,
725 manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
726 ) -> Result<()> {
727 let current_state = self.manifest_ctx.current_state();
728 ensure!(
729 current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
730 RegionStateSnafu {
731 region_id: self.region_id,
732 state: current_state,
733 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
734 }
735 );
736
737 let merged_actions = match manager.merge_staged_actions(current_state).await? {
739 Some(actions) => actions,
740 None => {
741 info!(
742 "No staged manifests to merge for region {}, exiting staging mode without changes",
743 self.region_id
744 );
745 self.exit_staging()?;
747 return Ok(());
748 }
749 };
750 let expect_change = merged_actions.actions.iter().any(|a| a.is_change());
751 let expect_partition_expr_change = merged_actions
752 .actions
753 .iter()
754 .any(|a| a.is_partition_expr_change());
755 let expect_edit = merged_actions.actions.iter().any(|a| a.is_edit());
756 ensure!(
757 !(expect_change && expect_partition_expr_change),
758 UnexpectedSnafu {
759 reason: "unexpected both change and partition expr change actions in merged actions"
760 }
761 );
762 ensure!(
763 expect_change || expect_partition_expr_change,
764 UnexpectedSnafu {
765 reason: "expect a change or partition expr change action in merged actions"
766 }
767 );
768 ensure!(
769 expect_edit,
770 UnexpectedSnafu {
771 reason: "expect an edit action in merged actions"
772 }
773 );
774
775 let (merged_partition_expr_change, merged_change, merged_edit) =
776 merged_actions.clone().split_region_change_and_edit();
777 if let Some(change) = &merged_change {
778 let current_column_metadatas = &self.version().metadata.column_metadatas;
782 ensure!(
783 change.metadata.column_metadatas == *current_column_metadatas,
784 UnexpectedSnafu {
785 reason: "change action alters column metadata in staging exit"
786 }
787 );
788 }
789
790 let new_version = manager.update(merged_actions, false).await?;
793 info!(
794 "Successfully submitted merged staged manifests for region {}, new version: {}",
795 self.region_id, new_version
796 );
797
798 if let Some(change) = merged_partition_expr_change {
800 let mut new_metadata = self.version().metadata.as_ref().clone();
801 new_metadata.set_partition_expr(change.partition_expr);
802 self.version_control.alter_metadata(new_metadata.into());
803 }
804 if let Some(change) = merged_change {
805 self.version_control.alter_metadata(change.metadata);
806 }
807 self.version_control
808 .apply_edit(Some(merged_edit), &[], self.file_purger.clone());
809
810 if let Err(e) = manager.clear_staging_manifest_and_dir().await {
812 error!(e; "Failed to clear staging manifest dir for region {}", self.region_id);
813 }
814 self.exit_staging()?;
815
816 Ok(())
817 }
818
819 pub fn maybe_staging_partition_expr_str(&self) -> Option<String> {
825 let is_staging = self.is_staging();
826 if is_staging {
827 let staging_partition_info = self.manifest_ctx.staging_partition_info();
828 if staging_partition_info.is_none() {
829 warn!(
830 "Staging partition expr is none for region {} in staging state",
831 self.region_id
832 );
833 }
834 staging_partition_info
835 .as_ref()
836 .and_then(|info| info.partition_expr().map(ToString::to_string))
837 } else {
838 let version = self.version();
839 version.metadata.partition_expr.clone()
840 }
841 }
842
843 pub fn expected_partition_expr_version(&self) -> u64 {
844 if self.is_staging() {
845 self.manifest_ctx
846 .staging_partition_info()
847 .as_ref()
848 .map(|info| info.partition_rule_version)
849 .unwrap_or_default()
850 } else {
851 self.version().metadata.partition_expr_version
852 }
853 }
854
855 pub(crate) fn reject_all_writes_in_staging(&self) -> bool {
857 if !self.is_staging() {
858 return false;
859 }
860 self.manifest_ctx
861 .staging_partition_info()
862 .as_ref()
863 .map(|info| {
864 matches!(
865 info.partition_directive,
866 StagingPartitionDirective::RejectAllWrites
867 )
868 })
869 .unwrap_or(false)
870 }
871}
872
/// Manifest manager of a region together with its role state.
#[derive(Debug)]
pub(crate) struct ManifestContext {
    /// Manifest manager; manifest updates take its write lock.
    pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
    /// Current role state, updated with atomic load/CAS operations.
    state: AtomicCell<RegionRoleState>,
    /// Partition info used while in staging mode; cleared when staging is exited.
    staging_partition_info: Mutex<Option<StagingPartitionInfo>>,
}

impl ManifestContext {
    /// Creates a context with the given manager and initial role state and no
    /// staging partition info.
    pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
        ManifestContext {
            manifest_manager: tokio::sync::RwLock::new(manager),
            state: AtomicCell::new(state),
            staging_partition_info: Mutex::new(None),
        }
    }

    /// Returns a clone of the staging partition info, if set.
    pub(crate) fn staging_partition_info(&self) -> Option<StagingPartitionInfo> {
        self.staging_partition_info.lock().unwrap().clone()
    }

    /// Sets the staging partition info.
    ///
    /// Debug builds assert it was not already set; release builds overwrite it.
    pub(crate) fn set_staging_partition_info(&self, staging_partition_info: StagingPartitionInfo) {
        let mut current = self.staging_partition_info.lock().unwrap();
        debug_assert!(current.is_none());
        *current = Some(staging_partition_info);
    }

    /// Resets the staging partition info to `None`.
    fn clear_staging_partition_info(&self) {
        *self.staging_partition_info.lock().unwrap() = None;
    }

    /// Atomically moves `Staging` -> `next_state` and clears the staging partition info.
    ///
    /// # Errors
    /// Returns a region state error (leaving state and partition info untouched) when
    /// the region is not staging.
    pub(crate) fn exit_staging(
        &self,
        region_id: RegionId,
        next_state: RegionRoleState,
    ) -> Result<()> {
        self.state
            .compare_exchange(
                RegionRoleState::Leader(RegionLeaderState::Staging),
                next_state,
            )
            .map_err(|actual| {
                RegionStateSnafu {
                    region_id,
                    state: actual,
                    expect: RegionRoleState::Leader(RegionLeaderState::Staging),
                }
                .build()
            })?;
        self.clear_staging_partition_info();
        Ok(())
    }

    /// Returns the version of the current manifest (takes the read lock).
    pub(crate) async fn manifest_version(&self) -> ManifestVersion {
        self.manifest_manager
            .read()
            .await
            .manifest()
            .manifest_version
    }

    /// Delegates to the manager's `has_update` under the read lock.
    pub(crate) async fn has_update(&self) -> Result<bool> {
        self.manifest_manager.read().await.has_update().await
    }

    /// Returns the current role state.
    pub(crate) fn current_state(&self) -> RegionRoleState {
        self.state.load()
    }

    /// Installs the manifest up to `version` under the write lock and returns the
    /// resulting manifest.
    pub(crate) async fn install_manifest_to(
        &self,
        version: ManifestVersion,
    ) -> Result<Arc<RegionManifest>> {
        let mut manager = self.manifest_manager.write().await;
        manager.install_manifest_to(version).await?;

        Ok(manager.manifest())
    }

    /// Validates and writes `action_list` to the manifest, returning the new version.
    ///
    /// The write lock is held for the whole call, so the state check, the
    /// applicability checks and the write are serialized with other lock holders.
    ///
    /// # Errors
    /// - Update-manifest error if `expect_state` is not `Downgrading` and the region is
    ///   neither `Leader(expect_state)` nor a downgrading leader.
    /// - Region state error if `expect_state` is `Downgrading` and the region is not.
    /// - Region truncated error if an edit is no longer applicable after a truncation.
    /// - Any error from the manager's `update`.
    pub(crate) async fn update_manifest(
        &self,
        expect_state: RegionLeaderState,
        action_list: RegionMetaActionList,
        is_staging: bool,
    ) -> Result<ManifestVersion> {
        let mut manager = self.manifest_manager.write().await;
        // Snapshot of the manifest before applying `action_list`.
        let manifest = manager.manifest();
        let current_state = self.state.load();

        if expect_state != RegionLeaderState::Downgrading {
            // A downgrading leader may still commit updates requested in other states.
            if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                info!(
                    "Region {} is in downgrading leader state, updating manifest. state is {:?}",
                    manifest.metadata.region_id, expect_state
                );
            }
            ensure!(
                current_state == RegionRoleState::Leader(expect_state)
                    || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
                UpdateManifestSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                }
            );
        } else {
            ensure!(
                current_state == RegionRoleState::Leader(expect_state),
                RegionStateSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                    expect: RegionRoleState::Leader(expect_state),
                }
            );
        }

        for action in &action_list.actions {
            // Only edits are checked for applicability.
            let RegionMetaAction::Edit(edit) = &action else {
                continue;
            };

            // The checks below only apply if the manifest records a truncation.
            let Some(truncated_entry_id) = manifest.truncated_entry_id else {
                continue;
            };

            // Edits carrying a flushed entry id (presumably from flush) must be newer
            // than the truncation point, or hit the same entry with a newer sequence.
            if let Some(flushed_entry_id) = edit.flushed_entry_id {
                let is_newer_entry = truncated_entry_id < flushed_entry_id;
                let is_same_entry_with_newer_sequence = truncated_entry_id == flushed_entry_id
                    && edit.flushed_sequence.is_some_and(|flushed_sequence| {
                        manifest.flushed_sequence < flushed_sequence
                    });

                ensure!(
                    is_newer_entry || is_same_entry_with_newer_sequence,
                    RegionTruncatedSnafu {
                        region_id: manifest.metadata.region_id,
                    }
                );
            }

            // Every file the edit removes must still be present in the manifest.
            if !edit.files_to_remove.is_empty() {
                for file in &edit.files_to_remove {
                    ensure!(
                        manifest.files.contains_key(&file.file_id),
                        RegionTruncatedSnafu {
                            region_id: manifest.metadata.region_id,
                        }
                    );
                }
            }
        }

        let version = manager.update(action_list, is_staging).await.inspect_err(
            |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
        )?;

        // The state may change concurrently (e.g. via `set_role`, which does not take this lock).
        if self.state.load() == RegionRoleState::Follower {
            warn!(
                "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
                manifest.metadata.region_id
            );
        }

        Ok(version)
    }

    /// Applies a role change using atomic state operations; failures are logged, not returned.
    ///
    /// - `Follower`: staging leader -> follower (clearing staging partition info), or any
    ///   other non-follower state -> follower.
    /// - `Leader`: staging leader -> writable (clearing staging partition info), or
    ///   follower / downgrading leader -> writable; other states are left unchanged.
    /// - `StagingLeader`: ignored; staging has a dedicated workflow.
    /// - `DowngradingLeader`: staging leader -> downgrading (clearing staging partition
    ///   info), or writable -> downgrading; other states are left unchanged.
    pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
        match next_role {
            RegionRole::Follower => {
                if self
                    .exit_staging(region_id, RegionRoleState::Follower)
                    .is_ok()
                {
                    info!(
                        "Convert region {} to follower, previous role state: {:?}",
                        region_id,
                        RegionRoleState::Leader(RegionLeaderState::Staging)
                    );
                    return;
                }
                // `fetch_update` only fails when the closure returns `None`, i.e. the
                // state is already `Follower`, so the warning below is defensive.
                match self.state.fetch_update(|state| {
                    if !matches!(state, RegionRoleState::Follower) {
                        Some(RegionRoleState::Follower)
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to follower, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Follower {
                            warn!(
                                "Failed to convert region {} to follower, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::Leader => {
                if self
                    .exit_staging(
                        region_id,
                        RegionRoleState::Leader(RegionLeaderState::Writable),
                    )
                    .is_ok()
                {
                    info!(
                        "Convert region {} to leader, previous role state: {:?}",
                        region_id,
                        RegionRoleState::Leader(RegionLeaderState::Staging)
                    );
                    return;
                }
                match self.state.fetch_update(|state| {
                    if matches!(
                        state,
                        RegionRoleState::Follower
                            | RegionRoleState::Leader(RegionLeaderState::Downgrading)
                    ) {
                        Some(RegionRoleState::Leader(RegionLeaderState::Writable))
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to leader, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        // Already writable counts as success; other leader states are kept.
                        if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
                            warn!(
                                "Failed to convert region {} to leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::StagingLeader => {
                info!(
                    "Ignore direct conversion of region {} to staging leader; staging requires the dedicated workflow",
                    region_id
                );
            }
            RegionRole::DowngradingLeader => {
                if self
                    .exit_staging(
                        region_id,
                        RegionRoleState::Leader(RegionLeaderState::Downgrading),
                    )
                    .is_ok()
                {
                    info!(
                        "Convert region {} to downgrading region, previous role state: {:?}",
                        region_id,
                        RegionRoleState::Leader(RegionLeaderState::Staging)
                    );
                    return;
                }
                match self.state.compare_exchange(
                    RegionRoleState::Leader(RegionLeaderState::Writable),
                    RegionRoleState::Leader(RegionLeaderState::Downgrading),
                ) {
                    Ok(state) => info!(
                        "Convert region {} to downgrading region, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                            warn!(
                                "Failed to convert region {} to downgrading leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
        }
    }

    /// Returns the current manifest (takes the read lock).
    pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
        self.manifest_manager.read().await.manifest()
    }

    /// Returns the staging manifest, if any (takes the read lock).
    pub(crate) async fn staging_manifest(
        &self,
    ) -> Option<Arc<crate::manifest::action::RegionManifest>> {
        self.manifest_manager.read().await.staging_manifest()
    }
}

/// Shared reference to a [`ManifestContext`].
pub(crate) type ManifestContextRef = Arc<ManifestContext>;
1239
/// Map of regions keyed by region id, guarded by a std `RwLock`.
#[derive(Debug, Default)]
pub(crate) struct RegionMap {
    regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
}
1245
1246impl RegionMap {
1247 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1249 let regions = self.regions.read().unwrap();
1250 regions.contains_key(®ion_id)
1251 }
1252
1253 pub(crate) fn insert_region(&self, region: MitoRegionRef) {
1255 let mut regions = self.regions.write().unwrap();
1256 regions.insert(region.region_id, region);
1257 }
1258
1259 pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
1261 let regions = self.regions.read().unwrap();
1262 regions.get(®ion_id).cloned()
1263 }
1264
1265 pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1269 let region = self
1270 .get_region(region_id)
1271 .context(RegionNotFoundSnafu { region_id })?;
1272 ensure!(
1273 region.is_writable(),
1274 RegionStateSnafu {
1275 region_id,
1276 state: region.state(),
1277 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1278 }
1279 );
1280 Ok(region)
1281 }
1282
1283 pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1287 let region = self
1288 .get_region(region_id)
1289 .context(RegionNotFoundSnafu { region_id })?;
1290 ensure!(
1291 region.is_follower(),
1292 RegionStateSnafu {
1293 region_id,
1294 state: region.state(),
1295 expect: RegionRoleState::Follower,
1296 }
1297 );
1298
1299 Ok(region)
1300 }
1301
1302 pub(crate) fn get_region_or<F: OnFailure>(
1306 &self,
1307 region_id: RegionId,
1308 cb: &mut F,
1309 ) -> Option<MitoRegionRef> {
1310 match self
1311 .get_region(region_id)
1312 .context(RegionNotFoundSnafu { region_id })
1313 {
1314 Ok(region) => Some(region),
1315 Err(e) => {
1316 cb.on_failure(e);
1317 None
1318 }
1319 }
1320 }
1321
1322 pub(crate) fn writable_region_or<F: OnFailure>(
1326 &self,
1327 region_id: RegionId,
1328 cb: &mut F,
1329 ) -> Option<MitoRegionRef> {
1330 match self.writable_region(region_id) {
1331 Ok(region) => Some(region),
1332 Err(e) => {
1333 cb.on_failure(e);
1334 None
1335 }
1336 }
1337 }
1338
1339 pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1343 let region = self.writable_region(region_id)?;
1344 if region.is_staging() {
1345 return Err(crate::error::RegionStateSnafu {
1346 region_id,
1347 state: region.state(),
1348 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1349 }
1350 .build());
1351 }
1352 Ok(region)
1353 }
1354
1355 pub(crate) fn staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1359 let region = self
1360 .get_region(region_id)
1361 .context(RegionNotFoundSnafu { region_id })?;
1362 ensure!(
1363 region.is_staging(),
1364 RegionStateSnafu {
1365 region_id,
1366 state: region.state(),
1367 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
1368 }
1369 );
1370 Ok(region)
1371 }
1372
1373 fn flushable_region(&self, region_id: RegionId) -> Result<Option<MitoRegionRef>> {
1378 let region = self
1379 .get_region(region_id)
1380 .context(RegionNotFoundSnafu { region_id })?;
1381 if region.is_flushable() {
1382 Ok(Some(region))
1383 } else {
1384 Ok(None)
1385 }
1386 }
1387
1388 pub(crate) fn flushable_region_or<F: OnFailure>(
1393 &self,
1394 region_id: RegionId,
1395 cb: &mut F,
1396 ) -> Option<MitoRegionRef> {
1397 match self.flushable_region(region_id) {
1398 Ok(region) => region,
1399 Err(e) => {
1400 cb.on_failure(e);
1401 None
1402 }
1403 }
1404 }
1405
1406 pub(crate) fn remove_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
1408 let mut regions = self.regions.write().unwrap();
1409 regions.remove(®ion_id)
1410 }
1411
1412 pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
1414 let regions = self.regions.read().unwrap();
1415 regions.values().cloned().collect()
1416 }
1417
1418 pub(crate) fn clear(&self) {
1420 self.regions.write().unwrap().clear();
1421 }
1422}
1423
1424pub(crate) type RegionMapRef = Arc<RegionMap>;
1425
1426#[derive(Debug, Default)]
1428pub(crate) struct OpeningRegions {
1429 regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
1430}
1431
1432impl OpeningRegions {
1433 pub(crate) fn wait_for_opening_region(
1435 &self,
1436 region_id: RegionId,
1437 sender: OptionOutputTx,
1438 ) -> Option<OptionOutputTx> {
1439 let mut regions = self.regions.write().unwrap();
1440 match regions.entry(region_id) {
1441 Entry::Occupied(mut senders) => {
1442 senders.get_mut().push(sender);
1443 None
1444 }
1445 Entry::Vacant(_) => Some(sender),
1446 }
1447 }
1448
1449 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1451 let regions = self.regions.read().unwrap();
1452 regions.contains_key(®ion_id)
1453 }
1454
1455 pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
1457 let mut regions = self.regions.write().unwrap();
1458 regions.insert(region, vec![sender]);
1459 }
1460
1461 pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
1463 let mut regions = self.regions.write().unwrap();
1464 regions.remove(®ion_id).unwrap_or_default()
1465 }
1466
1467 #[cfg(test)]
1468 pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
1469 let regions = self.regions.read().unwrap();
1470 if let Some(senders) = regions.get(®ion_id) {
1471 senders.len()
1472 } else {
1473 0
1474 }
1475 }
1476}
1477
1478pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;
1479
1480#[derive(Debug, Default)]
1482pub(crate) struct CatchupRegions {
1483 regions: RwLock<HashSet<RegionId>>,
1484}
1485
1486impl CatchupRegions {
1487 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1489 let regions = self.regions.read().unwrap();
1490 regions.contains(®ion_id)
1491 }
1492
1493 pub(crate) fn insert_region(&self, region_id: RegionId) {
1495 let mut regions = self.regions.write().unwrap();
1496 regions.insert(region_id);
1497 }
1498
1499 pub(crate) fn remove_region(&self, region_id: RegionId) {
1501 let mut regions = self.regions.write().unwrap();
1502 regions.remove(®ion_id);
1503 }
1504}
1505
1506pub(crate) type CatchupRegionsRef = Arc<CatchupRegions>;
1507
/// Manifest counters.
///
/// Each counter lives behind an `Arc<AtomicU64>`, so every clone of a
/// `ManifestStats` reads and writes the same underlying values.
#[derive(Default, Debug, Clone)]
pub struct ManifestStats {
    pub(crate) total_manifest_size: Arc<AtomicU64>,
    pub(crate) manifest_version: Arc<AtomicU64>,
    pub(crate) file_removed_cnt: Arc<AtomicU64>,
}

impl ManifestStats {
    /// Loads `counter` with `Relaxed` ordering: the value is read on its own,
    /// with no ordering guarantees relative to other memory operations.
    fn read_counter(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Current value of the total manifest size counter.
    fn total_manifest_size(&self) -> u64 {
        Self::read_counter(&self.total_manifest_size)
    }

    /// Current value of the manifest version counter.
    fn manifest_version(&self) -> u64 {
        Self::read_counter(&self.manifest_version)
    }

    /// Current value of the removed-file counter.
    fn file_removed_cnt(&self) -> u64 {
        Self::read_counter(&self.file_removed_cnt)
    }
}
1529
1530pub fn parse_partition_expr(partition_expr_str: Option<&str>) -> Result<Option<PartitionExpr>> {
1532 match partition_expr_str {
1533 None => Ok(None),
1534 Some("") => Ok(None),
1535 Some(json_str) => {
1536 let expr = partition::expr::PartitionExpr::from_json_str(json_str)
1537 .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?;
1538 Ok(expr)
1539 }
1540 }
1541}
1542
1543#[cfg(test)]
1544mod tests {
1545 use std::sync::Arc;
1546 use std::sync::atomic::AtomicU64;
1547
1548 use common_datasource::compression::CompressionType;
1549 use common_test_util::temp_dir::create_temp_dir;
1550 use crossbeam_utils::atomic::AtomicCell;
1551 use object_store::ObjectStore;
1552 use object_store::services::Fs;
1553 use store_api::logstore::provider::Provider;
1554 use store_api::region_engine::RegionRole;
1555 use store_api::region_request::PathType;
1556 use store_api::storage::RegionId;
1557
1558 use crate::access_layer::AccessLayer;
1559 use crate::error::Error;
1560 use crate::manifest::action::{
1561 RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionPartitionExprChange,
1562 };
1563 use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
1564 use crate::region::{
1565 ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
1566 };
1567 use crate::sst::FormatType;
1568 use crate::sst::index::intermediate::IntermediateManager;
1569 use crate::sst::index::puffin_manager::PuffinManagerFactory;
1570 use crate::test_util::scheduler_util::SchedulerEnv;
1571 use crate::test_util::version_util::VersionControlBuilder;
1572 use crate::time_provider::StdTimeProvider;
1573
1574 #[test]
1575 fn test_region_state_lock_free() {
1576 assert!(AtomicCell::<RegionRoleState>::is_lock_free());
1577 }
1578
1579 async fn build_test_region(env: &SchedulerEnv) -> MitoRegion {
1580 let builder = VersionControlBuilder::new();
1581 let version_control = Arc::new(builder.build());
1582 let metadata = version_control.current().version.metadata.clone();
1583
1584 let manager = RegionManifestManager::new(
1585 metadata.clone(),
1586 0,
1587 RegionManifestOptions {
1588 manifest_dir: "".to_string(),
1589 object_store: env.access_layer.object_store().clone(),
1590 compress_type: CompressionType::Uncompressed,
1591 checkpoint_distance: 10,
1592 remove_file_options: Default::default(),
1593 manifest_cache: None,
1594 },
1595 FormatType::PrimaryKey,
1596 &Default::default(),
1597 )
1598 .await
1599 .unwrap();
1600
1601 let manifest_ctx = Arc::new(ManifestContext::new(
1602 manager,
1603 RegionRoleState::Leader(RegionLeaderState::Writable),
1604 ));
1605
1606 MitoRegion {
1607 region_id: metadata.region_id,
1608 version_control,
1609 access_layer: env.access_layer.clone(),
1610 manifest_ctx,
1611 file_purger: crate::test_util::new_noop_file_purger(),
1612 provider: Provider::noop_provider(),
1613 last_flush_millis: Default::default(),
1614 last_compaction_millis: Default::default(),
1615 time_provider: Arc::new(StdTimeProvider),
1616 topic_latest_entry_id: Default::default(),
1617 written_bytes: Arc::new(AtomicU64::new(0)),
1618 stats: ManifestStats::default(),
1619 }
1620 }
1621
1622 fn empty_edit() -> RegionEdit {
1623 RegionEdit {
1624 files_to_add: Vec::new(),
1625 files_to_remove: Vec::new(),
1626 timestamp_ms: None,
1627 compaction_time_window: None,
1628 flushed_entry_id: None,
1629 flushed_sequence: None,
1630 committed_sequence: None,
1631 }
1632 }
1633
1634 #[tokio::test]
1635 async fn test_exit_staging_partition_expr_change_and_edit_success() {
1636 let env = SchedulerEnv::new().await;
1637 let region = build_test_region(&env).await;
1638
1639 let mut manager = region.manifest_ctx.manifest_manager.write().await;
1640 region.set_staging(&mut manager).await.unwrap();
1641 manager
1642 .update(
1643 RegionMetaActionList::new(vec![
1644 RegionMetaAction::PartitionExprChange(RegionPartitionExprChange {
1645 partition_expr: Some("expr_a".to_string()),
1646 }),
1647 RegionMetaAction::Edit(empty_edit()),
1648 ]),
1649 true,
1650 )
1651 .await
1652 .unwrap();
1653
1654 region.exit_staging_on_success(&mut manager).await.unwrap();
1655 drop(manager);
1656
1657 assert_eq!(
1658 region.version().metadata.partition_expr.as_deref(),
1659 Some("expr_a")
1660 );
1661 assert_eq!(
1662 region.state(),
1663 RegionRoleState::Leader(RegionLeaderState::Writable)
1664 );
1665 }
1666
1667 #[tokio::test]
1668 async fn test_exit_staging_change_with_same_columns_success() {
1669 let env = SchedulerEnv::new().await;
1670 let region = build_test_region(&env).await;
1671
1672 let mut manager = region.manifest_ctx.manifest_manager.write().await;
1673 region.set_staging(&mut manager).await.unwrap();
1674
1675 let mut changed_metadata = region.version().metadata.as_ref().clone();
1676 changed_metadata.set_partition_expr(Some("expr_b".to_string()));
1677
1678 manager
1679 .update(
1680 RegionMetaActionList::new(vec![
1681 RegionMetaAction::Change(RegionChange {
1682 metadata: Arc::new(changed_metadata),
1683 sst_format: FormatType::PrimaryKey,
1684 append_mode: None,
1685 }),
1686 RegionMetaAction::Edit(empty_edit()),
1687 ]),
1688 true,
1689 )
1690 .await
1691 .unwrap();
1692
1693 region.exit_staging_on_success(&mut manager).await.unwrap();
1694 drop(manager);
1695
1696 assert_eq!(
1697 region.version().metadata.partition_expr.as_deref(),
1698 Some("expr_b")
1699 );
1700 assert_eq!(
1701 region.state(),
1702 RegionRoleState::Leader(RegionLeaderState::Writable)
1703 );
1704 }
1705
1706 #[tokio::test]
1707 async fn test_exit_staging_change_with_different_columns_fails() {
1708 let env = SchedulerEnv::new().await;
1709 let region = build_test_region(&env).await;
1710
1711 let mut manager = region.manifest_ctx.manifest_manager.write().await;
1712 region.set_staging(&mut manager).await.unwrap();
1713
1714 let mut changed_metadata = region.version().metadata.as_ref().clone();
1715 changed_metadata.column_metadatas.rotate_left(1);
1716
1717 manager
1718 .update(
1719 RegionMetaActionList::new(vec![
1720 RegionMetaAction::Change(RegionChange {
1721 metadata: Arc::new(changed_metadata),
1722 sst_format: FormatType::PrimaryKey,
1723 append_mode: None,
1724 }),
1725 RegionMetaAction::Edit(empty_edit()),
1726 ]),
1727 true,
1728 )
1729 .await
1730 .unwrap();
1731
1732 let result = region.exit_staging_on_success(&mut manager).await;
1733 assert!(matches!(result, Err(Error::Unexpected { .. })));
1734 }
1735
1736 #[tokio::test]
1737 async fn test_exit_staging_partition_expr_change_and_change_conflict_fails() {
1738 let env = SchedulerEnv::new().await;
1739 let region = build_test_region(&env).await;
1740
1741 let mut manager = region.manifest_ctx.manifest_manager.write().await;
1742 region.set_staging(&mut manager).await.unwrap();
1743
1744 let mut changed_metadata = region.version().metadata.as_ref().clone();
1745 changed_metadata.set_partition_expr(Some("expr_c".to_string()));
1746
1747 manager
1748 .update(
1749 RegionMetaActionList::new(vec![
1750 RegionMetaAction::PartitionExprChange(RegionPartitionExprChange {
1751 partition_expr: Some("expr_c".to_string()),
1752 }),
1753 RegionMetaAction::Change(RegionChange {
1754 metadata: Arc::new(changed_metadata),
1755 sst_format: FormatType::PrimaryKey,
1756 append_mode: None,
1757 }),
1758 RegionMetaAction::Edit(empty_edit()),
1759 ]),
1760 true,
1761 )
1762 .await
1763 .unwrap();
1764
1765 let result = region.exit_staging_on_success(&mut manager).await;
1766 assert!(matches!(result, Err(Error::Unexpected { .. })));
1767 }
1768
1769 #[tokio::test]
1770 async fn test_set_region_state() {
1771 let env = SchedulerEnv::new().await;
1772 let builder = VersionControlBuilder::new();
1773 let version_control = Arc::new(builder.build());
1774 let manifest_ctx = env
1775 .mock_manifest_context(version_control.current().version.metadata.clone())
1776 .await;
1777
1778 let region_id = RegionId::new(1024, 0);
1779 manifest_ctx.set_role(RegionRole::Follower, region_id);
1781 assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1782
1783 manifest_ctx.set_role(RegionRole::Leader, region_id);
1785 assert_eq!(
1786 manifest_ctx.state.load(),
1787 RegionRoleState::Leader(RegionLeaderState::Writable)
1788 );
1789
1790 manifest_ctx.set_role(RegionRole::StagingLeader, region_id);
1792 assert_eq!(
1793 manifest_ctx.state.load(),
1794 RegionRoleState::Leader(RegionLeaderState::Writable)
1795 );
1796
1797 manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1799 assert_eq!(
1800 manifest_ctx.state.load(),
1801 RegionRoleState::Leader(RegionLeaderState::Downgrading)
1802 );
1803
1804 manifest_ctx.set_role(RegionRole::Follower, region_id);
1806 assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1807
1808 manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1810 assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1811
1812 manifest_ctx.set_role(RegionRole::Leader, region_id);
1814 manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1815 assert_eq!(
1816 manifest_ctx.state.load(),
1817 RegionRoleState::Leader(RegionLeaderState::Downgrading)
1818 );
1819
1820 manifest_ctx.set_role(RegionRole::Leader, region_id);
1822 assert_eq!(
1823 manifest_ctx.state.load(),
1824 RegionRoleState::Leader(RegionLeaderState::Writable)
1825 );
1826 }
1827
1828 #[tokio::test]
1829 async fn test_staging_state_validation() {
1830 let env = SchedulerEnv::new().await;
1831 let builder = VersionControlBuilder::new();
1832 let version_control = Arc::new(builder.build());
1833
1834 let staging_ctx = {
1836 let manager = RegionManifestManager::new(
1837 version_control.current().version.metadata.clone(),
1838 0,
1839 RegionManifestOptions {
1840 manifest_dir: "".to_string(),
1841 object_store: env.access_layer.object_store().clone(),
1842 compress_type: CompressionType::Uncompressed,
1843 checkpoint_distance: 10,
1844 remove_file_options: Default::default(),
1845 manifest_cache: None,
1846 },
1847 FormatType::PrimaryKey,
1848 &Default::default(),
1849 )
1850 .await
1851 .unwrap();
1852 Arc::new(ManifestContext::new(
1853 manager,
1854 RegionRoleState::Leader(RegionLeaderState::Staging),
1855 ))
1856 };
1857
1858 assert_eq!(
1860 staging_ctx.current_state(),
1861 RegionRoleState::Leader(RegionLeaderState::Staging)
1862 );
1863
1864 let writable_ctx = env
1866 .mock_manifest_context(version_control.current().version.metadata.clone())
1867 .await;
1868
1869 assert_eq!(
1870 writable_ctx.current_state(),
1871 RegionRoleState::Leader(RegionLeaderState::Writable)
1872 );
1873 }
1874
1875 #[tokio::test]
1876 async fn test_staging_state_transitions() {
1877 let builder = VersionControlBuilder::new();
1878 let version_control = Arc::new(builder.build());
1879 let metadata = version_control.current().version.metadata.clone();
1880
1881 let temp_dir = create_temp_dir("");
1883 let path_str = temp_dir.path().display().to_string();
1884 let fs_builder = Fs::default().root(&path_str);
1885 let object_store = ObjectStore::new(fs_builder).unwrap().finish();
1886
1887 let index_aux_path = temp_dir.path().join("index_aux");
1888 let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
1889 .await
1890 .unwrap();
1891 let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
1892 .await
1893 .unwrap();
1894
1895 let access_layer = Arc::new(AccessLayer::new(
1896 "",
1897 PathType::Bare,
1898 object_store,
1899 puffin_mgr,
1900 intm_mgr,
1901 ));
1902
1903 let manager = RegionManifestManager::new(
1904 metadata.clone(),
1905 0,
1906 RegionManifestOptions {
1907 manifest_dir: "".to_string(),
1908 object_store: access_layer.object_store().clone(),
1909 compress_type: CompressionType::Uncompressed,
1910 checkpoint_distance: 10,
1911 remove_file_options: Default::default(),
1912 manifest_cache: None,
1913 },
1914 FormatType::PrimaryKey,
1915 &Default::default(),
1916 )
1917 .await
1918 .unwrap();
1919
1920 let manifest_ctx = Arc::new(ManifestContext::new(
1921 manager,
1922 RegionRoleState::Leader(RegionLeaderState::Writable),
1923 ));
1924
1925 let region = MitoRegion {
1926 region_id: metadata.region_id,
1927 version_control,
1928 access_layer,
1929 manifest_ctx: manifest_ctx.clone(),
1930 file_purger: crate::test_util::new_noop_file_purger(),
1931 provider: Provider::noop_provider(),
1932 last_flush_millis: Default::default(),
1933 last_compaction_millis: Default::default(),
1934 time_provider: Arc::new(StdTimeProvider),
1935 topic_latest_entry_id: Default::default(),
1936 written_bytes: Arc::new(AtomicU64::new(0)),
1937 stats: ManifestStats::default(),
1938 };
1939
1940 assert_eq!(
1942 region.state(),
1943 RegionRoleState::Leader(RegionLeaderState::Writable)
1944 );
1945 assert!(!region.is_staging());
1946
1947 let mut manager = manifest_ctx.manifest_manager.write().await;
1949 region.set_staging(&mut manager).await.unwrap();
1950 drop(manager);
1951 assert_eq!(
1952 region.state(),
1953 RegionRoleState::Leader(RegionLeaderState::Staging)
1954 );
1955 assert!(region.is_staging());
1956
1957 region.exit_staging().unwrap();
1959 assert_eq!(
1960 region.state(),
1961 RegionRoleState::Leader(RegionLeaderState::Writable)
1962 );
1963 assert!(!region.is_staging());
1964
1965 {
1967 let manager = manifest_ctx.manifest_manager.write().await;
1969 let dummy_actions = RegionMetaActionList::new(vec![]);
1970 let dummy_bytes = dummy_actions.encode().unwrap();
1971
1972 manager.store().save(100, &dummy_bytes, true).await.unwrap();
1974 manager.store().save(101, &dummy_bytes, true).await.unwrap();
1975 drop(manager);
1976
1977 let manager = manifest_ctx.manifest_manager.read().await;
1979 let dirty_manifests = manager.store().fetch_staging_manifests().await.unwrap();
1980 assert_eq!(
1981 dirty_manifests.len(),
1982 2,
1983 "Should have 2 dirty staging files"
1984 );
1985 drop(manager);
1986
1987 let mut manager = manifest_ctx.manifest_manager.write().await;
1989 region.set_staging(&mut manager).await.unwrap();
1990 drop(manager);
1991
1992 let manager = manifest_ctx.manifest_manager.read().await;
1994 let cleaned_manifests = manager.store().fetch_staging_manifests().await.unwrap();
1995 assert_eq!(
1996 cleaned_manifests.len(),
1997 0,
1998 "Dirty staging files should be cleaned up"
1999 );
2000 drop(manager);
2001
2002 region.exit_staging().unwrap();
2004 }
2005
2006 let mut manager = manifest_ctx.manifest_manager.write().await;
2008 assert!(region.set_staging(&mut manager).await.is_ok()); drop(manager);
2010 let mut manager = manifest_ctx.manifest_manager.write().await;
2011 assert!(region.set_staging(&mut manager).await.is_err()); drop(manager);
2013 assert!(region.exit_staging().is_ok()); assert!(region.exit_staging().is_err()); }
2016}