1pub mod catchup;
18pub mod opener;
19pub mod options;
20pub(crate) mod version;
21
22use std::collections::hash_map::Entry;
23use std::collections::{HashMap, HashSet};
24use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
25use std::sync::{Arc, RwLock};
26
27use common_telemetry::{error, info, warn};
28use crossbeam_utils::atomic::AtomicCell;
29use snafu::{OptionExt, ensure};
30use store_api::ManifestVersion;
31use store_api::codec::PrimaryKeyEncoding;
32use store_api::logstore::provider::Provider;
33use store_api::metadata::RegionMetadataRef;
34use store_api::region_engine::{
35 RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
36};
37use store_api::sst_entry::ManifestSstEntry;
38use store_api::storage::{RegionId, SequenceNumber};
39use tokio::sync::RwLockWriteGuard;
40
41use crate::access_layer::AccessLayerRef;
42use crate::error::{
43 FlushableRegionStateSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
44 UpdateManifestSnafu,
45};
46use crate::manifest::action::{
47 RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
48};
49use crate::manifest::manager::RegionManifestManager;
50use crate::region::version::{VersionControlRef, VersionRef};
51use crate::request::{OnFailure, OptionOutputTx};
52use crate::sst::file_purger::FilePurgerRef;
53use crate::sst::location::{index_file_path, sst_file_path};
54use crate::time_provider::TimeProviderRef;
55
/// Multiplier applied to a region's memtable usage to estimate its WAL usage.
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;
58
59#[derive(Debug)]
61pub struct RegionUsage {
62 pub region_id: RegionId,
63 pub wal_usage: u64,
64 pub sst_usage: u64,
65 pub manifest_usage: u64,
66}
67
68impl RegionUsage {
69 pub fn disk_usage(&self) -> u64 {
70 self.wal_usage + self.sst_usage + self.manifest_usage
71 }
72}
73
/// Sub-state of a region that currently holds the leader role.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// Regular leader state: accepted by `MitoRegion::is_writable` and
    /// `MitoRegion::is_flushable`, and the state the `set_*` transitions start from.
    Writable,
    /// Staging mode: the region is still reported as writable and flushable,
    /// while `MitoRegion::should_abort_index` reports true.
    Staging,
    /// Entered through `MitoRegion::set_altering`.
    /// NOTE(review): presumably held while the region schema is being altered — confirm.
    Altering,
    /// Entered through `MitoRegion::set_dropping`.
    /// NOTE(review): presumably held while the region is being dropped — confirm.
    Dropping,
    /// Entered through `MitoRegion::set_truncating`.
    /// NOTE(review): presumably held while the region is being truncated — confirm.
    Truncating,
    /// Entered through `MitoRegion::set_editing`.
    /// NOTE(review): presumably held while the region is being edited — confirm.
    Editing,
    /// Leader that is being downgraded: still flushable but no longer writable.
    Downgrading,
}
91
/// Role of a region together with its leader sub-state, if any.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
    /// The region is a leader in the given [`RegionLeaderState`].
    Leader(RegionLeaderState),
    /// The region is a follower.
    Follower,
}
97
/// In-memory handle of a single region: its version data, manifest context,
/// role state and a few runtime statistics.
#[derive(Debug)]
pub struct MitoRegion {
    /// Id of this region.
    pub(crate) region_id: RegionId,

