1pub mod catchup;
18pub mod opener;
19pub mod options;
20pub mod utils;
21pub(crate) mod version;
22
23use std::collections::hash_map::Entry;
24use std::collections::{HashMap, HashSet};
25use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
26use std::sync::{Arc, Mutex, RwLock};
27
28use common_base::hash::partition_expr_version;
29use common_telemetry::{error, info, warn};
30use crossbeam_utils::atomic::AtomicCell;
31use partition::expr::PartitionExpr;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::PrimaryKeyEncoding;
35use store_api::logstore::provider::Provider;
36use store_api::metadata::RegionMetadataRef;
37use store_api::region_engine::{
38 RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
39};
40use store_api::region_info::RegionInfoEntry;
41use store_api::region_request::{PathType, StagingPartitionDirective};
42use store_api::sst_entry::ManifestSstEntry;
43use store_api::storage::{FileId, RegionId, SequenceNumber};
44use tokio::sync::RwLockWriteGuard;
45pub use utils::*;
46
47use crate::access_layer::AccessLayerRef;
48use crate::error::{
49 FlushableRegionStateSnafu, InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu,
50 RegionTruncatedSnafu, Result, UnexpectedSnafu, UpdateManifestSnafu,
51};
52use crate::manifest::action::{
53 RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
54};
55use crate::manifest::manager::RegionManifestManager;
56use crate::region::version::{VersionControlRef, VersionRef};
57use crate::request::{OnFailure, OptionOutputTx};
58use crate::sst::file::FileMeta;
59use crate::sst::file_purger::FilePurgerRef;
60use crate::sst::location::{index_file_path, sst_file_path};
61use crate::time_provider::TimeProviderRef;
62
/// Ratio applied to memtable usage to estimate the WAL size reported in region statistics.
const ESTIMATED_WAL_FACTOR: f32 = 0.428_25;
65
66#[derive(Debug)]
68pub struct RegionUsage {
69 pub region_id: RegionId,
70 pub wal_usage: u64,
71 pub sst_usage: u64,
72 pub manifest_usage: u64,
73}
74
75impl RegionUsage {
76 pub fn disk_usage(&self) -> u64 {
77 self.wal_usage + self.sst_usage + self.manifest_usage
78 }
79}
80
/// State of a region while it holds the leader role.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// Normal leader state; the region accepts writes.
    Writable,
    /// Staging mode; the region still accepts writes.
    Staging,
    /// Transitioning from `Writable` into `Staging`.
    EnteringStaging,
    /// The region is being altered.
    Altering,
    /// The region is being dropped.
    Dropping,
    /// The region is being truncated.
    Truncating,
    /// The region is being edited.
    Editing,
    /// The leader is being downgraded.
    Downgrading,
}

/// Role of a region, with the detailed leader state when it is a leader.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
    /// The region is a leader in the given state.
    Leader(RegionLeaderState),
    /// The region is a follower.
    Follower,
}

impl RegionRoleState {
    /// Returns the leader state, or `None` for a follower.
    pub fn into_leader_state(self) -> Option<RegionLeaderState> {
        if let RegionRoleState::Leader(leader_state) = self {
            Some(leader_state)
        } else {
            None
        }
    }

