1pub mod opener;
18pub mod options;
19pub(crate) mod version;
20
21use std::collections::hash_map::Entry;
22use std::collections::HashMap;
23use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
24use std::sync::{Arc, RwLock};
25
26use common_telemetry::{error, info, warn};
27use crossbeam_utils::atomic::AtomicCell;
28use snafu::{ensure, OptionExt};
29use store_api::codec::PrimaryKeyEncoding;
30use store_api::logstore::provider::Provider;
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33 RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
34};
35use store_api::storage::{RegionId, SequenceNumber};
36use store_api::ManifestVersion;
37
38use crate::access_layer::AccessLayerRef;
39use crate::error::{
40 FlushableRegionStateSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
41 UpdateManifestSnafu,
42};
43use crate::manifest::action::{RegionManifest, RegionMetaAction, RegionMetaActionList};
44use crate::manifest::manager::RegionManifestManager;
45use crate::memtable::MemtableBuilderRef;
46use crate::region::version::{VersionControlRef, VersionRef};
47use crate::request::{OnFailure, OptionOutputTx};
48use crate::sst::file_purger::FilePurgerRef;
49use crate::time_provider::TimeProviderRef;
50
/// Multiplier applied to a region's memtable usage to estimate its WAL usage
/// (see `MitoRegion::estimated_wal_usage`).
///
/// NOTE(review): the origin of this particular value is not visible in this file.
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;
53
54#[derive(Debug)]
56pub struct RegionUsage {
57 pub region_id: RegionId,
58 pub wal_usage: u64,
59 pub sst_usage: u64,
60 pub manifest_usage: u64,
61}
62
63impl RegionUsage {
64 pub fn disk_usage(&self) -> u64 {
65 self.wal_usage + self.sst_usage + self.manifest_usage
66 }
67}
68
/// Fine-grained state of a region that currently holds the leader role.
///
/// Within this module, `Writable` is the only state the `MitoRegion::set_*` transitions
/// start from, and `switch_state_to_writable` is the way back to it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RegionLeaderState {
    /// The state `MitoRegion::is_writable` tests for.
    Writable,
    /// Entered from `Writable` by `MitoRegion::set_altering`.
    Altering,
    /// Entered from `Writable` by `MitoRegion::set_dropping`.
    Dropping,
    /// Entered from `Writable` by `MitoRegion::set_truncating`.
    Truncating,
    /// Entered from `Writable` by `MitoRegion::set_editing`.
    Editing,
    /// Entered from `Writable` when the role is set to `RegionRole::DowngradingLeader`;
    /// a region in this state still counts as flushable.
    Downgrading,
}
84
85#[derive(Debug, Clone, Copy, PartialEq, Eq)]
86pub enum RegionRoleState {
87 Leader(RegionLeaderState),
88 Follower,
89}
90
91#[derive(Debug)]
97pub struct MitoRegion {
98 pub(crate) region_id: RegionId,
103
104 pub(crate) version_control: VersionControlRef,
108 pub(crate) access_layer: AccessLayerRef,
110 pub(crate) manifest_ctx: ManifestContextRef,
112 pub(crate) file_purger: FilePurgerRef,
114 pub(crate) provider: Provider,
116 last_flush_millis: AtomicI64,
118 last_compaction_millis: AtomicI64,
120 time_provider: TimeProviderRef,
122 pub(crate) topic_latest_entry_id: AtomicU64,
132 pub(crate) memtable_builder: MemtableBuilderRef,
134 stats: ManifestStats,
136}
137
/// Shared, reference-counted handle to a [`MitoRegion`].
pub type MitoRegionRef = Arc<MitoRegion>;
139
140impl MitoRegion {
141 pub(crate) async fn stop(&self) {
143 self.manifest_ctx
144 .manifest_manager
145 .write()
146 .await
147 .stop()
148 .await;
149
150 info!(
151 "Stopped region manifest manager, region_id: {}",
152 self.region_id
153 );
154 }
155
156 pub(crate) fn metadata(&self) -> RegionMetadataRef {
158 let version_data = self.version_control.current();
159 version_data.version.metadata.clone()
160 }
161
162 pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
164 let version_data = self.version_control.current();
165 version_data.version.metadata.primary_key_encoding
166 }
167
168 pub(crate) fn version(&self) -> VersionRef {
170 let version_data = self.version_control.current();
171 version_data.version
172 }
173
174 pub(crate) fn last_flush_millis(&self) -> i64 {
176 self.last_flush_millis.load(Ordering::Relaxed)
177 }
178
179 pub(crate) fn update_flush_millis(&self) {
181 let now = self.time_provider.current_time_millis();
182 self.last_flush_millis.store(now, Ordering::Relaxed);
183 }
184
185 pub(crate) fn last_compaction_millis(&self) -> i64 {
187 self.last_compaction_millis.load(Ordering::Relaxed)
188 }
189
190 pub(crate) fn update_compaction_millis(&self) {
192 let now = self.time_provider.current_time_millis();
193 self.last_compaction_millis.store(now, Ordering::Relaxed);
194 }
195
196 pub(crate) fn table_dir(&self) -> &str {
198 self.access_layer.table_dir()
199 }
200
201 pub(crate) fn is_writable(&self) -> bool {
203 self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Writable)
204 }
205
206 pub(crate) fn is_flushable(&self) -> bool {
208 matches!(
209 self.manifest_ctx.state.load(),
210 RegionRoleState::Leader(RegionLeaderState::Writable)
211 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
212 )
213 }
214
215 pub(crate) fn is_downgrading(&self) -> bool {
217 matches!(
218 self.manifest_ctx.state.load(),
219 RegionRoleState::Leader(RegionLeaderState::Downgrading)
220 )
221 }
222
223 pub fn region_id(&self) -> RegionId {
224 self.region_id
225 }
226
227 pub fn find_committed_sequence(&self) -> SequenceNumber {
228 self.version_control.committed_sequence()
229 }
230
231 pub fn is_follower(&self) -> bool {
233 self.manifest_ctx.state.load() == RegionRoleState::Follower
234 }
235
236 pub(crate) fn state(&self) -> RegionRoleState {
238 self.manifest_ctx.state.load()
239 }
240
241 pub(crate) fn set_role(&self, next_role: RegionRole) {
243 self.manifest_ctx.set_role(next_role, self.region_id);
244 }
245
246 pub(crate) fn set_altering(&self) -> Result<()> {
249 self.compare_exchange_state(
250 RegionLeaderState::Writable,
251 RegionRoleState::Leader(RegionLeaderState::Altering),
252 )
253 }
254
255 pub(crate) fn set_dropping(&self) -> Result<()> {
258 self.compare_exchange_state(
259 RegionLeaderState::Writable,
260 RegionRoleState::Leader(RegionLeaderState::Dropping),
261 )
262 }
263
264 pub(crate) fn set_truncating(&self) -> Result<()> {
267 self.compare_exchange_state(
268 RegionLeaderState::Writable,
269 RegionRoleState::Leader(RegionLeaderState::Truncating),
270 )
271 }
272
273 pub(crate) fn set_editing(&self) -> Result<()> {
276 self.compare_exchange_state(
277 RegionLeaderState::Writable,
278 RegionRoleState::Leader(RegionLeaderState::Editing),
279 )
280 }
281
282 pub(crate) async fn set_role_state_gracefully(&self, state: SettableRegionRoleState) {
284 let _manager = self.manifest_ctx.manifest_manager.write().await;
285 self.set_role(state.into());
288 }
289
290 pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
293 if let Err(e) = self
294 .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
295 {
296 error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
297 }
298 }
299
300 pub(crate) fn region_statistic(&self) -> RegionStatistic {
302 let version = self.version();
303 let memtables = &version.memtables;
304 let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;
305
306 let sst_usage = version.ssts.sst_usage();
307 let index_usage = version.ssts.index_usage();
308 let flushed_entry_id = version.flushed_entry_id;
309
310 let wal_usage = self.estimated_wal_usage(memtable_usage);
311 let manifest_usage = self.stats.total_manifest_size();
312 let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
313 let num_files = version.ssts.num_files();
314 let manifest_version = self.stats.manifest_version();
315
316 let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
317
318 RegionStatistic {
319 num_rows,
320 memtable_size: memtable_usage,
321 wal_size: wal_usage,
322 manifest_size: manifest_usage,
323 sst_size: sst_usage,
324 sst_num: num_files,
325 index_size: index_usage,
326 manifest: RegionManifestInfo::Mito {
327 manifest_version,
328 flushed_entry_id,
329 },
330 data_topic_latest_entry_id: topic_latest_entry_id,
331 metadata_topic_latest_entry_id: topic_latest_entry_id,
332 }
333 }
334
335 fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
338 ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
339 }
340
341 fn compare_exchange_state(
344 &self,
345 expect: RegionLeaderState,
346 state: RegionRoleState,
347 ) -> Result<()> {
348 self.manifest_ctx
349 .state
350 .compare_exchange(RegionRoleState::Leader(expect), state)
351 .map_err(|actual| {
352 RegionStateSnafu {
353 region_id: self.region_id,
354 state: actual,
355 expect: RegionRoleState::Leader(expect),
356 }
357 .build()
358 })?;
359 Ok(())
360 }
361}
362
363#[derive(Debug)]
365pub(crate) struct ManifestContext {
366 manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
368 state: AtomicCell<RegionRoleState>,
371}
372
373impl ManifestContext {
374 pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
375 ManifestContext {
376 manifest_manager: tokio::sync::RwLock::new(manager),
377 state: AtomicCell::new(state),
378 }
379 }
380
381 pub(crate) async fn manifest_version(&self) -> ManifestVersion {
382 self.manifest_manager
383 .read()
384 .await
385 .manifest()
386 .manifest_version
387 }
388
389 pub(crate) async fn has_update(&self) -> Result<bool> {
390 self.manifest_manager.read().await.has_update().await
391 }
392
393 pub(crate) async fn install_manifest_to(
399 &self,
400 version: ManifestVersion,
401 ) -> Result<Arc<RegionManifest>> {
402 let mut manager = self.manifest_manager.write().await;
403 manager.install_manifest_to(version).await?;
404
405 Ok(manager.manifest())
406 }
407
408 pub(crate) async fn update_manifest(
410 &self,
411 expect_state: RegionLeaderState,
412 action_list: RegionMetaActionList,
413 ) -> Result<ManifestVersion> {
414 let mut manager = self.manifest_manager.write().await;
416 let manifest = manager.manifest();
418 let current_state = self.state.load();
421
422 if expect_state != RegionLeaderState::Downgrading {
427 if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
428 info!(
429 "Region {} is in downgrading leader state, updating manifest. state is {:?}",
430 manifest.metadata.region_id, expect_state
431 );
432 }
433 ensure!(
434 current_state == RegionRoleState::Leader(expect_state)
435 || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
436 UpdateManifestSnafu {
437 region_id: manifest.metadata.region_id,
438 state: current_state,
439 }
440 );
441 } else {
442 ensure!(
443 current_state == RegionRoleState::Leader(expect_state),
444 RegionStateSnafu {
445 region_id: manifest.metadata.region_id,
446 state: current_state,
447 expect: RegionRoleState::Leader(expect_state),
448 }
449 );
450 }
451
452 for action in &action_list.actions {
453 let RegionMetaAction::Edit(edit) = &action else {
455 continue;
456 };
457
458 let Some(truncated_entry_id) = manifest.truncated_entry_id else {
460 continue;
461 };
462
463 if let Some(flushed_entry_id) = edit.flushed_entry_id {
465 ensure!(
466 truncated_entry_id < flushed_entry_id,
467 RegionTruncatedSnafu {
468 region_id: manifest.metadata.region_id,
469 }
470 );
471 }
472
473 if !edit.files_to_remove.is_empty() {
475 for file in &edit.files_to_remove {
477 ensure!(
478 manifest.files.contains_key(&file.file_id),
479 RegionTruncatedSnafu {
480 region_id: manifest.metadata.region_id,
481 }
482 );
483 }
484 }
485 }
486
487 let version = manager.update(action_list).await.inspect_err(
489 |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
490 )?;
491
492 if self.state.load() == RegionRoleState::Follower {
493 warn!(
494 "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
495 manifest.metadata.region_id
496 );
497 }
498
499 Ok(version)
500 }
501
502 pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
524 match next_role {
525 RegionRole::Follower => {
526 match self.state.fetch_update(|state| {
527 if !matches!(state, RegionRoleState::Follower) {
528 Some(RegionRoleState::Follower)
529 } else {
530 None
531 }
532 }) {
533 Ok(state) => info!(
534 "Convert region {} to follower, previous role state: {:?}",
535 region_id, state
536 ),
537 Err(state) => {
538 if state != RegionRoleState::Follower {
539 warn!(
540 "Failed to convert region {} to follower, current role state: {:?}",
541 region_id, state
542 )
543 }
544 }
545 }
546 }
547 RegionRole::Leader => {
548 match self.state.fetch_update(|state| {
549 if matches!(
550 state,
551 RegionRoleState::Follower
552 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
553 ) {
554 Some(RegionRoleState::Leader(RegionLeaderState::Writable))
555 } else {
556 None
557 }
558 }) {
559 Ok(state) => info!(
560 "Convert region {} to leader, previous role state: {:?}",
561 region_id, state
562 ),
563 Err(state) => {
564 if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
565 warn!(
566 "Failed to convert region {} to leader, current role state: {:?}",
567 region_id, state
568 )
569 }
570 }
571 }
572 }
573 RegionRole::DowngradingLeader => {
574 match self.state.compare_exchange(
575 RegionRoleState::Leader(RegionLeaderState::Writable),
576 RegionRoleState::Leader(RegionLeaderState::Downgrading),
577 ) {
578 Ok(state) => info!(
579 "Convert region {} to downgrading region, previous role state: {:?}",
580 region_id, state
581 ),
582 Err(state) => {
583 if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
584 warn!(
585 "Failed to convert region {} to downgrading leader, current role state: {:?}",
586 region_id, state
587 )
588 }
589 }
590 }
591 }
592 }
593 }
594}
595
#[cfg(test)]
impl ManifestContext {
    /// Test-only accessor for the manifest currently held by the manager.
    pub(crate) async fn manifest(&self) -> Arc<RegionManifest> {
        let manager = self.manifest_manager.read().await;
        manager.manifest()
    }
}
602
/// Shared, reference-counted handle to a [`ManifestContext`].
pub(crate) type ManifestContextRef = Arc<ManifestContext>;
604
605#[derive(Debug, Default)]
607pub(crate) struct RegionMap {
608 regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
609}
610
611impl RegionMap {
612 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
614 let regions = self.regions.read().unwrap();
615 regions.contains_key(®ion_id)
616 }
617
618 pub(crate) fn insert_region(&self, region: MitoRegionRef) {
620 let mut regions = self.regions.write().unwrap();
621 regions.insert(region.region_id, region);
622 }
623
624 pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
626 let regions = self.regions.read().unwrap();
627 regions.get(®ion_id).cloned()
628 }
629
630 pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
634 let region = self
635 .get_region(region_id)
636 .context(RegionNotFoundSnafu { region_id })?;
637 ensure!(
638 region.is_writable(),
639 RegionStateSnafu {
640 region_id,
641 state: region.state(),
642 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
643 }
644 );
645 Ok(region)
646 }
647
648 pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
652 let region = self
653 .get_region(region_id)
654 .context(RegionNotFoundSnafu { region_id })?;
655 ensure!(
656 region.is_follower(),
657 RegionStateSnafu {
658 region_id,
659 state: region.state(),
660 expect: RegionRoleState::Follower,
661 }
662 );
663
664 Ok(region)
665 }
666
667 pub(crate) fn get_region_or<F: OnFailure>(
671 &self,
672 region_id: RegionId,
673 cb: &mut F,
674 ) -> Option<MitoRegionRef> {
675 match self
676 .get_region(region_id)
677 .context(RegionNotFoundSnafu { region_id })
678 {
679 Ok(region) => Some(region),
680 Err(e) => {
681 cb.on_failure(e);
682 None
683 }
684 }
685 }
686
687 pub(crate) fn writable_region_or<F: OnFailure>(
691 &self,
692 region_id: RegionId,
693 cb: &mut F,
694 ) -> Option<MitoRegionRef> {
695 match self.writable_region(region_id) {
696 Ok(region) => Some(region),
697 Err(e) => {
698 cb.on_failure(e);
699 None
700 }
701 }
702 }
703
704 fn flushable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
708 let region = self
709 .get_region(region_id)
710 .context(RegionNotFoundSnafu { region_id })?;
711 ensure!(
712 region.is_flushable(),
713 FlushableRegionStateSnafu {
714 region_id,
715 state: region.state(),
716 }
717 );
718 Ok(region)
719 }
720
721 pub(crate) fn flushable_region_or<F: OnFailure>(
725 &self,
726 region_id: RegionId,
727 cb: &mut F,
728 ) -> Option<MitoRegionRef> {
729 match self.flushable_region(region_id) {
730 Ok(region) => Some(region),
731 Err(e) => {
732 cb.on_failure(e);
733 None
734 }
735 }
736 }
737
738 pub(crate) fn remove_region(&self, region_id: RegionId) {
740 let mut regions = self.regions.write().unwrap();
741 regions.remove(®ion_id);
742 }
743
744 pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
746 let regions = self.regions.read().unwrap();
747 regions.values().cloned().collect()
748 }
749
750 pub(crate) fn clear(&self) {
752 self.regions.write().unwrap().clear();
753 }
754}
755
/// Shared, reference-counted handle to a [`RegionMap`].
pub(crate) type RegionMapRef = Arc<RegionMap>;
757
758#[derive(Debug, Default)]
760pub(crate) struct OpeningRegions {
761 regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
762}
763
764impl OpeningRegions {
765 pub(crate) fn wait_for_opening_region(
767 &self,
768 region_id: RegionId,
769 sender: OptionOutputTx,
770 ) -> Option<OptionOutputTx> {
771 let mut regions = self.regions.write().unwrap();
772 match regions.entry(region_id) {
773 Entry::Occupied(mut senders) => {
774 senders.get_mut().push(sender);
775 None
776 }
777 Entry::Vacant(_) => Some(sender),
778 }
779 }
780
781 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
783 let regions = self.regions.read().unwrap();
784 regions.contains_key(®ion_id)
785 }
786
787 pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
789 let mut regions = self.regions.write().unwrap();
790 regions.insert(region, vec![sender]);
791 }
792
793 pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
795 let mut regions = self.regions.write().unwrap();
796 regions.remove(®ion_id).unwrap_or_default()
797 }
798
799 #[cfg(test)]
800 pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
801 let regions = self.regions.read().unwrap();
802 if let Some(senders) = regions.get(®ion_id) {
803 senders.len()
804 } else {
805 0
806 }
807 }
808}
809
/// Shared, reference-counted handle to an [`OpeningRegions`].
pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;
811
/// Manifest counters of a region.
///
/// Both counters live behind `Arc`s, so clones of this struct observe the same values.
#[derive(Clone, Debug, Default)]
pub(crate) struct ManifestStats {
    /// Total size of the manifest.
    total_manifest_size: Arc<AtomicU64>,
    /// Version of the manifest.
    manifest_version: Arc<AtomicU64>,
}
818
819impl ManifestStats {
820 fn total_manifest_size(&self) -> u64 {
821 self.total_manifest_size.load(Ordering::Relaxed)
822 }
823
824 fn manifest_version(&self) -> u64 {
825 self.manifest_version.load(Ordering::Relaxed)
826 }
827}
828
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crossbeam_utils::atomic::AtomicCell;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use crate::region::{RegionLeaderState, RegionRoleState};
    use crate::test_util::scheduler_util::SchedulerEnv;
    use crate::test_util::version_util::VersionControlBuilder;

    /// The role state is stored in an `AtomicCell`; it must not fall back to a lock.
    #[test]
    fn test_region_state_lock_free() {
        assert!(AtomicCell::<RegionRoleState>::is_lock_free());
    }

    /// Walks `ManifestContext::set_role` through its allowed and rejected transitions.
    #[tokio::test]
    async fn test_set_region_state() {
        const FOLLOWER: RegionRoleState = RegionRoleState::Follower;
        const WRITABLE: RegionRoleState = RegionRoleState::Leader(RegionLeaderState::Writable);
        const DOWNGRADING: RegionRoleState =
            RegionRoleState::Leader(RegionLeaderState::Downgrading);

        let env = SchedulerEnv::new().await;
        let version_control = Arc::new(VersionControlBuilder::new().build());
        let metadata = version_control.current().version.metadata.clone();
        let manifest_ctx = env.mock_manifest_context(metadata).await;
        let region_id = RegionId::new(1024, 0);

        // Leader -> Follower
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), FOLLOWER);

        // Follower -> Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(manifest_ctx.state.load(), WRITABLE);

        // Leader -> Downgrading leader
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(manifest_ctx.state.load(), DOWNGRADING);

        // Downgrading leader -> Follower
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), FOLLOWER);

        // A follower cannot be downgraded: the state stays unchanged.
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(manifest_ctx.state.load(), FOLLOWER);

        // Follower -> Leader -> Downgrading leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(manifest_ctx.state.load(), DOWNGRADING);

        // Downgrading leader -> Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(manifest_ctx.state.load(), WRITABLE);
    }
}