1pub mod catchup;
18pub mod opener;
19pub mod options;
20pub mod utils;
21pub(crate) mod version;
22
23use std::collections::hash_map::Entry;
24use std::collections::{HashMap, HashSet};
25use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
26use std::sync::{Arc, Mutex, RwLock};
27
28use common_base::hash::partition_expr_version;
29use common_telemetry::{error, info, warn};
30use crossbeam_utils::atomic::AtomicCell;
31use partition::expr::PartitionExpr;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::PrimaryKeyEncoding;
35use store_api::logstore::provider::Provider;
36use store_api::metadata::RegionMetadataRef;
37use store_api::region_engine::{
38 RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
39};
40use store_api::region_request::{PathType, StagingPartitionDirective};
41use store_api::sst_entry::ManifestSstEntry;
42use store_api::storage::{FileId, RegionId, SequenceNumber};
43use tokio::sync::RwLockWriteGuard;
44pub use utils::*;
45
46use crate::access_layer::AccessLayerRef;
47use crate::error::{
48 InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
49 UnexpectedSnafu, UpdateManifestSnafu,
50};
51use crate::manifest::action::{
52 RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
53};
54use crate::manifest::manager::RegionManifestManager;
55use crate::region::version::{VersionControlRef, VersionRef};
56use crate::request::{OnFailure, OptionOutputTx};
57use crate::sst::file::FileMeta;
58use crate::sst::file_purger::FilePurgerRef;
59use crate::sst::location::{index_file_path, sst_file_path};
60use crate::time_provider::TimeProviderRef;
61
/// Factor multiplied with the memtable usage to estimate the WAL usage of a region.
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;
64
/// Storage usage breakdown of a single region.
///
/// NOTE(review): the fields are presumably byte counts — confirm against the producer.
#[derive(Debug)]
pub struct RegionUsage {
    /// Id of the region the figures belong to.
    pub region_id: RegionId,
    /// Usage attributed to the write-ahead log.
    pub wal_usage: u64,
    /// Usage attributed to SST files.
    pub sst_usage: u64,
    /// Usage attributed to manifest files.
    pub manifest_usage: u64,
}
73
74impl RegionUsage {
75 pub fn disk_usage(&self) -> u64 {
76 self.wal_usage + self.sst_usage + self.manifest_usage
77 }
78}
79
/// Sub-state of a region whose role is leader.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// Normal leader state; the starting point of most transitions in this file.
    Writable,
    /// Staging mode; still treated as writable and flushable by `MitoRegion`.
    Staging,
    /// Transitional state entered from `Writable` via `set_entering_staging`.
    EnteringStaging,
    /// Entered from `Writable` via `set_altering`.
    Altering,
    /// Entered via `set_dropping`.
    Dropping,
    /// Entered from `Writable` via `set_truncating`.
    Truncating,
    /// Entered via `set_editing`.
    Editing,
    /// Downgrading leader: not writable, but still flushable and allowed to update the manifest.
    Downgrading,
}
99
/// Role of a region together with its leader sub-state, if it is a leader.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
    /// The region is a leader in the given sub-state.
    Leader(RegionLeaderState),
    /// The region is a follower.
    Follower,
}
105
106impl RegionRoleState {
107 pub fn into_leader_state(self) -> Option<RegionLeaderState> {
109 match self {
110 RegionRoleState::Leader(leader_state) => Some(leader_state),
111 RegionRoleState::Follower => None,
112 }
113 }
114}
115
/// In-memory handle of a region: its version control, manifest context and bookkeeping.
#[derive(Debug)]
pub struct MitoRegion {
    /// Id of this region.
    pub(crate) region_id: RegionId,

    /// Source of the current version, metadata and committed sequence.
    pub(crate) version_control: VersionControlRef,
    /// Access layer; provides the table directory and path type of the region.
    pub(crate) access_layer: AccessLayerRef,
    /// Manifest manager plus the role state of the region.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// File purger handed to version control when edits are applied.
    pub(crate) file_purger: FilePurgerRef,
    /// Log store provider of the region.
    pub(crate) provider: Provider,
    /// Timestamp (from `time_provider`) recorded by `update_flush_millis`.
    last_flush_millis: AtomicI64,
    /// Timestamp (from `time_provider`) recorded by `update_compaction_millis`.
    last_compaction_millis: AtomicI64,
    /// Clock used for the two timestamps above.
    time_provider: TimeProviderRef,
    /// Latest entry id of the topic; reported in the region statistic.
    pub(crate) topic_latest_entry_id: AtomicU64,
    /// Written bytes counter; reported in the region statistic.
    pub(crate) written_bytes: Arc<AtomicU64>,
    /// Partition info used while the region is staging; cleared by `exit_staging`.
    pub(crate) staging_partition_info: Mutex<Option<StagingPartitionInfo>>,
    /// Manifest statistics (size, version, removed file count).
    stats: ManifestStats,
}
167
/// Shared, reference-counted handle to a [`MitoRegion`].
pub type MitoRegionRef = Arc<MitoRegion>;
169
/// Partition directive of a staging region together with its derived rule version.
#[derive(Debug, Clone)]
pub(crate) struct StagingPartitionInfo {
    /// Directive received for the staging phase.
    pub(crate) partition_directive: StagingPartitionDirective,
    /// Version computed from the directive's partition expression (0 for `RejectAllWrites`).
    pub(crate) partition_rule_version: u64,
}
175
176impl StagingPartitionInfo {
177 pub(crate) fn partition_expr(&self) -> Option<&str> {
179 self.partition_directive.partition_expr()
180 }
181
182 pub(crate) fn from_partition_directive(partition_directive: StagingPartitionDirective) -> Self {
184 let partition_rule_version = match &partition_directive {
185 StagingPartitionDirective::UpdatePartitionExpr(expr) => {
186 partition_expr_version(Some(expr))
187 }
188 StagingPartitionDirective::RejectAllWrites => 0,
189 };
190 Self {
191 partition_directive,
192 partition_rule_version,
193 }
194 }
195}
196
197impl MitoRegion {
    /// Stops the manifest manager of this region.
    ///
    /// Acquires the manifest manager write lock, awaits `stop()` on it and logs the region id
    /// afterwards.
    pub(crate) async fn stop(&self) {
        self.manifest_ctx
            .manifest_manager
            .write()
            .await
            .stop()
            .await;

        info!(
            "Stopped region manifest manager, region_id: {}",
            self.region_id
        );
    }
212
213 pub fn metadata(&self) -> RegionMetadataRef {
215 let version_data = self.version_control.current();
216 version_data.version.metadata.clone()
217 }
218
219 pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
221 let version_data = self.version_control.current();
222 version_data.version.metadata.primary_key_encoding
223 }
224
225 pub(crate) fn version(&self) -> VersionRef {
227 let version_data = self.version_control.current();
228 version_data.version
229 }
230
    /// Returns the timestamp last stored by `update_flush_millis` (relaxed load).
    pub(crate) fn last_flush_millis(&self) -> i64 {
        self.last_flush_millis.load(Ordering::Relaxed)
    }
