1pub mod catchup;
18pub mod opener;
19pub mod options;
20pub mod utils;
21pub(crate) mod version;
22
23use std::collections::hash_map::Entry;
24use std::collections::{HashMap, HashSet};
25use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
26use std::sync::{Arc, Mutex, RwLock};
27
28use common_telemetry::{error, info, warn};
29use crossbeam_utils::atomic::AtomicCell;
30use partition::expr::PartitionExpr;
31use snafu::{OptionExt, ResultExt, ensure};
32use store_api::ManifestVersion;
33use store_api::codec::PrimaryKeyEncoding;
34use store_api::logstore::provider::Provider;
35use store_api::metadata::RegionMetadataRef;
36use store_api::region_engine::{
37 RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
38};
39use store_api::region_request::PathType;
40use store_api::sst_entry::ManifestSstEntry;
41use store_api::storage::{FileId, RegionId, SequenceNumber};
42use tokio::sync::RwLockWriteGuard;
43pub use utils::*;
44
45use crate::access_layer::AccessLayerRef;
46use crate::error::{
47 InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
48 UnexpectedSnafu, UpdateManifestSnafu,
49};
50use crate::manifest::action::{
51 RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
52};
53use crate::manifest::manager::RegionManifestManager;
54use crate::region::version::{VersionControlRef, VersionRef};
55use crate::request::{OnFailure, OptionOutputTx};
56use crate::sst::file::FileMeta;
57use crate::sst::file_purger::FilePurgerRef;
58use crate::sst::location::{index_file_path, sst_file_path};
59use crate::time_provider::TimeProviderRef;
60
/// Multiplier applied to the memtable usage to estimate the WAL usage of a region
/// (see `MitoRegion::estimated_wal_usage`).
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;
63
/// Storage usage of a single region, split by component.
#[derive(Debug)]
pub struct RegionUsage {
    /// Id of the region the usage belongs to.
    pub region_id: RegionId,
    /// Usage attributed to the WAL.
    pub wal_usage: u64,
    /// Usage attributed to SST files.
    pub sst_usage: u64,
    /// Usage attributed to manifest files.
    pub manifest_usage: u64,
}
72
73impl RegionUsage {
74 pub fn disk_usage(&self) -> u64 {
75 self.wal_usage + self.sst_usage + self.manifest_usage
76 }
77}
78
/// Sub-state of a region whose role is leader.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// The region accepts writes; most state transitions start from this state.
    Writable,
    /// The region is in staging mode; it still accepts writes and can be flushed.
    Staging,
    /// Intermediate state entered from `Writable` (presumably on the way to `Staging`).
    EnteringStaging,
    /// The region is being altered.
    Altering,
    /// The region is being dropped.
    Dropping,
    /// The region is being truncated.
    Truncating,
    /// The region is being edited.
    Editing,
    /// The region is being downgraded; it rejects writes but can still be flushed.
    Downgrading,
}

/// Role of a region, with the leader sub-state when the region is a leader.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
    /// The region is a leader in the given sub-state.
    Leader(RegionLeaderState),
    /// The region is a follower.
    Follower,
}

impl RegionRoleState {
    /// Returns the leader sub-state, or `None` when the region is a follower.
    pub fn into_leader_state(self) -> Option<RegionLeaderState> {
        match self {
            Self::Follower => None,
            Self::Leader(inner) => Some(inner),
        }
    }
}
114
/// A mito region: its version control, storage access layer, manifest context
/// and runtime counters.
#[derive(Debug)]
pub struct MitoRegion {
    /// Id of this region.
    pub(crate) region_id: RegionId,

    /// Version control; `version()` and `metadata()` read its current version.
    pub(crate) version_control: VersionControlRef,
    /// Access layer of the region; provides the table directory and path type.
    pub(crate) access_layer: AccessLayerRef,
    /// Manifest manager together with the region's role state.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// File purger handed to the version control when edits are applied.
    pub(crate) file_purger: FilePurgerRef,
    /// Log store provider of the region.
    pub(crate) provider: Provider,
    /// Time of the last flush in milliseconds, taken from `time_provider`.
    last_flush_millis: AtomicI64,
    /// Time of the last compaction in milliseconds, taken from `time_provider`.
    last_compaction_millis: AtomicI64,
    /// Clock used to stamp flush and compaction times.
    time_provider: TimeProviderRef,
    /// Latest entry id of the topic; reported as both the data and metadata
    /// topic latest entry id in `region_statistic`.
    pub(crate) topic_latest_entry_id: AtomicU64,
    /// Written bytes counter reported in `region_statistic`.
    pub(crate) written_bytes: Arc<AtomicU64>,
    /// Partition expression (string form) returned while the region is staging;
    /// reset to `None` by `exit_staging`.
    pub(crate) staging_partition_expr: Mutex<Option<String>>,
    /// Manifest statistics (total size, version, removed file count).
    stats: ManifestStats,
}

