1pub mod catchup;
18pub mod opener;
19pub mod options;
20pub(crate) mod version;
21
22use std::collections::hash_map::Entry;
23use std::collections::{HashMap, HashSet};
24use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
25use std::sync::{Arc, RwLock};
26
27use common_telemetry::{error, info, warn};
28use crossbeam_utils::atomic::AtomicCell;
29use snafu::{OptionExt, ensure};
30use store_api::ManifestVersion;
31use store_api::codec::PrimaryKeyEncoding;
32use store_api::logstore::provider::Provider;
33use store_api::metadata::RegionMetadataRef;
34use store_api::region_engine::{
35 RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
36};
37use store_api::sst_entry::ManifestSstEntry;
38use store_api::storage::{RegionId, SequenceNumber};
39use tokio::sync::RwLockWriteGuard;
40
41use crate::access_layer::AccessLayerRef;
42use crate::error::{
43 FlushableRegionStateSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
44 UpdateManifestSnafu,
45};
46use crate::manifest::action::{
47 RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
48};
49use crate::manifest::manager::RegionManifestManager;
50use crate::memtable::MemtableBuilderRef;
51use crate::region::version::{VersionControlRef, VersionRef};
52use crate::request::{OnFailure, OptionOutputTx};
53use crate::sst::FormatType;
54use crate::sst::file_purger::FilePurgerRef;
55use crate::sst::location::{index_file_path, sst_file_path};
56use crate::time_provider::TimeProviderRef;
57
/// Ratio used to estimate WAL usage from memtable usage.
///
/// `MitoRegion::estimated_wal_usage` multiplies the memtable usage by this value.
/// NOTE(review): how this particular value was derived is not documented here —
/// presumably an empirically measured ratio; confirm before tuning.
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;
60
61#[derive(Debug)]
63pub struct RegionUsage {
64 pub region_id: RegionId,
65 pub wal_usage: u64,
66 pub sst_usage: u64,
67 pub manifest_usage: u64,
68}
69
70impl RegionUsage {
71 pub fn disk_usage(&self) -> u64 {
72 self.wal_usage + self.sst_usage + self.manifest_usage
73 }
74}
75
/// State of a region while it is a leader.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// The region accepts writes.
    Writable,
    /// The region accepts writes in staging mode. Staged manifest actions are merged into
    /// the regular manifest by `MitoRegion::exit_staging_on_success`.
    Staging,
    /// The region is being altered; entered from `Writable` via `MitoRegion::set_altering`.
    Altering,
    /// The region is being dropped; entered from `Writable` via `MitoRegion::set_dropping`.
    Dropping,
    /// The region is being truncated; entered from `Writable` via
    /// `MitoRegion::set_truncating`.
    Truncating,
    /// The region is being edited; entered from `Writable` via `MitoRegion::set_editing`.
    Editing,
    /// The region is downgrading. It no longer accepts writes, but it can still be
    /// flushed and can still update its manifest.
    Downgrading,
}

/// Role state of a region.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
    /// The region is a leader in the given leader state.
    Leader(RegionLeaderState),
    /// The region is a follower.
    Follower,
}
99
/// A region of the mito engine: its version, manifest, storage access and statistics.
#[derive(Debug)]
pub struct MitoRegion {
    /// Id of this region.
    pub(crate) region_id: RegionId,

    /// Version controller; `metadata()` and `version()` read the current version from it.
    pub(crate) version_control: VersionControlRef,
    /// Access layer of the region; provides the table directory and the path type.
    pub(crate) access_layer: AccessLayerRef,
    /// Manifest manager and role state of the region.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// File purger passed to `apply_edit` when edits are applied to the version.
    pub(crate) file_purger: FilePurgerRef,
    /// Log store provider of the region.
    pub(crate) provider: Provider,
    /// Time of the last flush, in milliseconds as reported by `time_provider`.
    last_flush_millis: AtomicI64,
    /// Time of the last compaction, in milliseconds as reported by `time_provider`.
    last_compaction_millis: AtomicI64,
    /// Clock used to record flush and compaction times.
    time_provider: TimeProviderRef,
    /// Latest entry id of the region's topic. `region_statistic()` reports it as both the
    /// data and the metadata topic latest entry id.
    pub(crate) topic_latest_entry_id: AtomicU64,
    /// Written-bytes counter reported by `region_statistic()`; shared through an `Arc`.
    pub(crate) written_bytes: Arc<AtomicU64>,
    /// Memtable builder of the region.
    pub(crate) memtable_builder: MemtableBuilderRef,
    /// SST format of the region.
    pub(crate) sst_format: FormatType,
    /// Manifest size and version statistics reported by `region_statistic()`.
    stats: ManifestStats,
}