235
236 pub(crate) fn update_flush_millis(&self) {
238 let now = self.time_provider.current_time_millis();
239 self.last_flush_millis.store(now, Ordering::Relaxed);
240 }
241
    /// Returns the timestamp last stored by `update_compaction_millis` (relaxed load).
    pub(crate) fn last_compaction_millis(&self) -> i64 {
        self.last_compaction_millis.load(Ordering::Relaxed)
    }
246
247 pub(crate) fn update_compaction_millis(&self) {
249 let now = self.time_provider.current_time_millis();
250 self.last_compaction_millis.store(now, Ordering::Relaxed);
251 }
252
    /// Returns the table directory reported by the access layer.
    pub(crate) fn table_dir(&self) -> &str {
        self.access_layer.table_dir()
    }
257
    /// Returns the path type reported by the access layer.
    pub(crate) fn path_type(&self) -> PathType {
        self.access_layer.path_type()
    }
262
263 pub(crate) fn is_writable(&self) -> bool {
265 matches!(
266 self.manifest_ctx.state.load(),
267 RegionRoleState::Leader(RegionLeaderState::Writable)
268 | RegionRoleState::Leader(RegionLeaderState::Staging)
269 )
270 }
271
272 pub(crate) fn is_flushable(&self) -> bool {
274 matches!(
275 self.manifest_ctx.state.load(),
276 RegionRoleState::Leader(RegionLeaderState::Writable)
277 | RegionRoleState::Leader(RegionLeaderState::Staging)
278 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
279 )
280 }
281
282 pub(crate) fn should_abort_index(&self) -> bool {
284 matches!(
285 self.manifest_ctx.state.load(),
286 RegionRoleState::Follower
287 | RegionRoleState::Leader(RegionLeaderState::Dropping)
288 | RegionRoleState::Leader(RegionLeaderState::Truncating)
289 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
290 | RegionRoleState::Leader(RegionLeaderState::Staging)
291 )
292 }
293
294 pub(crate) fn is_downgrading(&self) -> bool {
296 matches!(
297 self.manifest_ctx.state.load(),
298 RegionRoleState::Leader(RegionLeaderState::Downgrading)
299 )
300 }
301
302 pub(crate) fn is_staging(&self) -> bool {
304 self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
305 }
306
307 pub(crate) fn is_enter_staging(&self) -> bool {
309 self.manifest_ctx.state.load()
310 == RegionRoleState::Leader(RegionLeaderState::EnteringStaging)
311 }
312
    /// Returns the id of this region.
    pub fn region_id(&self) -> RegionId {
        self.region_id
    }
316
    /// Returns the committed sequence reported by version control.
    pub fn find_committed_sequence(&self) -> SequenceNumber {
        self.version_control.committed_sequence()
    }
320
321 pub fn is_follower(&self) -> bool {
323 self.manifest_ctx.state.load() == RegionRoleState::Follower
324 }
325
    /// Returns the current role state of the region.
    pub(crate) fn state(&self) -> RegionRoleState {
        self.manifest_ctx.state.load()
    }
330
    /// Requests a role change by delegating to the manifest context's `set_role`.
    ///
    /// The call returns nothing; the manifest context only logs the outcome.
    pub(crate) fn set_role(&self, next_role: RegionRole) {
        self.manifest_ctx.set_role(next_role, self.region_id);
    }
335
336 pub(crate) fn set_altering(&self) -> Result<()> {
339 self.compare_exchange_state(
340 RegionLeaderState::Writable,
341 RegionRoleState::Leader(RegionLeaderState::Altering),
342 )
343 }
344
345 pub(crate) fn set_dropping(&self, expect: RegionLeaderState) -> Result<()> {
348 self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Dropping))
349 }
350
351 pub(crate) fn set_truncating(&self) -> Result<()> {
354 self.compare_exchange_state(
355 RegionLeaderState::Writable,
356 RegionRoleState::Leader(RegionLeaderState::Truncating),
357 )
358 }
359
360 pub(crate) fn set_editing(&self, expect: RegionLeaderState) -> Result<()> {
363 self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Editing))
364 }
365
366 pub(crate) async fn set_staging(
372 &self,
373 manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
374 ) -> Result<()> {
375 manager.store().clear_staging_manifests().await?;
376
377 self.compare_exchange_state(
378 RegionLeaderState::Writable,
379 RegionRoleState::Leader(RegionLeaderState::Staging),
380 )
381 }
382
383 pub(crate) fn set_entering_staging(&self) -> Result<()> {
385 self.compare_exchange_state(
386 RegionLeaderState::Writable,
387 RegionRoleState::Leader(RegionLeaderState::EnteringStaging),
388 )
389 }
390
391 pub fn exit_staging(&self) -> Result<()> {
396 *self.staging_partition_info.lock().unwrap() = None;
397 self.compare_exchange_state(
398 RegionLeaderState::Staging,
399 RegionRoleState::Leader(RegionLeaderState::Writable),
400 )
401 }
402
403 pub(crate) async fn set_role_state_gracefully(
405 &self,
406 state: SettableRegionRoleState,
407 ) -> Result<()> {
408 let mut manager: RwLockWriteGuard<'_, RegionManifestManager> =
409 self.manifest_ctx.manifest_manager.write().await;
410 let current_state = self.state();
411
412 match state {
413 SettableRegionRoleState::Leader => {
414 match current_state {
417 RegionRoleState::Leader(RegionLeaderState::Staging) => {
418 info!("Exiting staging mode for region {}", self.region_id);
419 self.exit_staging_on_success(&mut manager).await?;
421 }
422 RegionRoleState::Leader(RegionLeaderState::Writable) => {
423 info!("Region {} already in normal leader mode", self.region_id);
425 }
426 _ => {
427 return Err(RegionStateSnafu {
429 region_id: self.region_id,
430 state: current_state,
431 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
432 }
433 .build());
434 }
435 }
436 }
437
438 SettableRegionRoleState::StagingLeader => {
439 match current_state {
442 RegionRoleState::Leader(RegionLeaderState::Writable) => {
443 info!("Entering staging mode for region {}", self.region_id);
444 self.set_staging(&mut manager).await?;
445 }
446 RegionRoleState::Leader(RegionLeaderState::Staging) => {
447 info!("Region {} already in staging mode", self.region_id);
449 }
450 _ => {
451 return Err(RegionStateSnafu {
452 region_id: self.region_id,
453 state: current_state,
454 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
455 }
456 .build());
457 }
458 }
459 }
460
461 SettableRegionRoleState::Follower => {
462 match current_state {
464 RegionRoleState::Leader(RegionLeaderState::Staging) => {
465 info!(
466 "Exiting staging and demoting region {} to follower",
467 self.region_id
468 );
469 self.exit_staging()?;
470 self.set_role(RegionRole::Follower);
471 }
472 RegionRoleState::Leader(_) => {
473 info!("Demoting region {} from leader to follower", self.region_id);
474 self.set_role(RegionRole::Follower);
475 }
476 RegionRoleState::Follower => {
477 info!("Region {} already in follower mode", self.region_id);
479 }
480 }
481 }
482
483 SettableRegionRoleState::DowngradingLeader => {
484 match current_state {
486 RegionRoleState::Leader(RegionLeaderState::Staging) => {
487 info!(
488 "Exiting staging and entering downgrade for region {}",
489 self.region_id
490 );
491 self.exit_staging()?;
492 self.set_role(RegionRole::DowngradingLeader);
493 }
494 RegionRoleState::Leader(RegionLeaderState::Writable) => {
495 info!("Starting downgrade for region {}", self.region_id);
496 self.set_role(RegionRole::DowngradingLeader);
497 }
498 RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
499 info!("Region {} already in downgrading mode", self.region_id);
501 }
502 _ => {
503 warn!(
504 "Cannot start downgrade for region {} from state {:?}",
505 self.region_id, current_state
506 );
507 }
508 }
509 }
510 }
511
512 if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
514 let manifest_meta = &manager.manifest().metadata;
516 let current_version = self.version();
517 let current_meta = ¤t_version.metadata;
518 if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
519 let action = RegionMetaAction::Change(RegionChange {
520 metadata: current_meta.clone(),
521 sst_format: current_version.options.sst_format.unwrap_or_default(),
522 append_mode: None,
523 });
524 let result = manager
525 .update(RegionMetaActionList::with_action(action), false)
526 .await;
527
528 match result {
529 Ok(version) => {
530 info!(
531 "Successfully persisted backfilled metadata for region {}, version: {}",
532 self.region_id, version
533 );
534 }
535 Err(e) => {
536 warn!(e; "Failed to persist backfilled metadata for region {}", self.region_id);
537 }
538 }
539 }
540 }
541
542 drop(manager);
543
544 Ok(())
545 }
546
547 pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
550 if let Err(e) = self
551 .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
552 {
553 error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
554 }
555 }
556
557 pub(crate) fn switch_state_to_staging(&self, expect: RegionLeaderState) {
560 if let Err(e) =
561 self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Staging))
562 {
563 error!(e; "failed to switch region state to staging, expect state is {:?}", expect);
564 }
565 }
566
567 pub(crate) fn region_statistic(&self) -> RegionStatistic {
569 let version = self.version();
570 let memtables = &version.memtables;
571 let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;
572
573 let sst_usage = version.ssts.sst_usage();
574 let index_usage = version.ssts.index_usage();
575 let flushed_entry_id = version.flushed_entry_id;
576
577 let wal_usage = self.estimated_wal_usage(memtable_usage);
578 let manifest_usage = self.stats.total_manifest_size();
579 let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
580 let num_files = version.ssts.num_files();
581 let manifest_version = self.stats.manifest_version();
582 let file_removed_cnt = self.stats.file_removed_cnt();
583
584 let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
585 let written_bytes = self.written_bytes.load(Ordering::Relaxed);
586
587 RegionStatistic {
588 num_rows,
589 memtable_size: memtable_usage,
590 wal_size: wal_usage,
591 manifest_size: manifest_usage,
592 sst_size: sst_usage,
593 sst_num: num_files,
594 index_size: index_usage,
595 manifest: RegionManifestInfo::Mito {
596 manifest_version,
597 flushed_entry_id,
598 file_removed_cnt,
599 },
600 data_topic_latest_entry_id: topic_latest_entry_id,
601 metadata_topic_latest_entry_id: topic_latest_entry_id,
602 written_bytes,
603 }
604 }
605
606 fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
609 ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
610 }
611
612 fn compare_exchange_state(
615 &self,
616 expect: RegionLeaderState,
617 state: RegionRoleState,
618 ) -> Result<()> {
619 self.manifest_ctx
620 .state
621 .compare_exchange(RegionRoleState::Leader(expect), state)
622 .map_err(|actual| {
623 RegionStateSnafu {
624 region_id: self.region_id,
625 state: actual,
626 expect: RegionRoleState::Leader(expect),
627 }
628 .build()
629 })?;
630 Ok(())
631 }
632
    /// Returns a clone of the region's access layer handle.
    pub fn access_layer(&self) -> AccessLayerRef {
        self.access_layer.clone()
    }