/// Shared reference to a [`MitoRegion`].
pub type MitoRegionRef = Arc<MitoRegion>;
171
172impl MitoRegion {
173 pub(crate) async fn stop(&self) {
175 self.manifest_ctx
176 .manifest_manager
177 .write()
178 .await
179 .stop()
180 .await;
181
182 info!(
183 "Stopped region manifest manager, region_id: {}",
184 self.region_id
185 );
186 }
187
188 pub fn metadata(&self) -> RegionMetadataRef {
190 let version_data = self.version_control.current();
191 version_data.version.metadata.clone()
192 }
193
194 pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
196 let version_data = self.version_control.current();
197 version_data.version.metadata.primary_key_encoding
198 }
199
200 pub(crate) fn version(&self) -> VersionRef {
202 let version_data = self.version_control.current();
203 version_data.version
204 }
205
206 pub(crate) fn last_flush_millis(&self) -> i64 {
208 self.last_flush_millis.load(Ordering::Relaxed)
209 }
210
211 pub(crate) fn update_flush_millis(&self) {
213 let now = self.time_provider.current_time_millis();
214 self.last_flush_millis.store(now, Ordering::Relaxed);
215 }
216
217 pub(crate) fn last_compaction_millis(&self) -> i64 {
219 self.last_compaction_millis.load(Ordering::Relaxed)
220 }
221
222 pub(crate) fn update_compaction_millis(&self) {
224 let now = self.time_provider.current_time_millis();
225 self.last_compaction_millis.store(now, Ordering::Relaxed);
226 }
227
228 pub(crate) fn table_dir(&self) -> &str {
230 self.access_layer.table_dir()
231 }
232
233 pub(crate) fn path_type(&self) -> PathType {
235 self.access_layer.path_type()
236 }
237
238 pub(crate) fn is_writable(&self) -> bool {
240 matches!(
241 self.manifest_ctx.state.load(),
242 RegionRoleState::Leader(RegionLeaderState::Writable)
243 | RegionRoleState::Leader(RegionLeaderState::Staging)
244 )
245 }
246
247 pub(crate) fn is_flushable(&self) -> bool {
249 matches!(
250 self.manifest_ctx.state.load(),
251 RegionRoleState::Leader(RegionLeaderState::Writable)
252 | RegionRoleState::Leader(RegionLeaderState::Staging)
253 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
254 )
255 }
256
257 pub(crate) fn should_abort_index(&self) -> bool {
259 matches!(
260 self.manifest_ctx.state.load(),
261 RegionRoleState::Follower
262 | RegionRoleState::Leader(RegionLeaderState::Dropping)
263 | RegionRoleState::Leader(RegionLeaderState::Truncating)
264 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
265 | RegionRoleState::Leader(RegionLeaderState::Staging)
266 )
267 }
268
269 pub(crate) fn is_downgrading(&self) -> bool {
271 matches!(
272 self.manifest_ctx.state.load(),
273 RegionRoleState::Leader(RegionLeaderState::Downgrading)
274 )
275 }
276
277 #[allow(dead_code)]
279 pub(crate) fn is_staging(&self) -> bool {
280 self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
281 }
282
283 pub fn region_id(&self) -> RegionId {
284 self.region_id
285 }
286
287 pub fn find_committed_sequence(&self) -> SequenceNumber {
288 self.version_control.committed_sequence()
289 }
290
291 pub fn is_follower(&self) -> bool {
293 self.manifest_ctx.state.load() == RegionRoleState::Follower
294 }
295
296 pub(crate) fn state(&self) -> RegionRoleState {
298 self.manifest_ctx.state.load()
299 }
300
301 pub(crate) fn set_role(&self, next_role: RegionRole) {
303 self.manifest_ctx.set_role(next_role, self.region_id);
304 }
305
306 pub(crate) fn set_altering(&self) -> Result<()> {
309 self.compare_exchange_state(
310 RegionLeaderState::Writable,
311 RegionRoleState::Leader(RegionLeaderState::Altering),
312 )
313 }
314
315 pub(crate) fn set_dropping(&self, expect: RegionLeaderState) -> Result<()> {
318 self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Dropping))
319 }
320
321 pub(crate) fn set_truncating(&self) -> Result<()> {
324 self.compare_exchange_state(
325 RegionLeaderState::Writable,
326 RegionRoleState::Leader(RegionLeaderState::Truncating),
327 )
328 }
329
330 pub(crate) fn set_editing(&self, expect: RegionLeaderState) -> Result<()> {
333 self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Editing))
334 }
335
336 pub(crate) async fn set_staging(
342 &self,
343 manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
344 ) -> Result<()> {
345 manager.store().clear_staging_manifests().await?;
346
347 self.compare_exchange_state(
348 RegionLeaderState::Writable,
349 RegionRoleState::Leader(RegionLeaderState::Staging),
350 )
351 }
352
353 pub(crate) fn set_entering_staging(&self) -> Result<()> {
355 self.compare_exchange_state(
356 RegionLeaderState::Writable,
357 RegionRoleState::Leader(RegionLeaderState::EnteringStaging),
358 )
359 }
360
361 pub fn exit_staging(&self) -> Result<()> {
366 *self.staging_partition_expr.lock().unwrap() = None;
367 self.compare_exchange_state(
368 RegionLeaderState::Staging,
369 RegionRoleState::Leader(RegionLeaderState::Writable),
370 )
371 }
372
373 pub(crate) async fn set_role_state_gracefully(
375 &self,
376 state: SettableRegionRoleState,
377 ) -> Result<()> {
378 let mut manager: RwLockWriteGuard<'_, RegionManifestManager> =
379 self.manifest_ctx.manifest_manager.write().await;
380 let current_state = self.state();
381
382 match state {
383 SettableRegionRoleState::Leader => {
384 match current_state {
387 RegionRoleState::Leader(RegionLeaderState::Staging) => {
388 info!("Exiting staging mode for region {}", self.region_id);
389 self.exit_staging_on_success(&mut manager).await?;
391 }
392 RegionRoleState::Leader(RegionLeaderState::Writable) => {
393 info!("Region {} already in normal leader mode", self.region_id);
395 }
396 _ => {
397 return Err(RegionStateSnafu {
399 region_id: self.region_id,
400 state: current_state,
401 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
402 }
403 .build());
404 }
405 }
406 }
407
408 SettableRegionRoleState::StagingLeader => {
409 match current_state {
412 RegionRoleState::Leader(RegionLeaderState::Writable) => {
413 info!("Entering staging mode for region {}", self.region_id);
414 self.set_staging(&mut manager).await?;
415 }
416 RegionRoleState::Leader(RegionLeaderState::Staging) => {
417 info!("Region {} already in staging mode", self.region_id);
419 }
420 _ => {
421 return Err(RegionStateSnafu {
422 region_id: self.region_id,
423 state: current_state,
424 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
425 }
426 .build());
427 }
428 }
429 }
430
431 SettableRegionRoleState::Follower => {
432 match current_state {
434 RegionRoleState::Leader(RegionLeaderState::Staging) => {
435 info!(
436 "Exiting staging and demoting region {} to follower",
437 self.region_id
438 );
439 self.exit_staging()?;
440 self.set_role(RegionRole::Follower);
441 }
442 RegionRoleState::Leader(_) => {
443 info!("Demoting region {} from leader to follower", self.region_id);
444 self.set_role(RegionRole::Follower);
445 }
446 RegionRoleState::Follower => {
447 info!("Region {} already in follower mode", self.region_id);
449 }
450 }
451 }
452
453 SettableRegionRoleState::DowngradingLeader => {
454 match current_state {
456 RegionRoleState::Leader(RegionLeaderState::Staging) => {
457 info!(
458 "Exiting staging and entering downgrade for region {}",
459 self.region_id
460 );
461 self.exit_staging()?;
462 self.set_role(RegionRole::DowngradingLeader);
463 }
464 RegionRoleState::Leader(RegionLeaderState::Writable) => {
465 info!("Starting downgrade for region {}", self.region_id);
466 self.set_role(RegionRole::DowngradingLeader);
467 }
468 RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
469 info!("Region {} already in downgrading mode", self.region_id);
471 }
472 _ => {
473 warn!(
474 "Cannot start downgrade for region {} from state {:?}",
475 self.region_id, current_state
476 );
477 }
478 }
479 }
480 }
481
482 if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
484 let manifest_meta = &manager.manifest().metadata;
486 let current_version = self.version();
487 let current_meta = ¤t_version.metadata;
488 if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
489 let action = RegionMetaAction::Change(RegionChange {
490 metadata: current_meta.clone(),
491 sst_format: current_version.options.sst_format.unwrap_or_default(),
492 });
493 let result = manager
494 .update(RegionMetaActionList::with_action(action), false)
495 .await;
496
497 match result {
498 Ok(version) => {
499 info!(
500 "Successfully persisted backfilled metadata for region {}, version: {}",
501 self.region_id, version
502 );
503 }
504 Err(e) => {
505 warn!(e; "Failed to persist backfilled metadata for region {}", self.region_id);
506 }
507 }
508 }
509 }
510
511 drop(manager);
512
513 Ok(())
514 }
515
516 pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
519 if let Err(e) = self
520 .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
521 {
522 error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
523 }
524 }
525
526 pub(crate) fn switch_state_to_staging(&self, expect: RegionLeaderState) {
529 if let Err(e) =
530 self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Staging))
531 {
532 error!(e; "failed to switch region state to staging, expect state is {:?}", expect);
533 }
534 }
535
536 pub(crate) fn region_statistic(&self) -> RegionStatistic {
538 let version = self.version();
539 let memtables = &version.memtables;
540 let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;
541
542 let sst_usage = version.ssts.sst_usage();
543 let index_usage = version.ssts.index_usage();
544 let flushed_entry_id = version.flushed_entry_id;
545
546 let wal_usage = self.estimated_wal_usage(memtable_usage);
547 let manifest_usage = self.stats.total_manifest_size();
548 let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
549 let num_files = version.ssts.num_files();
550 let manifest_version = self.stats.manifest_version();
551 let file_removed_cnt = self.stats.file_removed_cnt();
552
553 let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
554 let written_bytes = self.written_bytes.load(Ordering::Relaxed);
555
556 RegionStatistic {
557 num_rows,
558 memtable_size: memtable_usage,
559 wal_size: wal_usage,
560 manifest_size: manifest_usage,
561 sst_size: sst_usage,
562 sst_num: num_files,
563 index_size: index_usage,
564 manifest: RegionManifestInfo::Mito {
565 manifest_version,
566 flushed_entry_id,
567 file_removed_cnt,
568 },
569 data_topic_latest_entry_id: topic_latest_entry_id,
570 metadata_topic_latest_entry_id: topic_latest_entry_id,
571 written_bytes,
572 }
573 }
574
575 fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
578 ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
579 }
580
581 fn compare_exchange_state(
584 &self,
585 expect: RegionLeaderState,
586 state: RegionRoleState,
587 ) -> Result<()> {
588 self.manifest_ctx
589 .state
590 .compare_exchange(RegionRoleState::Leader(expect), state)
591 .map_err(|actual| {
592 RegionStateSnafu {
593 region_id: self.region_id,
594 state: actual,
595 expect: RegionRoleState::Leader(expect),
596 }
597 .build()
598 })?;
599 Ok(())
600 }
601
602 pub fn access_layer(&self) -> AccessLayerRef {
603 self.access_layer.clone()
604 }
605
606 pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
608 let table_dir = self.table_dir();
609 let path_type = self.access_layer.path_type();
610
611 let visible_ssts = self
612 .version()
613 .ssts
614 .levels()
615 .iter()
616 .flat_map(|level| level.files().map(|file| file.file_id().file_id()))
617 .collect::<HashSet<_>>();
618
619 let manifest_files = self.manifest_ctx.manifest().await.files.clone();
620 let staging_files = self
621 .manifest_ctx
622 .staging_manifest()
623 .await
624 .map(|m| m.files.clone())
625 .unwrap_or_default();
626 let files = manifest_files
627 .into_iter()
628 .chain(staging_files.into_iter())
629 .collect::<HashMap<_, _>>();
630
631 files
632 .values()
633 .map(|meta| {
634 let region_id = self.region_id;
635 let origin_region_id = meta.region_id;
636 let (index_version, index_file_path, index_file_size) = if meta.index_file_size > 0
637 {
638 let index_file_path = index_file_path(table_dir, meta.index_id(), path_type);
639 (
640 meta.index_version,
641 Some(index_file_path),
642 Some(meta.index_file_size),
643 )
644 } else {
645 (0, None, None)
646 };
647 let visible = visible_ssts.contains(&meta.file_id);
648 ManifestSstEntry {
649 table_dir: table_dir.to_string(),
650 region_id,
651 table_id: region_id.table_id(),
652 region_number: region_id.region_number(),
653 region_group: region_id.region_group(),
654 region_sequence: region_id.region_sequence(),
655 file_id: meta.file_id.to_string(),
656 index_version,
657 level: meta.level,
658 file_path: sst_file_path(table_dir, meta.file_id(), path_type),
659 file_size: meta.file_size,
660 index_file_path,
661 index_file_size,
662 num_rows: meta.num_rows,
663 num_row_groups: meta.num_row_groups,
664 num_series: Some(meta.num_series),
665 min_ts: meta.time_range.0,
666 max_ts: meta.time_range.1,
667 sequence: meta.sequence.map(|s| s.get()),
668 origin_region_id,
669 node_id: None,
670 visible,
671 }
672 })
673 .collect()
674 }
675
676 pub async fn file_metas(&self, file_ids: &[FileId]) -> Vec<Option<FileMeta>> {
678 let manifest_files = self.manifest_ctx.manifest().await.files.clone();
679
680 file_ids
681 .iter()
682 .map(|file_id| manifest_files.get(file_id).cloned())
683 .collect::<Vec<_>>()
684 }
685
686 pub(crate) async fn exit_staging_on_success(
688 &self,
689 manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
690 ) -> Result<()> {
691 let current_state = self.manifest_ctx.current_state();
692 ensure!(
693 current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
694 RegionStateSnafu {
695 region_id: self.region_id,
696 state: current_state,
697 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
698 }
699 );
700
701 let merged_actions = match manager.merge_staged_actions(current_state).await? {
703 Some(actions) => actions,
704 None => {
705 info!(
706 "No staged manifests to merge for region {}, exiting staging mode without changes",
707 self.region_id
708 );
709 self.exit_staging()?;
711 return Ok(());
712 }
713 };
714 let expect_change = merged_actions.actions.iter().any(|a| a.is_change());
715 let expect_edit = merged_actions.actions.iter().any(|a| a.is_edit());
716 ensure!(
717 expect_change,
718 UnexpectedSnafu {
719 reason: "expect a change action in merged actions"
720 }
721 );
722 ensure!(
723 expect_edit,
724 UnexpectedSnafu {
725 reason: "expect an edit action in merged actions"
726 }
727 );
728
729 let new_version = manager.update(merged_actions.clone(), false).await?;
732
733 info!(
734 "Successfully submitted merged staged manifests for region {}, new version: {}",
735 self.region_id, new_version
736 );
737
738 let (merged_change, merged_edit) = merged_actions.split_region_change_and_edit();
740 let new_metadata = merged_change.as_ref().unwrap().metadata.clone();
742 self.version_control.alter_schema(new_metadata);
743 self.version_control
744 .apply_edit(Some(merged_edit), &[], self.file_purger.clone());
745
746 if let Err(e) = manager.clear_staging_manifest_and_dir().await {
748 error!(e; "Failed to clear staging manifest dir for region {}", self.region_id);
749 }
750 self.exit_staging()?;
751
752 Ok(())
753 }
754
755 pub fn maybe_staging_partition_expr_str(&self) -> Option<String> {
761 let is_staging = self.is_staging();
762 if is_staging {
763 let staging_partition_expr = self.staging_partition_expr.lock().unwrap();
764 if staging_partition_expr.is_none() {
765 warn!(
766 "Staging partition expr is none for region {} in staging state",
767 self.region_id
768 );
769 }
770 staging_partition_expr.clone()
771 } else {
772 let version = self.version();
773 version.metadata.partition_expr.clone()
774 }
775 }
776}
777
/// Holds a region's manifest manager together with its role state.
#[derive(Debug)]
pub(crate) struct ManifestContext {
    /// Manifest manager behind an async `RwLock`; `update_manifest` holds the write
    /// lock while checking the state and applying actions.
    pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
    /// Current role state, kept in an `AtomicCell` so it can be read and swapped
    /// without taking the manifest lock.
    state: AtomicCell<RegionRoleState>,
}
787
788impl ManifestContext {
789 pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
790 ManifestContext {
791 manifest_manager: tokio::sync::RwLock::new(manager),
792 state: AtomicCell::new(state),
793 }
794 }
795
796 pub(crate) async fn manifest_version(&self) -> ManifestVersion {
797 self.manifest_manager
798 .read()
799 .await
800 .manifest()
801 .manifest_version
802 }
803
804 pub(crate) async fn has_update(&self) -> Result<bool> {
805 self.manifest_manager.read().await.has_update().await
806 }
807
808 pub(crate) fn current_state(&self) -> RegionRoleState {
810 self.state.load()
811 }
812
813 pub(crate) async fn install_manifest_to(
819 &self,
820 version: ManifestVersion,
821 ) -> Result<Arc<RegionManifest>> {
822 let mut manager = self.manifest_manager.write().await;
823 manager.install_manifest_to(version).await?;
824
825 Ok(manager.manifest())
826 }
827
828 pub(crate) async fn update_manifest(
830 &self,
831 expect_state: RegionLeaderState,
832 action_list: RegionMetaActionList,
833 is_staging: bool,
834 ) -> Result<ManifestVersion> {
835 let mut manager = self.manifest_manager.write().await;
837 let manifest = manager.manifest();
839 let current_state = self.state.load();
842
843 if expect_state != RegionLeaderState::Downgrading {
848 if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
849 info!(
850 "Region {} is in downgrading leader state, updating manifest. state is {:?}",
851 manifest.metadata.region_id, expect_state
852 );
853 }
854 ensure!(
855 current_state == RegionRoleState::Leader(expect_state)
856 || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
857 UpdateManifestSnafu {
858 region_id: manifest.metadata.region_id,
859 state: current_state,
860 }
861 );
862 } else {
863 ensure!(
864 current_state == RegionRoleState::Leader(expect_state),
865 RegionStateSnafu {
866 region_id: manifest.metadata.region_id,
867 state: current_state,
868 expect: RegionRoleState::Leader(expect_state),
869 }
870 );
871 }
872
873 for action in &action_list.actions {
874 let RegionMetaAction::Edit(edit) = &action else {
876 continue;
877 };
878
879 let Some(truncated_entry_id) = manifest.truncated_entry_id else {
881 continue;
882 };
883
884 if let Some(flushed_entry_id) = edit.flushed_entry_id {
886 ensure!(
887 truncated_entry_id < flushed_entry_id,
888 RegionTruncatedSnafu {
889 region_id: manifest.metadata.region_id,
890 }
891 );
892 }
893
894 if !edit.files_to_remove.is_empty() {
896 for file in &edit.files_to_remove {
898 ensure!(
899 manifest.files.contains_key(&file.file_id),
900 RegionTruncatedSnafu {
901 region_id: manifest.metadata.region_id,
902 }
903 );
904 }
905 }
906 }
907
908 let version = manager.update(action_list, is_staging).await.inspect_err(
910 |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
911 )?;
912
913 if self.state.load() == RegionRoleState::Follower {
914 warn!(
915 "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
916 manifest.metadata.region_id
917 );
918 }
919
920 Ok(version)
921 }
922
923 pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
945 match next_role {
946 RegionRole::Follower => {
947 match self.state.fetch_update(|state| {
948 if !matches!(state, RegionRoleState::Follower) {
949 Some(RegionRoleState::Follower)
950 } else {
951 None
952 }
953 }) {
954 Ok(state) => info!(
955 "Convert region {} to follower, previous role state: {:?}",
956 region_id, state
957 ),
958 Err(state) => {
959 if state != RegionRoleState::Follower {
960 warn!(
961 "Failed to convert region {} to follower, current role state: {:?}",
962 region_id, state
963 )
964 }
965 }
966 }
967 }
968 RegionRole::Leader => {
969 match self.state.fetch_update(|state| {
970 if matches!(
971 state,
972 RegionRoleState::Follower
973 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
974 ) {
975 Some(RegionRoleState::Leader(RegionLeaderState::Writable))
976 } else {
977 None
978 }
979 }) {
980 Ok(state) => info!(
981 "Convert region {} to leader, previous role state: {:?}",
982 region_id, state
983 ),
984 Err(state) => {
985 if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
986 warn!(
987 "Failed to convert region {} to leader, current role state: {:?}",
988 region_id, state
989 )
990 }
991 }
992 }
993 }
994 RegionRole::DowngradingLeader => {
995 match self.state.compare_exchange(
996 RegionRoleState::Leader(RegionLeaderState::Writable),
997 RegionRoleState::Leader(RegionLeaderState::Downgrading),
998 ) {
999 Ok(state) => info!(
1000 "Convert region {} to downgrading region, previous role state: {:?}",
1001 region_id, state
1002 ),
1003 Err(state) => {
1004 if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
1005 warn!(
1006 "Failed to convert region {} to downgrading leader, current role state: {:?}",
1007 region_id, state
1008 )
1009 }
1010 }
1011 }
1012 }
1013 }
1014 }
1015
1016 pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
1018 self.manifest_manager.read().await.manifest()
1019 }
1020
1021 pub(crate) async fn staging_manifest(
1023 &self,
1024 ) -> Option<Arc<crate::manifest::action::RegionManifest>> {
1025 self.manifest_manager.read().await.staging_manifest()
1026 }
1027}
1028
1029pub(crate) type ManifestContextRef = Arc<ManifestContext>;
1030
1031#[derive(Debug, Default)]
1033pub(crate) struct RegionMap {
1034 regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
1035}
1036
1037impl RegionMap {
1038 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1040 let regions = self.regions.read().unwrap();
1041 regions.contains_key(®ion_id)
1042 }
1043
1044 pub(crate) fn insert_region(&self, region: MitoRegionRef) {
1046 let mut regions = self.regions.write().unwrap();
1047 regions.insert(region.region_id, region);
1048 }
1049
1050 pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
1052 let regions = self.regions.read().unwrap();
1053 regions.get(®ion_id).cloned()
1054 }
1055
1056 pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1060 let region = self
1061 .get_region(region_id)
1062 .context(RegionNotFoundSnafu { region_id })?;
1063 ensure!(
1064 region.is_writable(),
1065 RegionStateSnafu {
1066 region_id,
1067 state: region.state(),
1068 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1069 }
1070 );
1071 Ok(region)
1072 }
1073
1074 pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1078 let region = self
1079 .get_region(region_id)
1080 .context(RegionNotFoundSnafu { region_id })?;
1081 ensure!(
1082 region.is_follower(),
1083 RegionStateSnafu {
1084 region_id,
1085 state: region.state(),
1086 expect: RegionRoleState::Follower,
1087 }
1088 );
1089
1090 Ok(region)
1091 }
1092
1093 pub(crate) fn get_region_or<F: OnFailure>(
1097 &self,
1098 region_id: RegionId,
1099 cb: &mut F,
1100 ) -> Option<MitoRegionRef> {
1101 match self
1102 .get_region(region_id)
1103 .context(RegionNotFoundSnafu { region_id })
1104 {
1105 Ok(region) => Some(region),
1106 Err(e) => {
1107 cb.on_failure(e);
1108 None
1109 }
1110 }
1111 }
1112
1113 pub(crate) fn writable_region_or<F: OnFailure>(
1117 &self,
1118 region_id: RegionId,
1119 cb: &mut F,
1120 ) -> Option<MitoRegionRef> {
1121 match self.writable_region(region_id) {
1122 Ok(region) => Some(region),
1123 Err(e) => {
1124 cb.on_failure(e);
1125 None
1126 }
1127 }
1128 }
1129
1130 pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1134 let region = self.writable_region(region_id)?;
1135 if region.is_staging() {
1136 return Err(crate::error::RegionStateSnafu {
1137 region_id,
1138 state: region.state(),
1139 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1140 }
1141 .build());
1142 }
1143 Ok(region)
1144 }
1145
1146 pub(crate) fn staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1150 let region = self
1151 .get_region(region_id)
1152 .context(RegionNotFoundSnafu { region_id })?;
1153 ensure!(
1154 region.is_staging(),
1155 RegionStateSnafu {
1156 region_id,
1157 state: region.state(),
1158 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
1159 }
1160 );
1161 Ok(region)
1162 }
1163
1164 fn flushable_region(&self, region_id: RegionId) -> Result<Option<MitoRegionRef>> {
1169 let region = self
1170 .get_region(region_id)
1171 .context(RegionNotFoundSnafu { region_id })?;
1172 if region.is_flushable() {
1173 Ok(Some(region))
1174 } else {
1175 Ok(None)
1176 }
1177 }
1178
1179 pub(crate) fn flushable_region_or<F: OnFailure>(
1184 &self,
1185 region_id: RegionId,
1186 cb: &mut F,
1187 ) -> Option<MitoRegionRef> {
1188 match self.flushable_region(region_id) {
1189 Ok(region) => region,
1190 Err(e) => {
1191 cb.on_failure(e);
1192 None
1193 }
1194 }
1195 }
1196
1197 pub(crate) fn remove_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
1199 let mut regions = self.regions.write().unwrap();
1200 regions.remove(®ion_id)
1201 }
1202
1203 pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
1205 let regions = self.regions.read().unwrap();
1206 regions.values().cloned().collect()
1207 }
1208
1209 pub(crate) fn clear(&self) {
1211 self.regions.write().unwrap().clear();
1212 }
1213}
1214
1215pub(crate) type RegionMapRef = Arc<RegionMap>;
1216
1217#[derive(Debug, Default)]
1219pub(crate) struct OpeningRegions {
1220 regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
1221}
1222
1223impl OpeningRegions {
1224 pub(crate) fn wait_for_opening_region(
1226 &self,
1227 region_id: RegionId,
1228 sender: OptionOutputTx,
1229 ) -> Option<OptionOutputTx> {
1230 let mut regions = self.regions.write().unwrap();
1231 match regions.entry(region_id) {
1232 Entry::Occupied(mut senders) => {
1233 senders.get_mut().push(sender);
1234 None
1235 }
1236 Entry::Vacant(_) => Some(sender),
1237 }
1238 }
1239
1240 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1242 let regions = self.regions.read().unwrap();
1243 regions.contains_key(®ion_id)
1244 }
1245
1246 pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
1248 let mut regions = self.regions.write().unwrap();
1249 regions.insert(region, vec![sender]);
1250 }
1251
1252 pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
1254 let mut regions = self.regions.write().unwrap();
1255 regions.remove(®ion_id).unwrap_or_default()
1256 }
1257
1258 #[cfg(test)]
1259 pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
1260 let regions = self.regions.read().unwrap();
1261 if let Some(senders) = regions.get(®ion_id) {
1262 senders.len()
1263 } else {
1264 0
1265 }
1266 }
1267}
1268
1269pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;
1270
1271#[derive(Debug, Default)]
1273pub(crate) struct CatchupRegions {
1274 regions: RwLock<HashSet<RegionId>>,
1275}
1276
1277impl CatchupRegions {
1278 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1280 let regions = self.regions.read().unwrap();
1281 regions.contains(®ion_id)
1282 }
1283
1284 pub(crate) fn insert_region(&self, region_id: RegionId) {
1286 let mut regions = self.regions.write().unwrap();
1287 regions.insert(region_id);
1288 }
1289
1290 pub(crate) fn remove_region(&self, region_id: RegionId) {
1292 let mut regions = self.regions.write().unwrap();
1293 regions.remove(®ion_id);
1294 }
1295}
1296
1297pub(crate) type CatchupRegionsRef = Arc<CatchupRegions>;
1298
/// Shared manifest counters; clones share the same underlying atomics.
#[derive(Default, Debug, Clone)]
pub struct ManifestStats {
    /// Total size of the manifest files.
    pub(crate) total_manifest_size: Arc<AtomicU64>,
    /// Current manifest version.
    pub(crate) manifest_version: Arc<AtomicU64>,
    /// Count of removed files.
    pub(crate) file_removed_cnt: Arc<AtomicU64>,
}