/// Shared reference to a [`MitoRegion`].
pub type MitoRegionRef = Arc<MitoRegion>;
152
153impl MitoRegion {
154 pub(crate) async fn stop(&self) {
156 self.manifest_ctx
157 .manifest_manager
158 .write()
159 .await
160 .stop()
161 .await;
162
163 info!(
164 "Stopped region manifest manager, region_id: {}",
165 self.region_id
166 );
167 }
168
169 pub(crate) fn metadata(&self) -> RegionMetadataRef {
171 let version_data = self.version_control.current();
172 version_data.version.metadata.clone()
173 }
174
175 pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
177 let version_data = self.version_control.current();
178 version_data.version.metadata.primary_key_encoding
179 }
180
181 pub(crate) fn version(&self) -> VersionRef {
183 let version_data = self.version_control.current();
184 version_data.version
185 }
186
187 pub(crate) fn last_flush_millis(&self) -> i64 {
189 self.last_flush_millis.load(Ordering::Relaxed)
190 }
191
192 pub(crate) fn update_flush_millis(&self) {
194 let now = self.time_provider.current_time_millis();
195 self.last_flush_millis.store(now, Ordering::Relaxed);
196 }
197
198 pub(crate) fn last_compaction_millis(&self) -> i64 {
200 self.last_compaction_millis.load(Ordering::Relaxed)
201 }
202
203 pub(crate) fn sst_format(&self) -> FormatType {
205 self.sst_format
206 }
207
208 pub(crate) fn update_compaction_millis(&self) {
210 let now = self.time_provider.current_time_millis();
211 self.last_compaction_millis.store(now, Ordering::Relaxed);
212 }
213
214 pub(crate) fn table_dir(&self) -> &str {
216 self.access_layer.table_dir()
217 }
218
219 pub(crate) fn is_writable(&self) -> bool {
221 matches!(
222 self.manifest_ctx.state.load(),
223 RegionRoleState::Leader(RegionLeaderState::Writable)
224 | RegionRoleState::Leader(RegionLeaderState::Staging)
225 )
226 }
227
228 pub(crate) fn is_flushable(&self) -> bool {
230 matches!(
231 self.manifest_ctx.state.load(),
232 RegionRoleState::Leader(RegionLeaderState::Writable)
233 | RegionRoleState::Leader(RegionLeaderState::Staging)
234 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
235 )
236 }
237
238 pub(crate) fn should_abort_index(&self) -> bool {
240 matches!(
241 self.manifest_ctx.state.load(),
242 RegionRoleState::Follower
243 | RegionRoleState::Leader(RegionLeaderState::Dropping)
244 | RegionRoleState::Leader(RegionLeaderState::Truncating)
245 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
246 | RegionRoleState::Leader(RegionLeaderState::Staging)
247 )
248 }
249
250 pub(crate) fn is_downgrading(&self) -> bool {
252 matches!(
253 self.manifest_ctx.state.load(),
254 RegionRoleState::Leader(RegionLeaderState::Downgrading)
255 )
256 }
257
258 #[allow(dead_code)]
260 pub(crate) fn is_staging(&self) -> bool {
261 self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
262 }
263
264 pub fn region_id(&self) -> RegionId {
265 self.region_id
266 }
267
268 pub fn find_committed_sequence(&self) -> SequenceNumber {
269 self.version_control.committed_sequence()
270 }
271
272 pub fn is_follower(&self) -> bool {
274 self.manifest_ctx.state.load() == RegionRoleState::Follower
275 }
276
277 pub(crate) fn state(&self) -> RegionRoleState {
279 self.manifest_ctx.state.load()
280 }
281
282 pub(crate) fn set_role(&self, next_role: RegionRole) {
284 self.manifest_ctx.set_role(next_role, self.region_id);
285 }
286
287 pub(crate) fn set_altering(&self) -> Result<()> {
290 self.compare_exchange_state(
291 RegionLeaderState::Writable,
292 RegionRoleState::Leader(RegionLeaderState::Altering),
293 )
294 }
295
296 pub(crate) fn set_dropping(&self) -> Result<()> {
299 self.compare_exchange_state(
300 RegionLeaderState::Writable,
301 RegionRoleState::Leader(RegionLeaderState::Dropping),
302 )
303 }
304
305 pub(crate) fn set_truncating(&self) -> Result<()> {
308 self.compare_exchange_state(
309 RegionLeaderState::Writable,
310 RegionRoleState::Leader(RegionLeaderState::Truncating),
311 )
312 }
313
314 pub(crate) fn set_editing(&self) -> Result<()> {
317 self.compare_exchange_state(
318 RegionLeaderState::Writable,
319 RegionRoleState::Leader(RegionLeaderState::Editing),
320 )
321 }
322
323 pub(crate) async fn set_staging(
329 &self,
330 manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
331 ) -> Result<()> {
332 manager.store().clear_staging_manifests().await?;
333
334 self.compare_exchange_state(
335 RegionLeaderState::Writable,
336 RegionRoleState::Leader(RegionLeaderState::Staging),
337 )
338 }
339
340 fn exit_staging(&self) -> Result<()> {
345 self.compare_exchange_state(
346 RegionLeaderState::Staging,
347 RegionRoleState::Leader(RegionLeaderState::Writable),
348 )
349 }
350
351 pub(crate) async fn set_role_state_gracefully(
353 &self,
354 state: SettableRegionRoleState,
355 ) -> Result<()> {
356 let mut manager = self.manifest_ctx.manifest_manager.write().await;
357 let current_state = self.state();
358
359 match state {
360 SettableRegionRoleState::Leader => {
361 match current_state {
364 RegionRoleState::Leader(RegionLeaderState::Staging) => {
365 info!("Exiting staging mode for region {}", self.region_id);
366 self.exit_staging_on_success(&mut manager).await?;
368 }
369 RegionRoleState::Leader(RegionLeaderState::Writable) => {
370 info!("Region {} already in normal leader mode", self.region_id);
372 }
373 _ => {
374 return Err(RegionStateSnafu {
376 region_id: self.region_id,
377 state: current_state,
378 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
379 }
380 .build());
381 }
382 }
383 }
384
385 SettableRegionRoleState::StagingLeader => {
386 match current_state {
389 RegionRoleState::Leader(RegionLeaderState::Writable) => {
390 info!("Entering staging mode for region {}", self.region_id);
391 self.set_staging(&mut manager).await?;
392 }
393 RegionRoleState::Leader(RegionLeaderState::Staging) => {
394 info!("Region {} already in staging mode", self.region_id);
396 }
397 _ => {
398 return Err(RegionStateSnafu {
399 region_id: self.region_id,
400 state: current_state,
401 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
402 }
403 .build());
404 }
405 }
406 }
407
408 SettableRegionRoleState::Follower => {
409 match current_state {
411 RegionRoleState::Leader(RegionLeaderState::Staging) => {
412 info!(
413 "Exiting staging and demoting region {} to follower",
414 self.region_id
415 );
416 self.exit_staging()?;
417 self.set_role(RegionRole::Follower);
418 }
419 RegionRoleState::Leader(_) => {
420 info!("Demoting region {} from leader to follower", self.region_id);
421 self.set_role(RegionRole::Follower);
422 }
423 RegionRoleState::Follower => {
424 info!("Region {} already in follower mode", self.region_id);
426 }
427 }
428 }
429
430 SettableRegionRoleState::DowngradingLeader => {
431 match current_state {
433 RegionRoleState::Leader(RegionLeaderState::Staging) => {
434 info!(
435 "Exiting staging and entering downgrade for region {}",
436 self.region_id
437 );
438 self.exit_staging()?;
439 self.set_role(RegionRole::DowngradingLeader);
440 }
441 RegionRoleState::Leader(RegionLeaderState::Writable) => {
442 info!("Starting downgrade for region {}", self.region_id);
443 self.set_role(RegionRole::DowngradingLeader);
444 }
445 RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
446 info!("Region {} already in downgrading mode", self.region_id);
448 }
449 _ => {
450 warn!(
451 "Cannot start downgrade for region {} from state {:?}",
452 self.region_id, current_state
453 );
454 }
455 }
456 }
457 }
458
459 if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
461 let manifest_meta = &manager.manifest().metadata;
463 let current_meta = &self.version().metadata;
464 if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
465 let action = RegionMetaAction::Change(RegionChange {
466 metadata: current_meta.clone(),
467 sst_format: self.sst_format(),
468 });
469 let result = manager
470 .update(
471 RegionMetaActionList::with_action(action),
472 RegionRoleState::Leader(RegionLeaderState::Writable),
473 )
474 .await;
475
476 match result {
477 Ok(version) => {
478 info!(
479 "Successfully persisted backfilled metadata for region {}, version: {}",
480 self.region_id, version
481 );
482 }
483 Err(e) => {
484 warn!(e; "Failed to persist backfilled metadata for region {}", self.region_id);
485 }
486 }
487 }
488 }
489
490 drop(manager);
491
492 Ok(())
493 }
494
495 pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
498 if let Err(e) = self
499 .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
500 {
501 error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
502 }
503 }
504
505 pub(crate) fn region_statistic(&self) -> RegionStatistic {
507 let version = self.version();
508 let memtables = &version.memtables;
509 let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;
510
511 let sst_usage = version.ssts.sst_usage();
512 let index_usage = version.ssts.index_usage();
513 let flushed_entry_id = version.flushed_entry_id;
514
515 let wal_usage = self.estimated_wal_usage(memtable_usage);
516 let manifest_usage = self.stats.total_manifest_size();
517 let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
518 let num_files = version.ssts.num_files();
519 let manifest_version = self.stats.manifest_version();
520
521 let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
522 let written_bytes = self.written_bytes.load(Ordering::Relaxed);
523
524 RegionStatistic {
525 num_rows,
526 memtable_size: memtable_usage,
527 wal_size: wal_usage,
528 manifest_size: manifest_usage,
529 sst_size: sst_usage,
530 sst_num: num_files,
531 index_size: index_usage,
532 manifest: RegionManifestInfo::Mito {
533 manifest_version,
534 flushed_entry_id,
535 },
536 data_topic_latest_entry_id: topic_latest_entry_id,
537 metadata_topic_latest_entry_id: topic_latest_entry_id,
538 written_bytes,
539 }
540 }
541
542 fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
545 ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
546 }
547
548 fn compare_exchange_state(
551 &self,
552 expect: RegionLeaderState,
553 state: RegionRoleState,
554 ) -> Result<()> {
555 self.manifest_ctx
556 .state
557 .compare_exchange(RegionRoleState::Leader(expect), state)
558 .map_err(|actual| {
559 RegionStateSnafu {
560 region_id: self.region_id,
561 state: actual,
562 expect: RegionRoleState::Leader(expect),
563 }
564 .build()
565 })?;
566 Ok(())
567 }
568
569 pub fn access_layer(&self) -> AccessLayerRef {
570 self.access_layer.clone()
571 }
572
573 pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
575 let table_dir = self.table_dir();
576 let path_type = self.access_layer.path_type();
577
578 let visible_ssts = self
579 .version()
580 .ssts
581 .levels()
582 .iter()
583 .flat_map(|level| level.files().map(|file| file.file_id().file_id()))
584 .collect::<HashSet<_>>();
585
586 self.manifest_ctx
587 .manifest()
588 .await
589 .files
590 .values()
591 .map(|meta| {
592 let region_id = self.region_id;
593 let origin_region_id = meta.region_id;
594 let (index_file_id, index_file_path, index_file_size) = if meta.index_file_size > 0
595 {
596 let index_file_path =
597 index_file_path(table_dir, meta.index_file_id(), path_type);
598 (
599 Some(meta.index_file_id().file_id().to_string()),
600 Some(index_file_path),
601 Some(meta.index_file_size),
602 )
603 } else {
604 (None, None, None)
605 };
606 let visible = visible_ssts.contains(&meta.file_id);
607 ManifestSstEntry {
608 table_dir: table_dir.to_string(),
609 region_id,
610 table_id: region_id.table_id(),
611 region_number: region_id.region_number(),
612 region_group: region_id.region_group(),
613 region_sequence: region_id.region_sequence(),
614 file_id: meta.file_id.to_string(),
615 index_file_id,
616 level: meta.level,
617 file_path: sst_file_path(table_dir, meta.file_id(), path_type),
618 file_size: meta.file_size,
619 index_file_path,
620 index_file_size,
621 num_rows: meta.num_rows,
622 num_row_groups: meta.num_row_groups,
623 num_series: Some(meta.num_series),
624 min_ts: meta.time_range.0,
625 max_ts: meta.time_range.1,
626 sequence: meta.sequence.map(|s| s.get()),
627 origin_region_id,
628 node_id: None,
629 visible,
630 }
631 })
632 .collect()
633 }
634
635 pub(crate) async fn exit_staging_on_success(
637 &self,
638 manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
639 ) -> Result<()> {
640 let current_state = self.manifest_ctx.current_state();
641 ensure!(
642 current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
643 RegionStateSnafu {
644 region_id: self.region_id,
645 state: current_state,
646 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
647 }
648 );
649
650 let merged_actions = match manager.merge_staged_actions(current_state).await? {
652 Some(actions) => actions,
653 None => {
654 info!(
655 "No staged manifests to merge for region {}, exiting staging mode without changes",
656 self.region_id
657 );
658 self.exit_staging()?;
660 return Ok(());
661 }
662 };
663
664 let target_state = RegionRoleState::Leader(RegionLeaderState::Writable);
667 let new_version = manager.update(merged_actions.clone(), target_state).await?;
668
669 info!(
670 "Successfully submitted merged staged manifests for region {}, new version: {}",
671 self.region_id, new_version
672 );
673
674 let merged_edit = merged_actions.into_region_edit();
676 self.version_control
677 .apply_edit(Some(merged_edit), &[], self.file_purger.clone());
678
679 manager.store().clear_staging_manifests().await?;
681 self.exit_staging()?;
682
683 Ok(())
684 }
685}
686
/// Manifest manager of a region together with the region's role state.
#[derive(Debug)]
pub(crate) struct ManifestContext {
    /// Manifest manager, guarded by an async (tokio) read-write lock.
    pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
    /// Current role state of the region. It is stored separately from the manifest lock,
    /// so it can be read and changed without taking that lock.
    state: AtomicCell<RegionRoleState>,
}