636
637 pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
639 let table_dir = self.table_dir();
640 let path_type = self.access_layer.path_type();
641
642 let visible_ssts = self
643 .version()
644 .ssts
645 .levels()
646 .iter()
647 .flat_map(|level| level.files().map(|file| file.file_id().file_id()))
648 .collect::<HashSet<_>>();
649
650 let manifest_files = self.manifest_ctx.manifest().await.files.clone();
651 let staging_files = self
652 .manifest_ctx
653 .staging_manifest()
654 .await
655 .map(|m| m.files.clone())
656 .unwrap_or_default();
657 let files = manifest_files
658 .into_iter()
659 .chain(staging_files.into_iter())
660 .collect::<HashMap<_, _>>();
661
662 files
663 .values()
664 .map(|meta| {
665 let region_id = self.region_id;
666 let origin_region_id = meta.region_id;
667 let (index_version, index_file_path, index_file_size) = if meta.index_file_size > 0
668 {
669 let index_file_path = index_file_path(table_dir, meta.index_id(), path_type);
670 (
671 meta.index_version,
672 Some(index_file_path),
673 Some(meta.index_file_size),
674 )
675 } else {
676 (0, None, None)
677 };
678 let visible = visible_ssts.contains(&meta.file_id);
679 ManifestSstEntry {
680 table_dir: table_dir.to_string(),
681 region_id,
682 table_id: region_id.table_id(),
683 region_number: region_id.region_number(),
684 region_group: region_id.region_group(),
685 region_sequence: region_id.region_sequence(),
686 file_id: meta.file_id.to_string(),
687 index_version,
688 level: meta.level,
689 file_path: sst_file_path(table_dir, meta.file_id(), path_type),
690 file_size: meta.file_size,
691 index_file_path,
692 index_file_size,
693 num_rows: meta.num_rows,
694 num_row_groups: meta.num_row_groups,
695 num_series: Some(meta.num_series),
696 min_ts: meta.time_range.0,
697 max_ts: meta.time_range.1,
698 sequence: meta.sequence.map(|s| s.get()),
699 origin_region_id,
700 node_id: None,
701 visible,
702 }
703 })
704 .collect()
705 }
706
707 pub async fn file_metas(&self, file_ids: &[FileId]) -> Vec<Option<FileMeta>> {
709 let manifest_files = self.manifest_ctx.manifest().await.files.clone();
710
711 file_ids
712 .iter()
713 .map(|file_id| manifest_files.get(file_id).cloned())
714 .collect::<Vec<_>>()
715 }
716
    /// Leaves staging mode by merging the staged manifest actions into the normal manifest.
    ///
    /// Steps, in order:
    /// 1. Verify that the region is in the `Staging` state.
    /// 2. Merge the staged actions; when there are none, just exit staging.
    /// 3. Validate the merged actions: exactly one of (change, partition expr change) must
    ///    be present, an edit must be present, and a change must not alter column metadata.
    /// 4. Persist the merged actions to the normal (non-staging) manifest.
    /// 5. Apply the changes to the in-memory version control.
    /// 6. Clear the staging manifest and directory (failure is only logged) and switch the
    ///    state back to writable.
    ///
    /// # Errors
    /// Returns a region state error when not staging, an unexpected error when validation
    /// fails, and propagates manifest merge/update errors.
    pub(crate) async fn exit_staging_on_success(
        &self,
        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
    ) -> Result<()> {
        let current_state = self.manifest_ctx.current_state();
        ensure!(
            current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
            RegionStateSnafu {
                region_id: self.region_id,
                state: current_state,
                expect: RegionRoleState::Leader(RegionLeaderState::Staging),
            }
        );

        let merged_actions = match manager.merge_staged_actions(current_state).await? {
            Some(actions) => actions,
            None => {
                info!(
                    "No staged manifests to merge for region {}, exiting staging mode without changes",
                    self.region_id
                );
                // Nothing to merge, but the state still has to leave staging.
                self.exit_staging()?;
                return Ok(());
            }
        };
        let expect_change = merged_actions.actions.iter().any(|a| a.is_change());
        let expect_partition_expr_change = merged_actions
            .actions
            .iter()
            .any(|a| a.is_partition_expr_change());
        let expect_edit = merged_actions.actions.iter().any(|a| a.is_edit());
        ensure!(
            !(expect_change && expect_partition_expr_change),
            UnexpectedSnafu {
                reason: "unexpected both change and partition expr change actions in merged actions"
            }
        );
        ensure!(
            expect_change || expect_partition_expr_change,
            UnexpectedSnafu {
                reason: "expect a change or partition expr change action in merged actions"
            }
        );
        ensure!(
            expect_edit,
            UnexpectedSnafu {
                reason: "expect an edit action in merged actions"
            }
        );

        // The clone keeps `merged_actions` available for the manifest update below.
        let (merged_partition_expr_change, merged_change, merged_edit) =
            merged_actions.clone().split_region_change_and_edit();
        if let Some(change) = &merged_change {
            // A change action must leave the column metadata untouched.
            let current_column_metadatas = &self.version().metadata.column_metadatas;
            ensure!(
                change.metadata.column_metadatas == *current_column_metadatas,
                UnexpectedSnafu {
                    reason: "change action alters column metadata in staging exit"
                }
            );
        }

        // `false`: write to the normal manifest, not the staging one.
        let new_version = manager.update(merged_actions, false).await?;
        info!(
            "Successfully submitted merged staged manifests for region {}, new version: {}",
            self.region_id, new_version
        );

        // Mirror the persisted actions in the in-memory version control.
        if let Some(change) = merged_partition_expr_change {
            let mut new_metadata = self.version().metadata.as_ref().clone();
            new_metadata.set_partition_expr(change.partition_expr);
            self.version_control.alter_metadata(new_metadata.into());
        }
        if let Some(change) = merged_change {
            self.version_control.alter_metadata(change.metadata);
        }
        self.version_control
            .apply_edit(Some(merged_edit), &[], self.file_purger.clone());

        // Best effort: a failure to clean up is logged and does not fail the exit.
        if let Err(e) = manager.clear_staging_manifest_and_dir().await {
            error!(e; "Failed to clear staging manifest dir for region {}", self.region_id);
        }
        self.exit_staging()?;

        Ok(())
    }