    /// Returns a static, human-readable name such as `"Leader(Writable)"` or `"Follower"`.
    pub(crate) fn as_str(&self) -> &'static str {
        let RegionRoleState::Leader(leader_state) = self else {
            return "Follower";
        };
        match leader_state {
            RegionLeaderState::Writable => "Leader(Writable)",
            RegionLeaderState::Staging => "Leader(Staging)",
            RegionLeaderState::EnteringStaging => "Leader(EnteringStaging)",
            RegionLeaderState::Altering => "Leader(Altering)",
            RegionLeaderState::Dropping => "Leader(Dropping)",
            RegionLeaderState::Truncating => "Leader(Truncating)",
            RegionLeaderState::Editing => "Leader(Editing)",
            RegionLeaderState::Downgrading => "Leader(Downgrading)",
        }
    }
}
132
133#[derive(Debug)]
139pub struct MitoRegion {
140 pub(crate) region_id: RegionId,
145
146 pub(crate) version_control: VersionControlRef,
150 pub(crate) access_layer: AccessLayerRef,
152 pub(crate) manifest_ctx: ManifestContextRef,
154 pub(crate) file_purger: FilePurgerRef,
156 pub(crate) provider: Provider,
158 last_flush_millis: AtomicI64,
160 last_compaction_millis: AtomicI64,
162 time_provider: TimeProviderRef,
164 pub(crate) topic_latest_entry_id: AtomicU64,
174 pub(crate) written_bytes: Arc<AtomicU64>,
176 stats: ManifestStats,
178}
179
180pub type MitoRegionRef = Arc<MitoRegion>;
181
182#[derive(Debug, Clone)]
183pub(crate) struct StagingPartitionInfo {
184 pub(crate) partition_directive: StagingPartitionDirective,
185 pub(crate) partition_rule_version: u64,
186}
187
188impl StagingPartitionInfo {
189 pub(crate) fn partition_expr(&self) -> Option<&str> {
191 self.partition_directive.partition_expr()
192 }
193
194 pub(crate) fn from_partition_directive(partition_directive: StagingPartitionDirective) -> Self {
196 let partition_rule_version = match &partition_directive {
197 StagingPartitionDirective::UpdatePartitionExpr(expr) => {
198 partition_expr_version(Some(expr))
199 }
200 StagingPartitionDirective::RejectAllWrites => 0,
201 };
202 Self {
203 partition_directive,
204 partition_rule_version,
205 }
206 }
207}
208
209impl MitoRegion {
210 pub(crate) async fn stop(&self) {
212 self.manifest_ctx
213 .manifest_manager
214 .write()
215 .await
216 .stop()
217 .await;
218
219 info!(
220 "Stopped region manifest manager, region_id: {}",
221 self.region_id
222 );
223 }
224
225 pub fn metadata(&self) -> RegionMetadataRef {
227 let version_data = self.version_control.current();
228 version_data.version.metadata.clone()
229 }
230
231 pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
233 let version_data = self.version_control.current();
234 version_data.version.metadata.primary_key_encoding
235 }
236
237 pub(crate) fn version(&self) -> VersionRef {
239 let version_data = self.version_control.current();
240 version_data.version
241 }
242
243 pub(crate) fn last_flush_millis(&self) -> i64 {
245 self.last_flush_millis.load(Ordering::Relaxed)
246 }
247
248 pub(crate) fn update_flush_millis(&self) {
250 let now = self.time_provider.current_time_millis();
251 self.last_flush_millis.store(now, Ordering::Relaxed);
252 }
253
254 pub(crate) fn last_compaction_millis(&self) -> i64 {
256 self.last_compaction_millis.load(Ordering::Relaxed)
257 }
258
259 pub(crate) fn update_compaction_millis(&self) {
261 let now = self.time_provider.current_time_millis();
262 self.last_compaction_millis.store(now, Ordering::Relaxed);
263 }
264
265 pub(crate) fn table_dir(&self) -> &str {
267 self.access_layer.table_dir()
268 }
269
270 pub(crate) fn path_type(&self) -> PathType {
272 self.access_layer.path_type()
273 }
274
275 pub(crate) fn is_writable(&self) -> bool {
277 matches!(
278 self.manifest_ctx.state.load(),
279 RegionRoleState::Leader(RegionLeaderState::Writable)
280 | RegionRoleState::Leader(RegionLeaderState::Staging)
281 )
282 }
283
284 pub(crate) fn is_flushable(&self) -> bool {
286 matches!(
287 self.manifest_ctx.state.load(),
288 RegionRoleState::Leader(RegionLeaderState::Writable)
289 | RegionRoleState::Leader(RegionLeaderState::Staging)
290 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
291 )
292 }
293
294 pub(crate) fn should_abort_index(&self) -> bool {
296 matches!(
297 self.manifest_ctx.state.load(),
298 RegionRoleState::Follower
299 | RegionRoleState::Leader(RegionLeaderState::Dropping)
300 | RegionRoleState::Leader(RegionLeaderState::Truncating)
301 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
302 | RegionRoleState::Leader(RegionLeaderState::Staging)
303 )
304 }
305
306 pub(crate) fn is_downgrading(&self) -> bool {
308 matches!(
309 self.manifest_ctx.state.load(),
310 RegionRoleState::Leader(RegionLeaderState::Downgrading)
311 )
312 }
313
314 pub(crate) fn is_staging(&self) -> bool {
316 self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
317 }
318
319 pub(crate) fn is_enter_staging(&self) -> bool {
321 self.manifest_ctx.state.load()
322 == RegionRoleState::Leader(RegionLeaderState::EnteringStaging)
323 }
324
325 pub fn region_id(&self) -> RegionId {
326 self.region_id
327 }
328
329 pub fn find_committed_sequence(&self) -> SequenceNumber {
330 self.version_control.committed_sequence()
331 }
332
333 pub fn flushed_sequence(&self) -> SequenceNumber {
339 self.version_control.current().version.flushed_sequence
340 }
341
342 pub fn is_follower(&self) -> bool {
344 self.manifest_ctx.state.load() == RegionRoleState::Follower
345 }
346
347 pub(crate) fn state(&self) -> RegionRoleState {
349 self.manifest_ctx.state.load()
350 }
351
352 pub(crate) fn set_role(&self, next_role: RegionRole) {
354 self.manifest_ctx.set_role(next_role, self.region_id);
355 }
356
357 pub(crate) fn region_role(&self) -> RegionRole {
358 match self.state() {
359 RegionRoleState::Follower => RegionRole::Follower,
360 RegionRoleState::Leader(RegionLeaderState::Staging) => RegionRole::StagingLeader,
361 RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
362 RegionRole::DowngradingLeader
363 }
364 RegionRoleState::Leader(_) => RegionRole::Leader,
365 }
366 }
367
368 pub(crate) fn set_altering(&self) -> Result<()> {
371 self.compare_exchange_state(
372 RegionLeaderState::Writable,
373 RegionRoleState::Leader(RegionLeaderState::Altering),
374 )
375 }
376
377 pub(crate) fn set_dropping(&self, expect: RegionLeaderState) -> Result<()> {
380 self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Dropping))
381 }
382
383 pub(crate) fn set_truncating(&self) -> Result<()> {
386 self.compare_exchange_state(
387 RegionLeaderState::Writable,
388 RegionRoleState::Leader(RegionLeaderState::Truncating),
389 )
390 }
391
392 pub(crate) fn set_editing(&self, expect: RegionLeaderState) -> Result<()> {
395 self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Editing))
396 }
397
398 pub(crate) async fn set_staging(
404 &self,
405 manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
406 ) -> Result<()> {
407 manager.store().clear_staging_manifests().await?;
408
409 self.compare_exchange_state(
410 RegionLeaderState::Writable,
411 RegionRoleState::Leader(RegionLeaderState::Staging),
412 )
413 }
414
415 pub(crate) fn set_entering_staging(&self) -> Result<()> {
417 self.compare_exchange_state(
418 RegionLeaderState::Writable,
419 RegionRoleState::Leader(RegionLeaderState::EnteringStaging),
420 )
421 }
422
423 pub fn exit_staging(&self) -> Result<()> {
428 self.manifest_ctx.exit_staging(
429 self.region_id,
430 RegionRoleState::Leader(RegionLeaderState::Writable),
431 )
432 }
433
434 pub(crate) async fn set_role_state_gracefully(
436 &self,
437 state: SettableRegionRoleState,
438 ) -> Result<()> {
439 let mut manager: RwLockWriteGuard<'_, RegionManifestManager> =
440 self.manifest_ctx.manifest_manager.write().await;
441 let current_state = self.state();
442
443 match state {
444 SettableRegionRoleState::Leader => {
445 match current_state {
448 RegionRoleState::Leader(RegionLeaderState::Staging) => {
449 info!("Exiting staging mode for region {}", self.region_id);
450 self.exit_staging_on_success(&mut manager).await?;
452 }
453 RegionRoleState::Leader(RegionLeaderState::Writable) => {
454 info!("Region {} already in normal leader mode", self.region_id);
456 }
457 _ => {
458 return Err(RegionStateSnafu {
460 region_id: self.region_id,
461 state: current_state,
462 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
463 }
464 .build());
465 }
466 }
467 }
468
469 SettableRegionRoleState::StagingLeader => {
470 match current_state {
473 RegionRoleState::Leader(RegionLeaderState::Writable) => {
474 info!("Entering staging mode for region {}", self.region_id);
475 self.set_staging(&mut manager).await?;
476 }
477 RegionRoleState::Leader(RegionLeaderState::Staging) => {
478 info!("Region {} already in staging mode", self.region_id);
480 }
481 _ => {
482 return Err(RegionStateSnafu {
483 region_id: self.region_id,
484 state: current_state,
485 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
486 }
487 .build());
488 }
489 }
490 }
491
492 SettableRegionRoleState::Follower => {
493 match current_state {
495 RegionRoleState::Leader(RegionLeaderState::Staging) => {
496 info!(
497 "Exiting staging and demoting region {} to follower",
498 self.region_id
499 );
500 self.exit_staging()?;
501 self.set_role(RegionRole::Follower);
502 }
503 RegionRoleState::Leader(_) => {
504 info!("Demoting region {} from leader to follower", self.region_id);
505 self.set_role(RegionRole::Follower);
506 }
507 RegionRoleState::Follower => {
508 info!("Region {} already in follower mode", self.region_id);
510 }
511 }
512 }
513
514 SettableRegionRoleState::DowngradingLeader => {
515 match current_state {
517 RegionRoleState::Leader(RegionLeaderState::Staging) => {
518 info!(
519 "Exiting staging and entering downgrade for region {}",
520 self.region_id
521 );
522 self.exit_staging()?;
523 self.set_role(RegionRole::DowngradingLeader);
524 }
525 RegionRoleState::Leader(RegionLeaderState::Writable) => {
526 info!("Starting downgrade for region {}", self.region_id);
527 self.set_role(RegionRole::DowngradingLeader);
528 }
529 RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
530 info!("Region {} already in downgrading mode", self.region_id);
532 }
533 _ => {
534 warn!(
535 "Cannot start downgrade for region {} from state {:?}",
536 self.region_id, current_state
537 );
538 }
539 }
540 }
541 }
542
543 if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
545 let manifest_meta = &manager.manifest().metadata;
547 let current_version = self.version();
548 let current_meta = ¤t_version.metadata;
549 if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
550 let action = RegionMetaAction::Change(RegionChange {
551 metadata: current_meta.clone(),
552 sst_format: current_version.options.sst_format.unwrap_or_default(),
553 append_mode: None,
554 });
555 let result = manager
556 .update(RegionMetaActionList::with_action(action), false)
557 .await;
558
559 match result {
560 Ok(version) => {
561 info!(
562 "Successfully persisted backfilled metadata for region {}, version: {}",
563 self.region_id, version
564 );
565 }
566 Err(e) => {
567 warn!(e; "Failed to persist backfilled metadata for region {}", self.region_id);
568 }
569 }
570 }
571 }
572
573 drop(manager);
574
575 Ok(())
576 }
577
578 pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
581 if let Err(e) = self
582 .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
583 {
584 error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
585 }
586 }
587
588 pub(crate) fn switch_state_to_staging(&self, expect: RegionLeaderState) {
591 if let Err(e) =
592 self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Staging))
593 {
594 error!(e; "failed to switch region state to staging, expect state is {:?}", expect);
595 }
596 }
597
598 pub(crate) fn region_statistic(&self) -> RegionStatistic {
600 let version = self.version();
601 let memtables = &version.memtables;
602 let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;
603
604 let sst_usage = version.ssts.owned_sst_usage(self.region_id);
605 let index_usage = version.ssts.owned_index_usage(self.region_id);
606 let flushed_entry_id = version.flushed_entry_id;
607
608 let wal_usage = self.estimated_wal_usage(memtable_usage);
609 let manifest_usage = self.stats.total_manifest_size();
610 let num_rows = version.ssts.owned_num_rows(self.region_id) + version.memtables.num_rows();
611 let num_files = version.ssts.owned_num_files(self.region_id);
612 let manifest_version = self.stats.manifest_version();
613 let file_removed_cnt = self.stats.file_removed_cnt();
614
615 let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
616 let written_bytes = self.written_bytes.load(Ordering::Relaxed);
617
618 RegionStatistic {
619 num_rows,
620 memtable_size: memtable_usage,
621 wal_size: wal_usage,
622 manifest_size: manifest_usage,
623 sst_size: sst_usage,
624 sst_num: num_files,
625 index_size: index_usage,
626 manifest: RegionManifestInfo::Mito {
627 manifest_version,
628 flushed_entry_id,
629 file_removed_cnt,
630 },
631 data_topic_latest_entry_id: topic_latest_entry_id,
632 metadata_topic_latest_entry_id: topic_latest_entry_id,
633 written_bytes,
634 }
635 }
636
637 fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
640 ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
641 }
642
643 fn compare_exchange_state(
646 &self,
647 expect: RegionLeaderState,
648 state: RegionRoleState,
649 ) -> Result<()> {
650 self.manifest_ctx
651 .state
652 .compare_exchange(RegionRoleState::Leader(expect), state)
653 .map_err(|actual| {
654 RegionStateSnafu {
655 region_id: self.region_id,
656 state: actual,
657 expect: RegionRoleState::Leader(expect),
658 }
659 .build()
660 })?;
661 Ok(())
662 }
663
664 pub fn access_layer(&self) -> AccessLayerRef {
665 self.access_layer.clone()
666 }
667
668 pub(crate) fn region_info_entry(&self, node_id: Option<u64>) -> RegionInfoEntry {
670 let region_id = self.region_id;
671 let version = self.version();
672 let state = self.state();
673 let role = self.region_role();
674 let region_options = serde_json::to_string(&version.options)
675 .unwrap_or_else(|err| serde_json::json!({ "error": err.to_string() }).to_string());
676 let sst_format = match version.options.sst_format.unwrap_or_default() {
677 crate::sst::FormatType::PrimaryKey => "primary_key",
678 crate::sst::FormatType::Flat => "flat",
679 }
680 .to_string();
681
682 RegionInfoEntry {
683 region_id,
684 table_id: region_id.table_id(),
685 region_number: region_id.region_number(),
686 region_group: region_id.region_group(),
687 region_sequence: region_id.region_sequence(),
688 state: state.as_str().to_string(),
689 role: role.to_string(),
690 writable: self.is_writable(),
691 committed_sequence: self.find_committed_sequence(),
692 flushed_sequence: Some(self.flushed_sequence()).filter(|sequence| *sequence > 0),
693 manifest_version: self.stats.manifest_version(),
694 compaction_time_window: version
695 .compaction_time_window
696 .map(|duration| humantime::format_duration(duration).to_string()),
697 region_options,
698 sst_format,
699 node_id,
700 }
701 }
702
703 pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
705 let table_dir = self.table_dir();
706 let path_type = self.access_layer.path_type();
707
708 let visible_ssts = self
709 .version()
710 .ssts
711 .levels()
712 .iter()
713 .flat_map(|level| level.files().map(|file| file.file_id().file_id()))
714 .collect::<HashSet<_>>();
715
716 let manifest_files = self.manifest_ctx.manifest().await.files.clone();
717 let staging_files = self
718 .manifest_ctx
719 .staging_manifest()
720 .await
721 .map(|m| m.files.clone())
722 .unwrap_or_default();
723 let files = manifest_files
724 .into_iter()
725 .chain(staging_files)
726 .collect::<HashMap<_, _>>();
727
728 files
729 .values()
730 .map(|meta| {
731 let region_id = self.region_id;
732 let origin_region_id = meta.region_id;
733 let (index_version, index_file_path, index_file_size) = if meta.index_file_size > 0
734 {
735 let index_file_path = index_file_path(table_dir, meta.index_id(), path_type);
736 (
737 meta.index_version,
738 Some(index_file_path),
739 Some(meta.index_file_size),
740 )
741 } else {
742 (0, None, None)
743 };
744 let visible = visible_ssts.contains(&meta.file_id);
745 ManifestSstEntry {
746 table_dir: table_dir.to_string(),
747 region_id,
748 table_id: region_id.table_id(),
749 region_number: region_id.region_number(),
750 region_group: region_id.region_group(),
751 region_sequence: region_id.region_sequence(),
752 file_id: meta.file_id.to_string(),
753 index_version,
754 level: meta.level,
755 file_path: sst_file_path(table_dir, meta.file_id(), path_type),
756 file_size: meta.file_size,
757 index_file_path,
758 index_file_size,
759 num_rows: meta.num_rows,
760 num_row_groups: meta.num_row_groups,
761 num_series: Some(meta.num_series),
762 min_ts: meta.time_range.0,
763 max_ts: meta.time_range.1,
764 sequence: meta.sequence.map(|s| s.get()),
765 origin_region_id,
766 node_id: None,
767 visible,
768 primary_key_min: meta.primary_key_min.clone(),
769 primary_key_max: meta.primary_key_max.clone(),
770 }
771 })
772 .collect()
773 }
774
775 pub async fn file_metas(&self, file_ids: &[FileId]) -> Vec<Option<FileMeta>> {
777 let manifest_files = self.manifest_ctx.manifest().await.files.clone();
778
779 file_ids
780 .iter()
781 .map(|file_id| manifest_files.get(file_id).cloned())
782 .collect::<Vec<_>>()
783 }
784
785 pub(crate) async fn exit_staging_on_success(
787 &self,
788 manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
789 ) -> Result<()> {
790 let current_state = self.manifest_ctx.current_state();
791 ensure!(
792 current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
793 RegionStateSnafu {
794 region_id: self.region_id,
795 state: current_state,
796 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
797 }
798 );
799
800 let merged_actions = match manager.merge_staged_actions(current_state).await? {
802 Some(actions) => actions,
803 None => {
804 info!(
805 "No staged manifests to merge for region {}, exiting staging mode without changes",
806 self.region_id
807 );
808 self.exit_staging()?;
810 return Ok(());
811 }
812 };
813 let expect_change = merged_actions.actions.iter().any(|a| a.is_change());
814 let expect_partition_expr_change = merged_actions
815 .actions
816 .iter()
817 .any(|a| a.is_partition_expr_change());
818 let expect_edit = merged_actions.actions.iter().any(|a| a.is_edit());
819 ensure!(
820 !(expect_change && expect_partition_expr_change),
821 UnexpectedSnafu {
822 reason: "unexpected both change and partition expr change actions in merged actions"
823 }
824 );
825 ensure!(
826 expect_change || expect_partition_expr_change,
827 UnexpectedSnafu {
828 reason: "expect a change or partition expr change action in merged actions"
829 }
830 );
831 ensure!(
832 expect_edit,
833 UnexpectedSnafu {
834 reason: "expect an edit action in merged actions"
835 }
836 );
837
838 let (merged_partition_expr_change, merged_change, merged_edit) =
839 merged_actions.clone().split_region_change_and_edit();
840 if let Some(change) = &merged_change {
841 let current_column_metadatas = &self.version().metadata.column_metadatas;
845 ensure!(
846 change.metadata.column_metadatas == *current_column_metadatas,
847 UnexpectedSnafu {
848 reason: "change action alters column metadata in staging exit"
849 }
850 );
851 }
852
853 let new_version = manager.update(merged_actions, false).await?;
856 info!(
857 "Successfully submitted merged staged manifests for region {}, new version: {}",
858 self.region_id, new_version
859 );
860
861 if let Some(change) = merged_partition_expr_change {
863 let mut new_metadata = self.version().metadata.as_ref().clone();
864 new_metadata.set_partition_expr(change.partition_expr);
865 self.version_control.alter_metadata(new_metadata.into());
866 }
867 if let Some(change) = merged_change {
868 self.version_control.alter_metadata(change.metadata);
869 }
870 self.version_control
871 .apply_edit(Some(merged_edit), &[], self.file_purger.clone());
872
873 if let Err(e) = manager.clear_staging_manifest_and_dir().await {
875 error!(e; "Failed to clear staging manifest dir for region {}", self.region_id);
876 }
877 self.exit_staging()?;
878
879 Ok(())
880 }
881
882 pub fn maybe_staging_partition_expr_str(&self) -> Option<String> {
888 let is_staging = self.is_staging();
889 if is_staging {
890 let staging_partition_info = self.manifest_ctx.staging_partition_info();
891 if staging_partition_info.is_none() {
892 warn!(
893 "Staging partition expr is none for region {} in staging state",
894 self.region_id
895 );
896 }
897 staging_partition_info
898 .as_ref()
899 .and_then(|info| info.partition_expr().map(ToString::to_string))
900 } else {
901 let version = self.version();
902 version.metadata.partition_expr.clone()
903 }
904 }
905
906 pub fn expected_partition_expr_version(&self) -> u64 {
907 if self.is_staging() {
908 self.manifest_ctx
909 .staging_partition_info()
910 .as_ref()
911 .map(|info| info.partition_rule_version)
912 .unwrap_or_default()
913 } else {
914 self.version().metadata.partition_expr_version
915 }
916 }
917
918 pub(crate) fn reject_all_writes_in_staging(&self) -> bool {
920 if !self.is_staging() {
921 return false;
922 }
923 self.manifest_ctx
924 .staging_partition_info()
925 .as_ref()
926 .map(|info| {
927 matches!(
928 info.partition_directive,
929 StagingPartitionDirective::RejectAllWrites
930 )
931 })
932 .unwrap_or(false)
933 }
934}
935
936#[derive(Debug)]
938pub(crate) struct ManifestContext {
939 pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
941 state: AtomicCell<RegionRoleState>,
944 staging_partition_info: Mutex<Option<StagingPartitionInfo>>,
949}
950
951impl ManifestContext {
952 pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
953 ManifestContext {
954 manifest_manager: tokio::sync::RwLock::new(manager),
955 state: AtomicCell::new(state),
956 staging_partition_info: Mutex::new(None),
957 }
958 }
959
960 pub(crate) fn staging_partition_info(&self) -> Option<StagingPartitionInfo> {
961 self.staging_partition_info.lock().unwrap().clone()
962 }
963
964 pub(crate) fn set_staging_partition_info(&self, staging_partition_info: StagingPartitionInfo) {
965 let mut current = self.staging_partition_info.lock().unwrap();
966 debug_assert!(current.is_none());
967 *current = Some(staging_partition_info);
968 }
969
970 fn clear_staging_partition_info(&self) {
971 *self.staging_partition_info.lock().unwrap() = None;
972 }
973
974 pub(crate) fn exit_staging(
975 &self,
976 region_id: RegionId,
977 next_state: RegionRoleState,
978 ) -> Result<()> {
979 self.state
980 .compare_exchange(
981 RegionRoleState::Leader(RegionLeaderState::Staging),
982 next_state,
983 )
984 .map_err(|actual| {
985 RegionStateSnafu {
986 region_id,
987 state: actual,
988 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
989 }
990 .build()
991 })?;
992 self.clear_staging_partition_info();
993 Ok(())
994 }
995
996 pub(crate) async fn manifest_version(&self) -> ManifestVersion {
997 self.manifest_manager
998 .read()
999 .await
1000 .manifest()
1001 .manifest_version
1002 }
1003
1004 pub(crate) async fn has_update(&self) -> Result<bool> {
1005 self.manifest_manager.read().await.has_update().await
1006 }
1007
1008 pub(crate) fn current_state(&self) -> RegionRoleState {
1010 self.state.load()
1011 }
1012
1013 pub(crate) async fn install_manifest_to(
1019 &self,
1020 version: ManifestVersion,
1021 ) -> Result<Arc<RegionManifest>> {
1022 let mut manager = self.manifest_manager.write().await;
1023 manager.install_manifest_to(version).await?;
1024
1025 Ok(manager.manifest())
1026 }
1027
1028 pub(crate) async fn update_manifest(
1030 &self,
1031 expect_state: RegionLeaderState,
1032 action_list: RegionMetaActionList,
1033 is_staging: bool,
1034 ) -> Result<ManifestVersion> {
1035 self.update_manifest_with_state_check(action_list, is_staging, |current_state, region_id| {
1036 if expect_state != RegionLeaderState::Downgrading {
1041 if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
1042 info!(
1043 "Region {} is in downgrading leader state, updating manifest. Expect state is {:?}",
1044 region_id, expect_state
1045 );
1046 }
1047 ensure!(
1048 current_state == RegionRoleState::Leader(expect_state)
1049 || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
1050 UpdateManifestSnafu {
1051 region_id,
1052 state: current_state,
1053 }
1054 );
1055 } else {
1056 ensure!(
1057 current_state == RegionRoleState::Leader(expect_state),
1058 RegionStateSnafu {
1059 region_id,
1060 state: current_state,
1061 expect: RegionRoleState::Leader(expect_state),
1062 }
1063 );
1064 }
1065
1066 Ok(())
1067 })
1068 .await
1069 }
1070
1071 pub(crate) async fn update_manifest_for_compaction(
1088 &self,
1089 action_list: RegionMetaActionList,
1090 ) -> Result<ManifestVersion> {
1091 self.update_manifest_with_state_check(action_list, false, |current_state, region_id| {
1092 ensure!(
1093 matches!(
1094 current_state,
1095 RegionRoleState::Leader(RegionLeaderState::Writable)
1096 | RegionRoleState::Leader(RegionLeaderState::Editing)
1097 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
1098 ),
1099 UpdateManifestSnafu {
1100 region_id,
1101 state: current_state,
1102 }
1103 );
1104
1105 Ok(())
1106 })
1107 .await
1108 }
1109
1110 async fn update_manifest_with_state_check(
1111 &self,
1112 action_list: RegionMetaActionList,
1113 is_staging: bool,
1114 check_state: impl FnOnce(RegionRoleState, RegionId) -> Result<()>,
1115 ) -> Result<ManifestVersion> {
1116 let mut manager = self.manifest_manager.write().await;
1118 let manifest = manager.manifest();
1120 let current_state = self.state.load();
1123 check_state(current_state, manifest.metadata.region_id)?;
1124
1125 for action in &action_list.actions {
1126 let RegionMetaAction::Edit(edit) = &action else {
1128 continue;
1129 };
1130
1131 let Some(truncated_entry_id) = manifest.truncated_entry_id else {
1133 continue;
1134 };
1135
1136 if let Some(flushed_entry_id) = edit.flushed_entry_id {
1138 let is_newer_entry = truncated_entry_id < flushed_entry_id;
1148 let is_same_entry_with_newer_sequence = truncated_entry_id == flushed_entry_id
1149 && edit.flushed_sequence.is_some_and(|flushed_sequence| {
1150 manifest.flushed_sequence < flushed_sequence
1151 });
1152
1153 ensure!(
1154 is_newer_entry || is_same_entry_with_newer_sequence,
1155 RegionTruncatedSnafu {
1156 region_id: manifest.metadata.region_id,
1157 }
1158 );
1159 }
1160
1161 if !edit.files_to_remove.is_empty() {
1163 for file in &edit.files_to_remove {
1165 ensure!(
1166 manifest.files.contains_key(&file.file_id),
1167 RegionTruncatedSnafu {
1168 region_id: manifest.metadata.region_id,
1169 }
1170 );
1171 }
1172 }
1173 }
1174
1175 let version = manager.update(action_list, is_staging).await.inspect_err(
1177 |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
1178 )?;
1179
1180 if self.state.load() == RegionRoleState::Follower {
1181 warn!(
1182 "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
1183 manifest.metadata.region_id
1184 );
1185 }
1186
1187 Ok(version)
1188 }
1189
1190 pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
1224 match next_role {
1225 RegionRole::Follower => {
1226 if self
1227 .exit_staging(region_id, RegionRoleState::Follower)
1228 .is_ok()
1229 {
1230 info!(
1231 "Convert region {} to follower, previous role state: {:?}",
1232 region_id,
1233 RegionRoleState::Leader(RegionLeaderState::Staging)
1234 );
1235 return;
1236 }
1237 match self.state.fetch_update(|state| {
1238 if !matches!(state, RegionRoleState::Follower) {
1239 Some(RegionRoleState::Follower)
1240 } else {
1241 None
1242 }
1243 }) {
1244 Ok(state) => info!(
1245 "Convert region {} to follower, previous role state: {:?}",
1246 region_id, state
1247 ),
1248 Err(state) => {
1249 if state != RegionRoleState::Follower {
1250 warn!(
1251 "Failed to convert region {} to follower, current role state: {:?}",
1252 region_id, state
1253 )
1254 }
1255 }
1256 }
1257 }
1258 RegionRole::Leader => {
1259 if self
1260 .exit_staging(
1261 region_id,
1262 RegionRoleState::Leader(RegionLeaderState::Writable),
1263 )
1264 .is_ok()
1265 {
1266 info!(
1267 "Convert region {} to leader, previous role state: {:?}",
1268 region_id,
1269 RegionRoleState::Leader(RegionLeaderState::Staging)
1270 );
1271 return;
1272 }
1273 match self.state.fetch_update(|state| {
1274 if matches!(
1275 state,
1276 RegionRoleState::Follower
1277 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
1278 ) {
1279 Some(RegionRoleState::Leader(RegionLeaderState::Writable))
1280 } else {
1281 None
1282 }
1283 }) {
1284 Ok(state) => info!(
1285 "Convert region {} to leader, previous role state: {:?}",
1286 region_id, state
1287 ),
1288 Err(state) => {
1289 if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
1290 warn!(
1291 "Failed to convert region {} to leader, current role state: {:?}",
1292 region_id, state
1293 )
1294 }
1295 }
1296 }
1297 }
1298 RegionRole::StagingLeader => {
1299 info!(
1300 "Ignore direct conversion of region {} to staging leader; staging requires the dedicated workflow",
1301 region_id
1302 );
1303 }
1304 RegionRole::DowngradingLeader => {
1305 if self
1306 .exit_staging(
1307 region_id,
1308 RegionRoleState::Leader(RegionLeaderState::Downgrading),
1309 )
1310 .is_ok()
1311 {
1312 info!(
1313 "Convert region {} to downgrading region, previous role state: {:?}",
1314 region_id,
1315 RegionRoleState::Leader(RegionLeaderState::Staging)
1316 );
1317 return;
1318 }
1319 match self.state.compare_exchange(
1320 RegionRoleState::Leader(RegionLeaderState::Writable),
1321 RegionRoleState::Leader(RegionLeaderState::Downgrading),
1322 ) {
1323 Ok(state) => info!(
1324 "Convert region {} to downgrading region, previous role state: {:?}",
1325 region_id, state
1326 ),
1327 Err(state) => {
1328 if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
1329 warn!(
1330 "Failed to convert region {} to downgrading leader, current role state: {:?}",
1331 region_id, state
1332 )
1333 }
1334 }
1335 }
1336 }
1337 }
1338 }
1339
1340 pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
1342 self.manifest_manager.read().await.manifest()
1343 }
1344
1345 pub(crate) async fn staging_manifest(
1347 &self,
1348 ) -> Option<Arc<crate::manifest::action::RegionManifest>> {
1349 self.manifest_manager.read().await.staging_manifest()
1350 }
1351}
1352
/// Shared, reference-counted handle to a [`ManifestContext`].
pub(crate) type ManifestContextRef = Arc<ManifestContext>;
1354
1355#[derive(Debug, Default)]
1357pub(crate) struct RegionMap {
1358 regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
1359}
1360
1361impl RegionMap {
1362 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1364 let regions = self.regions.read().unwrap();
1365 regions.contains_key(®ion_id)
1366 }
1367
1368 pub(crate) fn insert_region(&self, region: MitoRegionRef) {
1370 let mut regions = self.regions.write().unwrap();
1371 regions.insert(region.region_id, region);
1372 }
1373
1374 pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
1376 let regions = self.regions.read().unwrap();
1377 regions.get(®ion_id).cloned()
1378 }
1379
1380 pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1384 let region = self
1385 .get_region(region_id)
1386 .context(RegionNotFoundSnafu { region_id })?;
1387 ensure!(
1388 region.is_writable(),
1389 RegionStateSnafu {
1390 region_id,
1391 state: region.state(),
1392 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1393 }
1394 );
1395 Ok(region)
1396 }
1397
1398 pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1402 let region = self
1403 .get_region(region_id)
1404 .context(RegionNotFoundSnafu { region_id })?;
1405 ensure!(
1406 region.is_follower(),
1407 RegionStateSnafu {
1408 region_id,
1409 state: region.state(),
1410 expect: RegionRoleState::Follower,
1411 }
1412 );
1413
1414 Ok(region)
1415 }
1416
1417 pub(crate) fn get_region_or<F: OnFailure>(
1421 &self,
1422 region_id: RegionId,
1423 cb: &mut F,
1424 ) -> Option<MitoRegionRef> {
1425 match self
1426 .get_region(region_id)
1427 .context(RegionNotFoundSnafu { region_id })
1428 {
1429 Ok(region) => Some(region),
1430 Err(e) => {
1431 cb.on_failure(e);
1432 None
1433 }
1434 }
1435 }
1436
1437 pub(crate) fn writable_region_or<F: OnFailure>(
1441 &self,
1442 region_id: RegionId,
1443 cb: &mut F,
1444 ) -> Option<MitoRegionRef> {
1445 match self.writable_region(region_id) {
1446 Ok(region) => Some(region),
1447 Err(e) => {
1448 cb.on_failure(e);
1449 None
1450 }
1451 }
1452 }
1453
1454 pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1458 let region = self.writable_region(region_id)?;
1459 if region.is_staging() {
1460 return Err(crate::error::RegionStateSnafu {
1461 region_id,
1462 state: region.state(),
1463 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1464 }
1465 .build());
1466 }
1467 Ok(region)
1468 }
1469
1470 pub(crate) fn staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1474 let region = self
1475 .get_region(region_id)
1476 .context(RegionNotFoundSnafu { region_id })?;
1477 ensure!(
1478 region.is_staging(),
1479 RegionStateSnafu {
1480 region_id,
1481 state: region.state(),
1482 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
1483 }
1484 );
1485 Ok(region)
1486 }
1487
1488 pub(crate) fn flushable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1492 let region = self
1493 .get_region(region_id)
1494 .context(RegionNotFoundSnafu { region_id })?;
1495 ensure!(
1496 region.is_flushable(),
1497 FlushableRegionStateSnafu {
1498 region_id,
1499 state: region.state(),
1500 }
1501 );
1502 Ok(region)
1503 }
1504
1505 pub(crate) fn remove_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
1507 let mut regions = self.regions.write().unwrap();
1508 regions.remove(®ion_id)
1509 }
1510
1511 pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
1513 let regions = self.regions.read().unwrap();
1514 regions.values().cloned().collect()
1515 }
1516
1517 pub(crate) fn clear(&self) {
1519 self.regions.write().unwrap().clear();
1520 }
1521}
1522
1523pub(crate) type RegionMapRef = Arc<RegionMap>;
1524
1525#[derive(Debug, Default)]
1527pub(crate) struct OpeningRegions {
1528 regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
1529}
1530
1531impl OpeningRegions {
1532 pub(crate) fn wait_for_opening_region(
1534 &self,
1535 region_id: RegionId,
1536 sender: OptionOutputTx,
1537 ) -> Option<OptionOutputTx> {
1538 let mut regions = self.regions.write().unwrap();
1539 match regions.entry(region_id) {
1540 Entry::Occupied(mut senders) => {
1541 senders.get_mut().push(sender);
1542 None
1543 }
1544 Entry::Vacant(_) => Some(sender),
1545 }
1546 }
1547
1548 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1550 let regions = self.regions.read().unwrap();
1551 regions.contains_key(®ion_id)
1552 }
1553
1554 pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
1556 let mut regions = self.regions.write().unwrap();
1557 regions.insert(region, vec![sender]);
1558 }
1559
1560 pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
1562 let mut regions = self.regions.write().unwrap();
1563 regions.remove(®ion_id).unwrap_or_default()
1564 }
1565
1566 #[cfg(test)]
1567 pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
1568 let regions = self.regions.read().unwrap();
1569 if let Some(senders) = regions.get(®ion_id) {
1570 senders.len()
1571 } else {
1572 0
1573 }
1574 }
1575}
1576
1577pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;
1578
1579#[derive(Debug, Default)]
1581pub(crate) struct CatchupRegions {
1582 regions: RwLock<HashSet<RegionId>>,
1583}
1584
1585impl CatchupRegions {
1586 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1588 let regions = self.regions.read().unwrap();
1589 regions.contains(®ion_id)
1590 }
1591
1592 pub(crate) fn insert_region(&self, region_id: RegionId) {
1594 let mut regions = self.regions.write().unwrap();
1595 regions.insert(region_id);
1596 }
1597
1598 pub(crate) fn remove_region(&self, region_id: RegionId) {
1600 let mut regions = self.regions.write().unwrap();
1601 regions.remove(®ion_id);
1602 }
1603}
1604
1605pub(crate) type CatchupRegionsRef = Arc<CatchupRegions>;
1606
/// Manifest-related counters of a region.
///
/// Every counter lives behind an `Arc`, so clones of `ManifestStats` share the
/// same underlying values.
#[derive(Default, Debug, Clone)]
pub struct ManifestStats {
    /// Total manifest size (presumably bytes — confirm with the writer).
    pub(crate) total_manifest_size: Arc<AtomicU64>,
    /// Manifest version counter.
    pub(crate) manifest_version: Arc<AtomicU64>,
    /// Removed-file counter (exact semantics defined by the writer — confirm).
    pub(crate) file_removed_cnt: Arc<AtomicU64>,
}

