1pub mod opener;
18pub mod options;
19pub(crate) mod version;
20
21use std::collections::hash_map::Entry;
22use std::collections::{HashMap, HashSet};
23use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
24use std::sync::{Arc, RwLock};
25
26use common_telemetry::{error, info, warn};
27use crossbeam_utils::atomic::AtomicCell;
28use snafu::{OptionExt, ensure};
29use store_api::ManifestVersion;
30use store_api::codec::PrimaryKeyEncoding;
31use store_api::logstore::provider::Provider;
32use store_api::metadata::RegionMetadataRef;
33use store_api::region_engine::{
34 RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
35};
36use store_api::sst_entry::ManifestSstEntry;
37use store_api::storage::{RegionId, SequenceNumber};
38use tokio::sync::RwLockWriteGuard;
39
40use crate::access_layer::AccessLayerRef;
41use crate::error::{
42 FlushableRegionStateSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
43 UpdateManifestSnafu,
44};
45use crate::manifest::action::{
46 RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
47};
48use crate::manifest::manager::RegionManifestManager;
49use crate::memtable::MemtableBuilderRef;
50use crate::region::version::{VersionControlRef, VersionRef};
51use crate::request::{OnFailure, OptionOutputTx};
52use crate::sst::FormatType;
53use crate::sst::file_purger::FilePurgerRef;
54use crate::sst::location::{index_file_path, sst_file_path};
55use crate::time_provider::TimeProviderRef;
56
/// Approximate ratio applied to a region's memtable usage to estimate its WAL size.
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;
59
60#[derive(Debug)]
62pub struct RegionUsage {
63 pub region_id: RegionId,
64 pub wal_usage: u64,
65 pub sst_usage: u64,
66 pub manifest_usage: u64,
67}
68
69impl RegionUsage {
70 pub fn disk_usage(&self) -> u64 {
71 self.wal_usage + self.sst_usage + self.manifest_usage
72 }
73}
74
/// Fine-grained state of a region while it holds the leader role.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// Normal leader state; the region accepts writes and flushes.
    Writable,
    /// Staging mode, entered from `Writable`; still treated as writable and flushable.
    Staging,
    /// Entered from `Writable` via `set_altering`.
    Altering,
    /// Entered from `Writable` via `set_dropping`.
    Dropping,
    /// Entered from `Writable` via `set_truncating`.
    Truncating,
    /// Entered from `Writable` via `set_editing`.
    Editing,
    /// The leader is being downgraded; not writable, but still flushable and
    /// still allowed to update the manifest.
    Downgrading,
}
92
/// Role of a region together with its leader sub-state, if any.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
    /// The region is a leader in the given [`RegionLeaderState`].
    Leader(RegionLeaderState),
    /// The region is a follower.
    Follower,
}
98
/// In-memory handle of a region: its versioned data, manifest context and
/// bookkeeping counters.
#[derive(Debug)]
pub struct MitoRegion {
    /// Id of this region.
    pub(crate) region_id: RegionId,

    /// Gives access to the current version (metadata, memtables, SSTs) and the
    /// committed sequence.
    pub(crate) version_control: VersionControlRef,
    /// Access layer; supplies the table directory and path type used to build file paths.
    pub(crate) access_layer: AccessLayerRef,
    /// Manifest manager plus the role state of the region.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// File purger passed to `apply_edit` when merged staging edits are applied.
    pub(crate) file_purger: FilePurgerRef,
    /// Log store provider of the region.
    /// NOTE(review): not read anywhere in this section — confirm usage at call sites.
    pub(crate) provider: Provider,
    /// Time in milliseconds recorded by the latest `update_flush_millis` call.
    last_flush_millis: AtomicI64,
    /// Time in milliseconds recorded by the latest `update_compaction_millis` call.
    last_compaction_millis: AtomicI64,
    /// Clock used to obtain the current time in milliseconds.
    time_provider: TimeProviderRef,
    /// Reported as both the data and metadata topic latest entry id in the region statistics.
    pub(crate) topic_latest_entry_id: AtomicU64,
    /// Shared counter reported as `written_bytes` in the region statistics.
    pub(crate) written_bytes: Arc<AtomicU64>,
    /// Memtable builder of the region.
    /// NOTE(review): not read anywhere in this section — confirm usage at call sites.
    pub(crate) memtable_builder: MemtableBuilderRef,
    /// SST format of the region; recorded in `RegionChange` manifest actions.
    pub(crate) sst_format: FormatType,
    /// Manifest statistics (total size and version) reported in the region statistics.
    stats: ManifestStats,
}

