1pub mod opener;
18pub mod options;
19pub(crate) mod version;
20
21use std::collections::hash_map::Entry;
22use std::collections::HashMap;
23use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
24use std::sync::{Arc, RwLock};
25
26use common_telemetry::{error, info, warn};
27use crossbeam_utils::atomic::AtomicCell;
28use snafu::{ensure, OptionExt};
29use store_api::codec::PrimaryKeyEncoding;
30use store_api::logstore::provider::Provider;
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33 RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
34};
35use store_api::sst_entry::ManifestSstEntry;
36use store_api::storage::{RegionId, SequenceNumber};
37use store_api::ManifestVersion;
38
39use crate::access_layer::AccessLayerRef;
40use crate::error::{
41 FlushableRegionStateSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
42 UpdateManifestSnafu,
43};
44use crate::manifest::action::{RegionManifest, RegionMetaAction, RegionMetaActionList};
45use crate::manifest::manager::RegionManifestManager;
46use crate::memtable::MemtableBuilderRef;
47use crate::region::version::{VersionControlRef, VersionRef};
48use crate::request::{OnFailure, OptionOutputTx};
49use crate::sst::file_purger::FilePurgerRef;
50use crate::sst::location::{index_file_path, sst_file_path};
51use crate::time_provider::TimeProviderRef;
52
/// Multiplier applied to a region's memtable usage to approximate its WAL usage
/// (see `MitoRegion::estimated_wal_usage`).
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;
55
56#[derive(Debug)]
58pub struct RegionUsage {
59 pub region_id: RegionId,
60 pub wal_usage: u64,
61 pub sst_usage: u64,
62 pub manifest_usage: u64,
63}
64
65impl RegionUsage {
66 pub fn disk_usage(&self) -> u64 {
67 self.wal_usage + self.sst_usage + self.manifest_usage
68 }
69}
70
/// Sub-state of a region that holds the leader role.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// Normal leader state. Accepted by `MitoRegion::is_writable` and
    /// `MitoRegion::is_flushable`; the `MitoRegion::set_*` transitions start from it.
    Writable,
    /// Staging mode. Still accepted by `is_writable` and `is_flushable`; entered via
    /// `MitoRegion::set_staging` and left via `MitoRegion::exit_staging`.
    Staging,
    /// Entered from `Writable` via `MitoRegion::set_altering`.
    /// Presumably marks an in-progress alter operation — TODO confirm.
    Altering,
    /// Entered from `Writable` via `MitoRegion::set_dropping`.
    /// Presumably marks an in-progress drop operation — TODO confirm.
    Dropping,
    /// Entered from `Writable` via `MitoRegion::set_truncating`.
    /// Presumably marks an in-progress truncate operation — TODO confirm.
    Truncating,
    /// Entered from `Writable` via `MitoRegion::set_editing`.
    /// Presumably marks an in-progress manifest edit — TODO confirm.
    Editing,
    /// Downgrading leader. Accepted by `is_flushable` but not by `is_writable`; entered
    /// from `Writable` via `ManifestContext::set_role(RegionRole::DowngradingLeader, ..)`.
    Downgrading,
}
88
/// Role of a region together with its leader sub-state, if it is a leader.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
    /// The region is a leader in the given [`RegionLeaderState`].
    Leader(RegionLeaderState),
    /// The region is a follower.
    Follower,
}
94
/// In-memory state of a single mito region.
#[derive(Debug)]
pub struct MitoRegion {
    /// Id of this region.
    pub(crate) region_id: RegionId,

