1pub mod opener;
18pub mod options;
19pub(crate) mod version;
20
21use std::collections::hash_map::Entry;
22use std::collections::HashMap;
23use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
24use std::sync::{Arc, RwLock};
25
26use common_telemetry::{error, info, warn};
27use crossbeam_utils::atomic::AtomicCell;
28use snafu::{ensure, OptionExt};
29use store_api::codec::PrimaryKeyEncoding;
30use store_api::logstore::provider::Provider;
31use store_api::manifest::ManifestVersion;
32use store_api::metadata::RegionMetadataRef;
33use store_api::region_engine::{
34 RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
35};
36use store_api::storage::RegionId;
37
38use crate::access_layer::AccessLayerRef;
39use crate::error::{
40 FlushableRegionStateSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
41 UpdateManifestSnafu,
42};
43use crate::manifest::action::{RegionManifest, RegionMetaAction, RegionMetaActionList};
44use crate::manifest::manager::RegionManifestManager;
45use crate::memtable::MemtableBuilderRef;
46use crate::region::version::{VersionControlRef, VersionRef};
47use crate::request::{OnFailure, OptionOutputTx};
48use crate::sst::file_purger::FilePurgerRef;
49use crate::time_provider::TimeProviderRef;
50
/// Approximate ratio between a region's memtable usage and the size of its WAL.
///
/// `MitoRegion::estimated_wal_usage` multiplies the memtable usage by this
/// factor to produce the reported WAL size; the result is an estimate, not a
/// measurement.
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;
53
54#[derive(Debug)]
56pub struct RegionUsage {
57 pub region_id: RegionId,
58 pub wal_usage: u64,
59 pub sst_usage: u64,
60 pub manifest_usage: u64,
61}
62
63impl RegionUsage {
64 pub fn disk_usage(&self) -> u64 {
65 self.wal_usage + self.sst_usage + self.manifest_usage
66 }
67}
68
/// Sub-state of a region whose role is leader.
///
/// In this file every `MitoRegion::set_*` transition starts from `Writable`,
/// and `MitoRegion::switch_state_to_writable` moves a region back to it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// Normal leader state; the only state accepted by
    /// `RegionMap::writable_region`.
    Writable,
    /// Entered from `Writable` through `MitoRegion::set_altering`.
    Altering,
    /// Entered from `Writable` through `MitoRegion::set_dropping`.
    Dropping,
    /// Entered from `Writable` through `MitoRegion::set_truncating`.
    Truncating,
    /// Entered from `Writable` through `MitoRegion::set_editing`.
    Editing,
    /// Entered from `Writable` when the role is set to
    /// `RegionRole::DowngradingLeader`. A downgrading region is still
    /// flushable (see `MitoRegion::is_flushable`) and may still update its
    /// manifest (see `ManifestContext::update_manifest`).
    Downgrading,
}
84
/// Role of a region together with its leader sub-state.
///
/// The value is stored in an `AtomicCell` inside `ManifestContext`, so it must
/// stay `Copy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
    /// The region is a leader in the given [`RegionLeaderState`].
    Leader(RegionLeaderState),
    /// The region is a follower.
    Follower,
}
90
/// Metadata and runtime state of a single region.
///
/// Instances are shared as [`MitoRegionRef`]; the mutable pieces of state
/// visible here live in atomics or behind the manifest context.
#[derive(Debug)]
pub(crate) struct MitoRegion {
    /// Id of the region.
    pub(crate) region_id: RegionId,

    /// Version control; `current()` yields the region's current version data
    /// (metadata, memtables, SSTs, flushed entry id).
    pub(crate) version_control: VersionControlRef,
    /// Access layer of the region; supplies the region directory.
    pub(crate) access_layer: AccessLayerRef,
    /// Manifest manager together with the region's role state.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// File purger handle. Not used in this file — presumably removes
    /// obsolete SST files; TODO confirm against its definition.
    pub(crate) file_purger: FilePurgerRef,
    /// Log store provider of the region. Not used in this file — presumably
    /// identifies where the region's WAL lives; TODO confirm.
    pub(crate) provider: Provider,
    /// Time of the last flush in milliseconds, as reported by `time_provider`.
    last_flush_millis: AtomicI64,
    /// Time of the last compaction in milliseconds, as reported by
    /// `time_provider`.
    last_compaction_millis: AtomicI64,
    /// Clock used to stamp flush and compaction times.
    time_provider: TimeProviderRef,
    /// Latest entry id of the region's topic. `region_statistic` reports this
    /// single value as both the data and the metadata topic latest entry id.
    pub(crate) topic_latest_entry_id: AtomicU64,
    /// Memtable builder of the region. Not used in this file.
    pub(crate) memtable_builder: MemtableBuilderRef,
    /// Manifest size and version counters read by `region_statistic`.
    stats: ManifestStats,
}