    /// Source of the region's current version (metadata, memtables, SSTs).
    pub(crate) version_control: VersionControlRef,
    /// Access layer; provides the table directory and path type of the region.
    pub(crate) access_layer: AccessLayerRef,
    /// Manifest manager plus the atomically updated role state.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// File purger handed to `version_control` when edits are applied.
    pub(crate) file_purger: FilePurgerRef,
    /// Log store provider of the region.
    pub(crate) provider: Provider,
    /// Time in milliseconds recorded by the last `update_flush_millis` call.
    last_flush_millis: AtomicI64,
    /// Time in milliseconds recorded by the last `update_compaction_millis` call.
    last_compaction_millis: AtomicI64,
    /// Clock used to obtain the current time in milliseconds.
    time_provider: TimeProviderRef,
    /// Latest topic entry id; reported for both the data and the metadata topic
    /// in `region_statistic`.
    pub(crate) topic_latest_entry_id: AtomicU64,
    /// Written-bytes counter reported in `region_statistic`.
    pub(crate) written_bytes: Arc<AtomicU64>,
    /// Manifest statistics (total size, version, removed-file count).
    stats: ManifestStats,
}
144
/// Shared, reference-counted handle to a [`MitoRegion`].
pub type MitoRegionRef = Arc<MitoRegion>;
146
147impl MitoRegion {
148 pub(crate) async fn stop(&self) {
150 self.manifest_ctx
151 .manifest_manager
152 .write()
153 .await
154 .stop()
155 .await;
156
157 info!(
158 "Stopped region manifest manager, region_id: {}",
159 self.region_id
160 );
161 }
162
163 pub(crate) fn metadata(&self) -> RegionMetadataRef {
165 let version_data = self.version_control.current();
166 version_data.version.metadata.clone()
167 }
168
169 pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
171 let version_data = self.version_control.current();
172 version_data.version.metadata.primary_key_encoding
173 }
174
175 pub(crate) fn version(&self) -> VersionRef {
177 let version_data = self.version_control.current();
178 version_data.version
179 }
180
181 pub(crate) fn last_flush_millis(&self) -> i64 {
183 self.last_flush_millis.load(Ordering::Relaxed)
184 }
185
186 pub(crate) fn update_flush_millis(&self) {
188 let now = self.time_provider.current_time_millis();
189 self.last_flush_millis.store(now, Ordering::Relaxed);
190 }
191
192 pub(crate) fn last_compaction_millis(&self) -> i64 {
194 self.last_compaction_millis.load(Ordering::Relaxed)
195 }
196
197 pub(crate) fn update_compaction_millis(&self) {
199 let now = self.time_provider.current_time_millis();
200 self.last_compaction_millis.store(now, Ordering::Relaxed);
201 }
202
203 pub(crate) fn table_dir(&self) -> &str {
205 self.access_layer.table_dir()
206 }
207
208 pub(crate) fn is_writable(&self) -> bool {
210 matches!(
211 self.manifest_ctx.state.load(),
212 RegionRoleState::Leader(RegionLeaderState::Writable)
213 | RegionRoleState::Leader(RegionLeaderState::Staging)
214 )
215 }
216
217 pub(crate) fn is_flushable(&self) -> bool {
219 matches!(
220 self.manifest_ctx.state.load(),
221 RegionRoleState::Leader(RegionLeaderState::Writable)
222 | RegionRoleState::Leader(RegionLeaderState::Staging)
223 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
224 )
225 }
226
227 pub(crate) fn should_abort_index(&self) -> bool {
229 matches!(
230 self.manifest_ctx.state.load(),
231 RegionRoleState::Follower
232 | RegionRoleState::Leader(RegionLeaderState::Dropping)
233 | RegionRoleState::Leader(RegionLeaderState::Truncating)
234 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
235 | RegionRoleState::Leader(RegionLeaderState::Staging)
236 )
237 }
238
239 pub(crate) fn is_downgrading(&self) -> bool {
241 matches!(
242 self.manifest_ctx.state.load(),
243 RegionRoleState::Leader(RegionLeaderState::Downgrading)
244 )
245 }
246
247 #[allow(dead_code)]
249 pub(crate) fn is_staging(&self) -> bool {
250 self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
251 }
252
253 pub fn region_id(&self) -> RegionId {
254 self.region_id
255 }
256
257 pub fn find_committed_sequence(&self) -> SequenceNumber {
258 self.version_control.committed_sequence()
259 }
260
261 pub fn is_follower(&self) -> bool {
263 self.manifest_ctx.state.load() == RegionRoleState::Follower
264 }
265
266 pub(crate) fn state(&self) -> RegionRoleState {
268 self.manifest_ctx.state.load()
269 }
270
271 pub(crate) fn set_role(&self, next_role: RegionRole) {
273 self.manifest_ctx.set_role(next_role, self.region_id);
274 }
275
276 pub(crate) fn set_altering(&self) -> Result<()> {
279 self.compare_exchange_state(
280 RegionLeaderState::Writable,
281 RegionRoleState::Leader(RegionLeaderState::Altering),
282 )
283 }
284
285 pub(crate) fn set_dropping(&self) -> Result<()> {
288 self.compare_exchange_state(
289 RegionLeaderState::Writable,
290 RegionRoleState::Leader(RegionLeaderState::Dropping),
291 )
292 }
293
294 pub(crate) fn set_truncating(&self) -> Result<()> {
297 self.compare_exchange_state(
298 RegionLeaderState::Writable,
299 RegionRoleState::Leader(RegionLeaderState::Truncating),
300 )
301 }
302
303 pub(crate) fn set_editing(&self) -> Result<()> {
306 self.compare_exchange_state(
307 RegionLeaderState::Writable,
308 RegionRoleState::Leader(RegionLeaderState::Editing),
309 )
310 }
311
312 pub(crate) async fn set_staging(
318 &self,
319 manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
320 ) -> Result<()> {
321 manager.store().clear_staging_manifests().await?;
322
323 self.compare_exchange_state(
324 RegionLeaderState::Writable,
325 RegionRoleState::Leader(RegionLeaderState::Staging),
326 )
327 }
328
329 fn exit_staging(&self) -> Result<()> {
334 self.compare_exchange_state(
335 RegionLeaderState::Staging,
336 RegionRoleState::Leader(RegionLeaderState::Writable),
337 )
338 }
339
340 pub(crate) async fn set_role_state_gracefully(
342 &self,
343 state: SettableRegionRoleState,
344 ) -> Result<()> {
345 let mut manager = self.manifest_ctx.manifest_manager.write().await;
346 let current_state = self.state();
347
348 match state {
349 SettableRegionRoleState::Leader => {
350 match current_state {
353 RegionRoleState::Leader(RegionLeaderState::Staging) => {
354 info!("Exiting staging mode for region {}", self.region_id);
355 self.exit_staging_on_success(&mut manager).await?;
357 }
358 RegionRoleState::Leader(RegionLeaderState::Writable) => {
359 info!("Region {} already in normal leader mode", self.region_id);
361 }
362 _ => {
363 return Err(RegionStateSnafu {
365 region_id: self.region_id,
366 state: current_state,
367 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
368 }
369 .build());
370 }
371 }
372 }
373
374 SettableRegionRoleState::StagingLeader => {
375 match current_state {
378 RegionRoleState::Leader(RegionLeaderState::Writable) => {
379 info!("Entering staging mode for region {}", self.region_id);
380 self.set_staging(&mut manager).await?;
381 }
382 RegionRoleState::Leader(RegionLeaderState::Staging) => {
383 info!("Region {} already in staging mode", self.region_id);
385 }
386 _ => {
387 return Err(RegionStateSnafu {
388 region_id: self.region_id,
389 state: current_state,
390 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
391 }
392 .build());
393 }
394 }
395 }
396
397 SettableRegionRoleState::Follower => {
398 match current_state {
400 RegionRoleState::Leader(RegionLeaderState::Staging) => {
401 info!(
402 "Exiting staging and demoting region {} to follower",
403 self.region_id
404 );
405 self.exit_staging()?;
406 self.set_role(RegionRole::Follower);
407 }
408 RegionRoleState::Leader(_) => {
409 info!("Demoting region {} from leader to follower", self.region_id);
410 self.set_role(RegionRole::Follower);
411 }
412 RegionRoleState::Follower => {
413 info!("Region {} already in follower mode", self.region_id);
415 }
416 }
417 }
418
419 SettableRegionRoleState::DowngradingLeader => {
420 match current_state {
422 RegionRoleState::Leader(RegionLeaderState::Staging) => {
423 info!(
424 "Exiting staging and entering downgrade for region {}",
425 self.region_id
426 );
427 self.exit_staging()?;
428 self.set_role(RegionRole::DowngradingLeader);
429 }
430 RegionRoleState::Leader(RegionLeaderState::Writable) => {
431 info!("Starting downgrade for region {}", self.region_id);
432 self.set_role(RegionRole::DowngradingLeader);
433 }
434 RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
435 info!("Region {} already in downgrading mode", self.region_id);
437 }
438 _ => {
439 warn!(
440 "Cannot start downgrade for region {} from state {:?}",
441 self.region_id, current_state
442 );
443 }
444 }
445 }
446 }
447
448 if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
450 let manifest_meta = &manager.manifest().metadata;
452 let current_version = self.version();
453 let current_meta = ¤t_version.metadata;
454 if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
455 let action = RegionMetaAction::Change(RegionChange {
456 metadata: current_meta.clone(),
457 sst_format: current_version.options.sst_format.unwrap_or_default(),
458 });
459 let result = manager
460 .update(
461 RegionMetaActionList::with_action(action),
462 RegionRoleState::Leader(RegionLeaderState::Writable),
463 )
464 .await;
465
466 match result {
467 Ok(version) => {
468 info!(
469 "Successfully persisted backfilled metadata for region {}, version: {}",
470 self.region_id, version
471 );
472 }
473 Err(e) => {
474 warn!(e; "Failed to persist backfilled metadata for region {}", self.region_id);
475 }
476 }
477 }
478 }
479
480 drop(manager);
481
482 Ok(())
483 }
484
485 pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
488 if let Err(e) = self
489 .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
490 {
491 error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
492 }
493 }
494
495 pub(crate) fn region_statistic(&self) -> RegionStatistic {
497 let version = self.version();
498 let memtables = &version.memtables;
499 let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;
500
501 let sst_usage = version.ssts.sst_usage();
502 let index_usage = version.ssts.index_usage();
503 let flushed_entry_id = version.flushed_entry_id;
504
505 let wal_usage = self.estimated_wal_usage(memtable_usage);
506 let manifest_usage = self.stats.total_manifest_size();
507 let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
508 let num_files = version.ssts.num_files();
509 let manifest_version = self.stats.manifest_version();
510 let file_removed_cnt = self.stats.file_removed_cnt();
511
512 let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
513 let written_bytes = self.written_bytes.load(Ordering::Relaxed);
514
515 RegionStatistic {
516 num_rows,
517 memtable_size: memtable_usage,
518 wal_size: wal_usage,
519 manifest_size: manifest_usage,
520 sst_size: sst_usage,
521 sst_num: num_files,
522 index_size: index_usage,
523 manifest: RegionManifestInfo::Mito {
524 manifest_version,
525 flushed_entry_id,
526 file_removed_cnt,
527 },
528 data_topic_latest_entry_id: topic_latest_entry_id,
529 metadata_topic_latest_entry_id: topic_latest_entry_id,
530 written_bytes,
531 }
532 }
533
534 fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
537 ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
538 }
539
540 fn compare_exchange_state(
543 &self,
544 expect: RegionLeaderState,
545 state: RegionRoleState,
546 ) -> Result<()> {
547 self.manifest_ctx
548 .state
549 .compare_exchange(RegionRoleState::Leader(expect), state)
550 .map_err(|actual| {
551 RegionStateSnafu {
552 region_id: self.region_id,
553 state: actual,
554 expect: RegionRoleState::Leader(expect),
555 }
556 .build()
557 })?;
558 Ok(())
559 }
560
561 pub fn access_layer(&self) -> AccessLayerRef {
562 self.access_layer.clone()
563 }
564
565 pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
567 let table_dir = self.table_dir();
568 let path_type = self.access_layer.path_type();
569
570 let visible_ssts = self
571 .version()
572 .ssts
573 .levels()
574 .iter()
575 .flat_map(|level| level.files().map(|file| file.file_id().file_id()))
576 .collect::<HashSet<_>>();
577
578 self.manifest_ctx
579 .manifest()
580 .await
581 .files
582 .values()
583 .map(|meta| {
584 let region_id = self.region_id;
585 let origin_region_id = meta.region_id;
586 let (index_file_id, index_file_path, index_file_size) = if meta.index_file_size > 0
587 {
588 let index_file_path =
589 index_file_path(table_dir, meta.index_file_id(), path_type);
590 (
591 Some(meta.index_file_id().file_id().to_string()),
592 Some(index_file_path),
593 Some(meta.index_file_size),
594 )
595 } else {
596 (None, None, None)
597 };
598 let visible = visible_ssts.contains(&meta.file_id);
599 ManifestSstEntry {
600 table_dir: table_dir.to_string(),
601 region_id,
602 table_id: region_id.table_id(),
603 region_number: region_id.region_number(),
604 region_group: region_id.region_group(),
605 region_sequence: region_id.region_sequence(),
606 file_id: meta.file_id.to_string(),
607 index_file_id,
608 level: meta.level,
609 file_path: sst_file_path(table_dir, meta.file_id(), path_type),
610 file_size: meta.file_size,
611 index_file_path,
612 index_file_size,
613 num_rows: meta.num_rows,
614 num_row_groups: meta.num_row_groups,
615 num_series: Some(meta.num_series),
616 min_ts: meta.time_range.0,
617 max_ts: meta.time_range.1,
618 sequence: meta.sequence.map(|s| s.get()),
619 origin_region_id,
620 node_id: None,
621 visible,
622 }
623 })
624 .collect()
625 }
626
    /// Leaves staging mode, merging any staged manifest actions into the
    /// region manifest first.
    ///
    /// The caller passes in the manifest manager write guard it already holds.
    ///
    /// # Errors
    ///
    /// Returns a region state error if the region is not `Leader(Staging)`,
    /// or any error from merging, updating or clearing the manifests.
    pub(crate) async fn exit_staging_on_success(
        &self,
        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
    ) -> Result<()> {
        let current_state = self.manifest_ctx.current_state();
        ensure!(
            current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
            RegionStateSnafu {
                region_id: self.region_id,
                state: current_state,
                expect: RegionRoleState::Leader(RegionLeaderState::Staging),
            }
        );

        // Merge all staged actions; `None` means there is nothing to merge.
        let merged_actions = match manager.merge_staged_actions(current_state).await? {
            Some(actions) => actions,
            None => {
                info!(
                    "No staged manifests to merge for region {}, exiting staging mode without changes",
                    self.region_id
                );
                // Nothing to persist: only flip the state back to writable.
                self.exit_staging()?;
                return Ok(());
            }
        };

        // Submit the merged actions to the manifest with the writable state as target.
        let target_state = RegionRoleState::Leader(RegionLeaderState::Writable);
        let new_version = manager.update(merged_actions.clone(), target_state).await?;

        info!(
            "Successfully submitted merged staged manifests for region {}, new version: {}",
            self.region_id, new_version
        );

        // Apply the merged edit to the in-memory version.
        let merged_edit = merged_actions.into_region_edit();
        self.version_control
            .apply_edit(Some(merged_edit), &[], self.file_purger.clone());

        // Clear the staging manifests, then switch the state back to writable.
        manager.store().clear_staging_manifests().await?;
        self.exit_staging()?;

        Ok(())
    }