impl ManifestStats {
    /// Reads one counter with `Relaxed` ordering.
    fn load_counter(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    fn total_manifest_size(&self) -> u64 {
        Self::load_counter(&self.total_manifest_size)
    }

    fn manifest_version(&self) -> u64 {
        Self::load_counter(&self.manifest_version)
    }

    fn file_removed_cnt(&self) -> u64 {
        Self::load_counter(&self.file_removed_cnt)
    }
}
1628
1629pub fn parse_partition_expr(partition_expr_str: Option<&str>) -> Result<Option<PartitionExpr>> {
1631 match partition_expr_str {
1632 None => Ok(None),
1633 Some("") => Ok(None),
1634 Some(json_str) => {
1635 let expr = partition::expr::PartitionExpr::from_json_str(json_str)
1636 .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?;
1637 Ok(expr)
1638 }
1639 }
1640}
1641
1642#[cfg(test)]
1643mod tests {
1644 use std::sync::Arc;
1645 use std::sync::atomic::AtomicU64;
1646
1647 use common_datasource::compression::CompressionType;
1648 use common_test_util::temp_dir::create_temp_dir;
1649 use crossbeam_utils::atomic::AtomicCell;
1650 use object_store::ObjectStore;
1651 use object_store::services::Fs;
1652 use store_api::logstore::provider::Provider;
1653 use store_api::region_engine::RegionRole;
1654 use store_api::region_request::PathType;
1655 use store_api::storage::{FileId, RegionId};
1656
1657 use crate::access_layer::AccessLayer;
1658 use crate::error::Error;
1659 use crate::manifest::action::{
1660 RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionPartitionExprChange,
1661 };
1662 use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
1663 use crate::region::{
1664 ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
1665 };
1666 use crate::sst::FormatType;
1667 use crate::sst::index::intermediate::IntermediateManager;
1668 use crate::sst::index::puffin_manager::PuffinManagerFactory;
1669 use crate::test_util::scheduler_util::SchedulerEnv;
1670 use crate::test_util::version_util::VersionControlBuilder;
1671 use crate::time_provider::StdTimeProvider;
1672
1673 #[test]
1674 fn test_region_state_lock_free() {
1675 assert!(AtomicCell::<RegionRoleState>::is_lock_free());
1676 }
1677
1678 #[test]
1679 fn test_region_role_state_as_str() {
1680 assert_eq!("Follower", RegionRoleState::Follower.as_str());
1681 assert_eq!(
1682 "Leader(Writable)",
1683 RegionRoleState::Leader(RegionLeaderState::Writable).as_str()
1684 );
1685 assert_eq!(
1686 "Leader(Staging)",
1687 RegionRoleState::Leader(RegionLeaderState::Staging).as_str()
1688 );
1689 assert_eq!(
1690 "Leader(Downgrading)",
1691 RegionRoleState::Leader(RegionLeaderState::Downgrading).as_str()
1692 );
1693 }
1694
1695 async fn build_test_region(env: &SchedulerEnv) -> MitoRegion {
1696 let builder = VersionControlBuilder::new();
1697 let version_control = Arc::new(builder.build());
1698 let metadata = version_control.current().version.metadata.clone();
1699
1700 let manager = RegionManifestManager::new(
1701 metadata.clone(),
1702 0,
1703 RegionManifestOptions {
1704 manifest_dir: "".to_string(),
1705 object_store: env.access_layer.object_store().clone(),
1706 compress_type: CompressionType::Uncompressed,
1707 checkpoint_distance: 10,
1708 remove_file_options: Default::default(),
1709 manifest_cache: None,
1710 },
1711 FormatType::PrimaryKey,
1712 &Default::default(),
1713 )
1714 .await
1715 .unwrap();
1716
1717 let manifest_ctx = Arc::new(ManifestContext::new(
1718 manager,
1719 RegionRoleState::Leader(RegionLeaderState::Writable),
1720 ));
1721
1722 MitoRegion {
1723 region_id: metadata.region_id,
1724 version_control,
1725 access_layer: env.access_layer.clone(),
1726 manifest_ctx,
1727 file_purger: crate::test_util::new_noop_file_purger(),
1728 provider: Provider::noop_provider(),
1729 last_flush_millis: Default::default(),
1730 last_compaction_millis: Default::default(),
1731 time_provider: Arc::new(StdTimeProvider),
1732 topic_latest_entry_id: Default::default(),
1733 written_bytes: Arc::new(AtomicU64::new(0)),
1734 stats: ManifestStats::default(),
1735 }
1736 }
1737
1738 fn empty_edit() -> RegionEdit {
1739 RegionEdit {
1740 files_to_add: Vec::new(),
1741 files_to_remove: Vec::new(),
1742 timestamp_ms: None,
1743 compaction_time_window: None,
1744 flushed_entry_id: None,
1745 flushed_sequence: None,
1746 committed_sequence: None,
1747 }
1748 }
1749
    /// A compaction manifest update must succeed while the region is in the
    /// editing state, and the file it adds must appear in the manifest.
    #[tokio::test]
    async fn test_compaction_update_manifest_allows_editing_state() {
        let env = SchedulerEnv::new().await;
        let region = build_test_region(&env).await;
        // Switch the region into the editing state before updating the manifest.
        region.set_editing(RegionLeaderState::Writable).unwrap();

        let file_id = FileId::random();
        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(RegionEdit {
            files_to_add: vec![crate::sst::file::FileMeta {
                region_id: region.region_id,
                file_id,
                level: 1,
                ..Default::default()
            }],
            files_to_remove: Vec::new(),
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        }));

        region
            .manifest_ctx
            .update_manifest_for_compaction(action_list)
            .await
            .unwrap();

        // The compacted file is now part of the manifest.
        assert!(
            region
                .manifest_ctx
                .manifest()
                .await
                .files
                .contains_key(&file_id)
        );
    }
1787
    /// Exiting staging after a staged `PartitionExprChange` + `Edit` applies the
    /// new partition expression and returns the region to `Leader(Writable)`.
    #[tokio::test]
    async fn test_exit_staging_partition_expr_change_and_edit_success() {
        let env = SchedulerEnv::new().await;
        let region = build_test_region(&env).await;

        // Hold the manifest manager write lock across the whole staging sequence.
        let mut manager = region.manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();
        // NOTE(review): the `true` flag presumably records these actions as
        // staging manifests — confirm against `RegionManifestManager::update`.
        manager
            .update(
                RegionMetaActionList::new(vec![
                    RegionMetaAction::PartitionExprChange(RegionPartitionExprChange {
                        partition_expr: Some("expr_a".to_string()),
                    }),
                    RegionMetaAction::Edit(empty_edit()),
                ]),
                true,
            )
            .await
            .unwrap();

        region.exit_staging_on_success(&mut manager).await.unwrap();
        drop(manager);

        assert_eq!(
            region.version().metadata.partition_expr.as_deref(),
            Some("expr_a")
        );
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }
1820
    /// A staged `Change` that only alters the partition expression (columns
    /// unchanged) is accepted when exiting staging.
    #[tokio::test]
    async fn test_exit_staging_change_with_same_columns_success() {
        let env = SchedulerEnv::new().await;
        let region = build_test_region(&env).await;

        let mut manager = region.manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();

        // Same columns as the current metadata; only the partition expr differs.
        let mut changed_metadata = region.version().metadata.as_ref().clone();
        changed_metadata.set_partition_expr(Some("expr_b".to_string()));

        manager
            .update(
                RegionMetaActionList::new(vec![
                    RegionMetaAction::Change(RegionChange {
                        metadata: Arc::new(changed_metadata),
                        sst_format: FormatType::PrimaryKey,
                        append_mode: None,
                    }),
                    RegionMetaAction::Edit(empty_edit()),
                ]),
                true,
            )
            .await
            .unwrap();

        region.exit_staging_on_success(&mut manager).await.unwrap();
        drop(manager);

        assert_eq!(
            region.version().metadata.partition_expr.as_deref(),
            Some("expr_b")
        );
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }
1859
    /// A staged `Change` whose column list differs from the current metadata
    /// makes `exit_staging_on_success` fail with `Error::Unexpected`.
    #[tokio::test]
    async fn test_exit_staging_change_with_different_columns_fails() {
        let env = SchedulerEnv::new().await;
        let region = build_test_region(&env).await;

        let mut manager = region.manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();

        // Reordering the columns is enough to make them "different".
        let mut changed_metadata = region.version().metadata.as_ref().clone();
        changed_metadata.column_metadatas.rotate_left(1);

        manager
            .update(
                RegionMetaActionList::new(vec![
                    RegionMetaAction::Change(RegionChange {
                        metadata: Arc::new(changed_metadata),
                        sst_format: FormatType::PrimaryKey,
                        append_mode: None,
                    }),
                    RegionMetaAction::Edit(empty_edit()),
                ]),
                true,
            )
            .await
            .unwrap();

        let result = region.exit_staging_on_success(&mut manager).await;
        assert!(matches!(result, Err(Error::Unexpected { .. })));
    }
1889
    /// Staging both a `PartitionExprChange` and a `Change` in the same action
    /// list is rejected by `exit_staging_on_success` with `Error::Unexpected`.
    #[tokio::test]
    async fn test_exit_staging_partition_expr_change_and_change_conflict_fails() {
        let env = SchedulerEnv::new().await;
        let region = build_test_region(&env).await;

        let mut manager = region.manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();

        let mut changed_metadata = region.version().metadata.as_ref().clone();
        changed_metadata.set_partition_expr(Some("expr_c".to_string()));

        // Both actions carry the same expression, yet combining them is a conflict.
        manager
            .update(
                RegionMetaActionList::new(vec![
                    RegionMetaAction::PartitionExprChange(RegionPartitionExprChange {
                        partition_expr: Some("expr_c".to_string()),
                    }),
                    RegionMetaAction::Change(RegionChange {
                        metadata: Arc::new(changed_metadata),
                        sst_format: FormatType::PrimaryKey,
                        append_mode: None,
                    }),
                    RegionMetaAction::Edit(empty_edit()),
                ]),
                true,
            )
            .await
            .unwrap();

        let result = region.exit_staging_on_success(&mut manager).await;
        assert!(matches!(result, Err(Error::Unexpected { .. })));
    }
1922
1923 #[tokio::test]
1924 async fn test_set_region_state() {
1925 let env = SchedulerEnv::new().await;
1926 let builder = VersionControlBuilder::new();
1927 let version_control = Arc::new(builder.build());
1928 let manifest_ctx = env
1929 .mock_manifest_context(version_control.current().version.metadata.clone())
1930 .await;
1931
1932 let region_id = RegionId::new(1024, 0);
1933 manifest_ctx.set_role(RegionRole::Follower, region_id);
1935 assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1936
1937 manifest_ctx.set_role(RegionRole::Leader, region_id);
1939 assert_eq!(
1940 manifest_ctx.state.load(),
1941 RegionRoleState::Leader(RegionLeaderState::Writable)
1942 );
1943
1944 manifest_ctx.set_role(RegionRole::StagingLeader, region_id);
1946 assert_eq!(
1947 manifest_ctx.state.load(),
1948 RegionRoleState::Leader(RegionLeaderState::Writable)
1949 );
1950
1951 manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1953 assert_eq!(
1954 manifest_ctx.state.load(),
1955 RegionRoleState::Leader(RegionLeaderState::Downgrading)
1956 );
1957
1958 manifest_ctx.set_role(RegionRole::Follower, region_id);
1960 assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1961
1962 manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1964 assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1965
1966 manifest_ctx.set_role(RegionRole::Leader, region_id);
1968 manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1969 assert_eq!(
1970 manifest_ctx.state.load(),
1971 RegionRoleState::Leader(RegionLeaderState::Downgrading)
1972 );
1973
1974 manifest_ctx.set_role(RegionRole::Leader, region_id);
1976 assert_eq!(
1977 manifest_ctx.state.load(),
1978 RegionRoleState::Leader(RegionLeaderState::Writable)
1979 );
1980 }
1981
    /// `current_state` reports the state a `ManifestContext` was created with:
    /// `Leader(Staging)` for an explicitly staging context, `Leader(Writable)` for
    /// the env's mock context.
    #[tokio::test]
    async fn test_staging_state_validation() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());

        // Context constructed directly in the staging state.
        let staging_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                    manifest_cache: None,
                },
                FormatType::PrimaryKey,
                &Default::default(),
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        assert_eq!(
            staging_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );

        // The env's mock context is expected to start out writable.
        let writable_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        assert_eq!(
            writable_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }
2028
2029 #[tokio::test]
2030 async fn test_staging_state_transitions() {
2031 let builder = VersionControlBuilder::new();
2032 let version_control = Arc::new(builder.build());
2033 let metadata = version_control.current().version.metadata.clone();
2034
2035 let temp_dir = create_temp_dir("");
2037 let path_str = temp_dir.path().display().to_string();
2038 let fs_builder = Fs::default().root(&path_str);
2039 let object_store = ObjectStore::new(fs_builder).unwrap().finish();
2040
2041 let index_aux_path = temp_dir.path().join("index_aux");
2042 let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
2043 .await
2044 .unwrap();
2045 let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
2046 .await
2047 .unwrap();
2048
2049 let access_layer = Arc::new(AccessLayer::new(
2050 "",
2051 PathType::Bare,
2052 object_store,
2053 puffin_mgr,
2054 intm_mgr,
2055 ));
2056
2057 let manager = RegionManifestManager::new(
2058 metadata.clone(),
2059 0,
2060 RegionManifestOptions {
2061 manifest_dir: "".to_string(),
2062 object_store: access_layer.object_store().clone(),
2063 compress_type: CompressionType::Uncompressed,
2064 checkpoint_distance: 10,
2065 remove_file_options: Default::default(),
2066 manifest_cache: None,
2067 },
2068 FormatType::PrimaryKey,
2069 &Default::default(),
2070 )
2071 .await
2072 .unwrap();
2073
2074 let manifest_ctx = Arc::new(ManifestContext::new(
2075 manager,
2076 RegionRoleState::Leader(RegionLeaderState::Writable),
2077 ));
2078
2079 let region = MitoRegion {
2080 region_id: metadata.region_id,
2081 version_control,
2082 access_layer,
2083 manifest_ctx: manifest_ctx.clone(),
2084 file_purger: crate::test_util::new_noop_file_purger(),
2085 provider: Provider::noop_provider(),
2086 last_flush_millis: Default::default(),
2087 last_compaction_millis: Default::default(),
2088 time_provider: Arc::new(StdTimeProvider),
2089 topic_latest_entry_id: Default::default(),
2090 written_bytes: Arc::new(AtomicU64::new(0)),
2091 stats: ManifestStats::default(),
2092 };
2093
2094 assert_eq!(
2096 region.state(),
2097 RegionRoleState::Leader(RegionLeaderState::Writable)
2098 );
2099 assert!(!region.is_staging());
2100
2101 let mut manager = manifest_ctx.manifest_manager.write().await;
2103 region.set_staging(&mut manager).await.unwrap();
2104 drop(manager);
2105 assert_eq!(
2106 region.state(),
2107 RegionRoleState::Leader(RegionLeaderState::Staging)
2108 );
2109 assert!(region.is_staging());
2110
2111 region.exit_staging().unwrap();
2113 assert_eq!(
2114 region.state(),
2115 RegionRoleState::Leader(RegionLeaderState::Writable)
2116 );
2117 assert!(!region.is_staging());
2118
2119 {
2121 let manager = manifest_ctx.manifest_manager.write().await;
2123 let dummy_actions = RegionMetaActionList::new(vec![]);
2124 let dummy_bytes = dummy_actions.encode().unwrap();
2125
2126 manager.store().save(100, &dummy_bytes, true).await.unwrap();
2128 manager.store().save(101, &dummy_bytes, true).await.unwrap();
2129 drop(manager);
2130
2131 let manager = manifest_ctx.manifest_manager.read().await;
2133 let dirty_manifests = manager.store().fetch_staging_manifests().await.unwrap();
2134 assert_eq!(
2135 dirty_manifests.len(),
2136 2,
2137 "Should have 2 dirty staging files"
2138 );
2139 drop(manager);
2140
2141 let mut manager = manifest_ctx.manifest_manager.write().await;
2143 region.set_staging(&mut manager).await.unwrap();
2144 drop(manager);
2145
2146 let manager = manifest_ctx.manifest_manager.read().await;
2148 let cleaned_manifests = manager.store().fetch_staging_manifests().await.unwrap();
2149 assert_eq!(
2150 cleaned_manifests.len(),
2151 0,
2152 "Dirty staging files should be cleaned up"
2153 );
2154 drop(manager);
2155
2156 region.exit_staging().unwrap();
2158 }
2159
2160 let mut manager = manifest_ctx.manifest_manager.write().await;
2162 assert!(region.set_staging(&mut manager).await.is_ok()); drop(manager);
2164 let mut manager = manifest_ctx.manifest_manager.write().await;
2165 assert!(region.set_staging(&mut manager).await.is_err()); drop(manager);
2167 assert!(region.exit_staging().is_ok()); assert!(region.exit_staging().is_err()); }
2170}