impl ManifestContext {
    /// Creates a context from a manifest manager and an initial role state.
    pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
        ManifestContext {
            manifest_manager: tokio::sync::RwLock::new(manager),
            state: AtomicCell::new(state),
        }
    }

    /// Returns the version of the current manifest. Takes the read lock.
    pub(crate) async fn manifest_version(&self) -> ManifestVersion {
        self.manifest_manager
            .read()
            .await
            .manifest()
            .manifest_version
    }

    /// Delegates to the manifest manager's `has_update` under the read lock.
    pub(crate) async fn has_update(&self) -> Result<bool> {
        self.manifest_manager.read().await.has_update().await
    }

    /// Returns the current role state.
    pub(crate) fn current_state(&self) -> RegionRoleState {
        self.state.load()
    }

    /// Installs manifests up to `version` under the write lock and returns the resulting
    /// manifest.
    pub(crate) async fn install_manifest_to(
        &self,
        version: ManifestVersion,
    ) -> Result<Arc<RegionManifest>> {
        let mut manager = self.manifest_manager.write().await;
        manager.install_manifest_to(version).await?;

        Ok(manager.manifest())
    }

    /// Applies `action_list` to the manifest after validating the region state.
    ///
    /// State checks:
    /// - If `expect_state` is not `Downgrading`, the region must be `Leader(expect_state)` or
    ///   `Leader(Downgrading)`; otherwise an update-manifest error is returned.
    /// - If `expect_state` is `Downgrading`, the region must be exactly `Leader(Downgrading)`;
    ///   otherwise a region state error is returned.
    ///
    /// When the manifest records a truncated entry id, each edit action is also validated:
    /// - its `flushed_entry_id` (if any) must be greater than the truncated entry id;
    /// - every file it removes must still be present in the manifest.
    ///
    /// A failed check returns a region-truncated error.
    ///
    /// Returns the new manifest version. Logs a warning if the region became a follower while
    /// the update was running.
    pub(crate) async fn update_manifest(
        &self,
        expect_state: RegionLeaderState,
        action_list: RegionMetaActionList,
    ) -> Result<ManifestVersion> {
        // Hold the write lock for the whole check-then-update sequence.
        let mut manager = self.manifest_manager.write().await;
        let manifest = manager.manifest();
        let current_state = self.state.load();

        if expect_state != RegionLeaderState::Downgrading {
            // A downgrading leader may still update the manifest on behalf of other states.
            if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                info!(
                    "Region {} is in downgrading leader state, updating manifest. state is {:?}",
                    manifest.metadata.region_id, expect_state
                );
            }
            ensure!(
                current_state == RegionRoleState::Leader(expect_state)
                    || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
                UpdateManifestSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                }
            );
        } else {
            ensure!(
                current_state == RegionRoleState::Leader(expect_state),
                RegionStateSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                    expect: RegionRoleState::Leader(expect_state),
                }
            );
        }

        for action in &action_list.actions {
            // Only edit actions are validated against truncation.
            let RegionMetaAction::Edit(edit) = &action else {
                continue;
            };

            // Without a truncated entry id there is nothing to validate.
            let Some(truncated_entry_id) = manifest.truncated_entry_id else {
                continue;
            };

            // Reject edits whose flushed entry id is not newer than the truncation point.
            if let Some(flushed_entry_id) = edit.flushed_entry_id {
                ensure!(
                    truncated_entry_id < flushed_entry_id,
                    RegionTruncatedSnafu {
                        region_id: manifest.metadata.region_id,
                    }
                );
            }

            // Reject edits that remove files the manifest no longer contains.
            if !edit.files_to_remove.is_empty() {
                for file in &edit.files_to_remove {
                    ensure!(
                        manifest.files.contains_key(&file.file_id),
                        RegionTruncatedSnafu {
                            region_id: manifest.metadata.region_id,
                        }
                    );
                }
            }
        }

        let version = manager.update(action_list, current_state).await.inspect_err(
            |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
        )?;

        // The role state is not protected by the manifest lock and may have changed meanwhile.
        if self.state.load() == RegionRoleState::Follower {
            warn!(
                "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
                manifest.metadata.region_id
            );
        }

        Ok(version)
    }

    /// Changes the role state of the region to match `next_role`.
    ///
    /// Transitions:
    /// - `Follower`: any non-follower state becomes `Follower`.
    /// - `Leader`: `Follower` or `Leader(Downgrading)` becomes `Leader(Writable)`; any other
    ///   state is left unchanged.
    /// - `DowngradingLeader`: only `Leader(Writable)` becomes `Leader(Downgrading)`; any other
    ///   state is left unchanged.
    ///
    /// Successful transitions are logged at info level. A rejected transition is logged as a
    /// warning, unless the region is already in the target state. `region_id` is used only
    /// for logging.
    pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
        match next_role {
            RegionRole::Follower => {
                match self.state.fetch_update(|state| {
                    if !matches!(state, RegionRoleState::Follower) {
                        Some(RegionRoleState::Follower)
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to follower, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Follower {
                            warn!(
                                "Failed to convert region {} to follower, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::Leader => {
                match self.state.fetch_update(|state| {
                    if matches!(
                        state,
                        RegionRoleState::Follower
                            | RegionRoleState::Leader(RegionLeaderState::Downgrading)
                    ) {
                        Some(RegionRoleState::Leader(RegionLeaderState::Writable))
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to leader, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
                            warn!(
                                "Failed to convert region {} to leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::DowngradingLeader => {
                match self.state.compare_exchange(
                    RegionRoleState::Leader(RegionLeaderState::Writable),
                    RegionRoleState::Leader(RegionLeaderState::Downgrading),
                ) {
                    Ok(state) => info!(
                        "Convert region {} to downgrading region, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                            warn!(
                                "Failed to convert region {} to downgrading leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
        }
    }

    /// Returns the current manifest. Takes the read lock.
    pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
        self.manifest_manager.read().await.manifest()
    }
}

/// Shared reference to a [`ManifestContext`].
pub(crate) type ManifestContextRef = Arc<ManifestContext>;
930
931#[derive(Debug, Default)]
933pub(crate) struct RegionMap {
934 regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
935}
936
937impl RegionMap {
938 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
940 let regions = self.regions.read().unwrap();
941 regions.contains_key(®ion_id)
942 }
943
944 pub(crate) fn insert_region(&self, region: MitoRegionRef) {
946 let mut regions = self.regions.write().unwrap();
947 regions.insert(region.region_id, region);
948 }
949
950 pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
952 let regions = self.regions.read().unwrap();
953 regions.get(®ion_id).cloned()
954 }
955
956 pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
960 let region = self
961 .get_region(region_id)
962 .context(RegionNotFoundSnafu { region_id })?;
963 ensure!(
964 region.is_writable(),
965 RegionStateSnafu {
966 region_id,
967 state: region.state(),
968 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
969 }
970 );
971 Ok(region)
972 }
973
974 pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
978 let region = self
979 .get_region(region_id)
980 .context(RegionNotFoundSnafu { region_id })?;
981 ensure!(
982 region.is_follower(),
983 RegionStateSnafu {
984 region_id,
985 state: region.state(),
986 expect: RegionRoleState::Follower,
987 }
988 );
989
990 Ok(region)
991 }
992
993 pub(crate) fn get_region_or<F: OnFailure>(
997 &self,
998 region_id: RegionId,
999 cb: &mut F,
1000 ) -> Option<MitoRegionRef> {
1001 match self
1002 .get_region(region_id)
1003 .context(RegionNotFoundSnafu { region_id })
1004 {
1005 Ok(region) => Some(region),
1006 Err(e) => {
1007 cb.on_failure(e);
1008 None
1009 }
1010 }
1011 }
1012
1013 pub(crate) fn writable_region_or<F: OnFailure>(
1017 &self,
1018 region_id: RegionId,
1019 cb: &mut F,
1020 ) -> Option<MitoRegionRef> {
1021 match self.writable_region(region_id) {
1022 Ok(region) => Some(region),
1023 Err(e) => {
1024 cb.on_failure(e);
1025 None
1026 }
1027 }
1028 }
1029
1030 pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1034 let region = self.writable_region(region_id)?;
1035 if region.is_staging() {
1036 return Err(crate::error::RegionStateSnafu {
1037 region_id,
1038 state: region.state(),
1039 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1040 }
1041 .build());
1042 }
1043 Ok(region)
1044 }
1045
1046 fn flushable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1050 let region = self
1051 .get_region(region_id)
1052 .context(RegionNotFoundSnafu { region_id })?;
1053 ensure!(
1054 region.is_flushable(),
1055 FlushableRegionStateSnafu {
1056 region_id,
1057 state: region.state(),
1058 }
1059 );
1060 Ok(region)
1061 }
1062
1063 pub(crate) fn flushable_region_or<F: OnFailure>(
1067 &self,
1068 region_id: RegionId,
1069 cb: &mut F,
1070 ) -> Option<MitoRegionRef> {
1071 match self.flushable_region(region_id) {
1072 Ok(region) => Some(region),
1073 Err(e) => {
1074 cb.on_failure(e);
1075 None
1076 }
1077 }
1078 }
1079
1080 pub(crate) fn remove_region(&self, region_id: RegionId) {
1082 let mut regions = self.regions.write().unwrap();
1083 regions.remove(®ion_id);
1084 }
1085
1086 pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
1088 let regions = self.regions.read().unwrap();
1089 regions.values().cloned().collect()
1090 }
1091
1092 pub(crate) fn clear(&self) {
1094 self.regions.write().unwrap().clear();
1095 }
1096}
1097
1098pub(crate) type RegionMapRef = Arc<RegionMap>;
1099
1100#[derive(Debug, Default)]
1102pub(crate) struct OpeningRegions {
1103 regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
1104}
1105
1106impl OpeningRegions {
1107 pub(crate) fn wait_for_opening_region(
1109 &self,
1110 region_id: RegionId,
1111 sender: OptionOutputTx,
1112 ) -> Option<OptionOutputTx> {
1113 let mut regions = self.regions.write().unwrap();
1114 match regions.entry(region_id) {
1115 Entry::Occupied(mut senders) => {
1116 senders.get_mut().push(sender);
1117 None
1118 }
1119 Entry::Vacant(_) => Some(sender),
1120 }
1121 }
1122
1123 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1125 let regions = self.regions.read().unwrap();
1126 regions.contains_key(®ion_id)
1127 }
1128
1129 pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
1131 let mut regions = self.regions.write().unwrap();
1132 regions.insert(region, vec![sender]);
1133 }
1134
1135 pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
1137 let mut regions = self.regions.write().unwrap();
1138 regions.remove(®ion_id).unwrap_or_default()
1139 }
1140
1141 #[cfg(test)]
1142 pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
1143 let regions = self.regions.read().unwrap();
1144 if let Some(senders) = regions.get(®ion_id) {
1145 senders.len()
1146 } else {
1147 0
1148 }
1149 }
1150}
1151
1152pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;
1153
1154#[derive(Debug, Default)]
1156pub(crate) struct CatchupRegions {
1157 regions: RwLock<HashSet<RegionId>>,
1158}
1159
1160impl CatchupRegions {
1161 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1163 let regions = self.regions.read().unwrap();
1164 regions.contains(®ion_id)
1165 }
1166
1167 pub(crate) fn insert_region(&self, region_id: RegionId) {
1169 let mut regions = self.regions.write().unwrap();
1170 regions.insert(region_id);
1171 }
1172
1173 pub(crate) fn remove_region(&self, region_id: RegionId) {
1175 let mut regions = self.regions.write().unwrap();
1176 regions.remove(®ion_id);
1177 }
1178}
1179
1180pub(crate) type CatchupRegionsRef = Arc<CatchupRegions>;
1181
/// Manifest statistics of a region.
///
/// The counters live behind `Arc`s, so every clone observes the same values.
/// NOTE(review): the writers of these counters are not in this file — presumably the
/// manifest manager; confirm.
#[derive(Default, Debug, Clone)]
pub(crate) struct ManifestStats {
    total_manifest_size: Arc<AtomicU64>,
    manifest_version: Arc<AtomicU64>,
}

