1pub mod catchup;
18pub mod opener;
19pub mod options;
20pub(crate) mod version;
21
22use std::collections::hash_map::Entry;
23use std::collections::{HashMap, HashSet};
24use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
25use std::sync::{Arc, Mutex, RwLock};
26
27use common_telemetry::{error, info, warn};
28use crossbeam_utils::atomic::AtomicCell;
29use snafu::{OptionExt, ensure};
30use store_api::ManifestVersion;
31use store_api::codec::PrimaryKeyEncoding;
32use store_api::logstore::provider::Provider;
33use store_api::metadata::RegionMetadataRef;
34use store_api::region_engine::{
35 RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
36};
37use store_api::sst_entry::ManifestSstEntry;
38use store_api::storage::{RegionId, SequenceNumber};
39use tokio::sync::RwLockWriteGuard;
40
41use crate::access_layer::AccessLayerRef;
42use crate::error::{
43 FlushableRegionStateSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
44 UpdateManifestSnafu,
45};
46use crate::manifest::action::{
47 RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
48};
49use crate::manifest::manager::RegionManifestManager;
50use crate::region::version::{VersionControlRef, VersionRef};
51use crate::request::{OnFailure, OptionOutputTx};
52use crate::sst::file_purger::FilePurgerRef;
53use crate::sst::location::{index_file_path, sst_file_path};
54use crate::time_provider::TimeProviderRef;
55
/// Multiplier applied to memtable usage to estimate the WAL size of a region
/// (used by `MitoRegion::estimated_wal_usage`).
///
/// NOTE(review): the origin of this value is not documented in this file — confirm
/// before tuning.
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;
58
59#[derive(Debug)]
61pub struct RegionUsage {
62 pub region_id: RegionId,
63 pub wal_usage: u64,
64 pub sst_usage: u64,
65 pub manifest_usage: u64,
66}
67
68impl RegionUsage {
69 pub fn disk_usage(&self) -> u64 {
70 self.wal_usage + self.sst_usage + self.manifest_usage
71 }
72}
73
/// Sub-state of a region whose role is leader.
///
/// Apart from `Writable`, every state below is entered from `Writable` (see the
/// `set_*` helpers on `MitoRegion` and `ManifestContext::set_role`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// Normal leader: accepts writes and can be flushed.
    Writable,
    /// Staging mode: still writable and flushable; staged manifest actions are merged
    /// back when staging is exited successfully.
    Staging,
    /// Set by `set_entering_staging`.
    /// NOTE(review): presumably a transitional state before `Staging` — confirm with callers.
    EnteringStaging,
    /// The region is being altered.
    Altering,
    /// The region is being dropped.
    Dropping,
    /// The region is being truncated.
    Truncating,
    /// The region is being edited.
    Editing,
    /// Leader that is being downgraded: not writable, but still flushable and allowed
    /// to update its manifest.
    Downgrading,
}
93
/// Role state of a region: a leader (with its sub-state) or a follower.
///
/// Stored in an `AtomicCell` inside `ManifestContext`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
    /// The region is a leader in the given sub-state.
    Leader(RegionLeaderState),
    /// The region is a follower.
    Follower,
}
99
/// A region of the mito storage engine.
///
/// Bundles the region's version control, manifest context (manifest manager plus
/// role state), file access layer and runtime counters.
#[derive(Debug)]
pub struct MitoRegion {
    /// Id of this region.
    pub(crate) region_id: RegionId,

    /// Version control; `MitoRegion::version` reads the current version from it.
    pub(crate) version_control: VersionControlRef,
    /// Access layer used to resolve the table directory and file path type.
    pub(crate) access_layer: AccessLayerRef,
    /// Manifest manager and role state of the region.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// File purger handed to `apply_edit` when staged edits are applied.
    pub(crate) file_purger: FilePurgerRef,
    /// Log store provider of the region.
    /// NOTE(review): presumably locates the region's WAL — confirm.
    pub(crate) provider: Provider,
    /// Last flush time in milliseconds, written by `update_flush_millis`.
    last_flush_millis: AtomicI64,
    /// Last compaction time in milliseconds, written by `update_compaction_millis`.
    last_compaction_millis: AtomicI64,
    /// Clock used to produce the timestamps above.
    time_provider: TimeProviderRef,
    /// Latest topic entry id; reported in region statistics as both the data and the
    /// metadata topic latest entry id.
    pub(crate) topic_latest_entry_id: AtomicU64,
    /// Written-bytes counter reported in region statistics.
    pub(crate) written_bytes: Arc<AtomicU64>,
    /// Partition expression associated with staging.
    /// NOTE(review): not read in this file; semantics unconfirmed.
    pub(crate) staging_partition_expr: Mutex<Option<String>>,
    /// Manifest statistics (total size, version, removed file count).
    stats: ManifestStats,
}