813
814 pub fn maybe_staging_partition_expr_str(&self) -> Option<String> {
820 let is_staging = self.is_staging();
821 if is_staging {
822 let staging_partition_info = self.staging_partition_info.lock().unwrap();
823 if staging_partition_info.is_none() {
824 warn!(
825 "Staging partition expr is none for region {} in staging state",
826 self.region_id
827 );
828 }
829 staging_partition_info
830 .as_ref()
831 .and_then(|info| info.partition_expr().map(ToString::to_string))
832 } else {
833 let version = self.version();
834 version.metadata.partition_expr.clone()
835 }
836 }
837
838 pub fn expected_partition_expr_version(&self) -> u64 {
839 if self.is_staging() {
840 let staging_partition_info = self.staging_partition_info.lock().unwrap();
841 staging_partition_info
842 .as_ref()
843 .map(|info| info.partition_rule_version)
844 .unwrap_or_default()
845 } else {
846 self.version().metadata.partition_expr_version
847 }
848 }
849
850 pub(crate) fn reject_all_writes_in_staging(&self) -> bool {
852 if !self.is_staging() {
853 return false;
854 }
855 let staging_partition_info = self.staging_partition_info.lock().unwrap();
856 staging_partition_info
857 .as_ref()
858 .map(|info| {
859 matches!(
860 info.partition_directive,
861 StagingPartitionDirective::RejectAllWrites
862 )
863 })
864 .unwrap_or(false)
865 }
866}
867
/// Manifest manager of a region bundled with the region's role state.
#[derive(Debug)]
pub(crate) struct ManifestContext {
    /// Manifest manager, guarded by an async read-write lock.
    pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
    /// Current role state of the region, updated atomically.
    state: AtomicCell<RegionRoleState>,
}
877
878impl ManifestContext {
879 pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
880 ManifestContext {
881 manifest_manager: tokio::sync::RwLock::new(manager),
882 state: AtomicCell::new(state),
883 }
884 }
885
886 pub(crate) async fn manifest_version(&self) -> ManifestVersion {
887 self.manifest_manager
888 .read()
889 .await
890 .manifest()
891 .manifest_version
892 }
893
894 pub(crate) async fn has_update(&self) -> Result<bool> {
895 self.manifest_manager.read().await.has_update().await
896 }
897
    /// Returns the current role state.
    pub(crate) fn current_state(&self) -> RegionRoleState {
        self.state.load()
    }
902
    /// Asks the manifest manager to install the manifest up to `version` and returns the
    /// manager's manifest afterwards.
    ///
    /// The manager's write lock is held for the whole call.
    ///
    /// # Errors
    /// Propagates the error of the manager's `install_manifest_to`.
    pub(crate) async fn install_manifest_to(
        &self,
        version: ManifestVersion,
    ) -> Result<Arc<RegionManifest>> {
        let mut manager = self.manifest_manager.write().await;
        manager.install_manifest_to(version).await?;

        Ok(manager.manifest())
    }