impl ManifestStats {
    /// Reads one counter with `Relaxed` ordering.
    fn read_counter(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Returns the total manifest size.
    fn total_manifest_size(&self) -> u64 {
        Self::read_counter(&self.total_manifest_size)
    }

    /// Returns the manifest version.
    fn manifest_version(&self) -> u64 {
        Self::read_counter(&self.manifest_version)
    }

    /// Returns the removed file count.
    fn file_removed_cnt(&self) -> u64 {
        Self::read_counter(&self.file_removed_cnt)
    }
}
1320
1321pub fn parse_partition_expr(partition_expr_str: Option<&str>) -> Result<Option<PartitionExpr>> {
1323 match partition_expr_str {
1324 None => Ok(None),
1325 Some("") => Ok(None),
1326 Some(json_str) => {
1327 let expr = partition::expr::PartitionExpr::from_json_str(json_str)
1328 .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?;
1329 Ok(expr)
1330 }
1331 }
1332}
1333
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicU64;
    use std::sync::{Arc, Mutex};

    use common_datasource::compression::CompressionType;
    use common_test_util::temp_dir::create_temp_dir;
    use crossbeam_utils::atomic::AtomicCell;
    use object_store::ObjectStore;
    use object_store::services::Fs;
    use store_api::logstore::provider::Provider;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::PathType;
    use store_api::storage::RegionId;