/// Shared reference to a [`MitoRegion`].
pub type MitoRegionRef = Arc<MitoRegion>;
151
152impl MitoRegion {
153 pub(crate) async fn stop(&self) {
155 self.manifest_ctx
156 .manifest_manager
157 .write()
158 .await
159 .stop()
160 .await;
161
162 info!(
163 "Stopped region manifest manager, region_id: {}",
164 self.region_id
165 );
166 }
167
168 pub(crate) fn metadata(&self) -> RegionMetadataRef {
170 let version_data = self.version_control.current();
171 version_data.version.metadata.clone()
172 }
173
174 pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
176 let version_data = self.version_control.current();
177 version_data.version.metadata.primary_key_encoding
178 }
179
180 pub(crate) fn version(&self) -> VersionRef {
182 let version_data = self.version_control.current();
183 version_data.version
184 }
185
186 pub(crate) fn last_flush_millis(&self) -> i64 {
188 self.last_flush_millis.load(Ordering::Relaxed)
189 }
190
191 pub(crate) fn update_flush_millis(&self) {
193 let now = self.time_provider.current_time_millis();
194 self.last_flush_millis.store(now, Ordering::Relaxed);
195 }
196
197 pub(crate) fn last_compaction_millis(&self) -> i64 {
199 self.last_compaction_millis.load(Ordering::Relaxed)
200 }
201
202 pub(crate) fn sst_format(&self) -> FormatType {
204 self.sst_format
205 }
206
207 pub(crate) fn update_compaction_millis(&self) {
209 let now = self.time_provider.current_time_millis();
210 self.last_compaction_millis.store(now, Ordering::Relaxed);
211 }
212
213 pub(crate) fn table_dir(&self) -> &str {
215 self.access_layer.table_dir()
216 }
217
218 pub(crate) fn is_writable(&self) -> bool {
220 matches!(
221 self.manifest_ctx.state.load(),
222 RegionRoleState::Leader(RegionLeaderState::Writable)
223 | RegionRoleState::Leader(RegionLeaderState::Staging)
224 )
225 }
226
227 pub(crate) fn is_flushable(&self) -> bool {
229 matches!(
230 self.manifest_ctx.state.load(),
231 RegionRoleState::Leader(RegionLeaderState::Writable)
232 | RegionRoleState::Leader(RegionLeaderState::Staging)
233 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
234 )
235 }
236
237 pub(crate) fn is_downgrading(&self) -> bool {
239 matches!(
240 self.manifest_ctx.state.load(),
241 RegionRoleState::Leader(RegionLeaderState::Downgrading)
242 )
243 }
244
245 #[allow(dead_code)]
247 pub(crate) fn is_staging(&self) -> bool {
248 self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
249 }
250
251 pub fn region_id(&self) -> RegionId {
252 self.region_id
253 }
254
255 pub fn find_committed_sequence(&self) -> SequenceNumber {
256 self.version_control.committed_sequence()
257 }
258
259 pub fn is_follower(&self) -> bool {
261 self.manifest_ctx.state.load() == RegionRoleState::Follower
262 }
263
264 pub(crate) fn state(&self) -> RegionRoleState {
266 self.manifest_ctx.state.load()
267 }
268
269 pub(crate) fn set_role(&self, next_role: RegionRole) {
271 self.manifest_ctx.set_role(next_role, self.region_id);
272 }
273
274 pub(crate) fn set_altering(&self) -> Result<()> {
277 self.compare_exchange_state(
278 RegionLeaderState::Writable,
279 RegionRoleState::Leader(RegionLeaderState::Altering),
280 )
281 }
282
283 pub(crate) fn set_dropping(&self) -> Result<()> {
286 self.compare_exchange_state(
287 RegionLeaderState::Writable,
288 RegionRoleState::Leader(RegionLeaderState::Dropping),
289 )
290 }
291
292 pub(crate) fn set_truncating(&self) -> Result<()> {
295 self.compare_exchange_state(
296 RegionLeaderState::Writable,
297 RegionRoleState::Leader(RegionLeaderState::Truncating),
298 )
299 }
300
301 pub(crate) fn set_editing(&self) -> Result<()> {
304 self.compare_exchange_state(
305 RegionLeaderState::Writable,
306 RegionRoleState::Leader(RegionLeaderState::Editing),
307 )
308 }
309
310 pub(crate) async fn set_staging(
316 &self,
317 manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
318 ) -> Result<()> {
319 manager.store().clear_staging_manifests().await?;
320
321 self.compare_exchange_state(
322 RegionLeaderState::Writable,
323 RegionRoleState::Leader(RegionLeaderState::Staging),
324 )
325 }
326
327 fn exit_staging(&self) -> Result<()> {
332 self.compare_exchange_state(
333 RegionLeaderState::Staging,
334 RegionRoleState::Leader(RegionLeaderState::Writable),
335 )
336 }
337
/// Moves the region to the requested role state while holding the manifest
/// manager's write lock.
///
/// Transitions handled per requested `state`:
/// - `Leader`: `Staging` merges staged manifests and becomes `Writable`;
///   `Writable` is a no-op; any other state is an error.
/// - `StagingLeader`: `Writable` enters staging; `Staging` is a no-op; any
///   other state is an error.
/// - `Follower`: any leader state is demoted (leaving staging first if
///   needed); `Follower` is a no-op.
/// - `DowngradingLeader`: `Staging` and `Writable` start downgrading;
///   `Downgrading` is a no-op; other states only log a warning.
///
/// If the region ends up `Writable`, the current metadata is persisted to the
/// manifest when the manifest lacks a partition expression that the
/// in-memory metadata has. A failure of that write is logged, not returned.
///
/// # Errors
/// Returns a region state error for a disallowed transition, or any error
/// raised while entering or leaving staging.
pub(crate) async fn set_role_state_gracefully(
    &self,
    state: SettableRegionRoleState,
) -> Result<()> {
    // Hold the manifest write lock for the whole transition.
    let mut manager = self.manifest_ctx.manifest_manager.write().await;
    let current_state = self.state();

    match state {
        SettableRegionRoleState::Leader => {
            match current_state {
                RegionRoleState::Leader(RegionLeaderState::Staging) => {
                    info!("Exiting staging mode for region {}", self.region_id);
                    // Merges the staged manifests before switching back to writable.
                    self.exit_staging_on_success(&mut manager).await?;
                }
                RegionRoleState::Leader(RegionLeaderState::Writable) => {
                    // Already in the requested state.
                    info!("Region {} already in normal leader mode", self.region_id);
                }
                _ => {
                    // Every other state is rejected for this request.
                    return Err(RegionStateSnafu {
                        region_id: self.region_id,
                        state: current_state,
                        expect: RegionRoleState::Leader(RegionLeaderState::Staging),
                    }
                    .build());
                }
            }
        }

        SettableRegionRoleState::StagingLeader => {
            match current_state {
                RegionRoleState::Leader(RegionLeaderState::Writable) => {
                    info!("Entering staging mode for region {}", self.region_id);
                    self.set_staging(&mut manager).await?;
                }
                RegionRoleState::Leader(RegionLeaderState::Staging) => {
                    // Already in the requested state.
                    info!("Region {} already in staging mode", self.region_id);
                }
                _ => {
                    return Err(RegionStateSnafu {
                        region_id: self.region_id,
                        state: current_state,
                        expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                    }
                    .build());
                }
            }
        }

        SettableRegionRoleState::Follower => {
            match current_state {
                RegionRoleState::Leader(RegionLeaderState::Staging) => {
                    info!(
                        "Exiting staging and demoting region {} to follower",
                        self.region_id
                    );
                    // Leaves staging without merging the staged manifests.
                    self.exit_staging()?;
                    self.set_role(RegionRole::Follower);
                }
                RegionRoleState::Leader(_) => {
                    info!("Demoting region {} from leader to follower", self.region_id);
                    self.set_role(RegionRole::Follower);
                }
                RegionRoleState::Follower => {
                    // Already in the requested state.
                    info!("Region {} already in follower mode", self.region_id);
                }
            }
        }

        SettableRegionRoleState::DowngradingLeader => {
            match current_state {
                RegionRoleState::Leader(RegionLeaderState::Staging) => {
                    info!(
                        "Exiting staging and entering downgrade for region {}",
                        self.region_id
                    );
                    // Leaves staging without merging the staged manifests.
                    self.exit_staging()?;
                    self.set_role(RegionRole::DowngradingLeader);
                }
                RegionRoleState::Leader(RegionLeaderState::Writable) => {
                    info!("Starting downgrade for region {}", self.region_id);
                    self.set_role(RegionRole::DowngradingLeader);
                }
                RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
                    // Already in the requested state.
                    info!("Region {} already in downgrading mode", self.region_id);
                }
                _ => {
                    // Not an error: the request is only logged and the call still returns `Ok`.
                    warn!(
                        "Cannot start downgrade for region {} from state {:?}",
                        self.region_id, current_state
                    );
                }
            }
        }
    }

    // When the region is writable at this point, backfill the manifest metadata if needed.
    if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
        let manifest_meta = &manager.manifest().metadata;
        let current_meta = &self.version().metadata;
        // Only persist when the manifest misses a partition expression the
        // in-memory metadata has.
        if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
            let action = RegionMetaAction::Change(RegionChange {
                metadata: current_meta.clone(),
                sst_format: self.sst_format(),
            });
            let result = manager
                .update(
                    RegionMetaActionList::with_action(action),
                    RegionRoleState::Leader(RegionLeaderState::Writable),
                )
                .await;

            // Best effort: a failure is logged and does not fail the transition.
            match result {
                Ok(version) => {
                    info!(
                        "Successfully persisted backfilled metadata for region {}, version: {}",
                        self.region_id, version
                    );
                }
                Err(e) => {
                    warn!(e; "Failed to persist backfilled metadata for region {}", self.region_id);
                }
            }
        }
    }

    drop(manager);

    Ok(())
}
481
482 pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
485 if let Err(e) = self
486 .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
487 {
488 error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
489 }
490 }
491
492 pub(crate) fn region_statistic(&self) -> RegionStatistic {
494 let version = self.version();
495 let memtables = &version.memtables;
496 let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;
497
498 let sst_usage = version.ssts.sst_usage();
499 let index_usage = version.ssts.index_usage();
500 let flushed_entry_id = version.flushed_entry_id;
501
502 let wal_usage = self.estimated_wal_usage(memtable_usage);
503 let manifest_usage = self.stats.total_manifest_size();
504 let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
505 let num_files = version.ssts.num_files();
506 let manifest_version = self.stats.manifest_version();
507
508 let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
509 let written_bytes = self.written_bytes.load(Ordering::Relaxed);
510
511 RegionStatistic {
512 num_rows,
513 memtable_size: memtable_usage,
514 wal_size: wal_usage,
515 manifest_size: manifest_usage,
516 sst_size: sst_usage,
517 sst_num: num_files,
518 index_size: index_usage,
519 manifest: RegionManifestInfo::Mito {
520 manifest_version,
521 flushed_entry_id,
522 },
523 data_topic_latest_entry_id: topic_latest_entry_id,
524 metadata_topic_latest_entry_id: topic_latest_entry_id,
525 written_bytes,
526 }
527 }
528
529 fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
532 ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
533 }
534
535 fn compare_exchange_state(
538 &self,
539 expect: RegionLeaderState,
540 state: RegionRoleState,
541 ) -> Result<()> {
542 self.manifest_ctx
543 .state
544 .compare_exchange(RegionRoleState::Leader(expect), state)
545 .map_err(|actual| {
546 RegionStateSnafu {
547 region_id: self.region_id,
548 state: actual,
549 expect: RegionRoleState::Leader(expect),
550 }
551 .build()
552 })?;
553 Ok(())
554 }
555
556 pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
558 let table_dir = self.table_dir();
559 let path_type = self.access_layer.path_type();
560
561 let visible_ssts = self
562 .version()
563 .ssts
564 .levels()
565 .iter()
566 .flat_map(|level| level.files().map(|file| file.file_id().file_id()))
567 .collect::<HashSet<_>>();
568
569 self.manifest_ctx
570 .manifest()
571 .await
572 .files
573 .values()
574 .map(|meta| {
575 let region_id = self.region_id;
576 let origin_region_id = meta.region_id;
577 let (index_file_path, index_file_size) = if meta.index_file_size > 0 {
578 let index_file_path = index_file_path(table_dir, meta.file_id(), path_type);
579 (Some(index_file_path), Some(meta.index_file_size))
580 } else {
581 (None, None)
582 };
583 let visible = visible_ssts.contains(&meta.file_id);
584 ManifestSstEntry {
585 table_dir: table_dir.to_string(),
586 region_id,
587 table_id: region_id.table_id(),
588 region_number: region_id.region_number(),
589 region_group: region_id.region_group(),
590 region_sequence: region_id.region_sequence(),
591 file_id: meta.file_id.to_string(),
592 level: meta.level,
593 file_path: sst_file_path(table_dir, meta.file_id(), path_type),
594 file_size: meta.file_size,
595 index_file_path,
596 index_file_size,
597 num_rows: meta.num_rows,
598 num_row_groups: meta.num_row_groups,
599 min_ts: meta.time_range.0,
600 max_ts: meta.time_range.1,
601 sequence: meta.sequence.map(|s| s.get()),
602 origin_region_id,
603 node_id: None,
604 visible,
605 }
606 })
607 .collect()
608 }
609
/// Leaves staging mode by merging the staged manifests into the region
/// manifest and switching the region back to `Writable`.
///
/// Steps, in order: verify the region is `Staging`; merge the staged
/// actions; write the merged actions through the manifest manager; apply the
/// merged edit to the in-memory version; clear the staging manifests; switch
/// the state. If nothing is staged, only the state is switched.
///
/// `manager` is the write guard of this region's manifest manager.
///
/// # Errors
/// Returns a region state error if the region is not `Staging`, or the first
/// error raised by any of the steps above.
pub(crate) async fn exit_staging_on_success(
    &self,
    manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
) -> Result<()> {
    let current_state = self.manifest_ctx.current_state();
    ensure!(
        current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
        RegionStateSnafu {
            region_id: self.region_id,
            state: current_state,
            expect: RegionRoleState::Leader(RegionLeaderState::Staging),
        }
    );

    let merged_actions = match manager.merge_staged_actions(current_state).await? {
        Some(actions) => actions,
        None => {
            // Nothing was staged: just flip the state back.
            info!(
                "No staged manifests to merge for region {}, exiting staging mode without changes",
                self.region_id
            );
            self.exit_staging()?;
            return Ok(());
        }
    };

    // The update is issued with the writable state, not the current staging state.
    let target_state = RegionRoleState::Leader(RegionLeaderState::Writable);
    // Cloned because the actions are consumed below to build the region edit.
    let new_version = manager.update(merged_actions.clone(), target_state).await?;

    info!(
        "Successfully submitted merged staged manifests for region {}, new version: {}",
        self.region_id, new_version
    );

    // Mirror the persisted change in the in-memory version.
    let merged_edit = merged_actions.into_region_edit();
    self.version_control
        .apply_edit(Some(merged_edit), &[], self.file_purger.clone());

    // Clean up the staging area, then leave staging.
    manager.store().clear_staging_manifests().await?;
    self.exit_staging()?;

    Ok(())
}
660}
661
/// Bundles the manifest manager of a region with the region's role state.
#[derive(Debug)]
pub(crate) struct ManifestContext {
    /// Manifest manager, guarded by an async read-write lock.
    pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
    /// Role state of the region; read and updated atomically, independently of
    /// the manifest lock.
    state: AtomicCell<RegionRoleState>,
}
671
672impl ManifestContext {
673 pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
674 ManifestContext {
675 manifest_manager: tokio::sync::RwLock::new(manager),
676 state: AtomicCell::new(state),
677 }
678 }
679
680 pub(crate) async fn manifest_version(&self) -> ManifestVersion {
681 self.manifest_manager
682 .read()
683 .await
684 .manifest()
685 .manifest_version
686 }
687
688 pub(crate) async fn has_update(&self) -> Result<bool> {
689 self.manifest_manager.read().await.has_update().await
690 }
691
692 pub(crate) fn current_state(&self) -> RegionRoleState {
694 self.state.load()
695 }
696
697 pub(crate) async fn install_manifest_to(
703 &self,
704 version: ManifestVersion,
705 ) -> Result<Arc<RegionManifest>> {
706 let mut manager = self.manifest_manager.write().await;
707 manager.install_manifest_to(version).await?;
708
709 Ok(manager.manifest())
710 }
711
712 pub(crate) async fn update_manifest(
714 &self,
715 expect_state: RegionLeaderState,
716 action_list: RegionMetaActionList,
717 ) -> Result<ManifestVersion> {
718 let mut manager = self.manifest_manager.write().await;
720 let manifest = manager.manifest();
722 let current_state = self.state.load();
725
726 if expect_state != RegionLeaderState::Downgrading {
731 if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
732 info!(
733 "Region {} is in downgrading leader state, updating manifest. state is {:?}",
734 manifest.metadata.region_id, expect_state
735 );
736 }
737 ensure!(
738 current_state == RegionRoleState::Leader(expect_state)
739 || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
740 UpdateManifestSnafu {
741 region_id: manifest.metadata.region_id,
742 state: current_state,
743 }
744 );
745 } else {
746 ensure!(
747 current_state == RegionRoleState::Leader(expect_state),
748 RegionStateSnafu {
749 region_id: manifest.metadata.region_id,
750 state: current_state,
751 expect: RegionRoleState::Leader(expect_state),
752 }
753 );
754 }
755
756 for action in &action_list.actions {
757 let RegionMetaAction::Edit(edit) = &action else {
759 continue;
760 };
761
762 let Some(truncated_entry_id) = manifest.truncated_entry_id else {
764 continue;
765 };
766
767 if let Some(flushed_entry_id) = edit.flushed_entry_id {
769 ensure!(
770 truncated_entry_id < flushed_entry_id,
771 RegionTruncatedSnafu {
772 region_id: manifest.metadata.region_id,
773 }
774 );
775 }
776
777 if !edit.files_to_remove.is_empty() {
779 for file in &edit.files_to_remove {
781 ensure!(
782 manifest.files.contains_key(&file.file_id),
783 RegionTruncatedSnafu {
784 region_id: manifest.metadata.region_id,
785 }
786 );
787 }
788 }
789 }
790
791 let version = manager.update(action_list, current_state).await.inspect_err(
793 |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
794 )?;
795
796 if self.state.load() == RegionRoleState::Follower {
797 warn!(
798 "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
799 manifest.metadata.region_id
800 );
801 }
802
803 Ok(version)
804 }
805
806 pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
828 match next_role {
829 RegionRole::Follower => {
830 match self.state.fetch_update(|state| {
831 if !matches!(state, RegionRoleState::Follower) {
832 Some(RegionRoleState::Follower)
833 } else {
834 None
835 }
836 }) {
837 Ok(state) => info!(
838 "Convert region {} to follower, previous role state: {:?}",
839 region_id, state
840 ),
841 Err(state) => {
842 if state != RegionRoleState::Follower {
843 warn!(
844 "Failed to convert region {} to follower, current role state: {:?}",
845 region_id, state
846 )
847 }
848 }
849 }
850 }
851 RegionRole::Leader => {
852 match self.state.fetch_update(|state| {
853 if matches!(
854 state,
855 RegionRoleState::Follower
856 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
857 ) {
858 Some(RegionRoleState::Leader(RegionLeaderState::Writable))
859 } else {
860 None
861 }
862 }) {
863 Ok(state) => info!(
864 "Convert region {} to leader, previous role state: {:?}",
865 region_id, state
866 ),
867 Err(state) => {
868 if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
869 warn!(
870 "Failed to convert region {} to leader, current role state: {:?}",
871 region_id, state
872 )
873 }
874 }
875 }
876 }
877 RegionRole::DowngradingLeader => {
878 match self.state.compare_exchange(
879 RegionRoleState::Leader(RegionLeaderState::Writable),
880 RegionRoleState::Leader(RegionLeaderState::Downgrading),
881 ) {
882 Ok(state) => info!(
883 "Convert region {} to downgrading region, previous role state: {:?}",
884 region_id, state
885 ),
886 Err(state) => {
887 if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
888 warn!(
889 "Failed to convert region {} to downgrading leader, current role state: {:?}",
890 region_id, state
891 )
892 }
893 }
894 }
895 }
896 }
897 }
898
899 pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
900 self.manifest_manager.read().await.manifest()
901 }
902}
903
/// Shared reference to a [`ManifestContext`].
pub(crate) type ManifestContextRef = Arc<ManifestContext>;
905
/// Map from region id to region, guarded by a read-write lock.
#[derive(Debug, Default)]
pub(crate) struct RegionMap {
    /// Regions keyed by their id.
    regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
}
911
912impl RegionMap {
913 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
915 let regions = self.regions.read().unwrap();
916 regions.contains_key(®ion_id)
917 }
918
919 pub(crate) fn insert_region(&self, region: MitoRegionRef) {
921 let mut regions = self.regions.write().unwrap();
922 regions.insert(region.region_id, region);
923 }
924
925 pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
927 let regions = self.regions.read().unwrap();
928 regions.get(®ion_id).cloned()
929 }
930
931 pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
935 let region = self
936 .get_region(region_id)
937 .context(RegionNotFoundSnafu { region_id })?;
938 ensure!(
939 region.is_writable(),
940 RegionStateSnafu {
941 region_id,
942 state: region.state(),
943 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
944 }
945 );
946 Ok(region)
947 }
948
949 pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
953 let region = self
954 .get_region(region_id)
955 .context(RegionNotFoundSnafu { region_id })?;
956 ensure!(
957 region.is_follower(),
958 RegionStateSnafu {
959 region_id,
960 state: region.state(),
961 expect: RegionRoleState::Follower,
962 }
963 );
964
965 Ok(region)
966 }
967
968 pub(crate) fn get_region_or<F: OnFailure>(
972 &self,
973 region_id: RegionId,
974 cb: &mut F,
975 ) -> Option<MitoRegionRef> {
976 match self
977 .get_region(region_id)
978 .context(RegionNotFoundSnafu { region_id })
979 {
980 Ok(region) => Some(region),
981 Err(e) => {
982 cb.on_failure(e);
983 None
984 }
985 }
986 }
987
988 pub(crate) fn writable_region_or<F: OnFailure>(
992 &self,
993 region_id: RegionId,
994 cb: &mut F,
995 ) -> Option<MitoRegionRef> {
996 match self.writable_region(region_id) {
997 Ok(region) => Some(region),
998 Err(e) => {
999 cb.on_failure(e);
1000 None
1001 }
1002 }
1003 }
1004
1005 pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1009 let region = self.writable_region(region_id)?;
1010 if region.is_staging() {
1011 return Err(crate::error::RegionStateSnafu {
1012 region_id,
1013 state: region.state(),
1014 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1015 }
1016 .build());
1017 }
1018 Ok(region)
1019 }
1020
1021 fn flushable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1025 let region = self
1026 .get_region(region_id)
1027 .context(RegionNotFoundSnafu { region_id })?;
1028 ensure!(
1029 region.is_flushable(),
1030 FlushableRegionStateSnafu {
1031 region_id,
1032 state: region.state(),
1033 }
1034 );
1035 Ok(region)
1036 }
1037
1038 pub(crate) fn flushable_region_or<F: OnFailure>(
1042 &self,
1043 region_id: RegionId,
1044 cb: &mut F,
1045 ) -> Option<MitoRegionRef> {
1046 match self.flushable_region(region_id) {
1047 Ok(region) => Some(region),
1048 Err(e) => {
1049 cb.on_failure(e);
1050 None
1051 }
1052 }
1053 }
1054
1055 pub(crate) fn remove_region(&self, region_id: RegionId) {
1057 let mut regions = self.regions.write().unwrap();
1058 regions.remove(®ion_id);
1059 }
1060
1061 pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
1063 let regions = self.regions.read().unwrap();
1064 regions.values().cloned().collect()
1065 }
1066
1067 pub(crate) fn clear(&self) {
1069 self.regions.write().unwrap().clear();
1070 }
1071}
1072
/// Shared reference to a [`RegionMap`].
pub(crate) type RegionMapRef = Arc<RegionMap>;
1074
1075#[derive(Debug, Default)]
1077pub(crate) struct OpeningRegions {
1078 regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
1079}
1080
1081impl OpeningRegions {
1082 pub(crate) fn wait_for_opening_region(
1084 &self,
1085 region_id: RegionId,
1086 sender: OptionOutputTx,
1087 ) -> Option<OptionOutputTx> {
1088 let mut regions = self.regions.write().unwrap();
1089 match regions.entry(region_id) {
1090 Entry::Occupied(mut senders) => {
1091 senders.get_mut().push(sender);
1092 None
1093 }
1094 Entry::Vacant(_) => Some(sender),
1095 }
1096 }
1097
1098 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1100 let regions = self.regions.read().unwrap();
1101 regions.contains_key(®ion_id)
1102 }
1103
1104 pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
1106 let mut regions = self.regions.write().unwrap();
1107 regions.insert(region, vec![sender]);
1108 }
1109
1110 pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
1112 let mut regions = self.regions.write().unwrap();
1113 regions.remove(®ion_id).unwrap_or_default()
1114 }
1115
1116 #[cfg(test)]
1117 pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
1118 let regions = self.regions.read().unwrap();
1119 if let Some(senders) = regions.get(®ion_id) {
1120 senders.len()
1121 } else {
1122 0
1123 }
1124 }
1125}
1126
1127pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;
1128
/// Shared counters describing a region's manifest.
///
/// Cloning shares the underlying counters.
#[derive(Default, Debug, Clone)]
pub(crate) struct ManifestStats {
    /// Total size of the manifest.
    total_manifest_size: Arc<AtomicU64>,
    /// Version of the manifest.
    manifest_version: Arc<AtomicU64>,
}