917
    /// Validates and applies `action_list` to the manifest while holding the manager's
    /// write lock.
    ///
    /// State check:
    /// - When `expect_state` is not `Downgrading`, the region must be in `expect_state` or
    ///   in `Downgrading`.
    /// - When `expect_state` is `Downgrading`, the region must be exactly in that state.
    ///
    /// Edit check (only when the manifest has a truncated entry id): an edit with a flushed
    /// entry id must be newer than the truncation, and every file an edit removes must
    /// still be present in the manifest.
    ///
    /// `is_staging` is forwarded to the manager's `update`.
    ///
    /// # Errors
    /// Returns an update-manifest or region state error on a state mismatch, a region
    /// truncated error when an edit is no longer applicable, and propagates manifest
    /// update failures (which are also logged).
    pub(crate) async fn update_manifest(
        &self,
        expect_state: RegionLeaderState,
        action_list: RegionMetaActionList,
        is_staging: bool,
    ) -> Result<ManifestVersion> {
        // The write lock is taken before the state is read, so the check and the update
        // happen under the same lock.
        let mut manager = self.manifest_manager.write().await;
        let manifest = manager.manifest();
        let current_state = self.state.load();

        if expect_state != RegionLeaderState::Downgrading {
            if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                info!(
                    "Region {} is in downgrading leader state, updating manifest. state is {:?}",
                    manifest.metadata.region_id, expect_state
                );
            }
            ensure!(
                current_state == RegionRoleState::Leader(expect_state)
                    || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
                UpdateManifestSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                }
            );
        } else {
            ensure!(
                current_state == RegionRoleState::Leader(expect_state),
                RegionStateSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                    expect: RegionRoleState::Leader(expect_state),
                }
            );
        }

        for action in &action_list.actions {
            // Only edits need the applicability check.
            let RegionMetaAction::Edit(edit) = &action else {
                continue;
            };

            // Nothing to check when the region was never truncated.
            let Some(truncated_entry_id) = manifest.truncated_entry_id else {
                continue;
            };

            // An edit carrying a flushed entry id must be newer than the truncation.
            if let Some(flushed_entry_id) = edit.flushed_entry_id {
                ensure!(
                    truncated_entry_id < flushed_entry_id,
                    RegionTruncatedSnafu {
                        region_id: manifest.metadata.region_id,
                    }
                );
            }

            // Files the edit removes must still exist in the manifest.
            if !edit.files_to_remove.is_empty() {
                for file in &edit.files_to_remove {
                    ensure!(
                        manifest.files.contains_key(&file.file_id),
                        RegionTruncatedSnafu {
                            region_id: manifest.metadata.region_id,
                        }
                    );
                }
            }
        }

        let version = manager.update(action_list, is_staging).await.inspect_err(
            |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
        )?;

        // The state may have changed while the update was in flight; this is only reported.
        if self.state.load() == RegionRoleState::Follower {
            warn!(
                "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
                manifest.metadata.region_id
            );
        }

        Ok(version)
    }
1012
1013 pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
1035 match next_role {
1036 RegionRole::Follower => {
1037 match self.state.fetch_update(|state| {
1038 if !matches!(state, RegionRoleState::Follower) {
1039 Some(RegionRoleState::Follower)
1040 } else {
1041 None
1042 }
1043 }) {
1044 Ok(state) => info!(
1045 "Convert region {} to follower, previous role state: {:?}",
1046 region_id, state
1047 ),
1048 Err(state) => {
1049 if state != RegionRoleState::Follower {
1050 warn!(
1051 "Failed to convert region {} to follower, current role state: {:?}",
1052 region_id, state
1053 )
1054 }
1055 }
1056 }
1057 }
1058 RegionRole::Leader => {
1059 match self.state.fetch_update(|state| {
1060 if matches!(
1061 state,
1062 RegionRoleState::Follower
1063 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
1064 ) {
1065 Some(RegionRoleState::Leader(RegionLeaderState::Writable))
1066 } else {
1067 None
1068 }
1069 }) {
1070 Ok(state) => info!(
1071 "Convert region {} to leader, previous role state: {:?}",
1072 region_id, state
1073 ),
1074 Err(state) => {
1075 if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
1076 warn!(
1077 "Failed to convert region {} to leader, current role state: {:?}",
1078 region_id, state
1079 )
1080 }
1081 }
1082 }
1083 }
1084 RegionRole::DowngradingLeader => {
1085 match self.state.compare_exchange(
1086 RegionRoleState::Leader(RegionLeaderState::Writable),
1087 RegionRoleState::Leader(RegionLeaderState::Downgrading),
1088 ) {
1089 Ok(state) => info!(
1090 "Convert region {} to downgrading region, previous role state: {:?}",
1091 region_id, state
1092 ),
1093 Err(state) => {
1094 if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
1095 warn!(
1096 "Failed to convert region {} to downgrading leader, current role state: {:?}",
1097 region_id, state
1098 )
1099 }
1100 }
1101 }
1102 }
1103 }
1104 }
1105
1106 pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
1108 self.manifest_manager.read().await.manifest()
1109 }
1110
1111 pub(crate) async fn staging_manifest(
1113 &self,
1114 ) -> Option<Arc<crate::manifest::action::RegionManifest>> {
1115 self.manifest_manager.read().await.staging_manifest()
1116 }
1117}
1118
/// Shared, reference-counted handle to a [`ManifestContext`].
pub(crate) type ManifestContextRef = Arc<ManifestContext>;
1120
/// Map from region id to region handle, guarded by a read-write lock.
#[derive(Debug, Default)]
pub(crate) struct RegionMap {
    /// Registered regions keyed by their id.
    regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
}
1126
1127impl RegionMap {
1128 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1130 let regions = self.regions.read().unwrap();
1131 regions.contains_key(®ion_id)
1132 }
1133
1134 pub(crate) fn insert_region(&self, region: MitoRegionRef) {
1136 let mut regions = self.regions.write().unwrap();
1137 regions.insert(region.region_id, region);
1138 }
1139
1140 pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
1142 let regions = self.regions.read().unwrap();
1143 regions.get(®ion_id).cloned()
1144 }
1145
1146 pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1150 let region = self
1151 .get_region(region_id)
1152 .context(RegionNotFoundSnafu { region_id })?;
1153 ensure!(
1154 region.is_writable(),
1155 RegionStateSnafu {
1156 region_id,
1157 state: region.state(),
1158 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1159 }
1160 );
1161 Ok(region)
1162 }
1163
1164 pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1168 let region = self
1169 .get_region(region_id)
1170 .context(RegionNotFoundSnafu { region_id })?;
1171 ensure!(
1172 region.is_follower(),
1173 RegionStateSnafu {
1174 region_id,
1175 state: region.state(),
1176 expect: RegionRoleState::Follower,
1177 }
1178 );
1179
1180 Ok(region)
1181 }
1182
1183 pub(crate) fn get_region_or<F: OnFailure>(
1187 &self,
1188 region_id: RegionId,
1189 cb: &mut F,
1190 ) -> Option<MitoRegionRef> {
1191 match self
1192 .get_region(region_id)
1193 .context(RegionNotFoundSnafu { region_id })
1194 {
1195 Ok(region) => Some(region),
1196 Err(e) => {
1197 cb.on_failure(e);
1198 None
1199 }
1200 }
1201 }
1202
1203 pub(crate) fn writable_region_or<F: OnFailure>(
1207 &self,
1208 region_id: RegionId,
1209 cb: &mut F,
1210 ) -> Option<MitoRegionRef> {
1211 match self.writable_region(region_id) {
1212 Ok(region) => Some(region),
1213 Err(e) => {
1214 cb.on_failure(e);
1215 None
1216 }
1217 }
1218 }
1219
1220 pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1224 let region = self.writable_region(region_id)?;
1225 if region.is_staging() {
1226 return Err(crate::error::RegionStateSnafu {
1227 region_id,
1228 state: region.state(),
1229 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1230 }
1231 .build());
1232 }
1233 Ok(region)
1234 }
1235
1236 pub(crate) fn staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1240 let region = self
1241 .get_region(region_id)
1242 .context(RegionNotFoundSnafu { region_id })?;
1243 ensure!(
1244 region.is_staging(),
1245 RegionStateSnafu {
1246 region_id,
1247 state: region.state(),
1248 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
1249 }
1250 );
1251 Ok(region)
1252 }
1253
1254 fn flushable_region(&self, region_id: RegionId) -> Result<Option<MitoRegionRef>> {
1259 let region = self
1260 .get_region(region_id)
1261 .context(RegionNotFoundSnafu { region_id })?;
1262 if region.is_flushable() {
1263 Ok(Some(region))
1264 } else {
1265 Ok(None)
1266 }
1267 }
1268
1269 pub(crate) fn flushable_region_or<F: OnFailure>(
1274 &self,
1275 region_id: RegionId,
1276 cb: &mut F,
1277 ) -> Option<MitoRegionRef> {
1278 match self.flushable_region(region_id) {
1279 Ok(region) => region,
1280 Err(e) => {
1281 cb.on_failure(e);
1282 None
1283 }
1284 }
1285 }
1286
1287 pub(crate) fn remove_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
1289 let mut regions = self.regions.write().unwrap();
1290 regions.remove(®ion_id)
1291 }
1292
1293 pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
1295 let regions = self.regions.read().unwrap();
1296 regions.values().cloned().collect()
1297 }
1298
1299 pub(crate) fn clear(&self) {
1301 self.regions.write().unwrap().clear();
1302 }
1303}
1304
/// Shared, reference-counted handle to a [`RegionMap`].
pub(crate) type RegionMapRef = Arc<RegionMap>;
1306
/// Regions that are currently being opened, with the senders waiting on each of them.
#[derive(Debug, Default)]
pub(crate) struct OpeningRegions {
    /// Waiting senders keyed by the id of the opening region.
    regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
}
1312
1313impl OpeningRegions {
1314 pub(crate) fn wait_for_opening_region(
1316 &self,
1317 region_id: RegionId,
1318 sender: OptionOutputTx,
1319 ) -> Option<OptionOutputTx> {
1320 let mut regions = self.regions.write().unwrap();
1321 match regions.entry(region_id) {
1322 Entry::Occupied(mut senders) => {
1323 senders.get_mut().push(sender);
1324 None
1325 }
1326 Entry::Vacant(_) => Some(sender),
1327 }
1328 }
1329
1330 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1332 let regions = self.regions.read().unwrap();
1333 regions.contains_key(®ion_id)
1334 }
1335
1336 pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
1338 let mut regions = self.regions.write().unwrap();
1339 regions.insert(region, vec![sender]);
1340 }
1341
1342 pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
1344 let mut regions = self.regions.write().unwrap();
1345 regions.remove(®ion_id).unwrap_or_default()
1346 }
1347
1348 #[cfg(test)]
1349 pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
1350 let regions = self.regions.read().unwrap();
1351 if let Some(senders) = regions.get(®ion_id) {
1352 senders.len()
1353 } else {
1354 0
1355 }
1356 }
1357}
1358
/// Shared, reference-counted handle to an [`OpeningRegions`] registry.
pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;
1360
1361#[derive(Debug, Default)]
1363pub(crate) struct CatchupRegions {
1364 regions: RwLock<HashSet<RegionId>>,
1365}
1366
1367impl CatchupRegions {
1368 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1370 let regions = self.regions.read().unwrap();
1371 regions.contains(®ion_id)
1372 }
1373
1374 pub(crate) fn insert_region(&self, region_id: RegionId) {
1376 let mut regions = self.regions.write().unwrap();
1377 regions.insert(region_id);
1378 }
1379
1380 pub(crate) fn remove_region(&self, region_id: RegionId) {
1382 let mut regions = self.regions.write().unwrap();
1383 regions.remove(®ion_id);
1384 }
1385}
1386
/// Shared, reference-counted handle to a [`CatchupRegions`] set.
pub(crate) type CatchupRegionsRef = Arc<CatchupRegions>;
1388
/// Manifest-related counters.
///
/// Every counter lives behind an `Arc`, so cloning a `ManifestStats` yields a handle that
/// observes (and can update) the same underlying values.
#[derive(Default, Debug, Clone)]
pub struct ManifestStats {
    /// Counter read by [`ManifestStats::total_manifest_size`].
    pub(crate) total_manifest_size: Arc<AtomicU64>,
    /// Counter read by [`ManifestStats::manifest_version`].
    pub(crate) manifest_version: Arc<AtomicU64>,
    /// Counter read by [`ManifestStats::file_removed_cnt`].
    pub(crate) file_removed_cnt: Arc<AtomicU64>,
}