/// Shared pointer to a [`MitoRegion`].
pub(crate) type MitoRegionRef = Arc<MitoRegion>;
139
140impl MitoRegion {
141 pub(crate) async fn stop(&self) {
143 self.manifest_ctx
144 .manifest_manager
145 .write()
146 .await
147 .stop()
148 .await;
149
150 info!(
151 "Stopped region manifest manager, region_id: {}",
152 self.region_id
153 );
154 }
155
156 pub(crate) fn metadata(&self) -> RegionMetadataRef {
158 let version_data = self.version_control.current();
159 version_data.version.metadata.clone()
160 }
161
162 pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
164 let version_data = self.version_control.current();
165 version_data.version.metadata.primary_key_encoding
166 }
167
168 pub(crate) fn version(&self) -> VersionRef {
170 let version_data = self.version_control.current();
171 version_data.version
172 }
173
174 pub(crate) fn last_flush_millis(&self) -> i64 {
176 self.last_flush_millis.load(Ordering::Relaxed)
177 }
178
179 pub(crate) fn update_flush_millis(&self) {
181 let now = self.time_provider.current_time_millis();
182 self.last_flush_millis.store(now, Ordering::Relaxed);
183 }
184
185 pub(crate) fn last_compaction_millis(&self) -> i64 {
187 self.last_compaction_millis.load(Ordering::Relaxed)
188 }
189
190 pub(crate) fn update_compaction_millis(&self) {
192 let now = self.time_provider.current_time_millis();
193 self.last_compaction_millis.store(now, Ordering::Relaxed);
194 }
195
196 pub(crate) fn region_dir(&self) -> &str {
198 self.access_layer.region_dir()
199 }
200
201 pub(crate) fn is_writable(&self) -> bool {
203 self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Writable)
204 }
205
206 pub(crate) fn is_flushable(&self) -> bool {
208 matches!(
209 self.manifest_ctx.state.load(),
210 RegionRoleState::Leader(RegionLeaderState::Writable)
211 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
212 )
213 }
214
215 pub(crate) fn is_downgrading(&self) -> bool {
217 matches!(
218 self.manifest_ctx.state.load(),
219 RegionRoleState::Leader(RegionLeaderState::Downgrading)
220 )
221 }
222
223 pub(crate) fn is_follower(&self) -> bool {
225 self.manifest_ctx.state.load() == RegionRoleState::Follower
226 }
227
228 pub(crate) fn state(&self) -> RegionRoleState {
230 self.manifest_ctx.state.load()
231 }
232
233 pub(crate) fn set_role(&self, next_role: RegionRole) {
235 self.manifest_ctx.set_role(next_role, self.region_id);
236 }
237
238 pub(crate) fn set_altering(&self) -> Result<()> {
241 self.compare_exchange_state(
242 RegionLeaderState::Writable,
243 RegionRoleState::Leader(RegionLeaderState::Altering),
244 )
245 }
246
247 pub(crate) fn set_dropping(&self) -> Result<()> {
250 self.compare_exchange_state(
251 RegionLeaderState::Writable,
252 RegionRoleState::Leader(RegionLeaderState::Dropping),
253 )
254 }
255
256 pub(crate) fn set_truncating(&self) -> Result<()> {
259 self.compare_exchange_state(
260 RegionLeaderState::Writable,
261 RegionRoleState::Leader(RegionLeaderState::Truncating),
262 )
263 }
264
265 pub(crate) fn set_editing(&self) -> Result<()> {
268 self.compare_exchange_state(
269 RegionLeaderState::Writable,
270 RegionRoleState::Leader(RegionLeaderState::Editing),
271 )
272 }
273
274 pub(crate) async fn set_role_state_gracefully(&self, state: SettableRegionRoleState) {
276 let _manager = self.manifest_ctx.manifest_manager.write().await;
277 self.set_role(state.into());
280 }
281
282 pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
285 if let Err(e) = self
286 .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
287 {
288 error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
289 }
290 }
291
292 pub(crate) fn region_statistic(&self) -> RegionStatistic {
294 let version = self.version();
295 let memtables = &version.memtables;
296 let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;
297
298 let sst_usage = version.ssts.sst_usage();
299 let index_usage = version.ssts.index_usage();
300 let flushed_entry_id = version.flushed_entry_id;
301
302 let wal_usage = self.estimated_wal_usage(memtable_usage);
303 let manifest_usage = self.stats.total_manifest_size();
304 let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
305 let manifest_version = self.stats.manifest_version();
306
307 let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
308
309 RegionStatistic {
310 num_rows,
311 memtable_size: memtable_usage,
312 wal_size: wal_usage,
313 manifest_size: manifest_usage,
314 sst_size: sst_usage,
315 index_size: index_usage,
316 manifest: RegionManifestInfo::Mito {
317 manifest_version,
318 flushed_entry_id,
319 },
320 data_topic_latest_entry_id: topic_latest_entry_id,
321 metadata_topic_latest_entry_id: topic_latest_entry_id,
322 }
323 }
324
325 fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
328 ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
329 }
330
331 fn compare_exchange_state(
334 &self,
335 expect: RegionLeaderState,
336 state: RegionRoleState,
337 ) -> Result<()> {
338 self.manifest_ctx
339 .state
340 .compare_exchange(RegionRoleState::Leader(expect), state)
341 .map_err(|actual| {
342 RegionStateSnafu {
343 region_id: self.region_id,
344 state: actual,
345 expect: RegionRoleState::Leader(expect),
346 }
347 .build()
348 })?;
349 Ok(())
350 }
351}
352
353#[derive(Debug)]
355pub(crate) struct ManifestContext {
356 manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
358 state: AtomicCell<RegionRoleState>,
361}
362
363impl ManifestContext {
364 pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
365 ManifestContext {
366 manifest_manager: tokio::sync::RwLock::new(manager),
367 state: AtomicCell::new(state),
368 }
369 }
370
371 pub(crate) async fn manifest_version(&self) -> ManifestVersion {
372 self.manifest_manager
373 .read()
374 .await
375 .manifest()
376 .manifest_version
377 }
378
379 pub(crate) async fn has_update(&self) -> Result<bool> {
380 self.manifest_manager.read().await.has_update().await
381 }
382
383 pub(crate) async fn install_manifest_to(
389 &self,
390 version: ManifestVersion,
391 ) -> Result<Arc<RegionManifest>> {
392 let mut manager = self.manifest_manager.write().await;
393 manager.install_manifest_to(version).await?;
394
395 Ok(manager.manifest())
396 }
397
398 pub(crate) async fn update_manifest(
400 &self,
401 expect_state: RegionLeaderState,
402 action_list: RegionMetaActionList,
403 ) -> Result<ManifestVersion> {
404 let mut manager = self.manifest_manager.write().await;
406 let manifest = manager.manifest();
408 let current_state = self.state.load();
411
412 if expect_state != RegionLeaderState::Downgrading {
417 if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
418 info!(
419 "Region {} is in downgrading leader state, updating manifest. state is {:?}",
420 manifest.metadata.region_id, expect_state
421 );
422 }
423 ensure!(
424 current_state == RegionRoleState::Leader(expect_state)
425 || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
426 UpdateManifestSnafu {
427 region_id: manifest.metadata.region_id,
428 state: current_state,
429 }
430 );
431 } else {
432 ensure!(
433 current_state == RegionRoleState::Leader(expect_state),
434 RegionStateSnafu {
435 region_id: manifest.metadata.region_id,
436 state: current_state,
437 expect: RegionRoleState::Leader(expect_state),
438 }
439 );
440 }
441
442 for action in &action_list.actions {
443 let RegionMetaAction::Edit(edit) = &action else {
445 continue;
446 };
447
448 let Some(truncated_entry_id) = manifest.truncated_entry_id else {
450 continue;
451 };
452
453 if let Some(flushed_entry_id) = edit.flushed_entry_id {
455 ensure!(
456 truncated_entry_id < flushed_entry_id,
457 RegionTruncatedSnafu {
458 region_id: manifest.metadata.region_id,
459 }
460 );
461 }
462
463 if !edit.files_to_remove.is_empty() {
465 for file in &edit.files_to_remove {
467 ensure!(
468 manifest.files.contains_key(&file.file_id),
469 RegionTruncatedSnafu {
470 region_id: manifest.metadata.region_id,
471 }
472 );
473 }
474 }
475 }
476
477 let version = manager.update(action_list).await.inspect_err(
479 |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
480 )?;
481
482 if self.state.load() == RegionRoleState::Follower {
483 warn!(
484 "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
485 manifest.metadata.region_id
486 );
487 }
488
489 Ok(version)
490 }
491
492 pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
514 match next_role {
515 RegionRole::Follower => {
516 match self.state.fetch_update(|state| {
517 if !matches!(state, RegionRoleState::Follower) {
518 Some(RegionRoleState::Follower)
519 } else {
520 None
521 }
522 }) {
523 Ok(state) => info!(
524 "Convert region {} to follower, previous role state: {:?}",
525 region_id, state
526 ),
527 Err(state) => {
528 if state != RegionRoleState::Follower {
529 warn!(
530 "Failed to convert region {} to follower, current role state: {:?}",
531 region_id, state
532 )
533 }
534 }
535 }
536 }
537 RegionRole::Leader => {
538 match self.state.fetch_update(|state| {
539 if matches!(
540 state,
541 RegionRoleState::Follower
542 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
543 ) {
544 Some(RegionRoleState::Leader(RegionLeaderState::Writable))
545 } else {
546 None
547 }
548 }) {
549 Ok(state) => info!(
550 "Convert region {} to leader, previous role state: {:?}",
551 region_id, state
552 ),
553 Err(state) => {
554 if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
555 warn!(
556 "Failed to convert region {} to leader, current role state: {:?}",
557 region_id, state
558 )
559 }
560 }
561 }
562 }
563 RegionRole::DowngradingLeader => {
564 match self.state.compare_exchange(
565 RegionRoleState::Leader(RegionLeaderState::Writable),
566 RegionRoleState::Leader(RegionLeaderState::Downgrading),
567 ) {
568 Ok(state) => info!(
569 "Convert region {} to downgrading region, previous role state: {:?}",
570 region_id, state
571 ),
572 Err(state) => {
573 if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
574 warn!(
575 "Failed to convert region {} to downgrading leader, current role state: {:?}",
576 region_id, state
577 )
578 }
579 }
580 }
581 }
582 }
583 }
584}
585
586#[cfg(test)]
587impl ManifestContext {
588 pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
589 self.manifest_manager.read().await.manifest()
590 }
591}
592
593pub(crate) type ManifestContextRef = Arc<ManifestContext>;
594
595#[derive(Debug, Default)]
597pub(crate) struct RegionMap {
598 regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
599}
600
601impl RegionMap {
602 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
604 let regions = self.regions.read().unwrap();
605 regions.contains_key(®ion_id)
606 }
607
608 pub(crate) fn insert_region(&self, region: MitoRegionRef) {
610 let mut regions = self.regions.write().unwrap();
611 regions.insert(region.region_id, region);
612 }
613
614 pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
616 let regions = self.regions.read().unwrap();
617 regions.get(®ion_id).cloned()
618 }
619
620 pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
624 let region = self
625 .get_region(region_id)
626 .context(RegionNotFoundSnafu { region_id })?;
627 ensure!(
628 region.is_writable(),
629 RegionStateSnafu {
630 region_id,
631 state: region.state(),
632 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
633 }
634 );
635 Ok(region)
636 }
637
638 pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
642 let region = self
643 .get_region(region_id)
644 .context(RegionNotFoundSnafu { region_id })?;
645 ensure!(
646 region.is_follower(),
647 RegionStateSnafu {
648 region_id,
649 state: region.state(),
650 expect: RegionRoleState::Follower,
651 }
652 );
653
654 Ok(region)
655 }
656
657 pub(crate) fn get_region_or<F: OnFailure>(
661 &self,
662 region_id: RegionId,
663 cb: &mut F,
664 ) -> Option<MitoRegionRef> {
665 match self
666 .get_region(region_id)
667 .context(RegionNotFoundSnafu { region_id })
668 {
669 Ok(region) => Some(region),
670 Err(e) => {
671 cb.on_failure(e);
672 None
673 }
674 }
675 }
676
677 pub(crate) fn writable_region_or<F: OnFailure>(
681 &self,
682 region_id: RegionId,
683 cb: &mut F,
684 ) -> Option<MitoRegionRef> {
685 match self.writable_region(region_id) {
686 Ok(region) => Some(region),
687 Err(e) => {
688 cb.on_failure(e);
689 None
690 }
691 }
692 }
693
694 fn flushable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
698 let region = self
699 .get_region(region_id)
700 .context(RegionNotFoundSnafu { region_id })?;
701 ensure!(
702 region.is_flushable(),
703 FlushableRegionStateSnafu {
704 region_id,
705 state: region.state(),
706 }
707 );
708 Ok(region)
709 }
710
711 pub(crate) fn flushable_region_or<F: OnFailure>(
715 &self,
716 region_id: RegionId,
717 cb: &mut F,
718 ) -> Option<MitoRegionRef> {
719 match self.flushable_region(region_id) {
720 Ok(region) => Some(region),
721 Err(e) => {
722 cb.on_failure(e);
723 None
724 }
725 }
726 }
727
728 pub(crate) fn remove_region(&self, region_id: RegionId) {
730 let mut regions = self.regions.write().unwrap();
731 regions.remove(®ion_id);
732 }
733
734 pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
736 let regions = self.regions.read().unwrap();
737 regions.values().cloned().collect()
738 }
739
740 pub(crate) fn clear(&self) {
742 self.regions.write().unwrap().clear();
743 }
744}
745
746pub(crate) type RegionMapRef = Arc<RegionMap>;
747
748#[derive(Debug, Default)]
750pub(crate) struct OpeningRegions {
751 regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
752}
753
754impl OpeningRegions {
755 pub(crate) fn wait_for_opening_region(
757 &self,
758 region_id: RegionId,
759 sender: OptionOutputTx,
760 ) -> Option<OptionOutputTx> {
761 let mut regions = self.regions.write().unwrap();
762 match regions.entry(region_id) {
763 Entry::Occupied(mut senders) => {
764 senders.get_mut().push(sender);
765 None
766 }
767 Entry::Vacant(_) => Some(sender),
768 }
769 }
770
771 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
773 let regions = self.regions.read().unwrap();
774 regions.contains_key(®ion_id)
775 }
776
777 pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
779 let mut regions = self.regions.write().unwrap();
780 regions.insert(region, vec![sender]);
781 }
782
783 pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
785 let mut regions = self.regions.write().unwrap();
786 regions.remove(®ion_id).unwrap_or_default()
787 }
788
789 #[cfg(test)]
790 pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
791 let regions = self.regions.read().unwrap();
792 if let Some(senders) = regions.get(®ion_id) {
793 senders.len()
794 } else {
795 0
796 }
797 }
798}
799
800pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;
801
/// Counters describing a region's manifest.
///
/// Both counters sit behind `Arc`, so clones of a `ManifestStats` observe the
/// same values. This file only reads them.
#[derive(Default, Debug, Clone)]
pub(crate) struct ManifestStats {
    /// Total size of the manifest.
    total_manifest_size: Arc<AtomicU64>,
    /// Version of the manifest.
    manifest_version: Arc<AtomicU64>,
}