    /// Version control of the region; provides the current version and the committed sequence.
    pub(crate) version_control: VersionControlRef,
    /// Access layer of the region; provides the table directory and path type.
    pub(crate) access_layer: AccessLayerRef,
    /// Manifest manager and role state of the region.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// File purger handle. NOTE(review): not used in this file — presumably purges SST files; confirm.
    pub(crate) file_purger: FilePurgerRef,
    /// Log store provider. NOTE(review): not used in this file — presumably identifies the region's WAL; confirm.
    pub(crate) provider: Provider,
    /// Timestamp (from `time_provider`, in milliseconds) recorded by `update_flush_millis`.
    last_flush_millis: AtomicI64,
    /// Timestamp (from `time_provider`, in milliseconds) recorded by `update_compaction_millis`.
    last_compaction_millis: AtomicI64,
    /// Source of the current time for the two timestamps above.
    time_provider: TimeProviderRef,
    /// Reported by `region_statistic` as both the data and metadata topic latest entry id.
    pub(crate) topic_latest_entry_id: AtomicU64,
    /// Shared counter reported by `region_statistic` as `written_bytes`.
    pub(crate) written_bytes: Arc<AtomicU64>,
    /// Memtable builder. NOTE(review): not used in this file — presumably builds the region's memtables; confirm.
    pub(crate) memtable_builder: MemtableBuilderRef,
    /// Manifest size and version counters reported by `region_statistic`.
    stats: ManifestStats,
}