677}
678
/// Bundles a region's manifest manager with its role state.
#[derive(Debug)]
pub(crate) struct ManifestContext {
    /// Manifest manager guarded by an async read-write lock.
    pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
    /// Current role state of the region, updated atomically.
    state: AtomicCell<RegionRoleState>,
}
688
689impl ManifestContext {
690 pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
691 ManifestContext {
692 manifest_manager: tokio::sync::RwLock::new(manager),
693 state: AtomicCell::new(state),
694 }
695 }
696
697 pub(crate) async fn manifest_version(&self) -> ManifestVersion {
698 self.manifest_manager
699 .read()
700 .await
701 .manifest()
702 .manifest_version
703 }
704
705 pub(crate) async fn has_update(&self) -> Result<bool> {
706 self.manifest_manager.read().await.has_update().await
707 }
708
709 pub(crate) fn current_state(&self) -> RegionRoleState {
711 self.state.load()
712 }
713
714 pub(crate) async fn install_manifest_to(
720 &self,
721 version: ManifestVersion,
722 ) -> Result<Arc<RegionManifest>> {
723 let mut manager = self.manifest_manager.write().await;
724 manager.install_manifest_to(version).await?;
725
726 Ok(manager.manifest())
727 }
728
    /// Applies `action_list` to the manifest after validating the region state
    /// and the edits, and returns the new manifest version.
    ///
    /// The manifest write lock is held for the whole call.
    ///
    /// # Errors
    ///
    /// * An update-manifest error if `expect_state` is not `Downgrading` and
    ///   the region is neither in `expect_state` nor downgrading.
    /// * A region state error if `expect_state` is `Downgrading` and the
    ///   region is not downgrading.
    /// * A region-truncated error if an edit conflicts with a truncation
    ///   recorded in the manifest.
    /// * Any error returned by the manifest manager's update.
    pub(crate) async fn update_manifest(
        &self,
        expect_state: RegionLeaderState,
        action_list: RegionMetaActionList,
    ) -> Result<ManifestVersion> {
        // Hold the write lock before reading the state.
        let mut manager = self.manifest_manager.write().await;
        // Snapshot of the manifest used by all checks below.
        let manifest = manager.manifest();
        let current_state = self.state.load();

        if expect_state != RegionLeaderState::Downgrading {
            // A downgrading leader is still allowed to update the manifest.
            if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                info!(
                    "Region {} is in downgrading leader state, updating manifest. state is {:?}",
                    manifest.metadata.region_id, expect_state
                );
            }
            ensure!(
                current_state == RegionRoleState::Leader(expect_state)
                    || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
                UpdateManifestSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                }
            );
        } else {
            ensure!(
                current_state == RegionRoleState::Leader(expect_state),
                RegionStateSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                    expect: RegionRoleState::Leader(expect_state),
                }
            );
        }

        for action in &action_list.actions {
            // Only edit actions are validated.
            let RegionMetaAction::Edit(edit) = &action else {
                continue;
            };

            // Nothing to check if the manifest records no truncation.
            let Some(truncated_entry_id) = manifest.truncated_entry_id else {
                continue;
            };

            if let Some(flushed_entry_id) = edit.flushed_entry_id {
                // The edit must have flushed past the truncated entry id.
                ensure!(
                    truncated_entry_id < flushed_entry_id,
                    RegionTruncatedSnafu {
                        region_id: manifest.metadata.region_id,
                    }
                );
            }

            if !edit.files_to_remove.is_empty() {
                for file in &edit.files_to_remove {
                    // Every file to remove must still be present in the manifest.
                    ensure!(
                        manifest.files.contains_key(&file.file_id),
                        RegionTruncatedSnafu {
                            region_id: manifest.metadata.region_id,
                        }
                    );
                }
            }
        }

        let version = manager.update(action_list, current_state).await.inspect_err(
            |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
        )?;

        // The role may have changed while the update was in flight.
        if self.state.load() == RegionRoleState::Follower {
            warn!(
                "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
                manifest.metadata.region_id
            );
        }

        Ok(version)
    }