impl ManifestStats {
    /// Returns the recorded total manifest size.
    fn total_manifest_size(&self) -> u64 {
        let counter: &AtomicU64 = &self.total_manifest_size;
        counter.load(Ordering::Relaxed)
    }

    /// Returns the recorded manifest version.
    fn manifest_version(&self) -> u64 {
        let counter: &AtomicU64 = &self.manifest_version;
        counter.load(Ordering::Relaxed)
    }
}
1145
1146#[cfg(test)]
1147mod tests {
1148 use std::sync::Arc;
1149 use std::sync::atomic::AtomicU64;
1150
1151 use common_datasource::compression::CompressionType;
1152 use common_test_util::temp_dir::create_temp_dir;
1153 use crossbeam_utils::atomic::AtomicCell;
1154 use object_store::ObjectStore;
1155 use object_store::services::Fs;
1156 use store_api::logstore::provider::Provider;
1157 use store_api::region_engine::RegionRole;
1158 use store_api::region_request::PathType;
1159 use store_api::storage::RegionId;
1160
1161 use crate::access_layer::AccessLayer;
1162 use crate::manifest::action::RegionMetaActionList;
1163 use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
1164 use crate::region::{
1165 ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
1166 };
1167 use crate::sst::FormatType;
1168 use crate::sst::index::intermediate::IntermediateManager;
1169 use crate::sst::index::puffin_manager::PuffinManagerFactory;
1170 use crate::test_util::memtable_util::EmptyMemtableBuilder;
1171 use crate::test_util::scheduler_util::SchedulerEnv;
1172 use crate::test_util::version_util::VersionControlBuilder;
1173 use crate::time_provider::StdTimeProvider;
1174
/// The role state must fit in a lock-free `AtomicCell`.
#[test]
fn test_region_state_lock_free() {
    let lock_free = AtomicCell::<RegionRoleState>::is_lock_free();
    assert!(lock_free);
}
1179
1180 #[tokio::test]
1181 async fn test_set_region_state() {
1182 let env = SchedulerEnv::new().await;
1183 let builder = VersionControlBuilder::new();
1184 let version_control = Arc::new(builder.build());
1185 let manifest_ctx = env
1186 .mock_manifest_context(version_control.current().version.metadata.clone())
1187 .await;
1188
1189 let region_id = RegionId::new(1024, 0);
1190 manifest_ctx.set_role(RegionRole::Follower, region_id);
1192 assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1193
1194 manifest_ctx.set_role(RegionRole::Leader, region_id);
1196 assert_eq!(
1197 manifest_ctx.state.load(),
1198 RegionRoleState::Leader(RegionLeaderState::Writable)
1199 );
1200
1201 manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1203 assert_eq!(
1204 manifest_ctx.state.load(),
1205 RegionRoleState::Leader(RegionLeaderState::Downgrading)
1206 );
1207
1208 manifest_ctx.set_role(RegionRole::Follower, region_id);
1210 assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1211
1212 manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1214 assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1215
1216 manifest_ctx.set_role(RegionRole::Leader, region_id);
1218 manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1219 assert_eq!(
1220 manifest_ctx.state.load(),
1221 RegionRoleState::Leader(RegionLeaderState::Downgrading)
1222 );
1223
1224 manifest_ctx.set_role(RegionRole::Leader, region_id);
1226 assert_eq!(
1227 manifest_ctx.state.load(),
1228 RegionRoleState::Leader(RegionLeaderState::Writable)
1229 );
1230 }
1231
1232 #[tokio::test]
1233 async fn test_staging_state_validation() {
1234 let env = SchedulerEnv::new().await;
1235 let builder = VersionControlBuilder::new();
1236 let version_control = Arc::new(builder.build());
1237
1238 let staging_ctx = {
1240 let manager = RegionManifestManager::new(
1241 version_control.current().version.metadata.clone(),
1242 0,
1243 RegionManifestOptions {
1244 manifest_dir: "".to_string(),
1245 object_store: env.access_layer.object_store().clone(),
1246 compress_type: CompressionType::Uncompressed,
1247 checkpoint_distance: 10,
1248 remove_file_options: Default::default(),
1249 },
1250 Default::default(),
1251 Default::default(),
1252 FormatType::PrimaryKey,
1253 )
1254 .await
1255 .unwrap();
1256 Arc::new(ManifestContext::new(
1257 manager,
1258 RegionRoleState::Leader(RegionLeaderState::Staging),
1259 ))
1260 };
1261
1262 assert_eq!(
1264 staging_ctx.current_state(),
1265 RegionRoleState::Leader(RegionLeaderState::Staging)
1266 );
1267
1268 let writable_ctx = env
1270 .mock_manifest_context(version_control.current().version.metadata.clone())
1271 .await;
1272
1273 assert_eq!(
1274 writable_ctx.current_state(),
1275 RegionRoleState::Leader(RegionLeaderState::Writable)
1276 );
1277 }
1278
/// Exercises the writable <-> staging transitions of a region, including the
/// cleanup of leftover staging manifests when staging is entered.
#[tokio::test]
async fn test_staging_state_transitions() {
    let builder = VersionControlBuilder::new();
    let version_control = Arc::new(builder.build());
    let metadata = version_control.current().version.metadata.clone();

    // File-system object store rooted in a temporary directory.
    let temp_dir = create_temp_dir("");
    let path_str = temp_dir.path().display().to_string();
    let fs_builder = Fs::default().root(&path_str);
    let object_store = ObjectStore::new(fs_builder).unwrap().finish();

    let index_aux_path = temp_dir.path().join("index_aux");
    let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
        .await
        .unwrap();
    let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
        .await
        .unwrap();

    let access_layer = Arc::new(AccessLayer::new(
        "",
        PathType::Bare,
        object_store,
        puffin_mgr,
        intm_mgr,
    ));

    let manager = RegionManifestManager::new(
        metadata.clone(),
        0,
        RegionManifestOptions {
            manifest_dir: "".to_string(),
            object_store: access_layer.object_store().clone(),
            compress_type: CompressionType::Uncompressed,
            checkpoint_distance: 10,
            remove_file_options: Default::default(),
        },
        Default::default(),
        Default::default(),
        FormatType::PrimaryKey,
    )
    .await
    .unwrap();

    // The region starts as a writable leader.
    let manifest_ctx = Arc::new(ManifestContext::new(
        manager,
        RegionRoleState::Leader(RegionLeaderState::Writable),
    ));

    let region = MitoRegion {
        region_id: metadata.region_id,
        version_control,
        access_layer,
        manifest_ctx: manifest_ctx.clone(),
        file_purger: crate::test_util::new_noop_file_purger(),
        provider: Provider::noop_provider(),
        last_flush_millis: Default::default(),
        last_compaction_millis: Default::default(),
        time_provider: Arc::new(StdTimeProvider),
        topic_latest_entry_id: Default::default(),
        written_bytes: Arc::new(AtomicU64::new(0)),
        memtable_builder: Arc::new(EmptyMemtableBuilder::default()),
        sst_format: FormatType::PrimaryKey,
        stats: ManifestStats::default(),
    };

    // Initial state: writable, not staging.
    assert_eq!(
        region.state(),
        RegionRoleState::Leader(RegionLeaderState::Writable)
    );
    assert!(!region.is_staging());

    // Writable -> staging.
    let mut manager = manifest_ctx.manifest_manager.write().await;
    region.set_staging(&mut manager).await.unwrap();
    drop(manager);
    assert_eq!(
        region.state(),
        RegionRoleState::Leader(RegionLeaderState::Staging)
    );
    assert!(region.is_staging());

    // Staging -> writable.
    region.exit_staging().unwrap();
    assert_eq!(
        region.state(),
        RegionRoleState::Leader(RegionLeaderState::Writable)
    );
    assert!(!region.is_staging());

    // Entering staging must clean up staging manifests left behind earlier.
    {
        let manager = manifest_ctx.manifest_manager.write().await;
        let dummy_actions = RegionMetaActionList::new(vec![]);
        let dummy_bytes = dummy_actions.encode().unwrap();

        // Create two leftover staging manifest files.
        manager.store().save(100, &dummy_bytes, true).await.unwrap();
        manager.store().save(101, &dummy_bytes, true).await.unwrap();
        drop(manager);

        let manager = manifest_ctx.manifest_manager.read().await;
        let dirty_manifests = manager.store().fetch_staging_manifests().await.unwrap();
        assert_eq!(
            dirty_manifests.len(),
            2,
            "Should have 2 dirty staging files"
        );
        drop(manager);

        let mut manager = manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();
        drop(manager);

        let manager = manifest_ctx.manifest_manager.read().await;
        let cleaned_manifests = manager.store().fetch_staging_manifests().await.unwrap();
        assert_eq!(
            cleaned_manifests.len(),
            0,
            "Dirty staging files should be cleaned up"
        );
        drop(manager);

        region.exit_staging().unwrap();
    }

    // Repeating a transition fails: staging can only be entered from writable
    // and left from staging.
    let mut manager = manifest_ctx.manifest_manager.write().await;
    assert!(region.set_staging(&mut manager).await.is_ok());
    drop(manager);
    let mut manager = manifest_ctx.manifest_manager.write().await;
    assert!(region.set_staging(&mut manager).await.is_err());
    drop(manager);
    assert!(region.exit_staging().is_ok());
    assert!(region.exit_staging().is_err());
}
1422}