/// Shared, reference-counted handle to a [`MitoRegion`].
pub type MitoRegionRef = Arc<MitoRegion>;
145
146impl MitoRegion {
147 pub(crate) async fn stop(&self) {
149 self.manifest_ctx
150 .manifest_manager
151 .write()
152 .await
153 .stop()
154 .await;
155
156 info!(
157 "Stopped region manifest manager, region_id: {}",
158 self.region_id
159 );
160 }
161
162 pub(crate) fn metadata(&self) -> RegionMetadataRef {
164 let version_data = self.version_control.current();
165 version_data.version.metadata.clone()
166 }
167
168 pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
170 let version_data = self.version_control.current();
171 version_data.version.metadata.primary_key_encoding
172 }
173
174 pub(crate) fn version(&self) -> VersionRef {
176 let version_data = self.version_control.current();
177 version_data.version
178 }
179
180 pub(crate) fn last_flush_millis(&self) -> i64 {
182 self.last_flush_millis.load(Ordering::Relaxed)
183 }
184
185 pub(crate) fn update_flush_millis(&self) {
187 let now = self.time_provider.current_time_millis();
188 self.last_flush_millis.store(now, Ordering::Relaxed);
189 }
190
191 pub(crate) fn last_compaction_millis(&self) -> i64 {
193 self.last_compaction_millis.load(Ordering::Relaxed)
194 }
195
196 pub(crate) fn update_compaction_millis(&self) {
198 let now = self.time_provider.current_time_millis();
199 self.last_compaction_millis.store(now, Ordering::Relaxed);
200 }
201
202 pub(crate) fn table_dir(&self) -> &str {
204 self.access_layer.table_dir()
205 }
206
207 pub(crate) fn is_writable(&self) -> bool {
209 matches!(
210 self.manifest_ctx.state.load(),
211 RegionRoleState::Leader(RegionLeaderState::Writable)
212 | RegionRoleState::Leader(RegionLeaderState::Staging)
213 )
214 }
215
216 pub(crate) fn is_flushable(&self) -> bool {
218 matches!(
219 self.manifest_ctx.state.load(),
220 RegionRoleState::Leader(RegionLeaderState::Writable)
221 | RegionRoleState::Leader(RegionLeaderState::Staging)
222 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
223 )
224 }
225
226 pub(crate) fn is_downgrading(&self) -> bool {
228 matches!(
229 self.manifest_ctx.state.load(),
230 RegionRoleState::Leader(RegionLeaderState::Downgrading)
231 )
232 }
233
234 #[allow(dead_code)]
236 pub(crate) fn is_staging(&self) -> bool {
237 self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
238 }
239
240 pub fn region_id(&self) -> RegionId {
241 self.region_id
242 }
243
244 pub fn find_committed_sequence(&self) -> SequenceNumber {
245 self.version_control.committed_sequence()
246 }
247
248 pub fn is_follower(&self) -> bool {
250 self.manifest_ctx.state.load() == RegionRoleState::Follower
251 }
252
253 pub(crate) fn state(&self) -> RegionRoleState {
255 self.manifest_ctx.state.load()
256 }
257
258 pub(crate) fn set_role(&self, next_role: RegionRole) {
260 self.manifest_ctx.set_role(next_role, self.region_id);
261 }
262
263 pub(crate) fn set_altering(&self) -> Result<()> {
266 self.compare_exchange_state(
267 RegionLeaderState::Writable,
268 RegionRoleState::Leader(RegionLeaderState::Altering),
269 )
270 }
271
272 pub(crate) fn set_dropping(&self) -> Result<()> {
275 self.compare_exchange_state(
276 RegionLeaderState::Writable,
277 RegionRoleState::Leader(RegionLeaderState::Dropping),
278 )
279 }
280
281 pub(crate) fn set_truncating(&self) -> Result<()> {
284 self.compare_exchange_state(
285 RegionLeaderState::Writable,
286 RegionRoleState::Leader(RegionLeaderState::Truncating),
287 )
288 }
289
290 pub(crate) fn set_editing(&self) -> Result<()> {
293 self.compare_exchange_state(
294 RegionLeaderState::Writable,
295 RegionRoleState::Leader(RegionLeaderState::Editing),
296 )
297 }
298
299 pub(crate) fn set_staging(&self) -> Result<()> {
303 self.compare_exchange_state(
304 RegionLeaderState::Writable,
305 RegionRoleState::Leader(RegionLeaderState::Staging),
306 )
307 }
308
309 pub(crate) fn exit_staging(&self) -> Result<()> {
313 self.compare_exchange_state(
314 RegionLeaderState::Staging,
315 RegionRoleState::Leader(RegionLeaderState::Writable),
316 )
317 }
318
319 pub(crate) async fn set_role_state_gracefully(
321 &self,
322 state: SettableRegionRoleState,
323 ) -> Result<()> {
324 let _manager = self.manifest_ctx.manifest_manager.write().await;
325 let current_state = self.state();
326
327 match state {
328 SettableRegionRoleState::Leader => {
329 match current_state {
332 RegionRoleState::Leader(RegionLeaderState::Staging) => {
333 info!("Exiting staging mode for region {}", self.region_id);
334 self.exit_staging()?;
335 }
336 RegionRoleState::Leader(RegionLeaderState::Writable) => {
337 info!("Region {} already in normal leader mode", self.region_id);
339 }
340 _ => {
341 return Err(RegionStateSnafu {
343 region_id: self.region_id,
344 state: current_state,
345 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
346 }
347 .build());
348 }
349 }
350 }
351
352 SettableRegionRoleState::StagingLeader => {
353 match current_state {
356 RegionRoleState::Leader(RegionLeaderState::Writable) => {
357 info!("Entering staging mode for region {}", self.region_id);
358 self.set_staging()?;
359 }
360 RegionRoleState::Leader(RegionLeaderState::Staging) => {
361 info!("Region {} already in staging mode", self.region_id);
363 }
364 _ => {
365 return Err(RegionStateSnafu {
366 region_id: self.region_id,
367 state: current_state,
368 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
369 }
370 .build());
371 }
372 }
373 }
374
375 SettableRegionRoleState::Follower => {
376 match current_state {
378 RegionRoleState::Leader(RegionLeaderState::Staging) => {
379 info!(
380 "Exiting staging and demoting region {} to follower",
381 self.region_id
382 );
383 self.exit_staging()?;
384 self.set_role(RegionRole::Follower);
385 }
386 RegionRoleState::Leader(_) => {
387 info!("Demoting region {} from leader to follower", self.region_id);
388 self.set_role(RegionRole::Follower);
389 }
390 RegionRoleState::Follower => {
391 info!("Region {} already in follower mode", self.region_id);
393 }
394 }
395 }
396
397 SettableRegionRoleState::DowngradingLeader => {
398 match current_state {
400 RegionRoleState::Leader(RegionLeaderState::Staging) => {
401 info!(
402 "Exiting staging and entering downgrade for region {}",
403 self.region_id
404 );
405 self.exit_staging()?;
406 self.set_role(RegionRole::DowngradingLeader);
407 }
408 RegionRoleState::Leader(RegionLeaderState::Writable) => {
409 info!("Starting downgrade for region {}", self.region_id);
410 self.set_role(RegionRole::DowngradingLeader);
411 }
412 RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
413 info!("Region {} already in downgrading mode", self.region_id);
415 }
416 _ => {
417 warn!(
418 "Cannot start downgrade for region {} from state {:?}",
419 self.region_id, current_state
420 );
421 }
422 }
423 }
424 }
425
426 Ok(())
427 }
428
429 pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
432 if let Err(e) = self
433 .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
434 {
435 error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
436 }
437 }
438
439 pub(crate) fn region_statistic(&self) -> RegionStatistic {
441 let version = self.version();
442 let memtables = &version.memtables;
443 let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;
444
445 let sst_usage = version.ssts.sst_usage();
446 let index_usage = version.ssts.index_usage();
447 let flushed_entry_id = version.flushed_entry_id;
448
449 let wal_usage = self.estimated_wal_usage(memtable_usage);
450 let manifest_usage = self.stats.total_manifest_size();
451 let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
452 let num_files = version.ssts.num_files();
453 let manifest_version = self.stats.manifest_version();
454
455 let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
456 let written_bytes = self.written_bytes.load(Ordering::Relaxed);
457
458 RegionStatistic {
459 num_rows,
460 memtable_size: memtable_usage,
461 wal_size: wal_usage,
462 manifest_size: manifest_usage,
463 sst_size: sst_usage,
464 sst_num: num_files,
465 index_size: index_usage,
466 manifest: RegionManifestInfo::Mito {
467 manifest_version,
468 flushed_entry_id,
469 },
470 data_topic_latest_entry_id: topic_latest_entry_id,
471 metadata_topic_latest_entry_id: topic_latest_entry_id,
472 written_bytes,
473 }
474 }
475
476 fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
479 ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
480 }
481
482 fn compare_exchange_state(
485 &self,
486 expect: RegionLeaderState,
487 state: RegionRoleState,
488 ) -> Result<()> {
489 self.manifest_ctx
490 .state
491 .compare_exchange(RegionRoleState::Leader(expect), state)
492 .map_err(|actual| {
493 RegionStateSnafu {
494 region_id: self.region_id,
495 state: actual,
496 expect: RegionRoleState::Leader(expect),
497 }
498 .build()
499 })?;
500 Ok(())
501 }
502
503 pub fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
505 let table_dir = self.table_dir();
506 let path_type = self.access_layer.path_type();
507 self.version()
508 .ssts
509 .levels()
510 .iter()
511 .flat_map(|level| {
512 level.files().map(|file| {
513 let meta = file.meta_ref();
514 let region_id = meta.region_id;
515 let (index_file_path, index_file_size) = if meta.index_file_size > 0 {
516 let index_file_path = index_file_path(table_dir, meta.file_id(), path_type);
517 (Some(index_file_path), Some(meta.index_file_size))
518 } else {
519 (None, None)
520 };
521 ManifestSstEntry {
522 table_dir: table_dir.to_string(),
523 region_id,
524 table_id: region_id.table_id(),
525 region_number: region_id.region_number(),
526 region_group: region_id.region_group(),
527 region_sequence: region_id.region_sequence(),
528 file_id: meta.file_id.to_string(),
529 level: meta.level,
530 file_path: sst_file_path(table_dir, meta.file_id(), path_type),
531 file_size: meta.file_size,
532 index_file_path,
533 index_file_size,
534 num_rows: meta.num_rows,
535 num_row_groups: meta.num_row_groups,
536 min_ts: meta.time_range.0,
537 max_ts: meta.time_range.1,
538 sequence: meta.sequence.map(|s| s.get()),
539 }
540 })
541 })
542 .collect()
543 }
544}
545
/// Manifest manager of a region together with the region's role state.
#[derive(Debug)]
pub(crate) struct ManifestContext {
    /// Manager of the region manifest, guarded by an async read-write lock.
    pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
    /// Current role state of the region, loaded and updated atomically.
    state: AtomicCell<RegionRoleState>,
}
555
556impl ManifestContext {
557 pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
558 ManifestContext {
559 manifest_manager: tokio::sync::RwLock::new(manager),
560 state: AtomicCell::new(state),
561 }
562 }
563
564 pub(crate) async fn manifest_version(&self) -> ManifestVersion {
565 self.manifest_manager
566 .read()
567 .await
568 .manifest()
569 .manifest_version
570 }
571
572 pub(crate) async fn has_update(&self) -> Result<bool> {
573 self.manifest_manager.read().await.has_update().await
574 }
575
576 pub(crate) fn current_state(&self) -> RegionRoleState {
578 self.state.load()
579 }
580
581 pub(crate) async fn install_manifest_to(
587 &self,
588 version: ManifestVersion,
589 ) -> Result<Arc<RegionManifest>> {
590 let mut manager = self.manifest_manager.write().await;
591 manager.install_manifest_to(version).await?;
592
593 Ok(manager.manifest())
594 }
595
596 pub(crate) async fn update_manifest(
598 &self,
599 expect_state: RegionLeaderState,
600 action_list: RegionMetaActionList,
601 ) -> Result<ManifestVersion> {
602 let mut manager = self.manifest_manager.write().await;
604 let manifest = manager.manifest();
606 let current_state = self.state.load();
609
610 if expect_state != RegionLeaderState::Downgrading {
615 if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
616 info!(
617 "Region {} is in downgrading leader state, updating manifest. state is {:?}",
618 manifest.metadata.region_id, expect_state
619 );
620 }
621 ensure!(
622 current_state == RegionRoleState::Leader(expect_state)
623 || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
624 UpdateManifestSnafu {
625 region_id: manifest.metadata.region_id,
626 state: current_state,
627 }
628 );
629 } else {
630 ensure!(
631 current_state == RegionRoleState::Leader(expect_state),
632 RegionStateSnafu {
633 region_id: manifest.metadata.region_id,
634 state: current_state,
635 expect: RegionRoleState::Leader(expect_state),
636 }
637 );
638 }
639
640 for action in &action_list.actions {
641 let RegionMetaAction::Edit(edit) = &action else {
643 continue;
644 };
645
646 let Some(truncated_entry_id) = manifest.truncated_entry_id else {
648 continue;
649 };
650
651 if let Some(flushed_entry_id) = edit.flushed_entry_id {
653 ensure!(
654 truncated_entry_id < flushed_entry_id,
655 RegionTruncatedSnafu {
656 region_id: manifest.metadata.region_id,
657 }
658 );
659 }
660
661 if !edit.files_to_remove.is_empty() {
663 for file in &edit.files_to_remove {
665 ensure!(
666 manifest.files.contains_key(&file.file_id),
667 RegionTruncatedSnafu {
668 region_id: manifest.metadata.region_id,
669 }
670 );
671 }
672 }
673 }
674
675 let version = manager.update(action_list, current_state).await.inspect_err(
677 |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
678 )?;
679
680 if self.state.load() == RegionRoleState::Follower {
681 warn!(
682 "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
683 manifest.metadata.region_id
684 );
685 }
686
687 Ok(version)
688 }
689
690 pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
712 match next_role {
713 RegionRole::Follower => {
714 match self.state.fetch_update(|state| {
715 if !matches!(state, RegionRoleState::Follower) {
716 Some(RegionRoleState::Follower)
717 } else {
718 None
719 }
720 }) {
721 Ok(state) => info!(
722 "Convert region {} to follower, previous role state: {:?}",
723 region_id, state
724 ),
725 Err(state) => {
726 if state != RegionRoleState::Follower {
727 warn!(
728 "Failed to convert region {} to follower, current role state: {:?}",
729 region_id, state
730 )
731 }
732 }
733 }
734 }
735 RegionRole::Leader => {
736 match self.state.fetch_update(|state| {
737 if matches!(
738 state,
739 RegionRoleState::Follower
740 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
741 ) {
742 Some(RegionRoleState::Leader(RegionLeaderState::Writable))
743 } else {
744 None
745 }
746 }) {
747 Ok(state) => info!(
748 "Convert region {} to leader, previous role state: {:?}",
749 region_id, state
750 ),
751 Err(state) => {
752 if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
753 warn!(
754 "Failed to convert region {} to leader, current role state: {:?}",
755 region_id, state
756 )
757 }
758 }
759 }
760 }
761 RegionRole::DowngradingLeader => {
762 match self.state.compare_exchange(
763 RegionRoleState::Leader(RegionLeaderState::Writable),
764 RegionRoleState::Leader(RegionLeaderState::Downgrading),
765 ) {
766 Ok(state) => info!(
767 "Convert region {} to downgrading region, previous role state: {:?}",
768 region_id, state
769 ),
770 Err(state) => {
771 if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
772 warn!(
773 "Failed to convert region {} to downgrading leader, current role state: {:?}",
774 region_id, state
775 )
776 }
777 }
778 }
779 }
780 }
781 }
782
783 pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
784 self.manifest_manager.read().await.manifest()
785 }
786}
787
/// Shared, reference-counted handle to a [`ManifestContext`].
pub(crate) type ManifestContextRef = Arc<ManifestContext>;
789
/// Registry of regions keyed by region id.
#[derive(Debug, Default)]
pub(crate) struct RegionMap {
    /// Registered regions, guarded by a (blocking) read-write lock.
    regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
}
795
796impl RegionMap {
797 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
799 let regions = self.regions.read().unwrap();
800 regions.contains_key(®ion_id)
801 }
802
803 pub(crate) fn insert_region(&self, region: MitoRegionRef) {
805 let mut regions = self.regions.write().unwrap();
806 regions.insert(region.region_id, region);
807 }
808
809 pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
811 let regions = self.regions.read().unwrap();
812 regions.get(®ion_id).cloned()
813 }
814
815 pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
819 let region = self
820 .get_region(region_id)
821 .context(RegionNotFoundSnafu { region_id })?;
822 ensure!(
823 region.is_writable(),
824 RegionStateSnafu {
825 region_id,
826 state: region.state(),
827 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
828 }
829 );
830 Ok(region)
831 }
832
833 pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
837 let region = self
838 .get_region(region_id)
839 .context(RegionNotFoundSnafu { region_id })?;
840 ensure!(
841 region.is_follower(),
842 RegionStateSnafu {
843 region_id,
844 state: region.state(),
845 expect: RegionRoleState::Follower,
846 }
847 );
848
849 Ok(region)
850 }
851
852 pub(crate) fn get_region_or<F: OnFailure>(
856 &self,
857 region_id: RegionId,
858 cb: &mut F,
859 ) -> Option<MitoRegionRef> {
860 match self
861 .get_region(region_id)
862 .context(RegionNotFoundSnafu { region_id })
863 {
864 Ok(region) => Some(region),
865 Err(e) => {
866 cb.on_failure(e);
867 None
868 }
869 }
870 }
871
872 pub(crate) fn writable_region_or<F: OnFailure>(
876 &self,
877 region_id: RegionId,
878 cb: &mut F,
879 ) -> Option<MitoRegionRef> {
880 match self.writable_region(region_id) {
881 Ok(region) => Some(region),
882 Err(e) => {
883 cb.on_failure(e);
884 None
885 }
886 }
887 }
888
889 pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
893 let region = self.writable_region(region_id)?;
894 if region.is_staging() {
895 return Err(crate::error::RegionStateSnafu {
896 region_id,
897 state: region.state(),
898 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
899 }
900 .build());
901 }
902 Ok(region)
903 }
904
905 fn flushable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
909 let region = self
910 .get_region(region_id)
911 .context(RegionNotFoundSnafu { region_id })?;
912 ensure!(
913 region.is_flushable(),
914 FlushableRegionStateSnafu {
915 region_id,
916 state: region.state(),
917 }
918 );
919 Ok(region)
920 }
921
922 pub(crate) fn flushable_region_or<F: OnFailure>(
926 &self,
927 region_id: RegionId,
928 cb: &mut F,
929 ) -> Option<MitoRegionRef> {
930 match self.flushable_region(region_id) {
931 Ok(region) => Some(region),
932 Err(e) => {
933 cb.on_failure(e);
934 None
935 }
936 }
937 }
938
939 pub(crate) fn remove_region(&self, region_id: RegionId) {
941 let mut regions = self.regions.write().unwrap();
942 regions.remove(®ion_id);
943 }
944
945 pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
947 let regions = self.regions.read().unwrap();
948 regions.values().cloned().collect()
949 }
950
951 pub(crate) fn clear(&self) {
953 self.regions.write().unwrap().clear();
954 }
955}
956
/// Shared, reference-counted handle to a [`RegionMap`].
pub(crate) type RegionMapRef = Arc<RegionMap>;
958
/// Tracks, per region id, the senders registered while a region is being opened.
#[derive(Debug, Default)]
pub(crate) struct OpeningRegions {
    /// Senders queued per region id, guarded by a (blocking) read-write lock.
    regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
}
964
965impl OpeningRegions {
966 pub(crate) fn wait_for_opening_region(
968 &self,
969 region_id: RegionId,
970 sender: OptionOutputTx,
971 ) -> Option<OptionOutputTx> {
972 let mut regions = self.regions.write().unwrap();
973 match regions.entry(region_id) {
974 Entry::Occupied(mut senders) => {
975 senders.get_mut().push(sender);
976 None
977 }
978 Entry::Vacant(_) => Some(sender),
979 }
980 }
981
982 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
984 let regions = self.regions.read().unwrap();
985 regions.contains_key(®ion_id)
986 }
987
988 pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
990 let mut regions = self.regions.write().unwrap();
991 regions.insert(region, vec![sender]);
992 }
993
994 pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
996 let mut regions = self.regions.write().unwrap();
997 regions.remove(®ion_id).unwrap_or_default()
998 }
999
1000 #[cfg(test)]
1001 pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
1002 let regions = self.regions.read().unwrap();
1003 if let Some(senders) = regions.get(®ion_id) {
1004 senders.len()
1005 } else {
1006 0
1007 }
1008 }
1009}
1010
/// Shared, reference-counted handle to an [`OpeningRegions`].
pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;
1012
/// Counters describing a region's manifest.
///
/// Both counters sit behind `Arc`s, so clones of this struct observe the same values.
#[derive(Default, Debug, Clone)]
pub(crate) struct ManifestStats {
    /// Total manifest size counter.
    total_manifest_size: Arc<AtomicU64>,
    /// Manifest version counter.
    manifest_version: Arc<AtomicU64>,
}