822
    /// Atomically switches the role state towards `next_role`.
    ///
    /// Allowed transitions:
    /// * `Follower`: from any non-follower state.
    /// * `Leader`: from `Follower` or `Leader(Downgrading)` to `Leader(Writable)`.
    /// * `DowngradingLeader`: from `Leader(Writable)` to `Leader(Downgrading)`.
    ///
    /// The call never fails: a rejected transition is only logged as a warning,
    /// and nothing is logged if the region is already in the target state.
    /// `region_id` is used for logging only.
    pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
        match next_role {
            RegionRole::Follower => {
                match self.state.fetch_update(|state| {
                    if !matches!(state, RegionRoleState::Follower) {
                        Some(RegionRoleState::Follower)
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to follower, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        // Already a follower: stay silent.
                        if state != RegionRoleState::Follower {
                            warn!(
                                "Failed to convert region {} to follower, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::Leader => {
                match self.state.fetch_update(|state| {
                    if matches!(
                        state,
                        RegionRoleState::Follower
                            | RegionRoleState::Leader(RegionLeaderState::Downgrading)
                    ) {
                        Some(RegionRoleState::Leader(RegionLeaderState::Writable))
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to leader, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        // Already a writable leader: stay silent.
                        if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
                            warn!(
                                "Failed to convert region {} to leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::DowngradingLeader => {
                match self.state.compare_exchange(
                    RegionRoleState::Leader(RegionLeaderState::Writable),
                    RegionRoleState::Leader(RegionLeaderState::Downgrading),
                ) {
                    Ok(state) => info!(
                        "Convert region {} to downgrading region, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        // Already downgrading: stay silent.
                        if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                            warn!(
                                "Failed to convert region {} to downgrading leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
        }
    }
915
916 pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
917 self.manifest_manager.read().await.manifest()
918 }
919}
920
/// Shared, reference-counted handle to a [`ManifestContext`].
pub(crate) type ManifestContextRef = Arc<ManifestContext>;
922
923#[derive(Debug, Default)]
925pub(crate) struct RegionMap {
926 regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
927}
928
929impl RegionMap {
930 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
932 let regions = self.regions.read().unwrap();
933 regions.contains_key(®ion_id)
934 }
935
936 pub(crate) fn insert_region(&self, region: MitoRegionRef) {
938 let mut regions = self.regions.write().unwrap();
939 regions.insert(region.region_id, region);
940 }
941
942 pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
944 let regions = self.regions.read().unwrap();
945 regions.get(®ion_id).cloned()
946 }
947
948 pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
952 let region = self
953 .get_region(region_id)
954 .context(RegionNotFoundSnafu { region_id })?;
955 ensure!(
956 region.is_writable(),
957 RegionStateSnafu {
958 region_id,
959 state: region.state(),
960 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
961 }
962 );
963 Ok(region)
964 }
965
966 pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
970 let region = self
971 .get_region(region_id)
972 .context(RegionNotFoundSnafu { region_id })?;
973 ensure!(
974 region.is_follower(),
975 RegionStateSnafu {
976 region_id,
977 state: region.state(),
978 expect: RegionRoleState::Follower,
979 }
980 );
981
982 Ok(region)
983 }
984
985 pub(crate) fn get_region_or<F: OnFailure>(
989 &self,
990 region_id: RegionId,
991 cb: &mut F,
992 ) -> Option<MitoRegionRef> {
993 match self
994 .get_region(region_id)
995 .context(RegionNotFoundSnafu { region_id })
996 {
997 Ok(region) => Some(region),
998 Err(e) => {
999 cb.on_failure(e);
1000 None
1001 }
1002 }
1003 }
1004
1005 pub(crate) fn writable_region_or<F: OnFailure>(
1009 &self,
1010 region_id: RegionId,
1011 cb: &mut F,
1012 ) -> Option<MitoRegionRef> {
1013 match self.writable_region(region_id) {
1014 Ok(region) => Some(region),
1015 Err(e) => {
1016 cb.on_failure(e);
1017 None
1018 }
1019 }
1020 }
1021
1022 pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1026 let region = self.writable_region(region_id)?;
1027 if region.is_staging() {
1028 return Err(crate::error::RegionStateSnafu {
1029 region_id,
1030 state: region.state(),
1031 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1032 }
1033 .build());
1034 }
1035 Ok(region)
1036 }
1037
1038 fn flushable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1042 let region = self
1043 .get_region(region_id)
1044 .context(RegionNotFoundSnafu { region_id })?;
1045 ensure!(
1046 region.is_flushable(),
1047 FlushableRegionStateSnafu {
1048 region_id,
1049 state: region.state(),
1050 }
1051 );
1052 Ok(region)
1053 }
1054
1055 pub(crate) fn flushable_region_or<F: OnFailure>(
1059 &self,
1060 region_id: RegionId,
1061 cb: &mut F,
1062 ) -> Option<MitoRegionRef> {
1063 match self.flushable_region(region_id) {
1064 Ok(region) => Some(region),
1065 Err(e) => {
1066 cb.on_failure(e);
1067 None
1068 }
1069 }
1070 }
1071
1072 pub(crate) fn remove_region(&self, region_id: RegionId) {
1074 let mut regions = self.regions.write().unwrap();
1075 regions.remove(®ion_id);
1076 }
1077
1078 pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
1080 let regions = self.regions.read().unwrap();
1081 regions.values().cloned().collect()
1082 }
1083
1084 pub(crate) fn clear(&self) {
1086 self.regions.write().unwrap().clear();
1087 }
1088}
1089
/// Shared, reference-counted handle to a [`RegionMap`].
pub(crate) type RegionMapRef = Arc<RegionMap>;
1091
1092#[derive(Debug, Default)]
1094pub(crate) struct OpeningRegions {
1095 regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
1096}
1097
1098impl OpeningRegions {
1099 pub(crate) fn wait_for_opening_region(
1101 &self,
1102 region_id: RegionId,
1103 sender: OptionOutputTx,
1104 ) -> Option<OptionOutputTx> {
1105 let mut regions = self.regions.write().unwrap();
1106 match regions.entry(region_id) {
1107 Entry::Occupied(mut senders) => {
1108 senders.get_mut().push(sender);
1109 None
1110 }
1111 Entry::Vacant(_) => Some(sender),
1112 }
1113 }
1114
1115 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1117 let regions = self.regions.read().unwrap();
1118 regions.contains_key(®ion_id)
1119 }
1120
1121 pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
1123 let mut regions = self.regions.write().unwrap();
1124 regions.insert(region, vec![sender]);
1125 }
1126
1127 pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
1129 let mut regions = self.regions.write().unwrap();
1130 regions.remove(®ion_id).unwrap_or_default()
1131 }
1132
1133 #[cfg(test)]
1134 pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
1135 let regions = self.regions.read().unwrap();
1136 if let Some(senders) = regions.get(®ion_id) {
1137 senders.len()
1138 } else {
1139 0
1140 }
1141 }
1142}
1143
/// Shared, reference-counted handle to an [`OpeningRegions`].
pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;
1145
1146#[derive(Debug, Default)]
1148pub(crate) struct CatchupRegions {
1149 regions: RwLock<HashSet<RegionId>>,
1150}
1151
1152impl CatchupRegions {
1153 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1155 let regions = self.regions.read().unwrap();
1156 regions.contains(®ion_id)
1157 }
1158
1159 pub(crate) fn insert_region(&self, region_id: RegionId) {
1161 let mut regions = self.regions.write().unwrap();
1162 regions.insert(region_id);
1163 }
1164
1165 pub(crate) fn remove_region(&self, region_id: RegionId) {
1167 let mut regions = self.regions.write().unwrap();
1168 regions.remove(®ion_id);
1169 }
1170}
1171
/// Shared, reference-counted handle to a [`CatchupRegions`].
pub(crate) type CatchupRegionsRef = Arc<CatchupRegions>;
1173
/// Manifest counters shared through `Arc`s; clones of this struct observe
/// the same underlying values.
#[derive(Default, Debug, Clone)]
pub struct ManifestStats {
    /// Total manifest size counter.
    pub(crate) total_manifest_size: Arc<AtomicU64>,
    /// Manifest version counter.
    pub(crate) manifest_version: Arc<AtomicU64>,
    /// Removed-file counter.
    pub(crate) file_removed_cnt: Arc<AtomicU64>,
}