impl ManifestStats {
    /// Returns the recorded total manifest size.
    fn total_manifest_size(&self) -> u64 {
        let counter: &AtomicU64 = &self.total_manifest_size;
        counter.load(Ordering::Relaxed)
    }

    /// Returns the recorded manifest version.
    fn manifest_version(&self) -> u64 {
        let counter: &AtomicU64 = &self.manifest_version;
        counter.load(Ordering::Relaxed)
    }
}
1198
1199#[cfg(test)]
1200mod tests {
1201 use std::sync::Arc;
1202 use std::sync::atomic::AtomicU64;
1203
1204 use common_datasource::compression::CompressionType;
1205 use common_test_util::temp_dir::create_temp_dir;
1206 use crossbeam_utils::atomic::AtomicCell;
1207 use object_store::ObjectStore;
1208 use object_store::services::Fs;
1209 use store_api::logstore::provider::Provider;
1210 use store_api::region_engine::RegionRole;
1211 use store_api::region_request::PathType;
1212 use store_api::storage::RegionId;
1213
1214 use crate::access_layer::AccessLayer;
1215 use crate::manifest::action::RegionMetaActionList;
1216 use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
1217 use crate::region::{
1218 ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
1219 };
1220 use crate::sst::FormatType;
1221 use crate::sst::index::intermediate::IntermediateManager;
1222 use crate::sst::index::puffin_manager::PuffinManagerFactory;
1223 use crate::test_util::memtable_util::EmptyMemtableBuilder;
1224 use crate::test_util::scheduler_util::SchedulerEnv;
1225 use crate::test_util::version_util::VersionControlBuilder;
1226 use crate::time_provider::StdTimeProvider;
1227
1228 #[test]
1229 fn test_region_state_lock_free() {
1230 assert!(AtomicCell::<RegionRoleState>::is_lock_free());
1231 }
1232
1233 #[tokio::test]
1234 async fn test_set_region_state() {
1235 let env = SchedulerEnv::new().await;
1236 let builder = VersionControlBuilder::new();
1237 let version_control = Arc::new(builder.build());
1238 let manifest_ctx = env
1239 .mock_manifest_context(version_control.current().version.metadata.clone())
1240 .await;
1241
1242 let region_id = RegionId::new(1024, 0);
1243 manifest_ctx.set_role(RegionRole::Follower, region_id);
1245 assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1246
1247 manifest_ctx.set_role(RegionRole::Leader, region_id);
1249 assert_eq!(
1250 manifest_ctx.state.load(),
1251 RegionRoleState::Leader(RegionLeaderState::Writable)
1252 );
1253
1254 manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1256 assert_eq!(
1257 manifest_ctx.state.load(),
1258 RegionRoleState::Leader(RegionLeaderState::Downgrading)
1259 );
1260
1261 manifest_ctx.set_role(RegionRole::Follower, region_id);
1263 assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1264
1265 manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1267 assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1268
1269 manifest_ctx.set_role(RegionRole::Leader, region_id);
1271 manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1272 assert_eq!(
1273 manifest_ctx.state.load(),
1274 RegionRoleState::Leader(RegionLeaderState::Downgrading)
1275 );
1276
1277 manifest_ctx.set_role(RegionRole::Leader, region_id);
1279 assert_eq!(
1280 manifest_ctx.state.load(),
1281 RegionRoleState::Leader(RegionLeaderState::Writable)
1282 );
1283 }
1284
1285 #[tokio::test]
1286 async fn test_staging_state_validation() {
1287 let env = SchedulerEnv::new().await;
1288 let builder = VersionControlBuilder::new();
1289 let version_control = Arc::new(builder.build());
1290
1291 let staging_ctx = {
1293 let manager = RegionManifestManager::new(
1294 version_control.current().version.metadata.clone(),
1295 0,
1296 RegionManifestOptions {
1297 manifest_dir: "".to_string(),
1298 object_store: env.access_layer.object_store().clone(),
1299 compress_type: CompressionType::Uncompressed,
1300 checkpoint_distance: 10,
1301 remove_file_options: Default::default(),
1302 },
1303 Default::default(),
1304 Default::default(),
1305 FormatType::PrimaryKey,
1306 )
1307 .await
1308 .unwrap();
1309 Arc::new(ManifestContext::new(
1310 manager,
1311 RegionRoleState::Leader(RegionLeaderState::Staging),
1312 ))
1313 };
1314
1315 assert_eq!(
1317 staging_ctx.current_state(),
1318 RegionRoleState::Leader(RegionLeaderState::Staging)
1319 );
1320
1321 let writable_ctx = env
1323 .mock_manifest_context(version_control.current().version.metadata.clone())
1324 .await;
1325
1326 assert_eq!(
1327 writable_ctx.current_state(),
1328 RegionRoleState::Leader(RegionLeaderState::Writable)
1329 );
1330 }
1331
1332 #[tokio::test]
1333 async fn test_staging_state_transitions() {
1334 let builder = VersionControlBuilder::new();
1335 let version_control = Arc::new(builder.build());
1336 let metadata = version_control.current().version.metadata.clone();
1337
1338 let temp_dir = create_temp_dir("");
1340 let path_str = temp_dir.path().display().to_string();
1341 let fs_builder = Fs::default().root(&path_str);
1342 let object_store = ObjectStore::new(fs_builder).unwrap().finish();
1343
1344 let index_aux_path = temp_dir.path().join("index_aux");
1345 let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
1346 .await
1347 .unwrap();
1348 let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
1349 .await
1350 .unwrap();
1351
1352 let access_layer = Arc::new(AccessLayer::new(
1353 "",
1354 PathType::Bare,
1355 object_store,
1356 puffin_mgr,
1357 intm_mgr,
1358 ));
1359
1360 let manager = RegionManifestManager::new(
1361 metadata.clone(),
1362 0,
1363 RegionManifestOptions {
1364 manifest_dir: "".to_string(),
1365 object_store: access_layer.object_store().clone(),
1366 compress_type: CompressionType::Uncompressed,
1367 checkpoint_distance: 10,
1368 remove_file_options: Default::default(),
1369 },
1370 Default::default(),
1371 Default::default(),
1372 FormatType::PrimaryKey,
1373 )
1374 .await
1375 .unwrap();
1376
1377 let manifest_ctx = Arc::new(ManifestContext::new(
1378 manager,
1379 RegionRoleState::Leader(RegionLeaderState::Writable),
1380 ));
1381
1382 let region = MitoRegion {
1383 region_id: metadata.region_id,
1384 version_control,
1385 access_layer,
1386 manifest_ctx: manifest_ctx.clone(),
1387 file_purger: crate::test_util::new_noop_file_purger(),
1388 provider: Provider::noop_provider(),
1389 last_flush_millis: Default::default(),
1390 last_compaction_millis: Default::default(),
1391 time_provider: Arc::new(StdTimeProvider),
1392 topic_latest_entry_id: Default::default(),
1393 written_bytes: Arc::new(AtomicU64::new(0)),
1394 memtable_builder: Arc::new(EmptyMemtableBuilder::default()),
1395 sst_format: FormatType::PrimaryKey,
1396 stats: ManifestStats::default(),
1397 };
1398
1399 assert_eq!(
1401 region.state(),
1402 RegionRoleState::Leader(RegionLeaderState::Writable)
1403 );
1404 assert!(!region.is_staging());
1405
1406 let mut manager = manifest_ctx.manifest_manager.write().await;
1408 region.set_staging(&mut manager).await.unwrap();
1409 drop(manager);
1410 assert_eq!(
1411 region.state(),
1412 RegionRoleState::Leader(RegionLeaderState::Staging)
1413 );
1414 assert!(region.is_staging());
1415
1416 region.exit_staging().unwrap();
1418 assert_eq!(
1419 region.state(),
1420 RegionRoleState::Leader(RegionLeaderState::Writable)
1421 );
1422 assert!(!region.is_staging());
1423
1424 {
1426 let manager = manifest_ctx.manifest_manager.write().await;
1428 let dummy_actions = RegionMetaActionList::new(vec![]);
1429 let dummy_bytes = dummy_actions.encode().unwrap();
1430
1431 manager.store().save(100, &dummy_bytes, true).await.unwrap();
1433 manager.store().save(101, &dummy_bytes, true).await.unwrap();
1434 drop(manager);
1435
1436 let manager = manifest_ctx.manifest_manager.read().await;
1438 let dirty_manifests = manager.store().fetch_staging_manifests().await.unwrap();
1439 assert_eq!(
1440 dirty_manifests.len(),
1441 2,
1442 "Should have 2 dirty staging files"
1443 );
1444 drop(manager);
1445
1446 let mut manager = manifest_ctx.manifest_manager.write().await;
1448 region.set_staging(&mut manager).await.unwrap();
1449 drop(manager);
1450
1451 let manager = manifest_ctx.manifest_manager.read().await;
1453 let cleaned_manifests = manager.store().fetch_staging_manifests().await.unwrap();
1454 assert_eq!(
1455 cleaned_manifests.len(),
1456 0,
1457 "Dirty staging files should be cleaned up"
1458 );
1459 drop(manager);
1460
1461 region.exit_staging().unwrap();
1463 }
1464
1465 let mut manager = manifest_ctx.manifest_manager.write().await;
1467 assert!(region.set_staging(&mut manager).await.is_ok()); drop(manager);
1469 let mut manager = manifest_ctx.manifest_manager.write().await;
1470 assert!(region.set_staging(&mut manager).await.is_err()); drop(manager);
1472 assert!(region.exit_staging().is_ok()); assert!(region.exit_staging().is_err()); }
1475}