1pub mod catchup;
18pub mod opener;
19pub mod options;
20pub mod utils;
21pub(crate) mod version;
22
23use std::collections::hash_map::Entry;
24use std::collections::{HashMap, HashSet};
25use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
26use std::sync::{Arc, Mutex, RwLock};
27
28use common_base::hash::partition_expr_version;
29use common_telemetry::{error, info, warn};
30use crossbeam_utils::atomic::AtomicCell;
31use partition::expr::PartitionExpr;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::PrimaryKeyEncoding;
35use store_api::logstore::provider::Provider;
36use store_api::metadata::RegionMetadataRef;
37use store_api::region_engine::{
38 RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
39};
40use store_api::region_request::{PathType, StagingPartitionDirective};
41use store_api::sst_entry::ManifestSstEntry;
42use store_api::storage::{FileId, RegionId, SequenceNumber};
43use tokio::sync::RwLockWriteGuard;
44pub use utils::*;
45
46use crate::access_layer::AccessLayerRef;
47use crate::error::{
48 InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
49 UnexpectedSnafu, UpdateManifestSnafu,
50};
51use crate::manifest::action::{
52 RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
53};
54use crate::manifest::manager::RegionManifestManager;
55use crate::region::version::{VersionControlRef, VersionRef};
56use crate::request::{OnFailure, OptionOutputTx};
57use crate::sst::file::FileMeta;
58use crate::sst::file_purger::FilePurgerRef;
59use crate::sst::location::{index_file_path, sst_file_path};
60use crate::time_provider::TimeProviderRef;
61
/// Empirical ratio used to estimate WAL size from mutable memtable usage
/// (see [MitoRegion::estimated_wal_usage]).
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;
64
/// Disk usage snapshot of a region, broken down by component.
#[derive(Debug)]
pub struct RegionUsage {
    /// Id of the region.
    pub region_id: RegionId,
    /// Estimated WAL size in bytes.
    pub wal_usage: u64,
    /// Total SST size in bytes.
    pub sst_usage: u64,
    /// Total manifest size in bytes.
    pub manifest_usage: u64,
}
73
74impl RegionUsage {
75 pub fn disk_usage(&self) -> u64 {
76 self.wal_usage + self.sst_usage + self.manifest_usage
77 }
78}
79
/// Fine-grained state of a leader region; gates which operations are allowed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// The region accepts writes.
    Writable,
    /// The region is in staging mode: writable, but edits go to staging manifests.
    Staging,
    /// The region is transitioning into the staging state.
    EnteringStaging,
    /// The region is being altered.
    Altering,
    /// The region is being dropped.
    Dropping,
    /// The region is being truncated.
    Truncating,
    /// The region manifest is being edited.
    Editing,
    /// The region is being downgraded to follower.
    Downgrading,
}
99
/// Role of a region plus, for leaders, the leader-specific state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
    /// The region is a leader in the given state.
    Leader(RegionLeaderState),
    /// The region is a read-only follower.
    Follower,
}
105
106impl RegionRoleState {
107 pub fn into_leader_state(self) -> Option<RegionLeaderState> {
109 match self {
110 RegionRoleState::Leader(leader_state) => Some(leader_state),
111 RegionRoleState::Follower => None,
112 }
113 }
114}
115
/// Metadata and runtime state of a mito region.
#[derive(Debug)]
pub struct MitoRegion {
    /// Id of this region.
    pub(crate) region_id: RegionId,

    /// Version controller for this region.
    pub(crate) version_control: VersionControlRef,
    /// SST accessor of this region.
    pub(crate) access_layer: AccessLayerRef,
    /// Context to update the region manifest; also tracks the role state.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// SST file purger.
    pub(crate) file_purger: FilePurgerRef,
    /// WAL provider of this region.
    pub(crate) provider: Provider,
    /// Last flush time in millis.
    last_flush_millis: AtomicI64,
    /// Last compaction time in millis.
    last_compaction_millis: AtomicI64,
    /// Provider of current time.
    time_provider: TimeProviderRef,
    /// Latest entry id observed on the region's WAL topic (reported for both
    /// data and metadata topics in statistics).
    pub(crate) topic_latest_entry_id: AtomicU64,
    /// Shared counter of bytes written to this region.
    pub(crate) written_bytes: Arc<AtomicU64>,
    /// Partition directive while the region is in staging mode; `None` outside staging.
    pub(crate) staging_partition_info: Mutex<Option<StagingPartitionInfo>>,
    /// Manifest statistics of this region.
    stats: ManifestStats,
}
167
/// Reference-counted handle to a [MitoRegion].
pub type MitoRegionRef = Arc<MitoRegion>;
169
/// Partition information captured when a region enters staging mode.
#[derive(Debug, Clone)]
pub(crate) struct StagingPartitionInfo {
    /// Directive that controls writes while staging.
    pub(crate) partition_directive: StagingPartitionDirective,
    /// Version derived from the directive's partition expression.
    pub(crate) partition_rule_version: u64,
}
175
176impl StagingPartitionInfo {
177 pub(crate) fn partition_expr(&self) -> Option<&str> {
179 self.partition_directive.partition_expr()
180 }
181
182 pub(crate) fn from_partition_directive(partition_directive: StagingPartitionDirective) -> Self {
184 let partition_rule_version = match &partition_directive {
185 StagingPartitionDirective::UpdatePartitionExpr(expr) => {
186 partition_expr_version(Some(expr))
187 }
188 StagingPartitionDirective::RejectAllWrites => 0,
189 };
190 Self {
191 partition_directive,
192 partition_rule_version,
193 }
194 }
195}
196
impl MitoRegion {
    /// Stops the background worker of the region's manifest manager.
    pub(crate) async fn stop(&self) {
        self.manifest_ctx
            .manifest_manager
            .write()
            .await
            .stop()
            .await;

        info!(
            "Stopped region manifest manager, region_id: {}",
            self.region_id
        );
    }

    /// Returns current metadata of the region.
    pub fn metadata(&self) -> RegionMetadataRef {
        let version_data = self.version_control.current();
        version_data.version.metadata.clone()
    }