/// Shared reference to a [`MitoRegion`].
pub type MitoRegionRef = Arc<MitoRegion>;
156
157impl MitoRegion {
158 pub(crate) async fn stop(&self) {
160 self.manifest_ctx
161 .manifest_manager
162 .write()
163 .await
164 .stop()
165 .await;
166
167 info!(
168 "Stopped region manifest manager, region_id: {}",
169 self.region_id
170 );
171 }
172
173 pub(crate) fn metadata(&self) -> RegionMetadataRef {
175 let version_data = self.version_control.current();
176 version_data.version.metadata.clone()
177 }
178
179 pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
181 let version_data = self.version_control.current();
182 version_data.version.metadata.primary_key_encoding
183 }
184
185 pub(crate) fn version(&self) -> VersionRef {
187 let version_data = self.version_control.current();
188 version_data.version
189 }
190
191 pub(crate) fn last_flush_millis(&self) -> i64 {
193 self.last_flush_millis.load(Ordering::Relaxed)
194 }
195
196 pub(crate) fn update_flush_millis(&self) {
198 let now = self.time_provider.current_time_millis();
199 self.last_flush_millis.store(now, Ordering::Relaxed);
200 }
201
202 pub(crate) fn last_compaction_millis(&self) -> i64 {
204 self.last_compaction_millis.load(Ordering::Relaxed)
205 }
206
207 pub(crate) fn update_compaction_millis(&self) {
209 let now = self.time_provider.current_time_millis();
210 self.last_compaction_millis.store(now, Ordering::Relaxed);
211 }
212
213 pub(crate) fn table_dir(&self) -> &str {
215 self.access_layer.table_dir()
216 }
217
218 pub(crate) fn is_writable(&self) -> bool {
220 matches!(
221 self.manifest_ctx.state.load(),
222 RegionRoleState::Leader(RegionLeaderState::Writable)
223 | RegionRoleState::Leader(RegionLeaderState::Staging)
224 )
225 }
226
227 pub(crate) fn is_flushable(&self) -> bool {
229 matches!(
230 self.manifest_ctx.state.load(),
231 RegionRoleState::Leader(RegionLeaderState::Writable)
232 | RegionRoleState::Leader(RegionLeaderState::Staging)
233 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
234 )
235 }
236
237 pub(crate) fn should_abort_index(&self) -> bool {
239 matches!(
240 self.manifest_ctx.state.load(),
241 RegionRoleState::Follower
242 | RegionRoleState::Leader(RegionLeaderState::Dropping)
243 | RegionRoleState::Leader(RegionLeaderState::Truncating)
244 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
245 | RegionRoleState::Leader(RegionLeaderState::Staging)
246 )
247 }
248
249 pub(crate) fn is_downgrading(&self) -> bool {
251 matches!(
252 self.manifest_ctx.state.load(),
253 RegionRoleState::Leader(RegionLeaderState::Downgrading)
254 )
255 }
256
257 #[allow(dead_code)]
259 pub(crate) fn is_staging(&self) -> bool {
260 self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
261 }
262
263 pub fn region_id(&self) -> RegionId {
264 self.region_id
265 }
266
267 pub fn find_committed_sequence(&self) -> SequenceNumber {
268 self.version_control.committed_sequence()
269 }
270
271 pub fn is_follower(&self) -> bool {
273 self.manifest_ctx.state.load() == RegionRoleState::Follower
274 }
275
276 pub(crate) fn state(&self) -> RegionRoleState {
278 self.manifest_ctx.state.load()
279 }
280
281 pub(crate) fn set_role(&self, next_role: RegionRole) {
283 self.manifest_ctx.set_role(next_role, self.region_id);
284 }
285
286 pub(crate) fn set_altering(&self) -> Result<()> {
289 self.compare_exchange_state(
290 RegionLeaderState::Writable,
291 RegionRoleState::Leader(RegionLeaderState::Altering),
292 )
293 }
294
295 pub(crate) fn set_dropping(&self) -> Result<()> {
298 self.compare_exchange_state(
299 RegionLeaderState::Writable,
300 RegionRoleState::Leader(RegionLeaderState::Dropping),
301 )
302 }
303
304 pub(crate) fn set_truncating(&self) -> Result<()> {
307 self.compare_exchange_state(
308 RegionLeaderState::Writable,
309 RegionRoleState::Leader(RegionLeaderState::Truncating),
310 )
311 }
312
313 pub(crate) fn set_editing(&self) -> Result<()> {
316 self.compare_exchange_state(
317 RegionLeaderState::Writable,
318 RegionRoleState::Leader(RegionLeaderState::Editing),
319 )
320 }
321
322 pub(crate) async fn set_staging(
328 &self,
329 manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
330 ) -> Result<()> {
331 manager.store().clear_staging_manifests().await?;
332
333 self.compare_exchange_state(
334 RegionLeaderState::Writable,
335 RegionRoleState::Leader(RegionLeaderState::Staging),
336 )
337 }
338
339 pub(crate) fn set_entering_staging(&self) -> Result<()> {
341 self.compare_exchange_state(
342 RegionLeaderState::Writable,
343 RegionRoleState::Leader(RegionLeaderState::EnteringStaging),
344 )
345 }
346
347 pub fn exit_staging(&self) -> Result<()> {
352 self.compare_exchange_state(
353 RegionLeaderState::Staging,
354 RegionRoleState::Leader(RegionLeaderState::Writable),
355 )
356 }
357
358 pub(crate) async fn set_role_state_gracefully(
360 &self,
361 state: SettableRegionRoleState,
362 ) -> Result<()> {
363 let mut manager = self.manifest_ctx.manifest_manager.write().await;
364 let current_state = self.state();
365
366 match state {
367 SettableRegionRoleState::Leader => {
368 match current_state {
371 RegionRoleState::Leader(RegionLeaderState::Staging) => {
372 info!("Exiting staging mode for region {}", self.region_id);
373 self.exit_staging_on_success(&mut manager).await?;
375 }
376 RegionRoleState::Leader(RegionLeaderState::Writable) => {
377 info!("Region {} already in normal leader mode", self.region_id);
379 }
380 _ => {
381 return Err(RegionStateSnafu {
383 region_id: self.region_id,
384 state: current_state,
385 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
386 }
387 .build());
388 }
389 }
390 }
391
392 SettableRegionRoleState::StagingLeader => {
393 match current_state {
396 RegionRoleState::Leader(RegionLeaderState::Writable) => {
397 info!("Entering staging mode for region {}", self.region_id);
398 self.set_staging(&mut manager).await?;
399 }
400 RegionRoleState::Leader(RegionLeaderState::Staging) => {
401 info!("Region {} already in staging mode", self.region_id);
403 }
404 _ => {
405 return Err(RegionStateSnafu {
406 region_id: self.region_id,
407 state: current_state,
408 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
409 }
410 .build());
411 }
412 }
413 }
414
415 SettableRegionRoleState::Follower => {
416 match current_state {
418 RegionRoleState::Leader(RegionLeaderState::Staging) => {
419 info!(
420 "Exiting staging and demoting region {} to follower",
421 self.region_id
422 );
423 self.exit_staging()?;
424 self.set_role(RegionRole::Follower);
425 }
426 RegionRoleState::Leader(_) => {
427 info!("Demoting region {} from leader to follower", self.region_id);
428 self.set_role(RegionRole::Follower);
429 }
430 RegionRoleState::Follower => {
431 info!("Region {} already in follower mode", self.region_id);
433 }
434 }
435 }
436
437 SettableRegionRoleState::DowngradingLeader => {
438 match current_state {
440 RegionRoleState::Leader(RegionLeaderState::Staging) => {
441 info!(
442 "Exiting staging and entering downgrade for region {}",
443 self.region_id
444 );
445 self.exit_staging()?;
446 self.set_role(RegionRole::DowngradingLeader);
447 }
448 RegionRoleState::Leader(RegionLeaderState::Writable) => {
449 info!("Starting downgrade for region {}", self.region_id);
450 self.set_role(RegionRole::DowngradingLeader);
451 }
452 RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
453 info!("Region {} already in downgrading mode", self.region_id);
455 }
456 _ => {
457 warn!(
458 "Cannot start downgrade for region {} from state {:?}",
459 self.region_id, current_state
460 );
461 }
462 }
463 }
464 }
465
466 if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
468 let manifest_meta = &manager.manifest().metadata;
470 let current_version = self.version();
471 let current_meta = ¤t_version.metadata;
472 if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
473 let action = RegionMetaAction::Change(RegionChange {
474 metadata: current_meta.clone(),
475 sst_format: current_version.options.sst_format.unwrap_or_default(),
476 });
477 let result = manager
478 .update(RegionMetaActionList::with_action(action), false)
479 .await;
480
481 match result {
482 Ok(version) => {
483 info!(
484 "Successfully persisted backfilled metadata for region {}, version: {}",
485 self.region_id, version
486 );
487 }
488 Err(e) => {
489 warn!(e; "Failed to persist backfilled metadata for region {}", self.region_id);
490 }
491 }
492 }
493 }
494
495 drop(manager);
496
497 Ok(())
498 }
499
500 pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
503 if let Err(e) = self
504 .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
505 {
506 error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
507 }
508 }
509
510 pub(crate) fn switch_state_to_staging(&self, expect: RegionLeaderState) {
513 if let Err(e) =
514 self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Staging))
515 {
516 error!(e; "failed to switch region state to staging, expect state is {:?}", expect);
517 }
518 }
519
520 pub(crate) fn region_statistic(&self) -> RegionStatistic {
522 let version = self.version();
523 let memtables = &version.memtables;
524 let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;
525
526 let sst_usage = version.ssts.sst_usage();
527 let index_usage = version.ssts.index_usage();
528 let flushed_entry_id = version.flushed_entry_id;
529
530 let wal_usage = self.estimated_wal_usage(memtable_usage);
531 let manifest_usage = self.stats.total_manifest_size();
532 let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
533 let num_files = version.ssts.num_files();
534 let manifest_version = self.stats.manifest_version();
535 let file_removed_cnt = self.stats.file_removed_cnt();
536
537 let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
538 let written_bytes = self.written_bytes.load(Ordering::Relaxed);
539
540 RegionStatistic {
541 num_rows,
542 memtable_size: memtable_usage,
543 wal_size: wal_usage,
544 manifest_size: manifest_usage,
545 sst_size: sst_usage,
546 sst_num: num_files,
547 index_size: index_usage,
548 manifest: RegionManifestInfo::Mito {
549 manifest_version,
550 flushed_entry_id,
551 file_removed_cnt,
552 },
553 data_topic_latest_entry_id: topic_latest_entry_id,
554 metadata_topic_latest_entry_id: topic_latest_entry_id,
555 written_bytes,
556 }
557 }
558
559 fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
562 ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
563 }
564
565 fn compare_exchange_state(
568 &self,
569 expect: RegionLeaderState,
570 state: RegionRoleState,
571 ) -> Result<()> {
572 self.manifest_ctx
573 .state
574 .compare_exchange(RegionRoleState::Leader(expect), state)
575 .map_err(|actual| {
576 RegionStateSnafu {
577 region_id: self.region_id,
578 state: actual,
579 expect: RegionRoleState::Leader(expect),
580 }
581 .build()
582 })?;
583 Ok(())
584 }
585
586 pub fn access_layer(&self) -> AccessLayerRef {
587 self.access_layer.clone()
588 }
589
590 pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
592 let table_dir = self.table_dir();
593 let path_type = self.access_layer.path_type();
594
595 let visible_ssts = self
596 .version()
597 .ssts
598 .levels()
599 .iter()
600 .flat_map(|level| level.files().map(|file| file.file_id().file_id()))
601 .collect::<HashSet<_>>();
602
603 let manifest_files = self.manifest_ctx.manifest().await.files.clone();
604 let staging_files = self
605 .manifest_ctx
606 .staging_manifest()
607 .await
608 .map(|m| m.files.clone())
609 .unwrap_or_default();
610 let files = manifest_files
611 .into_iter()
612 .chain(staging_files.into_iter())
613 .collect::<HashMap<_, _>>();
614
615 files
616 .values()
617 .map(|meta| {
618 let region_id = self.region_id;
619 let origin_region_id = meta.region_id;
620 let (index_version, index_file_path, index_file_size) = if meta.index_file_size > 0
621 {
622 let index_file_path = index_file_path(table_dir, meta.index_id(), path_type);
623 (
624 meta.index_version,
625 Some(index_file_path),
626 Some(meta.index_file_size),
627 )
628 } else {
629 (0, None, None)
630 };
631 let visible = visible_ssts.contains(&meta.file_id);
632 ManifestSstEntry {
633 table_dir: table_dir.to_string(),
634 region_id,
635 table_id: region_id.table_id(),
636 region_number: region_id.region_number(),
637 region_group: region_id.region_group(),
638 region_sequence: region_id.region_sequence(),
639 file_id: meta.file_id.to_string(),
640 index_version,
641 level: meta.level,
642 file_path: sst_file_path(table_dir, meta.file_id(), path_type),
643 file_size: meta.file_size,
644 index_file_path,
645 index_file_size,
646 num_rows: meta.num_rows,
647 num_row_groups: meta.num_row_groups,
648 num_series: Some(meta.num_series),
649 min_ts: meta.time_range.0,
650 max_ts: meta.time_range.1,
651 sequence: meta.sequence.map(|s| s.get()),
652 origin_region_id,
653 node_id: None,
654 visible,
655 }
656 })
657 .collect()
658 }
659
660 pub(crate) async fn exit_staging_on_success(
662 &self,
663 manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
664 ) -> Result<()> {
665 let current_state = self.manifest_ctx.current_state();
666 ensure!(
667 current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
668 RegionStateSnafu {
669 region_id: self.region_id,
670 state: current_state,
671 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
672 }
673 );
674
675 let merged_actions = match manager.merge_staged_actions(current_state).await? {
677 Some(actions) => actions,
678 None => {
679 info!(
680 "No staged manifests to merge for region {}, exiting staging mode without changes",
681 self.region_id
682 );
683 self.exit_staging()?;
685 return Ok(());
686 }
687 };
688
689 let new_version = manager.update(merged_actions.clone(), false).await?;
692
693 info!(
694 "Successfully submitted merged staged manifests for region {}, new version: {}",
695 self.region_id, new_version
696 );
697
698 let merged_edit = merged_actions.into_region_edit();
700 self.version_control
701 .apply_edit(Some(merged_edit), &[], self.file_purger.clone());
702
703 manager.store().clear_staging_manifests().await?;
705 self.exit_staging()?;
706
707 Ok(())
708 }
709}
710
/// Manifest manager of a region together with the region's role state.
#[derive(Debug)]
pub(crate) struct ManifestContext {
    /// Manifest manager behind an async RwLock; mutating operations such as
    /// `update_manifest` and `install_manifest_to` take the write lock.
    pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
    /// Current role state of the region. `update_manifest` checks it while holding
    /// the manager's write lock.
    state: AtomicCell<RegionRoleState>,
}
720
impl ManifestContext {
    /// Creates a context owning `manager`, starting in role state `state`.
    pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
        ManifestContext {
            manifest_manager: tokio::sync::RwLock::new(manager),
            state: AtomicCell::new(state),
        }
    }

    /// Returns the version of the current manifest (takes the read lock).
    pub(crate) async fn manifest_version(&self) -> ManifestVersion {
        self.manifest_manager
            .read()
            .await
            .manifest()
            .manifest_version
    }

    /// Delegates to `RegionManifestManager::has_update` under the read lock.
    pub(crate) async fn has_update(&self) -> Result<bool> {
        self.manifest_manager.read().await.has_update().await
    }

    /// Returns the current role state.
    pub(crate) fn current_state(&self) -> RegionRoleState {
        self.state.load()
    }

    /// Calls `RegionManifestManager::install_manifest_to(version)` under the write
    /// lock and returns the manager's manifest afterwards.
    pub(crate) async fn install_manifest_to(
        &self,
        version: ManifestVersion,
    ) -> Result<Arc<RegionManifest>> {
        let mut manager = self.manifest_manager.write().await;
        manager.install_manifest_to(version).await?;

        Ok(manager.manifest())
    }

    /// Applies `action_list` to the manifest if the region is in an acceptable state.
    ///
    /// State rules, checked while holding the write lock:
    /// - If `expect_state` is not `Downgrading`, the region must be `Leader(expect_state)`
    ///   or `Leader(Downgrading)`; otherwise an `UpdateManifest` error is returned.
    /// - If `expect_state` is `Downgrading`, the region must be exactly
    ///   `Leader(Downgrading)`; otherwise a region state error is returned.
    ///
    /// When the manifest records a truncated entry id, each `Edit` action is rejected
    /// with `RegionTruncated` if:
    /// - its `flushed_entry_id` is not greater than the truncated entry id, or
    /// - it removes a file the manifest no longer contains.
    ///
    /// `is_staging` is forwarded to `RegionManifestManager::update`. A warning is
    /// logged if the region became a follower during the update.
    pub(crate) async fn update_manifest(
        &self,
        expect_state: RegionLeaderState,
        action_list: RegionMetaActionList,
        is_staging: bool,
    ) -> Result<ManifestVersion> {
        // Take the write lock first so the state check below and the update are not
        // interleaved with other manifest writers.
        let mut manager = self.manifest_manager.write().await;
        // Snapshot of the manifest before this update.
        let manifest = manager.manifest();
        // Read the state only after the lock is held.
        let current_state = self.state.load();

        if expect_state != RegionLeaderState::Downgrading {
            // A downgrading leader is still allowed to update its manifest.
            if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                info!(
                    "Region {} is in downgrading leader state, updating manifest. state is {:?}",
                    manifest.metadata.region_id, expect_state
                );
            }
            ensure!(
                current_state == RegionRoleState::Leader(expect_state)
                    || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
                UpdateManifestSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                }
            );
        } else {
            ensure!(
                current_state == RegionRoleState::Leader(expect_state),
                RegionStateSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                    expect: RegionRoleState::Leader(expect_state),
                }
            );
        }

        for action in &action_list.actions {
            // Only edits are validated against truncation.
            let RegionMetaAction::Edit(edit) = &action else {
                continue;
            };

            // Nothing to check if the region was never truncated.
            let Some(truncated_entry_id) = manifest.truncated_entry_id else {
                continue;
            };

            // Presumably an edit produced by a flush: its data must be newer than the
            // truncation point.
            if let Some(flushed_entry_id) = edit.flushed_entry_id {
                ensure!(
                    truncated_entry_id < flushed_entry_id,
                    RegionTruncatedSnafu {
                        region_id: manifest.metadata.region_id,
                    }
                );
            }

            // Presumably an edit produced by a compaction: every input file must still
            // be in the manifest, otherwise it was removed by truncation.
            if !edit.files_to_remove.is_empty() {
                for file in &edit.files_to_remove {
                    ensure!(
                        manifest.files.contains_key(&file.file_id),
                        RegionTruncatedSnafu {
                            region_id: manifest.metadata.region_id,
                        }
                    );
                }
            }
        }

        let version = manager.update(action_list, is_staging).await.inspect_err(
            |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
        )?;

        // The role can change without the manifest lock (see `set_role`), so re-check.
        if self.state.load() == RegionRoleState::Follower {
            warn!(
                "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
                manifest.metadata.region_id
            );
        }

        Ok(version)
    }

    /// Atomically applies a role transition.
    ///
    /// - `Follower`: any non-follower state becomes `Follower`; already a follower is a
    ///   silent no-op.
    /// - `Leader`: `Follower` or `Leader(Downgrading)` becomes `Leader(Writable)`; any
    ///   other state is left unchanged and a warning is logged unless it is already
    ///   `Leader(Writable)`.
    /// - `DowngradingLeader`: only `Leader(Writable)` becomes `Leader(Downgrading)`; any
    ///   other state is left unchanged and a warning is logged unless it is already
    ///   `Leader(Downgrading)`.
    ///
    /// Does not take the manifest manager lock.
    pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
        match next_role {
            RegionRole::Follower => {
                // `fetch_update` yields Ok(previous) on success and Err(current) when
                // the closure returns None.
                match self.state.fetch_update(|state| {
                    if !matches!(state, RegionRoleState::Follower) {
                        Some(RegionRoleState::Follower)
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to follower, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Follower {
                            warn!(
                                "Failed to convert region {} to follower, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::Leader => {
                match self.state.fetch_update(|state| {
                    if matches!(
                        state,
                        RegionRoleState::Follower
                            | RegionRoleState::Leader(RegionLeaderState::Downgrading)
                    ) {
                        Some(RegionRoleState::Leader(RegionLeaderState::Writable))
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to leader, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
                            warn!(
                                "Failed to convert region {} to leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::DowngradingLeader => {
                match self.state.compare_exchange(
                    RegionRoleState::Leader(RegionLeaderState::Writable),
                    RegionRoleState::Leader(RegionLeaderState::Downgrading),
                ) {
                    Ok(state) => info!(
                        "Convert region {} to downgrading region, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                            warn!(
                                "Failed to convert region {} to downgrading leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
        }
    }

    /// Returns the current manifest (takes the read lock).
    pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
        self.manifest_manager.read().await.manifest()
    }

    /// Returns the staging manifest, if the manager has one (takes the read lock).
    pub(crate) async fn staging_manifest(
        &self,
    ) -> Option<Arc<crate::manifest::action::RegionManifest>> {
        self.manifest_manager.read().await.staging_manifest()
    }
}

/// Shared reference to a [`ManifestContext`].
pub(crate) type ManifestContextRef = Arc<ManifestContext>;
963
964#[derive(Debug, Default)]
966pub(crate) struct RegionMap {
967 regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
968}
969
970impl RegionMap {
971 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
973 let regions = self.regions.read().unwrap();
974 regions.contains_key(®ion_id)
975 }
976
977 pub(crate) fn insert_region(&self, region: MitoRegionRef) {
979 let mut regions = self.regions.write().unwrap();
980 regions.insert(region.region_id, region);
981 }
982
983 pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
985 let regions = self.regions.read().unwrap();
986 regions.get(®ion_id).cloned()
987 }
988
989 pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
993 let region = self
994 .get_region(region_id)
995 .context(RegionNotFoundSnafu { region_id })?;
996 ensure!(
997 region.is_writable(),
998 RegionStateSnafu {
999 region_id,
1000 state: region.state(),
1001 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1002 }
1003 );
1004 Ok(region)
1005 }
1006
1007 pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1011 let region = self
1012 .get_region(region_id)
1013 .context(RegionNotFoundSnafu { region_id })?;
1014 ensure!(
1015 region.is_follower(),
1016 RegionStateSnafu {
1017 region_id,
1018 state: region.state(),
1019 expect: RegionRoleState::Follower,
1020 }
1021 );
1022
1023 Ok(region)
1024 }
1025
1026 pub(crate) fn get_region_or<F: OnFailure>(
1030 &self,
1031 region_id: RegionId,
1032 cb: &mut F,
1033 ) -> Option<MitoRegionRef> {
1034 match self
1035 .get_region(region_id)
1036 .context(RegionNotFoundSnafu { region_id })
1037 {
1038 Ok(region) => Some(region),
1039 Err(e) => {
1040 cb.on_failure(e);
1041 None
1042 }
1043 }
1044 }
1045
1046 pub(crate) fn writable_region_or<F: OnFailure>(
1050 &self,
1051 region_id: RegionId,
1052 cb: &mut F,
1053 ) -> Option<MitoRegionRef> {
1054 match self.writable_region(region_id) {
1055 Ok(region) => Some(region),
1056 Err(e) => {
1057 cb.on_failure(e);
1058 None
1059 }
1060 }
1061 }
1062
1063 pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1067 let region = self.writable_region(region_id)?;
1068 if region.is_staging() {
1069 return Err(crate::error::RegionStateSnafu {
1070 region_id,
1071 state: region.state(),
1072 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1073 }
1074 .build());
1075 }
1076 Ok(region)
1077 }
1078
1079 pub(crate) fn staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1083 let region = self
1084 .get_region(region_id)
1085 .context(RegionNotFoundSnafu { region_id })?;
1086 ensure!(
1087 region.is_staging(),
1088 RegionStateSnafu {
1089 region_id,
1090 state: region.state(),
1091 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
1092 }
1093 );
1094 Ok(region)
1095 }
1096
1097 fn flushable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1101 let region = self
1102 .get_region(region_id)
1103 .context(RegionNotFoundSnafu { region_id })?;
1104 ensure!(
1105 region.is_flushable(),
1106 FlushableRegionStateSnafu {
1107 region_id,
1108 state: region.state(),
1109 }
1110 );
1111 Ok(region)
1112 }
1113
1114 pub(crate) fn flushable_region_or<F: OnFailure>(
1118 &self,
1119 region_id: RegionId,
1120 cb: &mut F,
1121 ) -> Option<MitoRegionRef> {
1122 match self.flushable_region(region_id) {
1123 Ok(region) => Some(region),
1124 Err(e) => {
1125 cb.on_failure(e);
1126 None
1127 }
1128 }
1129 }
1130
1131 pub(crate) fn remove_region(&self, region_id: RegionId) {
1133 let mut regions = self.regions.write().unwrap();
1134 regions.remove(®ion_id);
1135 }
1136
1137 pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
1139 let regions = self.regions.read().unwrap();
1140 regions.values().cloned().collect()
1141 }
1142
1143 pub(crate) fn clear(&self) {
1145 self.regions.write().unwrap().clear();
1146 }
1147}
1148
1149pub(crate) type RegionMapRef = Arc<RegionMap>;
1150
1151#[derive(Debug, Default)]
1153pub(crate) struct OpeningRegions {
1154 regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
1155}
1156
1157impl OpeningRegions {
1158 pub(crate) fn wait_for_opening_region(
1160 &self,
1161 region_id: RegionId,
1162 sender: OptionOutputTx,
1163 ) -> Option<OptionOutputTx> {
1164 let mut regions = self.regions.write().unwrap();
1165 match regions.entry(region_id) {
1166 Entry::Occupied(mut senders) => {
1167 senders.get_mut().push(sender);
1168 None
1169 }
1170 Entry::Vacant(_) => Some(sender),
1171 }
1172 }
1173
1174 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1176 let regions = self.regions.read().unwrap();
1177 regions.contains_key(®ion_id)
1178 }
1179
1180 pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
1182 let mut regions = self.regions.write().unwrap();
1183 regions.insert(region, vec![sender]);
1184 }
1185
1186 pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
1188 let mut regions = self.regions.write().unwrap();
1189 regions.remove(®ion_id).unwrap_or_default()
1190 }
1191
1192 #[cfg(test)]
1193 pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
1194 let regions = self.regions.read().unwrap();
1195 if let Some(senders) = regions.get(®ion_id) {
1196 senders.len()
1197 } else {
1198 0
1199 }
1200 }
1201}
1202
1203pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;
1204
1205#[derive(Debug, Default)]
1207pub(crate) struct CatchupRegions {
1208 regions: RwLock<HashSet<RegionId>>,
1209}
1210
1211impl CatchupRegions {
1212 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1214 let regions = self.regions.read().unwrap();
1215 regions.contains(®ion_id)
1216 }
1217
1218 pub(crate) fn insert_region(&self, region_id: RegionId) {
1220 let mut regions = self.regions.write().unwrap();
1221 regions.insert(region_id);
1222 }
1223
1224 pub(crate) fn remove_region(&self, region_id: RegionId) {
1226 let mut regions = self.regions.write().unwrap();
1227 regions.remove(®ion_id);
1228 }
1229}
1230
1231pub(crate) type CatchupRegionsRef = Arc<CatchupRegions>;
1232
/// Manifest statistics held in `Arc`-wrapped atomic counters.
///
/// Clones share the same counters, so an update through one handle is visible
/// through all of them.
#[derive(Debug, Clone, Default)]
pub struct ManifestStats {
    /// Total size of the region's manifest files.
    pub(crate) total_manifest_size: Arc<AtomicU64>,
    /// Current manifest version.
    pub(crate) manifest_version: Arc<AtomicU64>,
    /// Count of removed files.
    pub(crate) file_removed_cnt: Arc<AtomicU64>,
}