    use crate::access_layer::AccessLayer;
    use crate::manifest::action::RegionMetaActionList;
    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
    use crate::region::{
        ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
    };
    use crate::sst::FormatType;
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::test_util::scheduler_util::SchedulerEnv;
    use crate::test_util::version_util::VersionControlBuilder;
    use crate::time_provider::StdTimeProvider;

    /// Checks that `AtomicCell<RegionRoleState>` is lock-free.
    #[test]
    fn test_region_state_lock_free() {
        assert!(AtomicCell::<RegionRoleState>::is_lock_free());
    }

    /// Exercises `ManifestContext::set_role` across leader, follower and
    /// downgrading-leader transitions.
    #[tokio::test]
    async fn test_set_region_state() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let region_id = RegionId::new(1024, 0);
        // Leader -> Follower.
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Follower -> Leader: the region becomes a writable leader.
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );

        // Leader -> DowngradingLeader.
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        // DowngradingLeader -> Follower.
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Follower -> DowngradingLeader is not applied; the state stays Follower.
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Follower -> Leader -> DowngradingLeader.
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        // DowngradingLeader -> Leader: back to a writable leader.
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

    /// A `ManifestContext` reports the role state it was constructed with.
    #[tokio::test]
    async fn test_staging_state_validation() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());

        // Build a context that starts out as a staging leader.
        let staging_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                    manifest_cache: None,
                },
                FormatType::PrimaryKey,
                &Default::default(),
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        assert_eq!(
            staging_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );

        // The mock context starts out as a writable leader.
        let writable_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        assert_eq!(
            writable_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

    /// Drives a `MitoRegion` into and out of staging. Also checks that
    /// re-entering staging cleans up leftover staging manifests and that
    /// redundant enter/exit calls are rejected.
    #[tokio::test]
    async fn test_staging_state_transitions() {
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let metadata = version_control.current().version.metadata.clone();

        // Back the region with a filesystem object store rooted in a temp dir.
        let temp_dir = create_temp_dir("");
        let path_str = temp_dir.path().display().to_string();
        let fs_builder = Fs::default().root(&path_str);
        let object_store = ObjectStore::new(fs_builder).unwrap().finish();

        let index_aux_path = temp_dir.path().join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let access_layer = Arc::new(AccessLayer::new(
            "",
            PathType::Bare,
            object_store,
            puffin_mgr,
            intm_mgr,
        ));

        let manager = RegionManifestManager::new(
            metadata.clone(),
            0,
            RegionManifestOptions {
                manifest_dir: "".to_string(),
                object_store: access_layer.object_store().clone(),
                compress_type: CompressionType::Uncompressed,
                checkpoint_distance: 10,
                remove_file_options: Default::default(),
                manifest_cache: None,
            },
            FormatType::PrimaryKey,
            &Default::default(),
        )
        .await
        .unwrap();

        let manifest_ctx = Arc::new(ManifestContext::new(
            manager,
            RegionRoleState::Leader(RegionLeaderState::Writable),
        ));

        let region = MitoRegion {
            region_id: metadata.region_id,
            version_control,
            access_layer,
            manifest_ctx: manifest_ctx.clone(),
            file_purger: crate::test_util::new_noop_file_purger(),
            provider: Provider::noop_provider(),
            last_flush_millis: Default::default(),
            last_compaction_millis: Default::default(),
            time_provider: Arc::new(StdTimeProvider),
            topic_latest_entry_id: Default::default(),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: ManifestStats::default(),
            staging_partition_expr: Mutex::new(None),
        };

        // The region starts out as a writable leader.
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Writable -> Staging; `set_staging` takes the manifest manager write guard.
        let mut manager = manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();
        drop(manager);
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );
        assert!(region.is_staging());

        // Staging -> Writable.
        region.exit_staging().unwrap();
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Entering staging again must clean up staging manifests left behind earlier.
        {
            // Write two placeholder manifests. The `true` flag presumably marks
            // them as staging manifests; the count check below relies on that.
            let manager = manifest_ctx.manifest_manager.write().await;
            let dummy_actions = RegionMetaActionList::new(vec![]);
            let dummy_bytes = dummy_actions.encode().unwrap();

            manager.store().save(100, &dummy_bytes, true).await.unwrap();
            manager.store().save(101, &dummy_bytes, true).await.unwrap();
            drop(manager);

            let manager = manifest_ctx.manifest_manager.read().await;
            let dirty_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                dirty_manifests.len(),
                2,
                "Should have 2 dirty staging files"
            );
            drop(manager);

            let mut manager = manifest_ctx.manifest_manager.write().await;
            region.set_staging(&mut manager).await.unwrap();
            drop(manager);

            let manager = manifest_ctx.manifest_manager.read().await;
            let cleaned_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                cleaned_manifests.len(),
                0,
                "Dirty staging files should be cleaned up"
            );
            drop(manager);

            region.exit_staging().unwrap();
        }

        // Entering staging succeeds once; entering it again while staging fails.
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_ok());
        drop(manager);
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_err());
        drop(manager);

        // Exiting staging succeeds once; exiting again when not staging fails.
        assert!(region.exit_staging().is_ok());
        assert!(region.exit_staging().is_err());
    }
}