    /// Returns primary key encoding of the region.
    pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
        let version_data = self.version_control.current();
        version_data.version.metadata.primary_key_encoding
    }

    /// Returns current version of the region.
    pub(crate) fn version(&self) -> VersionRef {
        let version_data = self.version_control.current();
        version_data.version
    }

    /// Returns last flush timestamp in millis.
    pub(crate) fn last_flush_millis(&self) -> i64 {
        self.last_flush_millis.load(Ordering::Relaxed)
    }

    /// Updates the last flush timestamp to the current time.
    pub(crate) fn update_flush_millis(&self) {
        let now = self.time_provider.current_time_millis();
        self.last_flush_millis.store(now, Ordering::Relaxed);
    }

    /// Returns last compaction timestamp in millis.
    pub(crate) fn last_compaction_millis(&self) -> i64 {
        self.last_compaction_millis.load(Ordering::Relaxed)
    }

    /// Updates the last compaction timestamp to the current time.
    pub(crate) fn update_compaction_millis(&self) {
        let now = self.time_provider.current_time_millis();
        self.last_compaction_millis.store(now, Ordering::Relaxed);
    }

    /// Returns the table directory of the region.
    pub(crate) fn table_dir(&self) -> &str {
        self.access_layer.table_dir()
    }

    /// Returns the path type of the region.
    pub(crate) fn path_type(&self) -> PathType {
        self.access_layer.path_type()
    }

    /// Returns whether the region accepts writes (writable or staging leader).
    pub(crate) fn is_writable(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
                | RegionRoleState::Leader(RegionLeaderState::Staging)
        )
    }

    /// Returns whether the region can be flushed.
    /// Downgrading leaders may still flush their memtables.
    pub(crate) fn is_flushable(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
                | RegionRoleState::Leader(RegionLeaderState::Staging)
                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
        )
    }

    /// Returns whether an in-flight index build task should be aborted
    /// because the region is leaving a state where the index is useful.
    pub(crate) fn should_abort_index(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Follower
                | RegionRoleState::Leader(RegionLeaderState::Dropping)
                | RegionRoleState::Leader(RegionLeaderState::Truncating)
                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
                | RegionRoleState::Leader(RegionLeaderState::Staging)
        )
    }

    /// Returns whether the region is downgrading.
    pub(crate) fn is_downgrading(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        )
    }

    /// Returns whether the region is in staging mode.
    #[allow(dead_code)]
    pub(crate) fn is_staging(&self) -> bool {
        self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
    }

    /// Returns the region id.
    pub fn region_id(&self) -> RegionId {
        self.region_id
    }

    /// Returns the committed sequence of the region.
    pub fn find_committed_sequence(&self) -> SequenceNumber {
        self.version_control.committed_sequence()
    }

    /// Returns whether the region is a follower.
    pub fn is_follower(&self) -> bool {
        self.manifest_ctx.state.load() == RegionRoleState::Follower
    }

    /// Returns the current role state of the region.
    pub(crate) fn state(&self) -> RegionRoleState {
        self.manifest_ctx.state.load()
    }

    /// Sets the role of the region (delegates to the manifest context).
    pub(crate) fn set_role(&self, next_role: RegionRole) {
        self.manifest_ctx.set_role(next_role, self.region_id);
    }

    /// Sets the state to `Altering`; fails unless the region is `Writable`.
    pub(crate) fn set_altering(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Altering),
        )
    }

    /// Sets the state to `Dropping`; fails unless the region is in `expect`.
    pub(crate) fn set_dropping(&self, expect: RegionLeaderState) -> Result<()> {
        self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Dropping))
    }

    /// Sets the state to `Truncating`; fails unless the region is `Writable`.
    pub(crate) fn set_truncating(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Truncating),
        )
    }

    /// Sets the state to `Editing`; fails unless the region is in `expect`.
    pub(crate) fn set_editing(&self, expect: RegionLeaderState) -> Result<()> {
        self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Editing))
    }

    /// Enters staging mode: clears any leftover staging manifests first, then
    /// transitions `Writable` -> `Staging`. The caller holds the write lock on
    /// the manifest manager so the clear and the state change are not interleaved
    /// with other manifest updates.
    pub(crate) async fn set_staging(
        &self,
        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
    ) -> Result<()> {
        manager.store().clear_staging_manifests().await?;

        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Staging),
        )
    }

    /// Sets the state to `EnteringStaging`; fails unless the region is `Writable`.
    pub(crate) fn set_entering_staging(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::EnteringStaging),
        )
    }

    /// Leaves staging mode: drops the staging partition info and transitions
    /// `Staging` -> `Writable`.
    pub fn exit_staging(&self) -> Result<()> {
        *self.staging_partition_info.lock().unwrap() = None;
        self.compare_exchange_state(
            RegionLeaderState::Staging,
            RegionRoleState::Leader(RegionLeaderState::Writable),
        )
    }

    /// Gracefully moves the region into the requested role state, handling
    /// staging entry/exit where needed. Holds the manifest manager write lock
    /// for the whole transition.
    pub(crate) async fn set_role_state_gracefully(
        &self,
        state: SettableRegionRoleState,
    ) -> Result<()> {
        let mut manager: RwLockWriteGuard<'_, RegionManifestManager> =
            self.manifest_ctx.manifest_manager.write().await;
        let current_state = self.state();

        match state {
            SettableRegionRoleState::Leader => {
                match current_state {
                    // Staging -> Writable: merge staged manifests, then exit staging.
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!("Exiting staging mode for region {}", self.region_id);
                        self.exit_staging_on_success(&mut manager).await?;
                    }
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        info!("Region {} already in normal leader mode", self.region_id);
                    }
                    _ => {
                        // Only Staging or Writable may become a plain leader here.
                        return Err(RegionStateSnafu {
                            region_id: self.region_id,
                            state: current_state,
                            expect: RegionRoleState::Leader(RegionLeaderState::Staging),
                        }
                        .build());
                    }
                }
            }

            SettableRegionRoleState::StagingLeader => {
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        info!("Entering staging mode for region {}", self.region_id);
                        self.set_staging(&mut manager).await?;
                    }
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!("Region {} already in staging mode", self.region_id);
                    }
                    _ => {
                        return Err(RegionStateSnafu {
                            region_id: self.region_id,
                            state: current_state,
                            expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                        }
                        .build());
                    }
                }
            }

            SettableRegionRoleState::Follower => {
                match current_state {
                    // Leaving staging discards the staged state before demotion.
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!(
                            "Exiting staging and demoting region {} to follower",
                            self.region_id
                        );
                        self.exit_staging()?;
                        self.set_role(RegionRole::Follower);
                    }
                    RegionRoleState::Leader(_) => {
                        info!("Demoting region {} from leader to follower", self.region_id);
                        self.set_role(RegionRole::Follower);
                    }
                    RegionRoleState::Follower => {
                        info!("Region {} already in follower mode", self.region_id);
                    }
                }
            }

            SettableRegionRoleState::DowngradingLeader => {
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!(
                            "Exiting staging and entering downgrade for region {}",
                            self.region_id
                        );
                        self.exit_staging()?;
                        self.set_role(RegionRole::DowngradingLeader);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        info!("Starting downgrade for region {}", self.region_id);
                        self.set_role(RegionRole::DowngradingLeader);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
                        info!("Region {} already in downgrading mode", self.region_id);
                    }
                    _ => {
                        // Other states cannot enter downgrade; log and leave the state as is.
                        warn!(
                            "Cannot start downgrade for region {} from state {:?}",
                            self.region_id, current_state
                        );
                    }
                }
            }
        }

        // Backfill: if the persisted manifest lacks a partition expr while the
        // in-memory metadata has one, persist a change action so the manifest
        // catches up. A failure here is only logged since the in-memory state
        // already carries the expr.
        if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
            let manifest_meta = &manager.manifest().metadata;
            let current_version = self.version();
            let current_meta = &current_version.metadata;
            if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
                let action = RegionMetaAction::Change(RegionChange {
                    metadata: current_meta.clone(),
                    sst_format: current_version.options.sst_format.unwrap_or_default(),
                    append_mode: None,
                });
                let result = manager
                    .update(RegionMetaActionList::with_action(action), false)
                    .await;

                match result {
                    Ok(version) => {
                        info!(
                            "Successfully persisted backfilled metadata for region {}, version: {}",
                            self.region_id, version
                        );
                    }
                    Err(e) => {
                        warn!(e; "Failed to persist backfilled metadata for region {}", self.region_id);
                    }
                }
            }
        }

        drop(manager);

        Ok(())
    }

    /// Switches the region state back to `Writable` from `expect`; only logs
    /// on failure since this is a best-effort recovery path.
    pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
        if let Err(e) = self
            .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
        {
            error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
        }
    }

    /// Switches the region state back to `Staging` from `expect`; only logs
    /// on failure since this is a best-effort recovery path.
    pub(crate) fn switch_state_to_staging(&self, expect: RegionLeaderState) {
        if let Err(e) =
            self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Staging))
        {
            error!(e; "failed to switch region state to staging, expect state is {:?}", expect);
        }
    }

    /// Builds a [RegionStatistic] snapshot from the current version, manifest
    /// stats, and atomic counters.
    pub(crate) fn region_statistic(&self) -> RegionStatistic {
        let version = self.version();
        let memtables = &version.memtables;
        let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;

        let sst_usage = version.ssts.sst_usage();
        let index_usage = version.ssts.index_usage();
        let flushed_entry_id = version.flushed_entry_id;

        let wal_usage = self.estimated_wal_usage(memtable_usage);
        let manifest_usage = self.stats.total_manifest_size();
        let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
        let num_files = version.ssts.num_files();
        let manifest_version = self.stats.manifest_version();
        let file_removed_cnt = self.stats.file_removed_cnt();

        let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
        let written_bytes = self.written_bytes.load(Ordering::Relaxed);

        RegionStatistic {
            num_rows,
            memtable_size: memtable_usage,
            wal_size: wal_usage,
            manifest_size: manifest_usage,
            sst_size: sst_usage,
            sst_num: num_files,
            index_size: index_usage,
            manifest: RegionManifestInfo::Mito {
                manifest_version,
                flushed_entry_id,
                file_removed_cnt,
            },
            // The same topic entry id is reported for both data and metadata topics.
            data_topic_latest_entry_id: topic_latest_entry_id,
            metadata_topic_latest_entry_id: topic_latest_entry_id,
            written_bytes,
        }
    }

    /// Estimates WAL size from the memtable usage using an empirical factor.
    fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
        ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
    }

    /// Atomically transitions `Leader(expect)` -> `state`; returns a
    /// [RegionStateSnafu] error carrying the actual state on mismatch.
    fn compare_exchange_state(
        &self,
        expect: RegionLeaderState,
        state: RegionRoleState,
    ) -> Result<()> {
        self.manifest_ctx
            .state
            .compare_exchange(RegionRoleState::Leader(expect), state)
            .map_err(|actual| {
                RegionStateSnafu {
                    region_id: self.region_id,
                    state: actual,
                    expect: RegionRoleState::Leader(expect),
                }
                .build()
            })?;
        Ok(())
    }

    /// Returns the access layer of the region.
    pub fn access_layer(&self) -> AccessLayerRef {
        self.access_layer.clone()
    }

    /// Lists SST entries from both the committed and staging manifests.
    /// `visible` marks files that are referenced by the current version.
    pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
        let table_dir = self.table_dir();
        let path_type = self.access_layer.path_type();

        let visible_ssts = self
            .version()
            .ssts
            .levels()
            .iter()
            .flat_map(|level| level.files().map(|file| file.file_id().file_id()))
            .collect::<HashSet<_>>();

        let manifest_files = self.manifest_ctx.manifest().await.files.clone();
        let staging_files = self
            .manifest_ctx
            .staging_manifest()
            .await
            .map(|m| m.files.clone())
            .unwrap_or_default();
        // Staging entries override committed ones for the same file id.
        let files = manifest_files
            .into_iter()
            .chain(staging_files.into_iter())
            .collect::<HashMap<_, _>>();

        files
            .values()
            .map(|meta| {
                let region_id = self.region_id;
                let origin_region_id = meta.region_id;
                // Only report index info when an index file actually exists.
                let (index_version, index_file_path, index_file_size) = if meta.index_file_size > 0
                {
                    let index_file_path = index_file_path(table_dir, meta.index_id(), path_type);
                    (
                        meta.index_version,
                        Some(index_file_path),
                        Some(meta.index_file_size),
                    )
                } else {
                    (0, None, None)
                };
                let visible = visible_ssts.contains(&meta.file_id);
                ManifestSstEntry {
                    table_dir: table_dir.to_string(),
                    region_id,
                    table_id: region_id.table_id(),
                    region_number: region_id.region_number(),
                    region_group: region_id.region_group(),
                    region_sequence: region_id.region_sequence(),
                    file_id: meta.file_id.to_string(),
                    index_version,
                    level: meta.level,
                    file_path: sst_file_path(table_dir, meta.file_id(), path_type),
                    file_size: meta.file_size,
                    index_file_path,
                    index_file_size,
                    num_rows: meta.num_rows,
                    num_row_groups: meta.num_row_groups,
                    num_series: Some(meta.num_series),
                    min_ts: meta.time_range.0,
                    max_ts: meta.time_range.1,
                    sequence: meta.sequence.map(|s| s.get()),
                    origin_region_id,
                    node_id: None,
                    visible,
                }
            })
            .collect()
    }

    /// Looks up [FileMeta]s for the given file ids in the committed manifest;
    /// missing ids yield `None` at the same position.
    pub async fn file_metas(&self, file_ids: &[FileId]) -> Vec<Option<FileMeta>> {
        let manifest_files = self.manifest_ctx.manifest().await.files.clone();

        file_ids
            .iter()
            .map(|file_id| manifest_files.get(file_id).cloned())
            .collect::<Vec<_>>()
    }

    /// Merges staged manifest actions into the committed manifest, applies the
    /// results to the in-memory version, and exits staging mode. The caller
    /// holds the manifest manager write lock.
    pub(crate) async fn exit_staging_on_success(
        &self,
        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
    ) -> Result<()> {
        let current_state = self.manifest_ctx.current_state();
        ensure!(
            current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
            RegionStateSnafu {
                region_id: self.region_id,
                state: current_state,
                expect: RegionRoleState::Leader(RegionLeaderState::Staging),
            }
        );

        let merged_actions = match manager.merge_staged_actions(current_state).await? {
            Some(actions) => actions,
            None => {
                // Nothing was staged; just leave staging mode.
                info!(
                    "No staged manifests to merge for region {}, exiting staging mode without changes",
                    self.region_id
                );
                self.exit_staging()?;
                return Ok(());
            }
        };
        // Sanity checks on the merged action list: exactly one kind of change
        // (metadata change XOR partition expr change) plus at least one edit.
        let expect_change = merged_actions.actions.iter().any(|a| a.is_change());
        let expect_partition_expr_change = merged_actions
            .actions
            .iter()
            .any(|a| a.is_partition_expr_change());
        let expect_edit = merged_actions.actions.iter().any(|a| a.is_edit());
        ensure!(
            !(expect_change && expect_partition_expr_change),
            UnexpectedSnafu {
                reason: "unexpected both change and partition expr change actions in merged actions"
            }
        );
        ensure!(
            expect_change || expect_partition_expr_change,
            UnexpectedSnafu {
                reason: "expect a change or partition expr change action in merged actions"
            }
        );
        ensure!(
            expect_edit,
            UnexpectedSnafu {
                reason: "expect an edit action in merged actions"
            }
        );

        let (merged_partition_expr_change, merged_change, merged_edit) =
            merged_actions.clone().split_region_change_and_edit();
        if let Some(change) = &merged_change {
            // Staging exit must not alter column schemas.
            let current_column_metadatas = &self.version().metadata.column_metadatas;
            ensure!(
                change.metadata.column_metadatas == *current_column_metadatas,
                UnexpectedSnafu {
                    reason: "change action alters column metadata in staging exit"
                }
            );
        }

        // Persist the merged actions to the committed manifest.
        let new_version = manager.update(merged_actions, false).await?;
        info!(
            "Successfully submitted merged staged manifests for region {}, new version: {}",
            self.region_id, new_version
        );

        // Apply the merged changes to the in-memory version control.
        if let Some(change) = merged_partition_expr_change {
            let mut new_metadata = self.version().metadata.as_ref().clone();
            new_metadata.set_partition_expr(change.partition_expr);
            self.version_control.alter_metadata(new_metadata.into());
        }
        if let Some(change) = merged_change {
            self.version_control.alter_metadata(change.metadata);
        }
        self.version_control
            .apply_edit(Some(merged_edit), &[], self.file_purger.clone());

        // Cleanup is best-effort; the merge already succeeded.
        if let Err(e) = manager.clear_staging_manifest_and_dir().await {
            error!(e; "Failed to clear staging manifest dir for region {}", self.region_id);
        }
        self.exit_staging()?;

        Ok(())
    }

    /// Returns the effective partition expression string: the staging
    /// directive's expr while staging, otherwise the expr from metadata.
    pub fn maybe_staging_partition_expr_str(&self) -> Option<String> {
        let is_staging = self.is_staging();
        if is_staging {
            let staging_partition_info = self.staging_partition_info.lock().unwrap();
            if staging_partition_info.is_none() {
                // A staging region should always carry a directive; log the anomaly.
                warn!(
                    "Staging partition expr is none for region {} in staging state",
                    self.region_id
                );
            }
            staging_partition_info
                .as_ref()
                .and_then(|info| info.partition_expr().map(ToString::to_string))
        } else {
            let version = self.version();
            version.metadata.partition_expr.clone()
        }
    }

    /// Returns the partition expr version writes are validated against:
    /// the staging rule version while staging, otherwise the metadata's version.
    pub fn expected_partition_expr_version(&self) -> u64 {
        if self.is_staging() {
            let staging_partition_info = self.staging_partition_info.lock().unwrap();
            staging_partition_info
                .as_ref()
                .map(|info| info.partition_rule_version)
                .unwrap_or_default()
        } else {
            self.version().metadata.partition_expr_version
        }
    }

    /// Returns true when the region is staging with a `RejectAllWrites` directive.
    pub(crate) fn reject_all_writes_in_staging(&self) -> bool {
        if !self.is_staging() {
            return false;
        }
        let staging_partition_info = self.staging_partition_info.lock().unwrap();
        staging_partition_info
            .as_ref()
            .map(|info| {
                matches!(
                    info.partition_directive,
                    StagingPartitionDirective::RejectAllWrites
                )
            })
            .unwrap_or(false)
    }
}
862
/// Context to update the region manifest; pairs the manifest manager with the
/// region's role state so state checks and manifest updates stay consistent.
#[derive(Debug)]
pub(crate) struct ManifestContext {
    /// Manager that maintains the manifest of the region.
    pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
    /// Role state of the region, checked before every manifest update.
    state: AtomicCell<RegionRoleState>,
}
872
impl ManifestContext {
    /// Creates a context from a manifest manager and an initial role state.
    pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
        ManifestContext {
            manifest_manager: tokio::sync::RwLock::new(manager),
            state: AtomicCell::new(state),
        }
    }

    /// Returns the current manifest version.
    pub(crate) async fn manifest_version(&self) -> ManifestVersion {
        self.manifest_manager
            .read()
            .await
            .manifest()
            .manifest_version
    }

    /// Returns whether the stored manifest has updates not yet loaded.
    pub(crate) async fn has_update(&self) -> Result<bool> {
        self.manifest_manager.read().await.has_update().await
    }

    /// Returns the current role state of the region.
    pub(crate) fn current_state(&self) -> RegionRoleState {
        self.state.load()
    }

    /// Installs the manifest up to the given version and returns the
    /// resulting manifest.
    pub(crate) async fn install_manifest_to(
        &self,
        version: ManifestVersion,
    ) -> Result<Arc<RegionManifest>> {
        let mut manager = self.manifest_manager.write().await;
        manager.install_manifest_to(version).await?;

        Ok(manager.manifest())
    }

    /// Validates the region state and the actions, then applies `action_list`
    /// to the manifest. Returns the new manifest version.
    pub(crate) async fn update_manifest(
        &self,
        expect_state: RegionLeaderState,
        action_list: RegionMetaActionList,
        is_staging: bool,
    ) -> Result<ManifestVersion> {
        // Hold the write lock so the state check and the update are atomic
        // with respect to other manifest updates.
        let mut manager = self.manifest_manager.write().await;
        let manifest = manager.manifest();
        let current_state = self.state.load();

        if expect_state != RegionLeaderState::Downgrading {
            // A downgrading leader may still finish in-flight updates
            // (e.g. flushes), so it is accepted alongside the expected state.
            if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                info!(
                    "Region {} is in downgrading leader state, updating manifest. state is {:?}",
                    manifest.metadata.region_id, expect_state
                );
            }
            ensure!(
                current_state == RegionRoleState::Leader(expect_state)
                    || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
                UpdateManifestSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                }
            );
        } else {
            ensure!(
                current_state == RegionRoleState::Leader(expect_state),
                RegionStateSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                    expect: RegionRoleState::Leader(expect_state),
                }
            );
        }

        // Reject edits that race with a truncate: stale flushes/compactions
        // must not resurrect data the truncate removed.
        for action in &action_list.actions {
            let RegionMetaAction::Edit(edit) = &action else {
                continue;
            };

            let Some(truncated_entry_id) = manifest.truncated_entry_id else {
                continue;
            };

            // An edit flushed from entries at or before the truncated id is stale.
            if let Some(flushed_entry_id) = edit.flushed_entry_id {
                ensure!(
                    truncated_entry_id < flushed_entry_id,
                    RegionTruncatedSnafu {
                        region_id: manifest.metadata.region_id,
                    }
                );
            }

            // Files scheduled for removal must still exist in the manifest;
            // otherwise the truncate already dropped them.
            if !edit.files_to_remove.is_empty() {
                for file in &edit.files_to_remove {
                    ensure!(
                        manifest.files.contains_key(&file.file_id),
                        RegionTruncatedSnafu {
                            region_id: manifest.metadata.region_id,
                        }
                    );
                }
            }
        }

        let version = manager.update(action_list, is_staging).await.inspect_err(
            |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
        )?;

        // The role may have changed concurrently; only warn, the update is done.
        if self.state.load() == RegionRoleState::Follower {
            warn!(
                "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
                manifest.metadata.region_id
            );
        }

        Ok(version)
    }

    /// Sets the role of the region, enforcing the allowed transitions:
    /// any state -> Follower, Follower/Downgrading -> Writable leader,
    /// Writable -> Downgrading. Invalid transitions are logged, not errors.
    pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
        match next_role {
            RegionRole::Follower => {
                match self.state.fetch_update(|state| {
                    if !matches!(state, RegionRoleState::Follower) {
                        Some(RegionRoleState::Follower)
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to follower, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        // Already a follower: no-op, nothing to warn about.
                        if state != RegionRoleState::Follower {
                            warn!(
                                "Failed to convert region {} to follower, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::Leader => {
                match self.state.fetch_update(|state| {
                    if matches!(
                        state,
                        RegionRoleState::Follower
                            | RegionRoleState::Leader(RegionLeaderState::Downgrading)
                    ) {
                        Some(RegionRoleState::Leader(RegionLeaderState::Writable))
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to leader, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
                            warn!(
                                "Failed to convert region {} to leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::DowngradingLeader => {
                match self.state.compare_exchange(
                    RegionRoleState::Leader(RegionLeaderState::Writable),
                    RegionRoleState::Leader(RegionLeaderState::Downgrading),
                ) {
                    Ok(state) => info!(
                        "Convert region {} to downgrading region, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                            warn!(
                                "Failed to convert region {} to downgrading leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
        }
    }

    /// Returns the current committed manifest.
    pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
        self.manifest_manager.read().await.manifest()
    }

    /// Returns the staging manifest, if the region has one.
    pub(crate) async fn staging_manifest(
        &self,
    ) -> Option<Arc<crate::manifest::action::RegionManifest>> {
        self.manifest_manager.read().await.staging_manifest()
    }
}
1113
/// Reference-counted handle to a [ManifestContext].
pub(crate) type ManifestContextRef = Arc<ManifestContext>;
1115
/// Thread-safe map of regions indexed by region id.
#[derive(Debug, Default)]
pub(crate) struct RegionMap {
    regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
}
1121
1122impl RegionMap {
1123 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1125 let regions = self.regions.read().unwrap();
1126 regions.contains_key(®ion_id)
1127 }
1128
1129 pub(crate) fn insert_region(&self, region: MitoRegionRef) {
1131 let mut regions = self.regions.write().unwrap();
1132 regions.insert(region.region_id, region);
1133 }
1134
1135 pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
1137 let regions = self.regions.read().unwrap();
1138 regions.get(®ion_id).cloned()
1139 }
1140
1141 pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1145 let region = self
1146 .get_region(region_id)
1147 .context(RegionNotFoundSnafu { region_id })?;
1148 ensure!(
1149 region.is_writable(),
1150 RegionStateSnafu {
1151 region_id,
1152 state: region.state(),
1153 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1154 }
1155 );
1156 Ok(region)
1157 }
1158
1159 pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1163 let region = self
1164 .get_region(region_id)
1165 .context(RegionNotFoundSnafu { region_id })?;
1166 ensure!(
1167 region.is_follower(),
1168 RegionStateSnafu {
1169 region_id,
1170 state: region.state(),
1171 expect: RegionRoleState::Follower,
1172 }
1173 );
1174
1175 Ok(region)
1176 }
1177
1178 pub(crate) fn get_region_or<F: OnFailure>(
1182 &self,
1183 region_id: RegionId,
1184 cb: &mut F,
1185 ) -> Option<MitoRegionRef> {
1186 match self
1187 .get_region(region_id)
1188 .context(RegionNotFoundSnafu { region_id })
1189 {
1190 Ok(region) => Some(region),
1191 Err(e) => {
1192 cb.on_failure(e);
1193 None
1194 }
1195 }
1196 }
1197
1198 pub(crate) fn writable_region_or<F: OnFailure>(
1202 &self,
1203 region_id: RegionId,
1204 cb: &mut F,
1205 ) -> Option<MitoRegionRef> {
1206 match self.writable_region(region_id) {
1207 Ok(region) => Some(region),
1208 Err(e) => {
1209 cb.on_failure(e);
1210 None
1211 }
1212 }
1213 }
1214
1215 pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1219 let region = self.writable_region(region_id)?;
1220 if region.is_staging() {
1221 return Err(crate::error::RegionStateSnafu {
1222 region_id,
1223 state: region.state(),
1224 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1225 }
1226 .build());
1227 }
1228 Ok(region)
1229 }
1230
1231 pub(crate) fn staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1235 let region = self
1236 .get_region(region_id)
1237 .context(RegionNotFoundSnafu { region_id })?;
1238 ensure!(
1239 region.is_staging(),
1240 RegionStateSnafu {
1241 region_id,
1242 state: region.state(),
1243 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
1244 }
1245 );
1246 Ok(region)
1247 }
1248
1249 fn flushable_region(&self, region_id: RegionId) -> Result<Option<MitoRegionRef>> {
1254 let region = self
1255 .get_region(region_id)
1256 .context(RegionNotFoundSnafu { region_id })?;
1257 if region.is_flushable() {
1258 Ok(Some(region))
1259 } else {
1260 Ok(None)
1261 }
1262 }
1263
1264 pub(crate) fn flushable_region_or<F: OnFailure>(
1269 &self,
1270 region_id: RegionId,
1271 cb: &mut F,
1272 ) -> Option<MitoRegionRef> {
1273 match self.flushable_region(region_id) {
1274 Ok(region) => region,
1275 Err(e) => {
1276 cb.on_failure(e);
1277 None
1278 }
1279 }
1280 }
1281
1282 pub(crate) fn remove_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
1284 let mut regions = self.regions.write().unwrap();
1285 regions.remove(®ion_id)
1286 }
1287
1288 pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
1290 let regions = self.regions.read().unwrap();
1291 regions.values().cloned().collect()
1292 }
1293
1294 pub(crate) fn clear(&self) {
1296 self.regions.write().unwrap().clear();
1297 }
1298}
1299
/// Reference-counted handle to a [RegionMap].
pub(crate) type RegionMapRef = Arc<RegionMap>;
1301
/// Tracks regions that are currently being opened, together with the senders
/// waiting for each open to finish.
#[derive(Debug, Default)]
pub(crate) struct OpeningRegions {
    regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
}
1307
1308impl OpeningRegions {
1309 pub(crate) fn wait_for_opening_region(
1311 &self,
1312 region_id: RegionId,
1313 sender: OptionOutputTx,
1314 ) -> Option<OptionOutputTx> {
1315 let mut regions = self.regions.write().unwrap();
1316 match regions.entry(region_id) {
1317 Entry::Occupied(mut senders) => {
1318 senders.get_mut().push(sender);
1319 None
1320 }
1321 Entry::Vacant(_) => Some(sender),
1322 }
1323 }
1324
1325 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1327 let regions = self.regions.read().unwrap();
1328 regions.contains_key(®ion_id)
1329 }
1330
1331 pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
1333 let mut regions = self.regions.write().unwrap();
1334 regions.insert(region, vec![sender]);
1335 }
1336
1337 pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
1339 let mut regions = self.regions.write().unwrap();
1340 regions.remove(®ion_id).unwrap_or_default()
1341 }
1342
1343 #[cfg(test)]
1344 pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
1345 let regions = self.regions.read().unwrap();
1346 if let Some(senders) = regions.get(®ion_id) {
1347 senders.len()
1348 } else {
1349 0
1350 }
1351 }
1352}
1353
1354pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;
1355
/// Tracks the set of regions that currently have a catch-up task in
/// progress.
#[derive(Debug, Default)]
pub(crate) struct CatchupRegions {
    // Ids of regions currently catching up.
    regions: RwLock<HashSet<RegionId>>,
}
1361
1362impl CatchupRegions {
1363 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1365 let regions = self.regions.read().unwrap();
1366 regions.contains(®ion_id)
1367 }
1368
1369 pub(crate) fn insert_region(&self, region_id: RegionId) {
1371 let mut regions = self.regions.write().unwrap();
1372 regions.insert(region_id);
1373 }
1374
1375 pub(crate) fn remove_region(&self, region_id: RegionId) {
1377 let mut regions = self.regions.write().unwrap();
1378 regions.remove(®ion_id);
1379 }
1380}
1381
1382pub(crate) type CatchupRegionsRef = Arc<CatchupRegions>;
1383
/// Shared counters describing a region's manifest; clones share the same
/// underlying atomics.
#[derive(Default, Debug, Clone)]
pub struct ManifestStats {
    // Total size of the manifest, in bytes.
    pub(crate) total_manifest_size: Arc<AtomicU64>,
    // Current manifest version.
    pub(crate) manifest_version: Arc<AtomicU64>,
    // Counter of removed files (NOTE(review): exact increment semantics are
    // defined at the update sites, not visible here).
    pub(crate) file_removed_cnt: Arc<AtomicU64>,
}
1391
impl ManifestStats {
    /// Returns the total manifest size in bytes.
    ///
    /// Uses a relaxed load; the value is a point-in-time snapshot.
    fn total_manifest_size(&self) -> u64 {
        self.total_manifest_size.load(Ordering::Relaxed)
    }