impl ManifestStats {
    /// Returns the current total manifest size.
    fn total_manifest_size(&self) -> u64 {
        AtomicU64::load(&self.total_manifest_size, Ordering::Relaxed)
    }

    /// Returns the current manifest version.
    fn manifest_version(&self) -> u64 {
        AtomicU64::load(&self.manifest_version, Ordering::Relaxed)
    }
}
818
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crossbeam_utils::atomic::AtomicCell;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use crate::region::{RegionLeaderState, RegionRoleState};
    use crate::test_util::scheduler_util::SchedulerEnv;
    use crate::test_util::version_util::VersionControlBuilder;

    /// The role state is stored in an `AtomicCell`; make sure that cell does
    /// not fall back to a lock for this type.
    #[test]
    fn test_region_state_lock_free() {
        assert!(AtomicCell::<RegionRoleState>::is_lock_free());
    }

    /// Walks `ManifestContext::set_role` through its accepted and rejected
    /// transitions and checks the resulting state after each step.
    #[tokio::test]
    async fn test_set_region_state() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let region_id = RegionId::new(1024, 0);
        // Initial state of the mock context -> Follower.
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Follower -> Leader (Writable).
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );

        // Leader (Writable) -> Leader (Downgrading).
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        // Leader (Downgrading) -> Follower.
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // A follower cannot become a downgrading leader; the state is unchanged.
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Follower -> Leader (Writable) -> Leader (Downgrading).
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        // Leader (Downgrading) -> Leader (Writable).
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }
}