impl ManifestStats {
    /// Current value of the `total_manifest_size` counter (relaxed load).
    fn total_manifest_size(&self) -> u64 {
        AtomicU64::load(&self.total_manifest_size, Ordering::Relaxed)
    }

    /// Current value of the `manifest_version` counter (relaxed load).
    fn manifest_version(&self) -> u64 {
        AtomicU64::load(&self.manifest_version, Ordering::Relaxed)
    }

    /// Current value of the `file_removed_cnt` counter (relaxed load).
    fn file_removed_cnt(&self) -> u64 {
        AtomicU64::load(&self.file_removed_cnt, Ordering::Relaxed)
    }
}
1410
1411pub fn parse_partition_expr(partition_expr_str: Option<&str>) -> Result<Option<PartitionExpr>> {
1413 match partition_expr_str {
1414 None => Ok(None),
1415 Some("") => Ok(None),
1416 Some(json_str) => {
1417 let expr = partition::expr::PartitionExpr::from_json_str(json_str)
1418 .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?;
1419 Ok(expr)
1420 }
1421 }
1422}
1423
1424#[cfg(test)]
1425mod tests {
1426 use std::sync::atomic::AtomicU64;
1427 use std::sync::{Arc, Mutex};
1428
1429 use common_datasource::compression::CompressionType;
1430 use common_test_util::temp_dir::create_temp_dir;
1431 use crossbeam_utils::atomic::AtomicCell;
1432 use object_store::ObjectStore;
1433 use object_store::services::Fs;
1434 use store_api::logstore::provider::Provider;
1435 use store_api::region_engine::RegionRole;
1436 use store_api::region_request::PathType;
1437 use store_api::storage::RegionId;
1438
1439 use crate::access_layer::AccessLayer;
1440 use crate::error::Error;
1441 use crate::manifest::action::{
1442 RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionPartitionExprChange,
1443 };
1444 use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
1445 use crate::region::{
1446 ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
1447 };
1448 use crate::sst::FormatType;
1449 use crate::sst::index::intermediate::IntermediateManager;
1450 use crate::sst::index::puffin_manager::PuffinManagerFactory;
1451 use crate::test_util::scheduler_util::SchedulerEnv;
1452 use crate::test_util::version_util::VersionControlBuilder;
1453 use crate::time_provider::StdTimeProvider;
1454
/// Checks that `AtomicCell` reports operations on `RegionRoleState` as lock-free.
#[test]
fn test_region_state_lock_free() {
    assert!(AtomicCell::<RegionRoleState>::is_lock_free());
}
1459
1460 async fn build_test_region(env: &SchedulerEnv) -> MitoRegion {
1461 let builder = VersionControlBuilder::new();
1462 let version_control = Arc::new(builder.build());
1463 let metadata = version_control.current().version.metadata.clone();
1464
1465 let manager = RegionManifestManager::new(
1466 metadata.clone(),
1467 0,
1468 RegionManifestOptions {
1469 manifest_dir: "".to_string(),
1470 object_store: env.access_layer.object_store().clone(),
1471 compress_type: CompressionType::Uncompressed,
1472 checkpoint_distance: 10,
1473 remove_file_options: Default::default(),
1474 manifest_cache: None,
1475 },
1476 FormatType::PrimaryKey,
1477 &Default::default(),
1478 )
1479 .await
1480 .unwrap();
1481
1482 let manifest_ctx = Arc::new(ManifestContext::new(
1483 manager,
1484 RegionRoleState::Leader(RegionLeaderState::Writable),
1485 ));
1486
1487 MitoRegion {
1488 region_id: metadata.region_id,
1489 version_control,
1490 access_layer: env.access_layer.clone(),
1491 manifest_ctx,
1492 file_purger: crate::test_util::new_noop_file_purger(),
1493 provider: Provider::noop_provider(),
1494 last_flush_millis: Default::default(),
1495 last_compaction_millis: Default::default(),
1496 time_provider: Arc::new(StdTimeProvider),
1497 topic_latest_entry_id: Default::default(),
1498 written_bytes: Arc::new(AtomicU64::new(0)),
1499 stats: ManifestStats::default(),
1500 staging_partition_info: Mutex::new(None),
1501 }
1502 }
1503
1504 fn empty_edit() -> RegionEdit {
1505 RegionEdit {
1506 files_to_add: Vec::new(),
1507 files_to_remove: Vec::new(),
1508 timestamp_ms: None,
1509 compaction_time_window: None,
1510 flushed_entry_id: None,
1511 flushed_sequence: None,
1512 committed_sequence: None,
1513 }
1514 }
1515
1516 #[tokio::test]
1517 async fn test_exit_staging_partition_expr_change_and_edit_success() {
1518 let env = SchedulerEnv::new().await;
1519 let region = build_test_region(&env).await;
1520
1521 let mut manager = region.manifest_ctx.manifest_manager.write().await;
1522 region.set_staging(&mut manager).await.unwrap();
1523 manager
1524 .update(
1525 RegionMetaActionList::new(vec![
1526 RegionMetaAction::PartitionExprChange(RegionPartitionExprChange {
1527 partition_expr: Some("expr_a".to_string()),
1528 }),
1529 RegionMetaAction::Edit(empty_edit()),
1530 ]),
1531 true,
1532 )
1533 .await
1534 .unwrap();
1535
1536 region.exit_staging_on_success(&mut manager).await.unwrap();
1537 drop(manager);
1538
1539 assert_eq!(
1540 region.version().metadata.partition_expr.as_deref(),
1541 Some("expr_a")
1542 );
1543 assert_eq!(
1544 region.state(),
1545 RegionRoleState::Leader(RegionLeaderState::Writable)
1546 );
1547 }
1548
1549 #[tokio::test]
1550 async fn test_exit_staging_change_with_same_columns_success() {
1551 let env = SchedulerEnv::new().await;
1552 let region = build_test_region(&env).await;
1553
1554 let mut manager = region.manifest_ctx.manifest_manager.write().await;
1555 region.set_staging(&mut manager).await.unwrap();
1556
1557 let mut changed_metadata = region.version().metadata.as_ref().clone();
1558 changed_metadata.set_partition_expr(Some("expr_b".to_string()));
1559
1560 manager
1561 .update(
1562 RegionMetaActionList::new(vec![
1563 RegionMetaAction::Change(RegionChange {
1564 metadata: Arc::new(changed_metadata),
1565 sst_format: FormatType::PrimaryKey,
1566 append_mode: None,
1567 }),
1568 RegionMetaAction::Edit(empty_edit()),
1569 ]),
1570 true,
1571 )
1572 .await
1573 .unwrap();
1574
1575 region.exit_staging_on_success(&mut manager).await.unwrap();
1576 drop(manager);
1577
1578 assert_eq!(
1579 region.version().metadata.partition_expr.as_deref(),
1580 Some("expr_b")
1581 );
1582 assert_eq!(
1583 region.state(),
1584 RegionRoleState::Leader(RegionLeaderState::Writable)
1585 );
1586 }
1587
1588 #[tokio::test]
1589 async fn test_exit_staging_change_with_different_columns_fails() {
1590 let env = SchedulerEnv::new().await;
1591 let region = build_test_region(&env).await;
1592
1593 let mut manager = region.manifest_ctx.manifest_manager.write().await;
1594 region.set_staging(&mut manager).await.unwrap();
1595
1596 let mut changed_metadata = region.version().metadata.as_ref().clone();
1597 changed_metadata.column_metadatas.rotate_left(1);
1598
1599 manager
1600 .update(
1601 RegionMetaActionList::new(vec![
1602 RegionMetaAction::Change(RegionChange {
1603 metadata: Arc::new(changed_metadata),
1604 sst_format: FormatType::PrimaryKey,
1605 append_mode: None,
1606 }),
1607 RegionMetaAction::Edit(empty_edit()),
1608 ]),
1609 true,
1610 )
1611 .await
1612 .unwrap();
1613
1614 let result = region.exit_staging_on_success(&mut manager).await;
1615 assert!(matches!(result, Err(Error::Unexpected { .. })));
1616 }
1617
1618 #[tokio::test]
1619 async fn test_exit_staging_partition_expr_change_and_change_conflict_fails() {
1620 let env = SchedulerEnv::new().await;
1621 let region = build_test_region(&env).await;
1622
1623 let mut manager = region.manifest_ctx.manifest_manager.write().await;
1624 region.set_staging(&mut manager).await.unwrap();
1625
1626 let mut changed_metadata = region.version().metadata.as_ref().clone();
1627 changed_metadata.set_partition_expr(Some("expr_c".to_string()));
1628
1629 manager
1630 .update(
1631 RegionMetaActionList::new(vec![
1632 RegionMetaAction::PartitionExprChange(RegionPartitionExprChange {
1633 partition_expr: Some("expr_c".to_string()),
1634 }),
1635 RegionMetaAction::Change(RegionChange {
1636 metadata: Arc::new(changed_metadata),
1637 sst_format: FormatType::PrimaryKey,
1638 append_mode: None,
1639 }),
1640 RegionMetaAction::Edit(empty_edit()),
1641 ]),
1642 true,
1643 )
1644 .await
1645 .unwrap();
1646
1647 let result = region.exit_staging_on_success(&mut manager).await;
1648 assert!(matches!(result, Err(Error::Unexpected { .. })));
1649 }
1650
1651 #[tokio::test]
1652 async fn test_set_region_state() {
1653 let env = SchedulerEnv::new().await;
1654 let builder = VersionControlBuilder::new();
1655 let version_control = Arc::new(builder.build());
1656 let manifest_ctx = env
1657 .mock_manifest_context(version_control.current().version.metadata.clone())
1658 .await;
1659
1660 let region_id = RegionId::new(1024, 0);
1661 manifest_ctx.set_role(RegionRole::Follower, region_id);
1663 assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1664
1665 manifest_ctx.set_role(RegionRole::Leader, region_id);
1667 assert_eq!(
1668 manifest_ctx.state.load(),
1669 RegionRoleState::Leader(RegionLeaderState::Writable)
1670 );
1671
1672 manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1674 assert_eq!(
1675 manifest_ctx.state.load(),
1676 RegionRoleState::Leader(RegionLeaderState::Downgrading)
1677 );
1678
1679 manifest_ctx.set_role(RegionRole::Follower, region_id);
1681 assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1682
1683 manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1685 assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1686
1687 manifest_ctx.set_role(RegionRole::Leader, region_id);
1689 manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1690 assert_eq!(
1691 manifest_ctx.state.load(),
1692 RegionRoleState::Leader(RegionLeaderState::Downgrading)
1693 );
1694
1695 manifest_ctx.set_role(RegionRole::Leader, region_id);
1697 assert_eq!(
1698 manifest_ctx.state.load(),
1699 RegionRoleState::Leader(RegionLeaderState::Writable)
1700 );
1701 }
1702
1703 #[tokio::test]
1704 async fn test_staging_state_validation() {
1705 let env = SchedulerEnv::new().await;
1706 let builder = VersionControlBuilder::new();
1707 let version_control = Arc::new(builder.build());
1708
1709 let staging_ctx = {
1711 let manager = RegionManifestManager::new(
1712 version_control.current().version.metadata.clone(),
1713 0,
1714 RegionManifestOptions {
1715 manifest_dir: "".to_string(),
1716 object_store: env.access_layer.object_store().clone(),
1717 compress_type: CompressionType::Uncompressed,
1718 checkpoint_distance: 10,
1719 remove_file_options: Default::default(),
1720 manifest_cache: None,
1721 },
1722 FormatType::PrimaryKey,
1723 &Default::default(),
1724 )
1725 .await
1726 .unwrap();
1727 Arc::new(ManifestContext::new(
1728 manager,
1729 RegionRoleState::Leader(RegionLeaderState::Staging),
1730 ))
1731 };
1732
1733 assert_eq!(
1735 staging_ctx.current_state(),
1736 RegionRoleState::Leader(RegionLeaderState::Staging)
1737 );
1738
1739 let writable_ctx = env
1741 .mock_manifest_context(version_control.current().version.metadata.clone())
1742 .await;
1743
1744 assert_eq!(
1745 writable_ctx.current_state(),
1746 RegionRoleState::Leader(RegionLeaderState::Writable)
1747 );
1748 }
1749
/// Walks a region through the Writable <-> Staging transitions and checks that entering
/// staging removes staging manifests left over in the manifest store.
#[tokio::test]
async fn test_staging_state_transitions() {
    let builder = VersionControlBuilder::new();
    let version_control = Arc::new(builder.build());
    let metadata = version_control.current().version.metadata.clone();

    // Back the region with a filesystem object store rooted in a fresh temp dir.
    let temp_dir = create_temp_dir("");
    let path_str = temp_dir.path().display().to_string();
    let fs_builder = Fs::default().root(&path_str);
    let object_store = ObjectStore::new(fs_builder).unwrap().finish();

    // The index managers both work under `<temp_dir>/index_aux`.
    let index_aux_path = temp_dir.path().join("index_aux");
    let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
        .await
        .unwrap();
    let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
        .await
        .unwrap();

    let access_layer = Arc::new(AccessLayer::new(
        "",
        PathType::Bare,
        object_store,
        puffin_mgr,
        intm_mgr,
    ));

    let manager = RegionManifestManager::new(
        metadata.clone(),
        0,
        RegionManifestOptions {
            manifest_dir: "".to_string(),
            object_store: access_layer.object_store().clone(),
            compress_type: CompressionType::Uncompressed,
            checkpoint_distance: 10,
            remove_file_options: Default::default(),
            manifest_cache: None,
        },
        FormatType::PrimaryKey,
        &Default::default(),
    )
    .await
    .unwrap();

    // The region starts out as a writable leader.
    let manifest_ctx = Arc::new(ManifestContext::new(
        manager,
        RegionRoleState::Leader(RegionLeaderState::Writable),
    ));

    let region = MitoRegion {
        region_id: metadata.region_id,
        version_control,
        access_layer,
        manifest_ctx: manifest_ctx.clone(),
        file_purger: crate::test_util::new_noop_file_purger(),
        provider: Provider::noop_provider(),
        last_flush_millis: Default::default(),
        last_compaction_millis: Default::default(),
        time_provider: Arc::new(StdTimeProvider),
        topic_latest_entry_id: Default::default(),
        written_bytes: Arc::new(AtomicU64::new(0)),
        stats: ManifestStats::default(),
        staging_partition_info: Mutex::new(None),
    };

    // Initial state: writable, not staging.
    assert_eq!(
        region.state(),
        RegionRoleState::Leader(RegionLeaderState::Writable)
    );
    assert!(!region.is_staging());

    // Writable -> Staging. `set_staging` needs the manifest manager's write guard; it is
    // dropped explicitly before the state is inspected.
    let mut manager = manifest_ctx.manifest_manager.write().await;
    region.set_staging(&mut manager).await.unwrap();
    drop(manager);
    assert_eq!(
        region.state(),
        RegionRoleState::Leader(RegionLeaderState::Staging)
    );
    assert!(region.is_staging());

    // Staging -> Writable.
    region.exit_staging().unwrap();
    assert_eq!(
        region.state(),
        RegionRoleState::Leader(RegionLeaderState::Writable)
    );
    assert!(!region.is_staging());

    // Entering staging must clean up staging manifests that are already in the store.
    {
        let manager = manifest_ctx.manifest_manager.write().await;
        let dummy_actions = RegionMetaActionList::new(vec![]);
        let dummy_bytes = dummy_actions.encode().unwrap();

        // Write two manifests directly through the store. They show up in
        // `fetch_staging_manifests` below, so the `true` flag presumably marks them as
        // staging manifests (TODO confirm against the store API).
        manager.store().save(100, &dummy_bytes, true).await.unwrap();
        manager.store().save(101, &dummy_bytes, true).await.unwrap();
        drop(manager);

        // Both files are visible before staging is entered.
        let manager = manifest_ctx.manifest_manager.read().await;
        let dirty_manifests = manager.store().fetch_staging_manifests().await.unwrap();
        assert_eq!(
            dirty_manifests.len(),
            2,
            "Should have 2 dirty staging files"
        );
        drop(manager);

        // Entering staging again...
        let mut manager = manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();
        drop(manager);

        // ...leaves no staging manifests behind.
        let manager = manifest_ctx.manifest_manager.read().await;
        let cleaned_manifests = manager.store().fetch_staging_manifests().await.unwrap();
        assert_eq!(
            cleaned_manifests.len(),
            0,
            "Dirty staging files should be cleaned up"
        );
        drop(manager);

        // Back to writable for the remaining checks.
        region.exit_staging().unwrap();
    }

    // Writable -> Staging succeeds.
    let mut manager = manifest_ctx.manifest_manager.write().await;
    assert!(region.set_staging(&mut manager).await.is_ok());
    drop(manager);
    // Staging -> Staging is rejected.
    let mut manager = manifest_ctx.manifest_manager.write().await;
    assert!(region.set_staging(&mut manager).await.is_err());
    drop(manager);
    // Staging -> Writable succeeds.
    assert!(region.exit_staging().is_ok());
    // Exiting staging while already writable is rejected.
    assert!(region.exit_staging().is_err());
}
1892}