    /// Returns the current manifest version (relaxed load).
    fn manifest_version(&self) -> u64 {
        self.manifest_version.load(Ordering::Relaxed)
    }

    /// Returns the removed-file counter value (relaxed load).
    fn file_removed_cnt(&self) -> u64 {
        self.file_removed_cnt.load(Ordering::Relaxed)
    }
}
1405
1406pub fn parse_partition_expr(partition_expr_str: Option<&str>) -> Result<Option<PartitionExpr>> {
1408 match partition_expr_str {
1409 None => Ok(None),
1410 Some("") => Ok(None),
1411 Some(json_str) => {
1412 let expr = partition::expr::PartitionExpr::from_json_str(json_str)
1413 .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?;
1414 Ok(expr)
1415 }
1416 }
1417}
1418
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicU64;
    use std::sync::{Arc, Mutex};

    use common_datasource::compression::CompressionType;
    use common_test_util::temp_dir::create_temp_dir;
    use crossbeam_utils::atomic::AtomicCell;
    use object_store::ObjectStore;
    use object_store::services::Fs;
    use store_api::logstore::provider::Provider;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::PathType;
    use store_api::storage::RegionId;

    use crate::access_layer::AccessLayer;
    use crate::error::Error;
    use crate::manifest::action::{
        RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionPartitionExprChange,
    };
    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
    use crate::region::{
        ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
    };
    use crate::sst::FormatType;
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::test_util::scheduler_util::SchedulerEnv;
    use crate::test_util::version_util::VersionControlBuilder;
    use crate::time_provider::StdTimeProvider;

    /// The region state is stored in an `AtomicCell`; this only pays off if
    /// the cell is lock-free on the target platform.
    #[test]
    fn test_region_state_lock_free() {
        assert!(AtomicCell::<RegionRoleState>::is_lock_free());
    }

    /// Builds a writable `MitoRegion` backed by the scheduler env's access
    /// layer, with a fresh manifest manager and noop file purger/provider.
    async fn build_test_region(env: &SchedulerEnv) -> MitoRegion {
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let metadata = version_control.current().version.metadata.clone();

        let manager = RegionManifestManager::new(
            metadata.clone(),
            0,
            RegionManifestOptions {
                manifest_dir: "".to_string(),
                object_store: env.access_layer.object_store().clone(),
                compress_type: CompressionType::Uncompressed,
                checkpoint_distance: 10,
                remove_file_options: Default::default(),
                manifest_cache: None,
            },
            FormatType::PrimaryKey,
            &Default::default(),
        )
        .await
        .unwrap();

        let manifest_ctx = Arc::new(ManifestContext::new(
            manager,
            RegionRoleState::Leader(RegionLeaderState::Writable),
        ));

        MitoRegion {
            region_id: metadata.region_id,
            version_control,
            access_layer: env.access_layer.clone(),
            manifest_ctx,
            file_purger: crate::test_util::new_noop_file_purger(),
            provider: Provider::noop_provider(),
            last_flush_millis: Default::default(),
            last_compaction_millis: Default::default(),
            time_provider: Arc::new(StdTimeProvider),
            topic_latest_entry_id: Default::default(),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: ManifestStats::default(),
            staging_partition_info: Mutex::new(None),
        }
    }

    /// An edit with no file or sequence changes, used as manifest filler.
    fn empty_edit() -> RegionEdit {
        RegionEdit {
            files_to_add: Vec::new(),
            files_to_remove: Vec::new(),
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        }
    }

    /// A staged `PartitionExprChange` + `Edit` pair is applied on successful
    /// staging exit: the new expr is visible and the region becomes writable.
    #[tokio::test]
    async fn test_exit_staging_partition_expr_change_and_edit_success() {
        let env = SchedulerEnv::new().await;
        let region = build_test_region(&env).await;

        let mut manager = region.manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();
        manager
            .update(
                RegionMetaActionList::new(vec![
                    RegionMetaAction::PartitionExprChange(RegionPartitionExprChange {
                        partition_expr: Some("expr_a".to_string()),
                    }),
                    RegionMetaAction::Edit(empty_edit()),
                ]),
                true,
            )
            .await
            .unwrap();

        region.exit_staging_on_success(&mut manager).await.unwrap();
        drop(manager);

        assert_eq!(
            region.version().metadata.partition_expr.as_deref(),
            Some("expr_a")
        );
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

    /// A staged `Change` whose metadata keeps the same columns (only the
    /// partition expr differs) is accepted on staging exit.
    #[tokio::test]
    async fn test_exit_staging_change_with_same_columns_success() {
        let env = SchedulerEnv::new().await;
        let region = build_test_region(&env).await;

        let mut manager = region.manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();

        let mut changed_metadata = region.version().metadata.as_ref().clone();
        changed_metadata.set_partition_expr(Some("expr_b".to_string()));

        manager
            .update(
                RegionMetaActionList::new(vec![
                    RegionMetaAction::Change(RegionChange {
                        metadata: Arc::new(changed_metadata),
                        sst_format: FormatType::PrimaryKey,
                        append_mode: None,
                    }),
                    RegionMetaAction::Edit(empty_edit()),
                ]),
                true,
            )
            .await
            .unwrap();

        region.exit_staging_on_success(&mut manager).await.unwrap();
        drop(manager);

        assert_eq!(
            region.version().metadata.partition_expr.as_deref(),
            Some("expr_b")
        );
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

    /// A staged `Change` with a different column layout (columns rotated)
    /// must be rejected when exiting staging.
    #[tokio::test]
    async fn test_exit_staging_change_with_different_columns_fails() {
        let env = SchedulerEnv::new().await;
        let region = build_test_region(&env).await;

        let mut manager = region.manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();

        let mut changed_metadata = region.version().metadata.as_ref().clone();
        // Rotating the columns changes the column order while keeping the
        // same set — this should be detected as an incompatible change.
        changed_metadata.column_metadatas.rotate_left(1);

        manager
            .update(
                RegionMetaActionList::new(vec![
                    RegionMetaAction::Change(RegionChange {
                        metadata: Arc::new(changed_metadata),
                        sst_format: FormatType::PrimaryKey,
                        append_mode: None,
                    }),
                    RegionMetaAction::Edit(empty_edit()),
                ]),
                true,
            )
            .await
            .unwrap();

        let result = region.exit_staging_on_success(&mut manager).await;
        assert!(matches!(result, Err(Error::Unexpected { .. })));
    }

    /// Staging both a `PartitionExprChange` and a `Change` that sets the
    /// same expr is treated as a conflict and rejected on staging exit.
    #[tokio::test]
    async fn test_exit_staging_partition_expr_change_and_change_conflict_fails() {
        let env = SchedulerEnv::new().await;
        let region = build_test_region(&env).await;

        let mut manager = region.manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();

        let mut changed_metadata = region.version().metadata.as_ref().clone();
        changed_metadata.set_partition_expr(Some("expr_c".to_string()));

        manager
            .update(
                RegionMetaActionList::new(vec![
                    RegionMetaAction::PartitionExprChange(RegionPartitionExprChange {
                        partition_expr: Some("expr_c".to_string()),
                    }),
                    RegionMetaAction::Change(RegionChange {
                        metadata: Arc::new(changed_metadata),
                        sst_format: FormatType::PrimaryKey,
                        append_mode: None,
                    }),
                    RegionMetaAction::Edit(empty_edit()),
                ]),
                true,
            )
            .await
            .unwrap();

        let result = region.exit_staging_on_success(&mut manager).await;
        assert!(matches!(result, Err(Error::Unexpected { .. })));
    }

    /// Exercises role transitions via `set_role`, including the transition
    /// from Follower where `DowngradingLeader` must be a no-op.
    #[tokio::test]
    async fn test_set_region_state() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let region_id = RegionId::new(1024, 0);
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );

        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // A follower cannot move directly to DowngradingLeader.
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        manifest_ctx.set_role(RegionRole::Leader, region_id);
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

    /// A `ManifestContext` created in the Staging state reports it, while
    /// the mock context defaults to Writable.
    #[tokio::test]
    async fn test_staging_state_validation() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());

        let staging_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                    manifest_cache: None,
                },
                FormatType::PrimaryKey,
                &Default::default(),
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        assert_eq!(
            staging_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );

        let writable_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        assert_eq!(
            writable_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

    /// End-to-end staging lifecycle: enter/exit staging, cleanup of dirty
    /// staging manifests on re-entry, and invalid double transitions.
    #[tokio::test]
    async fn test_staging_state_transitions() {
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let metadata = version_control.current().version.metadata.clone();

        // Build a region on a real (temp-dir) filesystem object store so
        // staging manifest files can actually be written and listed.
        let temp_dir = create_temp_dir("");
        let path_str = temp_dir.path().display().to_string();
        let fs_builder = Fs::default().root(&path_str);
        let object_store = ObjectStore::new(fs_builder).unwrap().finish();

        let index_aux_path = temp_dir.path().join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let access_layer = Arc::new(AccessLayer::new(
            "",
            PathType::Bare,
            object_store,
            puffin_mgr,
            intm_mgr,
        ));

        let manager = RegionManifestManager::new(
            metadata.clone(),
            0,
            RegionManifestOptions {
                manifest_dir: "".to_string(),
                object_store: access_layer.object_store().clone(),
                compress_type: CompressionType::Uncompressed,
                checkpoint_distance: 10,
                remove_file_options: Default::default(),
                manifest_cache: None,
            },
            FormatType::PrimaryKey,
            &Default::default(),
        )
        .await
        .unwrap();

        let manifest_ctx = Arc::new(ManifestContext::new(
            manager,
            RegionRoleState::Leader(RegionLeaderState::Writable),
        ));

        let region = MitoRegion {
            region_id: metadata.region_id,
            version_control,
            access_layer,
            manifest_ctx: manifest_ctx.clone(),
            file_purger: crate::test_util::new_noop_file_purger(),
            provider: Provider::noop_provider(),
            last_flush_millis: Default::default(),
            last_compaction_millis: Default::default(),
            time_provider: Arc::new(StdTimeProvider),
            topic_latest_entry_id: Default::default(),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: ManifestStats::default(),
            staging_partition_info: Mutex::new(None),
        };

        // Starts out writable, not staging.
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Enter staging, then leave it again.
        let mut manager = manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();
        drop(manager);
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );
        assert!(region.is_staging());

        region.exit_staging().unwrap();
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        {
            // Plant two leftover ("dirty") staging manifest files directly
            // in the store, then verify set_staging cleans them up.
            let manager = manifest_ctx.manifest_manager.write().await;
            let dummy_actions = RegionMetaActionList::new(vec![]);
            let dummy_bytes = dummy_actions.encode().unwrap();

            manager.store().save(100, &dummy_bytes, true).await.unwrap();
            manager.store().save(101, &dummy_bytes, true).await.unwrap();
            drop(manager);

            let manager = manifest_ctx.manifest_manager.read().await;
            let dirty_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                dirty_manifests.len(),
                2,
                "Should have 2 dirty staging files"
            );
            drop(manager);

            let mut manager = manifest_ctx.manifest_manager.write().await;
            region.set_staging(&mut manager).await.unwrap();
            drop(manager);

            let manager = manifest_ctx.manifest_manager.read().await;
            let cleaned_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                cleaned_manifests.len(),
                0,
                "Dirty staging files should be cleaned up"
            );
            drop(manager);

            region.exit_staging().unwrap();
        }

        // Double transitions: a second set_staging / exit_staging must fail.
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_ok());
        drop(manager);
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_err());
        drop(manager);
        assert!(region.exit_staging().is_ok());
        assert!(region.exit_staging().is_err());
    }
}