impl ManifestStats {
    /// Reads a single counter with relaxed ordering.
    fn load_counter(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    fn total_manifest_size(&self) -> u64 {
        Self::load_counter(&self.total_manifest_size)
    }

    fn manifest_version(&self) -> u64 {
        Self::load_counter(&self.manifest_version)
    }

    fn file_removed_cnt(&self) -> u64 {
        Self::load_counter(&self.file_removed_cnt)
    }
}
1254
1255#[cfg(test)]
1256mod tests {
1257 use std::sync::atomic::AtomicU64;
1258 use std::sync::{Arc, Mutex};
1259
1260 use common_datasource::compression::CompressionType;
1261 use common_test_util::temp_dir::create_temp_dir;
1262 use crossbeam_utils::atomic::AtomicCell;
1263 use object_store::ObjectStore;
1264 use object_store::services::Fs;
1265 use store_api::logstore::provider::Provider;
1266 use store_api::region_engine::RegionRole;
1267 use store_api::region_request::PathType;
1268 use store_api::storage::RegionId;
1269
1270 use crate::access_layer::AccessLayer;
1271 use crate::manifest::action::RegionMetaActionList;
1272 use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
1273 use crate::region::{
1274 ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
1275 };
1276 use crate::sst::FormatType;
1277 use crate::sst::index::intermediate::IntermediateManager;
1278 use crate::sst::index::puffin_manager::PuffinManagerFactory;
1279 use crate::test_util::scheduler_util::SchedulerEnv;
1280 use crate::test_util::version_util::VersionControlBuilder;
1281 use crate::time_provider::StdTimeProvider;
1282
1283 #[test]
1284 fn test_region_state_lock_free() {
1285 assert!(AtomicCell::<RegionRoleState>::is_lock_free());
1286 }
1287
1288 #[tokio::test]
1289 async fn test_set_region_state() {
1290 let env = SchedulerEnv::new().await;
1291 let builder = VersionControlBuilder::new();
1292 let version_control = Arc::new(builder.build());
1293 let manifest_ctx = env
1294 .mock_manifest_context(version_control.current().version.metadata.clone())
1295 .await;
1296
1297 let region_id = RegionId::new(1024, 0);
1298 manifest_ctx.set_role(RegionRole::Follower, region_id);
1300 assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1301
1302 manifest_ctx.set_role(RegionRole::Leader, region_id);
1304 assert_eq!(
1305 manifest_ctx.state.load(),
1306 RegionRoleState::Leader(RegionLeaderState::Writable)
1307 );
1308
1309 manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1311 assert_eq!(
1312 manifest_ctx.state.load(),
1313 RegionRoleState::Leader(RegionLeaderState::Downgrading)
1314 );
1315
1316 manifest_ctx.set_role(RegionRole::Follower, region_id);
1318 assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1319
1320 manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1322 assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1323
1324 manifest_ctx.set_role(RegionRole::Leader, region_id);
1326 manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1327 assert_eq!(
1328 manifest_ctx.state.load(),
1329 RegionRoleState::Leader(RegionLeaderState::Downgrading)
1330 );
1331
1332 manifest_ctx.set_role(RegionRole::Leader, region_id);
1334 assert_eq!(
1335 manifest_ctx.state.load(),
1336 RegionRoleState::Leader(RegionLeaderState::Writable)
1337 );
1338 }
1339
1340 #[tokio::test]
1341 async fn test_staging_state_validation() {
1342 let env = SchedulerEnv::new().await;
1343 let builder = VersionControlBuilder::new();
1344 let version_control = Arc::new(builder.build());
1345
1346 let staging_ctx = {
1348 let manager = RegionManifestManager::new(
1349 version_control.current().version.metadata.clone(),
1350 0,
1351 RegionManifestOptions {
1352 manifest_dir: "".to_string(),
1353 object_store: env.access_layer.object_store().clone(),
1354 compress_type: CompressionType::Uncompressed,
1355 checkpoint_distance: 10,
1356 remove_file_options: Default::default(),
1357 manifest_cache: None,
1358 },
1359 FormatType::PrimaryKey,
1360 &Default::default(),
1361 )
1362 .await
1363 .unwrap();
1364 Arc::new(ManifestContext::new(
1365 manager,
1366 RegionRoleState::Leader(RegionLeaderState::Staging),
1367 ))
1368 };
1369
1370 assert_eq!(
1372 staging_ctx.current_state(),
1373 RegionRoleState::Leader(RegionLeaderState::Staging)
1374 );
1375
1376 let writable_ctx = env
1378 .mock_manifest_context(version_control.current().version.metadata.clone())
1379 .await;
1380
1381 assert_eq!(
1382 writable_ctx.current_state(),
1383 RegionRoleState::Leader(RegionLeaderState::Writable)
1384 );
1385 }
1386
1387 #[tokio::test]
1388 async fn test_staging_state_transitions() {
1389 let builder = VersionControlBuilder::new();
1390 let version_control = Arc::new(builder.build());
1391 let metadata = version_control.current().version.metadata.clone();
1392
1393 let temp_dir = create_temp_dir("");
1395 let path_str = temp_dir.path().display().to_string();
1396 let fs_builder = Fs::default().root(&path_str);
1397 let object_store = ObjectStore::new(fs_builder).unwrap().finish();
1398
1399 let index_aux_path = temp_dir.path().join("index_aux");
1400 let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
1401 .await
1402 .unwrap();
1403 let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
1404 .await
1405 .unwrap();
1406
1407 let access_layer = Arc::new(AccessLayer::new(
1408 "",
1409 PathType::Bare,
1410 object_store,
1411 puffin_mgr,
1412 intm_mgr,
1413 ));
1414
1415 let manager = RegionManifestManager::new(
1416 metadata.clone(),
1417 0,
1418 RegionManifestOptions {
1419 manifest_dir: "".to_string(),
1420 object_store: access_layer.object_store().clone(),
1421 compress_type: CompressionType::Uncompressed,
1422 checkpoint_distance: 10,
1423 remove_file_options: Default::default(),
1424 manifest_cache: None,
1425 },
1426 FormatType::PrimaryKey,
1427 &Default::default(),
1428 )
1429 .await
1430 .unwrap();
1431
1432 let manifest_ctx = Arc::new(ManifestContext::new(
1433 manager,
1434 RegionRoleState::Leader(RegionLeaderState::Writable),
1435 ));
1436
1437 let region = MitoRegion {
1438 region_id: metadata.region_id,
1439 version_control,
1440 access_layer,
1441 manifest_ctx: manifest_ctx.clone(),
1442 file_purger: crate::test_util::new_noop_file_purger(),
1443 provider: Provider::noop_provider(),
1444 last_flush_millis: Default::default(),
1445 last_compaction_millis: Default::default(),
1446 time_provider: Arc::new(StdTimeProvider),
1447 topic_latest_entry_id: Default::default(),
1448 written_bytes: Arc::new(AtomicU64::new(0)),
1449 stats: ManifestStats::default(),
1450 staging_partition_expr: Mutex::new(None),
1451 };
1452
1453 assert_eq!(
1455 region.state(),
1456 RegionRoleState::Leader(RegionLeaderState::Writable)
1457 );
1458 assert!(!region.is_staging());
1459
1460 let mut manager = manifest_ctx.manifest_manager.write().await;
1462 region.set_staging(&mut manager).await.unwrap();
1463 drop(manager);
1464 assert_eq!(
1465 region.state(),
1466 RegionRoleState::Leader(RegionLeaderState::Staging)
1467 );
1468 assert!(region.is_staging());
1469
1470 region.exit_staging().unwrap();
1472 assert_eq!(
1473 region.state(),
1474 RegionRoleState::Leader(RegionLeaderState::Writable)
1475 );
1476 assert!(!region.is_staging());
1477
1478 {
1480 let manager = manifest_ctx.manifest_manager.write().await;
1482 let dummy_actions = RegionMetaActionList::new(vec![]);
1483 let dummy_bytes = dummy_actions.encode().unwrap();
1484
1485 manager.store().save(100, &dummy_bytes, true).await.unwrap();
1487 manager.store().save(101, &dummy_bytes, true).await.unwrap();
1488 drop(manager);
1489
1490 let manager = manifest_ctx.manifest_manager.read().await;
1492 let dirty_manifests = manager.store().fetch_staging_manifests().await.unwrap();
1493 assert_eq!(
1494 dirty_manifests.len(),
1495 2,
1496 "Should have 2 dirty staging files"
1497 );
1498 drop(manager);
1499
1500 let mut manager = manifest_ctx.manifest_manager.write().await;
1502 region.set_staging(&mut manager).await.unwrap();
1503 drop(manager);
1504
1505 let manager = manifest_ctx.manifest_manager.read().await;
1507 let cleaned_manifests = manager.store().fetch_staging_manifests().await.unwrap();
1508 assert_eq!(
1509 cleaned_manifests.len(),
1510 0,
1511 "Dirty staging files should be cleaned up"
1512 );
1513 drop(manager);
1514
1515 region.exit_staging().unwrap();
1517 }
1518
1519 let mut manager = manifest_ctx.manifest_manager.write().await;
1521 assert!(region.set_staging(&mut manager).await.is_ok()); drop(manager);
1523 let mut manager = manifest_ctx.manifest_manager.write().await;
1524 assert!(region.set_staging(&mut manager).await.is_err()); drop(manager);
1526 assert!(region.exit_staging().is_ok()); assert!(region.exit_staging().is_err()); }
1529}