impl ManifestStats {
    /// Returns the current value of the total manifest size counter.
    fn total_manifest_size(&self) -> u64 {
        let Self {
            total_manifest_size: size,
            ..
        } = self;
        size.load(Ordering::Relaxed)
    }

    /// Returns the current value of the manifest version counter.
    fn manifest_version(&self) -> u64 {
        let Self {
            manifest_version: version,
            ..
        } = self;
        version.load(Ordering::Relaxed)
    }
}
1029
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicU64;
    use std::sync::Arc;

    use common_datasource::compression::CompressionType;
    use common_test_util::temp_dir::create_temp_dir;
    use crossbeam_utils::atomic::AtomicCell;
    use object_store::services::Fs;
    use object_store::ObjectStore;
    use store_api::logstore::provider::Provider;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::PathType;
    use store_api::storage::RegionId;

    use crate::access_layer::AccessLayer;
    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
    use crate::region::{
        ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
    };
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::test_util::memtable_util::EmptyMemtableBuilder;
    use crate::test_util::scheduler_util::SchedulerEnv;
    use crate::test_util::version_util::VersionControlBuilder;
    use crate::time_provider::StdTimeProvider;

    /// Leader in the normal writable state.
    const WRITABLE: RegionRoleState = RegionRoleState::Leader(RegionLeaderState::Writable);
    /// Leader in staging mode.
    const STAGING: RegionRoleState = RegionRoleState::Leader(RegionLeaderState::Staging);
    /// Leader that is being downgraded.
    const DOWNGRADING: RegionRoleState = RegionRoleState::Leader(RegionLeaderState::Downgrading);

    /// The role state must be usable in a lock-free `AtomicCell`.
    #[test]
    fn test_region_state_lock_free() {
        assert!(AtomicCell::<RegionRoleState>::is_lock_free());
    }

    /// Walks `ManifestContext::set_role` through the allowed and rejected transitions.
    #[tokio::test]
    async fn test_set_region_state() {
        let env = SchedulerEnv::new().await;
        let version_control = Arc::new(VersionControlBuilder::new().build());
        let metadata = version_control.current().version.metadata.clone();
        let manifest_ctx = env.mock_manifest_context(metadata).await;

        let region_id = RegionId::new(1024, 0);

        // Leader -> Follower
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Follower -> Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(manifest_ctx.state.load(), WRITABLE);

        // Leader -> Downgrading leader
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(manifest_ctx.state.load(), DOWNGRADING);

        // Downgrading leader -> Follower
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Follower -> Downgrading leader is rejected; the state stays Follower.
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Follower -> Leader -> Downgrading leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(manifest_ctx.state.load(), DOWNGRADING);

        // Downgrading leader -> Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(manifest_ctx.state.load(), WRITABLE);
    }

    /// A context reports the state it was created with.
    #[tokio::test]
    async fn test_staging_state_validation() {
        let env = SchedulerEnv::new().await;
        let version_control = Arc::new(VersionControlBuilder::new().build());

        // Context created directly in the staging state.
        let staging_ctx = {
            let options = RegionManifestOptions {
                manifest_dir: "".to_string(),
                object_store: env.access_layer.object_store().clone(),
                compress_type: CompressionType::Uncompressed,
                checkpoint_distance: 10,
                remove_file_options: Default::default(),
            };
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                options,
                Default::default(),
                Default::default(),
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(manager, STAGING))
        };
        assert_eq!(staging_ctx.current_state(), STAGING);

        // The mocked context reports the writable leader state.
        let writable_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        assert_eq!(writable_ctx.current_state(), WRITABLE);
    }

    /// Entering and exiting staging on a `MitoRegion` follows Writable <-> Staging only.
    #[tokio::test]
    async fn test_staging_state_transitions() {
        let version_control = Arc::new(VersionControlBuilder::new().build());
        let metadata = version_control.current().version.metadata.clone();

        // File-system object store rooted in a temporary directory.
        let temp_dir = create_temp_dir("");
        let root = temp_dir.path().display().to_string();
        let object_store = ObjectStore::new(Fs::default().root(&root))
            .unwrap()
            .finish();

        let index_aux_path = temp_dir.path().join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let access_layer = Arc::new(AccessLayer::new(
            "",
            PathType::Bare,
            object_store,
            puffin_mgr,
            intm_mgr,
        ));

        let options = RegionManifestOptions {
            manifest_dir: "".to_string(),
            object_store: access_layer.object_store().clone(),
            compress_type: CompressionType::Uncompressed,
            checkpoint_distance: 10,
            remove_file_options: Default::default(),
        };
        let manager = RegionManifestManager::new(
            metadata.clone(),
            0,
            options,
            Default::default(),
            Default::default(),
        )
        .await
        .unwrap();

        let manifest_ctx = Arc::new(ManifestContext::new(manager, WRITABLE));

        let region = MitoRegion {
            region_id: metadata.region_id,
            version_control,
            access_layer,
            manifest_ctx,
            file_purger: crate::test_util::new_noop_file_purger(),
            provider: Provider::noop_provider(),
            last_flush_millis: Default::default(),
            last_compaction_millis: Default::default(),
            time_provider: Arc::new(StdTimeProvider),
            topic_latest_entry_id: Default::default(),
            written_bytes: Arc::new(AtomicU64::new(0)),
            memtable_builder: Arc::new(EmptyMemtableBuilder::default()),
            stats: ManifestStats::default(),
        };

        // Starts writable.
        assert_eq!(region.state(), WRITABLE);
        assert!(!region.is_staging());

        // Writable -> Staging
        region.set_staging().unwrap();
        assert_eq!(region.state(), STAGING);
        assert!(region.is_staging());

        // Staging -> Writable
        region.exit_staging().unwrap();
        assert_eq!(region.state(), WRITABLE);
        assert!(!region.is_staging());

        // Repeating a transition from the wrong state must fail.
        assert!(region.set_staging().is_ok());
        assert!(region.set_staging().is_err());
        assert!(region.exit_staging().is_ok());
        assert!(region.exit_staging().is_err());
    }
}