impl ManifestStats {
    /// Returns the current value of the total manifest size counter.
    fn total_manifest_size(&self) -> u64 {
        AtomicU64::load(&self.total_manifest_size, Ordering::Relaxed)
    }

    /// Returns the current value of the manifest version counter.
    fn manifest_version(&self) -> u64 {
        AtomicU64::load(&self.manifest_version, Ordering::Relaxed)
    }

    /// Returns the current value of the removed-file counter.
    fn file_removed_cnt(&self) -> u64 {
        AtomicU64::load(&self.file_removed_cnt, Ordering::Relaxed)
    }
}
1195
1196#[cfg(test)]
1197mod tests {
1198 use std::sync::Arc;
1199 use std::sync::atomic::AtomicU64;
1200
1201 use common_datasource::compression::CompressionType;
1202 use common_test_util::temp_dir::create_temp_dir;
1203 use crossbeam_utils::atomic::AtomicCell;
1204 use object_store::ObjectStore;
1205 use object_store::services::Fs;
1206 use store_api::logstore::provider::Provider;
1207 use store_api::region_engine::RegionRole;
1208 use store_api::region_request::PathType;
1209 use store_api::storage::RegionId;
1210
1211 use crate::access_layer::AccessLayer;
1212 use crate::manifest::action::RegionMetaActionList;
1213 use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
1214 use crate::region::{
1215 ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
1216 };
1217 use crate::sst::FormatType;
1218 use crate::sst::index::intermediate::IntermediateManager;
1219 use crate::sst::index::puffin_manager::PuffinManagerFactory;
1220 use crate::test_util::scheduler_util::SchedulerEnv;
1221 use crate::test_util::version_util::VersionControlBuilder;
1222 use crate::time_provider::StdTimeProvider;
1223
    /// `RegionRoleState` must fit a lock-free `AtomicCell`.
    #[test]
    fn test_region_state_lock_free() {
        assert!(AtomicCell::<RegionRoleState>::is_lock_free());
    }
1228
    /// Walks `ManifestContext::set_role` through the allowed and rejected
    /// role transitions.
    #[tokio::test]
    async fn test_set_region_state() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let region_id = RegionId::new(1024, 0);
        // Initial state -> Follower
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Follower -> Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );

        // Leader -> Downgrading leader
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        // Downgrading leader -> Follower
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Follower -> Downgrading leader is rejected; the state is unchanged.
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Follower -> Leader -> Downgrading leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        // Downgrading leader -> Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }
1280
    /// Checks that `current_state` reports the state a context was created with.
    #[tokio::test]
    async fn test_staging_state_validation() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());

        // Context built directly in the staging state.
        let staging_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                },
                FormatType::PrimaryKey,
                &Default::default(),
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        assert_eq!(
            staging_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );

        // The mocked context is expected to start as a writable leader.
        let writable_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        assert_eq!(
            writable_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }
1326
1327 #[tokio::test]
1328 async fn test_staging_state_transitions() {
1329 let builder = VersionControlBuilder::new();
1330 let version_control = Arc::new(builder.build());
1331 let metadata = version_control.current().version.metadata.clone();
1332
1333 let temp_dir = create_temp_dir("");
1335 let path_str = temp_dir.path().display().to_string();
1336 let fs_builder = Fs::default().root(&path_str);
1337 let object_store = ObjectStore::new(fs_builder).unwrap().finish();
1338
1339 let index_aux_path = temp_dir.path().join("index_aux");
1340 let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
1341 .await
1342 .unwrap();
1343 let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
1344 .await
1345 .unwrap();
1346
1347 let access_layer = Arc::new(AccessLayer::new(
1348 "",
1349 PathType::Bare,
1350 object_store,
1351 puffin_mgr,
1352 intm_mgr,
1353 ));
1354
1355 let manager = RegionManifestManager::new(
1356 metadata.clone(),
1357 0,
1358 RegionManifestOptions {
1359 manifest_dir: "".to_string(),
1360 object_store: access_layer.object_store().clone(),
1361 compress_type: CompressionType::Uncompressed,
1362 checkpoint_distance: 10,
1363 remove_file_options: Default::default(),
1364 },
1365 FormatType::PrimaryKey,
1366 &Default::default(),
1367 )
1368 .await
1369 .unwrap();
1370
1371 let manifest_ctx = Arc::new(ManifestContext::new(
1372 manager,
1373 RegionRoleState::Leader(RegionLeaderState::Writable),
1374 ));
1375
1376 let region = MitoRegion {
1377 region_id: metadata.region_id,
1378 version_control,
1379 access_layer,
1380 manifest_ctx: manifest_ctx.clone(),
1381 file_purger: crate::test_util::new_noop_file_purger(),
1382 provider: Provider::noop_provider(),
1383 last_flush_millis: Default::default(),
1384 last_compaction_millis: Default::default(),
1385 time_provider: Arc::new(StdTimeProvider),
1386 topic_latest_entry_id: Default::default(),
1387 written_bytes: Arc::new(AtomicU64::new(0)),
1388 stats: ManifestStats::default(),
1389 };
1390
1391 assert_eq!(
1393 region.state(),
1394 RegionRoleState::Leader(RegionLeaderState::Writable)
1395 );
1396 assert!(!region.is_staging());
1397
1398 let mut manager = manifest_ctx.manifest_manager.write().await;
1400 region.set_staging(&mut manager).await.unwrap();
1401 drop(manager);
1402 assert_eq!(
1403 region.state(),
1404 RegionRoleState::Leader(RegionLeaderState::Staging)
1405 );
1406 assert!(region.is_staging());
1407
1408 region.exit_staging().unwrap();
1410 assert_eq!(
1411 region.state(),
1412 RegionRoleState::Leader(RegionLeaderState::Writable)
1413 );
1414 assert!(!region.is_staging());
1415
1416 {
1418 let manager = manifest_ctx.manifest_manager.write().await;
1420 let dummy_actions = RegionMetaActionList::new(vec![]);
1421 let dummy_bytes = dummy_actions.encode().unwrap();
1422
1423 manager.store().save(100, &dummy_bytes, true).await.unwrap();
1425 manager.store().save(101, &dummy_bytes, true).await.unwrap();
1426 drop(manager);
1427
1428 let manager = manifest_ctx.manifest_manager.read().await;
1430 let dirty_manifests = manager.store().fetch_staging_manifests().await.unwrap();
1431 assert_eq!(
1432 dirty_manifests.len(),
1433 2,
1434 "Should have 2 dirty staging files"
1435 );
1436 drop(manager);
1437
1438 let mut manager = manifest_ctx.manifest_manager.write().await;
1440 region.set_staging(&mut manager).await.unwrap();
1441 drop(manager);
1442
1443 let manager = manifest_ctx.manifest_manager.read().await;
1445 let cleaned_manifests = manager.store().fetch_staging_manifests().await.unwrap();
1446 assert_eq!(
1447 cleaned_manifests.len(),
1448 0,
1449 "Dirty staging files should be cleaned up"
1450 );
1451 drop(manager);
1452
1453 region.exit_staging().unwrap();
1455 }
1456
1457 let mut manager = manifest_ctx.manifest_manager.write().await;
1459 assert!(region.set_staging(&mut manager).await.is_ok()); drop(manager);
1461 let mut manager = manifest_ctx.manifest_manager.write().await;
1462 assert!(region.set_staging(&mut manager).await.is_err()); drop(manager);
1464 assert!(region.exit_staging().is_ok()); assert!(region.exit_staging().is_err()); }
1467}